In [1]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
import tqdm
import matplotlib 

In [None]:
# Global hyper-parameters for the first draft of the model.
d_model = 512   # feature width used for the layers defined below
num_stacks = 6  # presumably the number of stacked encoder/decoder layers — not referenced elsewhere in the visible notebook

In [None]:
class Encoder(nn.Module):
  """Skeleton of an encoder layer; no sub-modules are created yet.

  Planned layout (not implemented):
    1. multi-head self-attention     -> residual connection -> layer normalization
    2. fully connected feed-forward  -> residual connection -> layer normalization
  """

  def __init__(self):
    super().__init__()

In [None]:
class Decoder(nn.Module):
  """Skeleton of a decoder layer; no sub-modules are created yet.

  Planned layout (not implemented):
    1. masked multi-head self-attention (the mask ensures each position only
       looks at previously known outputs)
                                        -> residual connection -> layer normalization
    2. multi-head attention over the output of the encoder stack
                                        -> residual connection -> layer normalization
    3. fully connected feed-forward network
                                        -> residual connection -> layer normalization
  """

  def __init__(self):
    super().__init__()

### Attention

def attention(query, key, val, model_dim) -> output

- Query, keys, values, and output are all vectors.

Output
- weighted sum of the values
- weight assigned to each value = compatibility_fn(query, corresponding_key)



In [None]:
h = 8  # presumably the number of attention heads — not used by the stub below


def multi_head_attention():
  """Placeholder for multi-head attention; takes no arguments and returns None."""
  return None

In [None]:
class TransformerBlock(nn.Module):
  """A single post-norm Transformer encoder block.

  The block runs multi-head self-attention followed by a position-wise
  feed-forward network. Each sub-layer is wrapped as
  ``LayerNorm(x + Dropout(sublayer(x)))``.

  Inputs and outputs are batch-first tensors of shape ``(batch, seq, d_model)``.
  """

  def __init__(self, N=6, h=8, d_model=512, d_ff=2048, p_drop=0.1, eps_ls=0.1):
    """
    N = number of layers (kept for signature compatibility; a block is a single
        layer, so the value is not used here)
    h = number of parallel attention layers (heads)
    d_model = dimension of embeddings; must be divisible by h
    d_ff = dimension of feed forwards
    p_drop = dropout probability
    eps_ls = label smoothing value (improves accuracy but hurts perplexity);
             it is a loss-function setting and is not used inside the block

    Raises:
      ValueError: if d_model is not divisible by h.
    """
    super().__init__()
    if d_model % h != 0:
      raise ValueError(f"d_model ({d_model}) must be divisible by h ({h})")

    self.num_heads = h
    self.d_model = d_model
    self.d_k = d_model // self.num_heads
    self.dropout_num = p_drop

    # Feed-forward sub-layer: expand to d_ff, then project back to d_model.
    self.ff_1 = nn.Linear(d_model, d_ff)
    self.ff_2 = nn.Linear(d_ff, d_model)

    # Learned projections for queries, keys and values. They are created once
    # here so that they are registered as parameters and actually get trained.
    self.query = nn.Linear(d_model, d_model)
    self.key = nn.Linear(d_model, d_model)
    self.value = nn.Linear(d_model, d_model)

    self.ln1 = nn.LayerNorm(d_model)
    self.ln2 = nn.LayerNorm(d_model)

    self.dropout = nn.Dropout(p_drop)
    self.out = nn.Linear(d_model, d_model)

  def _split_heads(self, t):
    """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
    return t.view(t.size(0), -1, self.num_heads, self.d_k).transpose(1, 2)

  def attention(self, x, mask=None):
    """Multi-head scaled dot-product self-attention over ``x``.

    <q, k> tells us how much each position attends to every other position;
    v is what gets mixed together. It works like a soft database lookup: a
    query is matched against keys and returns a weighted sum of their values.

    Args:
      x: tensor of shape (batch, seq, d_model).
      mask: optional tensor broadcastable to (batch, heads, seq, seq); a
        3-D mask gets a head dimension inserted at position 1. Entries equal
        to 0 are excluded from attention.

    Returns:
      Tensor of shape (batch, seq, d_model).
    """
    batch_size = x.size(0)
    q = self._split_heads(self.query(x))
    k = self._split_heads(self.key(x))
    v = self._split_heads(self.value(x))

    # Scale by sqrt(d_k): large d_k pushes softmax into regions with very
    # small gradients, and the scaling counteracts that.
    scores = (q @ k.transpose(-2, -1)) / (self.d_k ** 0.5)
    if mask is not None:
      if mask.dim() == 3:
        mask = mask.unsqueeze(1)
      scores = scores.masked_fill(mask == 0, -1e9)

    # Softmax over the key axis turns the scores into a probability distribution.
    weights = F.softmax(scores, dim=-1)
    context = weights @ v

    # Merge the heads back into a single d_model-wide vector per position.
    context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
    return self.out(context)

  def forward(self, x):
    """Apply the attention and feed-forward sub-layers to ``x``."""
    x = self.ln1(x + self.dropout(self.attention(x)))
    hidden = F.relu(self.ff_1(x))
    x = self.ln2(x + self.dropout(self.ff_2(hidden)))
    return x

In [None]:
class Attention(nn.Module):
    """Multi-head scaled dot-product attention.

    Each of q, k, v goes through its own learned linear layer. <q, k> tells us
    how much to attend to each position and v is what gets attended to: like a
    database lookup in which a query finds keys and returns the corresponding
    values, except that the match is soft and learned.

    Inputs are batch-first tensors of shape ``(batch, seq, d_model)``.
    """

    def __init__(self, heads, d_model, dropout = 0.1):
        """
        Args:
            heads: number of attention heads; must divide d_model.
            d_model: width of the input and output features.
            dropout: dropout probability applied to the attention weights.

        Raises:
            ValueError: if d_model is not divisible by heads.
        """
        super().__init__()
        if d_model % heads != 0:
            raise ValueError(f"d_model ({d_model}) must be divisible by heads ({heads})")

        self.heads = heads
        self.d_model = d_model
        self.d_k = d_model // heads

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, t):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
        return t.view(t.size(0), -1, self.heads, self.d_k).transpose(1, 2)

    def forward(self, q, k=None, v=None, mask=None):
        """Attend from ``q`` to ``k``/``v``.

        Args:
            q: queries, shape (batch, q_len, d_model).
            k: keys, shape (batch, kv_len, d_model); defaults to ``q``
                (self-attention).
            v: values, shape (batch, kv_len, d_model); defaults to ``k``.
            mask: optional tensor broadcastable to (batch, heads, q_len, kv_len);
                a 3-D mask gets a head dimension inserted at position 1.
                Entries equal to 0 are excluded from attention.

        Returns:
            Tensor of shape (batch, q_len, d_model).
        """
        if k is None:
            k = q
        if v is None:
            v = k
        batch_size = q.size(0)

        q = self._split_heads(self.query(q))
        k = self._split_heads(self.key(k))
        v = self._split_heads(self.value(v))

        # scores = q @ k^T, scaled by sqrt(d_k): a large d_k otherwise leads to
        # very small softmax gradients.
        scores = (q @ k.transpose(-2, -1)) / (self.d_k ** 0.5)
        if mask is not None:
            if mask.dim() == 3:
                mask = mask.unsqueeze(1)
            scores = scores.masked_fill(mask == 0, -1e9)

        # Softmax turns the scores into a probability distribution; a weight of
        # 0 means that position is ignored.
        weights = self.dropout(F.softmax(scores, dim=-1))
        context = weights @ v

        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.out(context)

In [None]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model, d_ff=2048, dropout = 0.1):
        super().__init__()
        # Expand to the hidden width (d_ff, 2048 by default), then project back.
        self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.dropout = nn.Dropout(p=dropout)
        self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Return a tensor with the same trailing width (d_model) as ``x``."""
        hidden = F.relu(self.linear_1(x))
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

In [None]:
class Embedder(nn.Module):
    """Thin wrapper around ``nn.Embedding`` mapping token ids to d_model-wide vectors."""

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        """Look up the embedding vector of every index in ``x``."""
        embedded = self.embed(x)
        return embedded

In [None]:
class Transformer(nn.Module):
  """A stack of ``num_layers`` TransformerBlock modules followed by a linear layer.

  Args:
    num_layers: number of TransformerBlock instances in the stack.
    embed_dim: feature width of the blocks (their ``d_model``).
    num_heads: number of attention heads per block.
    ff_dim: hidden width of each block's feed-forward network; also the output
      width of the final linear layer.
  """

  def __init__(self, num_layers, embed_dim, num_heads, ff_dim):
    super().__init__()

    # nn.ModuleList (rather than a plain list) registers every block, so their
    # parameters show up in .parameters(), .to(device) and state_dict().
    # Keyword arguments are used because TransformerBlock's positional order is
    # (N, h, d_model, d_ff, ...), which does not match (embed_dim, num_heads, ff_dim).
    self.core = nn.ModuleList([
      TransformerBlock(h=num_heads, d_model=embed_dim, d_ff=ff_dim)
      for _ in range(num_layers)
    ])

    # NOTE(review): the projection goes from embed_dim to ff_dim as in the
    # original draft — confirm this is the intended output width.
    self.final = nn.Linear(embed_dim, ff_dim)

  def forward(self, x):
    """Run ``x`` through every block in order, then through the final linear layer."""
    for block in self.core:
      x = block(x)
    return self.final(x)


### New Version

In [21]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
import math
import matplotlib 
import tqdm
import copy

In [None]:
# Multi Head Attention
class MultiHeadAttention(nn.Module):
  """Multi-head scaled dot-product attention for batch-first inputs.

  Args:
    num_heads: number of attention heads; must divide d_model.
    d_model: width of the input and output features.
    dropout: dropout probability applied to the attention weights.

  Raises:
    ValueError: if d_model is not divisible by num_heads.
  """

  def __init__(self, num_heads, d_model, dropout = 0.1):
    super().__init__()
    if d_model % num_heads != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by num_heads ({num_heads})")

    self.d_model = d_model
    self.d_k = d_model // num_heads
    self.h = num_heads

    self.q_ff = nn.Linear(d_model, d_model)
    self.k_ff = nn.Linear(d_model, d_model)
    self.v_ff = nn.Linear(d_model, d_model)

    self.dropout = nn.Dropout(dropout)
    self.out = nn.Linear(d_model, d_model)

  @staticmethod
  def attention(q, k, v, d_k, mask=None, dropout=None):
    """Scaled dot-product attention.

    Args:
      q, k, v: tensors whose last two dimensions are (length, d_k).
      d_k: per-head feature width used for scaling the scores.
      mask: optional tensor; entries equal to 0 are excluded from attention.
        A mask with one dimension fewer than the scores gets a head dimension
        inserted at position 1; otherwise it must broadcast against the scores.
      dropout: optional callable (e.g. ``nn.Dropout``) applied to the weights.

    Returns:
      The attention-weighted sum of ``v``.
    """
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
      if mask.dim() == scores.dim() - 1:
        mask = mask.unsqueeze(1)
      # A large negative score becomes a ~0 weight after softmax.
      scores = scores.masked_fill(mask == 0, -1e9)
    scores = F.softmax(scores, dim=-1)
    if dropout is not None:
      scores = dropout(scores)

    output = torch.matmul(scores, v)
    return output

  def forward(self, q, k, v, mask=None):
    """Project q/k/v, attend per head, merge the heads and project the result.

    Args:
      q: queries of shape (batch, q_len, d_model).
      k, v: keys and values of shape (batch, kv_len, d_model).
      mask: optional attention mask, see ``attention``.

    Returns:
      Tensor of shape (batch, q_len, d_model).
    """
    batch_size = q.size(0)

    # the size -1 is inferred from other dimensions
    k = self.k_ff(k).view(batch_size, -1, self.h, self.d_k)
    q = self.q_ff(q).view(batch_size, -1, self.h, self.d_k)
    v = self.v_ff(v).view(batch_size, -1, self.h, self.d_k)

    # (batch, length, heads, d_k) -> (batch, heads, length, d_k)
    k = k.transpose(1,2)
    q = q.transpose(1,2)
    v = v.transpose(1,2)

    scores = self.attention(q, k, v, self.d_k, mask, self.dropout)

    # transpose() leaves the tensor non-contiguous; view() needs contiguous memory.
    concat = scores.transpose(1,2).contiguous().view(batch_size, -1, self.d_model)

    output = self.out(concat)
    return output

In [None]:
class FeedForward(nn.Module):
    """Two-layer position-wise network: d_model -> d_ff -> d_model with ReLU and dropout."""

    def __init__(self, d_model, d_ff=2048, dropout = 0.1):
        super().__init__()
        # d_ff defaults to 2048.
        self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.dropout = nn.Dropout(p=dropout)
        self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply Linear -> ReLU -> Dropout -> Linear to ``x``."""
        return self.linear_2(self.dropout(F.relu(self.linear_1(x))))

In [2]:
# Embeddings
class Embeddings(nn.Module):
  """Token-embedding lookup table.

  Args:
    vocab_size: number of distinct token ids.
    d_model: width of each embedding vector.
  """

  def __init__(self, vocab_size, d_model):
    super().__init__()
    self.embeds = nn.Embedding(vocab_size, d_model)

  def forward(self, x):
    """Return the embedding vector of every token id in ``x``."""
    # Must read the attribute created in __init__ (`embeds`, not `embed`).
    return self.embeds(x)

In [25]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence of embeddings.

    Args:
        d_model: feature width of the inputs.
        dropout: dropout probability applied after adding the encoding.
        max_len: longest sequence length that can be encoded.
        batch_first: if True (default), inputs are (batch, seq, d_model), which
            is the layout the attention modules in this notebook use (they read
            the batch size from dimension 0). Pass False for
            (seq, batch, d_model) inputs.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000,
                 batch_first: bool = True):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.batch_first = batch_first

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # A buffer moves with .to(device) and is saved in state_dict, but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + positional_encoding)``.

        Raises:
            ValueError: if the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}")

        encoding = self.pe[:seq_len]  # (seq_len, 1, d_model)
        if self.batch_first:
            # (1, seq_len, d_model): index by position and broadcast over the
            # batch, instead of indexing by batch element.
            encoding = encoding.transpose(0, 1)
        return self.dropout(x + encoding)

In [None]:
from torch.nn.modules.dropout import Dropout
# Encoder
class EncoderLayer(nn.Module):
  """One pre-norm encoder layer: self-attention, then feed-forward.

  Each sub-layer is applied to a layer-normalized copy of the input and its
  (dropped-out) result is added back to the un-normalized input.

  Args:
    d_model: feature width of the inputs.
    heads: number of attention heads.
    dropout: dropout probability applied to each sub-layer's output.
  """

  def __init__(self, d_model, heads, dropout = 0.1):
    super().__init__()
    self.norm_1 = nn.LayerNorm(d_model)
    self.norm_2 = nn.LayerNorm(d_model)
    self.attn = MultiHeadAttention(heads, d_model)
    self.ff = FeedForward(d_model)
    # nn.Dropout takes the drop probability (the `dropout` argument), not a class.
    self.dropout_1 = nn.Dropout(dropout)
    self.dropout_2 = nn.Dropout(dropout)

  def forward(self, x, mask):
    """Apply both sub-layers to ``x``; ``mask`` is forwarded to the attention module."""
    x2 = self.norm_1(x)
    x = x + self.dropout_1(self.attn(x2,x2,x2,mask))
    x2 = self.norm_2(x)
    x = x + self.dropout_2(self.ff(x2))
    return x


In [22]:
# Decoder
class DecoderLayer(nn.Module):
  """One pre-norm decoder layer with three residual sub-layers.

  In order: self-attention over the decoder input (using ``trg_mask``),
  attention whose keys and values are the encoder outputs (using
  ``src_mask``), and a feed-forward network.
  """

  def __init__(self, d_model, heads, dropout=0.1):
    super().__init__()
    # Same registration order as before: norms, dropouts, attentions, feed-forward.
    self.norm_1, self.norm_2, self.norm_3 = (
        nn.LayerNorm(d_model), nn.LayerNorm(d_model), nn.LayerNorm(d_model))
    self.dropout_1, self.dropout_2, self.dropout_3 = (
        nn.Dropout(dropout), nn.Dropout(dropout), nn.Dropout(dropout))

    self.attn_1 = MultiHeadAttention(heads, d_model)
    self.attn_2 = MultiHeadAttention(heads, d_model)
    self.ff = FeedForward(d_model)

  def forward(self, x, e_outputs, src_mask, trg_mask):
    """Return ``x`` after the three residual sub-layers."""
    # 1) self-attention on the normalized decoder input
    normed = self.norm_1(x)
    self_attended = self.attn_1(normed, normed, normed, trg_mask)
    x = x + self.dropout_1(self_attended)

    # 2) queries from the decoder, keys/values from the encoder outputs
    normed = self.norm_2(x)
    cross_attended = self.attn_2(normed, e_outputs, e_outputs, src_mask)
    x = x + self.dropout_2(cross_attended)

    # 3) position-wise feed-forward
    normed = self.norm_3(x)
    return x + self.dropout_3(self.ff(normed))

# We can then build a convenient cloning function that can generate multiple layers:
def get_clones(module, N):
  """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
  clones = nn.ModuleList()
  for _ in range(N):
    clones.append(copy.deepcopy(module))
  return clones
    

In [None]:
class Encoder(nn.Module):
  """Token embedding + positional encoding + a stack of N encoder layers + final LayerNorm.

  Args:
    vocab_size: number of distinct source token ids.
    d_model: feature width used throughout the stack.
    N: number of encoder layers.
    heads: number of attention heads per layer.
  """

  def __init__(self, vocab_size, d_model, N, heads):
    super().__init__()
    self.N = N
    # Embeddings requires both the vocabulary size and the embedding width.
    self.embed = Embeddings(vocab_size, d_model)
    # The positional-encoding class in this notebook is named PositionalEncoding.
    self.pe = PositionalEncoding(d_model)
    self.layers = get_clones(EncoderLayer(d_model, heads), N)
    self.norm = nn.LayerNorm(d_model)

  def forward(self, src, mask):
    """Encode ``src`` token ids; ``mask`` is forwarded to every layer."""
    x = self.embed(src)
    # NOTE(review): the attention layers read the batch size from dimension 0,
    # so x is batch-first here — confirm PositionalEncoding indexes the
    # sequence dimension accordingly.
    x = self.pe(x)
    for layer in self.layers:
      x = layer(x, mask)
    return self.norm(x)

In [None]:
class Decoder(nn.Module):
    """Token embedding + positional encoding + a stack of N decoder layers + final LayerNorm.

    Args:
        vocab_size: number of distinct target token ids.
        d_model: feature width used throughout the stack.
        N: number of decoder layers.
        heads: number of attention heads per layer.
    """

    def __init__(self, vocab_size, d_model, N, heads):
        super().__init__()
        self.N = N
        # Embedder is the embedding wrapper defined earlier in this notebook.
        self.embed = Embedder(vocab_size, d_model)
        # The positional-encoding class in this notebook is named PositionalEncoding.
        self.pe = PositionalEncoding(d_model)
        self.layers = get_clones(DecoderLayer(d_model, heads), N)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        """Decode ``trg`` token ids against the encoder outputs ``e_outputs``.

        ``src_mask`` and ``trg_mask`` are forwarded unchanged to every layer.
        """
        x = self.embed(trg)
        x = self.pe(x)
        for layer in self.layers:
            x = layer(x, e_outputs, src_mask, trg_mask)
        return self.norm(x)

In [None]:
# Transformer
# p_drop = 0.1
class Transformer(nn.Module):
    """Encoder-decoder model with a final linear layer of width ``trg_vocab``.

    No softmax is applied; the output holds one unnormalized score per target
    vocabulary entry.
    """

    def __init__(self, src_vocab, trg_vocab, d_model, N, heads):
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, N, heads)
        self.decoder = Decoder(trg_vocab, d_model, N, heads)
        self.out = nn.Linear(in_features=d_model, out_features=trg_vocab)

    def forward(self, src, trg, src_mask, trg_mask):
        """Encode ``src``, decode ``trg`` against it and project to ``trg_vocab`` scores."""
        memory = self.encoder(src, src_mask)
        decoded = self.decoder(trg, memory, src_mask, trg_mask)
        return self.out(decoded)

### Training code

In [None]:
# Model hyper-parameters.
d_model = 512
heads = 8
N = 6

# NOTE(review): EN_TEXT and FR_TEXT are not defined anywhere in the visible
# notebook — presumably vocabulary-bearing objects built in a data-loading
# cell; they must exist before this cell is run.
src_vocab = len(EN_TEXT.vocab)
trg_vocab = len(FR_TEXT.vocab)

model = Transformer(
    src_vocab=src_vocab,
    trg_vocab=trg_vocab,
    d_model=d_model,
    N=N,
    heads=heads,
)

In [None]:
# Adam optimizer with beta1 = 0.9, beta2 = 0.98 and eps = 1e-9.
optim = torch.optim.Adam(
    params=model.parameters(),
    lr=0.0001,
    betas=(0.9, 0.98),
    eps=1e-9,
)

In [10]:
# Scratch 4x4 float tensor. Its memory is NOT initialized, so the displayed
# values are arbitrary and differ from run to run.
x = torch.empty(4, 4)
x

tensor([[1.0816e-33, 0.0000e+00, 2.3694e-38, 2.3694e-38],
        [4.3066e+21, 1.1824e+22, 4.3066e+21, 6.3828e+28],
        [3.8016e-39, 1.4994e-43, 1.8523e+28, 4.5581e-41],
        [3.0210e-13, 4.5581e-41, 2.6955e-03, 4.7210e-24]])

In [18]:
# Swap the two dimensions of x. Because x is rebound to its own transpose,
# re-running this cell flips it back again.
x = x.transpose(0, 1)
x

tensor([[1.0816e-33, 4.3066e+21, 3.8016e-39, 3.0210e-13],
        [0.0000e+00, 1.1824e+22, 1.4994e-43, 4.5581e-41],
        [2.3694e-38, 4.3066e+21, 1.8523e+28, 2.6955e-03],
        [2.3694e-38, 6.3828e+28, 4.5581e-41, 4.7210e-24]])