# Imports

In [1]:
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim

import matplotlib.pyplot as plt

# Model

## Sizes


h*dv = model_dim

- x : (batch_size, max_length) 

- tokens_id in x between 0 and vocab_size

- Embedd(x) : (batch_size, max_length, model_dim)

- K : (model_dim, dk)
- Kx : (batch_size, max_length, dk)

- Q : (model_dim, dk)
- Qx : (batch_size, max_length, dk)

- Qx*Kx^T : (batch_size, max_length, max_length)
- V : (model_dim, dv)
- Vx : (batch_size, max_length, dv)



For the mixed attention: (can have ≠ max_lengths for encoder and decoder)

- Kx : (batch_size, max_length_encoder, dk)

- Qx : (batch_size, max_length_decoder, dk)

- Qx*Kx^T : (batch_size, max_length_decoder, max_length_encoder)

- Vx : (batch_size, max_length_encoder, dv)

- SingleHeadAttention : (batch_size, max_length_decoder, dv)

- MultiHeadAttention : (batch_size, max_length_decoder, h*dv)





## Implementation

In [29]:
import numpy as np
import torch
import torch.nn as nn


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_length):
        super().__init__()
        self.d_model = d_model
        self.max_length = max_length
        self.compute()

    def SinPos(self, i: int, pos: int):
        if i % 2 == 0:
            return np.sin(pos / 10000 ** (2 * i / self.d_model))
        else:
            return np.cos(pos / 10000 ** (2 * i / self.d_model))

    def compute(self):
        self.Mat = torch.Tensor([[self.SinPos(i, pos) for i in range(self.d_model)] for pos in range(self.max_length)])


    def forward(self, x):
        return self.Mat[:x.shape[-1], :]


class Embedding(nn.Module):
    def __init__(self, d_model, max_length, n_embedding, dropout):
        super().__init__()
        self.max_length = max_length
        self.d_model = d_model
        self.n_embedding = n_embedding
        self.embedding = nn.Embedding(num_embeddings=n_embedding, embedding_dim=d_model)
        self.pos_encoding = PositionalEncoding(d_model=d_model, max_length=max_length)
        self.dropout = nn.Dropout(p=dropout)
        pass

    def forward(self, x):
        device = x.device

        x = self.embedding(x) + self.pos_encoding(x).to(device)
        return self.dropout(x)


class SingleHeadAttention(nn.Module):
    def __init__(self, dk: int, dv: int, d_model: int):
        super().__init__()
        self.dk = dk
        self.dv = dv
        self.d_model = d_model
        self.K = nn.Linear(in_features=d_model, out_features=dk)
        self.Q = nn.Linear(in_features=d_model, out_features=dk)
        self.V = nn.Linear(in_features=d_model, out_features=dv)

    def forward(self, x: torch.Tensor, x_encoder: torch.Tensor = None, mask=None):
        Kx = self.K(x_encoder) if x_encoder is not None else self.K(x)
        Vx = self.V(x_encoder) if x_encoder is not None else self.V(x)
        Qx = self.Q(x)
        QK = torch.matmul(Qx, Kx.transpose(-2, -1)) / np.sqrt(self.dk)
        if mask is not None:
            QK = QK + mask
        QK = torch.softmax(QK, dim=-1)
        return torch.matmul(QK, Vx)


class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads: int, dk: int, dv: int, d_model: int):
        super().__init__()
        assert num_heads * dv == d_model, "num_heads * dv should be equal to the model dim"
        self.attention_heads = nn.ModuleList([SingleHeadAttention(dk=dk, dv=dv, d_model=d_model) for _ in range(num_heads)])
        self.WO = nn.Linear(in_features=num_heads * dv, out_features=d_model)

    def forward(self, x: torch.Tensor, x_encoder: torch.Tensor = None, mask=None):
        outputs = [head(x, x_encoder, mask) for head in self.attention_heads]
        x = torch.cat(outputs, dim=-1)
        x = self.WO(x)
        return x
    

class SublayerConnection(nn.Module): #Dropout, Add and Norm
    def __init__(self, features, dropout, eps=1e-6):
        super().__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.dropout = nn.Dropout(p=dropout)

        self.eps = eps

    def forward(self, x, sublayer_output):
        x = x + self.dropout(sublayer_output) #dropout and add
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        x = self.a_2 * (x - mean) / (std + self.eps) + self.b_2 #norm
        return x



class EncoderBlock(nn.Module):
    def __init__(self, num_heads, dk, dv, d_ff, d_model, dropout):
        super().__init__()
        self.attention = MultiHeadAttention(num_heads=num_heads, dk=dk, dv=dv, d_model=d_model)
        self.sublayer1 = SublayerConnection(features=d_model, dropout=dropout)
        self.sublayer2 = SublayerConnection(features=d_model, dropout=dropout)
        self.ff = nn.Sequential(
            nn.Linear(in_features=d_model, out_features=d_ff),
            nn.ReLU(),
            nn.Dropout(p=dropout),
            nn.Linear(in_features=d_ff, out_features=d_model),
        )

    def forward(self, x, mask=None):
        attention = self.attention(x=x, mask=mask)
        x = self.sublayer1(x, attention)
        feedforward = self.ff(x)
        x = self.sublayer2(x, feedforward)
        return x


class Encoder(nn.Module):
    def __init__(self, num_heads, dk, dv, d_ff, d_model, dropout, num_encoders):
        super().__init__()
        self.encoders_list = [EncoderBlock(num_heads=num_heads, dk=dk, dv=dv, d_ff=d_ff, d_model=d_model, dropout=dropout) for _ in range(num_encoders)]
        self.encoders = nn.ModuleList(self.encoders_list)

    def forward(self, x, mask=None):
        for encoder in self.encoders_list:
            x = encoder(x, mask)
        return x
    

class DecoderBlock(nn.Module):
    def __init__(self, num_heads, dk, dv, d_ff, d_model, dropout):
        super().__init__()
        self.masked_attention = MultiHeadAttention(num_heads=num_heads, dk=dk, dv=dv, d_model=d_model)
        self.mixed_attention = MultiHeadAttention(num_heads=num_heads, dk=dk, dv=dv, d_model=d_model)
        self.sublayer1 = SublayerConnection(features=d_model, dropout=dropout)
        self.sublayer2 = SublayerConnection(features=d_model, dropout=dropout)
        self.sublayer3 = SublayerConnection(features=d_model, dropout=dropout)
        self.ff = nn.Sequential(
            nn.Linear(in_features=d_model, out_features=d_ff),
            nn.ReLU(),
            nn.Dropout(p=dropout),
            nn.Linear(in_features=d_ff, out_features=d_model),
        )

    def forward(self, x, x_encoder, causal_mask=None, mixed_mask=None):
        attention = self.masked_attention(x, mask=causal_mask)
        x = self.sublayer1(x, attention)
        attention = self.mixed_attention(x, x_encoder=x_encoder, mask=mixed_mask)
        x = self.sublayer2(x, attention)
        feedforward = self.ff(x)
        x = self.sublayer3(x, feedforward)
        return x

class Decoder(nn.Module):
    def __init__(self, num_heads, dk, dv, d_ff, d_model, dropout, num_decoders):
        super().__init__()
        decoders_list = [DecoderBlock(num_heads=num_heads, dk=dk, dv=dv, d_ff=d_ff, d_model=d_model, dropout=dropout) for _ in range(num_decoders)]
        self.decoders = nn.ModuleList(decoders_list)

    def forward(self, x, x_encoder, causal_mask=None, mixed_mask=None):
        for decoder in self.decoders:
            x = decoder(x, x_encoder, causal_mask, mixed_mask)
        return x


class Transformer(nn.Module):
    def __init__(self, d_model, max_length_encoder, vocab_size_encoder, max_length_decoder, vocab_size_decoder, num_out, num_heads, dv, dk, d_ff, dropout, num_encoders, num_decoders):
        super().__init__()
        self.embedding_encoder = Embedding(d_model, max_length_encoder, n_embedding=vocab_size_encoder, dropout=dropout)
        self.embedding_decoder = Embedding(d_model, max_length_decoder, n_embedding=vocab_size_decoder, dropout=dropout)
        self.encoder = Encoder(num_heads, dk, dv, d_ff, d_model, dropout, num_encoders)
        self.decoder = Decoder(num_heads, dk, dv, d_ff, d_model, dropout, num_decoders)
        self.linear = nn.Linear(in_features=d_model, out_features=num_out)
        self.ff_mask = torch.zeros(max_length_decoder, max_length_decoder) + torch.triu(torch.full((max_length_decoder, max_length_decoder), float(-1e9)), diagonal=1)
        self.max_length_encoder = max_length_encoder
        self.max_length_decoder = max_length_decoder

    def forward(self, input, output, padding_mask_encoder, padding_mask_decoder):
        device = input.device

        padding_mask_decoder[padding_mask_decoder == 1] = -1e9
        padding_mask_decoder[padding_mask_decoder == 0] = 0.0

        padding_mask_encoder[padding_mask_encoder == 1] = -1e9
        padding_mask_encoder[padding_mask_encoder == 0] = 0.0

        encoder_mask = padding_mask_encoder.unsqueeze(1).expand(-1, self.max_length_encoder, -1)

        decoder_mask = self.ff_mask.to(device) + padding_mask_decoder.unsqueeze(1).expand(-1, self.max_length_decoder, -1) #Causal mask
        mixed_mask = padding_mask_decoder.unsqueeze(1).expand(-1, self.max_length_encoder, -1).transpose(-1, -2) + padding_mask_encoder.unsqueeze(1).expand(-1, self.max_length_decoder,-1)
        # In reality we don't care about the padded rows of our attention matrix, at the end the loss won't take them into account and we won't update the weights during the backward pass.
        input_embed = self.embedding_encoder(input)
        output_embed = self.embedding_decoder(output)

        x_encoder = self.encoder(input_embed, mask=encoder_mask)
        x = self.decoder(output_embed, x_encoder, causal_mask=decoder_mask, mixed_mask=mixed_mask)
        x = self.linear(x)
        return x
    

    def generate(self, input, max_gen_length, start_token, end_token, padding_mask_encoder): #greed decoding
        self.eval()
        input_embed = self.embedding_encoder(input)
        x_encoder = self.encoder(input_embed, mask=padding_mask_encoder)

        generated_tokens = [start_token]
        generated_tokens_probas = [1]

        for _ in range(max_gen_length):
            output = torch.tensor(generated_tokens).unsqueeze(0) # size [1, sequence_length]
            out_embed = self.embedding_decoder(output)
            causal_mask = self.ff_mask[:out_embed.size(1), :out_embed.size(1)]
            x = self.decoder(out_embed, x_encoder, causal_mask=causal_mask)
            x = self.linear(x)
            probas = torch.softmax(x, dim=-1)
            max_proba, next_token = torch.max(probas[:, -1, :], dim=-1) #greedy decoding : only max_proba
            generated_tokens.append(next_token.item())
            generated_tokens_probas.append(max_proba.item())
            if next_token == end_token:
                break
            
        return generated_tokens, generated_tokens_probas

# Tests

In [58]:
#Parameters
batch_size = 2
d_model = 128
max_length_encoder = 100
max_length_decoder = 80
vocab_size_encoder = 10000
vocab_size_decoder = 25000
num_out = vocab_size_decoder
num_heads = 8
dv = 16
dk = 16
d_ff = 512
dropout = 0.1
num_encoders = 2
num_decoders = 2
dropout = 0.1

In [21]:
x = torch.randint(0, vocab_size_encoder, (batch_size, max_length_encoder))
print(x.shape)
MyEmbeddingEncoder = Embedding(d_model, max_length_encoder, n_embedding=vocab_size_encoder, dropout=dropout)
x= MyEmbeddingEncoder(x)
print(x.shape)

MyEncoder = Encoder(num_heads, dk, dv, d_ff, d_model, dropout, num_encoders)
x_encoder = MyEncoder(x)
print(x.shape)

y = torch.randint(0, vocab_size_decoder, (batch_size, max_length_decoder))
MyEmbeddingDecoder = Embedding(d_model, max_length_decoder, n_embedding=vocab_size_decoder, dropout=dropout)
y = MyEmbeddingDecoder(y)

MyDecoder = Decoder(num_heads, dk, dv, d_ff, d_model, dropout, num_decoders)
y = MyDecoder(y, x_encoder) 
print(y.shape)


torch.Size([2, 100])
torch.Size([2, 100, 128])
torch.Size([2, 100, 128])
torch.Size([2, 80, 128])


In [59]:
def init_transformer():
    model = Transformer(
        d_model=d_model,
        max_length_encoder=16,
        max_length_decoder=14,
        vocab_size_encoder=10,
        vocab_size_decoder=20,
        num_out=num_out,
        num_heads=num_heads,
        dv=dv,
        dk=dk,
        d_ff=d_ff,
        dropout=dropout,
        num_encoders=num_encoders,
        num_decoders=num_decoders,
    )
    return model



#######  Forward Test  #############
input = torch.tensor([[7, 9, 2, 2, 7, 5, 2, 3, 4, 5, 6, 4, 3, 0, 0, 0],
        [6, 8, 1, 2, 3, 4, 9, 8, 1, 2, 4, 4, 0, 0, 0, 0]])
output = torch.tensor([[14,  5, 11,  3,  3,  9,  4,  2, 12,  4,  5,  7, 0,  0],
        [ 5,  8, 10,  8,  6,  7, 11, 14, 10,  0,  0,  0,  0,  0]])
print(input)
print(output)
padding_mask_encoder = (input == 0)
padding_mask_encoder = padding_mask_encoder.to(torch.float32)





padding_mask_decoder = (output == 0)
padding_mask_decoder = padding_mask_decoder.to(torch.float32)




print(padding_mask_encoder.shape)
print(padding_mask_decoder.shape)


MyTransformer = init_transformer()
out = MyTransformer(input, output, padding_mask_encoder, padding_mask_decoder)

print(out)
# ####### Generation Test ######
# input_seq = torch.tensor([[0, 2, 3, 4, 5, 1]])
# input_seq_mask = torch.tensor([[0, 0, 0, 0, -torch.inf, -torch.inf]])
# start_token = 0 
# end_token = 1

# generated_seq, corresponding_probas = MyTransformer.generate(input_seq, max_gen_length=10, start_token=start_token, end_token=end_token, padding_mask_encoder=input_seq_mask)
# print(generated_seq)


tensor([[7, 9, 2, 2, 7, 5, 2, 3, 4, 5, 6, 4, 3, 0, 0, 0],
        [6, 8, 1, 2, 3, 4, 9, 8, 1, 2, 4, 4, 0, 0, 0, 0]])
tensor([[14,  5, 11,  3,  3,  9,  4,  2, 12,  4,  5,  7,  0,  0],
        [ 5,  8, 10,  8,  6,  7, 11, 14, 10,  0,  0,  0,  0,  0]])
torch.Size([2, 16])
torch.Size([2, 14])
tensor([[[ 0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,
           0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,
           0.0000e+00,  0.0000e+00,  0.0000e+00, -1.0000e+09, -1.0000e+09,
          -1.0000e+09],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,
           0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,
           0.0000e+00,  0.0000e+00,  0.0000e+00, -1.0000e+09, -1.0000e+09,
          -1.0000e+09],
         [ 0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,
           0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,  0.0000e+00,
           0.0000e+00,  0.0000e+00,  0.0000e+00, -1.0000e+09, 

# Character Level Tokenizer

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

class MyTokenizer:
    def __init__(self, text):
        self.bos_token = "<"
        self.eos_token = ">"
        self.pad_token = "_"
        self.chars = sorted(list(set(text)))
        self.chars = [self.eos_token, self.bos_token, self.pad_token] + self.chars
        self.char_to_idx = {char: idx for idx, char in enumerate(self.chars)}
        self.idx_to_char = {idx: char for idx, char in enumerate(self.chars)}
        self.bos_token_id = self.char_to_idx[self.bos_token]
        self.eos_token_id = self.char_to_idx[self.eos_token]
        self.pad_token_id = self.char_to_idx[self.pad_token]

    def encode(self, text):
        return [self.char_to_idx[char] for char in text]

    def decode(self, indices):
        return ''.join([self.idx_to_char[idx] for idx in indices if idx not in {self.bos_token_id, self.eos_token_id, self.pad_token_id}])


In [7]:
import torch
from torch.utils.data import Dataset

class TextDataset(Dataset):
    def __init__(self, text, tokenizer, max_length=100):
        self.text = text
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = self.preprocess(text)

    def preprocess(self, text):
        data = []
        for i in range(1, len(text)):
            input_seq = text[:i]
            target_seq = text[i:i + self.max_length - 1]
            input_seq = self.tokenizer.bos_token + input_seq
            target_seq = target_seq + self.tokenizer.eos_token
            input_idx = self.tokenizer.encode(input_seq)
            target_idx = self.tokenizer.encode(target_seq)
            # Pad sequences to the max_length
            input_idx += [self.tokenizer.pad_token_id] * (self.max_length - len(input_idx))
            target_idx += [self.tokenizer.pad_token_id] * (self.max_length - len(target_idx))
            data.append((input_idx, target_idx))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_idx, target_idx = self.data[idx]
        return torch.tensor(input_idx), torch.tensor(target_idx)

# Example usage
text = ["hello world.", "this is a simple text dataset for training a transformer."]
max_length = 20
tokenizer = MyTokenizer(text)
dataset = TextDataset(text, tokenizer, max_length=max_length)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Print some examples
for input_seq, target_seq in dataloader:
    print("Input:", tokenizer.decode(input_seq[0].tolist()))
    print("Target:", tokenizer.decode(target_seq[0].tolist()))
    break

TypeError: can only concatenate str (not "list") to str

In [8]:

# Example usage
text = "hello world. this is a simple toy text dataset for training a transformer."
seq_length = 10
tokenizer = MyTokenizer(text)
dataset = TextDataset(text, 50, tokenizer)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)



TypeError: unsupported operand type(s) for +: 'int' and 'MyTokenizer'

In [22]:
print(tokenizer.pad_token_id)
dataset.__getitem__(20)

2


(tensor([ 1,  3,  5,  3, 18, 11, 13, 16, 12,  7,  3, 19, 15, 22,  3, 19,  7, 21,
         19,  3,  6,  5, 19,  5, 18,  7, 19,  3,  8, 15, 17,  3, 19, 17,  5, 11,
         14, 11, 14,  9,  3,  5,  3, 19, 17,  5, 14, 18,  8, 15, 17]),
 tensor([ 5,  3, 18, 11, 13, 16, 12,  7,  3, 19, 15, 22,  3, 19,  7, 21, 19,  3,
          6,  5, 19,  5, 18,  7, 19,  3,  8, 15, 17,  3, 19, 17,  5, 11, 14, 11,
         14,  9,  3,  5,  3, 19, 17,  5, 14, 18,  8, 15, 17, 13,  0]))

In [9]:

# Define the Transformer model (assuming the Transformer class is already defined)
d_model = 128
max_length = seq_length + 1  # Adjust for start token
vocab_size = len(tokenizer.chars)
num_out = vocab_size
num_heads = 8
dv = 16
dk = 16
d_ff = 512
dropout = 0.1
num_encoders = 2
num_decoders = 2
num_epochs = 10
learning_rate = 0.001

model = Transformer(d_model, 100, vocab_size, num_out, num_heads, dv, dk, d_ff, dropout, num_encoders, num_decoders)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for input_seq, target_seq in dataloader:
        optimizer.zero_grad()
        output = model(input_seq, input_seq)
        loss = criterion(output.view(-1, vocab_size), target_seq.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(dataloader)}")

# Save the model
torch.save(model.state_dict(), "transformer_model.pth")


TypeError: Transformer.__init__() missing 2 required positional arguments: 'num_encoders' and 'num_decoders'

In [10]:

# Load the model
model.load_state_dict(torch.load("transformer_model.pth", weights_only=True))
model.eval()

# Generate text

input_seq = tokenizer.encode(tokenizer.bos_token + "hello")


generated_text = model.generate(torch.tensor(input_seq), max_gen_length=50, start_token=tokenizer.bos_token_id, end_token=tokenizer.eos_token_id)
print("Input:", tokenizer.decode(input_seq))
print("Generated Text:", tokenizer.decode(generated_text[0]))

NameError: name 'model' is not defined

In [11]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class MyTokenizer:
    def __init__(self, text):
        self.chars = sorted(list(set(text)))
        self.char_to_idx = {char: idx for idx, char in enumerate(self.chars)}
        self.idx_to_char = {idx: char for idx, char in enumerate(self.chars)}

    def encode(self, text):
        return [self.char_to_idx[char] for char in text]

    def decode(self, indices):
        return ''.join([self.idx_to_char[idx] for idx in indices])

class TextDataset(Dataset):
    def __init__(self, text, seq_length, tokenizer):
        self.text = text
        self.seq_length = seq_length
        self.tokenizer = tokenizer
        self.data = self.preprocess(text)

    def preprocess(self, text):
        data = []
        for i in range(0, len(text) - self.seq_length):
            input_seq = text[i:i + self.seq_length]
            target_seq = text[i + 1:i + self.seq_length + 1]
            input_idx = self.tokenizer.encode(input_seq)
            target_idx = self.tokenizer.encode(target_seq)
            data.append((input_idx, target_idx))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_idx, target_idx = self.data[idx]
        return torch.tensor(input_idx), torch.tensor(target_idx)

# Example usage
text = "hello world. this is a simple text dataset for training a transformer."
seq_length = 10
tokenizer = MyTokenizer(text)
dataset = TextDataset(text, seq_length, tokenizer)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Print some examples
for input_seq, target_seq in dataloader:
    print("Input:", tokenizer.decode(input_seq[0].tolist()))
    print("Target:", tokenizer.decode(target_seq[0].tolist()))
    break

Input: set for tr
Target: et for tra


In [12]:
import torch.optim as optim
import torch.nn.functional as F

# Hyperparameters
d_model = 128
max_length = seq_length
vocab_size = len(tokenizer.chars)
num_out = vocab_size
num_heads = 8
dv = 16
dk = 16
d_ff = 512
dropout = 0.1
num_encoders = 2
num_decoders = 2
num_epochs = 20
learning_rate = 0.001

# Initialize model, loss function, and optimizer
model = Transformer(d_model, max_length, vocab_size, num_out, num_heads, dv, dk, d_ff, dropout, num_encoders, num_decoders)
# for p in model.parameters():
#     if p.dim() > 1:
#         nn.init.xavier_uniform_(p)
        
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for input_seq, target_seq in dataloader:
        optimizer.zero_grad()
        output = model(input_seq, input_seq)
        loss = criterion(output.view(-1, vocab_size), target_seq.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(dataloader)}")

# Save the model
torch.save(model.state_dict(), "transformer_model.pth")

TypeError: Transformer.__init__() missing 2 required positional arguments: 'num_encoders' and 'num_decoders'

In [13]:
# Load the model
model.load_state_dict(torch.load("transformer_model.pth"))
model.eval()

# Generate text


start_text = "hello"
input_seq = tokenizer.encode("hello")
print(input_seq)


# generated_text = model.generate(input, start_token=0, 50, dataset)
# print("Generated Text:", generated_text)

NameError: name 'model' is not defined

## Tokenizer

In [14]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

tokenizer = Tokenizer(BPE())

tokenizer.pre_tokenizer = Whitespace()
special_tokens = ["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]
trainer = BpeTrainer(special_tokens=special_tokens)

tokenizer.train(["data.txt"], trainer)
tokenizer.save("bpe_tokenizer.json")

tokenizer = Tokenizer.from_file("bpe_tokenizer.json")

with open('/users/eleves-b/2021/valentin.dorseuil/Desktop/ScratchML/transformers/data.txt', 'r') as file:
    lines = file.readlines()


chunk_size = 10
overlap_size = 5
chunks = [''.join(lines[i:i+chunk_size]) for i in range(0, len(lines) - chunk_size + 1, chunk_size - overlap_size)]

print(f"Vocabulary size: {tokenizer.get_vocab_size()}")

encoded_text = tokenizer.encode_batch(chunks)
for token in special_tokens:
    token_id = tokenizer.token_to_id(token)
    print(f"Token: {token}, ID: {token_id}")

print(f"Number of chunks: {len(encoded_text)}")
print(f"Average chunk length (num tokens): {sum([len(chunk) for chunk in encoded_text])/len(encoded_text):.2f}")
print()

Exception: No such file or directory (os error 2)

## Data

In [15]:
import random

random.seed(123)
random.shuffle(chunks)

train_size = int(0.8 * len(chunks))
val_size = int(0.1 * len(chunks))
test_size = len(chunks) - train_size - val_size

train_chunks = chunks[:train_size]
val_chunks = chunks[train_size:train_size + val_size]
test_chunks = chunks[train_size + val_size:]

NameError: name 'chunks' is not defined

## Test

In [10]:


# Hyperparameters
batch_size = 16
model_dim = 512
max_length = 100
vocab_size = 32000
num_out = 4  # Four classes for AG News
num_heads = 8
dv = 64
dk = 64
d_ff = 2048
dropout = 0.1
num_encoders = 6
num_decoders = 6
num_epochs = 10
learning_rate = 0.001



# Prepare the AG News dataset
def prepare_data():
    TEXT = Field(tokenize='spacy', tokenizer_language='en_core_web_sm', include_lengths=True)
    LABEL = Field(sequential=False, use_vocab=False)
    
    train_data, test_data = AG_NEWS.splits(TEXT, LABEL)
    TEXT.build_vocab(train_data, max_size=vocab_size)
    
    train_iterator, test_iterator = BucketIterator.splits(
        (train_data, test_data),
        batch_size=batch_size,
        sort_within_batch=True,
        device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    )
    
    return train_iterator, test_iterator, TEXT.vocab

# Training loop
def train(model, train_iterator, num_epochs, learning_rate):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0
        
        for batch in train_iterator:
            input_data, input_lengths = batch.text
            target_data = batch.label
            
            optimizer.zero_grad()
            output = model(input_data, target_data)
            
            loss = criterion(output, target_data)
            loss.backward()
            optimizer.step()
            
            epoch_loss += loss.item()
        
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(train_iterator):.4f}")

# if __name__ == "__main__":
#     MyTransformer = init_transformer()
#     train_iterator, test_iterator, vocab = prepare_data()
#     train(MyTransformer, train_iterator, num_epochs, learning_rate)

OSError: /users/eleves-b/2021/valentin.dorseuil/.local/lib/python3.11/site-packages/torchtext/lib/libtorchtext.so: undefined symbol: _ZN5torch3jit21setUTF8DecodingIgnoreEb

# Translation

__For an English to French translation__

- Encoder Input : [tokenA, tokenB, tokenC] #English
- Target Sequence: [BOS, token1, token2, token3, EOS] #French
- Decoder Input: [BOS, token1, token2, token3] #French
- Labels: [token1, token2, token3, EOS] # French

In [5]:
import os
import urllib.request
import zipfile
import torch
from torch.utils.data import Dataset, DataLoader, random_split

# Download and extract the dataset
filename = "fra-eng.zip"

with zipfile.ZipFile(filename, 'r') as zip_ref:
    zip_ref.extractall(".")

# Read the dataset
with open("fra.txt", "r", encoding="utf-8") as file:
    lines = file.readlines()

# Prepare the dataset
pairs = [line.strip().split('\t') for line in lines]
english_sentences = [pair[0] for pair in pairs]
french_sentences = [pair[1] for pair in pairs]


In [6]:
# Tokenizer class
class Tokenizer:
    def __init__(self, sentences):
        self.bos_token = "<BOS>"
        self.eos_token = "<EOS>"
        self.pad_token = "<PAD>"
        self.special_tokens = [self.bos_token, self.eos_token, self.pad_token]
        self.build_vocab(sentences)

    def build_vocab(self, sentences):
        self.vocab = set()
        for sentence in sentences:
            self.vocab.update(sentence.split())
        self.vocab = sorted(list(self.vocab))
        self.vocab = self.special_tokens + self.vocab
        self.word_to_idx = {word: idx for idx, word in enumerate(self.vocab)}
        self.idx_to_word = {idx: word for idx, word in enumerate(self.vocab)}
        self.sos_token_id = self.word_to_idx[self.bos_token]
        self.eos_token_id = self.word_to_idx[self.eos_token]
        self.pad_token_id = self.word_to_idx[self.pad_token]

    def encode(self, sentence):
        return [self.word_to_idx[word] for word in sentence.split()]

    def decode(self, indices):
        return ' '.join([self.idx_to_word[idx] for idx in indices if idx not in {self.sos_token_id, self.eos_token_id, self.pad_token_id}])

# Create tokenizers for English and French
english_tokenizer = Tokenizer(english_sentences)
french_tokenizer = Tokenizer(french_sentences)


In [34]:
# Dataset class
class TranslationDataset(Dataset):
    """English to French translation Dataset

    Args:
        Dataset (class): The base pytorch class for datasets

    Comments:
        - Encoder Input : [tokenA, tokenB, tokenC] #English
        - Target Sequence: [BOS, token1, token2, token3, EOS] #French
        - Decoder Input: [BOS, token1, token2, token3] #French
        - Labels: [token1, token2, token3, EOS] # French
    """

    def __init__(self, english_sentences, french_sentences, english_tokenizer, french_tokenizer, max_length_french, max_length_english):
        self.english_sentences = english_sentences
        self.french_sentences = french_sentences
        self.english_tokenizer = english_tokenizer
        self.french_tokenizer = french_tokenizer
        self.max_length_french = max_length_french
        self.max_length_english = max_length_english
        self.data = self.preprocess()

    def preprocess(self): #
        data = []
        for eng, fra in zip(self.english_sentences, self.french_sentences):
            if len(eng.split()) > self.max_length_english or len(fra.split()) > self.max_length_french-1:
                continue #We remove the sequences that are too long (only 800 and 150 of each -> not a lot)
            
            eng_tokens = eng.split() 
            fra_tokens =[self.french_tokenizer.bos_token] + fra.split() + [self.french_tokenizer.eos_token]

            eng_idx = self.english_tokenizer.encode(' '.join(eng_tokens))
            fra_idx = self.french_tokenizer.encode(' '.join(fra_tokens))

            eng_idx += [self.english_tokenizer.pad_token_id] * (self.max_length_english - len(eng_idx))
            fra_idx += [self.french_tokenizer.pad_token_id] * (self.max_length_french - len(fra_idx) + 1) #+1 because then we will shift outputs to the right

            encoder_input = torch.tensor(eng_idx)
            decoder_input = torch.tensor(fra_idx[:-1])
            label = torch.tensor(fra_idx[1:])

            encoder_mask = (encoder_input == self.english_tokenizer.pad_token_id).to(torch.float32)


            decoder_mask = (decoder_input == self.french_tokenizer.pad_token_id).to(torch.float32)


            data.append({"encoder_input": encoder_input, "decoder_input":decoder_input, "label":label, "encoder_mask":encoder_mask, "decoder_mask":decoder_mask})
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]
        return item["encoder_input"], item["decoder_input"], item["label"], item["encoder_mask"], item["decoder_mask"]
    
max_length_english = 20
max_length_french = 25
dataset = TranslationDataset(english_sentences, french_sentences, english_tokenizer, french_tokenizer, max_length_french, max_length_english)


In [64]:
for k in range(dataset.__len__()):
    input, output, label, mask_encoder, mask_decoder = dataset.__getitem__(k)
    if len(input) > 20 or len(mask_encoder) > 20 or len(output) > 25 or len(label)>25 or len(mask_decoder)> 25:
        print("stop : ", k)
        break
    if k >= 817*16 and k <= 820*16:

        print(len(input), len(mask_encoder), len(output), len(label),len(mask_decoder))


20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25
20 20 25 25 25


In [65]:
# Split the dataset into train, validation, and test sets
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Create DataLoaders for each split
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [66]:
for batch in train_loader:
    encoder_input, decoder_input, label, encoder_mask, decoder_mask = batch
    print("Encoder Input:", encoder_input.shape)
    print("Decoder Input:", decoder_input.shape)
    print("Label:", label.shape)
    print("Encoder Mask:", encoder_mask.shape)
    print("Decoder Mask:", decoder_mask.shape)
    print("Decoced input:", english_tokenizer.decode(encoder_input[0].tolist()))
    print("Decoced input:", french_tokenizer.decode(label[0].tolist()))
    break

Encoder Input: torch.Size([32, 20])
Decoder Input: torch.Size([32, 25])
Label: torch.Size([32, 25])
Encoder Mask: torch.Size([32, 20])
Decoder Mask: torch.Size([32, 25])
Decoced input: I'm the strongest.
Decoced input: Je suis le plus fort.


In [67]:


#Parameters
batch_size = 32
d_model = 128
max_length_encoder = max_length_english
max_length_decoder = max_length_french
vocab_size_encoder = len(english_tokenizer.vocab)
vocab_size_decoder = len(french_tokenizer.vocab)
num_out = vocab_size_decoder
num_heads = 8
dv = 16
dk = 16
d_ff = 512
dropout = 0.1
num_encoders = 4
num_decoders = 4
dropout = 0.1

learning_rate = 0.001
num_epochs = 10

# Initialize the model, loss function, and optimizer

def init_transformer():
    model = Transformer(
        d_model=d_model,
        max_length_encoder=max_length_encoder,
        max_length_decoder=max_length_decoder,
        vocab_size_encoder=vocab_size_encoder,
        vocab_size_decoder=vocab_size_decoder,
        num_out=num_out,
        num_heads=num_heads,
        dv=dv,
        dk=dk,
        d_ff=d_ff,
        dropout=dropout,
        num_encoders=num_encoders,
        num_decoders=num_decoders,
    )
    return model
model = init_transformer()
criterion = nn.CrossEntropyLoss(ignore_index=english_tokenizer.pad_token_id)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
 

In [68]:
import torch
import torch.nn as nn
import torch.optim as optim

# Assuming the model, criterion, optimizer, train_loader, and val_loader are already defined
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

def train(num_epochs):
    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for batch_idx, batch in enumerate(train_loader):
            encoder_input, decoder_input, label, encoder_mask, decoder_mask = batch
            encoder_input, decoder_input, label, encoder_mask, decoder_mask = encoder_input.to(device), decoder_input.to(device), label.to(device), encoder_mask.to(device), decoder_mask.to(device)
            
            optimizer.zero_grad()
            output = model(encoder_input, decoder_input, encoder_mask, decoder_mask)
            loss = criterion(output.view(-1, vocab_size_decoder), label.view(-1))
            loss.backward()
            optimizer.step()
            
            total_train_loss += loss.item()
            
            if batch_idx %100 == 0: 
                print(f"Epoch [{epoch + 1}/{num_epochs}], Batch [{batch_idx + 1}/{len(train_loader)}], Loss: {loss.item():.4f}")
        
        avg_train_loss = total_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                encoder_input, decoder_input, label, encoder_mask, decoder_mask = batch
                encoder_input, decoder_input, label, encoder_mask, decoder_mask = encoder_input.to(device), decoder_input.to(device), label.to(device), encoder_mask.to(device), decoder_mask.to(device)
            
                output = model(encoder_input, decoder_input, encoder_mask, decoder_mask)
                loss = criterion(output.view(-1, vocab_size_decoder), label.view(-1))
                total_val_loss += loss.item()
        
        avg_val_loss = total_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)
        
        print(f"Epoch [{epoch + 1}/{num_epochs}], Average Training Loss: {avg_train_loss:.4f}, Average Validation Loss: {avg_val_loss:.4f}")
        
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), "best_transformer_model.pth")
            print(f"Model saved with validation loss: {best_val_loss:.4f}")

    return train_losses, val_losses


In [69]:
num_params = sum(p.numel() for p in model.parameters())
print(f"Number of parameters in the Transformer model: {num_params}")

Number of parameters in the Transformer model: 19204991


In [70]:
num_epochs = 5
train_losses, val_losses = train(num_epochs)

Epoch [1/5], Batch [1/5814], Loss: 11.1658
Epoch [1/5], Batch [101/5814], Loss: 6.0203
Epoch [1/5], Batch [201/5814], Loss: 5.3267
Epoch [1/5], Batch [301/5814], Loss: 5.1262
Epoch [1/5], Batch [401/5814], Loss: 4.9666
Epoch [1/5], Batch [501/5814], Loss: 4.8190
Epoch [1/5], Batch [601/5814], Loss: 4.6695
Epoch [1/5], Batch [701/5814], Loss: 4.5371
Epoch [1/5], Batch [801/5814], Loss: 4.7708
Epoch [1/5], Batch [901/5814], Loss: 4.0614
Epoch [1/5], Batch [1001/5814], Loss: 4.0212
Epoch [1/5], Batch [1101/5814], Loss: 4.3762
Epoch [1/5], Batch [1201/5814], Loss: 4.3826
Epoch [1/5], Batch [1301/5814], Loss: 4.0596
Epoch [1/5], Batch [1401/5814], Loss: 4.1522
Epoch [1/5], Batch [1501/5814], Loss: 3.8611


KeyboardInterrupt: 