In [None]:
import torch, torchvision, torchaudio
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import os, pickle, math, random
from copy import deepcopy
device ='cuda' if torch.cuda.is_available() else 'cpu'
torch.__version__, torchvision.__version__

('2.5.1+cu124', '0.20.1+cu124')

#### Transformer Architecture

In [None]:
''' Transformer
tokenization, word embedding, positional encoding, attention, encoder-decoder transformer '''

In [None]:
import pandas as pd
from transformers import XLMTokenizer
from collections import Counter

PAD = 0  # vocabulary index reserved for padding
UNK = 1  # vocabulary index reserved for out-of-vocabulary tokens


def build_vocab(token_lists, max_size=50000):
  """Build token<->index lookup tables from tokenized sentences.

  Args:
    token_lists: iterable of token lists (one list per sentence).
    max_size: keep at most this many of the most frequent tokens.

  Returns:
    (worddict, idxdict): token -> index and index -> token dictionaries.
    Indices 0 and 1 are reserved for 'PAD' and 'UNK'; corpus tokens start
    at index 2, ordered from most to least frequent.
  """
  wordcount = Counter(tok for sentence in token_lists for tok in sentence)
  frequency = wordcount.most_common(max_size)
  worddict = {tok: idx + 2 for idx, (tok, _) in enumerate(frequency)}
  worddict['PAD'] = PAD
  worddict['UNK'] = UNK
  idxdict = {v: k for k, v in worddict.items()}
  return worddict, idxdict


def detokenize(tokens):
  """Join sub-word tokens back into a readable phrase.

  '</w>' end-of-word markers become spaces, and the space in front of
  common punctuation marks is removed.
  """
  phrase = ''.join(tokens).replace('</w>', ' ')
  for mark in '''?:;.,'("-!&)%''':
    phrase = phrase.replace(f' {mark}', f'{mark}')
  return phrase


df = pd.read_csv('english-to-french.csv')
# Show the first rows, the corpus size and the last sentence pair
# (index -1 instead of a hard-coded row number tied to one dataset size).
print(df[:3], len(df), df.iloc[-1]['en'], df.iloc[-1]['fr'])
tokenizer = XLMTokenizer.from_pretrained('xlm-clm-enfr-1024')
tokenized_en = tokenizer.tokenize("I don't speak english")
tokenized_fr = tokenizer.tokenize("Je ne parle pas francais")
print(tokenized_en, tokenized_fr)

# English vocabulary, followed by a round trip tokens -> indices -> text.
en = df['en'].tolist()
en_tokens = [['BOS'] + tokenizer.tokenize(x) + ['EOS'] for x in en]
en_worddict, en_idxdict = build_vocab(en_tokens)
total_en_words = len(en_worddict)
en_idx = [en_worddict.get(i, UNK) for i in tokenized_en]
print(en_idx)
entokens = [en_idxdict.get(i, 'UNK') for i in en_idx]
print(entokens)
en_phrase = detokenize(entokens)
print(en_phrase)

# French vocabulary, built exactly the same way.
fr = df['fr'].tolist()
fr_tokens = [['BOS'] + tokenizer.tokenize(x) + ['EOS'] for x in fr]
fr_worddict, fr_idxdict = build_vocab(fr_tokens)
total_fr_words = len(fr_worddict)
fr_idx = [fr_worddict.get(i, UNK) for i in tokenized_fr]
print(fr_idx)
frtokens = [fr_idxdict.get(i, 'UNK') for i in fr_idx]
print(frtokens)
fr_phrase = detokenize(frtokens)
print(fr_phrase)

# Persist the four lookup tables so they can be reused without the corpus.
with open('dict.p', 'wb') as fd:
  pickle.dump((en_worddict, en_idxdict, fr_worddict, fr_idxdict), fd)

   Unnamed: 0                                                 en  \
0           0  Two young, White males are outside near many b...   
1           0  Several men in hard hats are operating a giant...   
2           0    A little girl climbing into a wooden playhouse.   

                                                  fr  
0  Deux jeunes mâles blancs se trouvent à l’extér...  
1  Plusieurs hommes portant un chapeau d'assaut f...  
2  Une petite fille grimpant dans une maison de j...   47173 Look both ways before you cross the stree! Regardez les deux côtés avant de traverser la tige!
['i</w>', 'don</w>', "'t</w>", 'speak</w>', 'eng', 'lish</w>'] ['je</w>', 'ne</w>', 'parle</w>', 'pas</w>', 'franc', 'ais</w>']
[15, 100, 38, 377, 227, 244]
['i</w>', 'don</w>', "'t</w>", 'speak</w>', 'eng', 'lish</w>']
i don't speak english 
[28, 40, 231, 32, 726, 370]
['je</w>', 'ne</w>', 'parle</w>', 'pas</w>', 'franc', 'ais</w>']
je ne parle pas francais 


In [None]:
''' batch creation '''
# Map every token to its vocabulary index; unknown tokens fall back to UNK
# (the named constant replaces the bare literal 1 used before).
out_en_ids = [[en_worddict.get(word, UNK) for word in s] for s in en_tokens]
out_fr_ids = [[fr_worddict.get(word, UNK) for word in s] for s in fr_tokens]
# Order the sentence pairs by English length so that sentences of similar
# length end up in the same batch.
sorted_ids = sorted(range(len(out_en_ids)), key=lambda x: len(out_en_ids[x]))
out_en_ids = [out_en_ids[x] for x in sorted_ids]
out_fr_ids = [out_fr_ids[x] for x in sorted_ids]
batch_size = 128
# Start offset of every batch, visited in random order.
idx_lst = np.arange(0, len(en_tokens), batch_size)
np.random.shuffle(idx_lst)
# Each entry holds the row indices of one batch (the last may be shorter).
batch_index = [np.arange(idx, min(len(en_tokens), idx + batch_size))
               for idx in idx_lst]
def seq_padding(X, padding=0):
  """Right-pad every sequence in X to the length of the longest one.

  Args:
    X: list of sequences (e.g. lists of token indices).
    padding: value appended to sequences that are too short.

  Returns:
    A numpy array with one row per input sequence.
  """
  longest = max(len(seq) for seq in X)
  rows = []
  for seq in X:
    missing = longest - len(seq)
    if missing > 0:
      rows.append(np.concatenate([seq, [padding] * missing]))
    else:
      # Already at full length: keep the sequence untouched.
      rows.append(seq)
  return np.array(rows)
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'
def subsequent_mask(size):
  """Return a (1, size, size) boolean mask that hides future positions.

  Entry [0, i, j] is True when j <= i (position i may look at j) and
  False for the strictly upper triangle.
  """
  future = torch.triu(torch.ones((1, size, size), dtype=torch.uint8), diagonal=1)
  return future == 0
def make_mask(tgt, pad):
  """Build the decoder mask: hide padding tokens and future positions.

  Args:
    tgt: tensor of target token indices; the last dimension is the sequence.
    pad: index of the padding token.

  Returns:
    Boolean tensor combining the non-padding mask with subsequent_mask.
  """
  not_pad = (tgt != pad).unsqueeze(-2)
  # type_as also moves the causal mask onto the same device as not_pad.
  causal = subsequent_mask(tgt.size(-1)).type_as(not_pad.data)
  return not_pad & causal
class Batch:
  """One training batch: source/target tensors plus their attention masks.

  Args:
    src: numpy array of padded source token indices, one row per sentence.
    trg: optional numpy array of padded target token indices. When omitted,
      only `src` and `src_mask` are set.
    pad: index of the padding token.

  Attributes set when `trg` is given:
    trg: decoder input (target without its last token).
    trg_y: expected decoder output (target without its first token).
    trg_mask: padding + future-position mask for `trg`.
    ntokens: number of non-padding tokens in `trg_y`.
  """
  def __init__(self, src, trg=None, pad=0):
    src = torch.from_numpy(src).to(device).long()
    self.src = src
    self.src_mask = (src != pad).unsqueeze(-2)
    if trg is not None:
      # Convert only when a target exists; the conversion used to run
      # unconditionally and crashed for the default trg=None.
      trg = torch.from_numpy(trg).to(device).long()
      self.trg = trg[:, :-1]
      self.trg_y = trg[:, 1:]
      self.trg_mask = make_mask(self.trg, pad)
      self.ntokens = (self.trg_y != pad).data.sum()
# Materialise every batch once: gather the sentence pairs selected by each
# index array, pad both sides to a rectangle and wrap them in a Batch.
batches = [
    Batch(seq_padding([out_en_ids[x] for x in batch]),
          seq_padding([out_fr_ids[x] for x in batch]))
    for batch in batch_index
]

In [None]:
''' embedding '''
# Vocabulary sizes (including the PAD and UNK entries) for both languages.
src_vocabulary, tgt_vocabulary = len(en_worddict), len(fr_worddict)
class Embeddings(nn.Module):
  """Token embedding whose output is scaled by sqrt(model).

  Args:
    model: embedding dimension.
    vocab: number of entries in the vocabulary.
  """
  def __init__(self, model, vocab):
    super().__init__()
    self.model = model
    self.emb = nn.Embedding(vocab, model)

  def forward(self, x):
    """Look up the embeddings of the indices in x and scale them."""
    scale = math.sqrt(self.model)
    return self.emb(x) * scale

In [None]:
''' positional encoding of sequences '''
class PositionalEnc(nn.Module):
  """Add fixed sine/cosine positional encodings, then apply dropout.

  Args:
    model: embedding dimension of the inputs.
    dropout: dropout probability applied after the addition.
    maxlen: number of positions for which encodings are precomputed.
  """
  def __init__(self, model, dropout, maxlen=5000):
    super().__init__()
    self.dropout = nn.Dropout(p=dropout)
    position = torch.arange(0., maxlen, device=device).unsqueeze(1)
    divterm = torch.exp(
        torch.arange(0., model, 2, device=device) * -(math.log(10000.0) / model))
    angles = position * divterm
    table = torch.zeros(maxlen, model, device=device)
    table[:, 0::2] = torch.sin(angles)  # even feature indices
    table[:, 1::2] = torch.cos(angles)  # odd feature indices
    # Registered as a buffer: stored with the module but not a parameter.
    self.register_buffer('posenc', table.unsqueeze(0))

  def forward(self, x):
    """Add the encodings of the first x.size(1) positions to x."""
    encodings = self.posenc[:, :x.size(1)].requires_grad_(False)
    return self.dropout(x + encodings)
# Quick look at the encoding of 8 positions for an all-zero input. The module
# is in training mode here, so dropout is applied to the displayed values.
posenc = PositionalEnc(model=256, dropout=0.1)
x = torch.zeros((1, 8, 256)).to(device)
y = posenc.forward(x)
y

tensor([[[ 0.0000e+00,  1.1111e+00,  0.0000e+00,  ...,  1.1111e+00,
           0.0000e+00,  1.1111e+00],
         [ 9.3497e-01,  6.0034e-01,  8.9107e-01,  ...,  0.0000e+00,
           1.1940e-04,  1.1111e+00],
         [ 0.0000e+00, -4.6239e-01,  1.0646e+00,  ...,  1.1111e+00,
           2.3880e-04,  0.0000e+00],
         ...,
         [-1.0655e+00,  3.1518e-01, -1.1091e+00,  ...,  1.1111e+00,
           5.9700e-04,  1.1111e+00],
         [-3.1046e-01,  1.0669e+00, -7.1559e-01,  ...,  1.1111e+00,
           7.1640e-04,  1.1111e+00],
         [ 7.2999e-01,  8.3767e-01,  2.5419e-01,  ...,  1.1111e+00,
           8.3581e-04,  1.1111e+00]]])

### Attention Mechanism

In [None]:
''' The attention mechanism '''
def attention(query, key, value, mask=None, dropout=None):
  """Scaled dot-product attention.

  Args:
    query, key, value: tensors whose last dimension is the feature size.
    mask: optional tensor; positions where it equals 0 are excluded.
    dropout: optional callable applied to the attention weights.

  Returns:
    (weighted values, attention weights).
  """
  scale = math.sqrt(query.size(-1))
  scores = torch.matmul(query, key.transpose(-2, -1)) / scale
  if mask is not None:
    # A large negative score turns into a ~0 weight after the softmax.
    scores = scores.masked_fill(mask == 0, -1e9)
  weights = nn.functional.softmax(scores, dim=-1)
  if dropout is not None:
    weights = dropout(weights)
  context = torch.matmul(weights, value)
  return context, weights

from copy import deepcopy
class MultiHeadAttention(nn.Module):
  """Multi-head attention built on top of `attention`.

  Args:
    h: number of attention heads; must divide d_model.
    d_model: model (embedding) dimension.
    dropout: dropout probability applied to the attention weights.
  """
  def __init__(self, h, d_model, dropout=0.1):
    super().__init__()
    assert d_model % h == 0
    # Per-head feature size. This must be an assignment: the previous `==`
    # comparison raised AttributeError because d_k was never set.
    self.d_k = d_model // h
    self.h = h
    # Three input projections (query, key, value) plus the output projection.
    self.linears = nn.ModuleList(
        [nn.Linear(d_model, d_model) for _ in range(4)])
    self.attn = None  # attention weights of the most recent forward pass
    self.dropout = nn.Dropout(p=dropout)

  def forward(self, query, key, value, mask=None):
    """Project the inputs, attend per head, and merge the heads again."""
    if mask is not None:
      # Add a head dimension so the same mask is applied to every head.
      mask = mask.unsqueeze(1)
    nbatches = query.size(0)
    # (batch, seq, d_model) -> (batch, h, seq, d_k) for each of q, k, v.
    # zip stops after three pairs, leaving the 4th linear for the output.
    query, key, value = [
        l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
        for l, x in zip(self.linears, (query, key, value))]
    x, self.attn = attention(
        query, key, value, mask=mask, dropout=self.dropout)
    # (batch, h, seq, d_k) -> (batch, seq, h * d_k)
    x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
    output = self.linears[-1](x)
    return output

class PositionwiseFeedForward(nn.Module):
  """Two-layer feed-forward network applied to every position independently.

  Computes w_2(dropout(relu(w_1(x)))).

  Args:
    d_model: input and output feature size.
    d_ff: hidden feature size.
    dropout: dropout probability applied to the hidden activations.
  """
  def __init__(self, d_model, d_ff, dropout=0.1):
    super().__init__()
    self.w_1 = nn.Linear(d_model, d_ff)
    self.w_2 = nn.Linear(d_ff, d_model)  # was the typo `nn.Lineat`
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    # The ReLU between the two projections was missing; without a
    # non-linearity the two linear layers collapse into a single linear map.
    h1 = torch.relu(self.w_1(x))
    h2 = self.dropout(h1)
    return self.w_2(h2)

### Encoder Layer

In [None]:
class SublayerConnection(nn.Module):
  """Residual connection around a sub-layer, with the norm applied first.

  Args:
    size: feature size handed to LayerNorm.
    dropout: dropout probability applied to the sub-layer output.
  """
  def __init__(self, size, dropout):
    super().__init__()
    self.norm = LayerNorm(size)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x, sublayer):
    """Return x + dropout(sublayer(norm(x)))."""
    normed = self.norm(x)
    branch = self.dropout(sublayer(normed))
    return x + branch

class EncoderLayer(nn.Module):
  """One encoder block: self-attention followed by a feed-forward network.

  Args:
    size: model dimension (also read by the Encoder stack via `.size`).
    self_attn: attention module called as self_attn(q, k, v, mask).
    feed_forward: position-wise feed-forward module.
    dropout: dropout probability for the two residual connections.
  """
  def __init__(self, size, self_attn, feed_forward, dropout):
    super().__init__()
    self.self_attn = self_attn
    # The parameter used to be misspelled `fee_forward`, which made this
    # assignment fail with NameError.
    self.feed_forward = feed_forward
    self.sublayer = nn.ModuleList(
        [SublayerConnection(size, dropout) for _ in range(2)])
    self.size = size

  def forward(self, x, mask):
    x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
    output = self.sublayer[1](x, self.feed_forward)
    return output


class Encoder(nn.Module):
  """Stack of N identical encoder layers followed by a final LayerNorm.

  This replaces a verbatim second copy of EncoderLayer; create_model
  needs an `Encoder` class, which was otherwise never defined.

  Args:
    layer: prototype layer; it is deep-copied N times and must expose `.size`.
    N: number of layers in the stack.
  """
  def __init__(self, layer, N):
    super().__init__()
    self.layers = nn.ModuleList(
        [deepcopy(layer) for i in range(N)])
    self.norm = LayerNorm(layer.size)

  def forward(self, x, mask):
    for layer in self.layers:
      x = layer(x, mask)
    output = self.norm(x)
    return output

class LayerNorm(nn.Module):
  """Layer normalisation over the last dimension with learnable scale/shift.

  Args:
    features: size of the last dimension of the inputs.
    eps: small constant added under the square root for numerical stability.
  """
  def __init__(self, features, eps=1e-6):
    super().__init__()
    self.a_2 = nn.Parameter(torch.ones(features))   # scale
    self.b_2 = nn.Parameter(torch.zeros(features))  # shift (was `nn.Paramters`)
    self.eps = eps

  def forward(self, x):
    mean = x.mean(-1, keepdim=True)
    std = x.std(-1, keepdim=True)
    x_score = (x - mean) / torch.sqrt(std**2 + self.eps)
    output = self.a_2 * x_score + self.b_2
    return output

### Encoder-Decoder Transformer

In [None]:
class DecoderLayer(nn.Module):
  """One decoder block: masked self-attention, attention over the encoder
  output, then a feed-forward network.

  Args:
    size: model dimension (also read by the Decoder stack via `.size`).
    self_attn: attention module used on the target sequence.
    src_attn: attention module used on the encoder output (`memory`).
    feed_forward: position-wise feed-forward module.
    dropout: dropout probability for the three residual connections.
  """
  def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
    super().__init__()
    self.size = size
    # Store the constructor argument; the previous `self.attn` read an
    # attribute that does not exist and raised AttributeError.
    self.self_attn = self_attn
    self.src_attn = src_attn
    self.feed_forward = feed_forward
    self.sublayer = nn.ModuleList(
        [SublayerConnection(size, dropout) for _ in range(3)])

  def forward(self, x, memory, src_mask, tgt_mask):
    x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
    x = self.sublayer[1](x, lambda x: self.src_attn(x, memory, memory, src_mask))
    output = self.sublayer[2](x, self.feed_forward)
    return output

''' Generator '''
class Generator(nn.Module):
  """Project decoder features to log-probabilities over the vocabulary.

  Args:
    d_model: size of the incoming feature vectors.
    vocab: number of entries in the output vocabulary.
  """
  def __init__(self, d_model, vocab):
    super().__init__()
    self.proj = nn.Linear(d_model, vocab)

  def forward(self, x):
    """Return log_softmax(proj(x)) over the last dimension."""
    logits = self.proj(x)
    return nn.functional.log_softmax(logits, dim=-1)

''' Transformer '''
class Decoder(nn.Module):
  """Stack of N identical decoder layers followed by a final LayerNorm.

  Args:
    layer: prototype layer; it is deep-copied N times and must expose `.size`.
    N: number of layers in the stack.
  """
  def __init__(self, layer, N):
    super().__init__()
    clones = [deepcopy(layer) for _ in range(N)]
    self.layers = nn.ModuleList(clones)
    self.norm = LayerNorm(layer.size)

  def forward(self, x, memory, src_mask, tgt_mask):
    """Run x through every layer in order, then normalise the result."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, memory, src_mask, tgt_mask)
    return self.norm(hidden)

class Transformer(nn.Module):
  """Encoder-decoder wrapper tying together the model's components.

  Args:
    encoder: module called as encoder(embedded_src, src_mask).
    decoder: module called as decoder(embedded_tgt, memory, src_mask, tgt_mask).
    src_embed: module mapping source indices to encoder inputs.
    tgt_embed: module mapping target indices to decoder inputs.
    generator: output projection. It is stored here but not applied by
      forward(); callers apply it to the decoder output themselves.
  """
  def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.src_embed = src_embed
    self.tgt_embed = tgt_embed
    self.generator = generator

  def encode(self, src, src_mask):
    """Embed the source sequence and run the encoder."""
    return self.encoder(self.src_embed(src), src_mask)

  def decode(self, memory, src_mask, tgt, tgt_mask):
    """Embed the target sequence and run the decoder over `memory`."""
    # Fixed attribute name: this used to read the non-existent `self.decoded`.
    return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

  def forward(self, src, tgt, src_mask, tgt_mask):
    """Encode `src`, decode `tgt` against it, and return the decoder output."""
    memory = self.encode(src, src_mask)
    output = self.decode(memory, src_mask, tgt, tgt_mask)
    # The result was computed but never returned, so forward() yielded None.
    return output

#### Model Translation

In [None]:
def create_model(src_vocab, tgt_vocab, N, d_model, d_ff, h, dropout=0.1):
  """Assemble the full encoder-decoder Transformer on `device`.

  Args:
    src_vocab: size of the source vocabulary.
    tgt_vocab: size of the target vocabulary.
    N: number of encoder layers and of decoder layers.
    d_model: model (embedding) dimension.
    d_ff: hidden size of the position-wise feed-forward networks.
    h: number of attention heads.
    dropout: dropout probability for feed-forward, positional-encoding and
      residual connections.

  Returns:
    A Transformer whose weight matrices are Xavier-uniform initialised.
  """
  # NOTE: attention dropout keeps MultiHeadAttention's own default, exactly
  # as before; `dropout` is not forwarded to it.
  attn = MultiHeadAttention(h, d_model)
  feedforward = PositionwiseFeedForward(d_model, d_ff, dropout)
  # The class is named PositionalEnc; `PositionalEncoding` was undefined.
  posenc = PositionalEnc(d_model, dropout)
  # Every component receives its own deep copy so no weights are shared.
  encoder_layer = EncoderLayer(
      d_model, deepcopy(attn), deepcopy(feedforward), dropout)
  # DecoderLayer takes two attention modules (self-attention and attention
  # over the encoder output); the second one was missing from the call.
  decoder_layer = DecoderLayer(
      d_model, deepcopy(attn), deepcopy(attn), deepcopy(feedforward), dropout)
  model = Transformer(
    Encoder(encoder_layer, N),
    Decoder(decoder_layer, N),
    nn.Sequential(Embeddings(d_model, src_vocab), deepcopy(posenc)),
    nn.Sequential(Embeddings(d_model, tgt_vocab), deepcopy(posenc)),
    Generator(d_model, tgt_vocab)).to(device)
  # Only matrices (dim > 1) are re-initialised; biases and norm vectors keep
  # their default initialisation.
  for p in model.parameters():
    if p.dim() > 1:
      nn.init.xavier_uniform_(p)
  return model.to(device)

class LabelSmoothing(nn.Module):
  """KL-divergence loss against a label-smoothed target distribution.

  Args:
    size: number of classes (vocabulary size); must equal x.size(1).
    padding_idx: class index of the padding token.
    smoothing: probability mass spread over the non-target classes.
  """
  def __init__(self, size, padding_idx, smoothing=0.1):
    super().__init__()
    self.size = size
    self.padding_idx = padding_idx
    self.smoothing = smoothing
    self.confidence = 1.0 - smoothing
    self.true_dist = None  # smoothed targets of the most recent call
    self.criterion = nn.KLDivLoss(reduction='sum')

  def forward(self, x, target):
    """Return the summed KL divergence between x and the smoothed targets.

    x holds one row of log-probabilities per token; target holds the
    matching class indices.
    """
    assert x.size(1) == self.size
    # Every class starts with an equal share of the smoothing mass ...
    off_target = self.smoothing / (self.size - 2)
    smoothed = x.data.clone().fill_(off_target)
    # ... the true class receives the confidence mass ...
    smoothed.scatter_(1, target.data.unsqueeze(1), self.confidence)
    # ... and the padding class never receives any probability.
    smoothed[:, self.padding_idx] = 0
    # Rows whose target is padding contribute nothing to the loss.
    pad_rows = torch.nonzero(target.data == self.padding_idx)
    if pad_rows.dim() > 0:
      smoothed.index_fill_(0, pad_rows.squeeze(), 0.0)
    self.true_dist = smoothed
    return self.criterion(x, smoothed.clone().detach())

class Optimizer:
  """Wrap an optimizer with a warm-up / inverse-square-root learning rate.

  Args:
    model_size: model dimension used to scale the learning rate.
    factor: constant multiplier applied to the schedule.
    warmup: number of steps over which the rate grows linearly.
    optimizer: wrapped optimizer; needs `param_groups` and `step()`.
  """
  def __init__(self, model_size, factor, warmup, optimizer):
    self.optimizer = optimizer
    self._step = 0
    self.warmup = warmup
    self.factor = factor
    self.model_size = model_size
    self._rate = 0

  def step(self):
    """Advance the schedule, set the new rate on every group, then step."""
    self._step += 1
    rate = self.rate()
    for p in self.optimizer.param_groups:
      p['lr'] = rate
    self._rate = rate
    self.optimizer.step()

  def rate(self, step=None):
    """Return the learning rate for `step` (default: the current step).

    Returns 0.0 for step <= 0: the schedule starts from zero, and
    evaluating `0 ** -0.5` would otherwise raise ZeroDivisionError when
    rate() is queried before the first step().
    """
    if step is None:
      step = self._step
    if step <= 0:
      return 0.0
    output = self.factor * (self.model_size ** (-.5) *
                            min(step ** (-.5), step * self.warmup ** (-1.5)))
    return output

# `model` was used below without ever being created in this notebook, so a
# fresh kernel failed with NameError. d_model=256 matches the model size given
# to Optimizer; N, d_ff and h are assumed values -- TODO confirm / tune.
model = create_model(src_vocabulary, tgt_vocabulary,
                     N=6, d_model=256, d_ff=1024, h=8, dropout=0.1)
# The initial lr=0 is overwritten by the schedule on every Optimizer.step().
optimizer = Optimizer(256, 1, 2000, torch.optim.Adam(
    model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))

class SimpleLossCompute:
  """Apply the generator, compute the normalised loss and back-propagate.

  Args:
    generator: callable mapping decoder output to per-token predictions.
    criterion: loss called as criterion(flat_predictions, flat_targets).
    opt: optional optimizer wrapper exposing `step()` and `.optimizer`.
  """
  def __init__(self, generator, criterion, opt=None):
    self.generator = generator
    self.criterion = criterion
    self.opt = opt

  def __call__(self, x, y, norm):
    """Return the un-normalised loss for one batch.

    The loss is divided by `norm` before backward() and multiplied by it
    again for the returned value.
    """
    predictions = self.generator(x)
    # Flatten batch and sequence dimensions: one row per token.
    flat_pred = predictions.contiguous().view(-1, predictions.size(-1))
    flat_gold = y.contiguous().view(-1)
    loss = self.criterion(flat_pred, flat_gold) / norm
    loss.backward()
    if self.opt is not None:
      self.opt.step()
      self.opt.optimizer.zero_grad()
    return loss.data.item() * norm.float()

# Label-smoothed loss over the French vocabulary. The vocabulary size is
# stored in `tgt_vocabulary`; the undefined name `tgt_vocab` used here before
# raised NameError.
criterion = LabelSmoothing(tgt_vocabulary, padding_idx=0, smoothing=0.1)
loss_func = SimpleLossCompute(model.generator, criterion, optimizer)

In [None]:
''' train the model '''
# Each epoch visits every pre-built batch once. loss_func runs the backward
# pass and the optimizer step itself, so the loop only accumulates statistics.
for epoch in range(50):
  model.train()
  totalloss, tokens = 0, 0
  for batch in batches:
    out = model(batch.src, batch.trg, batch.src_mask, batch.trg_mask)
    loss = loss_func(out, batch.trg_y, batch.ntokens)
    totalloss = totalloss + loss
    tokens = tokens + batch.ntokens
  print(f'Epoch {epoch}, average loss {totalloss/tokens}')
# Persist the trained weights once all epochs are done.
torch.save(model.state_dict(), 'english-to-french.pth')

In [None]:
def translate(english, max_len=50):
  """Greedily translate an English sentence to French with `model`.

  Args:
    english: the English sentence to translate.
    max_len: maximum number of tokens to generate.

  Returns:
    The translated text. It is also printed.

  Side effect: switches `model` to evaluation mode.
  """
  tokenized_en = ['BOS'] + tokenizer.tokenize(english) + ['EOS']
  en_idx = [en_worddict.get(i, UNK) for i in tokenized_en]
  src = torch.tensor(en_idx).long().to(device).unsqueeze(0)
  src_mask = (src != PAD).unsqueeze(-2)
  start_symbol = fr_worddict['BOS']
  translation = []
  # Inference: disable dropout so the output is deterministic, and skip
  # gradient tracking since nothing is back-propagated.
  model.eval()
  with torch.no_grad():
    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type_as(src.data)
    for _ in range(max_len):
      out = model.decode(memory, src_mask, ys,
                         subsequent_mask(ys.size(1)).type_as(src.data))
      # Only the prediction for the last position is needed.
      prob = model.generator(out[:, -1])
      _, next_word = torch.max(prob, dim=1)
      next_word = next_word.data[0]
      ys = torch.cat(
          [ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)
      sym = fr_idxdict[ys[0, -1].item()]
      if sym == 'EOS':
        break
      translation.append(sym)
  # Re-assemble sub-word tokens: '</w>' marks a word end (the marker was
  # previously misspelled '</w', leaving stray '>' characters in the text).
  text = ''.join(translation)
  text = text.replace('</w>', ' ')
  # Remove the space in front of punctuation; the f-strings on this line
  # were malformed before and made the whole cell a SyntaxError.
  for x in '''?:;.,'("-!&)%''':
    text = text.replace(f' {x}', f'{x}')
  print(text)
  return text