In [1]:
# Reference: https://curiousily.com/posts/sentiment-analysis-with-bert-and-hugging-face-using-pytorch-and-python/

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import confusion_matrix, classification_report

import torch
from torch import nn, optim

from collections import defaultdict
import seaborn as sns

In [2]:
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup

In [None]:
# --- Configuration ---------------------------------------------------------
# Seed the global NumPy and PyTorch RNGs so that the random train/val/test
# split (scikit-learn falls back to NumPy's global RNG when no random_state is
# given), the classifier-head initialisation and dropout are repeatable after
# "Restart Kernel -> Run All" (as far as the compute backend is deterministic).
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

PRE_TRAINED_MODEL_NAME = 'bert-base-uncased'
MAX_LEN = 100  # forwarded to the tokenizer as max_length

BATCH_SIZE = 4
EPOCHS = 3

tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
# Stand-alone encoder instance; later cells in this notebook refer to `model`,
# so the name is kept.
model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)

In [None]:
# Load the annotated dataset; rows containing any NaN are discarded.
full_df = pd.read_csv('data/kokil dec 6 reprepare/conf_pc_worker_sem.csv').dropna()

# Replace the quadrant labels with integer codes produced by the LabelEncoder.
le = LabelEncoder()
full_df['Input.deception_quadrant'] = le.fit_transform(full_df['Input.deception_quadrant'])

# First split: 80% train / 20% hold-out. Second split: the hold-out is divided
# again, 80% of it going to validation and 20% to test.
df_train, df_holdout = train_test_split(full_df, test_size=0.2)
df_val, df_test = train_test_split(df_holdout, test_size=0.2)

In [None]:
from torch.utils.data import Dataset, DataLoader

class Dataset(Dataset):
    """Dataset that tokenizes one text per lookup and pairs it with its label.

    Args:
        full_texts: indexable collection of texts (each is passed through ``str``).
        deception_quadrants: indexable collection of labels, aligned with
            ``full_texts``; each label is converted to a ``torch.long`` tensor.
        tokenizer: object exposing ``encode_plus``.
        max_len: value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, full_texts, deception_quadrants, tokenizer, max_len):
        self.full_texts, self.deception_quadrants = full_texts, deception_quadrants
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        # One sample per text.
        return len(self.full_texts)

    def __getitem__(self, item):
        """Return a dict with the raw text, token ids, attention mask and label."""
        text = str(self.full_texts[item])
        label = self.deception_quadrants[item]

        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            pad_to_max_length=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer.encode_plus(text, **tokenizer_options)

        sample = {'full_text': text}
        # flatten() collapses the returned tensors to 1-D (presumably removing
        # a leading batch axis of size 1 added by the tokenizer - TODO confirm).
        for field in ('input_ids', 'attention_mask'):
            sample[field] = encoded[field].flatten()
        sample['deception_quadrant'] = torch.tensor(label, dtype=torch.long)
        return sample

def create_data_loader(df, tokenizer, max_len, batch_size):
    """Wrap a dataframe in a ``Dataset`` and return a ``DataLoader`` over it.

    Args:
        df: dataframe providing the ``Input.full_text`` and
            ``Input.deception_quadrant`` columns.
        tokenizer: tokenizer handed to the dataset.
        max_len: maximum token length handed to the dataset.
        batch_size: number of samples per batch.

    Returns:
        A ``DataLoader`` that loads in the main process (``num_workers=0``).
    """
    # Convert the columns to NumPy arrays so the dataset indexes by position.
    texts = df['Input.full_text'].to_numpy()
    labels = df['Input.deception_quadrant'].to_numpy()
    dataset = Dataset(
        full_texts=texts,
        deception_quadrants=labels,
        tokenizer=tokenizer,
        max_len=max_len,
    )
    return DataLoader(dataset, batch_size=batch_size, num_workers=0)

In [None]:
# One loader per split, all built with the same tokenizer / length / batch size.
train_data_loader, test_data_loader, val_data_loader = (
    create_data_loader(split_df, tokenizer, MAX_LEN, BATCH_SIZE)
    for split_df in (df_train, df_test, df_val)
)

In [None]:
# Sanity checks: pull one batch from the training loader and inspect it.
data = next(iter(train_data_loader))
# A bare `data.keys()` in the middle of a cell is evaluated and thrown away;
# print it so the batch keys are actually shown.
print(data.keys())
print(data['input_ids'].shape)
print(data['attention_mask'].shape)
print(data['deception_quadrant'].shape)

In [None]:
class DeceptionQuadrantClassifier(nn.Module):
    """Pre-trained BERT encoder followed by dropout and a linear output layer.

    Args:
        n_classes: number of output logits produced per example.
    """

    def __init__(self, n_classes):
        super().__init__()
        # Load the encoder through the class itself rather than through the
        # unrelated global `model` instance.
        self.modelitem = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.modelitem.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the output-layer logits for a batch.

        Args:
            input_ids: token ids passed to the encoder.
            attention_mask: attention mask passed to the encoder.

        Returns:
            Tensor produced by the linear output layer (one row per example).
        """
        encoder_outputs = self.modelitem(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # Take the second encoder output (the pooled representation) by index.
        # Tuple-unpacking only works when the encoder returns a plain tuple;
        # on a dict-like output object it would yield the key names instead.
        pooled_output = encoder_outputs[1]
        output = self.drop(pooled_output)
        return self.out(output)

In [None]:
# Take the class list from the fitted LabelEncoder instead of
# df_train[...].unique(): unique() returns values in order of first appearance
# and only those present in the training split. le.classes_ lists every class
# in the order of the integer codes, so len(class_names) is the true number of
# classes and position i names encoded label i.
class_names = le.classes_

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device) 
# Build the classifier with one output per class and move it to the device.
deception_model = DeceptionQuadrantClassifier(len(class_names)).to(device)

In [None]:
# Move the sample batch's model inputs onto the selected device.
input_ids, attention_mask = (
    data[field].to(device) for field in ('input_ids', 'attention_mask')
)

In [None]:
# Smoke test: run the sample batch through the classifier and normalise the
# outputs over dim 1.
torch.softmax(deception_model(input_ids, attention_mask), dim=1)

In [None]:
# Optimise the parameters of the classifier that is actually trained
# (`deception_model`), not the stand-alone `model` encoder: with the latter the
# optimizer would step weights that never receive gradients and the classifier
# would not learn.
optimizer = AdamW(deception_model.parameters(), lr=2e-5, correct_bias=False)
# The scheduler is stepped once per training batch, hence batches * epochs.
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
  optimizer,
  num_warmup_steps=0,
  num_training_steps=total_steps
)
loss_fn = nn.CrossEntropyLoss().to(device)

In [None]:
def train_epoch(model, data_loader, loss_fn, optimizer, device, scheduler, n_examples):
    """Run one training pass over ``data_loader``.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``;
            its output is passed to ``loss_fn`` and arg-maxed over dim 1.
        data_loader: iterable of batches indexed with the keys ``"input_ids"``,
            ``"attention_mask"`` and ``"deception_quadrant"``.
        loss_fn: callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        optimizer: optimizer stepped once per batch.
        device: device the batch tensors are moved to.
        scheduler: learning-rate scheduler stepped once per batch.
        n_examples: denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``. ``accuracy`` is a 0-dim float64 tensor that
        always lives on the CPU, so it can be plotted or compared regardless of
        the training device. ``mean_loss`` is the mean of the per-batch losses
        (``nan`` when the loader yields no batches).
    """
    model = model.train()
    losses = []
    # Plain Python counter: keeps the result off the GPU and also works when
    # the loader is empty.
    correct_predictions = 0

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        deception_quadrants = d["deception_quadrant"].to(device)

        outputs = model(
          input_ids=input_ids,
          attention_mask=attention_mask
        )
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, deception_quadrants)

        correct_predictions += torch.sum(preds == deception_quadrants).item()
        losses.append(loss.item())

        loss.backward()
        # Cap the global gradient norm at 1.0 before the update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    accuracy = torch.tensor(correct_predictions, dtype=torch.float64) / n_examples
    mean_loss = np.mean(losses) if losses else np.nan
    return accuracy, mean_loss

In [None]:
def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` on ``data_loader`` without computing gradients.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``;
            its output is passed to ``loss_fn`` and arg-maxed over dim 1.
        data_loader: iterable of batches indexed with the keys ``"input_ids"``,
            ``"attention_mask"`` and ``"deception_quadrant"``.
        loss_fn: callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        device: device the batch tensors are moved to.
        n_examples: denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``. ``accuracy`` is a 0-dim float64 tensor that
        always lives on the CPU. ``mean_loss`` is the mean of the per-batch
        losses (``nan`` when the loader yields no batches).
    """
    model = model.eval()
    losses = []
    # Plain Python counter: keeps the result off the GPU and also works when
    # the loader is empty.
    correct_predictions = 0

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            deception_quadrants = d["deception_quadrant"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs, dim=1)

            loss = loss_fn(outputs, deception_quadrants)
            correct_predictions += torch.sum(preds == deception_quadrants).item()
            losses.append(loss.item())

    accuracy = torch.tensor(correct_predictions, dtype=torch.float64) / n_examples
    mean_loss = np.mean(losses) if losses else np.nan
    return accuracy, mean_loss

In [None]:
# Per-epoch metrics, keyed by 'train_acc' / 'train_loss' / 'val_acc' / 'val_loss'.
history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)
    train_acc, train_loss = train_epoch(
        deception_model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(df_train)
    )

    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(
        deception_model,
        val_data_loader,
        loss_fn,
        device,
        len(df_val)
      )

    print(f'Val   loss {val_loss} accuracy {val_acc}')
    print()

    # Store accuracies as plain floats so the history can be plotted even when
    # the metrics were computed on a GPU.
    history['train_acc'].append(float(train_acc))
    history['train_loss'].append(train_loss)
    history['val_acc'].append(float(val_acc))
    history['val_loss'].append(val_loss)

    if val_acc > best_accuracy:
        # Checkpoint the classifier being trained (`deception_model`), not the
        # stand-alone `model` encoder, which is never part of the training run.
        torch.save(deception_model.state_dict(), 'best_model_state.bin')
        best_accuracy = val_acc

In [None]:
# Accuracy per epoch for the training and validation splits.
fig, ax = plt.subplots()
for metric, legend_label in (('train_acc', 'train accuracy'),
                             ('val_acc', 'validation accuracy')):
    ax.plot(history[metric], label=legend_label)
ax.set_title('Training history')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend()
ax.set_ylim([0, 1]);

In [None]:
# Accuracy on the held-out test split; the loss returned alongside is ignored.
test_acc, _ = eval_model(
    model=deception_model,
    data_loader=test_data_loader,
    loss_fn=loss_fn,
    device=device,
    n_examples=len(df_test),
)
test_acc.item()

In [None]:
def get_predictions(model, data_loader, device=None):
    """Collect texts, predicted classes, class probabilities and true labels.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``
            returning one row of logits per example.
        data_loader: iterable of batches indexed with the keys ``"full_text"``,
            ``"input_ids"``, ``"attention_mask"`` and ``"deception_quadrant"``.
        device: device the batch tensors are moved to. When omitted, the device
            of the model's first parameter is used (CPU if it has none), so the
            function no longer depends on a global ``device`` variable.

    Returns:
        ``(full_texts, predictions, prediction_probs, real_values)`` where
        ``full_texts`` is a list and the other three are CPU tensors.
        ``prediction_probs`` holds the softmax over dim 1 of the model outputs
        (previously the raw logits were returned under this name).
    """
    model = model.eval()
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    full_texts = []
    predictions = []
    prediction_probs = []
    real_values = []

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            deception_quadrants = d["deception_quadrant"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
                )

            _, preds = torch.max(outputs, dim=1)
            full_texts.extend(d["full_text"])
            # Keep one tensor per batch and concatenate once at the end.
            predictions.append(preds)
            prediction_probs.append(torch.softmax(outputs, dim=1))
            real_values.append(deception_quadrants)

    predictions = torch.cat(predictions).cpu()
    prediction_probs = torch.cat(prediction_probs).cpu()
    real_values = torch.cat(real_values).cpu()
    return full_texts, predictions, prediction_probs, real_values

In [None]:
# Run the classifier over the test loader, then convert the predicted and true
# label tensors to NumPy arrays for scikit-learn.
y_full_texts, y_pred, y_pred_probs, y_test = get_predictions(deception_model, test_data_loader)
y_pred, y_test = y_pred.numpy(), y_test.numpy()

In [None]:
# Per-class precision / recall / F1 as a dict, shown as a table with one row
# per report entry.
report = classification_report(y_test, y_pred, output_dict=True)
pd.DataFrame(report).T

In [None]:
def show_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated blue heatmap.

    Cell values are annotated as integers; the y axis is labelled 'True' and
    the x axis 'Predicted'.
    """
    ax = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
    # Re-apply the existing tick labels with a fixed rotation and alignment:
    # horizontal on the y axis, tilted 30 degrees on the x axis.
    y_labels = ax.yaxis.get_ticklabels()
    ax.yaxis.set_ticklabels(y_labels, rotation=0, ha='right')
    x_labels = ax.xaxis.get_ticklabels()
    ax.xaxis.set_ticklabels(x_labels, rotation=30, ha='right')
    ax.set_ylabel('True')
    ax.set_xlabel('Predicted')

# Pin the label order explicitly. LabelEncoder assigns code i to le.classes_[i],
# so rows/columns 0..n-1 line up with le.classes_. Without `labels=`, the
# matrix only covers labels that occur in y_test / y_pred (sorted), which can
# disagree in order and in size with the names used for the axes.
label_ids = np.arange(len(le.classes_))
cm = confusion_matrix(y_test, y_pred, labels=label_ids)
df_cm = pd.DataFrame(cm, index=le.classes_, columns=le.classes_)
show_confusion_matrix(df_cm)