In [2]:
import torch
from torch import nn
import torch.nn.functional as F

In [3]:
import math
import copy

In [None]:
###
## C = A @ B
## [k, m, p] = [k, m, n] @ [k, n, p]
## assuming k = batchsize 

In [156]:
## Attention Mechanism
def attention(query, key, value, mask=None, dropout=None,d_k:int=None, verbosity:bool=True):
    "Compute 'Scaled Dot Product Attention'"
    d_k = query.size(-1)
    ## reshape the scores to (batch_size, seq_len, d_k) with einsum function
    scores = torch.einsum('bik,bjk->bij', query, key) / math.sqrt(d_k)
    if verbosity:
        print("==>Actual Attention Mechanism<==")
        ## print the mask size 
        print(f"q size:     \t{query.size()}")
        print(f"k size:     \t{key.size()}")
        print(f"v size:     \t{value.size()}")
        print(f"mask size:  \t{mask.size()}")
        print(f"d_k:        \t{d_k}")
        ## print the size of the scores 
        print(f"scores size:\t{scores.size()}")
        print("==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==")
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim = -1)
    print(f"p_attn size:\t{p_attn.size()}")
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn

## write a function to test the attention mechanism
def test_attention(batch_size:int=2, seq_len:int=3, d_model:int=4, n_heads:int=2, verbosity:bool=True):
    d_k = d_v = d_model // n_heads
    # (batch_size, seq_len, d_model)
    q = torch.randn(batch_size, seq_len, d_model)
    # (batch_size, seq_len, d_model)
    k = torch.randn(batch_size, seq_len, d_model)
    # (batch_size, seq_len, d_model)
    v = torch.randn(batch_size, seq_len, d_model)
    # (batch_size, seq_len, seq_len)
    mask = torch.ones(batch_size, seq_len, d_model)
    mask[:,:,0] = 0
    mask[:,:,2] = 0
    # (batch_size, seq_len, d_model)
    if verbosity:
        print("==>Testing Attention Mechanism<==")
        print(f"q size:   \t{q.size()}")
        print(f"k size:   \t{k.size()}")
        print(f"v size:   \t{v.size()}")
        print(f"mask size:\t{mask.size()}")
        print(f"d_k:      \t{d_k}")
        print("==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==\n")
    output, attn = attention(q, k, v, mask=mask,d_k=d_k, verbosity=verbosity)
    # print the size of the output and the output
    print(f"output size:\t{output.size()}")
    print(f"attn size:\t{attn.size()}")

In [159]:
test_attention(batch_size = 5, seq_len=3, d_model=3, n_heads=3, verbosity=True)

==>Testing Attention Mechanism<==
q size:   	torch.Size([5, 3, 3])
k size:   	torch.Size([5, 3, 3])
v size:   	torch.Size([5, 3, 3])
mask size:	torch.Size([5, 3, 3])
d_k:      	1
==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==

==>Actual Attention Mechanism<==
q size:     	torch.Size([5, 3, 3])
k size:     	torch.Size([5, 3, 3])
v size:     	torch.Size([5, 3, 3])
mask size:  	torch.Size([5, 3, 3])
d_k:        	3
scores size:	torch.Size([5, 3, 3])
==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==
p_attn size:	torch.Size([5, 3, 3])
output size:	torch.Size([5, 3, 3])
attn size:	torch.Size([5, 3, 3])


In [162]:
## define the class for the Multi-Head Attention Mechansim in a Transformer 
class MultiHeadAttention(nn.Module):
    """
    Defines the Multi-Headed Attention Mechanism, defining the: 
    number of heads : n_heads 
    output dimension: out_dim 
    dropout rate    : dropout
    
    """
    def __init__(self, n_heads, d_model, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.d_k = d_model // n_heads
        self.h = n_heads
        self.linears = self.clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)
    ## define clones 
    def clones(self, module, N):
        "Produce N identical layers."
        return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
    ## define the split_heads method
    def split_heads(self, x, n_heads, d_k):
        "Split x into different heads, with dimension d_k."
        return x.view(n_heads, -1, d_k)
    ## define the concat method
    def concat(self, x):
        "Concatenate tensors with different dimensions."
        return x.transpose(0, 1).contiguous().view(-1, self.d_k * self.h)
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            mask = mask.unsqueeze(1)
        n_heads = self.h
        d_k = self.d_k
        q, k, v = self.linears[0](query), self.linears[1](key), self.linears[2](value)
        q, k, v = self.split_heads(q, n_heads, d_k), self.split_heads(k, n_heads, d_k), self.split_heads(v, n_heads, d_k)
        # perform attention
        ## print the size of the q, k ,v 
        print(f"q size:   \t{q.size()}")
        print(f"k size:   \t{k.size()}")
        print(f"v size:   \t{v.size()}")
        attn, self.attn = attention(q, k, v, mask=mask, dropout=self.dropout)
        # concatenate heads
        attn = self.concat(attn)
        # apply final linear layer
        attn = self.linears[3](attn)
        return attn

## test the multi-head attention mechanism
def test_mha(n_heads, d_model, verbosity:bool=True):
    mha = MultiHeadAttention(n_heads, d_model)
    if verbosity:
        print("==>Testing Multi-Head Attention Mechanism<==")
        print(f"n_heads:   \t{n_heads}")
        print(f"d_model:   \t{d_model}")
        print(f"d_k:       \t{mha.d_k}")
        print("==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==\n")
    ## define the input 
    query = torch.randn(2, 3, d_model)
    key = torch.randn(2, 3, d_model)
    value = torch.randn(2, 3, d_model)
    mask = torch.ones(2, 3, d_model)
    mask[:,:,0] = 0
    mask[:,:,2] = 0
    output = mha(query, key, value, mask=mask)
    print(output.numpy().shape)

In [163]:
test_mha(n_heads=3, d_model=3, verbosity=True)

==>Testing Multi-Head Attention Mechanism<==
n_heads:   	3
d_model:   	3
d_k:       	1
==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==

q size:   	torch.Size([3, 6, 1])
k size:   	torch.Size([3, 6, 1])
v size:   	torch.Size([3, 6, 1])
==>Actual Attention Mechanism<==
q size:     	torch.Size([3, 6, 1])
k size:     	torch.Size([3, 6, 1])
v size:     	torch.Size([3, 6, 1])
mask size:  	torch.Size([2, 1, 3, 3])
d_k:        	1
scores size:	torch.Size([3, 6, 6])
==>~~~~~~~~~~~~~~~~~~~~~~~~~~~<==


RuntimeError: The size of tensor a (3) must match the size of tensor b (6) at non-singleton dimension 3

In [10]:
## define the Feed-Forward Network Class 
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)
    ## forward method 
    def forward(self, x):
        x = self.dropout(F.relu(self.linear_1(x)))
        x = self.linear_2(x)
        return x

In [11]:
## define the Encoder Layer Class
class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(n_heads, d_model, dropout=dropout)
        self.ffn = FeedForwardNetwork(d_model, d_ff, dropout=dropout)
        ## apply layer norm 
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)
        ## apply dropout 
        self.drop1 = nn.Dropout(dropout)
        self.drop2 = nn.Dropout(dropout)
    def forward(self, x, mask):
        x2 = self.norm_1(x)
        x = x + self.drop1(self.mha(x2, x2, x2, mask))
        x2 = self.norm_2(x)
        x = x + self.drop2(self.ffn(x2))
        return x

In [12]:
## define the Decoder Layer Class 
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttention(n_heads, d_model, dropout=dropout)
        self.mha2 = MultiHeadAttention(n_heads, d_model, dropout=dropout)
        self.ffn = FeedForwardNetwork(d_model, d_ff, dropout=dropout)
        ## apply layer norm 
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)
        self.norm_3 = nn.LayerNorm(d_model)
        ## apply dropout 
        self.drop1 = nn.Dropout(dropout)
        self.drop2 = nn.Dropout(dropout)
        self.drop3 = nn.Dropout(dropout)
    def forward(self, x, e_outputs, src_mask, tgt_mask):
        x2 = self.norm_1(x)
        x = x + self.drop1(self.mha1(x2, x2, x2, tgt_mask))
        x2 = self.norm_2(x)
        x = x + self.drop2(self.mha2(x2, e_outputs, e_outputs, src_mask))
        x2 = self.norm_3(x)
        x = x + self.drop3(self.ffn(x2))
        return x

In [13]:
## define the Positional Encoding class 
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    ## forward pass 
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [14]:
## define the Encoder class 
class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.embed = Embedder(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model, dropout=dropout)
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout=dropout) for _ in range(n_layers)])
    def forward(self, src, mask):
        x = self.embed(src)
        x = self.pe(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [15]:
## define the Decoder 
class Decoder(nn.Module):
    def _init__(self, vocab_size, d_model, n_layers, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.embed = Embedder(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model, dropout=dropout)
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout=dropout) for _ in range(n_layers)])
    ## define the forward pass 
    def forward(self, trg, e_outputs, src_mask, tgt_mask):
        x = self.embed(trg)
        x = self.pe(x)
        for layer in self.layers:
            x = layer(x, e_outputs, src_mask, tgt_mask)
        return x

In [16]:
## define the Embedder 
class Embedder(nn.Module):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model
    def forward(self, src):
        return self.embed(src) * math.sqrt(self.d_model)

In [17]:
## define the Transformer class using all the classes before 
class Transformer(nn.Module):
    def __init__(self, src_vocab, tgt_vocab, d_model, n_layers, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, n_layers, n_heads, d_ff, dropout=dropout)
        self.decoder = Decoder(tgt_vocab, d_model, n_layers, n_heads, d_ff, dropout=dropout)
        self.out = nn.Linear(d_model, tgt_vocab)
    
        ## define get_pad_mask 
    def get_pad_mask(self, x):
        return (x == 0).unsqueeze(-2)
    
    ## forward pass 
    def forward(self, src, trg):
        src_mask = self.get_pad_mask(src)
        tgt_mask = self.get_pad_mask(trg)
        e_outputs = self.encoder(src, src_mask)
        d_outputs = self.decoder(trg, e_outputs, src_mask, tgt_mask)
        output = self.out(d_outputs)
        return output

In [7]:
test_transformer()

TypeError: __init__() got an unexpected keyword argument 'dropout'