In [1]:
! pip install transformers datasets



In [24]:
import transformers as tf 
from datasets import load_dataset

from torch.utils.data import DataLoader
from torch import nn
import torch

from tqdm import tqdm
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

from typing import Union, List
import copy

In [3]:
# --- Notebook-wide configuration -------------------------------------------
# Identifiers of the dataset and of the pretrained encoder/tokenizer.
DATA_NAME = "SetFit/emotion"
MODEL_NAME = "google-bert/bert-base-uncased"

# Presumably a checkpoint directory name; not referenced by the visible cells.
CHECKPOINTS = "checkpoints_twitter"

# Use the GPU when PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Optimisation hyper-parameters.
LR = 1e-5
EPOCHS = 3
BATCH_SIZE = 10

# Not referenced by the visible cells.
TEST_SIZE = 0.1

In [4]:
# Load the dataset identified by DATA_NAME; `data` is indexed by split name below.
data = load_dataset(DATA_NAME)

Repo card metadata block was not found. Setting CardData to empty.


In [5]:
# Display the loaded splits with their column names and row counts.
data

DatasetDict({
    train: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 16000
    })
    validation: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 2000
    })
})

In [6]:
# Peek at the first training example to see the record layout.
data['train'][0]

{'text': 'i didnt feel humiliated', 'label': 0, 'label_text': 'sadness'}

In [7]:
# Number of distinct labels; used as the default width of the classifier head.
# Counted on the training split, because that is the split the model is fitted
# on. Counting on the test split would under-size the head if a rare class
# happened to be missing there.
NUM_CLASSES = len(set(data['train']['label']))

In [42]:
# Index-to-emotion lookup: label id -> label name, taken from the first
# training example in which each id appears.
itoe = dict()
for sample in data['train']:
    itoe.setdefault(sample['label'], sample['label_text'])

In [43]:
# Display the label-id -> emotion-name mapping.
itoe

{0: 'sadness', 3: 'anger', 2: 'love', 5: 'surprise', 4: 'fear', 1: 'joy'}

In [9]:
class TwitterClassifierModel(nn.Module):
    """Transformer encoder followed by a single linear classification layer.

    Args:
        backbone: Either an already constructed ``nn.Module`` encoder, or a
            model name/path that is handed to ``AutoModel.from_pretrained``.
            The encoder is called with ``input_ids`` and ``attention_mask``
            keyword arguments and must return an object exposing
            ``pooler_output``.
        embed_shape: Width of ``pooler_output``, i.e. the input size of the
            linear head.
        num_cls: Number of logits produced by the head.
        pad_token_id: Token id treated as padding when ``forward`` has to
            derive the attention mask itself. The default ``0`` keeps the
            previous behaviour.
    """

    def __init__(self, backbone: Union[str, nn.Module],
                 embed_shape: int = 768,
                 num_cls: int = NUM_CLASSES,
                 pad_token_id: int = 0):
        super().__init__()

        if isinstance(backbone, nn.Module):
            self.backbone = backbone
        else:
            self.backbone = tf.AutoModel.from_pretrained(backbone)

        self.pad_token_id = pad_token_id
        self.classifier = nn.Linear(embed_shape, num_cls)

    def forward(self, x: torch.Tensor,
                attention_mask: Union[torch.Tensor, None] = None) -> torch.Tensor:
        """Return classification logits for a batch of token ids.

        Args:
            x: Integer tensor of token ids passed to the backbone as
                ``input_ids``.
            attention_mask: Optional mask with the same shape as ``x``. When
                omitted, every position whose id differs from
                ``pad_token_id`` is attended to.

        Returns:
            The output of the linear head applied to the backbone's
            ``pooler_output``.
        """
        if attention_mask is None:
            # No mask supplied: treat every non-padding token as visible.
            attention_mask = (x != self.pad_token_id).to(torch.int)

        out = self.backbone(input_ids=x, attention_mask=attention_mask).pooler_output
        logits = self.classifier(out)
        return logits
        
# Tokenizer and classifier are both built from MODEL_NAME; the classifier is
# moved to DEVICE right after construction.
tokenizer = tf.AutoTokenizer.from_pretrained(MODEL_NAME)
model = TwitterClassifierModel(MODEL_NAME)
model = model.to(DEVICE)

In [10]:
# Smoke test: push a single sentence through the classifier as a batch of one
# and display the raw logits.
text = 'hello word!'
tokenized_text = tokenizer(text, truncation=True, padding=True)['input_ids']
model(torch.LongTensor([tokenized_text]).to(DEVICE))

tensor([[ 0.0130, -0.7420, -0.1304,  0.1563, -0.3831,  0.2253]],
       device='cuda:0', grad_fn=<AddmmBackward0>)

In [11]:
def tokenize_text(sample):
    """Run the notebook-level ``tokenizer`` on the ``'text'`` entry of ``sample``."""
    raw_text = sample['text']
    return tokenizer(raw_text)

In [12]:
def _encode_split(split):
    """Tokenize one dataset split and pair every example with its label.

    Returns a tuple ``(texts, input_ids, dataset)`` where ``dataset`` is a list
    of ``(token_id_tensor, label_tensor)`` pairs. All sequences of the split
    are padded to a common length, so they can be batched without a custom
    collate function.
    """
    # Read each column once instead of doing a row lookup per example.
    texts = list(split['text'])
    labels = list(split['label'])
    # truncation=True matches the other tokenizer calls in this notebook and
    # keeps over-long inputs within the tokenizer's maximum length.
    input_ids = tokenizer(texts, padding=True, truncation=True).input_ids
    dataset = [(torch.tensor(ids), torch.tensor(label))
               for ids, label in zip(input_ids, labels)]
    return texts, input_ids, dataset


X_train, X_train_tokenized, train_dataset = _encode_split(data['train'])
X_test, X_test_tokenized, test_dataset = _encode_split(data['test'])

In [13]:
# Training batches are reshuffled on every pass over the loader.
train_dataloader = DataLoader(train_dataset,
                              batch_size=BATCH_SIZE,
                              shuffle=True)
# Evaluation gains nothing from shuffling; a fixed order keeps runs comparable.
test_dataloader = DataLoader(test_dataset,
                             batch_size=BATCH_SIZE,
                             shuffle=False)

In [14]:
def freeze_backbone_fn(model: TwitterClassifierModel):
    """Turn off gradient tracking for every parameter of ``model.backbone``.

    The model is modified in place and the same object is returned;
    parameters outside ``backbone`` are not touched.
    """
    model.backbone.requires_grad_(False)
    return model

In [15]:
# Display the module tree of the classifier.
model

TwitterClassifierModel(
  (backbone): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12

In [16]:
def accuracy(y_true: torch.Tensor, y_pred: torch.Tensor):
    """Fraction of positions at which ``y_pred`` equals ``y_true``.

    Args:
        y_true: Reference labels, any shape.
        y_pred: Predicted labels, same shape as ``y_true``.

    Returns:
        A 0-dim floating point tensor holding the share of matching elements.

    Raises:
        ValueError: If the two tensors differ in shape. Without this check a
            ``(N,)`` vs ``(N, 1)`` pair would broadcast to ``(N, N)`` and
            silently yield a wrong score.
    """
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f'shape mismatch: y_true {tuple(y_true.shape)} vs y_pred {tuple(y_pred.shape)}'
        )
    # The mean over all elements works for any rank, not just 2-D input.
    return (y_pred == y_true).float().mean()

In [17]:
# Release GPU memory that PyTorch's caching allocator holds but is not using.
torch.cuda.empty_cache()

In [18]:
def train_transformer(transformer_model: nn.Module,
                      freeze_backbone: bool = True,
                      lr: float = LR,
                      epochs: int = EPOCHS):
    """Train a deep copy of ``transformer_model`` on ``train_dataloader``.

    The copy is optimised with Adam and cross-entropy loss. After each epoch
    the mean per-batch loss and accuracy are printed.

    Args:
        transformer_model: Model to start from. It is deep-copied, so the
            object passed in is not modified.
        freeze_backbone: When true, ``freeze_backbone_fn`` disables gradients
            for the backbone before training.
        lr: Learning rate handed to Adam.
        epochs: Number of passes over ``train_dataloader``.

    Returns:
        The trained copy.
    """
    model = copy.deepcopy(transformer_model)
    loss_fn = nn.CrossEntropyLoss()
    if freeze_backbone:
        model = freeze_backbone_fn(model)
    optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

    model.train()
    # Ask the loader for its batch count instead of materialising every batch
    # a second time just to count them.
    num_batches = len(train_dataloader)

    for i in range(epochs):
        loss_sum = 0
        acc_sum = 0
        print(f'------------------epoch: {i + 1}------------------')
        for X, y in tqdm(train_dataloader):
            X, y = X.to(DEVICE), y.to(DEVICE)
            y_pred = model(X)
            loss = loss_fn(y_pred, y)
            acc = accuracy(y.unsqueeze(1), y_pred.argmax(-1).unsqueeze(1))
            # Accumulate detached values: adding `loss` itself would chain the
            # running sum to the autograd history of every step in the epoch.
            loss_sum += loss.detach()
            acc_sum += acc.detach()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f'train loss {loss_sum / num_batches} acc {acc_sum / num_batches}')
        torch.cuda.empty_cache()

    return model

In [21]:
def test_transformer(transformer_model: nn.Module):
    """Print mean per-batch loss and accuracy on ``test_dataloader``.

    Args:
        transformer_model: Model to evaluate. The evaluation runs on a deep
            copy, so the object passed in (including its train/eval mode) is
            left untouched.

    Returns:
        None. The metrics are only printed.
    """
    model = copy.deepcopy(transformer_model)
    loss_fn = nn.CrossEntropyLoss()

    loss_sum = 0
    acc_sum = 0
    # Ask the loader for its batch count instead of materialising every batch
    # a second time just to count them.
    num_batches = len(test_dataloader)

    model.eval()
    for X, y in tqdm(test_dataloader):
        X, y = X.to(DEVICE), y.to(DEVICE)
        # Neither the forward pass nor the metrics need autograd.
        with torch.inference_mode():
            y_pred = model(X)
            loss = loss_fn(y_pred, y)
            acc = accuracy(y.unsqueeze(1), y_pred.argmax(-1).unsqueeze(1))
        loss_sum += loss
        acc_sum += acc
    print(f'test loss {loss_sum / num_batches} acc {acc_sum / num_batches}')

In [20]:
# Fine-tune all weights (backbone included). train_transformer trains a deep
# copy, so `model` is rebound here to the trained copy it returns.
model = train_transformer(model, freeze_backbone=False)

------------------epoch: 1------------------


  0%|          | 0/1600 [00:00<?, ?it/s]

100%|██████████| 1600/1600 [05:08<00:00,  5.19it/s]


train loss 0.6073542833328247 acc 0.7920041680335999
------------------epoch: 2------------------


100%|██████████| 1600/1600 [05:10<00:00,  5.15it/s]


train loss 0.1512235701084137 acc 0.937631368637085
------------------epoch: 3------------------


100%|██████████| 1600/1600 [05:11<00:00,  5.14it/s]


train loss 0.10863865911960602 acc 0.9497554302215576


In [22]:
# Print loss and accuracy of the fine-tuned model on the test split.
test_transformer(model)

100%|██████████| 200/200 [00:08<00:00, 24.65it/s]

test loss 0.17035098373889923 acc 0.9249998331069946





In [29]:
def inference(text: Union[str, List[str]], model: nn.Module = model):
    """Predict the emotion name for one text or for a list of texts.

    Args:
        text: A single string or a list of strings.
        model: Classifier to use. The default is the ``model`` object that
            existed when this cell was executed. It is switched to eval mode.

    Returns:
        The emotion name (looked up in ``itoe``) for a single string, or a
        list of emotion names, one per input, when a list was given.
    """
    is_single = isinstance(text, str)
    batch = [text] if is_single else list(text)
    if not batch:
        return []

    model.eval()

    # Always tokenize as a batch so the id tensor is (num_texts, seq_len);
    # padding=True makes the rows equally long.
    input_ids = tokenizer(batch, padding=True, truncation=True).input_ids
    with torch.inference_mode():  # prediction only, no autograd graph needed
        logits = model(torch.LongTensor(input_ids).to(DEVICE))

    emotions = [itoe[idx] for idx in logits.argmax(-1).tolist()]
    return emotions[0] if is_single else emotions

In [53]:
# Example: classify one sentence; the cell output is the predicted emotion name.
inference('I feel very bad')

'sadness'