<a href="https://colab.research.google.com/github/willy-arison/Machine-learning/blob/master/transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Download the AG News dataset with kagglehub and keep whatever
# dataset_download returns (presumably the local directory that holds the
# dataset files — confirm in the kagglehub documentation).
amananandrai_ag_news_classification_dataset_path = kagglehub.dataset_download('amananandrai/ag-news-classification-dataset')

print('Data source import complete.')


In [None]:
# requirements
! pip install tokenizers datasets transformers[torch]
! pip install lightning

In [None]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import torch
from torch import nn

#### **GHF activation**

In [None]:
from torch.autograd import Function

class ActGHFFunction(Function):
    """Autograd function for the GHF activation with a hand-written gradient.

    Forward evaluates ``(1 + m1*t) / (1 + m2*t*exp(-s/t))`` element-wise on
    ``s``. Backward propagates a gradient to ``s`` only; ``t``, ``m1`` and
    ``m2`` are treated as constants.
    """

    @staticmethod
    def forward(ctx, s, t, m1, m2):
        """Return the GHF activation of ``s`` for parameters ``t``, ``m1``, ``m2``."""
        numerator = 1 + m1 * t
        denominator = 1 + m2 * t * torch.exp(-s/t)
        activation = numerator / denominator

        # backward() only needs t, m1 and the activation value itself
        ctx.save_for_backward(t, m1, activation)
        return activation

    @staticmethod
    def backward(ctx, grad_output):
        """Return the gradient w.r.t. ``s`` followed by ``None`` for ``t``, ``m1``, ``m2``."""
        t, m1, activation = ctx.saved_tensors
        inv_t = 1/t
        inv_numerator = 1/(1 + m1*t)
        # d(output)/ds = output/t * (1 - output / (1 + m1*t))
        local_grad = inv_t * activation * (1 - inv_numerator * activation)
        return grad_output * local_grad, None, None, None

class ActGHF(nn.Module):
    """GHF activation layer with fixed, non-trainable parameters.

    ``t``, ``m1`` and ``m2`` are converted to float tensors and registered as
    buffers (not ``nn.Parameter``), so they appear in the state dict but are
    not returned by ``parameters()``.
    """

    def __init__(self, t=0.5, m1=-1.001, m2=50):
        super().__init__()
        # registration order fixes the state-dict key order: t, m1, m2
        for buffer_name, value in (('t', t), ('m1', m1), ('m2', m2)):
            self.register_buffer(buffer_name, torch.tensor(float(value)))

    def forward(self, s):
        """Apply ``ActGHFFunction`` to ``s`` using the stored parameters."""
        t, m1, m2 = self.t, self.m1, self.m2
        return ActGHFFunction.apply(s, t, m1, m2)

#### **Implement Transformer layer**
* Transformer layer consists of *Multi-Head Attention* or *Single Head Attention*

#### **Implement Transformer Encoder Layer**
* It consists of:
> * A self-Attention mechanism to capture long-range dependencies
> * Fully connected layers to transform representations
> * Layer normalization to stabilize training
> * Residual connections to improve gradient flow and prevent vanishing gradients


#### **Implement Transformer Network**
* The full transformer network consists of:
> * Embedding Module: transform input tokens into dense vectors
> * Transformer Layers: A stack of transformer encoder layers
> * Classification head: Processes the output of the transformer layers to produce a prediction

In [None]:
class SingleHeadAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Queries, keys and values are bias-free linear projections of the same
    input, each of width ``embed_dim``.

    Args:
        embed_dim: Size of the last dimension of the input and of the output.
    """

    def __init__(self, embed_dim):
        super().__init__()
        self.W_q = nn.Linear(embed_dim, embed_dim, bias=False)
        self.W_k = nn.Linear(embed_dim, embed_dim, bias=False)
        self.W_v = nn.Linear(embed_dim, embed_dim, bias=False)

    def forward(self, x, attention_mask=None):
        """Attend over the sequence dimension of ``x``.

        Args:
            x: Tensor of shape ``[batch_size, seq_length, embed_dim]``.
            attention_mask: Optional ``[batch_size, seq_length]`` tensor in
                which ``0`` marks key positions that must not be attended to.
                A sequence whose mask is all zeros has no valid key, and the
                softmax yields NaN for it.

        Returns:
            Tensor of shape ``[batch_size, seq_length, embed_dim]``.
        """
        Q = self.W_q(x)  # --> [batch size, seq_length, embed_dim]
        K = self.W_k(x)
        V = self.W_v(x)

        # key dimension
        dk = Q.shape[-1]

        # scaled dot-product similarities --> [batch size, seq_length, seq_length]
        scores = (Q @ K.transpose(1, 2)) / dk ** 0.5

        if attention_mask is not None:
            # Mask the scores directly: -inf on padded keys and no offset on
            # the kept ones (an additive float mask would add 1.0 to them).
            # unsqueeze(1) broadcasts the key mask over every query position.
            padded_keys = (attention_mask == 0).unsqueeze(1)
            scores = scores.masked_fill(padded_keys, float('-inf'))

        # normalise over the key dimension
        weights = scores.softmax(dim=-1)

        return weights @ V  # --> [batch_size, seq_length, embed_dim]

class TransformerEncoderLayer(nn.Module):
    """Encoder block: self-attention followed by a two-layer feed-forward net.

    Each sub-layer receives a layer-normalised copy of its input and its
    output is added back onto the un-normalised input (residual connection).
    """

    def __init__(self, embed_dim, activation_fn=nn.ReLU()):
        super().__init__()
        self.attention = SingleHeadAttention(embed_dim=embed_dim)

        # one LayerNorm in front of each sub-layer
        self.layer_norm1 = nn.LayerNorm(embed_dim)
        self.layer_norm2 = nn.LayerNorm(embed_dim)

        # feed-forward network; its hidden width equals embed_dim
        feed_forward = [
            nn.Linear(embed_dim, embed_dim),
            activation_fn,
            nn.Linear(embed_dim, embed_dim),
        ]
        self.fc_layer = nn.Sequential(*feed_forward)

    def forward(self, x, attention_mask=None):
        """Run both sub-layers on ``x`` and return a tensor of the same shape."""
        attended = self.attention(self.layer_norm1(x), attention_mask)
        x = x + attended
        transformed = self.fc_layer(self.layer_norm2(x))
        return x + transformed


import math

class Embedding(nn.Module):
    """Token embedding plus fixed sinusoidal positional encodings.

    Args:
        embed_dim: Width of every embedding vector (odd values are supported).
        vocab_size: Number of rows in the token-embedding table.
        seq_length: Longest sequence for which positional encodings exist.
    """

    def __init__(self, embed_dim, vocab_size, seq_length):
        super().__init__()
        self.embed_dim = embed_dim
        self.vocab_size = vocab_size
        self.seq_length = seq_length

        # Token embeddings
        self.embed_token = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim)

        # positional encodings: a buffer, so they are stored with the module but not trained
        self.register_buffer('positional_encodings', self._get_cosine_positional_encodings())

    def _get_cosine_positional_encodings(self):
        """Build the ``[seq_length, embed_dim]`` sine/cosine position table."""
        position = torch.arange(self.seq_length).unsqueeze(1)  # [seq_length, 1]
        div_term = torch.exp(torch.arange(0, self.embed_dim, 2) * (-math.log(10000.0) / self.embed_dim))
        angles = position * div_term  # [seq_length, ceil(embed_dim / 2)]

        encodings = torch.zeros(self.seq_length, self.embed_dim)
        encodings[:, 0::2] = torch.sin(angles)  # even indices: sin
        # An odd embed_dim has one fewer odd column than even columns, so the
        # cosine half must drop the last frequency to match the slice width.
        encodings[:, 1::2] = torch.cos(angles[:, :self.embed_dim // 2])  # odd indices: cos

        return encodings  # [seq_length, embed_dim]

    def forward(self, x):
        """Embed token ids and add the positional encodings.

        Args:
            x: Integer tensor of shape ``[batch_size, length]`` with
                ``length <= seq_length``.

        Returns:
            Tensor of shape ``[batch_size, length, embed_dim]``.

        Raises:
            ValueError: If the input is longer than ``seq_length``.
        """
        length = x.size(1)
        if length > self.seq_length:
            raise ValueError(
                f"Input length {length} exceeds the configured seq_length {self.seq_length}"
            )

        x = self.embed_token(x)  # [batch_size, length, embed_dim]

        # broadcast the first `length` position rows over the batch
        return x + self.positional_encodings[:length, :]

class Transformer(nn.Module):
    """Transformer encoder classifier: embedding, encoder stack, linear head.

    Args:
        embed_dim: Width of the token representations.
        vocab_size: Number of distinct token ids.
        seq_length: Longest supported input length.
        num_layers: Number of stacked encoder layers.
        num_classes: Number of output logits.
        activation_fn: Activation module handed to every encoder layer; the
            same instance is shared by all layers.
    """

    def __init__(self, embed_dim, vocab_size, seq_length=5, num_layers=2, num_classes=2, activation_fn=nn.ReLU()):
        super().__init__()

        # embedding module
        self.embed = Embedding(embed_dim=embed_dim, vocab_size=vocab_size, seq_length=seq_length)
        # encoder layers
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderLayer(embed_dim=embed_dim, activation_fn=activation_fn) for _ in range(num_layers)
        ])

        # classification head
        self.fc_layer = nn.Sequential(
            nn.Linear(embed_dim, num_classes),
        )

    def forward(self, x, attention_mask=None):
        """Return class logits for a batch of token ids.

        Args:
            x: Token ids of shape ``[batch_size, seq_length]``.
            attention_mask: Optional ``[batch_size, seq_length]`` tensor where
                ``0`` marks padding. It is forwarded to every encoder layer
                and also restricts the pooling to non-padding positions.

        Returns:
            Logits of shape ``[batch_size, num_classes]``.
        """
        x = self.embed(x)  # --> [batch size, seq_length, embed_dim]

        for layer in self.encoder_layers:
            x = layer(x, attention_mask)  # --> [batch size, seq_length, embed_dim]

        # Pool over the sequence for classification.
        if attention_mask is None:
            pooled = x.mean(dim=1)  # --> [batch size, embed_dim]
        else:
            # Average only the real tokens: with heavy padding a plain mean is
            # dominated by the representations of the padding positions.
            keep = attention_mask.unsqueeze(-1).to(x.dtype)  # [batch size, seq_length, 1]
            # clamp avoids a division by zero for a sequence that is all padding
            pooled = (x * keep).sum(dim=1) / keep.sum(dim=1).clamp(min=1)

        return self.fc_layer(pooled)  # --> [batch size, num_classes]

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from datasets import Dataset


# Locate the dataset. Prefer the directory returned by the kagglehub download
# cell so the notebook also runs outside Kaggle (e.g. on Colab); fall back to
# the Kaggle input mount when that cell was deleted.
try:
    data_dir = amananandrai_ag_news_classification_dataset_path
except NameError:
    data_dir = '/kaggle/input/ag-news-classification-dataset'


def _to_label_and_text(df, label_offset):
    """Return a frame with a 0-based ``label`` and a ``text`` column ("Title: Description")."""
    return pd.DataFrame({
        'label': df['Class Index'] - label_offset,
        'text': df['Title'] + ': ' + df['Description'],
    })


raw_train_df = pd.read_csv(os.path.join(data_dir, 'train.csv'))
raw_test_df = pd.read_csv(os.path.join(data_dir, 'test.csv'))

# Shift both splits by the SAME offset (the smallest training class index) so
# a label id means the same class in train and test even if a split happened
# to lack the lowest class.
label_offset = raw_train_df['Class Index'].min()

train_df = _to_label_and_text(raw_train_df, label_offset)
test_df = _to_label_and_text(raw_test_df, label_offset)

train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)

train_df.head()

Unnamed: 0,label,text
0,2,Wall St. Bears Claw Back Into the Black (Reute...
1,2,Carlyle Looks Toward Commercial Aerospace (Reu...
2,2,Oil and Economy Cloud Stocks' Outlook (Reuters...
3,2,Iraq Halts Oil Exports from Main Southern Pipe...
4,2,"Oil prices soar to all-time record, posing new..."


In [None]:
# Length (in tokens) every example is padded or truncated to by the tokenizer cell.
max_length = 512
# Batch size shared by the training and test DataLoaders.
batch_size = 128
# columns[0] is the text column fed to the tokenizer, columns[1] the label column.
columns = ['text', 'label']

In [None]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')


def tokenize_function(sentence):
    """Tokenize the text column of ``sentence`` to a fixed length.

    ``sentence`` is whatever ``Dataset.map`` passes in; since the calls below
    use ``batched=True`` it is presumably a batch of rows, so the text entry
    is a list of strings — confirm against the ``datasets`` documentation.
    Every example is padded or truncated to exactly ``max_length`` tokens.
    """
    texts = sentence[columns[0]]
    encoded = tokenizer(
        texts,
        padding='max_length',
        truncation=True,
        max_length=max_length,
    )
    return encoded


# Tokenize the training split first, then the test split.
tokenized_train, tokenized_test = (
    split.map(tokenize_function, batched=True)
    for split in (train_dataset, test_dataset)
)

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Map:   0%|          | 0/120000 [00:00<?, ? examples/s]

Map:   0%|          | 0/7600 [00:00<?, ? examples/s]

In [None]:
from torch.utils.data import DataLoader, Dataset

class MyDataset(Dataset):
    """Map-style dataset that converts tokenized rows into tensors.

    Args:
        tokenized_data: Indexable collection of rows; each row must provide
            ``input_ids``, ``attention_mask`` and the label column.
        label_column: Key of the label inside a row. When ``None`` (default)
            the module-level ``columns[1]`` is used, as before.
    """

    def __init__(self, tokenized_data, label_column=None):
        super().__init__()
        self.tokenized_data = tokenized_data
        self.label_column = label_column

    def __len__(self):
        return len(self.tokenized_data)

    def __getitem__(self, idx):
        """Return the ``input_ids``, ``attention_mask`` and ``labels`` tensors of row ``idx``."""
        # Fetch the row once: indexing the underlying dataset separately for
        # every field repeats the row lookup three times per item.
        row = self.tokenized_data[idx]
        label_key = columns[1] if self.label_column is None else self.label_column
        return {
            "input_ids": torch.tensor(row["input_ids"]),
            "attention_mask": torch.tensor(row["attention_mask"]),
            "labels": torch.tensor(row[label_key]),
        }

# Wrap both tokenized splits so that indexing them yields tensors
train_pt, test_pt = MyDataset(tokenized_train), MyDataset(tokenized_test)

# Only the training loader shuffles; the test loader keeps the dataset order
train_loader = DataLoader(train_pt, batch_size=batch_size, shuffle=True, num_workers=3)
test_loader = DataLoader(test_pt, batch_size=batch_size, shuffle=False, num_workers=3)


# Sanity check: pull one training batch and look at its contents
batch = next(iter(train_loader))
print("Input IDs shape:", batch["input_ids"].shape)
print("Labels:", batch["labels"])

Input IDs shape: torch.Size([128, 512])
Labels: tensor([1, 2, 1, 2, 0, 1, 3, 2, 2, 2, 0, 0, 3, 1, 2, 2, 0, 2, 3, 3, 1, 2, 0, 1,
        3, 2, 2, 2, 2, 3, 3, 2, 0, 2, 1, 1, 1, 0, 0, 2, 2, 2, 0, 3, 0, 3, 3, 0,
        0, 2, 0, 0, 2, 2, 3, 2, 0, 1, 1, 3, 1, 2, 2, 2, 0, 0, 2, 1, 1, 2, 3, 0,
        3, 0, 0, 0, 1, 0, 3, 0, 3, 1, 3, 2, 0, 2, 1, 2, 3, 3, 0, 1, 2, 3, 2, 2,
        1, 2, 3, 0, 2, 2, 2, 3, 2, 1, 2, 3, 3, 3, 3, 3, 3, 2, 0, 2, 0, 1, 2, 2,
        0, 1, 3, 1, 3, 1, 3, 3])


In [None]:
import pytorch_lightning as pl
from torchmetrics import Accuracy

def create_trainer(activation, max_epochs=5):
    """Build a GPU Lightning trainer that logs under ``logs/<activation>``.

    Args:
        activation: Run name passed to the TensorBoard logger.
        max_epochs: Number of training epochs.

    Returns:
        A ``pl.Trainer`` using mixed 16-bit precision on all visible GPUs,
        with a checkpoint callback that keeps the best ``val_acc`` model and
        the last epoch.
    """
    import logging

    # keep Lightning's own log output down to warnings
    logging.getLogger("pytorch_lightning").setLevel(logging.WARNING)

    tensorboard_logger = pl.loggers.TensorBoardLogger('logs/', name=activation)

    best_accuracy_checkpoint = pl.callbacks.ModelCheckpoint(
        monitor="val_acc",                  # metric that decides which model is best
        mode="max",                         # higher accuracy is better
        save_top_k=1,                       # keep a single best checkpoint
        filename="{epoch}-{val_acc:.4f}",   # epoch and accuracy appear in the file name
        save_last=True,                     # additionally keep the final epoch
        verbose=True,                       # report when a new best model is written
    )

    return pl.Trainer(
        accelerator="gpu",
        strategy="auto",
        precision="16-mixed",
        devices=-1,
        max_epochs=max_epochs,
        logger=tensorboard_logger,
        callbacks=[best_accuracy_checkpoint],
    )

class Classifier(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a sequence classifier.

    Batches are dicts with the keys ``input_ids``, ``attention_mask`` and
    ``labels``. The wrapped model is called as
    ``model(input_ids, attention_mask)`` and must return class logits.

    Args:
        model: Module producing logits of shape ``[batch_size, num_classes]``.
        num_classes: Number of target classes (configures the test metric).
    """

    def __init__(self, model, num_classes):
        super().__init__()
        self.model = model
        self.cross_entropy_loss = nn.CrossEntropyLoss()
        self.test_accuracy = Accuracy(task="multiclass", num_classes=num_classes)

    def forward(self, x, attention_mask=None):
        """Return logits for ``x``, forwarding the optional attention mask to the model."""
        return self.model(x, attention_mask)

    def _logits_and_labels(self, batch):
        """Run the model on a batch dict and return ``(logits, labels)``."""
        logits = self.model(batch['input_ids'], batch['attention_mask'])
        return logits, batch['labels']

    def training_step(self, batch, batch_idx):
        logits, y = self._logits_and_labels(batch)
        loss = self.cross_entropy_loss(logits, y)
        self.log('train_loss', loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        logits, y = self._logits_and_labels(batch)
        loss = self.cross_entropy_loss(logits, y)
        # accuracy of this batch only; it is logged under the name the
        # checkpoint callback monitors
        acc = (logits.argmax(dim=1) == y).float().mean()
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        return {'val_loss': loss, 'val_acc': acc}

    def configure_optimizers(self):
        # Adam with its default hyper-parameters (no learning rate is passed)
        optimizer = torch.optim.Adam(self.parameters())
        return optimizer

    def test_step(self, batch, batch_idx):
        preds, y = self._logits_and_labels(batch)
        self.test_accuracy(preds, y)  # accumulate into the epoch-level metric
        return preds

    def on_test_epoch_end(self):
        self.log("test_acc", self.test_accuracy.compute(), prog_bar=True)
        # The computed value (not the metric object) is logged, so the metric
        # state has to be cleared by hand; otherwise a second test run on
        # this module would also count the samples of the first one.
        self.test_accuracy.reset()

In [None]:
# Model and training hyper-parameters shared by every run below.
vocab_size = tokenizer.vocab_size  # number of token ids the embedding table must cover
seq_length = max_length  # positional encodings span the padded input length
embed_dim = 128  # width of the token representations
epochs = 6  # training epochs per activation
classes = 4  # number of output classes

In [None]:
# Activations to compare, keyed by the run name used for the log directory.
# Each instance is handed to the model built for training and again to the
# model rebuilt for checkpoint evaluation.
activations = {
    'relu': nn.ReLU(),
    'mish': nn.Mish(),
    'ghf': ActGHF(t=1, m1=5.5, m2=20)
}

In [None]:
! rm -r logs/

# Train and test one single-layer model per activation; every run gets its
# own trainer, logger directory and checkpoint callback.
for name, act in activations.items():
    # model: identical architecture for every run, only the activation differs
    model = Transformer(
        embed_dim=embed_dim,
        seq_length=seq_length,
        vocab_size=vocab_size,
        num_layers=1,
        num_classes=classes,
        activation_fn=act
    )

    # trainer: a fresh one per activation, logging under the activation name
    trainer = create_trainer(activation=name, max_epochs=epochs)
    print(f'Train with {name} activation')

    # classifier
    classifier = Classifier(model, num_classes=classes)
    # NOTE(review): test_loader is passed where fit() takes the validation
    # data, so the val_acc used for checkpoint selection appears to be
    # measured on the test split — consider a held-out validation split.
    trainer.fit(classifier, train_loader, test_loader)
    # evaluates the weights as they are after the last epoch
    trainer.test(classifier, test_loader)

Train with relu activation


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Testing: |          | 0/? [00:00<?, ?it/s]

Train with mish activation


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Testing: |          | 0/? [00:00<?, ?it/s]

Train with ghf activation


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Testing: |          | 0/? [00:00<?, ?it/s]

In [None]:
import glob

# Re-evaluate the best checkpoint of every activation on the test set.
for name, act in activations.items():
    # The TensorBoard logger writes to the relative 'logs/' directory, so look
    # there instead of an absolute Kaggle path; this resolves to the same
    # location on Kaggle and also works where the working directory differs.
    # 'epoch*.ckpt' matches the best-model file and skips 'last.ckpt'.
    pattern = f'logs/{name}/version_0/checkpoints/epoch*.ckpt'
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError(
            f'No checkpoint matches {pattern!r}; run the training cell first.'
        )
    file_name = matches[0]

    # Rebuild the architecture used in training so the stored weights fit.
    model = Transformer(
        embed_dim=embed_dim,
        seq_length=seq_length,
        vocab_size=vocab_size,
        num_layers=1,
        num_classes=classes,
        activation_fn=act
    )
    print(f'Accuracy with {name} activation')
    classifier = Classifier.load_from_checkpoint(file_name, model=model, num_classes=classes)
    # `trainer` is the one left over from the last iteration of the training loop
    trainer.test(classifier, test_loader)

Accuracy with relu activation


Testing: |          | 0/? [00:00<?, ?it/s]

Accuracy with mish activation


Testing: |          | 0/? [00:00<?, ?it/s]

Accuracy with ghf activation


Testing: |          | 0/? [00:00<?, ?it/s]