In [1]:
import pandas as pd
import numpy as np
import torch

In [2]:
# Implement a Transformer from scratch


#1. Positional Encoding
#2. Multi-Head Attention
#3. Feed Forward Network
#4. Encoder Layer
#5. Decoder Layer
#6. Encoder
#7. Decoder
#8. Seq2Seq (the full Transformer model)

#1. Positional Encoding
class PositionalEncoding(torch.nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    The table is built so that every sin/cos column pair shares one frequency:

        pe[pos, 2k]     = sin(pos / 10000 ** (2k / d_model))
        pe[pos, 2k + 1] = cos(pos / 10000 ** (2k / d_model))

    Args:
        d_model: Size of the feature (last) dimension of the inputs. Odd
            values are supported; the last column then holds a sine term.
        max_seq_len: Number of positions to precompute.

    Attributes:
        pe: Non-trainable buffer of shape ``(1, max_seq_len, d_model)``. It is
            registered as a non-persistent buffer so it follows the module
            across ``.to(device)`` calls without adding a key to ``state_dict``.
    """

    def __init__(self, d_model, max_seq_len=5000):
        super().__init__()
        self.d_model = d_model
        self.max_seq_len = max_seq_len

        # Angles are computed in float64 and only rounded once when stored,
        # which keeps large positions accurate.
        positions = torch.arange(max_seq_len, dtype=torch.float64).unsqueeze(1)
        even_dims = torch.arange(0, d_model, 2, dtype=torch.float64)
        angles = positions / torch.pow(10000.0, even_dims / d_model)

        pe = torch.zeros(max_seq_len, d_model)
        pe[:, 0::2] = torch.sin(angles).to(pe.dtype)
        # With an odd d_model there is one fewer odd column than even columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2]).to(pe.dtype)

        # Buffers never require grad and are moved together with the module.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension is ``d_model`` (it is broadcast against a
                ``(1, seq_len, d_model)`` slice of ``pe``).

        Returns:
            Tensor with the same shape as ``x``.
        """
        return x + self.pe[:, :x.size(1)]
    
#2. Multi-Head Attention
class MultiHeadAttention(torch.nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each projected with their own linear layer,
    split into ``n_heads`` chunks of size ``d_model // n_heads``, attended
    independently, concatenated, and passed through a final linear layer.

    Args:
        d_model: Feature size of the inputs and of the output.
        n_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``n_heads`` is not positive or does not divide
            ``d_model``. Without this check the head split in ``forward``
            would fail or mix features across positions.
    """

    def __init__(self, d_model, n_heads):
        super().__init__()
        if n_heads <= 0 or d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive n_heads ({n_heads})"
            )
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.linear_q = torch.nn.Linear(d_model, d_model)
        self.linear_k = torch.nn.Linear(d_model, d_model)
        self.linear_v = torch.nn.Linear(d_model, d_model)
        self.linear_final = torch.nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q, k, v: Tensors whose first dimension is the batch and whose last
                dimension is ``d_model``.
            mask: Optional tensor broadcastable to the
                ``(batch, n_heads, q_len, k_len)`` score tensor; positions
                where ``mask == 0`` are excluded from attention. ``None``
                (the default) disables masking.

        Returns:
            Tensor of shape ``(batch, q_len, d_model)``.
        """
        bs = q.size(0)

        # Project, then split the feature axis into (n_heads, d_k).
        k = self.linear_k(k).view(bs, -1, self.n_heads, self.d_k)
        q = self.linear_q(q).view(bs, -1, self.n_heads, self.d_k)
        v = self.linear_v(v).view(bs, -1, self.n_heads, self.d_k)

        # Move the head axis forward: (bs, n_heads, seq_len, d_k).
        k = k.transpose(1, 2)
        q = q.transpose(1, 2)
        v = v.transpose(1, 2)

        scores = self.attention(q, k, v, self.d_k, mask)

        # Merge the heads back into one feature axis and mix them.
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        output = self.linear_final(concat)

        return output

    def attention(self, q, k, v, d_k, mask=None):
        """Scaled dot-product attention over the last two dimensions.

        Scores are ``q @ k^T / sqrt(d_k)``; masked positions are filled with
        ``-1e9`` before the softmax so they receive (numerically) zero weight.

        Returns:
            The attention-weighted sum of ``v``.
        """
        scores = torch.matmul(q, k.transpose(-2, -1)) / (d_k ** 0.5)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = torch.nn.functional.softmax(scores, dim=-1)
        output = torch.matmul(scores, v)
        return output

#3. Feed Forward Network
class FeedForwardNetwork(torch.nn.Module):
    """Two-layer feed-forward block: ``d_model -> d_ff -> d_model``.

    A ReLU followed by dropout is applied to the hidden activations; the
    second linear layer maps them back to ``d_model`` features.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size.
        dropout: Drop probability applied to the hidden activations.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        # Expansion layer, regulariser, then projection back to d_model.
        self.linear_1 = torch.nn.Linear(in_features=d_model, out_features=d_ff)
        self.dropout = torch.nn.Dropout(p=dropout)
        self.linear_2 = torch.nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply linear -> ReLU -> dropout -> linear to ``x``."""
        hidden = self.linear_1(x)
        hidden = torch.nn.functional.relu(hidden)
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

#4. Encoder Layer
class EncoderLayer(torch.nn.Module):
    """One encoder block: self-attention and feed-forward, each pre-normalised.

    Each sub-layer is applied as ``x + dropout(sublayer(layer_norm(x)))``.

    Args:
        d_model: Feature size of the inputs and outputs.
        n_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward network.
        dropout: Drop probability used for both residual dropouts and for the
            dropout inside the feed-forward network.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.norm_1 = torch.nn.LayerNorm(d_model)
        self.norm_2 = torch.nn.LayerNorm(d_model)
        self.attn = MultiHeadAttention(d_model, n_heads)
        # Forward the configured rate; otherwise the feed-forward network
        # keeps its own default regardless of what the caller asked for.
        self.ff = FeedForwardNetwork(d_model, d_ff, dropout)
        self.dropout_1 = torch.nn.Dropout(dropout)
        self.dropout_2 = torch.nn.Dropout(dropout)

    def forward(self, src, mask):
        """Run the block on ``src``.

        Args:
            src: Input tensor with ``d_model`` features in the last dimension.
            mask: Attention mask passed unchanged to the self-attention layer.

        Returns:
            Tensor with the same shape as ``src``.
        """
        # Self-attention sub-layer with residual connection.
        src2 = self.norm_1(src)
        src = src + self.dropout_1(self.attn(src2, src2, src2, mask))
        # Feed-forward sub-layer with residual connection.
        src2 = self.norm_2(src)
        src = src + self.dropout_2(self.ff(src2))
        return src

#5. Decoder Layer
class DecoderLayer(torch.nn.Module):
    """One decoder block: self-attention, encoder attention, feed-forward.

    Each sub-layer is applied as ``x + dropout(sublayer(layer_norm(x)))``.

    Args:
        d_model: Feature size of the inputs and outputs.
        n_heads: Number of attention heads in both attention layers.
        d_ff: Hidden size of the feed-forward network.
        dropout: Drop probability used for the three residual dropouts and for
            the dropout inside the feed-forward network.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.norm_1 = torch.nn.LayerNorm(d_model)
        self.norm_2 = torch.nn.LayerNorm(d_model)
        self.norm_3 = torch.nn.LayerNorm(d_model)
        self.attn_1 = MultiHeadAttention(d_model, n_heads)
        self.attn_2 = MultiHeadAttention(d_model, n_heads)
        # Forward the configured rate; otherwise the feed-forward network
        # keeps its own default regardless of what the caller asked for.
        self.ff = FeedForwardNetwork(d_model, d_ff, dropout)
        self.dropout_1 = torch.nn.Dropout(dropout)
        self.dropout_2 = torch.nn.Dropout(dropout)
        self.dropout_3 = torch.nn.Dropout(dropout)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """Run the block on ``trg`` while attending to ``enc_src``.

        Args:
            trg: Target-side tensor with ``d_model`` features.
            enc_src: Encoder output used as keys and values of the second
                attention layer.
            trg_mask: Mask for the target self-attention.
            src_mask: Mask for the attention over ``enc_src``.

        Returns:
            Tensor with the same shape as ``trg``.
        """
        # Self-attention over the target sequence.
        trg2 = self.norm_1(trg)
        trg = trg + self.dropout_1(self.attn_1(trg2, trg2, trg2, trg_mask))
        # Attention from the target (queries) over the encoder output.
        trg2 = self.norm_2(trg)
        trg = trg + self.dropout_2(self.attn_2(trg2, enc_src, enc_src, src_mask))
        # Feed-forward sub-layer.
        trg2 = self.norm_3(trg)
        trg = trg + self.dropout_3(self.ff(trg2))
        return trg

#6. Encoder
class Encoder(torch.nn.Module):
    """Token embedding + positional encoding followed by a stack of encoder layers.

    Args:
        input_dim: Number of rows in the token embedding table.
        d_model: Embedding / feature size.
        n_heads: Number of attention heads per layer.
        d_ff: Hidden size of each layer's feed-forward network.
        n_layers: Number of ``EncoderLayer`` blocks.
        dropout: Drop probability for the embedding dropout and the layers.
        max_seq_len: Number of positions precomputed by the positional encoding.
    """

    def __init__(self, input_dim, d_model, n_heads, d_ff, n_layers, dropout, max_seq_len):
        super().__init__()
        self.d_model = d_model
        self.tok_embedding = torch.nn.Embedding(input_dim, d_model)
        self.pos_embedding = PositionalEncoding(d_model, max_seq_len)
        self.layers = torch.nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, src, mask):
        """Encode a batch of token indices.

        Args:
            src: Integer tensor of token indices; dimension 1 is the sequence
                axis (the positional encoding is sliced by ``size(1)``).
            mask: Attention mask passed to every layer.

        Returns:
            Tensor of shape ``src.shape + (d_model,)``.
        """
        # Scale the embeddings by sqrt(d_model), then add positions to the
        # *embedded* tensor. The positional module adds its table to whatever
        # it receives, so it must not be handed the raw integer token ids.
        embedded = self.tok_embedding(src) * (self.d_model ** 0.5)
        src = self.dropout(self.pos_embedding(embedded))
        for layer in self.layers:
            src = layer(src, mask)
        return src

#7. Decoder
class Decoder(torch.nn.Module):
    """Token embedding + positional encoding followed by a stack of decoder layers.

    Args:
        output_dim: Number of rows in the token embedding table.
        d_model: Embedding / feature size.
        n_heads: Number of attention heads per layer.
        d_ff: Hidden size of each layer's feed-forward network.
        n_layers: Number of ``DecoderLayer`` blocks.
        dropout: Drop probability for the embedding dropout and the layers.
        max_seq_len: Number of positions precomputed by the positional encoding.
    """

    def __init__(self, output_dim, d_model, n_heads, d_ff, n_layers, dropout, max_seq_len):
        super().__init__()
        self.d_model = d_model
        self.tok_embedding = torch.nn.Embedding(output_dim, d_model)
        self.pos_embedding = PositionalEncoding(d_model, max_seq_len)
        self.layers = torch.nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """Decode a batch of target token indices against the encoder output.

        Args:
            trg: Integer tensor of target token indices; dimension 1 is the
                sequence axis (the positional encoding is sliced by ``size(1)``).
            enc_src: Encoder output attended to by every layer.
            trg_mask: Mask for the target self-attention.
            src_mask: Mask for the attention over ``enc_src``.

        Returns:
            Tensor of shape ``trg.shape + (d_model,)``. No projection to
            vocabulary logits is applied here.
        """
        # Scale the embeddings by sqrt(d_model), then add positions to the
        # *embedded* tensor. The positional module adds its table to whatever
        # it receives, so it must not be handed the raw integer token ids.
        embedded = self.tok_embedding(trg) * (self.d_model ** 0.5)
        trg = self.dropout(self.pos_embedding(embedded))
        for layer in self.layers:
            trg = layer(trg, enc_src, trg_mask, src_mask)
        return trg

#8. Seq2Seq
class Seq2Seq(torch.nn.Module):
    """Encoder-decoder wrapper that builds the attention masks and runs both halves.

    Args:
        encoder: Module called as ``encoder(src, src_mask)``.
        decoder: Module called as ``decoder(trg, enc_src, trg_mask, src_mask)``.
        device: Stored on the instance as ``self.device`` for callers; the
            masks themselves are created on the device of the input tensors.
        pad_idx: Token index treated as padding when building masks
            (defaults to 0).
    """

    def __init__(self, encoder, decoder, device, pad_idx=0):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.pad_idx = pad_idx

    def make_src_mask(self, src):
        """Return a boolean mask that is True at non-padding source positions.

        Args:
            src: Integer tensor of shape ``(batch, src_len)``.

        Returns:
            Bool tensor of shape ``(batch, 1, 1, src_len)``.
        """
        src_mask = (src != self.pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask

    def make_trg_mask(self, trg):
        """Return the combined padding and look-ahead mask for the target.

        Position ``(i, j)`` is True only if key ``j`` is not padding and
        ``j <= i``.

        Args:
            trg: Integer tensor of shape ``(batch, trg_len)``.

        Returns:
            Bool tensor of shape ``(batch, 1, trg_len, trg_len)``.
        """
        trg_pad_mask = (trg != self.pad_idx).unsqueeze(1).unsqueeze(2)
        trg_len = trg.shape[1]
        # Build on trg's own device so the `&` below cannot hit a device
        # mismatch when the inputs live somewhere other than self.device.
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device=trg.device)).bool()
        trg_mask = trg_pad_mask & trg_sub_mask
        return trg_mask

    def forward(self, src, trg):
        """Encode ``src`` and decode ``trg`` against it.

        Args:
            src: Integer tensor of source token indices, ``(batch, src_len)``.
            trg: Integer tensor of target token indices, ``(batch, trg_len)``.

        Returns:
            Whatever ``self.decoder`` returns for the masked inputs.
        """
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)
        return self.decoder(trg, enc_src, trg_mask, src_mask)
    



In [3]:
#TEST transformer

#1. Load data
import torchtext
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator

#import data



ModuleNotFoundError: No module named 'torchtext'