In [None]:
!pip install evaluate
!pip install accelerate -U
!pip install transformers[torch]
!pip install torchinfo
import torch

torch.__version__

In [None]:
import warnings

warnings.filterwarnings("ignore")

import os

import torch
import torch._dynamo

torch._dynamo.config.suppress_errors = True

from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
from transformers import DataCollatorWithPadding

from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from torchinfo import summary

import evaluate

metric_acc = evaluate.load("accuracy")  #
metric_f1 = evaluate.load("f1")
import numpy as np

tokenizer = None

def tokenize(batch):
    """Encode the ``text`` entries of ``batch`` with the module-level ``tokenizer``.

    Padding and truncation are both enabled and ``max_length`` is fixed at 128.
    The module-level ``tokenizer`` must have been assigned before this is called.
    """
    texts = batch['text']
    encoding_options = dict(padding=True, truncation=True, max_length=128)
    return tokenizer(texts, **encoding_options)


def get_tokenizer(model_path):
    """Return the pretrained tokenizer that ``AutoTokenizer`` resolves for ``model_path``."""
    return AutoTokenizer.from_pretrained(model_path)


def compute_metrics1(eval_pred, metric=metric_acc):
    """Score an ``(predictions, labels)`` pair with a single ``evaluate`` metric.

    The predicted class is the arg-max along axis 1 of the predictions array;
    ``metric`` defaults to the module-level accuracy metric.
    """
    raw_scores, references = eval_pred
    predicted_ids = np.argmax(raw_scores, axis=1)
    return metric.compute(predictions=predicted_ids, references=references)


def compute_metrics(pred):
    """
    Return accuracy plus macro-averaged F1, precision and recall for a prediction object.

    ``pred`` must expose ``label_ids`` (true labels) and ``predictions`` (per-class
    scores); the predicted class is the arg-max over the last axis.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)

    # Macro averaging: every class contributes equally regardless of its support.
    macro_precision, macro_recall, macro_f1, _support = precision_recall_fscore_support(
        y_true, y_hat, average='macro'
    )
    accuracy = accuracy_score(y_true, y_hat)

    metric_names = ('Accuracy', 'F1', 'Precision', 'Recall')
    metric_values = (accuracy, macro_f1, macro_precision, macro_recall)
    return dict(zip(metric_names, metric_values))



def train_classifier(model_path: str,
                     dataset,
                     output_dir="output",
                     train_batch_size=16,
                     eval_batch_size=8,
                     learning_rate= 5e-7, #1.25e-5
                     num_epochs=10,
                     metric_for_best_model="accuracy"
                     ):
    """Fine-tune a two-class (PLASMA / NO_PLASMA) sequence classifier with ``Trainer``.

    Relies on the module-level ``tokenizer`` (also used by ``tokenize``), which
    must be assigned before this function is called.

    Args:
        model_path: Checkpoint name or path handed to ``from_pretrained``.
        dataset: Dataset dict with "train" and "test" splits, each holding a
            "text" column and a "label" column.
        output_dir: Directory for checkpoints written by the Trainer.
        train_batch_size: Per-device training batch size.
        eval_batch_size: Per-device evaluation batch size.
        learning_rate: Initial learning rate for the linear schedule.
        num_epochs: Number of training epochs.
        metric_for_best_model: Currently unused; the matching ``TrainingArguments``
            entry is commented out below. NOTE(review): the default "accuracy"
            does not match the 'Accuracy' key returned by ``compute_metrics``,
            so re-enabling it needs a matching key.

    Returns:
        Tuple ``(model, trainer)`` once ``trainer.train()`` has finished.

    Raises:
        RuntimeError: If the module-level ``tokenizer`` has not been set.
    """
    # Fail early with a clear message instead of a TypeError deep inside dataset.map().
    if tokenizer is None:
        raise RuntimeError(
            "Module-level `tokenizer` is not set; assign it with get_tokenizer(model_path) "
            "before calling train_classifier()."
        )

    dataset = dataset.rename_column("label", "labels")  # to match Trainer
    print(dataset)
    tokenized_dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
    print(tokenized_dataset["train"].features.keys())

    # Prepare model labels - useful for inference
    num_labels = 2
    id2label = {0: "PLASMA", 1: "NO_PLASMA"}
    label2id = {"PLASMA": 0, "NO_PLASMA": 1}

    # Fine-tune & evaluate
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_path,
        num_labels=num_labels,
        id2label=id2label,
        label2id=label2id,
        hidden_dropout_prob=0.3,
        attention_probs_dropout_prob=0.25
    )

    # Make every parameter tensor contiguous in memory before training/saving.
    for param in model.parameters():
        param.data = param.data.contiguous()

    print(" ############ Model Summary ######")
    # Move to the GPU only when one exists: an unconditional .cuda() raises on
    # CPU-only hosts.
    if torch.cuda.is_available():
        model = model.cuda()
    print(model)

    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=train_batch_size,
        per_device_eval_batch_size=eval_batch_size,
        learning_rate=learning_rate,
        lr_scheduler_type='linear', #constant constant_with_warmup
        warmup_steps=0,
        num_train_epochs=num_epochs,
        torch_compile=True,  # optimizations
        optim="adamw_torch",  # improved optimizer
        logging_strategy="steps",
        logging_steps=100,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        weight_decay=0.00, # prevent overfitting default 0.01
        #fp16=True,
        save_total_limit=2,
        load_best_model_at_end=True,
        #metric_for_best_model=metric_for_best_model,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["test"],
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    trainer.train()

    return model, trainer



In [None]:
from datasets import load_dataset

def prepare_trainingset(dataset_file_path:str, test_size=0.25, seed=None):
    """Load a CSV file and split it into shuffled train/test subsets.

    Args:
        dataset_file_path: Path of the CSV file handed to ``load_dataset("csv", ...)``.
        test_size: Size of the test split, forwarded to ``train_test_split``.
        seed: Optional seed for the shuffle. Pass an integer to get the same
            split on every run; the default ``None`` keeps the previous
            unseeded behaviour.

    Returns:
        The dataset dict produced by ``train_test_split`` ("train" and "test" splits).
    """
    dataset = load_dataset("csv", data_files=dataset_file_path)
    # load_dataset puts a single CSV under the "train" key; that split is re-split here.
    dataset = dataset['train'].train_test_split(test_size=test_size, shuffle=True, seed=seed)

    return dataset

In [None]:
# Base checkpoint to fine-tune. The notebook-level `tokenizer` assigned here is the
# one read by tokenize() and train_classifier().
model_path = "allenai/scibert_scivocab_uncased" #"anferico/bert-for-patents"
tokenizer = get_tokenizer(model_path)


# Load the labelled CSV and split it into train/test subsets (default test_size=0.25).
dataset = prepare_trainingset('plasma_training_dataset_0_1.csv')


In [None]:
# Fine-tune for 10 epochs; keeps both the model and its Trainer for saving later.
model, trainer = train_classifier(model_path, dataset, num_epochs=10)

In [None]:
# Report the parameter count of the fine-tuned model.
print(model.num_parameters())

In [None]:
def save_model(model_dir_path:str, trainer, tokenizer):
    """Persist the trainer's model and the tokenizer into ``model_dir_path``.

    The model is saved first, then the tokenizer; a confirmation line is
    printed once both calls have returned.
    """
    destination = model_dir_path
    trainer.save_model(destination)
    tokenizer.save_pretrained(destination)
    print('Model is saved ..')

# Write the fine-tuned model and tokenizer to ./plasma_model (reloaded further below).
save_model("plasma_model", trainer, tokenizer)

In [None]:
from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification, TextClassificationPipeline, pipeline
import pandas as pd

import evaluate
from evaluate import evaluator
from datasets import Dataset

def evaluate_model(test_data_path, model_path):
    """Evaluate a saved text-classification model on a CSV test set.

    Accuracy and F1 are each computed with the ``evaluate`` text-classification
    evaluator using the bootstrap strategy (100 resamples).

    Args:
        test_data_path: Path to a CSV file read with ``pandas.read_csv``.
            Presumably it carries "text" and "label" columns (the evaluator's
            defaults are relied on) -- verify against the file.
        model_path: Model name or directory handed to ``pipeline``.

    Returns:
        Tuple ``(acc_result, f1_result)`` as returned by ``evaluator.compute``.
    """
    # truncation=True makes max_length effective, matching the inference
    # pipeline built elsewhere in this notebook.
    pipe = pipeline(
        "text-classification", model=model_path, truncation=True, max_length=128
    )

    # Define dataset
    test_data = pd.read_csv(test_data_path)
    test_dataset = Dataset.from_pandas(test_data)

    # Named task_evaluator rather than `eval` so the builtin is not shadowed.
    task_evaluator = evaluator("text-classification")
    label_mapping = {"PLASMA": 0, "NO_PLASMA": 1}

    def _bootstrap_score(metric_name):
        # One bootstrap evaluation of the shared pipeline/dataset for the named metric.
        return task_evaluator.compute(
            model_or_pipeline=pipe,
            data=test_dataset,
            metric=evaluate.load(metric_name),
            label_mapping=label_mapping,
            strategy="bootstrap",
            n_resamples=100,
        )

    acc_result = _bootstrap_score("accuracy")
    f1_result = _bootstrap_score("f1")

    return acc_result, f1_result

In [None]:
# Bootstrap accuracy and F1 of the saved model on the separately annotated test file.
acc, f = evaluate_model("plasma_test_data_annotated_.csv", "plasma_model")

In [None]:
# Display the accuracy result returned by evaluate_model().
acc

In [None]:
# Display the F1 result returned by evaluate_model().
f

In [None]:
# Reload the fine-tuned model and tokenizer from the directory written by save_model()
# and wrap them in a text-classification pipeline (truncation on, max_length=128).
# NOTE(review): this rebinds the notebook-level `model_path`, `model` and `tokenizer`.
model_path = "plasma_model/"


model = AutoModelForSequenceClassification.from_pretrained(model_path)
tokenizer= AutoTokenizer.from_pretrained(model_path)
fined_model= pipeline("text-classification", model=model, tokenizer=tokenizer, truncation=True, max_length=128)

In [None]:
# Inspect the held-out split produced by prepare_trainingset().
dataset['test']

In [None]:
# Run the fine-tuned pipeline over every text of the held-out split and preview
# the first five results (downstream cells read each result's 'label' entry).
predictions = fined_model(dataset['test']['text'])
predictions[:5]


In [None]:
!pip install seaborn

In [None]:
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
import seaborn as sns


def get_label(d):
    """Map a prediction dict to its integer class id: 0 for 'PLASMA', 1 for anything else."""
    return 0 if d['label'] == 'PLASMA' else 1



# Convert the pipeline output to integer class ids and score it against the
# labels of the held-out split (accuracy and macro F1).
# NOTE(review): `predictions` is rebound from pipeline dicts to ints, so this cell
# cannot be re-run without first re-running the prediction cell (get_label indexes d['label']).
predictions = [get_label(d) for d in predictions]


print("acc:",accuracy_score(dataset['test']['label'], predictions))
print("f1:",f1_score(dataset['test']['label'], predictions, average = 'macro'))

def plot_cm(cm):
    """Draw an annotated seaborn heatmap of a 2x2 confusion matrix.

    Rows and columns are labelled 'PLASMA' / 'NO_PLASMA'; the x axis is
    titled 'Predicted' and the y axis 'Actual'.
    """
    class_names = ['PLASMA','NO_PLASMA']
    cm_frame = pd.DataFrame(cm, index=class_names, columns=class_names)
    heatmap_ax = sns.heatmap(cm_frame, annot = True, fmt='g')
    heatmap_ax.set_xlabel('Predicted')
    heatmap_ax.set_ylabel('Actual')

# Row-normalised confusion matrix (normalize='true') for the held-out split.
# NOTE(review): this needs the one-argument plot_cm defined in this cell; a later
# cell redefines plot_cm with three parameters, after which this call would fail.
cm = confusion_matrix(dataset['test']['label'],predictions, normalize = 'true')
plot_cm(cm)


In [None]:
# Spot-check the pipeline on a single hand-written sentence.
fined_model("The present invention relates to an electromagnetic pulse protection method and an electromagnetic pulse protection system")

In [None]:
!pip install pandas

In [None]:
import pandas as pd
# Load the separately annotated test file and classify every row's 'text' value.
df = pd.read_csv("plasma_test_data_annotated_.csv", encoding="utf-8")

predictions_df = fined_model(df['text'].tolist())
#predictions_df[:5]


In [None]:
# Preview the first 50 pipeline results for the annotated test file.
predictions_df[:50]

In [None]:
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
import seaborn as sns


def get_label(d):
    """Return the integer class id of a prediction dict.

    'PLASMA' maps to 0; every other label value maps to 1.
    """
    is_plasma = d['label'] == 'PLASMA'
    if is_plasma:
        return 0
    return 1



# Convert the pipeline output for the annotated file to integer class ids and
# score it against the file's 'label' column (accuracy and macro F1).
# NOTE(review): this rebinds the notebook-level `predictions` used by earlier cells.
predictions = [get_label(d) for d in predictions_df]


print("acc:",accuracy_score(df['label'].tolist(), predictions))
print("f1:",f1_score(df['label'].tolist(), predictions, average = 'macro'))

def plot_cm(cm):
    """Render a labelled, annotated heatmap for a two-class confusion matrix.

    Both axes use the class names 'PLASMA' and 'NO_PLASMA'; columns are the
    predicted class and rows the actual class.
    """
    classes = ['PLASMA','NO_PLASMA']
    labelled_cm = pd.DataFrame(cm, index=classes, columns=classes)
    axes = sns.heatmap(labelled_cm, annot = True, fmt='g')
    axes.set_xlabel('Predicted')
    axes.set_ylabel('Actual')

# Row-normalised confusion matrix (normalize='true') for the annotated test file.
# NOTE(review): this needs the one-argument plot_cm defined in this cell; a later
# cell redefines plot_cm with three parameters, after which this call would fail.
cm = confusion_matrix(df['label'].tolist(),predictions, normalize = 'true')
plot_cm(cm)


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl
from sklearn.metrics import roc_curve,confusion_matrix,auc


In [None]:

# Default figure size for every plot below, plus the colour cycle read by plot_metrics().
mpl.rcParams['figure.figsize'] = (12, 10)
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']

def plot_loss(history):
    """Plot train and validation loss per epoch on a logarithmic y axis.

    ``history`` must expose ``epoch`` and a ``history`` mapping containing
    the 'loss' and 'val_loss' series.
    """
    series = history.history
    # A log scale keeps a wide range of loss values readable.
    plt.semilogy(history.epoch, series['loss'], color='red', label='Train Loss')
    plt.semilogy(history.epoch, series['val_loss'], color='green', label='Val Loss',
                 linestyle="--")
    plt.xlabel('Epoch')
    plt.ylabel('Loss')

    plt.legend()


def plot_metrics(history):
    """Plot train vs. validation curves for loss, AUC, precision and recall in a 2x2 grid.

    ``history`` must expose ``epoch`` and a ``history`` mapping holding each
    metric together with its 'val_'-prefixed counterpart. Both curves use the
    first colour of the module-level ``colors``; validation is dashed.
    """
    tracked_metrics = ('loss', 'auc', 'precision', 'recall')
    for panel, metric in enumerate(tracked_metrics, start=1):
        axis_title = metric.replace("_"," ").capitalize()
        plt.subplot(2, 2, panel)
        plt.plot(history.epoch, history.history[metric], color=colors[0], label='Train')
        plt.plot(history.epoch, history.history['val_'+metric],
                 color=colors[0], linestyle="--", label='Val')
        plt.xlabel('Epoch')
        plt.ylabel(axis_title)

        # Loss keeps its auto-scaled upper bound; AUC is zoomed in; the rest span [0, 1].
        if metric == 'loss':
            y_limits = [0, plt.ylim()[1]]
        elif metric == 'auc':
            y_limits = [0.8,1]
        else:
            y_limits = [0,1]
        plt.ylim(y_limits)

        plt.legend()

def _cm_annotations(cm):
    """Build the per-cell annotation strings for a confusion matrix of raw counts."""
    row_totals = np.sum(cm, axis=1, keepdims=True)
    percentages = cm / row_totals.astype(float) * 100
    annot = np.empty_like(cm).astype(str)
    n_rows, n_cols = cm.shape
    for i in range(n_rows):
        for j in range(n_cols):
            count = cm[i, j]
            pct = percentages[i, j]
            if i == j:
                # Index the scalar row total explicitly: formatting a 1-element
                # array with %d relies on a deprecated array-to-scalar conversion.
                annot[i, j] = '%.1f%%\n%d/%d' % (pct, count, row_totals[i, 0])
            elif count == 0:
                annot[i, j] = ''
            else:
                annot[i, j] = '%.1f%%\n%d' % (pct, count)
    return annot


def plot_cm(y_true, y_pred, title):
    '''
    Draw a confusion-matrix heatmap annotated with row percentages and counts.

    Args:
        y_true: Ground-truth labels; their unique values define the class order.
        y_pred: Labels predicted by the model.
        title: Title placed on the figure.

    Diagonal cells show "percent / count/row-total", non-zero off-diagonal cells
    show "percent / count", and zero off-diagonal cells are left blank.

    Returns:
        None
    '''
    figsize=(10,10)
    class_labels = np.unique(y_true)
    cm = confusion_matrix(y_true, y_pred, labels=class_labels)
    annot = _cm_annotations(cm)
    cm = pd.DataFrame(cm, index=class_labels, columns=class_labels)
    cm.index.name = 'Actual'
    cm.columns.name = 'Predicted'
    fig, ax = plt.subplots(figsize=figsize)
    plt.title(title)
    sns.heatmap(cm, cmap= "YlGnBu", annot=annot, fmt='', ax=ax)

def roc_curve_plot(fpr,tpr,roc_auc):
    """Plot a ROC curve with the chance diagonal and show the figure.

    ``fpr`` / ``tpr`` are the curve coordinates and ``roc_auc`` is the area
    printed in the legend entry.
    """
    line_width = 2
    plt.figure()
    curve_label = 'ROC curve (area = %0.2f)' % roc_auc
    plt.plot(fpr, tpr, color='darkorange', lw=line_width, label=curve_label)
    # Dashed diagonal: reference line from (0, 0) to (1, 1).
    plt.plot([0, 1], [0, 1], color='navy', lw=line_width, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic example')
    plt.legend(loc="lower right")
    plt.show()


## Calculate AUC (Area Under the Curve) 

In [None]:
# Probability of the positive class (id 1, 'NO_PLASMA') for every prediction.
# The pipeline's 'score' is the confidence of whichever label it predicted, so
# for 'PLASMA' predictions the positive-class probability is 1 - score
# (assumes the two class scores sum to 1, i.e. softmax output -- TODO confirm).
y_predict_prob = [
    element['score'] if element['label'] == 'NO_PLASMA' else 1.0 - element['score']
    for element in predictions_df
]


In [None]:

# Ground-truth labels of the annotated file (assumed to be 0/1 ids with
# 1 = 'NO_PLASMA', as they are compared against get_label() output -- TODO confirm).
y_valid = df['label'].tolist()
# The ROC curve needs continuous scores for the positive class, not hard 0/1
# predictions: derive P(NO_PLASMA) from each pipeline result ('score' is the
# confidence of the predicted label, so flip it for 'PLASMA' predictions).
y_predict_prob = [
    element['score'] if element['label'] == 'NO_PLASMA' else 1.0 - element['score']
    for element in predictions_df
]
fpr, tpr, _ = roc_curve(y_valid,y_predict_prob)
roc_auc = auc(fpr, tpr)
roc_curve_plot(fpr,tpr,roc_auc)