<a href="https://colab.research.google.com/github/kopreusz/Actuarial-computations/blob/main/Encoder_Decoder_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

***TODOS***


1.   Dropout: Done for now, we still have to apply it in the case of embeddings,and positional encodings. We also have not checked it in the decoder yet.
2.   Batches: We may have to get familiar with the DataLoader first.
3.  Double check the linear transformations we use in the cross attention. We need one extra, for the encoders output, and also one for the output of tha masked attention.




In [1]:
import torch
import torch.nn as nn
import math

In [2]:
class EncodersSelfAttention(nn.Module):
  def __init__(self, d_model,number_of_heads, number_of_tokens):
    super(EncodersSelfAttention,self).__init__()

    self.d_model = d_model
    self.number_of_heads = number_of_heads
    self.number_of_tokens = number_of_tokens


    self.get_QKV = nn.Linear(self.d_model,self.d_model,bias = False)
    self.get_multihead = nn.Linear(self.d_model,self.d_model//self.number_of_heads,bias = False)
    self.norm = nn.Softmax(dim=0)
    self.Dropout = nn.Dropout(0.1)                                                                              # We could define aparameter for the rate of dropout, for now I use the constant mentioned in the article.

  def forward (self,x):
    Q = self.get_QKV(x)                                                                                         # They have to be exactly the same linear transformations in one attention head
    K = self.get_QKV(x)
    V = self.get_QKV(x)

    QW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)             #integer division, 3D tensor.
    KW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)
    VW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)

    for i in range(self.number_of_heads):
        QW[i,:] = self.get_multihead(Q)                                                                         # Applyin number of heads transformations and aligning the results by the first coordinate.
        KW[i,:] = self.get_multihead(K)
        VW[i,:] = self.get_multihead(V)

    attention = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)      #initializing an attention matrix

    for i in range(self.number_of_heads):
        attention[i] = torch.einsum('ik,kj->ij',torch.einsum('ik,jk->ij',QW[i],KW[i]),KW[i])                    # This is the formula for the attention score

    concat_output = attention[0]

    for i in range(1,self.number_of_heads):
      concat_output = torch.cat((concat_output,attention[i]),1)


    return self.norm(x + self.Dropout(concat_output))                                                           # We apply residual connections here, I hope it won't mess up our gradient calculation.

In [3]:
class FeedForwardBlock(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()

        self.norm = nn.Softmax(dim = 0)
        self.Dropout = nn.Dropout(0.1)

        self.block = nn.Sequential( )
        self.block.add_module(f'linear_1',  nn.Linear(d_model, d_ff))
        self.block.add_module(f'ReLU', nn.ReLU())
        self.block.add_module(f'linear_2', nn.Linear(d_ff, d_model))


    def forward(self, x):

      out = self.norm(self.Dropout(self.block(x)) + x)

      return out

In [4]:
class Encoder(nn.Module):
  def __init__(self, number_of_layers: int):
    super().__init__()

    self.number_of_layers = number_of_layers
    self.EncoderBlock = nn.Sequential()

    for i in range (self.number_of_layers):
      self.EncoderBlock.add_module(f'attention{i}',EncodersSelfAttention(10,2,10))
      self.EncoderBlock.add_module(f'ffnn{i}',FeedForwardBlock(10,100))

  def forward (self,x):

    out = self.EncoderBlock(x)

    return out


In [5]:
x = torch.randn(10,10)
model = Encoder(4)

print(model)

#for i in model.parameters():
#  print(i)
#training->loss and optimizer:

target = x
learning_rate = 0.001
num_epoch = 4

loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr = learning_rate)

#training loop:

torch.autograd.set_detect_anomaly(True)

for epoch in range(num_epoch):

  #forward pass

  out = model(x)
  lossval = loss(out,target)

  #backward pass

  optimizer.zero_grad()
  print(f'epoch: {epoch}')
  lossval.backward(retain_graph=True)
  optimizer.step()


Encoder(
  (EncoderBlock): Sequential(
    (attention0): EncodersSelfAttention(
      (get_QKV): Linear(in_features=10, out_features=10, bias=False)
      (get_multihead): Linear(in_features=10, out_features=5, bias=False)
      (norm): Softmax(dim=0)
      (Dropout): Dropout(p=0.1, inplace=False)
    )
    (ffnn0): FeedForwardBlock(
      (norm): Softmax(dim=0)
      (Dropout): Dropout(p=0.1, inplace=False)
      (block): Sequential(
        (linear_1): Linear(in_features=10, out_features=100, bias=True)
        (ReLU): ReLU()
        (linear_2): Linear(in_features=100, out_features=10, bias=True)
      )
    )
    (attention1): EncodersSelfAttention(
      (get_QKV): Linear(in_features=10, out_features=10, bias=False)
      (get_multihead): Linear(in_features=10, out_features=5, bias=False)
      (norm): Softmax(dim=0)
      (Dropout): Dropout(p=0.1, inplace=False)
    )
    (ffnn1): FeedForwardBlock(
      (norm): Softmax(dim=0)
      (Dropout): Dropout(p=0.1, inplace=False)
      (

# Decoder

After we've finished the Encoderblock we concentrate on the decoder. First we introduce the two new versions of Attention, after that we follow the same way as we did in the case of the Encoder.

In [6]:
class DecodersMaskedAttention(nn.Module):                                                                     # This is exactly the same as the EncodersSelfAttention's code. Tha only difference is that we apply masking.
  def __init__(self, d_model,number_of_heads, number_of_tokens):
    super(DecodersMaskedAttention,self).__init__()

    self.d_model = d_model
    self.number_of_heads = number_of_heads
    self.number_of_tokens = number_of_tokens


    self.get_QKV = nn.Linear(self.d_model,self.d_model,bias = False)
    self.get_multihead = nn.Linear(self.d_model,self.d_model//self.number_of_heads,bias = False)
    self.norm = nn.Softmax(dim=0)
    self.Dropout = nn.Dropout(0.1)                                                                              # We could define a parameter for the rate of dropout, for now I use the constant mentioned in the article.

  def forward (self,x):
    Q = self.get_QKV(x)                                                                                         # They have to be exactly the same linear transformations in one attention head
    K = self.get_QKV(x)
    V = self.get_QKV(x)

    QW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)             #integer division, 3D tensor.
    KW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)
    VW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)

    for i in range(self.number_of_heads):
        QW[i,:] = self.get_multihead(Q)                                                                         # Applyin number of heads transformations and aligning the results by the first coordinate.
        KW[i,:] = self.get_multihead(K)
        VW[i,:] = self.get_multihead(V)

    attention = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)      #initializing an attention matrix

    mask = torch.einsum('ji->ij',torch.triu(torch.ones(self.number_of_tokens,self.number_of_tokens)))                               # Transposing an upper triangular matrix.

    for i in range(self.number_of_heads):
        attention[i] = torch.einsum('ik,kj->ij',torch.einsum('ik,jk->ij',QW[i],KW[i]).masked_fill(mask == 0,float("0.00000000000001")),VW[i])               # -infty!!!


    concat_output = attention[0]

    for i in range(1,self.number_of_heads):
      concat_output = torch.cat((concat_output,attention[i]),1)


    return self.norm(x + self.Dropout(concat_output))                                                           # We apply residual connections here, I hope it won't mess up our gradient calculation.


In [7]:
print(DecodersMaskedAttention(10,2,10))

DecodersMaskedAttention(
  (get_QKV): Linear(in_features=10, out_features=10, bias=False)
  (get_multihead): Linear(in_features=10, out_features=5, bias=False)
  (norm): Softmax(dim=0)
  (Dropout): Dropout(p=0.1, inplace=False)
)


In [8]:
class CrossAttention(nn.Module):
  def __init__(self, EncodersOut, d_model,number_of_heads, number_of_tokens) -> None:
    super(CrossAttention,self).__init__()

    self.EncodersOut = EncodersOut
    self.d_model = d_model
    self.number_of_heads = number_of_heads
    self.number_of_tokens = number_of_tokens

    self.get_multihead = nn.Linear(self.d_model,self.d_model//self.number_of_heads,bias = False)
    self.norm = nn.Softmax(dim=0)
    self.Dropout = nn.Dropout(0.1)

  def forward (self,Q):                                                                         # encoders's output should be an input here and

    QW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)
    KW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)
    VW = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)

    for i in range(self.number_of_heads):
        QW[i,:] = self.get_multihead(Q)
        KW[i,:] = self.get_multihead(self.EncodersOut)
        VW[i,:] = self.get_multihead(self.EncodersOut)

    attention = torch.zeros(self.number_of_heads,self.number_of_tokens,self.d_model//self.number_of_heads)

    for i in range(self.number_of_heads):
        attention[i] = torch.einsum('ik,kj->ij',torch.einsum('ik,jk->ij',QW[i],KW[i]),KW[i])

    concat_output = attention[0]

    for i in range(1,self.number_of_heads):
      concat_output = torch.cat((concat_output,attention[i]),1)


    return self.norm(Q + self.Dropout(concat_output))

In [9]:
EncodersOut = torch.ones(10,10)
Att = CrossAttention(EncodersOut,10,2,10)
Q= torch.ones(10,10)
Ve = Att(Q)
print(Ve)

tensor([[1.6461e-15, 2.5505e-22, 3.3333e-01, 1.0000e-01, 1.0000e-01, 1.1111e-01,
         1.2500e-01, 1.0000e-01, 6.5486e-06, 1.0000e-01],
        [1.4286e-01, 1.2500e-01, 3.1536e-10, 1.0000e-01, 1.0000e-01, 1.1111e-01,
         1.2500e-01, 1.0000e-01, 4.9997e-01, 1.0000e-01],
        [1.4286e-01, 1.2500e-01, 3.1536e-10, 1.0000e-01, 1.0000e-01, 1.2803e-15,
         1.2500e-01, 1.0000e-01, 6.5486e-06, 1.0000e-01],
        [1.4286e-01, 1.2500e-01, 3.3333e-01, 1.0000e-01, 1.0000e-01, 1.1111e-01,
         1.2500e-01, 1.0000e-01, 6.5486e-06, 1.0000e-01],
        [1.4286e-01, 2.5505e-22, 3.1536e-10, 1.0000e-01, 1.0000e-01, 1.1111e-01,
         1.2500e-01, 1.0000e-01, 6.5486e-06, 1.0000e-01],
        [1.4286e-01, 1.2500e-01, 3.1536e-10, 1.0000e-01, 1.0000e-01, 1.1111e-01,
         2.5505e-22, 1.0000e-01, 6.5486e-06, 1.0000e-01],
        [1.4286e-01, 1.2500e-01, 3.3333e-01, 1.0000e-01, 1.0000e-01, 1.1111e-01,
         1.2500e-01, 1.0000e-01, 6.5486e-06, 1.0000e-01],
        [1.4286e-01, 1.2500

In [10]:
class Decoder(nn.Module):
  def __init__(self,EncodersOut, number_of_layers: int):
    super(). __init__()

    self.EncodersOut = EncodersOut
    self.number_of_layers = number_of_layers
    self.DecoderBlock = nn.Sequential()

    for i in range (self.number_of_layers):
      self.DecoderBlock.add_module(f'MaskedAttention{i}',DecodersMaskedAttention(10,2,10))
      self.DecoderBlock.add_module(f'CrossAttention{i}',CrossAttention (self.EncodersOut,10,2,10))
      self.DecoderBlock.add_module(f'FeedForward{i}',FeedForwardBlock(10,100))

  def forward (self,x):

    out = self.DecoderBlock(x)

    return out

In [11]:
x = torch.randn(10,10)
y = torch.randn(10,10)
encoder = Encoder(4)
encodersout = torch.randn(10,10)
decoder = Decoder(encodersout,4)
outputprobs = decoder(y)
#for i in decoder.parameters():
 # print(i)
target = x
learning_rate = 0.001
num_epoch = 4

loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr = learning_rate)

#training loop:

torch.autograd.set_detect_anomaly(True)

for epoch in range(num_epoch):

  #forward pass

  out = decoder(x)
  lossval = loss(out,target)

  #backward pass

  optimizer.zero_grad()
  print(epoch)
  lossval.backward(retain_graph=True)
  optimizer.step()


0
1
2
3
