In [2]:
import sys
# Check if we are in Google Colab
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
  from google.colab import drive
  # Mounting the Gdrive
  drive.mount('/content/drive', force_remount=True)
  GDRIVE_ROOT_DIR = '/content/drive/MyDrive'
  !pip install transformers
  !pip install sentencepiece

Mounted at /content/drive


In [3]:
import os
import time
import random
import argparse
from tqdm.notebook import tqdm

import torch
import numpy as np
import pandas as pd

from torch.utils.data import TensorDataset, DataLoader, SequentialSampler
from transformers import CamembertTokenizer, CamembertForSequenceClassification

In [11]:
args_dict = {
    'data_path': 'Deep Learning Course/Project/sentiment_analysis/data/final/test_data.tsv',
    'model_dir': 'Deep Learning Course/Project/sentiment_analysis/models/camembert_mad_v1',
    'batch_size': 32,
    'num_labels': 2,
    'max_len': 64,
    'device_id': 0
}

args = argparse.Namespace(**args_dict)

In [5]:
# Check for GPU
if torch.cuda.is_available():
    # Tell PyTorch to use the GPU.
    print(f'There are {torch.cuda.device_count()} GPU(s) available.')
    print('We will use the GPU:', torch.cuda.get_device_name(args.device_id))
    device = torch.device(args.device_id)
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
print(f'Device ID -> {device}')

There are 1 GPU(s) available.
We will use the GPU: Tesla T4
Device ID -> cuda:0


In [6]:
# Fix Random Seed
SEED_VAL = 999
random.seed(SEED_VAL)
np.random.seed(SEED_VAL)
torch.manual_seed(SEED_VAL)
torch.cuda.manual_seed_all(SEED_VAL)

In [9]:
# Uploading data
if IN_COLAB:
  data_path = os.path.join(GDRIVE_ROOT_DIR, args.data_path)
else:
  data_path = args.data_path

data = pd.read_csv(
  data_path,
  sep='\t',
  on_bad_lines='skip'
)
print(f'Size of dataset: {len(data)}')

text_list = data['text'].values[:]
label_list = data['label'].values[:]

print(f'Maximum length of text: {max([len(t) for t in text_list])}')

Size of dataset: 26785
Maximum length of text: 4570


In [12]:
# Uploading model
if IN_COLAB:
  model_dir = os.path.join(GDRIVE_ROOT_DIR, args.model_dir)
else:
  model_dir = args.model_dir

# Initializing tokenizer
tokenizer = CamembertTokenizer.from_pretrained(model_dir)
# Load Model
model = CamembertForSequenceClassification.from_pretrained(
    pretrained_model_name_or_path=model_dir, # Use the 12-layer CamemBERT
    num_labels=args.num_labels,              # Binary classification.
    output_attentions=False,                 # Whether the model returns attentions weights.
    output_hidden_states=False,              # Whether the model returns all hidden-states.
)
model.to(device)

CamembertForSequenceClassification(
  (roberta): CamembertModel(
    (embeddings): CamembertEmbeddings(
      (word_embeddings): Embedding(32005, 1024, padding_idx=1)
      (position_embeddings): Embedding(514, 1024, padding_idx=1)
      (token_type_embeddings): Embedding(1, 1024)
      (LayerNorm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): CamembertEncoder(
      (layer): ModuleList(
        (0-23): 24 x CamembertLayer(
          (attention): CamembertAttention(
            (self): CamembertSelfAttention(
              (query): Linear(in_features=1024, out_features=1024, bias=True)
              (key): Linear(in_features=1024, out_features=1024, bias=True)
              (value): Linear(in_features=1024, out_features=1024, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): CamembertSelfOutput(
              (dense): Linear(in_features=1024, out_features=10

In [13]:
input_ids = []
attention_masks = []
for text in text_list:
    # `encode_plus` will:
    #   (1) Tokenize the sentence.
    #   (2) Prepend the `[CLS]` token to the start.
    #   (3) Append the `[SEP]` token to the end.
    #   (4) Map tokens to their IDs.
    #   (5) Pad or truncate the sentence to `max_length`
    #   (6) Create attention masks for [PAD] tokens.
    encoded_dict = tokenizer.encode_plus(
        str(text),                  # Sentence to encode.
        add_special_tokens=True,    # Add '[CLS]' and '[SEP]'
        max_length=args.max_len,    # Set maximum length of sequence
        padding='max_length',       # Pad to max length
        truncation=True,            # Truncate to max length
        return_attention_mask=True, # Construct attn. masks.
        return_tensors='pt',        # Return pytorch tensors.
    )

    # Add the encoded sentence to the list.
    input_ids.append(encoded_dict['input_ids'])

    # And its attention mask (simply differentiates padding from non-padding).
    attention_masks.append(encoded_dict['attention_mask'])

In [14]:
# Convert the lists into tensors.
input_ids = torch.cat(input_ids, dim=0)
attention_masks = torch.cat(attention_masks, dim=0)
labels = torch.tensor(label_list)

In [15]:
# Create the DataLoader for our test set.
test_data = TensorDataset(input_ids, attention_masks, labels)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=args.batch_size)

In [16]:
def test(model, test_dataloader, device, desc) -> float:
    # Put model in evaluation mode
    model.eval()

    # Tracking variables
    predictions , true_labels = [], []

    # Predict
    for batch in tqdm(test_dataloader,desc=desc):
        # Add batch to GPU
        batch = tuple(t.to(device) for t in batch)

        # Unpack the inputs from our dataloader
        b_input_ids, b_input_mask, b_labels = batch

        # Telling the model not to compute or store gradients, saving memory and
        # speeding up prediction
        with torch.no_grad():
            # Forward pass, calculate logit predictions
            outputs = model(
                b_input_ids,
                token_type_ids=None,
                attention_mask=b_input_mask
            )

        logits = outputs[0]

        # Move logits and labels to CPU
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        # Store predictions and true labels
        predictions.append(logits)
        true_labels.append(label_ids)

    # Combine the predictions for each batch into a single list of 0s and 1s.
    flat_predictions = [item for sublist in predictions for item in sublist]
    flat_predictions = np.argmax(flat_predictions, axis=1).flatten()

    # Combine the correct labels for each batch into a single list.
    flat_true_labels = [item for sublist in true_labels for item in sublist]

    return flat_predictions, flat_true_labels

In [35]:
 # Testing Model
tic = time.time()
pred, lab = test(
    model=model,
    test_dataloader=test_dataloader,
    device=device,
    desc=f'[{str(device).upper()}] Testing Model'
)
tt = (time.time() - tic)/60
print(f'Inference time for {len(lab)} sentences: {tt:.2f} mins')

[CUDA:0] Testing Model:   0%|          | 0/838 [00:00<?, ?it/s]

Inference time for 26785 sentences: 5.07 mins


In [36]:
from sklearn.metrics import confusion_matrix, f1_score, recall_score, precision_score

cm = confusion_matrix(lab, pred)
f1 = f1_score(lab, pred, average='weighted')
recall = recall_score(lab, pred, average='weighted')
precision = precision_score(lab, pred, average='weighted')
accuracy = sum([x == y for x, y in zip(list(pred), lab)])/len(lab)

# Print results
print(f"F1 Score  : {100*f1:.2f} %")
print(f"Recall    : {100*recall:.2f} %")
print(f"Precision : {100*precision:.2f} %")
print(f'Accuracy  : {100*accuracy:.2f} %')
print(f"\nConfusion Matrix :")

cm_df = pd.DataFrame(cm, index=labels, columns=['Non-hateful', 'Hateful'])
cm_df

F1 Score  : 86.27 %
Recall    : 86.31 %
Precision : 86.94 %
Accuracy  : 86.31 %

Confusion Matrix :


Unnamed: 0,Non-hateful,Hateful
Non-hateful,12182,980
Hateful,2686,10937
