# LSTM-arithmetic

## Dataset
- [Arithmetic dataset](https://drive.google.com/file/d/1cMuL3hF9jefka9RyF4gEBIGGeFGZYHE-/view?usp=sharing)

In [15]:
! pip install seaborn opencc torch matplotlib scikit-learn tqdm

Collecting tqdm
  Downloading tqdm-4.67.0-py3-none-any.whl.metadata (57 kB)
Downloading tqdm-4.67.0-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.67.0


In [2]:
# ! pip install seaborn
# ! pip install opencc
# ! pip install -U scikit-learn

import numpy as np
import pandas as pd
import torch
import torch.nn
import torch.nn.utils.rnn
import torch.utils.data
import matplotlib.pyplot as plt
import seaborn as sns
import opencc
import os
from sklearn.model_selection import train_test_split

data_path = './data'

In [3]:
df_train = pd.read_csv(os.path.join(data_path, 'arithmetic_train.csv'))
df_eval = pd.read_csv(os.path.join(data_path, 'arithmetic_eval.csv'))
df_train.head()

Unnamed: 0.1,Unnamed: 0,src,tgt
0,2285313,14*(43+20)=,882
1,317061,(6+1)*5=,35
2,718770,13+32+29=,74
3,170195,31*(3-11)=,-248
4,2581417,24*49+1=,1177


In [4]:
# transform the input data to string
df_train['tgt'] = df_train['tgt'].apply(lambda x: str(x))
df_train['src'] = df_train['src'].add(df_train['tgt'])
df_train['len'] = df_train['src'].apply(lambda x: len(x))

df_eval['tgt'] = df_eval['tgt'].apply(lambda x: str(x))
df_eval['src'] = df_eval['src'].add(df_eval['tgt'])
df_eval['len'] = df_eval['src'].apply(lambda x: len(x))

# Build Dictionary
 - The model cannot perform calculations directly with plain text.
 - Convert all text (numbers/symbols) into numerical representations.
 - Special tokens
    - '&lt;pad&gt;'
        - Each sentence within a batch may have different lengths.
        - The length is padded with '&lt;pad&gt;' to match the longest sentence in the batch.
    - '&lt;eos&gt;'
        - Specifies the end of the generated sequence.
        - Without '&lt;eos&gt;', the model will not know when to stop generating.

In [5]:
char_to_id = {}
id_to_char = {}

# Add special tokens first
char_to_id['<pad>'] = 0  
char_to_id['<eos>'] = 1

# Add common characters from expressions
chars = set()
for row in df_train['src'].values:
    chars.update(list(row))

# Sort chars to ensure consistent ordering
sorted_chars = sorted(list(chars))
for char in sorted_chars:
    if char not in char_to_id:
        idx = len(char_to_id)
        char_to_id[char] = idx

# Create reverse mapping
for char, idx in char_to_id.items():
    id_to_char[idx] = char

vocab_size = len(char_to_id)
print(f'Vocab size: {vocab_size}')

Vocab size: 18


# Data Preprocessing
 - The data is processed into the format required for the model's input and output.
 - Example: 1+2-3=0
     - Model input: 1 + 2 - 3 = 0
     - Model output: / / / / / 0 &lt;eos&gt;  (the '/' can be replaced with &lt;pad&gt;)
     - The key for the model's output is that the model does not need to predict the next character of the previous part. What matters is that once the model sees '=', it should start generating the answer, which is '0'. After generating the answer, it should also generate&lt;eos&gt;


In [6]:
def preprocess_sequence(df):
    df['char_id_list'] = df['src'].apply(lambda x: [char_to_id[c] for c in x] + [char_to_id['<eos>']])
    
    # For each sequence, create label list with <pad> before '=' and actual digits after
    def create_label_sequence(row):
        # Find index of '=' in the input sequence
        eq_idx = row['src'].find('=')
        # Before = should be <pad>
        prefix = [char_to_id['<pad>']] * eq_idx
        # After = should be target number + <eos>
        suffix = [char_to_id[c] for c in row['tgt']] + [char_to_id['<eos>']]
        return prefix + suffix
    
    df['label_id_list'] = df.apply(create_label_sequence, axis=1)
    return df

df_train = preprocess_sequence(df_train) 
df_eval = preprocess_sequence(df_eval)

df_train.head()

Unnamed: 0.1,Unnamed: 0,src,tgt,len,char_id_list,label_id_list
0,2285313,14*(43+20)=882,882,14,"[8, 11, 4, 2, 11, 10, 5, 9, 7, 3, 17, 15, 15, ...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 15, 9, 1]"
1,317061,(6+1)*5=35,35,10,"[2, 13, 5, 8, 3, 4, 12, 17, 10, 12, 1]","[0, 0, 0, 0, 0, 0, 0, 10, 12, 1]"
2,718770,13+32+29=74,74,11,"[8, 10, 5, 10, 9, 5, 9, 16, 17, 14, 11, 1]","[0, 0, 0, 0, 0, 0, 0, 0, 14, 11, 1]"
3,170195,31*(3-11)=-248,-248,14,"[10, 8, 4, 2, 10, 6, 8, 8, 3, 17, 6, 9, 11, 15...","[0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 9, 11, 15, 1]"
4,2581417,24*49+1=1177,1177,12,"[9, 11, 4, 11, 16, 5, 8, 17, 8, 8, 14, 14, 1]","[0, 0, 0, 0, 0, 0, 0, 8, 8, 14, 14, 1]"


# Hyper Parameters

|Hyperparameter|Meaning|Value|
|-|-|-|
|`batch_size`|Number of data samples in a single batch|64|
|`epochs`|Total number of epochs to train|10|
|`embed_dim`|Dimension of the word embeddings|256|
|`hidden_dim`|Dimension of the hidden state in each timestep of the LSTM|256|
|`lr`|Learning Rate|0.001|
|`grad_clip`|To prevent gradient explosion in RNNs, restrict the gradient range|1|

In [7]:
batch_size = 64
epochs = 10
embed_dim = 256
hidden_dim = 256
lr = 0.001
grad_clip = 1

# Data Batching
- Use `torch.utils.data.Dataset` to create a data generation tool called  `dataset`.
- The, use `torch.utils.data.DataLoader` to randomly sample from the `dataset` and group the samples into batches.

In [8]:
class Dataset(torch.utils.data.Dataset):
    def __init__(self, sequences):
        self.sequences = sequences
    
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, index):
        x = self.sequences.iloc[index]['char_id_list']
        y = self.sequences.iloc[index]['label_id_list']
        return x, y

# collate function, used to build dataloader
def collate_fn(batch):
    batch_x = [torch.tensor(data[0]) for data in batch]
    batch_y = [torch.tensor(data[1]) for data in batch]
    batch_x_lens = torch.LongTensor([len(x) for x in batch_x])
    batch_y_lens = torch.LongTensor([len(y) for y in batch_y])
    
    # Pad the input sequence
    pad_batch_x = torch.nn.utils.rnn.pad_sequence(batch_x,
                                                  batch_first=True,
                                                  padding_value=char_to_id['<pad>'])
    
    pad_batch_y = torch.nn.utils.rnn.pad_sequence(batch_y,
                                                  batch_first=True,
                                                  padding_value=char_to_id['<pad>'])
    
    return pad_batch_x, pad_batch_y, batch_x_lens, batch_y_lens

In [9]:
ds_train = Dataset(df_train[['char_id_list', 'label_id_list']])
ds_eval = Dataset(df_eval[['char_id_list', 'label_id_list']])

In [10]:
# Build dataloader of train set and eval set, collate_fn is the collate function
# Create dataloaders
dl_train = torch.utils.data.DataLoader(
    ds_train, 
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn
)

dl_eval = torch.utils.data.DataLoader(
    ds_eval,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=collate_fn
)

# Model Design

## Execution Flow
1. Convert all characters in the sentence into embeddings.
2. Pass the embeddings through an LSTM sequentially.
3. The output of the LSTM is passed into another LSTM, and additional layers can be added.
4. The output from all time steps of the final LSTM is passed through a Fully Connected layer.
5. The character corresponding to the maximum value across all output dimensions is selected as the next character.

## Loss Function
Since this is a classification task, Cross Entropy is used as the loss function.

## Gradient Update
Adam algorithm is used for gradient updates.

In [11]:
class CharRNN(torch.nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super(CharRNN, self).__init__()
        
        self.embedding = torch.nn.Embedding(num_embeddings=vocab_size,
                                            embedding_dim=embed_dim,
                                            padding_idx=char_to_id['<pad>'])
        
        self.rnn_layer1 = torch.nn.LSTM(input_size=embed_dim,
                                        hidden_size=hidden_dim,
                                        batch_first=True)
        
        self.rnn_layer2 = torch.nn.LSTM(input_size=hidden_dim,
                                        hidden_size=hidden_dim,
                                        batch_first=True)
        
        self.linear = torch.nn.Sequential(torch.nn.Linear(in_features=hidden_dim,
                                                          out_features=hidden_dim),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(in_features=hidden_dim,
                                                          out_features=vocab_size))
        
    def forward(self, batch_x, batch_x_lens):
        return self.encoder(batch_x, batch_x_lens)
    
    # The forward pass of the model
    def encoder(self, batch_x, batch_x_lens):
        batch_x = self.embedding(batch_x)
        
        batch_x = torch.nn.utils.rnn.pack_padded_sequence(batch_x,
                                                          batch_x_lens,
                                                          batch_first=True,
                                                          enforce_sorted=False)
        
        batch_x, _ = self.rnn_layer1(batch_x)
        batch_x, _ = self.rnn_layer2(batch_x)
        
        batch_x, _ = torch.nn.utils.rnn.pad_packed_sequence(batch_x,
                                                            batch_first=True)
        
        batch_x = self.linear(batch_x)
        
        return batch_x
    
    def generator(self, start_char, max_len=200):
        char_list = [char_to_id[c] for c in start_char]
        
        while len(char_list) < max_len:
            # Convert to tensor
            x = torch.tensor(char_list).unsqueeze(0)
            x_lens = torch.tensor([len(char_list)])
            
            # Get model prediction
            with torch.no_grad():
                output = self.encoder(x, x_lens)
                y = output[0, -1]  # Get last timestep prediction
                
            # Get next token prediction
            next_char = torch.argmax(y).item()
            
            if next_char == char_to_id['<eos>']:
                break
                
            char_list.append(next_char)
            
        return [id_to_char[ch_id] for ch_id in char_list]

In [12]:
torch.manual_seed(2)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = CharRNN(vocab_size,
                embed_dim,
                hidden_dim)

In [13]:
criterion = torch.nn.CrossEntropyLoss(ignore_index=char_to_id['<pad>'])
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

# Training
1. The outer `for` loop controls the `epoch`
    1. The inner `for` loop uses `data_loader` to retrieve batches.
        1. Pass the batch to the `model` for training.
        2. Compare the predicted results `batch_pred_y` with the true labels `batch_y` using Cross Entropy to calculate the loss `loss`
        3. Use `loss.backward` to automatically compute the gradients.
        4. Use `torch.nn.utils.clip_grad_value_` to limit the gradient values between `-grad_clip` &lt; and &lt; `grad_clip`.
        5. Use `optimizer.step()` to update the model (backpropagation).
2.  After every `1000` batches, output the current loss to monitor whether it is converging.

In [None]:
from tqdm import tqdm
from copy import deepcopy

model = model.to(device)
model.train()
i = 0

for epoch in range(1, epochs + 1):
    # The process bar
    bar = tqdm(dl_train, desc=f"Train epoch {epoch}")
    
    for batch_x, batch_y, batch_x_lens, batch_y_lens in bar:
        # Clear the gradient
        optimizer.zero_grad()

        # Forward pass through the model
        batch_pred_y = model(batch_x.to(device), batch_x_lens)
        
        # Ensure batch_pred_y has the correct shape for CrossEntropyLoss
        # CrossEntropyLoss expects [batch_size * seq_len, vocab_size] for predictions and [batch_size * seq_len] for targets
        batch_pred_y = batch_pred_y[:, :batch_y.size(1), :].contiguous().view(-1, vocab_size)
        batch_y = batch_y.to(device).contiguous().view(-1)  # Flatten target as well to match

        # Calculate loss and backpropagate
        loss = criterion(batch_pred_y, batch_y)
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_value_(model.parameters(), grad_clip)

        # Optimize parameters in the model
        optimizer.step()

        # Update progress bar with the current loss every 50 iterations
        i += 1
        if i % 50 == 0:
            bar.set_postfix(loss=loss.item())

    # Evaluate the model after each epoch
    model.eval()  # Set to evaluation mode
    bar = tqdm(dl_eval, desc=f"Validation epoch {epoch}")
    matched = 0
    total = 0

    with torch.no_grad():  # No need to track gradients during evaluation
        for batch_x, batch_y, batch_x_lens, batch_y_lens in bar:
            batch_size = batch_x.size(0)
            
            # Generate predictions for each sequence in the batch
            predictions = []
            for i in range(batch_size):
                x = batch_x[i, :batch_x_lens[i]]
                input_str = ''.join([id_to_char[idx.item()] for idx in x])
                # Process input until '=' character
                eq_idx = input_str.find('=')
                if eq_idx != -1:
                    input_str = input_str[:eq_idx + 1]
                # Generate the prediction
                pred = ''.join(model.generator(input_str))
                # Extract the answer part (after '=')
                eq_idx = pred.find('=')
                if eq_idx != -1:
                    pred = pred[eq_idx + 1:].strip()
                predictions.append(pred)
            
            # Check predictions against ground truth
            for i in range(batch_size):
                true_answer = ''.join([id_to_char[idx.item()] for idx in batch_y[i] 
                                       if idx.item() not in [char_to_id['<pad>'], char_to_id['<eos>']]])
                if predictions[i] == true_answer:
                    matched += 1
                total += 1
            
        # Print the accuracy after each epoch for evaluation
        print(f"Accuracy: {matched / total:.2f}")
    
    model.train()  # Switch back to training mode for the next epoch

Train epoch 1: 100%|██████████| 37020/37020 [53:20<00:00, 11.57it/s, loss=0.268]  
Validation epoch 1: 100%|██████████| 4114/4114 [10:33<00:00,  6.50it/s]


Accuracy: 0.57


Train epoch 2: 100%|██████████| 37020/37020 [37:27<00:00, 16.47it/s, loss=0.185]  
Validation epoch 2: 100%|██████████| 4114/4114 [11:01<00:00,  6.22it/s]


Accuracy: 0.69


Train epoch 3:  36%|███▋      | 13430/37020 [15:55<31:55, 12.32it/s, loss=0.221]  

# Generation
Use `model.generator` and provide an initial character to automatically generate a sequence.

In [None]:
model = model.to("cpu")
print("".join(model.generator('1+1=')))