In [1]:
import os
from torch import optim, nn, utils, Tensor
import torch
import math
import bpemb
# from torchvision.datasets import MNIST
# from torchvision.transforms import ToTensor
# import lightning as L

In [227]:
# from torchtext.datasets import AG_NEWS

## Model

### Scaled Dot Product Attention

In [124]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self,d_model=512,d_k=64,masking=False):
        super().__init__()
        self.d_k = d_k
        self.d_model = d_model
        self.masking =masking
        self.W_Q = nn.Linear(in_features=d_model,out_features=d_k)
        self.W_K = nn.Linear(in_features=d_model,out_features=d_k)
        self.W_V = nn.Linear(in_features=d_model,out_features=d_k)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self,embedding):
        q = self.W_Q(embedding)
        k = self.W_K(embedding)
        v = self.W_V(embedding)

        if self.masking==False:
            attention=self.softmax(torch.matmul(q,k.T)/math.sqrt(self.d_k))
            z = torch.matmul(attention,v)
            return z
        else:
            mask = torch.triu(torch.ones(q.size(0),k.size(0)), diagonal=1)
            # Replace ones with -inf (disallowed positions)
            mask = mask.masked_fill(mask == 1, float('-inf'))
            scores = torch.matmul(q,k.T)/math.sqrt(self.d_k)
            scores = scores.masked_fill(mask == 1, float('-inf'))
            masked_attention=self.softmax(scores)
            z = torch.matmul(masked_attention,v)
            return z
            # mask = torch.triu(torch.ones(q.size(0),k.size(0)), diagonal=1)
            # # Replace ones with -inf (disallowed positions)
            # mask = mask.masked_fill(mask == 1, float('-inf'))
            # attention=self.softmax(torch.matmul(q,k.T)/math.sqrt(self.d_k))
            # masked_attention = torch.add(attention,mask)
            # z = torch.matmul(masked_attention,v)
            # return z
            
        

### MultiHeaded Attention

In [125]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model=512, d_k=64, num_heads=8, dropout=0.1, masking=False):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_k  ## assume d_v always equals d_k
        self.masking = masking

        self.scale_dotproduct_list= torch.nn.ModuleList()

        for i in range(self.num_heads):
            self.scale_dotproduct_list.append(ScaledDotProductAttention(d_model=self.d_model,d_k=self.d_k,masking=self.masking))
        
        self.linear_layer = nn.Linear(in_features=self.num_heads*self.d_k,out_features=d_model)

    
    def forward(self,embedding):
        z_list = []
        for head_idx in range(self.num_heads):
            z = self.scale_dotproduct_list[head_idx](embedding)
            z_list.append(z)
        
        aggregate_z_output = torch.cat(z_list,dim=1)
        final_z = self.linear_layer(aggregate_z_output)
        return final_z

### Feed Forward

In [126]:
class FeedForward(nn.Module):
    def __init__(self,d_model,d_ff):
        super().__init__()
        self.d_model = d_model
        self.d_ff = d_ff
        self.fc1 = nn.Linear(in_features=d_model,out_features=d_ff)
        self.gelu = nn.GELU()
        self.fc2 = nn.Linear(in_features=d_ff,out_features=d_model)
    
    def forward(self,embedding):
        f1 = self.fc1(embedding)
        f1_activation = self.gelu(f1)
        f2 = self.fc2(f1_activation)
        return f2

### Encoder Block

In [127]:
class EncoderBlock(nn.Module):
    def __init__(self,d_model=512,d_k = 64, num_heads=8, d_ff=2048, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.d_k = d_k
        self.dropout = dropout
        self.multi_head_attention = MultiHeadAttention(d_model=d_model,d_k=d_k, num_heads=num_heads, dropout=dropout)
        self.add_and_norm = nn.LayerNorm(normalized_shape=self.d_model)
        self.feed_forward_layer = FeedForward(d_model, d_ff)

    def forward(self,embedding):
        z1 = self.add_and_norm(embedding + self.multi_head_attention(embedding))
        z2 =self.add_and_norm(z1 + self.feed_forward_layer(z1))
        return z2

### Encoder

In [128]:
class Encoder(nn.Module):
    def __init__(self,d_model=512, d_k=64, num_heads=8, d_ff=2048, dropout=0.1, num_encoder_blocks=6):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_k
        self.d_ff = d_ff
        self.dropout = dropout
        self.num_encoder_blocks = num_encoder_blocks
        self.encoder_block_list = torch.nn.ModuleList()
        for i in range(num_encoder_blocks):
            self.encoder_block_list.append(EncoderBlock(d_model=d_model, d_k=d_k, num_heads=num_heads, d_ff=d_ff, dropout=dropout))
        
        self.W_K = nn.Linear(in_features=d_model,out_features=self.d_k)
        self.W_V = nn.Linear(in_features=d_model,out_features=self.d_k)
    
    def forward(self,embedding):
        encoder_out = embedding
        for i in range(self.num_encoder_blocks):
            encoder_out = self.encoder_block_list[i](encoder_out)
        
        k = self.W_K(encoder_out)
        v = self.W_V(encoder_out)

        return encoder_out,k,v

### Scaled Dot Product Attention Decoder

In [129]:
class ScaledDotProductAttentionDecoder(nn.Module):
    def __init__(self,d_model=512,d_k=64,masking=False):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_k
        self.masking = masking
        self.W_Q = nn.Linear(in_features=d_model,out_features=d_k)
        self.softmax = torch.nn.Softmax(dim=-1)
    
    def forward(self,embedding,k,v):
        q = self.W_Q(embedding)
        if self.masking==False:
            attention=self.softmax(torch.matmul(q,k.T)/math.sqrt(self.d_k))
            z = torch.matmul(attention,v)
            return z
        else:
            mask = torch.triu(torch.ones(q.size(0),k.size(0)), diagonal=1)
            # Replace ones with -inf (disallowed positions)
            mask = mask.masked_fill(mask == 1, float('-inf'))
            scores = torch.matmul(q,k.T)/math.sqrt(self.d_k)
            scores = scores.masked_fill(mask == 1, float('-inf'))
            masked_attention=self.softmax(scores)
            # masked_attention = torch.add(attention,mask)
            z = torch.matmul(masked_attention,v)

            # print("sank:",q.size(0)," ",k.size(0))
            # mask = torch.triu(torch.ones(q.size(0),k.size(0)), diagonal=1)
            # # Replace ones with -inf (disallowed positions)
            # mask = mask.masked_fill(mask == 1, float('-inf'))
            # attention=self.softmax(torch.matmul(q,k.T)/math.sqrt(self.d_k))
            # masked_attention = torch.add(attention,mask)
            # z = torch.matmul(masked_attention,v)
            return z


### Encoder decoder Attention

In [130]:
class EncoderDecoderAttention(nn.Module):
    def __init__(self,d_model=512, d_k=64, num_heads=8, dropout=0.1,masking=False):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_k
        self.dropout = dropout
        self.masking = masking
        
        self.scale_dotproduct_list= torch.nn.ModuleList()

        for i in range(self.num_heads):
            self.scale_dotproduct_list.append(ScaledDotProductAttentionDecoder(d_model=self.d_model,d_k=self.d_k,masking=self.masking))
        
        self.linear_layer = nn.Linear(in_features=self.num_heads*self.d_k,out_features=d_model)
    
    def forward(self, embedding,k,v):
        z_list = []
        for head_idx in range(self.num_heads):
            z = self.scale_dotproduct_list[head_idx](embedding,k,v)
            z_list.append(z)

        aggregate_z_output = torch.cat(z_list,dim=1)
        final_z = self.linear_layer(aggregate_z_output)
        return final_z

### Decoder Block

In [131]:
class DecoderBlock(nn.Module):
    def __init__(self,d_model=512,d_k=64, num_heads=8, d_ff=2048, dropout=0.1,masking=False):
        super(DecoderBlock, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_ff = d_ff
        self.dropout = dropout
        self.masking = masking
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_k
        self.d_ff = d_ff
        self.dropout = dropout
        
        self.multi_head_attention = MultiHeadAttention(d_model,d_k=d_k, num_heads=num_heads, dropout=dropout,masking=masking)
        self.encoder_decoder_attention = EncoderDecoderAttention(d_model=d_model,d_k=d_k,num_heads=num_heads,dropout=dropout,masking=masking)
        self.add_and_norm = nn.LayerNorm(normalized_shape=self.d_model)
        self.feed_forward_layer = FeedForward(d_model, d_ff)

    def forward(self,embedding,k,v):
        x = self.multi_head_attention(embedding)
        z1 = self.add_and_norm(embedding + x)
        z2 = self.add_and_norm(z1+self.encoder_decoder_attention(embedding,k,v))
        z2 =self.add_and_norm(z2 + self.feed_forward_layer(z2))
        return z2        
    

    # def forward(self,embedding,k,v):
    #     z1 = self.add_and_norm(embedding + self.multi_head_attention(embedding))
    #     print("sank:decoder block 1:",z1)
    #     z2 = self.add_and_norm(z1+self.encoder_decoder_attention(embedding,k,v))
    #     print("sank:decoder block 2:",z2)
    #     z2 =self.add_and_norm(z2 + self.feed_forward_layer(z2))
    #     print("sank:decoder block 3:",z2)
    #     return z2

### Decoder

In [132]:
class Decoder(nn.Module):
    def __init__(self,vocab_size,d_model=512, d_k=64, num_heads=8, d_ff=2048, dropout=0.1, num_decoder_blocks=6,masking=False):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_k
        self.d_ff = d_ff
        self.dropout = dropout
        self.num_decoder_blocks = num_decoder_blocks
        self.masking = masking
        self.decoder_block_list = torch.nn.ModuleList()
        for i in range(num_decoder_blocks):
            self.decoder_block_list.append(DecoderBlock(d_model=d_model,d_k=d_k, num_heads=num_heads, d_ff=d_ff, dropout=dropout,masking=masking))
        self.linear_layer = nn.Linear(in_features=d_model,out_features=vocab_size)
        
        self.W_K = nn.Linear(in_features=d_model,out_features=self.d_k)
        self.W_V = nn.Linear(in_features=d_model,out_features=self.d_k)
    
    def forward(self,embedding,k,v):
        decoder_out = embedding
        for i in range(self.num_decoder_blocks):
            decoder_out = self.decoder_block_list[i](decoder_out,k,v)
        
        decoder_out = self.linear_layer(decoder_out)

        return decoder_out

### Transformer

In [133]:
class Transformer(nn.Module):
    def __init__(self,vocab_size,d_model=512, d_k=64, num_heads=8, d_ff=2048, dropout=0.1, num_encoder_blocks=6,num_decoder_blocks=6,masking=False):
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_encoder_blocks = num_encoder_blocks
        self.num_decoder_blocks = num_decoder_blocks
        self.encoder = Encoder(d_model=d_model,d_k=d_k, num_heads=8, d_ff=d_ff, dropout=dropout, num_encoder_blocks=num_encoder_blocks)
        self.decoder = Decoder(vocab_size,d_model=d_model, d_k=d_k, num_heads=num_heads, d_ff=d_ff, dropout=dropout, num_decoder_blocks=num_decoder_blocks,masking=masking)
    
    def forward(self,source_embedding,target_embedding):
        _,k,v = self.encoder(source_embedding)
        output = self.decoder(target_embedding,k,v)
        return output

### Positional Encoding

In [119]:
def positional_encode(embedding,d_model):
    seq_len = len(embedding)
    positional_encoding = torch.zeros(seq_len, d_model)
    positions = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)  # Shape: (seq_len, 1)
    # Frequency scaling factors
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    # Apply sine to even indices and cosine to odd indices
    positional_encoding[:, 0::2] = torch.sin(positions * div_term)  # Even dimensions
    positional_encoding[:, 1::2] = torch.cos(positions * div_term)  # Odd dimensions

    return positional_encoding


## Transformer Dataset Model

In [134]:
class TransformerData(torch.utils.data.Dataset):
    def __init__(self,source_language_file,target_language_file,source_language='en',target_language='de',source_vocab_size=200000,target_vocab_size=200000):
        source_file = open(source_language_file,'r')
        target_file = open(target_language_file,'r')
        self.source_lines = source_file.readlines()
        self.target_lines = target_file.readlines()
        self.source_language = source_language
        self.target_language = target_language
        self.source_vocab_size = source_vocab_size
        self.target_vocab_size = target_vocab_size
        self.source_bpemb = bpemb.BPEmb(lang=source_language,vs=source_vocab_size)
        self.target_bpemb = bpemb.BPEmb(lang=target_language,vs=target_vocab_size)
        self.source_embed = torch.nn.Embedding.from_pretrained(torch.tensor(self.source_bpemb.vectors))
        self.target_embed = torch.nn.Embedding.from_pretrained(torch.tensor(self.target_bpemb.vectors))


    def __len__(self):
        return len(self.source_lines)
    
    def __getitem__(self,index):
        source = self.source_lines[index]
        target = self.target_lines[index]
        
        source_tokens = self.source_bpemb.encode_ids(source)
        source_tokens.insert(0,self.source_bpemb.BOS)
        source_tokens.append(self.source_bpemb.EOS)

        target_tokens = self.target_bpemb.encode_ids(target)
        target_tokens.insert(0,self.target_bpemb.BOS)
        target_tokens.append(self.target_bpemb.EOS)

        source_line_embedding = self.source_embed(torch.tensor(source_tokens))
        target_line_embedding = self.target_embed(torch.tensor(target_tokens))

        embedding_dim_source = int(source_line_embedding[0].shape[0])
        embedding_dim_target = int(target_line_embedding[0].shape[0])
        
        source_positional_encoding = positional_encode(source_line_embedding,embedding_dim_source)
        target_positional_encoding = positional_encode(target_line_embedding,embedding_dim_target)

        source_embedding_pe = torch.add(source_line_embedding, source_positional_encoding)
        target_embedding_pe = torch.add(target_line_embedding, target_positional_encoding)

        return source_embedding_pe,target_embedding_pe,target_tokens
    
    def return_bpemb_target_instance(self):
        return self.target_bpemb

    def return_embedding_layer_target(self):
        return self.target_embed

In [135]:

transformer_dataset = TransformerData(source_language_file="train_en.txt", target_language_file="train_de.txt", source_language="en", target_language="de", source_vocab_size=200000, target_vocab_size=200000)


In [136]:
d_model = transformer_dataset[0][0].shape[1]
vocab_size = 200000
num_heads = 8
d_ff = 2048
num_encoder_blocks = 6
num_decoder_blocks = 6
masking = True
d_k =64


## Transformer Train

In [169]:
transformer = Transformer(vocab_size,d_model=d_model,d_k=d_k, num_heads=8, d_ff=2048, dropout=0.1, 
                          num_encoder_blocks=num_encoder_blocks,num_decoder_blocks=num_decoder_blocks,masking=masking)

In [243]:
torch.save(transformer, "simple_model.pt")

In [170]:
transformer.train()

Transformer(
  (encoder): Encoder(
    (encoder_block_list): ModuleList(
      (0-5): 6 x EncoderBlock(
        (multi_head_attention): MultiHeadAttention(
          (scale_dotproduct_list): ModuleList(
            (0-7): 8 x ScaledDotProductAttention(
              (W_Q): Linear(in_features=100, out_features=64, bias=True)
              (W_K): Linear(in_features=100, out_features=64, bias=True)
              (W_V): Linear(in_features=100, out_features=64, bias=True)
              (softmax): Softmax(dim=-1)
            )
          )
          (linear_layer): Linear(in_features=512, out_features=100, bias=True)
        )
        (add_and_norm): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
        (feed_forward_layer): FeedForward(
          (fc1): Linear(in_features=100, out_features=2048, bias=True)
          (gelu): GELU(approximate='none')
          (fc2): Linear(in_features=2048, out_features=100, bias=True)
        )
      )
    )
    (W_K): Linear(in_features=100, out_fea

In [171]:
loss_fn = torch.nn.CrossEntropyLoss()
beta_1 = 0.9
beta_2 = 0.98
epsilon = 10**(-9)
optimizer = torch.optim.Adam(transformer.parameters(), lr=3e-5, betas=(beta_1, beta_2), eps=epsilon)

In [172]:
bpemb_instance_target = transformer_dataset.return_bpemb_target_instance()

In [174]:
num_epochs = 50

for epoch in range(num_epochs):
    print("Epoch:",epoch)
    for sentence_index,(source_embedding,target_embedding,target_tokens) in enumerate(transformer_dataset):
        logits = transformer(source_embedding, target_embedding)

        true_token_probability_distributions = torch.zeros((int(logits.shape[0]), vocab_size))
        for current_token_probability_distribution_index in range(0, (len(logits) - 1)):
            true_next_token_index = target_tokens[current_token_probability_distribution_index + 1]
            true_token_probability_distributions[current_token_probability_distribution_index][true_next_token_index] = 1
        # the true next predicted token of the last token should be EOS; my last two token probability distributions will place the highest probability on EOS
        EOS_token_index_target_language = bpemb_instance_target.EOS
        true_token_probability_distributions[int(logits.shape[0]) - 1][EOS_token_index_target_language] = 1

        loss = loss_fn(logits, true_token_probability_distributions)

        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # diagnostic prints
        size = len(transformer_dataset)
        loss, current = loss.item(), sentence_index
        # print(f"sank:loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    if epoch%10==0:
        evaluate_model(transformer, transformer_dataset, bpemb_instance_target, vocab_size, device="cpu")


Epoch: 0

=== Model Evaluation ===

Sentence 1:
Predicted: glitzer cement filmversion trockental offiziers uli at ## at ## at ## at , ,     . . mattei mattei zeller punks  mercantile fette   erstraße lill hynchus . kok sonderburg  eigentl    
Actual   :  iron cement ist eine gebrauchs ## at ## - ## at ## fertige paste , die mit einem spachtel oder den fingern als hohlkehle in die forme cken ( winkel ) der stahlguss - kok ille aufgetragen wird .  

Sentence 2:
Predicted: der der lenzburg schützt iron cement die verwurzelung ille  den sbuch ,  iven  .   
Actual   :  nach der aushärtung schützt iron cement die kok ille gegen den heissen , abras iven stahlguss .  

Sentence 3:
Predicted: feuer fester reparatur kitt für feuer sibirischen ,  ,  , etc    
Actual   :  feuer fester reparatur kitt für feuer ungsanlagen , öfen , offene feuerstellen etc .  

Sentence 4:
Predicted: der bau und scharnier reparatur der auto straßen ...   
Actual   :  der bau und die reparatur der auto straßen ...  



In [175]:
def evaluate_model(transformer, transformer_dataset, bpemb_instance_target, vocab_size, device="cpu"):
    """Evaluates the Transformer model by printing predictions and actual sentences."""
    
    print("\n=== Model Evaluation ===")
    transformer.eval() 
    
    with torch.no_grad():
        for sentence_index, (source_embedding, target_embedding, target_tokens) in enumerate(transformer_dataset):
            logits = transformer(source_embedding.to(device), target_embedding.to(device))
            
            # Convert logits to predicted token indices
            predicted_token_indices = torch.argmax(logits, dim=-1)  # Shape: (seq_len,)

            # Decode tokens to words
            predicted_sentence = " ".join(bpemb_instance_target.decode(predicted_token_indices.tolist()))
            actual_sentence = " ".join(bpemb_instance_target.decode(target_tokens))

            print(f"\nSentence {sentence_index + 1}:")
            print(f"Predicted: {predicted_sentence}")
            print(f"Actual   : {actual_sentence}")
    
    transformer.train()