<a href="https://colab.research.google.com/github/syedmahmoodiagents/transformers/blob/main/Full_Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [24]:
class SelfAttentionHead(nn.Module):
    """One head of causal (masked) scaled dot-product self-attention.

    Args:
        embedding_dim: Size of the last dimension of the input ``x``.
        block_size: Side length of the lower-triangular mask buffer, i.e. the
            longest sequence length this head can mask.
        head_size: Output width of the key, query and value projections.
    """

    def __init__(self, embedding_dim, block_size, head_size):
        super().__init__()
        self.key = nn.Linear(embedding_dim, head_size, bias=False)
        self.query = nn.Linear(embedding_dim, head_size, bias=False)
        self.value = nn.Linear(embedding_dim, head_size, bias=False)
        # Lower-triangular matrix of ones; registered as a buffer so it is
        # part of the module state without being a trainable parameter.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

    def forward(self, x):
        """Attend over ``x`` so that position ``t`` only sees positions ``<= t``.

        Args:
            x: Tensor of shape ``(B, T, embedding_dim)``.

        Returns:
            Tensor of shape ``(B, T, head_size)``.
        """
        _, T, _ = x.shape
        k = self.key(x)
        q = self.query(x)
        # Scale by sqrt(d_k), where d_k is the key/query width (head_size).
        # Dividing by sqrt(embedding_dim) instead over-shrinks the scores by a
        # factor of sqrt(num_heads) and flattens the attention distribution.
        scores = q @ k.transpose(-2, -1) / (k.shape[-1] ** 0.5)
        # Positions above the diagonal (the "future") get -inf so that softmax
        # assigns them zero weight.
        scores = scores.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        weights = F.softmax(scores, dim=-1)
        v = self.value(x)
        return weights @ v


In [25]:
class MultiHeadAttention(nn.Module):
    """Several ``SelfAttentionHead`` modules run side by side, then mixed.

    The embedding width is divided across the heads with integer division;
    the concatenated head outputs are mapped back to ``embedding_dim`` by a
    final linear projection.
    """

    def __init__(self, embedding_dim, block_size, num_heads):
        super().__init__()
        per_head = embedding_dim // num_heads
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(SelfAttentionHead(embedding_dim, block_size, per_head))
        self.heads = nn.ModuleList(head_modules)
        self.proj = nn.Linear(num_heads * per_head, embedding_dim)

    def forward(self, x):
        """Concatenate every head's output along the last axis and project it."""
        head_outputs = [head(x) for head in self.heads]
        combined = torch.cat(head_outputs, dim=-1)
        return self.proj(combined)


In [26]:
class FeedForward(nn.Module):
    """Two-layer MLP: expand to ``4 * n_embd``, apply ReLU, project back to ``n_embd``."""

    def __init__(self, n_embd):
        super().__init__()
        hidden_dim = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_embd),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of ``x``."""
        return self.net(x)

In [27]:
class Block(nn.Module):
    """Transformer block: self-attention then feed-forward, each with a residual add.

    LayerNorm is applied to the input of each sub-layer (pre-norm), and the
    sub-layer output is added back onto the un-normalised stream.
    """

    def __init__(self, embedding_dim, block_size, n_heads):
        super().__init__()
        self.sa = MultiHeadAttention(embedding_dim, block_size, n_heads)
        self.ffwd = FeedForward(embedding_dim)
        self.ln1, self.ln2 = (nn.LayerNorm(embedding_dim) for _ in range(2))

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = self.sa(self.ln1(x))
        x = x + attended
        transformed = self.ffwd(self.ln2(x))
        return x + transformed

In [28]:
class Transformer(nn.Module):
    """Stack of ``Block`` layers mapping source token ids to target-vocabulary logits.

    Each input position produces one distribution over the target vocabulary;
    there is no separate decoder, so output position ``t`` is predicted from
    the embedded source tokens at the same positions the blocks let it see.

    Args:
        src_vocab_size: Number of rows in the token embedding table.
        tgt_vocab_size: Number of output classes of the final linear layer.
        embedding_dim: Width of token and position embeddings.
        block_size: Number of learned position embeddings, i.e. the longest
            sequence length ``forward`` can embed.
        n_heads: Attention heads per block.
        n_layers: Number of stacked blocks.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, embedding_dim, block_size, n_heads, n_layers):
        super().__init__()
        self.token_embedding_table = nn.Embedding(src_vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(block_size, embedding_dim)
        self.blocks = nn.Sequential(*[Block(embedding_dim, block_size, n_heads) for _ in range(n_layers)])
        self.ln_f = nn.LayerNorm(embedding_dim)
        self.lm_head = nn.Linear(embedding_dim, tgt_vocab_size)

    def forward(self, idx, targets=None):
        """Compute logits and, when ``targets`` is given, the cross-entropy loss.

        Args:
            idx: Long tensor of source token ids, shape ``(B, T)``.
            targets: Optional long tensor of target ids with ``B * T`` elements.

        Returns:
            ``(logits, loss)``. Without targets, ``logits`` has shape
            ``(B, T, tgt_vocab_size)`` and ``loss`` is ``None``. With targets,
            ``logits`` is flattened to ``(B * T, tgt_vocab_size)`` and ``loss``
            is a scalar tensor.
        """
        B, T = idx.shape

        tok_emb = self.token_embedding_table(idx)
        # Position ids are created on idx's device so the addition below
        # never mixes devices.
        pos_emb = self.position_embedding(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            return logits, None

        B, T, C = logits.shape
        # reshape (not view) so non-contiguous targets, e.g. a shifted slice
        # such as tgt[:, 1:], are accepted instead of raising a RuntimeError.
        logits = logits.reshape(B * T, C)
        targets = targets.reshape(B * T)
        # NOTE(review): every position contributes to the loss, including any
        # padding ids present in `targets` - confirm this is intended.
        loss = F.cross_entropy(logits, targets)
        return logits, loss

In [2]:
# Toy parallel corpus: each entry is (English source sentence, Dutch target sentence).
# Every sentence has at most four words, so all of them fit in block_size below.
pairs = [
    ("i am hungry", "ik ben hongerig"),
    ("he is tired", "hij is moe"),
    ("she is happy", "zij is blij"),
    ("she is not tired", "zij is niet moe"),
    ("he is not happy", "hij is niet blij"),
]

In [3]:
def tokenize(text):
    """Lowercase ``text`` and split it into tokens on runs of whitespace."""
    lowered = text.lower()
    return lowered.split()

In [4]:

def build_vocab(sentences):
    """Map every token in ``sentences`` to an integer id.

    Ids 0-3 are reserved for ``<PAD>``, ``<BOS>``, ``<EOS>`` and ``<UNK>``;
    remaining tokens receive consecutive ids in order of first appearance.

    Args:
        sentences: Iterable of raw sentence strings.

    Returns:
        Dict from token string to id.
    """
    vocab = {"<PAD>": 0, "<BOS>": 1, "<EOS>": 2, "<UNK>": 3}
    for sentence in sentences:
        for token in tokenize(sentence):
            # setdefault only inserts unseen tokens; len(vocab) is the next free id.
            vocab.setdefault(token, len(vocab))
    return vocab

In [5]:
# Split the parallel corpus into two aligned lists: element i of each list
# comes from the same pair.
src_sentences = [src for src, _ in pairs]
tgt_sentences = [tgt for _, tgt in pairs]

In [6]:
# Display the extracted source sentences.
src_sentences

['i am hungry',
 'he is tired',
 'she is happy',
 'she is not tired',
 'he is not happy']

In [7]:
# One vocabulary per language; ids are assigned independently on each side.
src_vocab = build_vocab(src_sentences)
tgt_vocab = build_vocab(tgt_sentences)

In [8]:
# Display the source vocabulary (token -> id).
src_vocab

{'<PAD>': 0,
 '<BOS>': 1,
 '<EOS>': 2,
 '<UNK>': 3,
 'i': 4,
 'am': 5,
 'hungry': 6,
 'he': 7,
 'is': 8,
 'tired': 9,
 'she': 10,
 'happy': 11,
 'not': 12}

In [9]:
# Reverse lookup (id -> token) used to turn predicted target ids back into words.
inv_tgt_vocab = {idx: tok for tok, idx in tgt_vocab.items()}

In [10]:
# Display the id -> token mapping for the target language.
inv_tgt_vocab

{0: '<PAD>',
 1: '<BOS>',
 2: '<EOS>',
 3: '<UNK>',
 4: 'ik',
 5: 'ben',
 6: 'hongerig',
 7: 'hij',
 8: 'is',
 9: 'moe',
 10: 'zij',
 11: 'blij',
 12: 'niet'}

In [12]:
# Model and training hyperparameters.
# Vocabulary sizes are derived from the vocabularies built above.
src_vocab_size = len(src_vocab)
tgt_vocab_size = len(tgt_vocab)
# Width of the token and position embeddings.
embedding_dim = 64
# Fixed sequence length: every sentence is padded or truncated to this many ids.
block_size = 8
# Attention heads per block (each head gets embedding_dim // n_heads features).
n_heads = 4
# Number of stacked Block layers.
n_layers = 2
# Step size passed to the optimizer.
learning_rate = 1e-3
# Number of optimisation steps run by the training loop.
max_iters = 1000

In [13]:
def _fit_to_block(ids, block_size, pad_id):
    """Return ``ids`` truncated or right-padded with ``pad_id`` to exactly ``block_size`` items."""
    if len(ids) > block_size:
        return ids[:block_size]
    return ids + [pad_id] * (block_size - len(ids))


def prepare_data(src_sentences, tgt_sentences, src_vocab, tgt_vocab, block_size):
    """Encode aligned sentence lists as fixed-length id tensors.

    Source sentences are tokenized, mapped to ids (unknown tokens become
    ``<UNK>``) and padded/truncated to ``block_size``. Target sentences are
    additionally wrapped in ``<BOS>`` ... ``<EOS>`` before padding/truncation.

    Args:
        src_sentences: Source-language sentence strings.
        tgt_sentences: Target-language sentence strings, aligned with
            ``src_sentences``; pairs are formed with ``zip``.
        src_vocab: Token -> id dict containing ``<PAD>`` and ``<UNK>``.
        tgt_vocab: Token -> id dict containing ``<PAD>``, ``<BOS>``, ``<EOS>``
            and ``<UNK>``.
        block_size: Length of every encoded row.

    Returns:
        ``(src_tensor, tgt_tensor)``, both ``torch.long`` with shape
        ``(num_pairs, block_size)``.
    """
    src_data = []
    tgt_data = []

    for src_s, tgt_s in zip(src_sentences, tgt_sentences):
        # Source side: ids only, no special boundary tokens.
        src_indexed = [src_vocab.get(token, src_vocab['<UNK>']) for token in tokenize(src_s)]
        src_data.append(_fit_to_block(src_indexed, block_size, src_vocab['<PAD>']))

        # Target side: wrap the sentence in BOS/EOS.
        eos_id = tgt_vocab['<EOS>']
        tgt_indexed = (
            [tgt_vocab['<BOS>']]
            + [tgt_vocab.get(token, tgt_vocab['<UNK>']) for token in tokenize(tgt_s)]
            + [eos_id]
        )
        if len(tgt_indexed) > block_size:
            # A plain slice would cut off EOS and the sequence would carry no
            # stop signal; drop trailing words instead and keep EOS last.
            tgt_indexed = tgt_indexed[:block_size - 1] + [eos_id]
        tgt_data.append(_fit_to_block(tgt_indexed, block_size, tgt_vocab['<PAD>']))

    # The explicit reshape keeps the result 2-D even when there are no pairs
    # (torch.tensor([]) alone would be 1-D).
    src_tensor = torch.tensor(src_data, dtype=torch.long).reshape(len(src_data), block_size)
    tgt_tensor = torch.tensor(tgt_data, dtype=torch.long).reshape(len(tgt_data), block_size)

    return src_tensor, tgt_tensor

print("The `prepare_data` function has been defined.")

The `prepare_data` function has been defined.


In [14]:
# Encode every sentence pair as id tensors with one row per pair and
# block_size columns.
src_tensor, tgt_tensor = prepare_data(src_sentences, tgt_sentences, src_vocab, tgt_vocab, block_size)

# Sanity check: print both shapes and the first encoded pair.
print(f"Source Tensor Shape: {src_tensor.shape}")
print(f"Target Tensor Shape: {tgt_tensor.shape}")
print("Example Source Tensor (first row):", src_tensor[0])
print("Example Target Tensor (first row):", tgt_tensor[0])

Source Tensor Shape: torch.Size([5, 8])
Target Tensor Shape: torch.Size([5, 8])
Example Source Tensor (first row): tensor([4, 5, 6, 0, 0, 0, 0, 0])
Example Target Tensor (first row): tensor([1, 4, 5, 6, 2, 0, 0, 0])


In [29]:
# Seed PyTorch's RNG immediately before the weights are created so that a
# fresh Restart-and-Run-All starts from the same initial parameters instead of
# a different random initialisation on every run.
torch.manual_seed(0)
model = Transformer(src_vocab_size, tgt_vocab_size, embedding_dim, block_size, n_heads, n_layers)
print("Transformer model instantiated.")

Transformer model instantiated.


In [31]:
# AdamW over all model parameters, using the learning rate set in the
# hyperparameter cell.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
print(f"Optimizer initialized with learning rate: {learning_rate}")

Optimizer initialized with learning rate: 0.001


In [32]:
# Full-batch training: every step feeds all encoded pairs through the model.
# Make sure the model is in training mode even if an earlier cell (or a
# previous run of a later cell) called model.eval().
model.train()

# The loop variable is `step`, not `iter`: a module-level `iter` would shadow
# the builtin iter() for every later cell in the notebook.
for step in range(max_iters):
    # Forward pass
    logits, loss = model(src_tensor, tgt_tensor)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step % 100 == 0:
        print(f"Iteration {step}, Loss: {loss.item():.4f}")

print("Training complete.")

Iteration 0, Loss: 2.5861
Iteration 100, Loss: 0.0146
Iteration 200, Loss: 0.0057
Iteration 300, Loss: 0.0032
Iteration 400, Loss: 0.0021
Iteration 500, Loss: 0.0015
Iteration 600, Loss: 0.0011
Iteration 700, Loss: 0.0008
Iteration 800, Loss: 0.0007
Iteration 900, Loss: 0.0006
Training complete.


In [43]:
def translate_sentence(model, sentence, src_vocab, tgt_vocab, inv_tgt_vocab, block_size, device='cpu'):
    """Translate ``sentence`` with a single forward pass of ``model``.

    The sentence is tokenized, mapped to source ids (unknown tokens become
    ``<UNK>``) and padded/truncated to ``block_size``. The model is called
    once without targets, and the highest-scoring target id is read off at
    each output position, left to right. Decoding is therefore not
    autoregressive.

    Args:
        model: Module called as ``model(src_input, targets=None)`` and
            returning ``(logits, _)`` with logits indexed ``[batch, position, vocab]``.
        sentence: Raw source sentence.
        src_vocab: Source token -> id dict containing ``<PAD>`` and ``<UNK>``.
        tgt_vocab: Target token -> id dict containing ``<PAD>``, ``<BOS>``
            and ``<EOS>``.
        inv_tgt_vocab: Target id -> token dict.
        block_size: Input length and number of output positions decoded.
        device: Device on which the input tensor is created.

    Returns:
        The predicted target tokens joined by single spaces.
    """
    tokens = tokenize(sentence)
    indexed = [src_vocab.get(token, src_vocab['<UNK>']) for token in tokens]

    # Pad or truncate the source sentence to block_size
    if len(indexed) > block_size:
        indexed = indexed[:block_size]
    else:
        indexed = indexed + [src_vocab['<PAD>']] * (block_size - len(indexed))

    src_input = torch.tensor([indexed], dtype=torch.long, device=device)

    # Remember the caller's mode and restore it afterwards, so translating in
    # the middle of training does not silently leave the model in eval mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits, _ = model(src_input, targets=None)
        # argmax over raw logits: softmax is monotonic, so normalising first
        # would not change which id wins.
        predicted_ids = logits[0, :block_size].argmax(dim=-1).tolist()
    finally:
        model.train(was_training)

    bos_id = tgt_vocab['<BOS>']
    stop_ids = (tgt_vocab['<EOS>'], tgt_vocab['<PAD>'])

    translated_tokens = []
    for position, next_token_id in enumerate(predicted_ids):
        # Stop decoding at the first EOS or PAD prediction
        if next_token_id in stop_ids:
            break
        # The training targets start with BOS, so a BOS predicted at the
        # first position is skipped rather than emitted as a word.
        if next_token_id == bos_id and position == 0:
            continue
        translated_tokens.append(inv_tgt_vocab[next_token_id])

    return ' '.join(translated_tokens)


In [44]:
# "i am happy" is not one of the training pairs, although each of its words
# is in the source vocabulary: this probes how the model combines known words.
english_sentence = "i am happy"
dutch_translation = translate_sentence(model, english_sentence, src_vocab, tgt_vocab, inv_tgt_vocab, block_size)

print(f"English: {english_sentence}")
print(f"Dutch Translation: {dutch_translation}")

English: i am happy
Dutch Translation: ik is blij


In [45]:
# "he is tired" is one of the training pairs, so this checks that the model
# reproduces a sentence it was trained on.
english_sentence_2 = "he is tired"
dutch_translation_2 = translate_sentence(model, english_sentence_2, src_vocab, tgt_vocab, inv_tgt_vocab, block_size)

print(f"English: {english_sentence_2}")
print(f"Dutch Translation: {dutch_translation_2}")

English: he is tired
Dutch Translation: hij is moe
