<a href="https://colab.research.google.com/github/whoami-Lory271/DL-project/blob/andrea/transformer_base.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from numpy import array
from sklearn.preprocessing import OneHotEncoder
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.normalization import LayerNorm

In [None]:
def create_vocabulary(sentences):
  """Collect the distinct whitespace-separated tokens found in ``sentences``.

  Args:
    sentences: iterable of strings; each one is tokenised with ``str.split()``.

  Returns:
    A dict whose keys are the tokens, in order of first appearance, each
    mapped to the placeholder value ``1``.
  """
  all_tokens = (token for sentence in sentences for token in sentence.split())
  return dict.fromkeys(all_tokens, 1)

In [None]:
def create_one_hot_encoder(vocabulary):
  """Fit a scikit-learn ``OneHotEncoder`` on the tokens of ``vocabulary``.

  Args:
    vocabulary: mapping whose keys are the tokens to encode.

  Returns:
    The fitted encoder. It is created with ``handle_unknown='ignore'``.
  """
  # The encoder expects a 2-D array: one row per token, a single feature column.
  token_column = array(list(vocabulary)).reshape(-1,1)
  encoder = OneHotEncoder(handle_unknown='ignore')
  encoder.fit(token_column)
  return encoder

In [None]:
def one_hot_encoding(enc,sentences):
  """One-hot encode a batch of equally long sentences and build its padding mask.

  Args:
    enc: fitted encoder whose ``transform`` accepts a list of ``[token]`` rows
      and returns an object exposing ``todense()``.
    sentences: non-empty sequence of strings that all contain the same number
      of whitespace-separated tokens (shorter sentences padded with ``<pad>``).

  Returns:
    A pair ``(X, padding_mask)``:
      * ``X`` - float32 tensor of shape ``(n_sentences, n_tokens, n_features)``.
      * ``padding_mask`` - bool tensor of shape
        ``(n_sentences, n_tokens, n_tokens)``; entry ``[b, i, j]`` is True when
        token ``i`` or token ``j`` of sentence ``b`` is ``<pad>``.

  Raises:
    ValueError: if ``sentences`` is empty or the sentences do not all have the
      same number of tokens.
  """
  tokenized = [sentence.split() for sentence in sentences]
  if not tokenized:
    raise ValueError("sentences must contain at least one sentence")
  seq_len = len(tokenized[0])
  # Without this check a ragged batch whose total token count happens to be
  # divisible by the batch size would be reshaped into misaligned rows.
  if any(len(tokens) != seq_len for tokens in tokenized):
    raise ValueError("all sentences must have the same number of tokens; pad them with '<pad>'")

  rows = [[token] for tokens in tokenized for token in tokens]
  X = torch.tensor(enc.transform(rows).todense(),dtype=torch.float32)
  X = X.reshape((len(tokenized),seq_len,-1))

  is_pad = torch.tensor([[token == '<pad>' for token in tokens] for tokens in tokenized],dtype=torch.bool)
  # A (query i, key j) pair is masked as soon as either position is padding.
  padding_mask = is_pad.unsqueeze(2) | is_pad.unsqueeze(1)

  return X,padding_mask

In [None]:
def position_embedding(n_sentences,input_length,dmodel):
  """Build sinusoidal positional encodings for a batch.

  Dimensions are grouped in (even, odd) pairs that share one frequency:
  ``PE[pos, 2k] = sin(pos / 10000**(2k/dmodel))`` and
  ``PE[pos, 2k+1] = cos(pos / 10000**(2k/dmodel))``.

  Args:
    n_sentences: number of sentences in the batch.
    input_length: number of positions (tokens) per sentence.
    dmodel: width of each positional vector.

  Returns:
    A float32 tensor of shape ``(n_sentences, input_length, dmodel)``; every
    sentence receives the same table.
  """
  # The angles are evaluated in float64 and only the final table is cast down.
  positions = torch.arange(input_length,dtype=torch.float64).unsqueeze(1)
  dims = torch.arange(dmodel)
  # The exponent depends on the pair index (dims // 2), not on the raw
  # dimension index, so the sin and cos of one pair use the same frequency.
  exponent = (2 * (dims // 2)).to(torch.float64) / dmodel
  base = torch.tensor(10000.0,dtype=torch.float64)
  angles = positions / torch.pow(base,exponent)
  table = torch.where(dims % 2 == 0,torch.sin(angles),torch.cos(angles))
  # The table does not depend on the sentence: compute it once and replicate.
  return table.to(torch.float32).unsqueeze(0).repeat(n_sentences,1,1)


In [None]:
class SelfAttention(nn.Module):
  """A single scaled dot-product attention head.

  The head owns its own query/key/value projections, but any of the three can
  be supplied pre-computed by the caller (used for encoder-decoder attention).
  """
  def __init__(self,dmodel,dk,dv):
    """
    Args:
      dmodel: width of the input vectors.
      dk: width of the query and key projections.
      dv: width of the value projection.
    """
    super().__init__()
    self.dk = dk
    self.Wq = nn.Linear(dmodel,dk)
    self.Wk = nn.Linear(dmodel,dk)
    self.Wv = nn.Linear(dmodel,dv)
    self.softmax = nn.Softmax(dim=2)

  def forward(self,x,padding_mask,mask=False,q=None,k=None,v=None,other_mask=None):
    """Apply attention and return ``(output, q)``.

    Args:
      x: input batch; only read for the projections that were not supplied.
      padding_mask: mask for the query side. When ``other_mask`` is None it is
        applied to the score matrix as is (True = blocked); otherwise only its
        first row per sentence, ``padding_mask[:,0,:]``, is read as the list
        of padded query positions.
        # assumes a boolean (batch, Lq, Lq) tensor - confirm against caller
      mask: when true, a position may not attend to later positions.
      q, k, v: optional pre-computed queries, keys and values.
      other_mask: optional mask for the key side; ``other_mask[:,0,:]`` is
        read as the list of padded key positions.

    Returns:
      ``(output, q)`` where ``output`` is the attention-weighted sum of ``v``
      and ``q`` is the query tensor that was used. Rows whose keys are all
      blocked come out as zeros.
    """
    # Identity checks: ``== None`` on a tensor goes through tensor comparison.
    if q is None:
      q = self.Wq(x)
    if k is None:
      k = self.Wk(x)
    if v is None:
      v = self.Wv(x)

    if other_mask is None:
      blocked = padding_mask.to(torch.bool)
    else:
      # Combine query-side and key-side padding explicitly. Detecting padding
      # through ``score == 0`` would also block genuine zero scores.
      query_pad = padding_mask[:,0,:].to(torch.bool)
      key_pad = other_mask[:,0,:].to(torch.bool)
      blocked = query_pad.unsqueeze(2) | key_pad.unsqueeze(1)

    if mask:
      # Strictly upper-triangular: query i may not see keys j > i.
      causal = torch.ones(q.shape[1],k.shape[1],dtype=torch.bool,device=blocked.device).triu(diagonal=1)
      blocked = blocked | causal

    sc = torch.matmul(q,k.permute(0,2,1)) / math.sqrt(self.dk)
    sc = sc.masked_fill(blocked,float('-inf'))
    # A fully blocked row makes softmax return NaN; those rows become zeros.
    score = torch.matmul(torch.nan_to_num(self.softmax(sc)),v)
    return score, q

In [None]:
class MultiHeadAttention(nn.Module):
  """Runs ``nhead`` ``SelfAttention`` heads and merges them with a linear layer."""
  def __init__(self,dmodel,dk,dv,nhead,dropout=0.1):
    """
    Args:
      dmodel: width of the input and of the merged output.
      dk: query/key width of each head.
      dv: value width of each head.
      nhead: number of heads.
      dropout: dropout probability applied to the merged output.
    """
    super().__init__()
    self.nhead = nhead
    # Kept so that forward() slices queries with this module's own head width
    # instead of relying on a module-level ``dk`` variable.
    self.dk = dk
    self.att_layers = nn.ModuleList([SelfAttention(dmodel,dk,dv) for _ in range(nhead)])
    self.Wo = nn.Linear(dv * nhead, dmodel)
    self.drop = nn.Dropout(p=dropout)

  def forward(self,x,padding_mask,mask=False,q=None,k=None,v=None,other_mask=None):
    """Apply every head and project the concatenated result.

    Args:
      x: input batch, forwarded to every head.
      padding_mask: query-side mask, forwarded to every head.
      mask: forwarded to every head (causal masking switch).
      q: optional pre-computed queries; head ``i`` receives the slice
        ``q[:,:,i*dk:(i+1)*dk]``.
      k, v, other_mask: forwarded unchanged to every head, and only when
        ``q`` is given.

    Returns:
      ``(y, q_res)``. ``y`` is the projected, dropout-regularised concatenation
      of the head outputs. ``q_res`` is the concatenation of the heads'
      queries when ``q`` is None; when ``q`` is given it is the query tensor
      returned by head 0 only.
    """
    head_outputs = []
    if q is None:
      head_queries = []
      for head in self.att_layers:
        y_head,q_head = head(x,padding_mask,mask=mask)
        head_outputs.append(y_head)
        head_queries.append(q_head)
      q_res = torch.cat(head_queries,dim=2)
    else:
      q_res = None
      for i,head in enumerate(self.att_layers):
        q_slice = q[:,:,i*self.dk:(i+1)*self.dk]
        y_head,q_head = head(x,padding_mask,mask=mask,q=q_slice,k=k,v=v,other_mask=other_mask)
        head_outputs.append(y_head)
        if i == 0:
          q_res = q_head

    # Concatenate once instead of growing the tensor inside the loop.
    y = self.Wo(torch.cat(head_outputs,dim=2))
    y = self.drop(y)
    return y, q_res

In [None]:
class FFN(nn.Module):
  """Two-layer feed-forward block: linear, ReLU, linear, then dropout."""
  def __init__(self,dmodel,df,dropout=0.1):
    """
    Args:
      dmodel: input and output width.
      df: width of the hidden layer.
      dropout: dropout probability applied to the output.
    """
    super().__init__()
    self.W1 = nn.Linear(dmodel, df)
    self.W2 = nn.Linear(df, dmodel)
    self.drop = nn.Dropout(p=dropout)

  def forward(self,x):
    """Return ``drop(W2(relu(W1(x))))``."""
    hidden = F.relu(self.W1(x))
    return self.drop(self.W2(hidden))

In [None]:
class Encoder(nn.Module):
  """One encoder layer: multi-head attention and a feed-forward block, each
  followed by a residual connection and layer normalisation."""
  def __init__(self,dmodel,dk,dv,df,nhead):
    """
    Args:
      dmodel: width of the layer's input and output.
      dk, dv: per-head key and value widths passed to the attention block.
      df: hidden width of the feed-forward block.
      nhead: number of attention heads.
    """
    super().__init__()
    self.mha = MultiHeadAttention(dmodel, dk, dv, nhead)
    self.norm1 = LayerNorm(dmodel)
    self.ffn = FFN(dmodel, df)
    self.norm2 = LayerNorm(dmodel)

  def forward(self,x,padding_mask):
    """Encode ``x``; ``padding_mask`` is forwarded to the attention block."""
    # The attention block also returns its queries, which are not needed here.
    attended = self.mha(x, padding_mask)[0]
    hidden = self.norm1(x + attended)
    return self.norm2(hidden + self.ffn(hidden))


In [None]:
class Decoder(nn.Module):
  """One decoder layer: causally masked attention, encoder-decoder attention
  and a feed-forward block, each followed by a residual connection and layer
  normalisation."""
  def __init__(self,dmodel,dk,dv,df,nhead):
    """
    Args:
      dmodel: width of the layer's input and output.
      dk, dv: per-head key and value widths passed to both attention blocks.
      df: hidden width of the feed-forward block.
      nhead: number of attention heads.
    """
    super().__init__()
    self.masked_mha = MultiHeadAttention(dmodel, dk, dv, nhead)
    self.norm1 = LayerNorm(dmodel)
    self.enc_dec_attention = MultiHeadAttention(dmodel, dk, dv, nhead)
    self.norm2 = LayerNorm(dmodel)
    self.ffn = FFN(dmodel, df)
    self.norm3 = LayerNorm(dmodel)

  def forward(self,x,q,k,v,padding_mask,other_mask):
    """Decode ``x`` against the supplied keys and values.

    Args:
      x: decoder-side input batch.
      q: accepted but not read by this method; the encoder-decoder attention
        uses the queries returned by the masked attention block instead.
      k, v: keys and values handed to the encoder-decoder attention.
      padding_mask: mask passed to both attention blocks.
      other_mask: mask passed to the encoder-decoder attention only.
    """
    self_attended, dec_queries = self.masked_mha(x, padding_mask, mask=True)
    hidden1 = self.norm1(x + self_attended)
    cross_attended = self.enc_dec_attention(
      hidden1, padding_mask, mask=False,
      q=dec_queries, k=k, v=v, other_mask=other_mask,
    )[0]
    hidden2 = self.norm2(hidden1 + cross_attended)
    return self.norm3(hidden2 + self.ffn(hidden2))


In [None]:
class Transformer(nn.Module):
  """Encoder-decoder model operating on one-hot encoded token batches."""
  def __init__(self,in_voc_size,out_voc_size,dmodel,dk,dv,df,nhead,nlayers,input_size=1):
    """
    Args:
      in_voc_size: number of input (source) one-hot features.
      out_voc_size: number of output (target) one-hot features.
      dmodel: embedding width used throughout the model.
      dk, dv: per-head key and value widths.
      df: hidden width of the feed-forward blocks.
      nhead: number of attention heads per attention block.
      nlayers: number of encoder layers and of decoder layers.
      input_size: accepted for compatibility; not used.
    """
    super().__init__()
    self.nlayers = nlayers
    # Stored so that forward() uses this model's own width rather than a
    # module-level ``dmodel`` variable.
    self.dmodel = dmodel
    self.in_embedding = nn.Parameter(torch.randn(in_voc_size,dmodel))
    self.out_embedding = nn.Parameter(torch.randn(out_voc_size,dmodel))
    self.encoders = nn.ModuleList([Encoder(dmodel,dk,dv,df,nhead) for _ in range(nlayers)])
    self.decoders = nn.ModuleList([Decoder(dmodel,dk,dv,df,nhead) for _ in range(nlayers)])
    self.Wk = nn.Linear(dmodel,dk)
    self.Wv = nn.Linear(dmodel,dv)
    self.softmax = nn.Softmax(dim=2)

  def _embed(self,one_hot,table):
    """Project one-hot rows through ``table``, scale, and add positions."""
    emb = (one_hot @ table) * math.sqrt(self.dmodel)
    pos = position_embedding(one_hot.shape[0],one_hot.shape[1],self.dmodel)
    # The positional table is created on the CPU; follow the embedding's device.
    return emb + pos.to(emb.device)

  def forward(self,x,z,in_padding_mask,out_padding_mask,encoding=True):
    """Run the encoder (optionally) and the decoder.

    Args:
      x: one-hot source batch, or an already encoded source batch when
        ``encoding`` is False.
      z: one-hot decoder input batch.
      in_padding_mask: padding mask of the source batch.
      out_padding_mask: padding mask of the decoder input batch.
      encoding: when False the encoder stack is skipped and ``x`` is used
        directly to compute the keys and values.

    Returns:
      ``(x, z)``: the encoder-side representation and, for every decoder
      position, a softmax distribution over the output vocabulary. Note that
      ``z`` holds probabilities, not logits.
    """
    if encoding:
      x = self._embed(x,self.in_embedding)
      for encoder in self.encoders:
        x = encoder(x,in_padding_mask)

    # One key/value projection of the encoder output, shared by all decoders.
    Kenc = self.Wk(x)
    Venc = self.Wv(x)

    z = self._embed(z,self.out_embedding)
    for decoder in self.decoders:
      z = decoder(z,None,Kenc,Venc,out_padding_mask,in_padding_mask)

    # The output embedding matrix is reused (transposed) as output projection.
    z = z @ self.out_embedding.T
    z = self.softmax(z)

    return x,z

In [None]:
# Source-side (Italian) sentences wrapped in <sos>/<eos> and right-padded with
# <pad>; they are used to build the input vocabulary and its one-hot encoder.
in_sentences = [
  "<sos> dai ragazzi per una volta che ci andiamo non scegliamo il posto che fa pagare poco <eos>",
  "<sos> altrimenti tanto vale andare a mensa <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
  "<sos> importante è la compagnia <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
]
in_vocabulary = create_vocabulary(in_sentences)
in_enc = create_one_hot_encoder(in_vocabulary)

In [None]:
# Target-side (English) sentences wrapped in <sos>/<eos> and right-padded with
# <pad>; they are used to build the output vocabulary and its one-hot encoder.
out_sentences = [
  "<sos> come on guys for once let's not choose the place that charges little <eos>",
  "<sos> otherwise we might as well go to the canteen <eos> <pad> <pad> <pad> <pad>",
  "<sos> important is company <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
]
out_vocabulary = create_vocabulary(out_sentences)
out_enc = create_one_hot_encoder(out_vocabulary)

In [None]:
# Encoder input: the source sentences without the leading <sos>.
in2_sentences = [
  "dai ragazzi per una volta che ci andiamo non scegliamo il posto che fa pagare poco <eos>",
  "altrimenti tanto vale andare a mensa <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
  "importante è la compagnia <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
]
# Decoder input: the target sentences without <eos>.
out2_sentences = [
  "<sos> come on guys for once let's not choose the place that charges little",
  "<sos> otherwise we might as well go to the canteen <pad> <pad> <pad> <pad>",
  "<sos> important is company <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
]


In [None]:
# One-hot encode the encoder input and the decoder input; each call also
# returns the padding mask of its batch.
# NOTE(review): `input` shadows the builtin of the same name; the name is kept
# because later cells refer to it.
input, in_pad_mask = one_hot_encoding(enc=in_enc, sentences=in2_sentences)
teacher, out_pad_mask = one_hot_encoding(enc=out_enc, sentences=out2_sentences)

In [None]:
# Model hyper-parameters, passed to the Transformer constructor below.
dmodel = 512          # embedding width
dk = dv = 64          # per-head key and value widths
nhead = 8             # attention heads per attention block
df = 2048             # hidden width of the feed-forward blocks
nlayers = 6           # encoder layers and decoder layers
in_vocabulary_size = len(in_vocabulary)
out_vocabulary_size = len(out_vocabulary)

In [None]:
# Instantiate the model with the hyper-parameters defined above.
tran = Transformer(
  in_voc_size=in_vocabulary_size,
  out_voc_size=out_vocabulary_size,
  dmodel=dmodel,
  dk=dk,
  dv=dv,
  df=df,
  nhead=nhead,
  nlayers=nlayers,
)

In [None]:
# Single forward pass: `x` is the encoder output, `output` the per-position
# softmax distribution over the output vocabulary.
x, output = tran(
  input,
  teacher,
  in_padding_mask=in_pad_mask,
  out_padding_mask=out_pad_mask,
)

In [None]:
# Training targets: the target sentences without the leading <sos>, i.e. the
# decoder input shifted by one token.
out3_sentences = [
  "come on guys for once let's not choose the place that charges little <eos>",
  "otherwise we might as well go to the canteen <eos> <pad> <pad> <pad> <pad>",
  "important is company <eos> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad>",
]
target, target_pad_mask = one_hot_encoding(enc=out_enc, sentences=out3_sentences)

In [None]:
loss = nn.CrossEntropyLoss()
opt = torch.optim.Adam(tran.parameters(),lr=1e-04, betas=(0.9, 0.98), eps=1e-09)

tran.train()

# Row 0 of the pairwise padding mask flags the padded positions of each target
# sentence (no target sentence starts with <pad>); those positions are left
# out of the loss.
keep = ~target_pad_mask[:,0,:]

for i in range(50):

  opt.zero_grad()
  x,output = tran(input,teacher,in_pad_mask,out_pad_mask)
  # `output` already holds softmax probabilities, while CrossEntropyLoss
  # applies log-softmax itself: feed it log-probabilities so the result is the
  # plain negative log-likelihood. The clamp avoids log(0).
  log_probs = torch.log(output.clamp_min(1e-9))
  # CrossEntropyLoss reads the class scores from dim 1. Selecting the kept
  # positions yields (n_tokens, vocab) tensors, so the loss is taken over the
  # vocabulary instead of over the sequence axis.
  l = loss(log_probs[keep], target[keep])
  l.backward()
  #nn.utils.clip_grad_norm_(tran.parameters(), 0.1)
  opt.step()