# Transformer

## Imports

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from datasets import load_dataset
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score
from datasets import load_dataset

  from .autonotebook import tqdm as notebook_tqdm


## Dataset

In [3]:
# Fetch the "split" configuration of the dair-ai/emotion dataset.
dataset = load_dataset("dair-ai/emotion", "split")

# Pull the three partitions out of the returned mapping by name.
train_set, val_set, test_set = dataset["train"], dataset["validation"], dataset["test"]

## Vocabulary and tokenization functions

In [4]:

def tokenize(text):
    """Lower-case ``text`` and split it on runs of whitespace.

    Args:
        text: The raw string to tokenize.

    Returns:
        A list of lower-cased tokens; empty when ``text`` has no
        non-whitespace characters.
    """
    lowered = text.lower()
    return lowered.split()

def build_vocab(texts, max_vocab_size):
    """Build a token -> integer id mapping from an iterable of texts.

    Ids 0 and 1 are reserved for ``"<PAD>"`` and ``"<UNK>"``; the most
    frequent tokens receive ids 2, 3, ... in descending frequency order.

    Args:
        texts: Iterable of raw strings; each one is split with ``tokenize``.
        max_vocab_size: Upper bound on the vocabulary size, counting the two
            reserved entries.

    Returns:
        dict mapping each kept token (plus ``"<PAD>"`` and ``"<UNK>"``) to its id.
    """
    from collections import Counter

    frequencies = Counter()
    for text in texts:
        frequencies.update(tokenize(text))

    vocab = {}
    # Two slots are held back for the special tokens, hence the "- 2".
    for token_id, (word, _count) in enumerate(frequencies.most_common(max_vocab_size - 2), start=2):
        vocab[word] = token_id
    vocab["<PAD>"] = 0
    vocab["<UNK>"] = 1
    return vocab

def encode(text, vocab, max_len):
    """Convert ``text`` into a list of token ids of length ``max_len``.

    Tokens missing from ``vocab`` map to the ``"<UNK>"`` id. Sequences longer
    than ``max_len`` are cut off at the end; shorter ones are right-padded
    with the ``"<PAD>"`` id.

    Args:
        text: Raw string to encode.
        vocab: Token -> id mapping containing ``"<UNK>"`` and ``"<PAD>"``.
        max_len: Target sequence length.

    Returns:
        list of int token ids.
    """
    ids = []
    for token in tokenize(text):
        ids.append(vocab.get(token, vocab["<UNK>"]))

    missing = max_len - len(ids)
    if missing < 0:
        # Too long: keep only the leading max_len ids.
        return ids[:max_len]
    return ids + [vocab["<PAD>"]] * missing

## Build Vocabulary

In [5]:
# Size limits shared by the vocabulary and the encoded sequences.
VOCAB_SIZE = 10_000
MAX_LEN = 50

# The vocabulary is derived from the training texts only.
all_texts = train_set["text"]
vocab = build_vocab(all_texts, VOCAB_SIZE)

## PyTorch Dataset Class

In [6]:
class EmotionDataset(Dataset):
    """Map-style dataset that encodes one text/label pair per item.

    Encoding happens lazily in ``__getitem__`` via ``encode``; nothing is
    pre-computed at construction time.
    """

    def __init__(self, texts, labels, vocab, max_len):
        """Store the raw inputs.

        Args:
            texts: Indexable collection of raw strings.
            labels: Indexable collection of labels, indexed like ``texts``.
            vocab: Token -> id mapping handed to ``encode``.
            max_len: Sequence length handed to ``encode``.
        """
        self.texts, self.labels = texts, labels
        self.vocab, self.max_len = vocab, max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the encoded sample at ``idx``.

        Returns:
            dict with ``"input_ids"`` (long tensor built from the ``encode``
            output) and ``"labels"`` (long tensor built from the label).
        """
        sample_text, sample_label = self.texts[idx], self.labels[idx]
        token_ids = encode(sample_text, self.vocab, self.max_len)
        return {
            "input_ids": torch.tensor(token_ids, dtype=torch.long),
            "labels": torch.tensor(sample_label, dtype=torch.long),
        }

## PyTorch data preparation

In [7]:

# Batch size shared by all three loaders.
BATCH_SIZE = 32

# Wrap each split in an EmotionDataset using the shared vocabulary and length.
train_dataset, val_dataset, test_dataset = (
    EmotionDataset(split["text"], split["label"], vocab, MAX_LEN)
    for split in (train_set, val_set, test_set)
)

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)

# Number of unique emotions, counted from the training labels.
NUM_CLASSES = len(set(train_set["label"]))

## Transformer model

In [8]:

class TransformerModel(nn.Module):
    """Transformer-encoder text classifier.

    Pipeline: token embedding + positional encoding -> stacked
    ``nn.TransformerEncoderLayer`` -> mean pooling over the sequence axis ->
    linear layer producing one logit per class.

    Inputs are batch-first: ``forward`` receives token ids shaped
    ``(batch, seq_len)``, so the encoder layers are built with
    ``batch_first=True``. Without that flag PyTorch treats axis 0 as the
    sequence axis and attention would run across the samples of a batch.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, num_classes, max_len, dropout=0.1):
        """Create the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embed_dim: Embedding / model width.
            num_heads: Attention heads per encoder layer.
            num_layers: Number of stacked encoder layers.
            num_classes: Size of the output logit vector.
            max_len: Longest sequence the positional encoding covers.
            dropout: Dropout probability inside the encoder layers.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Registered as a Parameter, so the sinusoidal table is only the
        # initial value and is updated by the optimizer during training.
        self.positional_encoding = nn.Parameter(self._get_positional_encoding(max_len, embed_dim))

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dropout=dropout,
            batch_first=True,  # inputs are (batch, seq, dim)
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc = nn.Linear(embed_dim, num_classes)

    def _get_positional_encoding(self, max_len, embed_dim):
        """Return a ``(1, max_len, embed_dim)`` sinusoidal position table.

        Even columns hold sines and odd columns hold cosines of the same
        angles. Works for odd ``embed_dim`` too, where there is one more
        sine column than cosine columns.
        """
        import math

        position = torch.arange(max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, embed_dim, 2, dtype=torch.float32) * (-math.log(10000.0) / embed_dim)
        )
        angles = position * div_term

        pe = torch.zeros(max_len, embed_dim)
        pe[:, 0::2] = torch.sin(angles)
        # For odd embed_dim the cosine half is one column narrower.
        pe[:, 1::2] = torch.cos(angles[:, : embed_dim // 2])
        return pe.unsqueeze(0)

    def forward(self, x, attention_mask=None):
        """Compute class logits for a batch of token-id sequences.

        Args:
            x: Long tensor of token ids, shaped ``(batch, seq_len)`` with
                ``seq_len <= max_len``.
            attention_mask: Optional ``(batch, seq_len)`` tensor that is
                non-zero for real tokens and zero for padding. When given,
                padded positions are hidden from attention and left out of
                the pooled average.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalised logits.
        """
        x = self.embedding(x) + self.positional_encoding[:, : x.size(1), :]

        padding_mask = None
        if attention_mask is not None:
            # nn.TransformerEncoder expects True where a position is padding.
            padding_mask = attention_mask == 0

        x = self.transformer_encoder(x, src_key_padding_mask=padding_mask)

        if padding_mask is None:
            pooled = x.mean(dim=1)  # Global pooling over every position
        else:
            # NOTE(review): rows that are entirely padding are not handled
            # specially here; confirm they cannot occur in the data.
            keep = (~padding_mask).unsqueeze(-1).to(x.dtype)
            # Presumably some PyTorch versions trim the sequence axis on the
            # inference fast path (TODO confirm); aligning the mask is a
            # no-op otherwise and is valid for right-padded input.
            keep = keep[:, : x.size(1)]
            pooled = (x * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1.0)

        logits = self.fc(pooled)
        return logits

## Hyperparameters

In [9]:
# Model size
EMBED_DIM = 128
NUM_HEADS = 4  # attention heads per layer
NUM_LAYERS = 2
DROPOUT = 0.1

# Training setup
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EPOCHS = 5

# Seed PyTorch's RNG so weight initialisation and DataLoader shuffling are
# repeatable across "Restart Kernel -> Run All".
SEED = 42
torch.manual_seed(SEED)

## Model, Loss and Optimizer

In [10]:
# Build the classifier from the notebook-level hyperparameters, then move it
# to the selected device.
model = TransformerModel(
    vocab_size=len(vocab),
    max_len=MAX_LEN,
    num_classes=NUM_CLASSES,
    embed_dim=EMBED_DIM,
    num_heads=NUM_HEADS,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT,
)
model = model.to(DEVICE)

# Cross-entropy over the class logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=5e-4)



## Training function

In [11]:
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs):
    """Train ``model`` and evaluate it on ``val_loader`` after every epoch.

    Each batch must be a mapping with ``"input_ids"`` and ``"labels"``
    tensors. Batches are moved to the device the model's parameters live on.

    Args:
        model: Module called as ``model(input_ids)`` and returning class logits.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        epochs: Number of passes over ``train_loader``.

    Returns:
        list of dicts, one per epoch, with keys ``"epoch"``, ``"train_loss"``,
        ``"val_loss"`` (both the mean of the per-batch losses) and
        ``"val_acc"`` (fraction of correct predictions). A value is NaN when
        the corresponding loader yielded nothing.
    """
    # Follow the model's own device rather than a notebook global, so the
    # function also works for a model that was placed elsewhere.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    history = []
    print("Training the model")
    for epoch in range(epochs):
        model.train()
        train_loss_sum, train_batches = 0.0, 0
        for batch in train_loader:
            input_ids = batch["input_ids"].to(device)
            labels = batch["labels"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss_sum += loss.item()
            train_batches += 1

        # Validation
        model.eval()
        val_loss_sum, val_batches = 0.0, 0
        correct, seen = 0, 0
        with torch.no_grad():
            print ("Validating the model")
            for batch in val_loader:
                input_ids = batch["input_ids"].to(device)
                labels = batch["labels"].to(device)

                outputs = model(input_ids)
                val_loss_sum += criterion(outputs, labels).item()
                val_batches += 1

                # Count hits on-device instead of copying every batch to numpy.
                preds = torch.argmax(outputs, dim=1)
                correct += (preds == labels).sum().item()
                seen += labels.numel()

        # Average per batch so train and validation losses are comparable
        # even though the two loaders have different numbers of batches.
        nan = float("nan")
        train_loss = train_loss_sum / train_batches if train_batches else nan
        val_loss = val_loss_sum / val_batches if val_batches else nan
        val_acc = correct / seen if seen else nan
        print(f"Epoch {epoch + 1}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")

        history.append(
            {"epoch": epoch + 1, "train_loss": train_loss, "val_loss": val_loss, "val_acc": val_acc}
        )

    return history


In [None]:

# Run the full training loop for EPOCHS passes over the training loader.
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=EPOCHS,
)

Training the model
