In [None]:
import torch
import torch.nn as nn
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence

## Encoder

In [None]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
    """ 
    Encoder for the sequence-to-sequence math problem assistant model using an LSTM. 
    Converts a sequence of token indices into a hidden representation 
    that will be used by the decoder for sequence generation.
    """
    def __init__(self, input_size, hidden_size):
        """
        Initializes the Encoder module.

        Args:
            input_size (int): The size of the input vocabulary.
            hidden_size (int): The number of hidden units in the LSTM.
        """
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size

        # Embedding layer converts token indices into dense vectors for better representation.
        # The embeddings refer to vector representations of words or tokens used as input to the model.
        # Instead of feeding raw word indices (which don’t capture meaning), feedingembeddings allows
        # the LSTM to learn meaningful semantic relationships between words.
        self.embedding = nn.Embedding(input_size, hidden_size)

        # LSTM layer processes embedded input sequences to generate hidden states
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)

    def forward(self, input_seq):
        """
        Forward pass through the encoder.

        Args:
            input_seq (Tensor): Tensor containing token indices for a batch of input sentences.

        Returns:
            outputs (Tensor): Encoder outputs at each time step.
            hidden (Tensor): Final hidden state of the LSTM.
            cell (Tensor): Final cell state of the LSTM.
        """
        embedded = self.embedding(input_seq)            # Convert input tokens into embeddings 
        outputs, (hidden, cell) = self.lstm(embedded)   # Process embeddings through LSTM
        return outputs, (hidden, cell)

## Attention

In [None]:
class Attention(nn.Module):
    """ 
    Attention model for the sequence-to-sequence math problem assistant
    whichimplements a 'Bahdanau attention' mechanism. This allows the model
    to dynamically compute attention scores based on the decoder's hidden
    state and the encoder's outputs, allowing the model to focus on relevant
    parts of the input sequence at each decoding step.
    """
    def __init__(self, hidden_size):
        """
        Initializes the attention mechanism.

        Args:
            hidden_size (int): The size of the hidden state of the LSTM.
        """
        super(Attention, self).__init__()

        # Learnable linear transformation to compute alignment scores
        self.attn = nn.Linear(hidden_size * 2, hidden_size)
        
        # Learnable parameter to compute weighted attention scores
        self.v = nn.Parameter(torch.rand(hidden_size))

    def forward(self, hidden, encoder_outputs):
        """
        Computes attention weights using Bahdanau's additive attention method.

        Args:
            hidden (Tensor): Decoder hidden state at the current time step.
            encoder_outputs (Tensor): Encoder outputs at all time steps.

        Returns:
            attention_weights (Tensor): Softmax-normalized attention scores.
        """
        seq_len = encoder_outputs.size(1)

        # Expand the hidden state across sequence length to match encoder outputs
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)

        # Compute energy scores (alignment) using a feed-forward layer
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))

        # Transpose energy tensor for matrix multiplication with attention parameter
        energy = energy.permute(0, 2, 1)

        # Expand attention parameter across batch size
        v = self.v.repeat(encoder_outputs.size(0), 1).unsqueeze(1)

        # Compute attention weights using learned vector `v`
        attention_weights = torch.bmm(v, energy).squeeze(1)

        # Apply softmax to normalize scores across sequence length
        return torch.softmax(attention_weights, dim=1)

## Decoder

In [None]:
import torch
import torch.nn as nn

class Decoder(nn.Module):
    """ 
    Encoder for the sequence-to-sequence math problem assistant model using an
    LSTM with Bahdanau attention. The decoder generates output tokens one by one
    while dynamically focusing  on relevant parts of the encoder’s outputs using
    the attention mechanism.
    """
    def __init__(self, output_size, hidden_size):
        """
        Initializes the Decoder module.

        Args:
            output_size (int): The size of the output vocabulary.
            hidden_size (int): The number of hidden units in the LSTM.
        """
        super(Decoder, self).__init__()

        # Embedding layer converts token indices into dense vectors
        self.embedding = nn.Embedding(output_size, hidden_size)

        # LSTM layer processes embeddings and maintains hidden state across timesteps
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)

        # Linear layer maps concatenated attention context & LSTM output to vocab space
        self.fc = nn.Linear(hidden_size * 2, output_size)

        # Attention mechanism for dynamic focus on encoder outputs
        self.attention = Attention(hidden_size)

    def forward(self, input, hidden, cell, encoder_outputs):
        """
        Forward pass for the decoder.

        Args:
            input (Tensor): Current token input to the decoder.
            hidden (Tensor): Previous hidden state from the LSTM.
            cell (Tensor): Previous cell state from the LSTM.
            encoder_outputs (Tensor): Encoder outputs from all timesteps.

        Returns:
            output (Tensor): Predicted token probabilities.
            hidden (Tensor): Updated hidden state.
            cell (Tensor): Updated cell state.
            attention_weights (Tensor): Attention scores for each encoder timestep.
        """
        # Expand input dimensions to match expected input shape for embedding
        input = input.unsqueeze(1)  
        
        # Convert token indices into dense embeddings
        embedded = self.embedding(input)

        # Forward pass through LSTM to generate new hidden and cell states
        lstm_output, (hidden, cell) = self.lstm(embedded, (hidden, cell))

        # Compute attention weights using the current hidden state and encoder outputs
        attention_weights = self.attention(hidden[-1], encoder_outputs)

        # Apply attention: generate weighted sum of encoder outputs
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)

        # Flatten tensors for the fully connected layer
        lstm_output = lstm_output.squeeze(1)
        context = context.squeeze(1)

        # Generate token probabilities using concatenated LSTM output and attention context
        output = self.fc(torch.cat((lstm_output, context), dim=1))

        return output, hidden, cell, attention_weights

##  Basic Tokenization and Vocabulary Setup
First, create a vocabulary and tokenize the input sentence (e.g., "two plus four" etc). To keep things extremely simple we'll manually establish a 'word-to-index' vocabulary whihc offers a simple map from word to tokensized integer value.

In [None]:
# Tokenization and vocab setup
# Create a vocabulary mapping words to indices
word_to_index = {"<SOS>": 0, "<EOS>": 1, "<PAD>": 2, "two": 3, "plus": 4, "four": 5, "equals": 6, "six": 7, "three": 8, "minus": 9, "one": 10}
index_to_word = {v: k for k, v in word_to_index.items()}  # Reverse mapping for decoding

# Example input and target sequences
input_sentence = "two plus four"
target_sentence = "equals six"

# Test Tokenized input and targets
input_tokens = [word_to_index[word] for word in input_sentence.split()]
target_tokens = [word_to_index["<SOS>"]] + [word_to_index[word] for word in target_sentence.split()] + [word_to_index["<EOS>"]]

## Generate the math problem dataset from the tokenzied data

In [None]:
class BasicMathWordProblemDataset(Dataset):
    def __init__(self, input_sentences, target_sentences, word_to_index):
        self.input_data = [[word_to_index[word] for word in sentence.split()] for sentence in input_sentences]
        self.target_data = [[word_to_index["<SOS>"]] +
                            [word_to_index[word] for word in sentence.split()] +
                            [word_to_index["<EOS>"]] for sentence in target_sentences]

    def __len__(self):
        return len(self.input_data)

    def __getitem__(self, idx):
        return torch.tensor(self.input_data[idx], dtype=torch.long), torch.tensor(self.target_data[idx], dtype=torch.long)

# Example data
input_sentences = ["two plus four"]
target_sentences = ["equals six"]

# Create dataset and dataloader
dataset = BasicMathWordProblemDataset(input_sentences, target_sentences, word_to_index)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

## Establish hyperparamters

In [None]:
# Define the device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the encoder and decoder
input_size = len(word_to_index)  # Total vocabulary size
output_size = len(word_to_index)  # Vocabulary size
hidden_size = 128

encoder = Encoder(input_size, hidden_size).to(device)
decoder = Decoder(output_size, hidden_size).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=word_to_index["<PAD>"])  # Ignore padding tokens
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001)

## Establish the training regime

In [None]:
def train(num_epochs, encoder_save_name, decoder_save_name):
    """
    Trains the sequence-to-sequence model using an encoder-decoder architecture.

    Args:
        num_epochs (int): Number of training epochs.
        encoder_save_name (str): File name for saving the trained encoder model.
        decoder_save_name (str): File name for saving the trained decoder model.
    """
    for epoch in range(num_epochs):
        for input_seq, target_seq in dataloader:
            # Move data to GPU if available, else keep on CPU
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)

            # Reset gradients before each batch
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()

            # **ENCODER FORWARD PASS**
            # Processes the input sequence and generates context for the decoder
            encoder_outputs, (hidden, cell) = encoder(input_seq)

            # **DECODER INITIALIZATION**
            # The first input to the decoder is always the <SOS> token
            decoder_input = torch.tensor([word_to_index["<SOS>"]] * input_seq.size(0), device=device)
            decoder_hidden, decoder_cell = hidden, cell

            # Compute actual target sequence lengths (excluding padding)
            target_lengths = (target_seq != word_to_index["<PAD>"]).sum(dim=1)
            max_target_length = target_lengths.max().item()  # Maximum sequence length in batch

            loss = 0  # Track loss per batch

            # **ITERATE OVER TARGET SEQUENCE (Adaptive length)**
            for t in range(max_target_length):
                # Check if sequences are still active (haven't reached <EOS>)
                still_active = t < target_lengths
                if not still_active.any():  # If all sequences are finished, stop decoding
                    break

                # **DECODER FORWARD PASS**
                output, decoder_hidden, decoder_cell, _ = decoder(
                    decoder_input, decoder_hidden, decoder_cell, encoder_outputs
                )

                # Compute masked loss (only valid tokens contribute to loss)
                loss += (criterion(output, target_seq[:, t]) * still_active.float()).sum() / still_active.sum()

                # Apply teacher forcing: Use actual target token as next input
                decoder_input = target_seq[:, t]  

            # **BACKPROPAGATION & OPTIMIZATION**
            loss.backward()  # Compute gradients
            encoder_optimizer.step()  # Update encoder weights
            decoder_optimizer.step()  # Update decoder weights

        # Print epoch loss for tracking progress
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}")

    # **SAVE TRAINED MODELS**
    torch.save(encoder.state_dict(), f"{encoder_save_name}.pth")
    torch.save(decoder.state_dict(), f"{decoder_save_name}.pth")

## Train the model

In [None]:
num_epochs = 100  # Define the number of epochs
train(num_epochs, "basic_math_problem_encoder", "basic_math_problem_decoder")

## Test setup

In [None]:
def test(encoder, decoder, input_sentence):
    # Set the models to evaluation mode
    encoder.eval()
    decoder.eval()

    # Tokenize the input sentence
    input_tokens = [word_to_index[word] for word in input_sentence.split()]
    input_seq = torch.tensor(input_tokens, dtype=torch.long).unsqueeze(0)  # Add batch dimension

    # generate the output sequence
    # Forward pass through the encoder
    encoder_outputs, (hidden, cell) = encoder(input_seq)

    # Initialize the decoder
    decoder_input = torch.tensor([word_to_index["<SOS>"]], dtype=torch.long)  # Start-of-Sequence token
    decoder_hidden = hidden
    decoder_cell = cell

    # Generate output sequence
    output_sequence = []
    target_length = 10  # Maximum output sequence length

    for _ in range(target_length):
        output, decoder_hidden, decoder_cell, _ = decoder(
            decoder_input, decoder_hidden, decoder_cell, encoder_outputs
        )
        predicted_token = output.argmax(1).item()  # Get token with the highest probability
        if predicted_token == word_to_index["<EOS>"]:  # Stop at End-of-Sequence token
            break
        output_sequence.append(predicted_token)
        decoder_input = torch.tensor([predicted_token], dtype=torch.long)

    print("Word to index: ", word_to_index)
    print("Input tokens: ", input_tokens)
    print("Tokenized output sentence: ", output_sequence)

    # Convert tokens back to words
    output_sentence = " ".join([index_to_word[token] for token in output_sequence])
    print("Output Sentence:", output_sentence)

## Test the trained model

In [None]:
# load the trained model
input_size = len(word_to_index)  # Same as during training
output_size = len(word_to_index)
hidden_size = 128  # Same as during training

encoder = Encoder(input_size, hidden_size)
decoder = Decoder(output_size, hidden_size)

# Load the trained weights
encoder.load_state_dict(torch.load("basic_math_problem_encoder.pth"))
decoder.load_state_dict(torch.load("basic_math_problem_decoder.pth"))

input_sentence = "two plus four"

test(encoder, decoder, input_sentence)

# Establish a more complex dataset and train

## Establish a baseline vocabulary and tokenization capability

In [None]:
# Define special tokens
word_to_index = {"<SOS>": 0, "<EOS>": 1, "<PAD>": 2}  
 
# Function to tokenize a sentence and update mapping dynamically
def tokenize(sentence, word_to_index):
    tokens = []
    for word in sentence.lower().split():
        if word not in word_to_index:
            word_to_index[word] = len(word_to_index)  # Assign index to new words
        tokens.append(word_to_index[word])
    return tokens

## Load CSV dataset from file

In [None]:
# Load dataset from CSV
def load_sequences_from_csv(csv_file):
    df = pd.read_csv(csv_file)
    return df["Problem"].tolist(), df["Solution"].tolist()

csv_file = "simple_math_problems_addition_only.csv"
input_sentences, target_sentences = load_sequences_from_csv(csv_file)

## Tokenize input and target sentences and convert into tensors

In [None]:
# Tokenize input and target sentences
input_data = [tokenize(sentence, word_to_index) for sentence in input_sentences]
target_data = [[word_to_index["<SOS>"]] + tokenize(sentence, word_to_index) + [word_to_index["<EOS>"]]
               for sentence in target_sentences]

index_to_word = {v: k for k, v in word_to_index.items()}

# Convert tokenized sentences into tensors
input_tensors = [torch.tensor(seq) for seq in input_data]
target_tensors = [torch.tensor(seq) for seq in target_data]

## Generate the math problem dataset from the tokenzied data, including padding

In [None]:
# Apply dynamic padding
input_padded = pad_sequence(input_tensors, batch_first=True, padding_value=word_to_index["<PAD>"])
target_padded = pad_sequence(target_tensors, batch_first=True, padding_value=word_to_index["<PAD>"])

class DynamicMathWordProblemDataset(Dataset):
    def __init__(self, input_padded, target_padded):
        self.input_data = input_padded
        self.target_data = target_padded

    def __len__(self):
        return len(self.input_data)

    def __getitem__(self, idx):
        return self.input_data[idx], self.target_data[idx]

dataset = DynamicMathWordProblemDataset(input_padded, target_padded)
dataloader = DataLoader(dataset, batch_size=64, shuffle=False)

## Establish hyperparamters

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_size = len(word_to_index)  # Vocabulary size
output_size = len(word_to_index)
hidden_size = 256

encoder = Encoder(input_size, hidden_size).to(device)
decoder = Decoder(output_size, hidden_size).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=word_to_index["<PAD>"])
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.0005)#0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.0005)#0.001)

num_epochs = 100  # Define the number of epochs
train(num_epochs, "addition_only_math_problem_encoder", "addition_only_math_problem_decoder")

# Test the revised model

In [None]:
# load the trained model
input_size = len(word_to_index)  # Same as during training
output_size = len(word_to_index)
hidden_size = 256  # Same as during training

encoder = Encoder(input_size, hidden_size)
decoder = Decoder(output_size, hidden_size)

# Load the trained weights
encoder.load_state_dict(torch.load("addition_only_math_problem_encoder.pth"))
decoder.load_state_dict(torch.load("addition_only_math_problem_decoder.pth"))

input_sentence = "twenty plus fifty nine" #zero point eight six

test(encoder, decoder, input_sentence)