# Transformer Model from Scratch for Sentiment Classification

This notebook builds a Transformer encoder from scratch in PyTorch and trains it for sentiment classification on the Sentiment140 dataset. The architecture combines Rotary Positional Encoding (RoPE), Multi-Head Attention, RMSNorm, and a gated (SwiGLU-style) FeedForward network. The sections below cover data preparation, each model component, training with early stopping, evaluation, and recommendations for further improvement.

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from transformers import BertTokenizer
from datasets import load_dataset
from tqdm import tqdm
from sklearn.metrics import f1_score
import math

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


## Data Preparation

In [None]:

dataset = load_dataset("sentiment140", split="train", trust_remote_code=True)


tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')


def tokenize_function(example):
    return tokenizer(example['text'], padding='max_length', truncation=True, max_length=64)


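def process_labels(example):
    # Map the raw Sentiment140 codes (0, 2, 4) to contiguous class ids (0, 1, 2).
    if example['sentiment'] == 0:
        example['label'] = 0  # Negative
    elif example['sentiment'] == 2:
        example['label'] = 1  # Neutral (may be rare or absent in the train split; worth checking with a label count)
    elif example['sentiment'] == 4:
        example['label'] = 2  # Positive
    return example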


dataset = dataset.map(process_labels)
tokenized_dataset = dataset.map(tokenize_function, batched=True)


input_ids = torch.tensor(tokenized_dataset['input_ids'], dtype=torch.long)
attention_mask = torch.tensor(tokenized_dataset['attention_mask'], dtype=torch.long)
labels = torch.tensor(tokenized_dataset['label'], dtype=torch.long)


full_dataset = TensorDataset(input_ids, attention_mask, labels)


train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])


batch_size = 16
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

print(f"Training samples: {len(train_dataset)}, Validation samples: {len(val_dataset)}")

Training samples: 1280000, Validation samples: 320000
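
The label remapping in `process_labels` can be checked in isolation before mapping 1.6 million rows. The sketch below is a minimal standalone version of that idea: `LABEL_MAP`, `remap`, and `raw_labels` are hypothetical names with toy values, not part of the notebook. Counting the remapped labels is a cheap way to see which classes a split actually contains.

```python
from collections import Counter

# Raw Sentiment140 codes -> contiguous class ids expected by CrossEntropyLoss.
LABEL_MAP = {0: 0, 2: 1, 4: 2}  # negative, neutral, positive


def remap(raw_label):
    """Return the class id for a raw sentiment code, failing loudly on unknown codes."""
    try:
        return LABEL_MAP[raw_label]
    except KeyError:
        raise ValueError(f"unexpected sentiment code: {raw_label}") from None


# Toy labels for illustration only (not taken from the dataset).
raw_labels = [0, 4, 4, 0, 2, 4]
remapped = [remap(r) for r in raw_labels]
class_counts = Counter(remapped)

print(remapped)
print(class_counts)
```

If a class shows a count of zero on the training split, the classifier never sees it during training, which is worth knowing before interpreting a three-class accuracy.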


## Input Embedding

### Rotary Positional Encoding (RoPE)

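Rotary Positional Encoding (RoPE) encodes position by rotating each consecutive pair of feature dimensions by an angle proportional to the token position, with a different frequency for each pair. In the usual formulation the rotation is applied to the queries and keys inside attention, which makes attention scores depend on relative position. This notebook applies it once to the token embeddings instead (see Sentence Embedding below), so here it acts mainly as an absolute position signal.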

In [None]:
class Rotary_Positional_Encoding:
    def __init__(self, dim, device):
        assert dim % 2 == 0
        self.dim = dim
        self.inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))  

    def get_position_angles(self, seq_len, device):
        positions = torch.arange(seq_len, dtype=torch.float, device=device)
        freqs = torch.einsum("i,j->ij", positions, self.inv_freq.to(device))  
        return torch.cat((freqs.sin(), freqs.cos()), dim=-1)

    def apply_rotary(self, x, seq_len=None):
        bsz, seqlen, dim = x.shape
        assert dim == self.dim
        if seq_len is None:
            seq_len = seqlen

        x1 = x[..., ::2]
        x2 = x[..., 1::2]

        freqs = self.get_position_angles(seq_len, x.device).unsqueeze(0)
        sin = freqs[..., :self.dim // 2]
        cos = freqs[..., self.dim // 2:]

        x_rotated_even = x1 * cos - x2 * sin
        x_rotated_odd = x1 * sin + x2 * cos

        x_rotated = torch.stack((x_rotated_even, x_rotated_odd), dim=-1)
        return x_rotated.flatten(-2)
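
The rotation performed by `apply_rotary` can be reproduced on plain Python lists to see what it provides. In the sketch below, `rotate_pairs` and `dot` are illustrative helpers and the vectors are made-up values. It uses the same inverse-frequency rule as the class above, then checks that the dot product of two rotated vectors depends only on the distance between their positions. One assumption to keep in mind: this property holds when the rotation is applied directly to the two vectors being compared. The notebook rotates the embeddings before the linear projections, so the property is not guaranteed to carry through exactly.

```python
import math


def rotate_pairs(vec, position, base=10000.0):
    """Rotate each (even, odd) pair of `vec` by an angle that grows with `position`."""
    dim = len(vec)
    assert dim % 2 == 0
    out = []
    for i in range(0, dim, 2):
        theta = position / (base ** (i / dim))  # position * inverse frequency of this pair
        c, s = math.cos(theta), math.sin(theta)
        x1, x2 = vec[i], vec[i + 1]
        out.extend([x1 * c - x2 * s, x1 * s + x2 * c])
    return out


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


# Made-up query and key vectors with four features (two rotated pairs).
q = [0.5, -1.0, 2.0, 0.25]
k = [1.5, 0.3, -0.7, 1.0]

score_a = dot(rotate_pairs(q, 3), rotate_pairs(k, 1))  # positions 3 and 1, distance 2
score_b = dot(rotate_pairs(q, 9), rotate_pairs(k, 7))  # positions 9 and 7, distance 2
score_c = dot(rotate_pairs(q, 9), rotate_pairs(k, 4))  # positions 9 and 4, distance 5

print(math.isclose(score_a, score_b, abs_tol=1e-9))  # same distance, same score
print(math.isclose(score_a, score_c, abs_tol=1e-9))  # different distance, different score
```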

### Sentence Embedding

SentenceEmbedding combines token embeddings with positional information: it looks up each token's embedding and then applies the rotary encoding, so the model sees both what each token is and where it sits in the sentence.

In [4]:
class SentenceEmbedding(nn.Module):
    def __init__(self, vocab_size, embed_dim=256, device='cpu'):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoding = Rotary_Positional_Encoding(embed_dim, device)

    def forward(self, x):
        embedded = self.embedding(x)
        return self.pos_encoding.apply_rotary(embedded, x.size(1))

## Multi-Head Attention

### Scaled Dot-Product Attention

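This function computes `softmax(Q K^T / sqrt(d_k)) V` and returns both the output and the attention weights. When a padding mask of shape `(batch, seq_len)` is supplied, the scores for padded key positions are filled with -1e9 before the softmax, so those positions receive near-zero attention weight.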

In [5]:
def scaled_dot_product_attention(q, k, v, mask=None):
    d_k = q.size(-1)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
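    if mask is not None:
        # mask: (batch, seq_len) with 1 for real tokens and 0 for padding
        mask = mask.unsqueeze(1).unsqueeze(2)  # (batch, 1, 1, seq_len)
        mask = mask.expand(-1, -1, mask.size(-1), -1)  # (batch, 1, seq_len, seq_len)
        scores = scores.masked_fill(mask == 0, -1e9)  # padded keys get near-zero weight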
    attn = torch.softmax(scores, dim=-1)
    output = torch.matmul(attn, v)
    return output, attn
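
To see why filling masked scores with -1e9 works, the sketch below applies the same idea to a single row of attention scores in plain Python. `masked_softmax` is an illustrative helper and the scores are made up. It is a minimal stand-in for the `masked_fill` plus `softmax` steps above, not a replacement for them.

```python
import math


def masked_softmax(scores, mask, fill=-1e9):
    """Softmax over one row of scores, with masked positions (mask == 0) pushed to `fill`."""
    masked = [s if m == 1 else fill for s, m in zip(scores, mask)]
    peak = max(masked)  # subtract the maximum for numerical stability
    exps = [math.exp(s - peak) for s in masked]
    total = sum(exps)
    return [e / total for e in exps]


# Four key positions; the last one is padding.
weights = masked_softmax([2.0, 1.0, 0.5, 3.0], [1, 1, 1, 0])
print(weights)
```

The padded position ends up with a weight of zero even though its raw score was the largest, and the remaining weights still sum to one.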

### Multi-Head Attention

Multi_Head_Attention allows the model to attend to information from different representation subspaces at different positions, improving its ability to capture complex relationships in the input sequence.

In [6]:
class Multi_Head_Attention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)

        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)

        q = self.q_linear(q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.k_linear(k).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.v_linear(v).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        output, _ = scaled_dot_product_attention(q, k, v, mask)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)
        return self.out_linear(output)
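
The `view(..., num_heads, d_k)` calls above cut the feature dimension into contiguous chunks, one per head, and the final `view` glues them back together. The sketch below shows that bookkeeping on a plain list. `split_heads` and `merge_heads` are hypothetical helpers, and the explicit divisibility check is an addition that the class above does not perform.

```python
def split_heads(vector, num_heads):
    """Split a flat feature vector into `num_heads` equal, contiguous chunks."""
    d_model = len(vector)
    if d_model % num_heads != 0:
        raise ValueError("d_model must be divisible by num_heads")
    d_k = d_model // num_heads
    return [vector[h * d_k:(h + 1) * d_k] for h in range(num_heads)]


def merge_heads(heads):
    """Concatenate per-head chunks back into one flat vector."""
    return [value for head in heads for value in head]


token = list(range(8))         # toy feature vector with d_model = 8
heads = split_heads(token, 4)  # 4 heads, so d_k = 2
print(heads)
print(merge_heads(heads) == token)
```

With the notebook's settings (`d_model = 256`, `num_heads = 8`) each head works on 32 features.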

## Add & Norm (RMSNorm)

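RMSNorm_Add adds the sublayer output (after dropout) to its input as a residual connection and then applies Root Mean Square Layer Normalization (RMSNorm) to the sum, a post-norm arrangement. RMSNorm rescales each vector by its root mean square without subtracting the mean, which helps stabilize training in deep Transformer models.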

In [7]:
class RMSNorm_Add(nn.Module):
    def __init__(self, d_model, eps=1e-6, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.eps = eps
        self.scale = nn.Parameter(torch.ones(d_model))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer_output):
        residual = x + self.dropout(sublayer_output)
        rms = torch.sqrt(torch.mean(residual ** 2, dim=-1, keepdim=True) + self.eps)
        normalized = residual / rms
        output = self.scale * normalized
        return output
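
The normalization step is easy to verify by hand on a small vector. The sketch below is a plain-Python version of the same formula, with `rms_norm` as an illustrative name. It leaves out the residual addition and dropout and assumes a scale of all ones unless one is given.

```python
import math


def rms_norm(values, scale=None, eps=1e-6):
    """Divide a vector by its root mean square, then apply a per-feature scale."""
    mean_sq = sum(v * v for v in values) / len(values)
    rms = math.sqrt(mean_sq + eps)
    if scale is None:
        scale = [1.0] * len(values)
    return [g * v / rms for g, v in zip(scale, values)]


x = [3.0, -4.0, 0.0, 0.0]  # mean square = 25 / 4, so the RMS is 2.5
y = rms_norm(x)
print(y)  # close to [1.2, -1.6, 0.0, 0.0]

# After normalization the vector has a root mean square of about 1.
print(math.sqrt(sum(v * v for v in y) / len(y)))
```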

## FeedForward (SwiGLU)

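FeedForward is a gated feed-forward network: one linear projection produces the values, a second produces a gate, and their element-wise product is projected back to `d_model`. Note that the code below gates with a sigmoid, which is the original GLU form; SwiGLU proper applies SiLU (Swish) to the gate branch instead. The results reported later were produced with the code as shown.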

In [8]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.d_ff = d_ff
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_model, d_ff)
        self.linear_out = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        gate = torch.sigmoid(self.linear2(x))
        x = self.linear1(x) * gate
        x = self.dropout(x)
        x = self.linear_out(x)
        return x
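
The difference between a sigmoid gate and a SiLU gate is small in code but changes how the gate behaves. The scalar sketch below compares the two; `glu` and `swiglu` are illustrative names, and the inputs stand in for one element of `linear1(x)` (the value) and `linear2(x)` (the gate).

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def silu(x):
    """SiLU (Swish): x * sigmoid(x)."""
    return x * sigmoid(x)


def glu(value, gate):
    """Sigmoid-gated unit, the form used by the FeedForward cell above."""
    return value * sigmoid(gate)


def swiglu(value, gate):
    """SiLU-gated unit, the form usually meant by SwiGLU."""
    return value * silu(gate)


for gate in (-2.0, 0.0, 3.0):
    print(gate, round(glu(2.0, gate), 4), round(swiglu(2.0, gate), 4))
```

A sigmoid gate always lies between 0 and 1, so it can only attenuate the value. A SiLU gate is unbounded above and slightly negative for negative inputs, so it can also amplify or flip the sign. In PyTorch, trying the SwiGLU variant would amount to replacing `torch.sigmoid` with `torch.nn.functional.silu` in `forward`; retraining would be needed to compare results.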

## Transformer Block

TransformerBlock combines multi-head attention, normalization, and feedforward layers, enabling the model to learn complex dependencies and representations in the input data.

In [9]:
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = Multi_Head_Attention(d_model, num_heads)
        self.norm1 = RMSNorm_Add(d_model, dropout=dropout)
        self.feed_forward = FeedForward(d_model, d_ff, dropout=dropout)
        self.norm2 = RMSNorm_Add(d_model, dropout=dropout)

    def forward(self, x, mask=None):
        attn_output = self.attention(x, x, x, mask)
        x = self.norm1(x, attn_output)
        ff_output = self.feed_forward(x)
        x = self.norm2(x, ff_output)
        return x

## Transformer Model

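TransformerModel stacks the embedding layer, `num_layers` Transformer blocks, and a linear classifier. The hidden state at position 0, which corresponds to the `[CLS]` token that the BERT tokenizer prepends, serves as the sentence representation for sentiment classification.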

In [None]:
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, num_classes=3, dropout=0.1):
        super().__init__()
        self.embedding = SentenceEmbedding(vocab_size, d_model)
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.classifier = nn.Linear(d_model, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x, mask)
        cls_output = x[:, 0, :]  # hidden state of the first token ([CLS])
        cls_output = self.dropout(cls_output)
        logits = self.classifier(cls_output)
        return logits

## Model Preparation

This cell sets the hyperparameters and instantiates the model on the selected device. The vocabulary size comes from the tokenizer, so most of the parameters sit in the embedding table rather than in the six Transformer blocks.

In [None]:
# Define hyperparameters
vocab_size = tokenizer.vocab_size
d_model = 256
num_heads = 8
d_ff = 1024
num_layers = 6
dropout = 0.3
num_classes = 3  

# Initialize model
model = TransformerModel(
    vocab_size=vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    d_ff=d_ff,
    num_layers=num_layers,
    num_classes=num_classes,
    dropout=dropout
).to(device)

print(f"Model initialized with {sum(p.numel() for p in model.parameters())} parameters")

Model initialized with 36919299 parameters
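
The printed total can be reproduced by hand from the layer definitions, which is a useful check that the architecture is what it appears to be. The sketch below assumes a vocabulary size of 119547 for `bert-base-multilingual-cased`; that number is not printed anywhere in this notebook, but it is consistent with the total above. `linear_params` is an illustrative helper.

```python
def linear_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out


vocab_size = 119547  # assumed vocabulary size of bert-base-multilingual-cased
d_model, d_ff, num_layers, num_classes = 256, 1024, 6, 3

embedding = vocab_size * d_model
attention = 4 * linear_params(d_model, d_model)  # q, k, v, and output projections
feed_forward = 2 * linear_params(d_model, d_ff) + linear_params(d_ff, d_model)
norms = 2 * d_model                              # two RMSNorm scale vectors per block
per_block = attention + feed_forward + norms
classifier = linear_params(d_model, num_classes)

total = embedding + num_layers * per_block + classifier
print(total)  # 36919299
print(f"embedding share: {embedding / total:.1%}")
```

Under that assumption, roughly 83% of the parameters belong to the embedding table, so a smaller vocabulary would shrink the model far more than removing a Transformer block.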


## Training

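The training loop uses cross-entropy with label smoothing of 0.1, AdamW (learning rate 1e-5, weight decay 0.05), gradient-norm clipping at 1.0, and a cosine annealing schedule stepped once per epoch. After each epoch it evaluates on the validation set, saves the weights whenever the validation loss improves, and stops early once the loss has not improved for `patience` (7) consecutive epochs.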

In [12]:
# Optimizer and scheduler
loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = AdamW(model.parameters(), lr=1e-5, weight_decay=0.05)
scheduler = CosineAnnealingLR(optimizer, T_max=15)

# Training loop with early stopping
num_epochs = 20
patience = 7
best_val_loss = float('inf')
epochs_no_improve = 0

model.train()
for epoch in range(num_epochs):
    total_train_loss = 0
    total_train_correct = 0
    total_train_samples = 0

    # Training
    for batch in tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
        input_ids, attention_mask, labels = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        logits = model(input_ids, mask=attention_mask)
        loss = loss_fn(logits, labels)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_train_loss += loss.item()
        preds = torch.argmax(logits, dim=-1)
        total_train_correct += (preds == labels).sum().item()
        total_train_samples += labels.size(0)

    train_loss = total_train_loss / len(train_dataloader)
    train_accuracy = total_train_correct / total_train_samples

    # Validation
    model.eval()
    total_val_loss = 0
    total_val_correct = 0
    total_val_samples = 0

    with torch.no_grad():
        for batch in val_dataloader:
            input_ids, attention_mask, labels = batch
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            logits = model(input_ids, mask=attention_mask)
            loss = loss_fn(logits, labels)

            total_val_loss += loss.item()
            preds = torch.argmax(logits, dim=-1)
            total_val_correct += (preds == labels).sum().item()
            total_val_samples += labels.size(0)

    val_loss = total_val_loss / len(val_dataloader)
    val_accuracy = total_val_correct / total_val_samples

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save(model.state_dict(), 'best_model.pt')
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs")
            break

    scheduler.step()
    model.train()

Epoch 1/20: 100%|██████████| 80000/80000 [34:56<00:00, 38.15it/s]


Epoch 1/20, Train Loss: 0.6724, Train Acc: 0.7424, Val Loss: 0.6322, Val Acc: 0.7792


Epoch 2/20: 100%|██████████| 80000/80000 [35:36<00:00, 37.44it/s]


Epoch 2/20, Train Loss: 0.6282, Train Acc: 0.7821, Val Loss: 0.6232, Val Acc: 0.7894


Epoch 3/20: 100%|██████████| 80000/80000 [35:49<00:00, 37.21it/s]


Epoch 3/20, Train Loss: 0.6159, Train Acc: 0.7915, Val Loss: 0.6138, Val Acc: 0.7946


Epoch 4/20: 100%|██████████| 80000/80000 [36:18<00:00, 36.73it/s]


Epoch 4/20, Train Loss: 0.6083, Train Acc: 0.7973, Val Loss: 0.6131, Val Acc: 0.7972


Epoch 5/20: 100%|██████████| 80000/80000 [36:17<00:00, 36.73it/s]


Epoch 5/20, Train Loss: 0.6025, Train Acc: 0.8016, Val Loss: 0.6106, Val Acc: 0.7998


Epoch 6/20: 100%|██████████| 80000/80000 [35:52<00:00, 37.17it/s]


Epoch 6/20, Train Loss: 0.5977, Train Acc: 0.8052, Val Loss: 0.6122, Val Acc: 0.7998


Epoch 7/20: 100%|██████████| 80000/80000 [35:50<00:00, 37.20it/s]


Epoch 7/20, Train Loss: 0.5937, Train Acc: 0.8085, Val Loss: 0.6060, Val Acc: 0.8024


Epoch 8/20: 100%|██████████| 80000/80000 [36:22<00:00, 36.66it/s]


Epoch 8/20, Train Loss: 0.5904, Train Acc: 0.8112, Val Loss: 0.6117, Val Acc: 0.8033


Epoch 9/20: 100%|██████████| 80000/80000 [36:19<00:00, 36.70it/s]


Epoch 9/20, Train Loss: 0.5872, Train Acc: 0.8135, Val Loss: 0.6072, Val Acc: 0.8031


Epoch 10/20: 100%|██████████| 80000/80000 [36:14<00:00, 36.79it/s]


Epoch 10/20, Train Loss: 0.5845, Train Acc: 0.8154, Val Loss: 0.6023, Val Acc: 0.8048


Epoch 11/20: 100%|██████████| 80000/80000 [35:43<00:00, 37.32it/s]


Epoch 11/20, Train Loss: 0.5824, Train Acc: 0.8173, Val Loss: 0.6055, Val Acc: 0.8056


Epoch 12/20: 100%|██████████| 80000/80000 [35:19<00:00, 37.75it/s]


Epoch 12/20, Train Loss: 0.5806, Train Acc: 0.8186, Val Loss: 0.6030, Val Acc: 0.8056


Epoch 13/20: 100%|██████████| 80000/80000 [35:20<00:00, 37.72it/s]


Epoch 13/20, Train Loss: 0.5793, Train Acc: 0.8195, Val Loss: 0.6046, Val Acc: 0.8058


Epoch 14/20: 100%|██████████| 80000/80000 [35:23<00:00, 37.68it/s]


Epoch 14/20, Train Loss: 0.5782, Train Acc: 0.8205, Val Loss: 0.6056, Val Acc: 0.8062


Epoch 15/20: 100%|██████████| 80000/80000 [35:16<00:00, 37.79it/s]


Epoch 15/20, Train Loss: 0.5776, Train Acc: 0.8210, Val Loss: 0.6057, Val Acc: 0.8062


Epoch 16/20: 100%|██████████| 80000/80000 [35:18<00:00, 37.77it/s]


Epoch 16/20, Train Loss: 0.5772, Train Acc: 0.8215, Val Loss: 0.6057, Val Acc: 0.8062


Epoch 17/20: 100%|██████████| 80000/80000 [35:23<00:00, 37.68it/s]


Epoch 17/20, Train Loss: 0.5776, Train Acc: 0.8211, Val Loss: 0.6067, Val Acc: 0.8062
Early stopping triggered after 17 epochs


### Training Results and Recommendations

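The model performed well, and I was satisfied with the results. The best validation loss, 0.6023, was reached at epoch 10 with a validation accuracy of 0.8048, and those are the weights saved to `best_model.pt`. Validation accuracy kept inching up to 0.8062 afterwards, but the validation loss did not improve for seven consecutive epochs, so early stopping was triggered after epoch 17. By then the training loss (0.5776) had moved below the validation loss (0.6067), so stopping at that point helped prevent overfitting.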
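
The early stopping decision can be replayed from the training log above without rerunning anything. The sketch below copies the seventeen validation losses from the log and applies the same rule as the training loop (strict improvement resets the counter, `patience = 7`). `replay_early_stopping` is an illustrative helper, not code from the notebook.

```python
# Validation losses copied from the training log, epochs 1 to 17.
val_losses = [0.6322, 0.6232, 0.6138, 0.6131, 0.6106, 0.6122, 0.6060, 0.6117,
              0.6072, 0.6023, 0.6055, 0.6030, 0.6046, 0.6056, 0.6057, 0.6057, 0.6067]


def replay_early_stopping(losses, patience):
    """Return (best_epoch, best_loss, stop_epoch); stop_epoch is None if never triggered."""
    best_loss, best_epoch, stale = float("inf"), None, 0
    for epoch, loss in enumerate(losses, start=1):
        if loss < best_loss:
            best_loss, best_epoch, stale = loss, epoch, 0
        else:
            stale += 1
            if stale >= patience:
                return best_epoch, best_loss, epoch
    return best_epoch, best_loss, None


best_epoch, best_loss, stop_epoch = replay_early_stopping(val_losses, patience=7)
print(best_epoch, best_loss, stop_epoch)  # 10 0.6023 17
```

Replaying with a larger `patience` on the same list returns `None` for the stop epoch, which only says the rule would not have fired within these 17 epochs, not what later epochs would have looked like.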

To improve the model further, or to let it train for all 20 epochs without overfitting, consider the following:
- Reduce the dropout rate slightly (it is currently 0.3).
- Increase the `patience` parameter for early stopping (currently 7).
- Adjust the learning rate schedule, for example by adding warmup or by matching `T_max` to the number of epochs.
- Try data augmentation or additional regularization techniques.

These adjustments can help the model train longer while still generalizing well.
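
On the scheduling point, it helps to look at what the current schedule does. The training cell uses `CosineAnnealingLR` with `T_max=15` while allowing up to 20 epochs. The sketch below evaluates the closed-form cosine annealing expression given in the PyTorch documentation, with the notebook's base learning rate and a minimum of zero (the scheduler default). `cosine_lr` is an illustrative helper, and PyTorch's step-by-step implementation may differ in floating-point detail.

```python
import math


def cosine_lr(step, base_lr=1e-5, t_max=15, eta_min=0.0):
    """Closed form: eta_min + (base_lr - eta_min) * (1 + cos(pi * step / t_max)) / 2."""
    return eta_min + (base_lr - eta_min) * (1 + math.cos(math.pi * step / t_max)) / 2


# `step` counts scheduler.step() calls, so step 0 is the rate used in the first epoch.
for step in (0, 5, 10, 15, 16, 19):
    print(step, f"{cosine_lr(step):.2e}")
```

With `t_max=15` the rate falls to zero at step 15 and then starts to rise again, because the cosine continues past its minimum. Setting `t_max` to the planned number of epochs keeps the rate decreasing for the whole run.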

## Evaluation

In [13]:
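# Load best model (weights_only=True restricts unpickling to tensors and plain containers)
model.load_state_dict(torch.load('best_model.pt', map_location=device, weights_only=True))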
model.eval()

# Evaluate on validation set
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in val_dataloader:
        input_ids, attention_mask, labels = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        logits = model(input_ids, mask=attention_mask)
        preds = torch.argmax(logits, dim=-1)

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate metrics
accuracy = sum(p == l for p, l in zip(all_preds, all_labels)) / len(all_labels)
f1 = f1_score(all_labels, all_preds, average='weighted')

print(f"Validation Accuracy: {accuracy:.4f}")
print(f"Validation F1-Score: {f1:.4f}")

# Test on a single sentence
test_sentence = "I love this beautiful day!"
test_inputs = tokenizer(test_sentence, return_tensors="pt", padding=True, truncation=True, max_length=64)
input_ids = test_inputs['input_ids'].to(device)
attention_mask = test_inputs['attention_mask'].to(device)

with torch.no_grad():
    logits = model(input_ids, mask=attention_mask)
    pred = torch.argmax(logits, dim=-1).item()

class_names = {0: "Negative", 1: "Neutral", 2: "Positive"}
print(f"Test Sentence: {test_sentence}")
print(f"Predicted Class: {class_names[pred]}")



Validation Accuracy: 0.8048
Validation F1-Score: 0.8047
Test Sentence: I love this beautiful day!
Predicted Class: Positive
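
The weighted F1-score reported above averages the per-class F1 values, weighting each class by its number of true examples. The sketch below computes the same quantity in plain Python on toy labels to make the definition concrete. `weighted_f1` is an illustrative helper, not the scikit-learn implementation, and the labels are made up.

```python
def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights proportional to each class's support."""
    total = len(y_true)
    score = 0.0
    for cls in sorted(set(y_true) | set(y_pred)):
        tp = sum(t == cls and p == cls for t, p in zip(y_true, y_pred))
        fp = sum(t != cls and p == cls for t, p in zip(y_true, y_pred))
        fn = sum(t == cls and p != cls for t, p in zip(y_true, y_pred))
        support = tp + fn  # number of true examples of this class
        f1 = 2 * tp / (2 * tp + fp + fn) if tp else 0.0
        score += f1 * support / total
    return score


y_true = [0, 0, 0, 2, 2]  # toy labels, not notebook data
y_pred = [0, 0, 2, 2, 2]
print(weighted_f1(y_true, y_pred))  # approximately 0.8
```

In this toy case both classes have an F1 of 0.8, so the weighted average matches the accuracy of 4/5. When classes are balanced and errors are roughly symmetric, as the near-identical accuracy and F1 above suggest, the two metrics tend to stay close.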
