In [15]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from IPython.display import SVG, display
from torch.utils.data import DataLoader, TensorDataset
from torchviz import make_dot


ModuleNotFoundError: No module named 'torchviz'

# Overview

In this notebook, we implement a basic character-level recurrent sequence-to-sequence model. We apply it to translating short English sentences into short French sentences, character-by-character. Normally, word-level models are more common in machine translation domain.

We will start with input sentences from a English sentences and corresponding target sequences from French sentences. And we will use LSTM as an encoder turns input sequences to 2 state vectors(the last LSTM state and discard the outputs). A LSTM decoder is trained to turn the target sequences into the same sequence but offset by one timestep in the future, a training process called "teacher forcing" in this context. It uses as initial state the state vectors from the encoder. 

In inference mode, when we want to decode unknown input sequences, we encode the input sequence into state vectors. -Start with a target sequence of size 1(just the start-of-sequence character). - Feed the state vectors and 1-char target sequence to the decoder to produce predictions for the next character - Sample the next character using these predictions(here we use argmax). And append the sampled character to the target sequence- Repeat until we generate the end-of-sequence character or we hit the character limit.


# Loading the Dataset

In [2]:
%%capture
!wget http://www.manythings.org/anki/fra-eng.zip

In [3]:
!unzip -o fra-eng.zip

Archive:  fra-eng.zip
  inflating: _about.txt              
  inflating: fra.txt                 


# Preparing the Data

In [16]:
num_samples=10000 # number of samples to train on

# vectorize the data
input_texts=[]
target_texts=[]
input_characters=set()
target_characters=set()

data_path=os.path.join('', "fra.txt")

with open(data_path, "r", encoding="utf-8") as f:
    lines=f.read().split("\n")
    

# Initialize character sets
input_characters = set()
target_characters = set()

# Data reading and processing
for line in lines[:min(num_samples, len(lines)-1)]:
    input_text, target_text, _ = line.split("\t")
    target_text = "\t" + target_text + "\n"
    input_texts.append(input_text)
    target_texts.append(target_text)
    
    for char in input_text:
        input_characters.add(char)
    for char in target_text:
        target_characters.add(char)

# Sort characters and create token indices
input_characters = sorted(list(input_characters))
target_characters = sorted(list(target_characters))

input_token_index = dict((char, i) for i, char in enumerate(input_characters))
target_token_index = dict((char, i) for i, char in enumerate(target_characters))

# Reverse mapping for decoding
reverse_input_char_index = dict((i, char) for char, i in input_token_index.items())
reverse_target_char_index = dict((i, char) for char, i in target_token_index.items())

input_token_index=dict([(char, i) for i, char in enumerate(input_characters)])
target_token_index=dict([(char, i) for i, char in enumerate(target_characters)])
# Re-create the data from scratch
num_encoder_tokens = len(input_token_index)  # Number of unique input tokens
num_decoder_tokens = len(target_token_index)  # Number of unique output tokens
max_encoder_seq_length = max([len(txt) for txt in input_texts])  # Find the max sequence length for encoder
max_decoder_seq_length = max([len(txt) for txt in target_texts])  # Find the max sequence length for decoder

# Debug print statements
print("Input characters:", input_characters)
print("Target characters:", target_characters)
print("Number of samples:", len(input_texts))
print("Number of unique input tokens:", num_encoder_tokens)
print("Number of unique output tokens:", num_decoder_tokens)
print("Max sequence length for inputs:", max_encoder_seq_length)
print("Max sequence length for outputs:", max_decoder_seq_length)

Input characters: [' ', '!', '"', '$', '%', '&', "'", ',', '-', '.', '0', '1', '2', '3', '5', '7', '8', '9', ':', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'Y', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
Target characters: ['\t', '\n', ' ', '!', '%', '&', "'", ',', '-', '.', '0', '1', '2', '3', '5', '8', '9', ':', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'Y', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '\xa0', '«', '»', 'À', 'Ç', 'É', 'Ê', 'à', 'â', 'ç', 'è', 'é', 'ê', 'î', 'ï', 'ô', 'ù', 'û', 'œ', '\u2009', '’', '\u202f']
Number of samples: 10000
Number of unique input tokens: 70
Number of unique output tokens: 91
Max sequence length for inputs: 14
Max sequence

In [17]:
target_characters[20]

'B'

In [18]:
# Initialize encoder and decoder input data arrays
encoder_input_data = np.zeros(
    (len(input_texts), max_encoder_seq_length, num_encoder_tokens), dtype="float32"
)
decoder_input_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens), dtype="float32"
)
decoder_target_data = np.zeros(
    (len(input_texts), max_decoder_seq_length), dtype="int32"
)  # This is now of integer type for class indices

# Populate the arrays with the token indices
for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.0
    for t, char in enumerate(target_text):
        decoder_input_data[i, t, target_token_index[char]] = 1.0
        if t > 0:
            # decoder_target_data starts from index t-1
            decoder_target_data[i, t - 1] = target_token_index[char]


print(len(encoder_input_data))
print(len(decoder_input_data))
print(len(decoder_target_data))

10000
10000
10000


In [19]:
print(len(decoder_target_data[1]))
print(len(decoder_target_data[1]))
print(len(decoder_target_data[1]))

59
59
59


# Build the Model

In [20]:
class Seq2SeqModel(nn.Module):
    def __init__(self, num_encoder_tokens, num_decoder_tokens, latent_dim):
        super(Seq2SeqModel, self).__init__()
        self.latent_dim = latent_dim
        
        # Encoder
        self.encoder_lstm = nn.LSTM(num_encoder_tokens, latent_dim, batch_first=True)
        
        # Decoder
        self.decoder_lstm = nn.LSTM(num_decoder_tokens, latent_dim, batch_first=True)
        self.decoder_dense = nn.Linear(latent_dim, num_decoder_tokens)
    
    def forward(self, encoder_inputs, decoder_inputs):
        # Encoder forward pass
        _, (state_h, state_c) = self.encoder_lstm(encoder_inputs)
        
        # Decoder forward pass using the encoder's final state as the initial state
        decoder_outputs, _ = self.decoder_lstm(decoder_inputs, (state_h, state_c))
        
        # Dense layer
        decoder_outputs = self.decoder_dense(decoder_outputs)
        
        return F.log_softmax(decoder_outputs, dim=-1)

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define model parameters
latent_dim = 256  # Latent dimensionality of the encoding space
num_encoder_tokens = encoder_input_data.shape[2]  # Based on input shape
num_decoder_tokens = decoder_input_data.shape[2]  # Based on input shape

# Initialize the model architecture
model = Seq2SeqModel(num_encoder_tokens, num_decoder_tokens, latent_dim)
model.to(device)

Using device: cuda


Seq2SeqModel(
  (encoder_lstm): LSTM(70, 256, batch_first=True)
  (decoder_lstm): LSTM(91, 256, batch_first=True)
  (decoder_dense): Linear(in_features=256, out_features=91, bias=True)
)

In [21]:
def plot_model(model, encoder_input_size, decoder_input_size):
    # Create dummy input tensors with the specified input sizes
    encoder_dummy_input = torch.randn(encoder_input_size).to(next(model.parameters()).device)
    decoder_dummy_input = torch.randn(decoder_input_size).to(next(model.parameters()).device)

    # Perform a forward pass through the model to capture the computational graph
    output = model(encoder_dummy_input, decoder_dummy_input)

    # Create a dot graph visualization using torchviz
    dot = make_dot(output, params=dict(model.named_parameters()))

    # Render the dot graph to SVG format
    svg = dot.render(format="svg", outfile="./seq2seq.svg")

    # Display the SVG
    display(SVG(svg))
    
# Define input sizes for encoder and decoder (batch_size=32, sequence_length=10)
encoder_input_size = (32, 10, num_encoder_tokens)  # Example size
decoder_input_size = (32, 10, num_decoder_tokens)  # Example size

plot_model(model, encoder_input_size, decoder_input_size)

NameError: name 'make_dot' is not defined

# Train the Model

In [27]:
# Set parameters

epochs = 100  # number of epochs to train for
batch_size = 64  # batch size of training
validation_split = 0.2  # 20% validation data

# Convert NumPy arrays to PyTorch tensors
encoder_input_data = torch.tensor(encoder_input_data, dtype=torch.float32)
decoder_input_data = torch.tensor(decoder_input_data, dtype=torch.float32)
decoder_target_data = torch.tensor(decoder_target_data, dtype=torch.long)

# Move model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define loss function and optimizer
# Make sure "target_token_index" is defined, it contains indices of target tokens (ignore padding token index)
criterion = nn.CrossEntropyLoss(ignore_index=target_token_index[" "])  # Optionally ignore padding index
optimizer = optim.RMSprop(model.parameters(), lr=0.001)

# Calculate training and validation sizes
total_size = encoder_input_data.shape[0]
val_size = int(total_size * validation_split)
train_size = total_size - val_size

# Create data indices for training and validation splits
indices = torch.randperm(total_size)
train_indices = indices[:train_size]
val_indices = indices[train_size:]

# Use TensorDataset to create datasets for training and validation
train_dataset = TensorDataset(encoder_input_data[train_indices], decoder_input_data[train_indices], decoder_target_data[train_indices])
val_dataset = TensorDataset(encoder_input_data[val_indices], decoder_input_data[val_indices], decoder_target_data[val_indices])

# Create DataLoaders for training and validation
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

# Training loop
for epoch in range(epochs):
    model.train()  # Set model to training mode
    train_loss = 0

    for encoder_inputs, decoder_inputs, decoder_targets in train_loader:
        encoder_inputs = encoder_inputs.to(device)
        decoder_inputs = decoder_inputs.to(device)
        decoder_targets = decoder_targets.to(device)
        
        optimizer.zero_grad()

        # Forward pass
        outputs = model(encoder_inputs, decoder_inputs)
        
        # Reshape outputs and targets for loss calculation
        outputs = outputs.view(-1, outputs.shape[-1])
        decoder_targets = decoder_targets.view(-1)

        # Compute loss
        loss = criterion(outputs, decoder_targets)
        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights

        train_loss += loss.item()

    # Validation phase
    model.eval()  # Set model to evaluation mode
    val_loss = 0
    with torch.no_grad():  # Disable gradient calculation for validation
        for encoder_inputs, decoder_inputs, decoder_targets in val_loader:
            encoder_inputs = encoder_inputs.to(device)
            decoder_inputs = decoder_inputs.to(device)
            decoder_targets = decoder_targets.to(device)
            
            outputs = model(encoder_inputs, decoder_inputs)
            
            outputs = outputs.view(-1, outputs.shape[-1])
            decoder_targets = decoder_targets.view(-1)

            loss = criterion(outputs, decoder_targets)
            val_loss += loss.item()
    
    # Average losses over batches
    train_loss /= len(train_loader)
    val_loss /= len(val_loader)
    
    print(f"Epoch {epoch + 1}/{epochs}, Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

# Save the model components separately
torch.save({
    'encoder_lstm': model.encoder_lstm.state_dict(),
    'decoder_lstm': model.decoder_lstm.state_dict(),
    'dense': model.decoder_dense.state_dict()
}, 's2s_model.pth')

print("Model saved as s2s_model.pth")

  encoder_input_data = torch.tensor(encoder_input_data, dtype=torch.float32)
  decoder_input_data = torch.tensor(decoder_input_data, dtype=torch.float32)
  decoder_target_data = torch.tensor(decoder_target_data, dtype=torch.long)


Epoch 1/100, Training Loss: 0.1551, Validation Loss: 0.1353


KeyboardInterrupt: 

# Inference

* Encoding input and retrieve intial decoder state
* Runing one step of decoder with this initial state and a "start of sequence" token as target. Output wil be the bext target token
* Repeat with the current target token and curent states

In [22]:
# Load the saved state_dict from the file 's2s_model.pth'
checkpoint = torch.load('s2s_model.pth', map_location=device)

# Load the state dict into the model
model.encoder_lstm.load_state_dict(checkpoint['encoder_lstm'])
model.decoder_lstm.load_state_dict(checkpoint['decoder_lstm'])
model.decoder_dense.load_state_dict(checkpoint['dense'])

print("Model loaded successfully")

Model loaded successfully


  checkpoint = torch.load('s2s_model.pth', map_location=device)


In [23]:
# Function to decode a sequence using the loaded model
def decode_sequence(input_seq, max_decoder_seq_length):
    # Encode the input as state vectors
    input_seq = torch.tensor(input_seq, dtype=torch.float32).to(device)
    _, (state_h, state_c) = model.encoder_lstm(input_seq)
    
    # Initialize the target sequence with the start token
    target_seq = np.zeros((1, 1, num_decoder_tokens))  # (batch_size, time_steps, num_decoder_tokens)
    start_token_index = target_token_index.get('\t')  # Start token for target
    target_seq[0, 0, start_token_index] = 1.  # One-hot encode the start token

    decoded_sentence = []
    
    # Initialize states for the decoder
    stop_condition = False
    while not stop_condition:
        # Convert target_seq to a tensor
        target_seq_tensor = torch.tensor(target_seq, dtype=torch.float32).to(device)
        
        # Run the decoder on the current target sequence and states
        decoder_outputs, (state_h, state_c) = model.decoder_lstm(target_seq_tensor, (state_h, state_c))
        decoder_output = model.decoder_dense(decoder_outputs)
        
        # Choose the character with the highest probability
        token_index = torch.argmax(decoder_output[0, -1, :]).item()
        sampled_char = reverse_target_char_index[token_index]
        
        # Append the decoded character to the decoded_sentence list
        decoded_sentence.append(sampled_char)

        # Exit condition: either hit max length or find the end token
        if sampled_char == '\n' or len(decoded_sentence) > max_decoder_seq_length:
            stop_condition = True
        
        # Update the target sequence (with the sampled character as the next input)
        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, token_index] = 1.  # One-hot encode the sampled character

    return ''.join(decoded_sentence[:-1])  # Exclude the end token


# Example of using the decode_sequence function for inference
for seq_index in range(5):
    # Take one sequence (part of the training set) for trying out decoding
    input_seq = encoder_input_data[seq_index:seq_index+1]  # Take a single sequence
    decoded_sentence = decode_sequence(input_seq, max_decoder_seq_length)  # Pass max_decoder_seq_length
    
    print("-")
    print("Input sentence:", input_texts[seq_index])  # Original input sentence
    print("Decoded sentence:", decoded_sentence)  # Model's output sentence


-
Input sentence: Go.
Decoded sentence: Parssez-vous.
-
Input sentence: Go.
Decoded sentence: Parssez-vous.
-
Input sentence: Go.
Decoded sentence: Parssez-vous.
-
Input sentence: Go.
Decoded sentence: Parssez-vous.
-
Input sentence: Hi.
Decoded sentence: Salut.


In [24]:
input_seq = encoder_input_data[seq_index:seq_index+1]  # Take a single sequence
decoded_sentence = decode_sequence(input_seq, max_decoder_seq_length)  # Pass max_decoder_seq_length

print("-")
print("Input sentence:", input_texts[seq_index])  # Original input sentence
print("Decoded sentence:", decoded_sentence)  # Model's output sentence


-
Input sentence: Hi.
Decoded sentence: Salut.


# Acknowledge
* https://github.com/keras-team/keras-io/blob/master/examples/nlp/lstm_seq2seq.py
* https://keras.io/examples/nlp/lstm_seq2seq/
* https://huggingface.co/blog/ray-rag