In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import spacy
import pandas as pd
import tensorflow as tf
from torch.nn.utils.rnn import pad_sequence
import torchtext.datasets as datasets

In [12]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import multi30k, Multi30k
from typing import Iterable, List
from torch.utils.data import DataLoader

In [13]:
multi30k.URL["train"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz"
multi30k.URL["valid"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz"

SRC_LANGUAGE = 'de'
TGT_LANGUAGE = 'en'

token_transform = {}
vocab_transform = {}

In [14]:
train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
train_dataloader = DataLoader(train_iter, batch_size=32, num_workers=0)

In [15]:
for batch, (src,tgt) in enumerate(train_dataloader):
    for src1,tgt1 in zip(enumerate(src),enumerate(tgt)):
        print("Source Language Text:")
        print(src)
        print("Target Language Text:")
        print(tgt)
        break
    

Source Language Text:
('Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.', 'Mehrere Männer mit Schutzhelmen bedienen ein Antriebsradsystem.', 'Ein kleines Mädchen klettert in ein Spielhaus aus Holz.', 'Ein Mann in einem blauen Hemd steht auf einer Leiter und putzt ein Fenster.', 'Zwei Männer stehen am Herd und bereiten Essen zu.', 'Ein Mann in grün hält eine Gitarre, während der andere Mann sein Hemd ansieht.', 'Ein Mann lächelt einen ausgestopften Löwen an.', 'Ein schickes Mädchen spricht mit dem Handy während sie langsam die Straße entlangschwebt.', 'Eine Frau mit einer großen Geldbörse geht an einem Tor vorbei.', 'Jungen tanzen mitten in der Nacht auf Pfosten.', 'Eine Ballettklasse mit fünf Mädchen, die nacheinander springen.', 'Vier Typen, von denen drei Hüte tragen und einer nicht, springen oben in einem Treppenhaus.', 'Ein schwarzer Hund und ein gefleckter Hund kämpfen.', 'Ein Mann in einer neongrünen und orangefarbenen Uniform fährt auf einem grünen Traktor.', '

In [None]:
token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de_core_news_sm')
token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en_core_web_sm')



def yield_tokens(data_iter: Iterable, language: str)
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}

    for data_sample in data_iter:
        yield token_transform[language](data_sample[language_index[language]])


UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln),
                                                    min_freq=1,
                                                    specials=special_symbols,
                                                    special_first=True)


for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
  vocab_transform[ln].set_default_index(UNK_IDX)

### Self Attention Mechanism 

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F


batch_size = 32
block_size = 64
max_iters = 10000
eval_interval = 100
learning_rate = 1e-3
device = "cuda" if torch.cuda.is_available() else "cpu"
eval_iters = 50

class Head(nn.Module):
    def __init__(self, n_embd, head_size, dropout):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, k, q, v, mask):
        B, T, C = q.shape
        _, S, _ = k.shape  # Source sequence length (could be different from T)

        k_proj = self.key(k)
        q_proj = self.query(q)
        v_proj = self.value(v)

        scores = torch.matmul(q_proj, k_proj.transpose(-2, -1)) / (C ** 0.5)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))
        weights = F.softmax(scores, dim=-1)
        weights = self.dropout(weights)
        out = torch.matmul(weights, v_proj)
        return out

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, n_embd, n_head, dropout):
        super().__init__()
        self.heads = nn.ModuleList([Head(n_embd, n_embd // n_head, dropout) for _ in range(n_head)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, k, q, v, mask=None):
        out = torch.cat([head(k, q, v, mask) for head in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

### Feed Forward


In [None]:
class FeedForward(nn.Module):
    def __init__(self, n_embd, dropout):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

### Encoder


In [None]:
class EncoderBlock(nn.Module):
    def __init__(self, n_embd, n_head, dropout):
        super().__init__()
        self.attention = MultiHeadAttention(n_embd, n_head, dropout)
        self.ffwd = FeedForward(n_embd, dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x, mask=None):
        x = x + self.attention(x, x, x, mask)
        x = self.ln1(x)
        x = x + self.ffwd(x)
        x = self.ln2(x)
        return x
    

class Encoder(nn.Module):
    def __init__(self, vocab_size, n_embd, n_head, n_layer, dropout, device):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, n_embd)
        self.position_embedding = nn.Parameter(torch.zeros(1, block_size, n_embd))
        self.blocks = nn.ModuleList([EncoderBlock(n_embd, n_head, dropout) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.device = device

    def forward(self, src):
        B, T = src.size()
        pos = torch.arange(0, T, device=self.device).unsqueeze(0)
        x = self.token_embedding(src) + self.position_embedding[:, :T, :]
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        return x

### Decoder

In [None]:
class DecoderBlock(nn.Module):
    def __init__(self, n_embd, n_head, dropout):
        super().__init__()
        self.self_attention = MultiHeadAttention(n_embd, n_head, dropout)
        self.cross_attention = MultiHeadAttention(n_embd, n_head, dropout)
        self.ffwd = FeedForward(n_embd, dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        self.ln3 = nn.LayerNorm(n_embd)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        x = x + self.self_attention(x, x, x, tgt_mask)
        x = self.ln1(x)
        x = x + self.cross_attention(enc_output, x, enc_output, src_mask)
        x = self.ln2(x)
        x = x + self.ffwd(x)
        x = self.ln3(x)
        return x



class Decoder(nn.Module):
    def __init__(self, vocab_size, n_embd, n_head, n_layer, dropout, device):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, n_embd)
        self.position_embedding = nn.Parameter(torch.zeros(1, block_size, n_embd))
        self.blocks = nn.ModuleList([DecoderBlock(n_embd, n_head, dropout) for _ in range(n_layer)])
        self.out = nn.Linear(n_embd, vocab_size)
        self.device = device

    def forward(self, trg, enc_output, src_mask, tgt_mask):
        B, T = trg.size()
        pos = torch.arange(0, T, device=self.device).unsqueeze(0)
        x = self.token_embedding(trg) + self.position_embedding[:, :T, :]
        for block in self.blocks:
            x = block(x, enc_output, src_mask, tgt_mask)
        x = self.out(x)
        return x

### Transformer


In [None]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, src_pad_idx, trg_pad_idx, device, n_embd=256, n_head=8, n_layer=6, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(vocab_size, n_embd, n_head, n_layer, dropout, device)
        self.decoder = Decoder(vocab_size, n_embd, n_head, n_layer, dropout, device)
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_src_mask(self, src):
        src_mask = (src != self.src_pad_idx).unsqueeze(-2).unsqueeze(-1)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        N, trg_len = trg.size()
        trg_mask = torch.tril(torch.ones((trg_len, trg_len), device=self.device)).unsqueeze(0).unsqueeze(1)
        return trg_mask & (trg.unsqueeze(-2) != self.trg_pad_idx).unsqueeze(-1).unsqueeze(-2)

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        return out