<a href="https://colab.research.google.com/github/nisbenz/Mini-Transformer/blob/main/Encoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
import torch
import torch.nn as nn
import pandas as pd
import os
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tokenizers import ByteLevelBPETokenizer

In [12]:
# Fetch the training CSV through the Colab file picker, but only when it is
# not already on disk. `files.upload()` waits for manual interaction, so an
# unconditional call stalls every "Restart & Run All" and cannot run outside
# Colab at all. The file name must match CSV_FILE defined in the next cell.
if os.path.exists("train-00000-of-00001.csv"):
    uploaded = {}  # nothing new was uploaded; the existing file is reused
else:
    from google.colab import files  # Colab-only module, imported only when needed

    uploaded = files.upload()

Saving train-00000-of-00001.csv to train-00000-of-00001.csv


In [13]:
CSV_FILE = "train-00000-of-00001.csv"
VOCAB_SIZE = 5000

df = pd.read_csv(CSV_FILE)
print(f"Loaded dataset with {len(df)} rows.")

# One shared vocabulary is trained over both languages. Missing cells are
# dropped first: `astype(str)` alone would turn NaN into the literal text
# "nan" and feed it to the trainer as if it were a real sentence.
all_text = (
    df['english'].dropna().astype(str).tolist()
    + df['darija'].dropna().astype(str).tolist()
)

tokenizer = ByteLevelBPETokenizer()

# `train` reads from files, so the corpus is staged in a scratch file. The
# try/finally guarantees the scratch file is removed even if training raises.
try:
    with open("temp_corpus.txt", "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in all_text)

    tokenizer.train(
        files=["temp_corpus.txt"],
        vocab_size=VOCAB_SIZE,
        min_frequency=2,
        # The batch printed further down shows <s>=0, <pad>=1, </s>=2.
        special_tokens=["<s>", "<pad>", "</s>", "<unk>", "<mask>"]
    )
finally:
    if os.path.exists("temp_corpus.txt"):
        os.remove("temp_corpus.txt")

# exist_ok replaces the separate exists-then-create check.
os.makedirs("tokenizer", exist_ok=True)
tokenizer.save_model("tokenizer")

print("Tokenizer trained successfully!")

Loaded dataset with 16089 rows.
Tokenizer trained successfully!


In [14]:
class TranslationDataset(Dataset):
    """English -> Darija sentence pairs served as token-id tensors.

    Each item is a dict with two 1-D ``torch.long`` tensors: ``"src"`` (from
    the ``english`` column) and ``"trg"`` (from the ``darija`` column). Both
    are wrapped as ``<s> ... </s>``.

    Args:
        csv_path: CSV file containing ``english`` and ``darija`` columns.
        tokenizer: Object exposing ``token_to_id(str)`` and
            ``encode(str).ids``.
        max_len: Optional cap on the length of every returned sequence,
            counting the two boundary tokens. ``None`` (the default) disables
            truncation.

    Raises:
        ValueError: If ``token_to_id`` returns ``None`` for ``<s>`` or
            ``</s>``, or if ``max_len`` is smaller than 2.
    """

    def __init__(self, csv_path, tokenizer, max_len=None):
        if max_len is not None and max_len < 2:
            raise ValueError("max_len must be at least 2 to fit <s> and </s>")

        # Rows missing either side are not usable pairs; without this filter
        # str(NaN) would produce the literal sentence "nan".
        frame = pd.read_csv(csv_path)
        self.df = frame.dropna(subset=['english', 'darija']).reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_len = max_len

        self.sos_token = tokenizer.token_to_id("<s>")
        self.eos_token = tokenizer.token_to_id("</s>")
        if self.sos_token is None or self.eos_token is None:
            # Fail here instead of with an opaque tensor-construction error
            # on the first __getitem__ call.
            raise ValueError("tokenizer must define the special tokens <s> and </s>")

        # Plain Python lists avoid building a pandas Series on every lookup.
        self._src_texts = self.df['english'].astype(str).tolist()
        self._trg_texts = self.df['darija'].astype(str).tolist()

    def __len__(self):
        return len(self.df)

    def _encode(self, text):
        """Tokenise ``text``, truncate if requested, and add <s>/</s>."""
        ids = self.tokenizer.encode(text).ids
        if self.max_len is not None:
            # Reserve two slots for the boundary tokens.
            ids = ids[: self.max_len - 2]
        return torch.tensor([self.sos_token, *ids, self.eos_token], dtype=torch.long)

    def __getitem__(self, idx):
        return {
            "src": self._encode(self._src_texts[idx]),
            "trg": self._encode(self._trg_texts[idx])
        }

In [15]:
def get_collate_fn(pad_token_id):
    """Build a DataLoader ``collate_fn`` that right-pads with ``pad_token_id``.

    The returned callable takes a list of dicts holding 1-D ``'src'`` and
    ``'trg'`` tensors and returns a ``(src_padded, trg_padded)`` tuple. Each
    element is a batch-first tensor padded to the longest sequence of its own
    side.
    """
    def _stack(samples, key):
        # Gather one side of every sample and pad it into a single tensor.
        sequences = [sample[key] for sample in samples]
        return pad_sequence(sequences, batch_first=True, padding_value=pad_token_id)

    def collate_fn(batch):
        return _stack(batch, 'src'), _stack(batch, 'trg')

    return collate_fn

In [16]:
# Batching setup: padding uses the tokenizer's own <pad> id.
pad_id = tokenizer.token_to_id("<pad>")
BATCH_SIZE = 4

dataset = TranslationDataset(CSV_FILE, tokenizer)
dataloader = DataLoader(
    dataset=dataset,
    collate_fn=get_collate_fn(pad_id),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

# Pull one shuffled batch to sanity-check shapes and round-trip decoding.
data_iter = iter(dataloader)
src_batch, trg_batch = next(data_iter)

print("--- Data Pipeline Verification ---")
print(f"Batch Size: {BATCH_SIZE}")
# Both shapes should be [BATCH_SIZE, longest sequence on that side].
print(f"Source (English) Shape: {src_batch.shape}")
print(f"Target (Darija) Shape:  {trg_batch.shape}")

# Show the first pair of the batch, raw and decoded back to text.
print("\n--- Example Output ---")
print("Raw Tensor (English):", src_batch[0])
print("Decoded (English):   ", tokenizer.decode(src_batch[0].tolist()))
print("Decoded (Darija):    ", tokenizer.decode(trg_batch[0].tolist()))

--- Data Pipeline Verification ---
Batch Size: 4
Source (English) Shape: torch.Size([4, 47])
Target (Darija) Shape:  torch.Size([4, 46])

--- Example Output ---
Raw Tensor (English): tensor([  0,  44, 381, 465, 929, 459, 613,  18,   2,   1,   1,   1,   1,   1,
          1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
          1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,   1,
          1,   1,   1,   1,   1])
Decoded (English):    Here's Lamia.
Decoded (Darija):     hahya Lamia.


In [17]:
import math

class TransformerEmbedding(nn.Module):
    """Token embedding scaled by sqrt(d_model) plus fixed sinusoidal positions.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding width; odd and even values are both supported.
        max_len: Number of positions for which the encoding is precomputed.

    The positional table is stored as the non-trainable buffer ``pe`` with
    shape ``[1, max_len, d_model]``.
    """

    def __init__(self, vocab_size, d_model, max_len=5000):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Even columns carry the sine, odd columns the cosine.
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even column,
        # so the frequency vector is trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        self.register_buffer('pe', pe.unsqueeze(0))
        self.d_model = d_model

    def forward(self, x):
        """Embed token ids ``x`` of shape ``[batch, seq_len]``.

        Returns:
            Tensor of shape ``[batch, seq_len, d_model]``.

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            # Otherwise the addition fails with an opaque broadcast error, or
            # broadcasts silently when max_len == 1.
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )

        x = self.embedding(x) * math.sqrt(self.d_model)
        return x + self.pe[:, :seq_len]

In [18]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over ``n_head`` parallel heads.

    Args:
        d_model: Width of the inputs and of the output.
        n_head: Number of heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_head``.
    """

    def __init__(self, d_model, n_head):
        super().__init__()
        if d_model % n_head != 0:
            # A remainder would make the per-head reshape in forward() fail
            # or scramble features across heads.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_head ({n_head})"
            )

        self.n_head = n_head
        self.d_head = d_model // n_head
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        self.w_o = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q, k, v: Tensors of shape ``[batch, length, d_model]``.
            mask: Optional tensor broadcastable to
                ``[batch, n_head, q_len, k_len]``; positions equal to 0 are
                excluded from attention.

        Returns:
            Tensor of shape ``[batch, q_len, d_model]``.
        """
        batch_size = q.size(0)

        # Project, then split the feature axis into heads: [B, H, L, d_head].
        Q = self.w_q(q).view(batch_size, -1, self.n_head, self.d_head).transpose(1, 2)
        K = self.w_k(k).view(batch_size, -1, self.n_head, self.d_head).transpose(1, 2)
        V = self.w_v(v).view(batch_size, -1, self.n_head, self.d_head).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_head)
        if mask is not None:
            # The dtype's own minimum is used because a fixed -1e9 is not
            # representable in float16.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
        attn_weights = torch.softmax(scores, dim=-1)
        context = torch.matmul(attn_weights, V)
        # Merge the heads back into a single feature axis.
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.n_head * self.d_head)
        return self.w_o(context)

In [20]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a position-wise FFN.

    Each sub-layer's output passes through dropout, is added back to the
    sub-layer's input, and the sum is layer-normalised.
    """

    def __init__(self, d_model, n_head, d_ff, dropout=0.1):
        super().__init__()
        # Attention sub-layer.
        self.self_attn = MultiHeadAttention(d_model, n_head)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)

        # Feed-forward sub-layer: d_model -> d_ff -> d_model with a ReLU between.
        expand = nn.Linear(d_model, d_ff)
        contract = nn.Linear(d_ff, d_model)
        self.ffn = nn.Sequential(expand, nn.ReLU(), contract)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Residual connection around self-attention.
        attended = self.dropout1(self.self_attn(x, x, x, mask))
        hidden = self.norm1(x + attended)

        # Residual connection around the feed-forward network.
        transformed = self.dropout2(self.ffn(hidden))
        return self.norm2(hidden + transformed)

class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of ``num_layers`` encoder layers.

    Args:
        vocab_size: Size of the token vocabulary.
        d_model: Model width.
        n_head: Number of attention heads per layer.
        d_ff: Hidden width of each layer's feed-forward network.
        num_layers: Number of stacked encoder layers.
        max_len: Number of positions precomputed by the embedding. Defaults
            to 5000, the same default ``TransformerEmbedding`` uses.
        dropout: Dropout rate forwarded to every encoder layer. Defaults to
            0.1, the same default ``EncoderLayer`` uses.
    """

    def __init__(self, vocab_size, d_model, n_head, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        self.embedding = TransformerEmbedding(vocab_size, d_model, max_len)
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, n_head, d_ff, dropout) for _ in range(num_layers)
        ])

    def forward(self, x, mask=None):
        """Encode token ids ``x``.

        Args:
            x: Token ids, passed to the embedding layer.
            mask: Optional attention mask handed unchanged to every layer;
                ``None`` means no position is masked.

        Returns:
            The output of the last encoder layer.
        """
        x = self.embedding(x)

        for layer in self.layers:
            x = layer(x, mask)

        return x

In [22]:
# Hyper-parameters for the encoder smoke test.
VOCAB_SIZE = tokenizer.get_vocab_size()
D_MODEL = 512
D_FF = 512
N_HEAD = 4
NUM_LAYERS = 4

encoder = TransformerEncoder(
    vocab_size=VOCAB_SIZE,
    d_model=D_MODEL,
    n_head=N_HEAD,
    d_ff=D_FF,
    num_layers=NUM_LAYERS,
    max_len=100,
)

# True at real tokens, False at padding. The two inserted axes give the mask
# shape [batch, 1, 1, src_len] so it broadcasts against the attention scores.
src_mask = src_batch.ne(pad_id)[:, None, None, :]

print("--- Encoder Test ---")
output = encoder(src_batch, src_mask)

# The output should keep the input's batch and length axes and add d_model.
print(f"Input Shape: {src_batch.shape}")
print(f"Output Shape: {output.shape}")


--- Encoder Test ---
Input Shape: torch.Size([4, 47])
Output Shape: torch.Size([4, 47, 512])


In [None]:
Decoder_input =