In [None]:
#import libraries
import gc
import re
import ssl
import sys
import time

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import sklearn
import torch
import transformers
from sklearn import feature_selection, feature_extraction, naive_bayes, pipeline, metrics
from sklearn import model_selection
from torch import nn
from torch.optim import Adam
from tqdm import tqdm
from transformers import AutoTokenizer

In [None]:
#load data
# Rows with a missing value in ANY column of the CSV are dropped first; only
# then are the label ('spectrum') and text ('body') columns kept.
df = pd.read_csv('spectrum.csv')
df = df.dropna()
df = df[['spectrum', 'body']]

#split data
# 70/30 split with a fixed seed so the partition is reproducible.
# `model_selection` is imported explicitly in the imports cell: reaching it as
# `sklearn.model_selection` only works when some other import has already
# loaded that submodule as a side effect.
train, test = model_selection.train_test_split(df, test_size=0.3, random_state=1)
# Fresh 0..n-1 indices on both partitions.
train = train.reset_index(drop=True)
test = test.reset_index(drop=True)

In [None]:
#tokenizer
# Must come from the same checkpoint as the model loaded below
# ("distilbert-base-uncased"): token ids are indices into that checkpoint's
# vocabulary, so a tokenizer from a different checkpoint (such as
# "bert-base-cased") feeds the model ids that refer to different word pieces.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

In [None]:
#model
# Sequence-classification model built from the "distilbert-base-uncased"
# checkpoint, configured for two output labels.
model_cls = transformers.AutoModelForSequenceClassification
model = model_cls.from_pretrained("distilbert-base-uncased", num_labels=2)

In [None]:
#tokenizer function
@torch.no_grad()
def tokenizer_func(data):
    """Collate a batch of ``(text, label)`` records for a DataLoader.

    Args:
        data: sequence of records where item 0 is the text and item 1 is
            the label.

    Returns:
        Tuple ``(encoded, labels)``: the output of the module-level
        ``tokenizer`` applied to all texts at once (padded to max length,
        truncated, returned as PyTorch tensors) and a tensor holding the
        labels in the same order.
    """
    batch_texts = []
    batch_targets = []
    for record in data:
        batch_texts.append(record[0])
        batch_targets.append(record[1])

    # Labels are converted before tokenising, as a single tensor.
    label_tensor = torch.tensor(batch_targets)

    encoded = tokenizer(
        batch_texts,
        padding='max_length',
        truncation=True,
        return_tensors='pt',
    )
    return encoded, label_tensor

In [None]:
#dataset for torch
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset yielding ``(text, label)`` pairs from a DataFrame.

    The frame must provide a ``'spectrum'`` column (the class name) and a
    ``'body'`` column (the text). Rows are addressed by position, so the
    frame's index values do not matter.

    Attributes:
        labels: list of ints, 1 where ``spectrum == 'right'`` and 0 for
            every other value.
        dataframe: the frame passed to the constructor (not copied).
    """

    def __init__(self, data):
        # Binary target: 'right' -> 1, anything else -> 0.
        self.labels = [1 if x == 'right' else 0 for x in data['spectrum'].values]
        self.dataframe = data

    def __len__(self):
        """Number of rows in the underlying frame."""
        return len(self.dataframe)

    # NOTE: the class used to define __getitem__ twice; the first definition
    # (returning the whole row) was silently overridden by this one and has
    # been removed. Behaviour is unchanged.
    def __getitem__(self, idx):
        """Return ``(body_text, label)`` for the row at position ``idx``."""
        # Index the column first so no intermediate row Series is built.
        texts = self.dataframe['body'].iloc[idx]
        labels = self.labels[idx]

        return texts, labels

In [None]:
#create torch dataset for train
train_data = Dataset(train)

# Batches of 300 samples, reshuffled on every pass; tokenizer_func turns each
# list of (text, label) samples into tensors.
train_loader_kwargs = dict(batch_size=300, shuffle=True, collate_fn=tokenizer_func)
train_dataloader = torch.utils.data.DataLoader(train_data, **train_loader_kwargs)

In [None]:
# freeze all layers except classifiers to speed up training - fine tuning
for param in model.parameters():
    param.requires_grad = False

# Re-enable gradients only for the two head modules.
for head in (model.classifier, model.pre_classifier):
    for param in head.parameters():
        param.requires_grad = True

# Whole network to eval mode first, then only the heads back to train mode.
model.eval()
model.classifier.train()
model.pre_classifier.train()

In [None]:
#train model
def trainer(model, train_dataloader, learning_rate, epochs):
    """Train ``model`` on ``train_dataloader`` and print per-epoch metrics.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` whose
            output exposes a ``logits`` attribute of shape
            ``(batch, num_classes)``.
        train_dataloader: iterable of ``(encoded, labels)`` batches, where
            ``encoded`` has ``'input_ids'`` and ``'attention_mask'`` entries
            and ``labels`` is a tensor of class indices.
        learning_rate: step size handed to Adam.
        epochs: number of passes over ``train_dataloader``.

    Returns:
        None. The accuracy of every 10th batch is printed while training,
        and the mean per-batch loss and accuracy after each epoch.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    criterion = nn.CrossEntropyLoss()

    # Move the model before building the optimizer so the optimizer is
    # created against the parameters in their final location.
    if use_cuda:
        model = model.cuda()
        criterion = criterion.cuda()

    optimizer = Adam(model.parameters(), lr=learning_rate)

    for epoch_num in range(epochs):
        total_acc_train = 0
        total_loss_train = 0
        num_batches = 0

        for i, (train_input, train_label) in enumerate(train_dataloader):
            train_label = train_label.to(device)
            mask = train_input['attention_mask'].to(device)
            input_id = train_input['input_ids']
            # Only drop a middle axis when one is really there (ids shaped
            # (batch, 1, seq)); an unconditional squeeze(1) would also
            # collapse a (batch, 1) batch of length-1 sequences.
            if input_id.dim() == 3:
                input_id = input_id.squeeze(1)
            input_id = input_id.to(device)

            # Clears the gradients of every parameter the optimizer owns,
            # which is all of model.parameters().
            optimizer.zero_grad()
            output = model(input_id, mask)

            batch_loss = criterion(output.logits, train_label)
            total_loss_train += batch_loss.item()

            acc = (output.logits.argmax(dim=1) == train_label).sum().item() / len(
                train_label
            )
            if i % 10 == 0:
                print(i, acc)
            total_acc_train += acc
            num_batches += 1

            batch_loss.backward()
            optimizer.step()

        # Average over the batches actually processed. len(train_input) is
        # the number of keys in the encoded batch, not a batch count, so it
        # must not be used as the divisor. max() guards an empty loader.
        denom = max(num_batches, 1)
        print(f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / denom: .3f} \
        | Train Accuracy: {total_acc_train / denom: .3f}')

                  
# Training hyper-parameters.
EPOCHS = 2
LR = 1e-4

trainer(model, train_dataloader, learning_rate=LR, epochs=EPOCHS)

In [None]:
#save model
# Move the weights to the CPU and switch to eval mode, then write only the
# state dict (parameters and buffers) to disk.
model = model.cpu()
model = model.eval()
torch.save(model.state_dict(), "modeltrained2.pt")
# No exit() here: ending the session at this point would stop the evaluation
# cells below from ever running in a top-to-bottom execution.

In [None]:
#load model
#model = transformers.AutoModelForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2)
#model.load_state_dict(torch.load('modeltrained2.pt'))

In [None]:
#create torch dataset for test
# Evaluate on a small sample of the test split. The sample is seeded so the
# reported metrics are reproducible, and its size is capped at the number of
# available rows so a split with fewer than 20 rows does not raise.
test_data = Dataset(test.sample(min(20, len(test)), random_state=1))

# Sample order has no effect on the metrics, so the loader does not shuffle.
test_dataloader = torch.utils.data.DataLoader(test_data,
                                              batch_size=20,
                                              shuffle=False,
                                              collate_fn=tokenizer_func)

In [None]:
def evaluate(model, test_dataloader):
    """Run ``model`` over ``test_dataloader`` and collect its predictions.

    The model is switched to eval mode (and left in it) and moved to the GPU
    when one is available. No gradients are recorded.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` whose
            output exposes a ``logits`` attribute of shape
            ``(batch, num_classes)``.
        test_dataloader: iterable of ``(encoded, labels)`` batches, where
            ``encoded`` has ``'input_ids'`` and ``'attention_mask'`` entries
            and ``labels`` is a tensor of class indices.

    Returns:
        Tuple ``(pred_labels, y_labels, y_prob)``:
        ``pred_labels`` - list of predicted class indices (argmax of logits);
        ``y_labels`` - list of true labels;
        ``y_prob`` - list with one 1-D CPU tensor of softmax probabilities
        per sample (stack with ``torch.stack`` to get a matrix).
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    if use_cuda:
        model = model.cuda()
    # Do not rely on whatever mode earlier cells left the model in.
    model.eval()

    pred_labels = []
    y_labels = []
    y_prob = []
    with torch.no_grad():

        for test_input, test_label in tqdm(test_dataloader):
            mask = test_input['attention_mask'].to(device)
            input_id = test_input['input_ids']
            # Only drop a middle axis when one is really there (ids shaped
            # (batch, 1, seq)); an unconditional squeeze(1) would also
            # collapse a (batch, 1) batch of length-1 sequences.
            if input_id.dim() == 3:
                input_id = input_id.squeeze(1)
            input_id = input_id.to(device)

            output = model(input_id, mask)
            probs = nn.functional.softmax(output.logits, dim=1)

            pred_labels += output.logits.argmax(dim=1).tolist()
            y_labels += test_label.tolist()
            # One CPU row per sample, so the result can be handed to
            # NumPy-based tools even when inference ran on the GPU.
            y_prob.extend(probs.cpu().unbind(0))

    return (pred_labels, y_labels, y_prob)
    
# Run inference on the test loader. test_y_prob comes back as a list with one
# probability row per sample; the results cell stacks it into a matrix.
test_pred_labels, test_y_labels, test_y_prob = evaluate(model, test_dataloader)

In [None]:
#save results
#torch.save(test_pred_labels, 'test_pred_labels.pt')
#torch.save(test_y_labels, 'test_y_labels.pt')
#torch.save(test_y_prob, 'test_y_prob.pt')

In [None]:
#load results
#test_pred_labels = torch.load('test_pred_labels.pt')
#test_y_labels = torch.load('test_y_labels.pt')
#test_y_prob = torch.load('test_y_prob.pt')

In [None]:
#print results
# classes[i] names column i of both the one-hot matrix and the probability
# matrix. Logit/probability column 0 is label 0 and column 1 is label 1, so
# the list has to be in ascending order for the legends to be right.
classes = [0, 1]
# Explicit one-hot encoding: always one column per class, even when a class
# is missing from the evaluated sample (pd.get_dummies would drop it).
y_test_array = (np.asarray(test_y_labels)[:, None] == np.asarray(classes)[None, :]).astype(int)
# evaluate() returns a list of per-sample rows; stack it once and stay
# re-runnable when the name already holds the stacked tensor.
if not torch.is_tensor(test_y_prob):
    test_y_prob = torch.stack(test_y_prob)
# scikit-learn works on host-side NumPy data, not (possibly CUDA) tensors.
y_prob_array = test_y_prob.detach().cpu().numpy()

## Accuracy, Precision, Recall
accuracy = metrics.accuracy_score(test_y_labels, test_pred_labels)
auc = metrics.roc_auc_score(test_y_labels, y_prob_array[:, 1])
print("Accuracy:",  round(accuracy,2))
print("Auc:", round(auc,2))
print("Detail:")
print(metrics.classification_report(test_y_labels, test_pred_labels))

fig, ax = plt.subplots(nrows=1, ncols=2)
## Plot roc
# One one-vs-rest curve per class.
for i in range(len(classes)):
    fpr, tpr, thresholds = metrics.roc_curve(y_test_array[:, i],
                                             y_prob_array[:, i])
    ax[0].plot(fpr, tpr, lw=3,
               label='{0} (area={1:0.2f})'.format(classes[i],
                                                  metrics.auc(fpr, tpr))
               )
# Diagonal = performance of a random classifier.
ax[0].plot([0,1], [0,1], color='navy', lw=3, linestyle='--')
ax[0].set(xlim=[-0.05,1.0], ylim=[0.0,1.05],
          xlabel='False Positive Rate',
          ylabel="True Positive Rate (Recall)",
          title="Receiver operating characteristic")
ax[0].legend(loc="lower right")
ax[0].grid(True)

## Plot precision-recall curve
for i in range(len(classes)):
    precision, recall, thresholds = metrics.precision_recall_curve(
        y_test_array[:, i], y_prob_array[:, i])
    ax[1].plot(recall, precision, lw=3,
               label='{0} (area={1:0.2f})'.format(classes[i],
                                                  metrics.auc(recall, precision))
               )
ax[1].set(xlim=[0.0,1.05], ylim=[0.0,1.05], xlabel='Recall',
          ylabel="Precision", title="Precision-Recall curve")
ax[1].legend(loc="best")
ax[1].grid(True)
plt.show()