<a href="https://colab.research.google.com/github/karam-koujan/Transformer/blob/main/transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [91]:
import torch
import torch.nn as nn

def get_device():
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")

class Positional_embedding(nn.Module):
      def __init__(self,d_model,max_len=1000) :
            super(Positional_embedding,self).__init__()
            self.d_model = d_model
            self.max_len = max_len
            self.positional_embedding = torch.zeros((max_len,d_model))
            pos = torch.arange(0,max_len,dtype=torch.float).unsqueeze(1)
            div = torch.pow(1000,torch.arange(0,d_model,2,dtype=torch.float)/d_model)
            print(pos.size())
            print(div.size())
            self.positional_embedding[:,0::2] = torch.sin(pos/div)
            self.positional_embedding[:,1::2] = torch.cos(pos/div)
      def forward(self,x) :
            print(x.size())
            x = x +  self.positional_embedding
            return x
class Tokenizer(nn.Module) :
      def __init__(self,model_d,max_sequence_length,language_to_index,start_token,end_token,pad_token,dropout_p=0.1) :
            super(Tokenizer,self).__init__()
            self.vocab_size = len(language_to_index)
            self.embedding = nn.Embedding(self.vocab_size,model_d)
            self.max_sequence_length = max_sequence_length
            self.language_to_index = language_to_index
            self.start_token = start_token
            self.end_token = end_token
            self.pad_token = pad_token
            self.dropout = nn.Dropout(p=dropout_p)
            self.positional_embedding = Positional_embedding(model_d,max_sequence_length)
      def batch_tokenization(self,batch,start_token,end_token) :
              def sentence_tokenize(sentence,start_token,end_token) :
                      sentence_to_index = [  self.language_to_index[token]  for token in list(sentence)]
                      if start_token :
                            sentence_to_index.insert(0,self.language_to_index[self.start_token])
                      if end_token :
                            sentence_to_index.append(self.language_to_index[self.end_token])
                      for _ in range(len(sentence_to_index),self.max_sequence_length):
                                  sentence_to_index.append(self.language_to_index[self.pad_token])
                      return torch.tensor(sentence_to_index)

              sentence_batch = []
              for sentence_idx in range(len(batch)) :
                    sentence_batch.append(sentence_tokenize(batch[sentence_idx],start_token,end_token))

              sentence_batch = torch.stack(sentence_batch)
              return sentence_batch.to(get_device())
      def forward(self,x,start_token,end_token) :
            # (batch,vocab_size,embed_d)
            x = self.batch_tokenization(x,start_token,end_token)
            x = self.embedding(x)
            pos = self.positional_embedding(x)
            x = self.dropout(x+pos)
            return x





In [92]:
class LayerNorm(nn.Module):
      def __init__(self,epsilon=1e-6):
              super(LayerNorm,self).__init__()
              self.epsilon = epsilon

      def forward(self,x) :
              batch_size,seq_length,model_d = x.shape
              gamma = nn.Parameter(torch.ones(model_d))
              beta = nn.Parameter(torch.zeros(model_d))
              mean = x.mean(-1,keepdim=True)
              var = x.var(-1,keepdim=True)
              x_normalized = (x - mean) / torch.sqrt(var + self.epsilon)

              x = gamma * x_normalized + beta
              return x

In [93]:
attention_heads = 8
model_d = 512
qkv_d = model_d // attention_heads


In [94]:
class MultiHeadAttention(nn.Module) :
      def __init__(self,input_d,model_d,heads_num) :
                  super(MultiHeadAttention,self).__init__()
                  self.input_d = input_d
                  self.model_d = model_d
                  self.heads_num = heads_num
                  self.qkv_d = model_d // heads_num
                  self.queryP = nn.Linear(model_d,model_d)
                  self.valueP = nn.Linear(model_d,model_d)
                  self.keyP = nn.Linear(model_d,model_d)
                  self.out = nn.Linear(model_d,model_d)
      def attention(self,q,k,v,d,mask=None) :
             dk = torch.tensor(d,dtype=torch.float32)
             energy = torch.matmul(q,k.permute((0,1,3,2))) / torch.sqrt(dk)
             if mask is not None:
                        energy = energy.masked_fill(mask != 0, float("-1e9"))
             return torch.matmul(torch.softmax(energy,dim=-1),v)
      def forward(self,x,mask=None):
              """
               first we create key,query and value using a linear projection using a 1 fully connected layer
               The size of these tensors is (input_sequence_length,model_d)
              """
              query= None
              key = None
              value = None
              if isinstance(x,(list,tuple)) :
                   query,key,value = x
                   query = self.queryP(query)
                   key = self.keyP(key)
                   value = self.valueP(value)

              else :
                   print(x.shape)
                   query = self.queryP(x)
                   key = self.keyP(x)
                   value = self.valueP(x)

              """
               we add another dimension for heads now the tensors size is (heads_num,input_sequence,model_d)
               calculte attention for each head independently and in parallel
              """
              query = query.view(-1,self.heads_num,self.input_d,self.qkv_d)
              key = key.view(-1,self.heads_num,self.input_d,self.qkv_d)
              value = value.view(-1,self.heads_num,self.input_d,self.qkv_d)
              attention = self.attention(query,key,value,self.qkv_d,mask).permute((0,2,3,1)).contiguous().view(-1,self.input_d,self.model_d)
              out = self.out(attention)
              return out




In [95]:
import torch.nn.functional as F
class FeedForward(nn.Module) :
        def __init__(self,input_size,output_size,hidden_size,dropout_p=0.1) :
                  super(FeedForward,self).__init__()
                  self.fc1 =  nn.Linear(input_size,hidden_size)
                  self.fc2 = nn.Linear(hidden_size,output_size)
                  self.dropout = nn.Dropout(p=dropout_p)
        def forward(self,x) :
                  x = F.relu(self.fc1(x))
                  x = self.dropout(x)
                  x = self.fc2(x)
                  return x

In [96]:
class EncoderLayer(nn.Module) :
     def __init__(self,model_d,dropout_p,vocab_size=500,max_length=1000,hidden_size=2048,attention_heads=8) :
        super(EncoderLayer,self).__init__()
        self.model_d = model_d
        self.max_length = max_length
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.attention_heads = attention_heads
        self.dropout1 = nn.Dropout(p=dropout_p)
        self.dropout2 = nn.Dropout(p=dropout_p)
        self.multi_head_attention =  MultiHeadAttention(self.max_length,self.model_d,self.attention_heads)
        self.layernorm = LayerNorm()
        self.fc = FeedForward(self.model_d,self.model_d,self.hidden_size)
     def forward(self,x,mask=None) :
              res_x = x.clone()
              x = self.multi_head_attention(x,mask)
              x = self.dropout1(x)
              x = self.layernorm(x + res_x )
              res_x = x.clone()
              x = self.fc(x)
              x = self.dropout2(x)
              x =  self.layernorm(x + res_x )
              return x


In [97]:
class EncoderLayers(nn.Sequential) :

          def forward(self, x,mask=None):
                     for module in self._modules.values():
                          x = module(x,mask)
                     return x

In [98]:
class Encoder(nn.Module):
           def __init__(self,model_d,dropout_p,num_layers,language_to_index,start_token,end_token,pad_token,vocab_size=500,max_length=1000,hidden_size=2048,attention_heads=8) :
                super(Encoder,self).__init__()
                self.layers  = EncoderLayers(*[ EncoderLayer(model_d,dropout_p,vocab_size,max_length,hidden_size,attention_heads) for _ in range(num_layers)])
                self.tokenizer = Tokenizer(model_d,max_sequence_length,language_to_index,start_token,end_token,pad_token)
           def forward(self,x,mask,start_token,end_token) :
                   x = self.tokenizer(x,start_token,end_token)
                   x = self.layers(x,mask)
                   return x



In [99]:
class DecoderLayer(nn.Module) :
        def __init__(self,model_d,dropout_p,vocab_size=500,max_length=1000,hidden_size=2048,attention_heads=8) :
            super(DecoderLayer,self).__init__()
            self.model_d = model_d
            self.max_length = max_length
            self.hidden_size = hidden_size
            self.vocab_size = vocab_size
            self.attention_heads = attention_heads
            self.dropout1 = nn.Dropout(p=dropout_p)
            self.dropout2 = nn.Dropout(p=dropout_p)
            self.dropout3 = nn.Dropout(p=dropout_p)
            self.multi_head_attention =  MultiHeadAttention(self.max_length,self.model_d,self.attention_heads)
            self.layernorm = LayerNorm()
            self.fc = FeedForward(self.model_d,self.model_d,self.hidden_size)

        def forward(self,x,encoder_out,att_mask,pad_mask) :
                    res_x = x.clone()
                    x = self.multi_head_attention(x,att_mask)
                    x = self.dropout1(x)
                    x = self.layernorm(x + res_x )
                    res_x = x.clone()
                    x  = self.multi_head_attention((x,encoder_out,encoder_out),pad_mask)
                    x = self.dropout2(x)
                    x = self.layernorm(x + res_x )
                    res_x = x.clone()
                    x =  self.fc(x)
                    x = self.dropout3(x)
                    x = self.layernorm(x + res_x )
                    return x


In [100]:
class DecoderLayers(nn.Sequential) :

          def forward(self, x, encoder_out,att_mask,pad_mask):
                     for module in self._modules.values():
                          x = module(x,encoder_out,att_mask,pad_mask)
                     return x

In [101]:
class Decoder(nn.Module):
           def __init__(self,model_d,dropout_p,num_layers,language_to_index,start_token,end_token,pad_token,vocab_size=500,max_length=1000,hidden_size=2048,attention_heads=8) :
                super(Decoder,self).__init__()
                self.layers  = DecoderLayers(*[ DecoderLayer(model_d,dropout_p,vocab_size,max_length,hidden_size,attention_heads) for _ in range(num_layers)])
                self.tokenizer = Tokenizer(model_d,max_sequence_length,language_to_index,start_token,end_token,pad_token)

           def forward(self,x,encoder_out,att_mask,pad_mask,start_token,end_token) :
                   x = self.tokenizer(x,start_token,end_token)
                   x = self.layers(x,encoder_out,att_mask,pad_mask)
                   return  x



In [102]:
d_model = 512
num_heads = 8
drop_prob = 0.1
batch_size = 30
max_sequence_length = 200
ffn_hidden = 2048
num_layers = 5

x = torch.randn( (batch_size, max_sequence_length, d_model) ) # English sentence positional encoded
y = torch.randn( (batch_size, max_sequence_length, d_model) ) # Kannada sentence positional encoded
mask = torch.full([max_sequence_length, max_sequence_length] , float('-inf'))


In [103]:
mask_d = (max_sequence_length,max_sequence_length)
mask = torch.triu(torch.ones(mask_d),diagonal=1)
print(mask)

tensor([[0., 1., 1.,  ..., 1., 1., 1.],
        [0., 0., 1.,  ..., 1., 1., 1.],
        [0., 0., 0.,  ..., 1., 1., 1.],
        ...,
        [0., 0., 0.,  ..., 0., 1., 1.],
        [0., 0., 0.,  ..., 0., 0., 1.],
        [0., 0., 0.,  ..., 0., 0., 0.]])


In [104]:
class Transformer(nn.Module):
    def __init__(self,model_d,dropout_p,
                num_layers,
                english_to_index,
                german_to_index,
                start_token,
                end_token,
                pad_token
                ,vocab_size=500,max_length=1000,hidden_size=2048,attention_heads=8
                ):
        super().__init__()
        self.encoder = Encoder(model_d,dropout_p,num_layers,english_to_index,start_token,end_token,pad_token)
        self.decoder = Decoder(model_d,dropout_p,num_layers,german_to_index,start_token,end_token,pad_token)
        self.linear = nn.Linear(d_model, vocab_size)
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    def forward(self,
                x,
                y,
                encoder_pad_mask=None,
                decoder_att_mask=None,
                decoder_pad_mask=None,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=False, # We should make this true
                dec_end_token=False): # x, y are batch of sentences
        x = self.encoder(x,encoder_pad_mask ,start_token=enc_start_token, end_token=enc_end_token)
        out = self.decoder(y,x,decoder_att_mask,decoder_pad_mask, start_token=dec_start_token, end_token=dec_end_token)
        out = self.linear(out)
        return out

In [105]:
import numpy as np


file_path = "./deu.txt"
start_token = ''
end_token = ''
pad_token = ''
english_vocabulary = [start_token, ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/',
                        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                      ':', '<', '=', '>', '?', '@','[', '\\', ']', '^', '_', '`',
                        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
                        'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                        'y', 'z',
                        '{', '|', '}', '~', pad_token, end_token]

german_vocabulary = german_vocabulary = [start_token, ' ', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/',
                      '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                      ':', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_', '`',
                      'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l',
                      'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
                      'y', 'z', 'ä', 'ö', 'ü', 'ß',
                      '{', '|', '}', '~', pad_token, end_token]


In [106]:
index_to_german = {k : v  for k,v in enumerate(german_vocabulary)}
german_to_index = {v : k  for k,v in enumerate(german_vocabulary)}
index_to_english = {k : v  for k,v in enumerate(english_vocabulary)}
english_to_index = {v : k  for k,v in enumerate(english_vocabulary)}

In [107]:

with open(file_path, 'r') as file:
    raw_data = file.readlines()

In [108]:
sentences =  [ (sentence.rstrip("\n").split("\t")[0].lower(),sentence.rstrip("\n").split("\t")[1].lower()) for sentence in raw_data]
sentences

[('go.', 'geh.'),
 ('hi.', 'hallo!'),
 ('hi.', 'grüß gott!'),
 ('run!', 'lauf!'),
 ('run.', 'lauf!'),
 ('wow!', 'potzdonner!'),
 ('wow!', 'donnerwetter!'),
 ('fire!', 'feuer!'),
 ('help!', 'hilfe!'),
 ('help!', 'zu hülf!'),
 ('stop!', 'stopp!'),
 ('wait!', 'warte!'),
 ('wait.', 'warte.'),
 ('begin.', 'fang an.'),
 ('go on.', 'mach weiter.'),
 ('hello!', 'hallo!'),
 ('hurry!', 'beeil dich!'),
 ('hurry!', 'schnell!'),
 ('i hid.', 'ich versteckte mich.'),
 ('i hid.', 'ich habe mich versteckt.'),
 ('i ran.', 'ich rannte.'),
 ('i see.', 'ich verstehe.'),
 ('i see.', 'aha.'),
 ('i try.', 'ich probiere es.'),
 ('i won!', 'ich hab gewonnen!'),
 ('i won!', 'ich habe gewonnen!'),
 ('relax.', 'entspann dich.'),
 ('shoot!', 'feuer!'),
 ('shoot!', 'schieß!'),
 ('smile.', 'lächeln!'),
 ('ask me.', 'frag mich!'),
 ('ask me.', 'fragt mich!'),
 ('ask me.', 'fragen sie mich!'),
 ('attack!', 'angriff!'),
 ('attack!', 'attacke!'),
 ('cheers!', 'zum wohl!'),
 ('eat it.', 'iss es.'),
 ('eat up.', 'iss auf.'

In [109]:
import numpy as np
PERCENTILE = 97
print( f"{PERCENTILE}th percentile length English: {np.percentile([len(x[0]) for x in sentences], PERCENTILE)}" )
print( f"{PERCENTILE}th percentile length German: {np.percentile([len(x[1]) for x in sentences], PERCENTILE)}" )



97th percentile length English: 61.0
97th percentile length German: 73.0


In [110]:
max_sequence_length = 80
def is_token_exist(sentence,vocab):
     for token in sentence :
           if token not in vocab :
                 return False
     return True

def  is_valid_length(sentence,max_sequence_length) :
           return len(sentence) < max_sequence_length - 1

is_token_exist('sie geht zu fuß.',german_vocabulary)


True

In [111]:
valid_sentence_indicies = []
for index in range(len(sentences)):
    german_sentence, english_sentence = sentences[index][1], sentences[index][0]
    if is_valid_length(german_sentence, max_sequence_length) \
      and is_valid_length(english_sentence, max_sequence_length) \
      and is_token_exist(german_sentence, german_vocabulary):
        valid_sentence_indicies.append(index)

print(f"Number of sentences: {len(sentences)}")
print(f"Number of valid sentences: {len(valid_sentence_indicies)}")

Number of sentences: 221533
Number of valid sentences: 215416


In [112]:
import torch

d_model = 512
batch_size = 30
ffn_hidden = 2048
num_heads = 8
drop_prob = 0.1
num_layers = 1
max_sequence_length = 200
german_vocab_size = len(german_vocabulary)
transformer = Transformer(d_model,drop_prob,num_layers,english_to_index,german_to_index,start_token,end_token,pad_token,vocab_size=german_vocab_size)
transformer

torch.Size([200, 1])
torch.Size([256])
torch.Size([200, 1])
torch.Size([256])


Transformer(
  (encoder): Encoder(
    (layers): EncoderLayers(
      (0): EncoderLayer(
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
        (multi_head_attention): MultiHeadAttention(
          (queryP): Linear(in_features=512, out_features=512, bias=True)
          (valueP): Linear(in_features=512, out_features=512, bias=True)
          (keyP): Linear(in_features=512, out_features=512, bias=True)
          (out): Linear(in_features=512, out_features=512, bias=True)
        )
        (layernorm): LayerNorm()
        (fc): FeedForward(
          (fc1): Linear(in_features=512, out_features=2048, bias=True)
          (fc2): Linear(in_features=2048, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (tokenizer): Tokenizer(
      (embedding): Embedding(69, 512)
      (dropout): Dropout(p=0.1, inplace=False)
      (positional_embedding): Positional_embedding()
    )
  )
  (decode