<a href="https://colab.research.google.com/github/isaaccodekill/EncoderDecoder/blob/main/EncoderDecoderForLangTranslation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install lightning

Collecting lightning
  Downloading lightning-2.5.3-py3-none-any.whl.metadata (39 kB)
Collecting lightning-utilities<2.0,>=0.10.0 (from lightning)
  Downloading lightning_utilities-0.15.2-py3-none-any.whl.metadata (5.7 kB)
Collecting torchmetrics<3.0,>0.7.0 (from lightning)
  Downloading torchmetrics-1.8.1-py3-none-any.whl.metadata (22 kB)
Collecting pytorch-lightning (from lightning)
  Downloading pytorch_lightning-2.5.3-py3-none-any.whl.metadata (20 kB)
Downloading lightning-2.5.3-py3-none-any.whl (824 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m824.2/824.2 kB[0m [31m33.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.15.2-py3-none-any.whl (29 kB)
Downloading torchmetrics-1.8.1-py3-none-any.whl (982 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m983.0/983.0 kB[0m [31m55.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pytorch_lightning-2.5.3-py3-none-any.whl (828 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
!pip install torchtext

Collecting torchtext
  Downloading torchtext-0.18.0-cp312-cp312-manylinux1_x86_64.whl.metadata (7.9 kB)
Downloading torchtext-0.18.0-cp312-cp312-manylinux1_x86_64.whl (2.0 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.0/2.0 MB[0m [31m66.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m44.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torchtext
Successfully installed torchtext-0.18.0


In [3]:
import torch
import torch.nn as nn
import lightning as L
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [4]:
!pip install datasets



In [5]:
from datasets import load_dataset

In [None]:
# Load the "bentrevett/multi30k" dataset through `datasets.load_dataset`.
ds = load_dataset("bentrevett/multi30k")  # splits: train/validation/test
# Show the first training example to confirm it carries parallel 'en' and 'de' fields.
print(ds["train"][0])  # {'en': 'A man ...', 'de': 'Ein Mann ...'}

In [7]:
import numpy as np
import spacy
import random


In [8]:
# English spaCy pipeline. Only token text is read from it in this notebook,
# so the tagger, parser, NER and lemmatizer components are disabled.
spacy_en = spacy.load('en_core_web_sm', disable=["tagger", "parser", "ner", "lemmatizer"])

In [None]:
!python -m spacy download de_core_news_sm

In [10]:
# German spaCy pipeline (downloaded in the previous cell), loaded with the same
# components disabled as the English pipeline.
spacy_de = spacy.load('de_core_news_sm', disable=["tagger", "parser", "ner", "lemmatizer"])

In [11]:
def tokenizer_ger(text):
  """Return the token strings of a German sentence, using only spaCy's tokenizer.

  Casing is left untouched; no <sos>/<eos> markers are added here.
  """
  words = []
  for token in spacy_de.tokenizer(text):
    words.append(token.text)
  return words

def tokenizer_en(text):
  """Return the token strings of an English sentence, using only spaCy's tokenizer.

  Casing is left untouched; no <sos>/<eos> markers are added here.
  """
  words = []
  for token in spacy_en.tokenizer(text):
    words.append(token.text)
  return words

In [None]:
# Tokenize the training split one example at a time, wrapping every sentence in
# <sos>/<eos> markers. Each call adds a new column ('german' / 'english') to a copy
# of the training split; casing is preserved here.
# NOTE(review): `german` and `english` are not referenced again in the visible
# notebook; the batched, lower-casing `batch_tokenize` path below appears to
# supersede this cell. Confirm before relying on it.

german = ds['train'].map(lambda x: {'german': ['<sos>'] + tokenizer_ger(x['de']) + ['<eos>']})
english = ds['train'].map(lambda x: {'english': ['<sos>'] + tokenizer_en(x['en']) + ['<eos>']})




In [13]:
# Inspect the column names of the raw training split.
ds["train"].column_names

['en', 'de']

In [None]:
# Work out how to tokenize a whole batch of training data for both languages at once.

# Marker tokens placed at the start and end of every tokenized sentence.
SOS, EOS = '<sos>', '<eos>'

def batch_tokenize(batch):
  """Tokenize a batch of parallel sentences for use with `Dataset.map(batched=True)`.

  Args:
    batch: mapping with "en" and "de" keys, each holding a list of raw sentences.

  Returns:
    A dict with the same "en" / "de" keys whose values are lists of token lists.
    Every token is lower-cased and every sentence is wrapped in SOS ... EOS.
  """
  en_texts, de_texts = batch["en"], batch["de"]

  def _wrap(doc):
    # One sentence: start marker, lower-cased token texts, end marker.
    return [SOS, *(token.text.lower() for token in doc), EOS]

  en_sentences = []
  for doc in spacy_en.pipe(en_texts):
    en_sentences.append(_wrap(doc))

  de_sentences = []
  for doc in spacy_de.pipe(de_texts):
    de_sentences.append(_wrap(doc))

  return {"en": en_sentences, "de": de_sentences}

# Try the function on the first two training examples and display the result.
batch_tokenize(ds["train"][:2])


In [None]:
# Apply the batched tokenizer to the whole dataset; the returned "en"/"de"
# token lists take the place of the raw sentence columns.

ds_tokenized = ds.map(batch_tokenize, batched=True)

In [None]:
# Display the first two tokenized training examples.
ds_tokenized["train"][:2]

In [17]:
# Build a vocabulary for each language, keeping only tokens that occur at least twice (min_freq=2)
from collections import Counter

def build_vocab(tokenized_data, min_freq=2):
    """Map tokens to integer ids, reserving ids 0-3 for the special tokens.

    Args:
        tokenized_data: iterable of token sequences (one sequence per sentence).
        min_freq: a token must occur at least this many times across all
            sequences to receive its own id.

    Returns:
        dict of token -> id. '<pad>', '<unk>', '<sos>', '<eos>' take ids 0..3 in
        that order; qualifying tokens follow in order of first appearance.
    """
    # Tally how often each token appears across every sentence.
    counts = Counter()
    for sentence in tokenized_data:
        counts.update(sentence)

    # Special tokens always occupy the first four slots.
    vocab = {'<pad>': 0, '<unk>': 1, '<sos>': 2, '<eos>': 3}

    # Counter preserves first-seen order, so ids are assigned deterministically.
    for token, occurrences in counts.items():
        if occurrences < min_freq:
            continue
        # setdefault leaves the special tokens' reserved ids untouched.
        vocab.setdefault(token, len(vocab))

    return vocab

In [18]:
# Build both vocabularies from the training split only. Tokens absent from them
# are mapped to '<unk>' when sentences are converted to indices later on.
german_vocab = build_vocab(ds_tokenized["train"]["de"])
english_vocab = build_vocab(ds_tokenized["train"]["en"])

In [None]:
# Print the full German token -> id mapping.
print(german_vocab)

In [None]:
# Print the full English token -> id mapping.
print(english_vocab)

In [21]:
class Encoder(nn.Module):
  """Embeds source token ids and summarises the sequence with a multi-layer LSTM.

  Only the LSTM's final hidden and cell states are returned; the per-step
  outputs are discarded.
  """

  def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
    """
    Args:
      input_size: number of rows in the embedding table (source vocabulary size).
      embedding_size: width of each token embedding.
      hidden_size: width of the LSTM hidden and cell states.
      num_layers: number of stacked LSTM layers.
      p: dropout probability, used on the embeddings and between LSTM layers.
    """
    super().__init__()
    self.hidden_size, self.num_layers = hidden_size, num_layers
    self.dropout = nn.Dropout(p)
    self.embedding = nn.Embedding(input_size, embedding_size)
    self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=p)

  def forward(self, x):
    """Encode a batch of index sequences.

    Args:
      x: token ids shaped (seq_length, N), N being the batch size,
        e.g. "Isaac is awesome" -> [Isaac, is, awesome] -> [10, 2, 8].

    Returns:
      (hidden, cell): the LSTM's final states.
    """
    # (seq_length, N) -> (seq_length, N, embedding_size)
    embedded = self.embedding(x)
    # Dropout randomly zeroes embedding entries to reduce overfitting.
    regularised = self.dropout(embedded)

    _, final_state = self.rnn(regularised)
    hidden, cell = final_state
    return hidden, cell



class Decoder(nn.Module):
  """Single-step LSTM decoder that scores the next target token.

  Each call consumes one token per batch element plus the previous LSTM states
  and returns vocabulary scores together with the updated states.
  """

  def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, p):
    """
    Args:
      input_size: number of rows in the embedding table (target vocabulary size).
      embedding_size: width of each token embedding.
      hidden_size: width of the LSTM hidden and cell states.
      output_size: number of scores produced per step.
      num_layers: number of stacked LSTM layers.
      p: dropout probability, used on the embeddings and between LSTM layers.
    """
    super().__init__()
    self.hidden_size, self.num_layers = hidden_size, num_layers
    self.dropout = nn.Dropout(p)
    self.embedding = nn.Embedding(input_size, embedding_size)
    self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=p)
    self.fc = nn.Linear(hidden_size, output_size)

  def forward(self, x, hidden, cell):
    """Run one decoding step.

    Args:
      x: one token id per batch element, shaped (N,).
      hidden: LSTM hidden state from the encoder or the previous step.
      cell: LSTM cell state from the encoder or the previous step.

    Returns:
      (predictions, hidden, cell) where predictions is (N, output_size).
    """
    # The LSTM wants a leading sequence axis: (N,) -> (1, N).
    step_input = x.unsqueeze(0)
    # (1, N) -> (1, N, embedding_size)
    embedded = self.dropout(self.embedding(step_input))

    # rnn_out: (1, N, hidden_size)
    rnn_out, (hidden, cell) = self.rnn(embedded, (hidden, cell))

    # Project to vocabulary scores and drop the length-1 sequence axis.
    logits = self.fc(rnn_out).squeeze(0)
    return logits, hidden, cell

In [22]:
# Let's create a Lightning module to rule them all.

class Seq2seq(L.LightningModule):
  """German -> English encoder-decoder translator trained with teacher forcing.

  Batches are dicts with "de" (source) and "en" (target) index tensors shaped
  (seq_length, N), as produced by the collate function in this notebook.
  """

  def __init__(self, encoder, decoder, pad_idx=None):
    """
    Args:
      encoder: module returning (hidden, cell) for a source batch.
      decoder: module mapping (token_batch, hidden, cell) to
        (scores, hidden, cell). Its output layer must be exposed as `fc`
        (an nn.Linear) so the score width can be read from it.
      pad_idx: target-vocabulary id that the loss ignores. When omitted it
        falls back to english_vocab['<pad>'] from the notebook namespace.
    """
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    if pad_idx is None:
      pad_idx = english_vocab['<pad>']
    # Padded target positions must not contribute to the loss.
    self.loss_func = nn.CrossEntropyLoss(ignore_index=pad_idx)

  def forward(self, source, target, teacher_force_ratio=0.5):
    """Decode `target.shape[0]` steps conditioned on `source`.

    Args:
      source: source token ids, (source_len, N).
      target: target token ids, (target_len, N); row 0 is fed to the decoder
        as the first input (the <sos> row).
      teacher_force_ratio: probability, drawn once per time step for the whole
        batch, of feeding the ground-truth token instead of the decoder's own
        prediction as the next input.

    Returns:
      Float tensor (target_len, N, vocab_size) of scores. Row 0 is never
      written and stays zero; callers skip it when computing the loss.
    """
    batch_size = source.shape[1]
    target_len = target.shape[0]
    # Read the score width from the decoder itself rather than a notebook
    # global, so the buffer always matches what the decoder emits.
    target_vocab_size = self.decoder.fc.out_features

    outputs = torch.zeros(target_len, batch_size, target_vocab_size, device=self.device)

    hidden, cell = self.encoder(source)

    # First decoder input is the first target row (<sos>).
    x = target[0]

    for t in range(1, target_len):
      output, hidden, cell = self.decoder(x, hidden, cell)
      outputs[t] = output
      best_guess = output.argmax(1)
      # Teacher forcing: sometimes feed the correct token, otherwise the model's guess.
      x = target[t] if random.random() < teacher_force_ratio else best_guess

    return outputs

  def _step_loss(self, batch, teacher_force_ratio):
    """Run the model on one batch and return the cross-entropy loss."""
    # Encode German (source), decode English (target).
    source = batch["de"]
    target = batch["en"]
    outputs = self(source, target, teacher_force_ratio=teacher_force_ratio)
    # Drop step 0 (<sos>, never predicted) and flatten time and batch together.
    logits = outputs[1:].reshape(-1, outputs.shape[2])
    labels = target[1:].reshape(-1)
    return self.loss_func(logits, labels)

  def training_step(self, batch, batch_idx):
    """Compute and log the training loss with 50% teacher forcing."""
    loss = self._step_loss(batch, teacher_force_ratio=0.5)
    self.log("train_loss", loss, prog_bar=True, on_step=True, on_epoch=True)
    return loss

  def validation_step(self, batch, batch_idx):
    """Compute and log the validation loss with teacher forcing disabled."""
    loss = self._step_loss(batch, teacher_force_ratio=0)
    self.log("val_loss", loss, prog_bar=True, on_step=True, on_epoch=True)
    return loss

  def configure_optimizers(self):
    """Optimise all parameters with Adam at its default settings."""
    return torch.optim.Adam(self.parameters())

In [23]:
# Source side: embeds German token ids.
encoder = Encoder(
    input_size=len(german_vocab),
    embedding_size=256,
    hidden_size=1024,
    num_layers=2,
    p=0.5
)

# Target side: hidden_size and num_layers match the encoder because the encoder's
# final (hidden, cell) states are passed straight in as the decoder's initial states.
decoder = Decoder(
    input_size=len(english_vocab), # Decoder inputs are target-language (English) token ids
    embedding_size=256,
    hidden_size=1024,
    output_size=len(english_vocab), # One score per English vocabulary entry
    num_layers=2,
    p=0.5
)


# Wrap both halves in the Lightning module that owns the loss and optimizer.
model = Seq2seq(encoder, decoder)

In [24]:
# # yay training finally
# from torch.nn.utils.rnn import pad_sequence

# def collate_fn(batch):
#     # Pad sequences to the maximum length in the batch
#     german_batch = [torch.tensor([german_vocab[token] if token in german_vocab else german_vocab['<unk>'] for token in item["de"]]) for item in batch]
#     english_batch = [torch.tensor([english_vocab[token] if token in english_vocab else english_vocab['<unk>'] for token in item["en"]]) for item in batch]

#     padded_german = pad_sequence(german_batch, batch_first=False, padding_value=german_vocab['<pad>'])
#     padded_english = pad_sequence(english_batch, batch_first=False, padding_value=english_vocab['<pad>'])

#     return {"de": padded_german, "en": padded_english}


# trainer = L.Trainer(max_epochs=20)
# dataloader = DataLoader(ds_tokenized["train"], batch_size=64, shuffle=True, collate_fn=collate_fn)
# val_dataloader = DataLoader(ds_tokenized["validation"], batch_size=64, shuffle=False, collate_fn=collate_fn)
# trainer.fit(model, train_dataloaders=dataloader, val_dataloaders=val_dataloader)

In [None]:
from torch.nn.utils.rnn import pad_sequence

def preprocess_to_indices(dataset):
    """Replace the token columns of `dataset` with vocabulary-index columns.

    Uses the notebook-level `german_vocab` and `english_vocab`; tokens missing
    from a vocabulary are encoded as that vocabulary's '<unk>' id.

    Args:
        dataset: object exposing `.map(...)` with 'de' and 'en' token-list columns.

    Returns:
        The mapped dataset with 'de_indices' and 'en_indices' columns; the
        original 'de' and 'en' columns are removed.
    """
    def _encode(tokens, vocab):
        # Look each token up, falling back to the unknown-token id.
        return [vocab.get(token, vocab['<unk>']) for token in tokens]

    def convert_to_indices(examples):
        # Encode each (German, English) sentence pair of the batch together.
        encoded_pairs = [
            (_encode(de_tokens, german_vocab), _encode(en_tokens, english_vocab))
            for de_tokens, en_tokens in zip(examples['de'], examples['en'])
        ]
        return {
            'de_indices': [de_idx for de_idx, _ in encoded_pairs],
            'en_indices': [en_idx for _, en_idx in encoded_pairs],
        }

    return dataset.map(convert_to_indices, batched=True, remove_columns=['de', 'en'])

# Convert the tokenized dataset to index lists once, up front, so the collate
# function only has to build tensors and pad.
ds_indexed = preprocess_to_indices(ds_tokenized)


def collate_fn_fast(batch):
    """Collate pre-indexed examples into padded, sequence-first tensors.

    Args:
        batch: list of examples, each with "de_indices" and "en_indices" lists.

    Returns:
        {"de": tensor, "en": tensor}, each shaped (max_len_in_batch, batch_size)
        and padded with the respective vocabulary's '<pad>' id.
    """
    de_sequences = list(map(torch.tensor, (example["de_indices"] for example in batch)))
    en_sequences = list(map(torch.tensor, (example["en_indices"] for example in batch)))

    return {
        "de": pad_sequence(de_sequences, batch_first=False, padding_value=german_vocab['<pad>']),
        "en": pad_sequence(en_sequences, batch_first=False, padding_value=english_vocab['<pad>']),
    }

In [None]:
# Train for up to 20 epochs on a single GPU with 16-bit mixed precision, logging every 10 steps.
trainer = L.Trainer(max_epochs=20, log_every_n_steps=10, accelerator="gpu",devices=1,precision="16-mixed")
# Only the training loader shuffles; both loaders pad per batch via collate_fn_fast.
dataloader = DataLoader(ds_indexed["train"], batch_size=64, shuffle=True, collate_fn=collate_fn_fast)
val_dataloader = DataLoader(ds_indexed["validation"], batch_size=64, shuffle=False, collate_fn=collate_fn_fast)
trainer.fit(model, train_dataloaders=dataloader, val_dataloaders=val_dataloader)

In [None]:
%load_ext tensorboard
%tensorboard --logdir="lightning_logs/"

In [35]:
# Path of the best checkpoint recorded by the trainer's checkpoint callback during the run above.
path_to_best_checkpoint = trainer.checkpoint_callback.best_model_path

In [None]:
# Show where the checkpoint was written.
print(path_to_best_checkpoint)

In [None]:
model.train() # Put the model in training mode before fitting again
# Fresh trainer with the epoch cap raised from 20 to 30; the checkpoint itself is
# only loaded by `fit` through `ckpt_path` below.
trainer = L.Trainer(max_epochs=30, log_every_n_steps=10, accelerator="gpu",devices=1,precision="16-mixed")
# NOTE(review): these loaders are rebuilt with the same arguments as in the first training cell.
dataloader = DataLoader(ds_indexed["train"], batch_size=64, shuffle=True, collate_fn=collate_fn_fast)
val_dataloader = DataLoader(ds_indexed["validation"], batch_size=64, shuffle=False, collate_fn=collate_fn_fast)
trainer.fit(model, train_dataloaders=dataloader, val_dataloaders=val_dataloader, ckpt_path=path_to_best_checkpoint)

In [None]:
def translate_sentence(model, sentence, german_vocab, english_vocab, device, max_length=50, tokenizer=None):
    """Greedily translate one German sentence into English.

    Args:
        model: object exposing `eval()`, `encoder(src)` -> (hidden, cell) and
            `decoder(token, hidden, cell)` -> (scores, hidden, cell).
        sentence: either a raw string or an already tokenized sequence. A raw
            string is tokenized, lower-cased and wrapped in <sos>/<eos>; a token
            sequence is used exactly as given.
        german_vocab: source token -> id mapping containing '<unk>'.
        english_vocab: target token -> id mapping containing '<sos>' and '<eos>'.
        device: torch device on which the input tensors are created.
        max_length: maximum number of decoding steps.
        tokenizer: callable turning a raw string into token strings. Defaults to
            the notebook's spaCy-based `tokenizer_ger`, so inference splits text
            the same way the training data was split. Plain whitespace splitting
            leaves punctuation glued to words, which then map to '<unk>'.

    Returns:
        The predicted English tokens joined by single spaces, without <eos>.

    Side effects:
        Leaves `model` in evaluation mode.
    """
    model.eval()  # Disable dropout for inference

    if isinstance(sentence, str):
        if tokenizer is None:
            tokenizer = tokenizer_ger
        # Mirror the training preprocessing: tokenize, lower-case, add markers.
        tokens = ['<sos>'] + [token.lower() for token in tokenizer(sentence)] + ['<eos>']
    else:
        tokens = list(sentence)  # Already tokenized

    # Out-of-vocabulary tokens fall back to <unk>.
    unk_idx = german_vocab['<unk>']
    indices = [german_vocab.get(token, unk_idx) for token in tokens]

    # Shape (seq_length, 1): a batch holding this single sentence.
    sentence_tensor = torch.tensor(indices, dtype=torch.long, device=device).unsqueeze(1)

    eos_idx = english_vocab['<eos>']
    input_token = english_vocab['<sos>']
    predicted_ids = []

    with torch.no_grad():
        hidden, cell = model.encoder(sentence_tensor)

        for _ in range(max_length):
            input_tensor = torch.tensor([input_token], dtype=torch.long, device=device)
            output, hidden, cell = model.decoder(input_tensor, hidden, cell)

            # Greedy choice: highest-scoring token.
            predicted = output.argmax(1).item()

            # Stop at <eos>; it is not part of the returned translation.
            if predicted == eos_idx:
                break

            predicted_ids.append(predicted)
            input_token = predicted

    # Convert ids back to words.
    english_idx2token = {idx: token for token, idx in english_vocab.items()}
    return ' '.join(english_idx2token.get(idx, '<unk>') for idx in predicted_ids)

# Move the trained model to the GPU when one is available, otherwise keep it on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Translate one example sentence and print it next to the original.
german_sentence = "Ich zog das Schwert aus dem Stein"
translation = translate_sentence(model, german_sentence, german_vocab, english_vocab, device)
print(f"German: {german_sentence}")
print(f"English: {translation}")