# LSTM RNN: Seq2Seq (encoder–decoder) translation, English → German


In [16]:
%pip install nltk


Note: you may need to restart the kernel to use updated packages.


In [34]:
# libraries
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import defaultdict
import nltk
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu, SmoothingFunction
nltk.download('punkt')



[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Cecilia\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [35]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device.type  # shown as the cell output

'cpu'

In [36]:
# English -> German sentence pairs: the entire (toy) corpus of this notebook.
# Each entry is [english_sentence, german_sentence]. All sentences are already
# lower-case, so splitting on whitespace is enough to tokenize them.
# The same pairs are used both for training and for the BLEU evaluation, so the
# reported scores measure how well the model memorised them, not generalisation.
pairs = [
    ["i am a student", "ich bin ein student"],
    ["he is a teacher", "er ist ein lehrer"],
    ["she is happy", "sie ist glücklich"],
    ["they are doctors", "sie sind ärzte"],
    ["we are friends", "wir sind freunde"],
    ["i am tired", "ich bin müde"],
    ["you are smart", "du bist klug"],
    ["it is raining", "es regnet"],
    ["i like apples", "ich mag äpfel"],
    ["he likes music", "er mag musik"],
    ["do you speak german", "sprichst du deutsch"],
    ["what is your name", "wie heißt du"],
    ["my name is anna", "ich heiße anna"],
    ["i live in berlin", "ich wohne in berlin"],
    ["she lives in hamburg", "sie wohnt in hamburg"],
    ["i am hungry", "ich habe hunger"],
    ["are you okay", "bist du okay"],
    ["this is my book", "das ist mein buch"],
    ["i need help", "ich brauche hilfe"],
    ["let's go", "lass uns gehen"],
    ["hi i am a teacher", "hallo ich bin ein lehrer"],
    ["hello i am a teacher", "hallo ich bin ein lehrer"],
    ["i am a teacher", "ich bin ein lehrer"]

]


In [37]:
def tokenize(sentence: str) -> list:
    """Lower-case ``sentence`` and split it on whitespace into a list of word tokens."""
    lowered = sentence.lower()
    return lowered.split()

def build_vocab(sentences: list) -> dict:
    """Build a word -> index vocabulary from ``sentences``.

    Indices 0-2 are reserved for the special tokens:
      * ``<pad>`` - filler used to align sentences to a common length
      * ``<sos>`` - "start of sentence", inserted before each sentence
      * ``<eos>`` - "end of sentence", inserted after each sentence
    Every other word gets the next free index (3, 4, ...) in order of first
    appearance.
    """
    vocab = {"<pad>": 0, "<sos>": 1, "<eos>": 2}
    for sentence in sentences:
        for word in tokenize(sentence):
            # Indices are dense from 0, so len(vocab) is always the next free one;
            # setdefault leaves words that are already present untouched.
            vocab.setdefault(word, len(vocab))
    return vocab


In [38]:
# One vocabulary per language: element 0 of each pair is English, element 1 is German.
eng_vocab = build_vocab([pair[0] for pair in pairs])
german_vocab = build_vocab([pair[1] for pair in pairs])
print(eng_vocab, german_vocab, sep="\n")

{'<pad>': 0, '<sos>': 1, '<eos>': 2, 'i': 3, 'am': 4, 'a': 5, 'student': 6, 'he': 7, 'is': 8, 'teacher': 9, 'she': 10, 'happy': 11, 'they': 12, 'are': 13, 'doctors': 14, 'we': 15, 'friends': 16, 'tired': 17, 'you': 18, 'smart': 19, 'it': 20, 'raining': 21, 'like': 22, 'apples': 23, 'likes': 24, 'music': 25, 'do': 26, 'speak': 27, 'german': 28, 'what': 29, 'your': 30, 'name': 31, 'my': 32, 'anna': 33, 'live': 34, 'in': 35, 'berlin': 36, 'lives': 37, 'hamburg': 38, 'hungry': 39, 'okay': 40, 'this': 41, 'book': 42, 'need': 43, 'help': 44, "let's": 45, 'go': 46, 'hi': 47, 'hello': 48}
{'<pad>': 0, '<sos>': 1, '<eos>': 2, 'ich': 3, 'bin': 4, 'ein': 5, 'student': 6, 'er': 7, 'ist': 8, 'lehrer': 9, 'sie': 10, 'glücklich': 11, 'sind': 12, 'ärzte': 13, 'wir': 14, 'freunde': 15, 'müde': 16, 'du': 17, 'bist': 18, 'klug': 19, 'es': 20, 'regnet': 21, 'mag': 22, 'äpfel': 23, 'musik': 24, 'sprichst': 25, 'deutsch': 26, 'wie': 27, 'heißt': 28, 'heiße': 29, 'anna': 30, 'wohne': 31, 'in': 32, 'berlin': 

In [39]:
def sentence_to_indices(sentence: str, vocab: dict) -> list:
    """Encode ``sentence`` as vocabulary indices framed by ``<sos>`` and ``<eos>``.

    Raises ``KeyError`` if a token of the sentence is not in ``vocab``.
    """
    tokens = tokenize(sentence)
    indices = [vocab["<sos>"]]
    indices.extend(vocab[token] for token in tokens)
    indices.append(vocab["<eos>"])
    return indices

# Sanity check: encode the English side of every pair and display the result.
eng_indices = [sentence_to_indices(pair[0], eng_vocab) for pair in pairs]
eng_indices

[[1, 3, 4, 5, 6, 2],
 [1, 7, 8, 5, 9, 2],
 [1, 10, 8, 11, 2],
 [1, 12, 13, 14, 2],
 [1, 15, 13, 16, 2],
 [1, 3, 4, 17, 2],
 [1, 18, 13, 19, 2],
 [1, 20, 8, 21, 2],
 [1, 3, 22, 23, 2],
 [1, 7, 24, 25, 2],
 [1, 26, 18, 27, 28, 2],
 [1, 29, 8, 30, 31, 2],
 [1, 32, 31, 8, 33, 2],
 [1, 3, 34, 35, 36, 2],
 [1, 10, 37, 35, 38, 2],
 [1, 3, 4, 39, 2],
 [1, 13, 18, 40, 2],
 [1, 41, 8, 32, 42, 2],
 [1, 3, 43, 44, 2],
 [1, 45, 46, 2],
 [1, 47, 3, 4, 5, 9, 2],
 [1, 48, 3, 4, 5, 9, 2],
 [1, 3, 4, 5, 9, 2]]

In [40]:
def prepare_batch(pairs:list, eng_vocab:dict, german_vocab:dict):
    """Encode every sentence pair and pad both sides into rectangular tensors.

    Returns ``(src_batch, trg_batch)``: two LongTensors laid out as
    (longest_sentence_len, number_of_pairs). Each *column* is one sentence;
    shorter sentences are filled up with 0 (the ``<pad>`` index).

    Example for two English sentences:
        [1, 3, 4, 2]         # "i am happy"
        [1, 3, 4, 5, 6, 2]   # "i am a student"
    become, after padding,
        [[1, 1],
         [3, 3],
         [4, 4],
         [2, 5],
         [0, 6],
         [0, 2]]
    """
    src_tensors, trg_tensors = [], []

    for eng, ger in pairs:
        src_ids = sentence_to_indices(eng, eng_vocab)
        trg_ids = sentence_to_indices(ger, german_vocab)
        src_tensors.append(torch.tensor(src_ids, dtype=torch.long))
        trg_tensors.append(torch.tensor(trg_ids, dtype=torch.long))

    # pad_sequence stacks the variable-length 1-D tensors along a new batch
    # dimension, filling the gaps with padding_value.
    pad = nn.utils.rnn.pad_sequence
    return pad(src_tensors, padding_value=0), pad(trg_tensors, padding_value=0)

In [41]:
# Build the padded batches and move both onto the selected device.
src_batch, trg_batch = (
    batch.to(device) for batch in prepare_batch(pairs, eng_vocab, german_vocab)
)
src_batch

tensor([[ 1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
          1,  1,  1,  1,  1],
        [ 3,  7, 10, 12, 15,  3, 18, 20,  3,  7, 26, 29, 32,  3, 10,  3, 13, 41,
          3, 45, 47, 48,  3],
        [ 4,  8,  8, 13, 13,  4, 13,  8, 22, 24, 18,  8, 31, 34, 37,  4, 18,  8,
         43, 46,  3,  3,  4],
        [ 5,  5, 11, 14, 16, 17, 19, 21, 23, 25, 27, 30,  8, 35, 35, 39, 40, 32,
         44,  2,  4,  4,  5],
        [ 6,  9,  2,  2,  2,  2,  2,  2,  2,  2, 28, 31, 33, 36, 38,  2,  2, 42,
          2,  0,  5,  5,  9],
        [ 2,  2,  0,  0,  0,  0,  0,  0,  0,  0,  2,  2,  2,  2,  2,  0,  0,  2,
          0,  0,  9,  9,  2],
        [ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  2,  2,  0]])

### Encoder part

In [42]:
class Encoder(nn.Module):
    """Embeds a source sentence and summarises it with an LSTM.

    Args:
        input_dim: size of the source vocabulary (number of distinct indices).
        emb_dim: length of each word-embedding vector (e.g. 32 or 100).
        hidden_dim: size of the LSTM hidden/cell state, i.e. its "memory".

    Data flow for one sentence:
        indices [1, 4, 5, 2]
          -> embedding layer -> one emb_dim vector per index
          -> LSTM            -> outputs, (hidden, cell)
    Only the final (hidden, cell) state is returned; the per-step outputs are
    discarded.
    """

    def __init__(self, input_dim, emb_dim, hidden_dim):
        super().__init__()
        # Lookup table index -> vector; randomly initialised, learned during training.
        # The attribute name (spelled "embeding") is kept as-is because it is part
        # of the module's state_dict keys.
        self.embeding = nn.Embedding(input_dim, emb_dim)
        # LSTM that consumes the embedded sequence.
        self.lstm = nn.LSTM(emb_dim, hidden_dim)

    def forward(self, src):
        """Return the LSTM's final ``(hidden, cell)`` state for the index tensor ``src``."""
        _, final_state = self.lstm(self.embeding(src))
        hidden, cell = final_state
        return hidden, cell

### Decoder

In [43]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that scores the next target-language word.

    Args:
        output_dim: size of the target vocabulary (e.g. German).
        emb_dim: length of each word-embedding vector.
        hidden_dim: size of the LSTM hidden/cell state.

    About the LSTM state that is threaded through the decoder:
        * hidden - the short-term state h_t; with one layer and one direction
          (the nn.LSTM defaults used here) it is (1, batch_size, hidden_dim).
        * cell   - the long-term memory cell c_t, same shape as hidden.
    """

    def __init__(self, output_dim, emb_dim, hidden_dim):
        super().__init__()
        # (output_dim x emb_dim) lookup table for target-language words.
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim)
        # Projects the LSTM output to one score per target word. These are raw
        # logits; a softmax (applied inside the loss) turns them into probabilities.
        self.fc_out = nn.Linear(hidden_dim, output_dim)

    def forward(self, input, hidden, cell):
        """Decode one time step.

        ``input`` holds one token index per batch element; ``hidden`` and ``cell``
        are the LSTM state from the previous step (or from the encoder).
        Returns ``(prediction, hidden, cell)`` where ``prediction`` contains the
        vocabulary scores per batch element and the state is the updated one.
        """
        step_tokens = input.unsqueeze(0)  # (batch,) -> (1, batch): a sequence of length 1
        step_embedded = self.embedding(step_tokens)
        lstm_out, (hidden, cell) = self.lstm(step_embedded, (hidden, cell))
        scores = self.fc_out(lstm_out.squeeze(0))  # drop the length-1 time axis
        return scores, hidden, cell

### Seq2Seq

In [44]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: module whose call ``encoder(src)`` returns ``(hidden, cell)``.
        decoder: module whose call ``decoder(token, hidden, cell)`` returns
            ``(scores, hidden, cell)`` and that exposes ``fc_out.out_features``
            (the target vocabulary size).
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        """Return decoder scores for every target position.

        Args:
            src: source index tensor, passed unchanged to the encoder.
            trg: target index tensor laid out as (trg_len, batch_size); row 0
                is expected to hold the ``<sos>`` tokens.
            teacher_forcing_ratio: probability, per time step, of feeding the
                ground-truth token ``trg[t]`` to the next step instead of the
                decoder's own best guess.

        Returns:
            Tensor of shape (trg_len, batch_size, trg_vocab_size). Row 0 is
            never written and stays all-zero, because decoding starts by
            predicting position 1 from ``<sos>``.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        trg_vocab_size = self.decoder.fc_out.out_features
        # Allocate on the same device as the targets instead of relying on a
        # notebook-global `device`, so the module is self-contained and can never
        # disagree with where its inputs live.
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=trg.device)

        hidden, cell = self.encoder(src)
        decoder_input = trg[0, :]  # <sos>

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[t] = output
            # One coin flip per time step, shared by the whole batch.
            teacher_force = random.random() < teacher_forcing_ratio
            decoder_input = trg[t] if teacher_force else output.argmax(1)

        return outputs

### Evaluation and Training

In [45]:
def evaluate_bleu(model, pairs, eng_vocab, german_vocab):
    """Average smoothed sentence-level BLEU of the model over ``pairs``.

    Each English sentence is translated with ``translate_sentence`` and compared
    with its single German reference; both sides are split with ``tokenize``.
    Smoothing (NLTK ``method4``) is applied to each sentence score.

    Args:
        model: the Seq2Seq model to evaluate.
        pairs: iterable of ``(english, german)`` sentence pairs.
        eng_vocab: source word -> index mapping.
        german_vocab: target word -> index mapping.

    Returns:
        Mean sentence BLEU as a float; ``0.0`` when ``pairs`` is empty.
    """
    pairs = list(pairs)
    if not pairs:
        # Nothing to score: avoid the ZeroDivisionError of averaging an empty list.
        return 0.0

    smooth = SmoothingFunction().method4
    bleu_scores = []

    for eng, ger in pairs:
        pred = translate_sentence(eng, model, eng_vocab, german_vocab)
        reference = tokenize(ger)
        candidate = tokenize(pred)
        bleu_scores.append(
            sentence_bleu([reference], candidate, smoothing_function=smooth)
        )

    return sum(bleu_scores) / len(bleu_scores)


In [46]:
def evaluate_corpus_bleu(model, pairs, eng_vocab, german_vocab):
    """Corpus-level BLEU (NLTK ``corpus_bleu``, no smoothing) of the model over ``pairs``.

    References and translations are split with the same whitespace ``tokenize``
    that built the vocabularies and that ``evaluate_bleu`` uses, so both metrics
    score the same tokens. This also means no NLTK tokenizer data is needed at
    evaluation time.

    Args:
        model: the Seq2Seq model to evaluate.
        pairs: iterable of ``(english, german)`` sentence pairs.
        eng_vocab: source word -> index mapping.
        german_vocab: target word -> index mapping.

    Returns:
        The corpus BLEU score; ``0.0`` when ``pairs`` is empty.
    """
    pairs = list(pairs)
    if not pairs:
        # No hypotheses at all: define the score as 0 instead of delegating
        # an empty corpus to NLTK.
        return 0.0

    list_of_references = []
    hypotheses = []

    for eng, ger in pairs:
        pred = translate_sentence(eng, model, eng_vocab, german_vocab)
        # corpus_bleu expects a *list* of references per hypothesis; there is
        # exactly one reference translation here.
        list_of_references.append([tokenize(ger)])
        hypotheses.append(tokenize(pred))

    return corpus_bleu(list_of_references, hypotheses)

In [47]:
# Model sizes: the embedding tables are sized from the two vocabularies.
INPUT_DIM = len(eng_vocab)
OUTPUT_DIM = len(german_vocab)
EMB_DIM = 16
HID_DIM = 32
SEED = 42

# Seed both RNGs before any weights are created: torch drives the parameter
# initialisation, Python's `random` drives the teacher-forcing coin flips in
# Seq2Seq.forward. Without this, every run trains a different model.
random.seed(SEED)
torch.manual_seed(SEED)

encoder = Encoder(INPUT_DIM, EMB_DIM, HID_DIM).to(device)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM).to(device)
model = Seq2Seq(encoder, decoder).to(device)

optimizer = optim.Adam(model.parameters())
# Padding positions must not contribute to the loss; read the index from the
# vocabulary instead of repeating the literal 0.
criterion = nn.CrossEntropyLoss(ignore_index=german_vocab["<pad>"])

def train(model, src, trg, optimizer, criterion):
    """Run one optimisation step on a single batch and return its loss as a float.

    The model is switched to training mode, called as ``model(src, trg)``, and
    its output is compared with ``trg`` by ``criterion``. Position 0 along the
    first axis is dropped from both tensors before the loss is computed (it is
    the ``<sos>`` row, for which the model makes no prediction).
    """
    model.train()
    optimizer.zero_grad()

    logits = model(src, trg)
    vocab_size = logits.shape[-1]

    # Flatten (time, batch) into one axis so the criterion sees
    # (n_tokens, vocab_size) scores against (n_tokens,) target indices.
    flat_logits = logits[1:].view(-1, vocab_size)
    flat_targets = trg[1:].view(-1)

    batch_loss = criterion(flat_logits, flat_targets)
    batch_loss.backward()
    optimizer.step()
    return batch_loss.item()


# Training loop
# NOTE(review): evaluate_bleu / evaluate_corpus_bleu call translate_sentence,
# which is defined in a later cell of this notebook. On a fresh kernel
# (Restart & Run All) that definition must be executed before this cell,
# otherwise the first BLEU evaluation raises NameError.
N_EPOCHS = 1000   # full passes over the (single-batch) training set
EVAL_EVERY = 10   # report loss and BLEU every this many epochs

for epoch in range(N_EPOCHS):
    loss = train(model, src_batch, trg_batch, optimizer, criterion)
    if epoch % EVAL_EVERY == 0:
        bleu = evaluate_bleu(model, pairs, eng_vocab, german_vocab)
        print(f"Epoch {epoch} | Loss: {loss:.4f} | BLEU: {bleu:.4f}")

# Final evaluation with corpus BLEU
final_corpus_bleu = evaluate_corpus_bleu(model, pairs, eng_vocab, german_vocab)
print(f"\nFinal Corpus BLEU Score: {final_corpus_bleu:.4f}")
print(f"Final Corpus BLEU (%): {final_corpus_bleu * 100:.2f}%")

Epoch 0 | Loss: 3.8428 | BLEU: 0.0017
Epoch 10 | Loss: 3.7765 | BLEU: 0.0186
Epoch 20 | Loss: 3.7236 | BLEU: 0.0364
Epoch 30 | Loss: 3.6279 | BLEU: 0.0375
Epoch 40 | Loss: 3.4385 | BLEU: 0.0375
Epoch 50 | Loss: 3.1822 | BLEU: 0.0375
Epoch 60 | Loss: 2.9710 | BLEU: 0.0375
Epoch 70 | Loss: 2.8500 | BLEU: 0.0323
Epoch 80 | Loss: 2.6970 | BLEU: 0.0323
Epoch 90 | Loss: 2.5602 | BLEU: 0.0295
Epoch 100 | Loss: 2.4380 | BLEU: 0.0450
Epoch 110 | Loss: 2.3118 | BLEU: 0.0851
Epoch 120 | Loss: 2.2025 | BLEU: 0.0797
Epoch 130 | Loss: 2.0568 | BLEU: 0.0893
Epoch 140 | Loss: 1.9832 | BLEU: 0.1331
Epoch 150 | Loss: 1.8659 | BLEU: 0.1682
Epoch 160 | Loss: 1.7199 | BLEU: 0.1772
Epoch 170 | Loss: 1.6043 | BLEU: 0.1788
Epoch 180 | Loss: 1.5748 | BLEU: 0.1803
Epoch 190 | Loss: 1.4012 | BLEU: 0.2112
Epoch 200 | Loss: 1.3343 | BLEU: 0.2320
Epoch 210 | Loss: 1.2088 | BLEU: 0.2289
Epoch 220 | Loss: 1.1090 | BLEU: 0.2608
Epoch 230 | Loss: 1.0128 | BLEU: 0.2676
Epoch 240 | Loss: 0.9298 | BLEU: 0.2832
Epoch 250 |

### Usage

In [48]:
def translate_sentence(sentence, model, eng_vocab, german_vocab, max_len=20):
    """Greedily translate one English sentence with a trained Seq2Seq model.

    Args:
        sentence: source sentence; it is lower-cased and split by ``tokenize``.
        model: model exposing ``encoder`` and ``decoder`` sub-modules.
        eng_vocab: source word -> index mapping (needs ``<sos>``, ``<eos>``, ``<pad>``).
        german_vocab: target word -> index mapping (needs ``<sos>``, ``<eos>``).
        max_len: maximum number of target tokens to generate.

    Returns:
        The translated words joined by single spaces (without ``<sos>``/``<eos>``).

    Notes:
        * Source words missing from ``eng_vocab`` are mapped to ``<pad>``.
        * The model is switched to eval mode and is left in that mode.
    """
    model.eval()

    # Run on whatever device the model's weights live on, rather than depending
    # on a notebook-global `device` variable.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    idx2word = {idx: word for word, idx in german_vocab.items()}

    # Tokenize and encode the source sentence (tokenize already lower-cases).
    tokens = ["<sos>"] + tokenize(sentence) + ["<eos>"]
    src_indices = [eng_vocab.get(token, eng_vocab["<pad>"]) for token in tokens]
    src_tensor = torch.tensor(
        src_indices, dtype=torch.long, device=run_device
    ).unsqueeze(1)  # (seq_len, 1): a batch containing one sentence

    eos_idx = german_vocab["<eos>"]
    trg_indices = [german_vocab["<sos>"]]  # decoding starts from <sos>

    with torch.no_grad():
        # Go through the encoder once, then decode token by token.
        hidden, cell = model.encoder(src_tensor)

        for _ in range(max_len):
            trg_tensor = torch.tensor(
                [trg_indices[-1]], dtype=torch.long, device=run_device
            )
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)
            pred_token = output.argmax(1).item()

            if pred_token == eos_idx:
                break

            trg_indices.append(pred_token)

    # Skip the leading <sos> when mapping indices back to words.
    return " ".join(idx2word[idx] for idx in trg_indices[1:])


In [49]:
# Try the trained model on one of the training sentences.
translate_sentence("hi i am a teacher", model, eng_vocab, german_vocab)

'hallo ich bin ein lehrer'