<a href="https://colab.research.google.com/github/luigiantonelli/DeepLearning-Project/blob/main/Deep_Learning_Project_Antonelli_Cuconasu_Gaudenzi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installations and imports

In [None]:
!pip install pytorch-lightning --quiet

In [None]:
import math
import os
import pickle as pkl

import matplotlib.pyplot as plt
import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm.notebook import tqdm

# Dataset

# Modules

In [None]:
# Maybe it would be better to define a numerically stable softmax

In [None]:
def dot_product_attention(query, key, value, sqrt_q, device = None, mask = None):
    """Compute scaled dot-product attention.

    The scores ``query @ key^T / sqrt_q`` are turned into weights with a
    softmax over the last dimension, and the weights are applied to ``value``.

    Args:
        query: tensor whose last two dims are (query_len, dim).
        key: tensor whose last two dims are (key_len, dim).
        value: tensor whose last two dims are (key_len, value_dim).
        sqrt_q: scalar the raw scores are divided by.
        device: unused; kept only so existing positional callers keep working.
        mask: optional tensor broadcastable to the score tensor. Positions
            where ``mask == 0`` are excluded from the softmax.

    Returns:
        The attention-weighted combination of ``value``.
    """
    scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt_q
    if mask is not None:
        # Use the most negative finite value of the dtype: it drives the
        # softmax weight to zero (the previous -1e-10 was effectively 0 and
        # masked nothing) while avoiding the NaNs that -inf would produce
        # when every position of a row is masked.
        scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
    return torch.matmul(F.softmax(scores, dim = -1), value)

In [None]:
class MultiHeadAttention(nn.Module):
  """Multi-head scaled dot-product attention with bias-free projections.

  Args:
    d: model dimension; must be divisible by ``num_heads``.
    num_heads: number of attention heads.
    batch_size: stored on the instance for compatibility; the forward pass
      reads the batch size from the input instead, so a smaller final batch
      also works.

  Raises:
    ValueError: if ``d`` is not divisible by ``num_heads``.
  """
  def __init__(self, d, num_heads, batch_size):
    super(MultiHeadAttention, self).__init__()
    if d % num_heads != 0:
      raise ValueError("d must be divisible by num_heads")
    self.dim_head = d // num_heads #single head dimension
    self.sqrt_q = math.sqrt(self.dim_head)
    self.num_heads = num_heads
    self.batch_size = batch_size
    self.W_q = nn.Linear(d, d, bias = False) #stack of num_heads matrices of dimension (d, dim_head), one for each head
    self.W_k = nn.Linear(d, d, bias = False)
    self.W_v = nn.Linear(d, d, bias = False)
    self.W_o = nn.Linear(d, d, bias = False)

  def _split_heads(self, projected, batch):
    # (batch, seq, d) -> (batch, num_heads, seq, dim_head)
    return projected.view(batch, -1, self.num_heads, self.dim_head).transpose(1, 2)

  def forward(self, query, key, value, mask = None):
    """Apply multi-head attention.

    Assumes query, key and value are (batch, seq, d) tensors -- TODO confirm
    against callers. ``mask``, if given, must broadcast to the
    (batch, num_heads, query_len, key_len) score tensor.
    """
    batch = query.size(0)
    query = self._split_heads(self.W_q(query), batch)
    key = self._split_heads(self.W_k(key), batch)
    value = self._split_heads(self.W_v(value), batch)
    # dot_product_attention takes (..., sqrt_q, device, mask): pass the device
    # explicitly so the mask lands in the mask slot instead of being dropped.
    attention_value = dot_product_attention(query, key, value, self.sqrt_q, query.device, mask)
    # Merge the heads back: (batch, num_heads, seq, dim_head) -> (batch, seq, d)
    merged = attention_value.transpose(1, 2).contiguous().view(batch, -1, self.num_heads*self.dim_head)
    return self.W_o(merged)

In [None]:
class TP_MultiHeadAttention(nn.Module):
  """Attention variant with an extra "role" projection (``W_r``).

  Only the constructor is implemented; ``forward`` is still a stub that
  returns ``None``. Presumably this is the tensor-product (TP) attention
  variant -- confirm before implementing.

  Args:
    d: model dimension; must be divisible by ``num_heads``.
    num_heads: number of attention heads.
    batch_size: batch size, stored on the instance.

  Raises:
    ValueError: if ``d`` is not divisible by ``num_heads``.
  """
  def __init__(self, d, num_heads, batch_size):
    super(TP_MultiHeadAttention, self).__init__()
    if d % num_heads != 0:
      raise ValueError("d must be divisible by num_heads")
    self.dim_head = d // num_heads #single head dimension
    self.sqrt_q = math.sqrt(self.dim_head)
    self.num_heads = num_heads
    self.batch_size = batch_size
    self.W_q = nn.Linear(d, d, bias = True) #stack of num_heads matrices of dimension (d, dim_head), one for each head
    self.W_k = nn.Linear(d, d, bias = True)
    self.W_v = nn.Linear(d, d, bias = True)
    self.W_o = nn.Linear(d, d, bias = True)
    self.W_r = nn.Linear(d, d, bias = True) # role projection

  def forward(self, query, key, value, mask = None): #query, key, value
    # TODO: not implemented yet; currently returns None.
    pass

In [None]:
class TransformerBlock(nn.Module):
  """Attention sub-layer followed by a position-wise feed-forward sub-layer.

  Each sub-layer is wrapped as residual add -> LayerNorm -> Dropout.

  Args:
    d: model dimension.
    num_heads: number of attention heads.
    batch_size: batch size forwarded to the attention module.
    hidden_size: width of the feed-forward hidden layer.
    dropout: dropout probability used after each LayerNorm.
    tp_attention: if true, use TP_MultiHeadAttention instead of
      MultiHeadAttention.
  """
  def __init__(self, d, num_heads, batch_size, hidden_size, dropout, tp_attention = False):
    super(TransformerBlock, self).__init__()
    self.d = d
    self.num_heads = num_heads
    self.batch_size = batch_size
    self.attention = MultiHeadAttention(d, num_heads, batch_size) if not tp_attention else TP_MultiHeadAttention(d, num_heads, batch_size)
    self.norm1 = nn.LayerNorm(d)
    self.dropout1 = nn.Dropout(dropout)
    self.norm2 = nn.LayerNorm(d)
    self.dropout2 = nn.Dropout(dropout)
    self.ff = nn.Sequential(nn.Linear(d, hidden_size, bias = True),
                            nn.ReLU(inplace = True),
                            nn.Linear(hidden_size, d, bias = True))

  def forward(self, query, key, value, mask = None): #query, key, value
    """Run the block and return a tensor shaped like ``query``."""
    # The residual must add the sub-layer input (query): the attention output
    # has the query's sequence length, so adding `value` is only correct for
    # self-attention and has the wrong shape for cross-attention.
    x = query + self.attention(query, key, value, mask)
    x = self.dropout1(self.norm1(x))
    x = x + self.ff(x)
    x = self.dropout2(self.norm2(x))
    return x

In [None]:
class DecoderBlock(nn.Module):
  """Decoder layer: an attention module, LayerNorm, Dropout and a TransformerBlock.

  Only the sub-modules are created here; ``forward`` is still a stub that
  returns ``None``.

  Args:
    d: model dimension.
    h: number of attention heads.
    batch_size: batch size forwarded to the sub-modules.
    hidden_size: feed-forward hidden width of the inner TransformerBlock.
    dropout: dropout probability.
    tp_attention: if true, use TP_MultiHeadAttention instead of
      MultiHeadAttention.
  """
  def __init__(self, d, h, batch_size, hidden_size, dropout, tp_attention = False):
    super().__init__()
    # Pick the attention implementation first, then instantiate it once.
    if tp_attention:
      attention_cls = TP_MultiHeadAttention
    else:
      attention_cls = MultiHeadAttention
    self.attention = attention_cls(d, h, batch_size)
    self.norm = nn.LayerNorm(d)
    self.dropout = nn.Dropout(dropout)
    self.transformer_block = TransformerBlock(
        d, h, batch_size, hidden_size, dropout, tp_attention)

  def forward(self, query, key, value, mask): # needs the encoder output
    # TODO: not implemented yet.
    return None

In [None]:
class PositionalEncoding(nn.Module):
  """Add fixed sinusoidal positional encodings to the input.

  A (1, max_len, d) table is precomputed and registered as the buffer ``pe``:
  even feature indices hold sines and odd indices hold cosines of
  ``position * 10000^(-i/d)``.

  Args:
    d: feature dimension of the table (odd values are supported).
    max_len: number of positions precomputed.
  """
  def __init__(self, d, max_len = 5000):
    super(PositionalEncoding, self).__init__()
    pe = torch.zeros(max_len, d)
    position = torch.arange(0, max_len, dtype = torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d, 2, dtype = torch.float) * -(math.log(10000.0) / d))
    angles = position * div_term
    pe[:, 0::2] = torch.sin(angles)
    # For odd d there is one fewer odd index than even index, so trim the
    # angle table to match the cosine slice.
    pe[:, 1::2] = torch.cos(angles[:, :d // 2])
    self.register_buffer('pe', pe.unsqueeze(0))

  def forward(self, x):
    """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
    # `pe` is a buffer, not a parameter, so it never requires grad; the
    # deprecated (and never imported) `Variable` wrapper is not needed.
    return x + self.pe[:, :x.size(1)]

In [None]:
class TransformerEncoder(nn.Module):
  """Stack of ``num_blocks`` TransformerBlock layers kept in ``self.encoder``.

  Only the stack is created here; ``forward`` is still a stub that returns
  ``None``.

  Args:
    d: model dimension.
    num_heads: number of attention heads per block.
    batch_size: batch size forwarded to every block.
    hidden_size: feed-forward hidden width of every block.
    dropout: dropout probability of every block.
    num_blocks: number of stacked blocks.
    tp_attention: forwarded to every block to select the attention variant.
  """
  def __init__(self, d, num_heads, batch_size, hidden_size, dropout, num_blocks = 6, tp_attention = False):
    super().__init__()
    self.d, self.num_heads, self.batch_size = d, num_heads, batch_size
    layers = []
    for _ in range(num_blocks):
      layers.append(
          TransformerBlock(d, num_heads, batch_size, hidden_size, dropout, tp_attention))
    self.encoder = nn.ModuleList(layers)

  def forward(self, x):
    # TODO: run x through the blocks in self.encoder; not implemented yet.
    return None

In [None]:
class TransformerDecoder(nn.Module):
  """Stack of ``num_blocks`` DecoderBlock layers.

  Only the stack is created here; ``forward`` is still a stub that returns
  ``None``.

  Args:
    d: model dimension.
    num_heads: number of attention heads per block.
    batch_size: batch size forwarded to every block.
    hidden_size: feed-forward hidden width of every block.
    dropout: dropout probability of every block.
    num_blocks: number of stacked blocks.
    tp_attention: forwarded to every block to select the attention variant.
  """
  def __init__(self, d, num_heads, batch_size, hidden_size, dropout, num_blocks = 6, tp_attention = False):
    # Was super(TransformerEncoder, self): the wrong class there raises a
    # TypeError because self is not a TransformerEncoder instance.
    super().__init__()
    self.d = d
    self.num_heads = num_heads
    self.batch_size = batch_size
    # NOTE(review): the attribute is called `encoder` although it holds the
    # decoder blocks; the name is kept so existing attribute access and
    # state_dict keys do not change.
    self.encoder = nn.ModuleList(
        [DecoderBlock(d, num_heads, batch_size, hidden_size, dropout, tp_attention) for _ in range(num_blocks)]
        )

  def forward(self, output_encoder, x):
    # TODO: not implemented yet; currently returns None.
    pass

In [None]:
class Transformer(nn.Module):
  """Encoder-decoder model built from a TransformerEncoder and a TransformerDecoder.

  Only the two sub-modules are created here; ``inference`` and ``forward``
  are still stubs that return ``None``.

  Args:
    d: model dimension.
    num_heads: number of attention heads.
    batch_size: batch size forwarded to both sub-modules.
    hidden_size: feed-forward hidden width.
    dropout: dropout probability.
    num_blocks_encoder: number of blocks in the encoder.
    num_blocks_decoder: number of blocks in the decoder.
    tp_attention: forwarded to both sub-modules to select the attention
      variant.
  """
  def __init__(self, d, num_heads, batch_size, hidden_size, dropout, num_blocks_encoder = 6, num_blocks_decoder = 6, tp_attention = False):
    # Was super(TransformerEncoder, self): the wrong class there raises a
    # TypeError because self is not a TransformerEncoder instance.
    super().__init__()
    self.d = d
    self.num_heads = num_heads
    self.batch_size = batch_size
    self.encoder = TransformerEncoder(d, num_heads, batch_size, hidden_size, dropout, num_blocks_encoder, tp_attention)
    self.decoder = TransformerDecoder(d, num_heads, batch_size, hidden_size, dropout, num_blocks_decoder, tp_attention)

  def inference(self, x):
    # TODO: encode and then generate the output token by token.
    pass

  def forward(self, x):
    # TODO: not implemented yet; currently returns None.
    pass

# SOTA

# NON-SOTA