In [36]:
import torch
from torch.utils.data import Dataset
import random

class ArithmeticDataset(Dataset):
    """Base class for in-memory datasets of synthetic arithmetic problems.

    The constructor stores the two sizing parameters and immediately
    materializes all samples by calling ``generate_data``, which concrete
    subclasses must provide.
    """

    def __init__(self, max_length, num_samples):
        self.max_length, self.num_samples = max_length, num_samples
        # Samples are generated once, eagerly, and kept in a plain list.
        self.data = self.generate_data()

    def __len__(self):
        """Return how many samples were generated."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample stored at position ``idx``."""
        return self.data[idx]

    def generate_number(self, length):
        """Draw a random integer with exactly ``length`` decimal digits.

        For ``length == 1`` the range is 1-9, so zero is never produced.
        """
        smallest = 10 ** (length - 1)
        largest = 10 ** length - 1
        return random.randint(smallest, largest)

    def generate_data(self):
        """Build the list of samples; not implemented in the base class."""
        raise NotImplementedError("Subclasses must implement this method")

In [37]:
class AdditionDataset(ArithmeticDataset):
    """Addition problems stored as ``(prompt, answer)`` string pairs.

    A sample looks like ``("4+5=", "9")``.
    """

    def generate_data(self):
        """Generate samples for every pair of operand digit lengths.

        Operand lengths ``i`` and ``j`` each range over ``1..max_length``.
        Every ``(i, j)`` combination receives the same number of samples, so
        the total is ``samples_per_combination * max_length ** 2`` and may
        differ from ``num_samples``.

        Returns:
            list[tuple[str, str]]: prompts of the form ``"a+b="`` paired with
            the decimal string of ``a + b``, ordered by ``(i, j)``.
        """
        data = []
        # max(1, ...) guards against an empty dataset: plain floor division
        # yields 0 whenever num_samples < max_length ** 2.
        samples_per_combination = max(1, self.num_samples // (self.max_length ** 2))
        for i in range(1, self.max_length + 1):
            for j in range(1, self.max_length + 1):
                for _ in range(samples_per_combination):
                    num1 = self.generate_number(i)
                    num2 = self.generate_number(j)
                    result = num1 + num2
                    data.append((f"{num1}+{num2}=", str(result)))
        return data

In [38]:
class MultiplicationDataset(ArithmeticDataset):
    """Multiplication problems stored as ``(prompt, answer)`` string pairs.

    A sample looks like ``("6*4=", "24")``.
    """

    def generate_data(self):
        """Generate samples for every pair of operand digit lengths.

        Operand lengths ``i`` and ``j`` each range over ``1..max_length``.
        Every ``(i, j)`` combination receives the same number of samples, so
        the total is ``samples_per_combination * max_length ** 2`` and may
        differ from ``num_samples``.

        Returns:
            list[tuple[str, str]]: prompts of the form ``"a*b="`` paired with
            the decimal string of ``a * b``, ordered by ``(i, j)``.
        """
        data = []
        # max(1, ...) guards against an empty dataset: plain floor division
        # yields 0 whenever num_samples < max_length ** 2.
        samples_per_combination = max(1, self.num_samples // (self.max_length ** 2))
        for i in range(1, self.max_length + 1):
            for j in range(1, self.max_length + 1):
                for _ in range(samples_per_combination):
                    num1 = self.generate_number(i)
                    num2 = self.generate_number(j)
                    result = num1 * num2
                    data.append((f"{num1}*{num2}=", str(result)))
        return data

In [39]:
class SortingDataset(ArithmeticDataset):
    """Sorting problems: labelled integers in, labels in ascending order out.

    A sample looks like ``("a:3", "a")``; with several numbers the prompt is
    ``"a:<n1>,b:<n2>,..."`` and the answer is the labels concatenated in
    ascending order of their numbers.
    """

    def generate_data(self):
        """Generate samples for every (count, max digit length) combination.

        ``i`` is how many integers appear in a problem and ``j`` is the
        largest digit length any of them may have; both range over
        ``1..max_length``. Each integer gets its own random digit length in
        ``1..j``. Every combination receives the same number of samples, so
        the total may differ from ``num_samples``.

        Returns:
            list[tuple[str, str]]: ``(input_str, output_str)`` pairs.
        """
        data = []
        # max(1, ...) guards against an empty dataset: plain floor division
        # yields 0 whenever num_samples < max_length ** 2.
        samples_per_combination = max(1, self.num_samples // (self.max_length ** 2))
        for i in range(1, self.max_length + 1):  # number of integers
            for j in range(1, self.max_length + 1):  # max digit length
                for _ in range(samples_per_combination):
                    numbers = [self.generate_number(random.randint(1, j)) for _ in range(i)]
                    # Only 26 labels exist; for i > 26 the zip() calls below
                    # silently drop the surplus numbers.
                    indices = list('abcdefghijklmnopqrstuvwxyz'[:i])
                    input_str = ','.join([f"{idx}:{num}" for idx, num in zip(indices, numbers)])
                    # Sorting (number, label) tuples orders by number and
                    # breaks ties by label.
                    sorted_indices = [idx for _, idx in sorted(zip(numbers, indices))]
                    output_str = ''.join(sorted_indices)
                    data.append((input_str, output_str))
        return data

In [40]:
def create_datasets(dataset_class, max_length, train_samples, test_samples):
    """Instantiate ``dataset_class`` twice, for training and for testing.

    Both instances share ``max_length``; only the requested sample count
    differs. The training instance is constructed first.

    Returns:
        tuple: ``(train_dataset, test_dataset)``.
    """
    train_dataset, test_dataset = (
        dataset_class(max_length, sample_count)
        for sample_count in (train_samples, test_samples)
    )
    return train_dataset, test_dataset

In [41]:
# Set parameters shared by all three tasks
max_length = 20  # maximum length of operands
train_samples = 200_000  # 200k requested here; the original note cites 20 million for the paper
# 1_000 // 20**2 == 2 samples per length combination, i.e. 800 test samples
test_samples = 1_000  # adjust as needed

# Create one train/test pair per task; generation is random and unseeded
addition_train, addition_test = create_datasets(AdditionDataset, max_length, train_samples, test_samples)
multiplication_train, multiplication_test = create_datasets(MultiplicationDataset, max_length, train_samples, test_samples)
sorting_train, sorting_test = create_datasets(SortingDataset, max_length, train_samples, test_samples)

# Print the first sample of each training set (the smallest length combination)
print("Addition sample:", addition_train[0])
print("Multiplication sample:", multiplication_train[0])
print("Sorting sample:", sorting_train[0])

Addition sample: ('4+5=', '9')
Multiplication sample: ('6*4=', '24')
Sorting sample: ('a:3', 'a')


In [42]:
import random

def print_samples(dataset, name, num_samples=10):
    """Print ``num_samples`` randomly chosen samples from ``dataset``.

    Indices are drawn independently, so the same sample may be shown more
    than once. Each sample is expected to be indexable, with the input at
    position 0 and the output at position 1.
    """
    print(f"\n{name} Samples:")
    last_index = len(dataset) - 1
    for _ in range(num_samples):
        picked = dataset[random.randint(0, last_index)]
        print("Input: {}, Output: {}".format(picked[0], picked[1]))

# Show 10 random samples (the print_samples default) from the Addition training set
print_samples(addition_train, "Addition")

# Same for the Multiplication training set
print_samples(multiplication_train, "Multiplication")

# Same for the Sorting training set
print_samples(sorting_train, "Sorting")


Addition Samples:
Input: 135748326+108631275=, Output: 244379601
Input: 1354172+336685848355199866=, Output: 336685848356554038
Input: 44338320546283+972243690165=, Output: 45310564236448
Input: 75040161704634192095+9527451304538227395=, Output: 84567613009172419490
Input: 9318+84623911195379659736=, Output: 84623911195379669054
Input: 58739854+8279644088=, Output: 8338383942
Input: 9395564888+77629=, Output: 9395642517
Input: 520+2371=, Output: 2891
Input: 5623061952901691460+3=, Output: 5623061952901691463
Input: 243+4925787039520747=, Output: 4925787039520990

Multiplication Samples:
Input: 1314*4727797=, Output: 6212325258
Input: 3228169008960561*26=, Output: 83932394232974586
Input: 7035962375386837078*20466463929=, Output: 144001270161655858478286759462
Input: 4830844194151411*4972082029=, Output: 24019353602639217538092919
Input: 68464*476769075851088=, Output: 32641518009068888832
Input: 3517314548080498*75912257251479930246=, Output: 267007286808259638068510650232942508
Input

# Small Transformer for Arithmetic Tasks

This code implements a small transformer model designed to learn basic arithmetic operations, inspired by the Abacus Embeddings paper. The model architecture is as follows:

## Model Architecture
- Embedding layer: Custom Abacus Embedding
- Transformer layers: 2
- Attention heads per layer: 2
- Embedding dimension: 64
- Feed-forward dimension: 128
- Maximum operand length: 20 digits (the model itself is built for sequences of up to 2 × 20 + 2 = 42 tokens)

## Key Components
1. **AbacusEmbedding**: A custom embedding layer that combines token embeddings with positional information.
2. **SmallTransformer**: The main model class, incorporating the Abacus Embedding and transformer layers.
3. **Training Loop**: Includes both training and evaluation phases, tracking loss and accuracy.

## Training Details
- Dataset: Addition task (can be extended to multiplication and sorting)
- Batch size: 32
- Number of epochs: 10
- Optimizer: Adam
- Learning rate: 0.001
- Loss function: Cross Entropy Loss (ignoring padding tokens)

This setup allows for quick experimentation and debugging on a CPU. Once the basic functionality is verified, the model size and dataset can be scaled up to match the specifications in the Abacus Embeddings paper.

Let's calculate the number of parameters for this model configuration. We'll break it down by component:

1. Embedding Layer:
   - Token Embedding: vocab_size * embed_size = 14 * 64 = 896
   - Positional Embedding: max_length * embed_size = 20 * 64 = 1,280

2. Transformer Layers (for each layer):
   - Self-Attention:
     * Query, Key, Value matrices: 3 * (embed_size * embed_size) = 3 * (64 * 64) = 12,288
     * Output projection: embed_size * embed_size = 64 * 64 = 4,096
   - Feed-forward network:
     * First linear layer: embed_size * ff_dim = 64 * 128 = 8,192
     * Second linear layer: ff_dim * embed_size = 128 * 64 = 8,192
   - Layer Norm (2 per layer): 2 * 2 * embed_size = 2 * 2 * 64 = 256

   Total per layer: 12,288 + 4,096 + 8,192 + 8,192 + 256 = 33,024

3. Output Layer:
   - Linear projection: embed_size * vocab_size = 64 * 14 = 896

Now, let's sum it up:
- Embedding Layer: 896 + 1,280 = 2,176
- Transformer Layers: 33,024 * 2 = 66,048
- Output Layer: 896

Total parameters: 2,176 + 66,048 + 896 = 69,120

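So, counting weight matrices only (no bias terms) and a 20-position embedding table, this small transformer model would have approximately 69,120 parameters. The model actually built below uses a 42-position table and includes bias terms, and torchinfo reports 104,910 parameters for it.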

This is a very small model, which is perfect for initial experiments and debugging on a CPU. It's roughly two orders of magnitude smaller than the models described in the Abacus Embeddings paper (which mentions models with ~12 million parameters), allowing for quick iterations and tests of the basic architecture and training loop.

In [1]:
import torch
import random
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader



In [2]:
import torch
from torch.utils.data import Dataset
import random

class AdditionDataset(Dataset):
    """Tokenized addition problems as ``(input_tensor, target_tensor)`` pairs.

    Both the prompt ``"a+b="`` and the answer are reversed character-wise
    before tokenization. The target additionally ends with ``<EOS>``. Both
    sequences are right-padded with ``<PAD>`` to ``2 * max_length + 2`` tokens.
    """

    def __init__(self, max_length, num_samples):
        # Initialize the dataset with maximum length of numbers and total samples
        self.max_length = max_length
        self.num_samples = num_samples

        # Define the vocabulary for tokenization
        # 0-9 for digits, 10 for '+', 11 for '=', 12 for padding, 13 for end of sequence
        self.vocab = {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9,
                      '+': 10, '=': 11, '<PAD>': 12, '<EOS>': 13}
        # Create an inverse vocabulary for decoding
        self.inv_vocab = {v: k for k, v in self.vocab.items()}
        # Generate the dataset
        self.data = self.generate_data()

    def __len__(self):
        # Return the number of samples in the dataset
        return len(self.data)

    def __getitem__(self, idx):
        # Return a specific item from the dataset
        return self.data[idx]

    def generate_number(self, length):
        # Generate a random number of specified length (1-9 for length 1, never 0)
        return random.randint(10**(length-1), 10**length - 1)

    def tokenize(self, s):
        # Convert a string to a list of token IDs.
        # Characters that are not in the vocabulary are silently skipped.
        return [self.vocab[c] for c in s if c in self.vocab]

    def pad_sequence(self, seq, max_length):
        # Pad a sequence with <PAD> tokens to reach the specified length.
        # Sequences already longer than max_length are returned unchanged.
        return seq + [self.vocab['<PAD>']] * (max_length - len(seq))

    def generate_data(self):
        """Generate shuffled samples for every pair of operand digit lengths.

        Each ``(i, j)`` length combination gets ``max(1, num_samples //
        max_length ** 2)`` samples, so the total may differ from
        ``num_samples``.

        Returns:
            list[tuple[torch.Tensor, torch.Tensor]]: ``torch.long`` tensors of
            length ``2 * max_length + 2`` each.
        """
        data = []
        # Calculate samples per length combination to achieve desired total samples
        samples_per_combination = max(1, self.num_samples // (self.max_length ** 2))
        # Longest possible input is "<max_length digits>+<max_length digits>=";
        # loop-invariant, so computed once.
        max_seq_length = self.max_length * 2 + 2
        eos_id = self.vocab['<EOS>']

        # Generate addition problems for all possible length combinations
        for i in range(1, self.max_length + 1):
            for j in range(1, self.max_length + 1):
                for _ in range(samples_per_combination):
                    # Generate two random numbers
                    num1 = self.generate_number(i)
                    num2 = self.generate_number(j)
                    result = num1 + num2

                    # Create the input string (reversed for right-to-left processing)
                    input_str = f"{num1:0{i}}+{num2:0{j}}="
                    input_str = input_str[::-1]  # Reverse the string

                    # Create the target string (reversed)
                    target_str = f"{result}"[::-1]

                    # Tokenize and pad both input and target
                    input_tokens = self.tokenize(input_str)
                    target_tokens = self.tokenize(target_str) + [eos_id]

                    input_padded = self.pad_sequence(input_tokens, max_seq_length)
                    target_padded = self.pad_sequence(target_tokens, max_seq_length)

                    # Convert to PyTorch tensors
                    input_tensor = torch.tensor(input_padded, dtype=torch.long)
                    target_tensor = torch.tensor(target_padded, dtype=torch.long)

                    data.append((input_tensor, target_tensor))

        # Shuffle the data for randomness
        random.shuffle(data)
        return data

    def decode(self, tensor):
        """Convert a 1-D sequence of token IDs back to a readable string.

        Decoding stops at the first ``<EOS>``; ``<PAD>`` tokens before it are
        skipped. The collected characters are reversed to undo the reversal
        applied when the data was generated. For tensors produced by this
        dataset (which contain only ``<PAD>`` after ``<EOS>``) the result is
        the same as simply dropping special tokens, but tokens that follow
        ``<EOS>`` in a model prediction no longer leak into the output.
        """
        pad_id = self.vocab['<PAD>']
        eos_id = self.vocab['<EOS>']
        chars = []
        for token in tensor:
            token_id = token.item()
            if token_id == eos_id:
                break
            if token_id != pad_id:
                chars.append(self.inv_vocab[token_id])
        return ''.join(chars)[::-1]

# Set parameters for the dataset
max_length = 20  # maximum length of operands
train_samples = 200_000  # Requested number of training samples
# 1_000 // 20**2 == 2 samples per length combination, i.e. 800 test samples
test_samples = 1_000  # Requested number of test samples

# Create training and test datasets (random and unseeded, so contents differ per run)
addition_train = AdditionDataset(max_length, train_samples)
addition_test = AdditionDataset(max_length, test_samples)

In [3]:
# Print the first five (already shuffled) training samples, decoded back to text
print("Addition samples:")
for i in range(0,5):
    input_tensor, target_tensor = addition_train[i]
    # decode() strips special tokens and undoes the character reversal
    input_str = addition_train.decode(input_tensor)
    target_str = addition_train.decode(target_tensor)
    print(f"Input: {input_str}")
    print(f"Target: {target_str}")
    print(f"Equation: {input_str} {target_str}")
    print()

Addition samples:
Input: 9+203=
Target: 212
Equation: 9+203= 212

Input: 547600+82709939191192837=
Target: 82709939191740437
Equation: 547600+82709939191192837= 82709939191740437

Input: 754012+3=
Target: 754015
Equation: 754012+3= 754015

Input: 4794181742+9611879=
Target: 4803793621
Equation: 4794181742+9611879= 4803793621

Input: 6226+31918523272548940491=
Target: 31918523272548946717
Equation: 6226+31918523272548940491= 31918523272548946717



In [4]:
class AbacusEmbedding(nn.Module):
    """Sum of a token embedding and a learned absolute-position embedding.

    Positions are counted from the start of the sequence. Any position at or
    beyond ``max_length`` reuses the embedding of the last slot
    (``max_length - 1``), so longer inputs do not raise an index error.
    """

    def __init__(self, vocab_size, embed_size, max_length):
        super().__init__()
        self.max_length = max_length
        # One learned vector per token id.
        self.embed = nn.Embedding(vocab_size, embed_size)
        # One learned vector per position slot.
        self.pos_embed = nn.Embedding(max_length, embed_size)

    def forward(self, x):
        """Embed ``x``, indexed as (batch, sequence), position-wise."""
        n_positions = x.size(1)
        last_slot = self.max_length - 1

        # 0..n-1, capped at the last available slot, with a leading batch
        # axis of size 1 so it broadcasts over the batch.
        position_ids = (
            torch.arange(n_positions, device=x.device)
            .clamp(max=last_slot)
            .unsqueeze(0)
        )

        token_vectors = self.embed(x)
        position_vectors = self.pos_embed(position_ids)
        return token_vectors + position_vectors

In [5]:
class SmallTransformer(nn.Module):
    """Encoder-only transformer producing per-position vocabulary logits.

    Pipeline: ``AbacusEmbedding`` -> ``nn.TransformerEncoder`` -> linear
    projection to ``vocab_size``. No attention or padding mask is passed to
    the encoder.
    """

    def __init__(self, vocab_size, embed_size, num_heads, ff_dim, num_layers, max_length):
        super().__init__()
        self.embedding = AbacusEmbedding(vocab_size, embed_size, max_length)

        layer_options = dict(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=ff_dim,
            batch_first=True,
        )
        # NOTE(review): storing the prototype layer as an attribute registers
        # its parameters on this module in addition to the stacked encoder.
        # nn.TransformerEncoder is documented to clone the layer, so these
        # weights look unused by forward() -- confirm before removing, since
        # existing checkpoints contain the corresponding state_dict keys.
        self.transformer_layer = nn.TransformerEncoderLayer(**layer_options)
        self.transformer = nn.TransformerEncoder(self.transformer_layer, num_layers=num_layers)

        # Maps each position's hidden vector to vocabulary logits.
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        """Return logits for token ids ``x``; failures are logged and re-raised."""
        try:
            hidden = self.embedding(x)
            encoded = self.transformer(hidden)
            logits = self.fc_out(encoded)
        except Exception as err:
            print(f"Error in SmallTransformer forward pass: {str(err)}")
            raise err
        return logits

In [18]:
import time
from tqdm import tqdm

def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs, pad_token_id=None):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Accuracy is measured per token over positions whose target is not the
    padding id; it is not whole-sequence accuracy. Whenever the test accuracy
    improves, the model's ``state_dict`` is written to ``best_model.pth`` in
    the current working directory.

    Args:
        model: module mapping a batch of token ids to per-position logits.
        train_loader: iterable of ``(inputs, targets)`` batches with a length.
        test_loader: iterable of ``(inputs, targets)`` batches with a length.
        criterion: loss called as ``criterion(logits_2d, targets_1d)``.
        optimizer: optimizer over ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.
        pad_token_id: target id excluded from the accuracy counts. When
            ``None`` (default) it is read from the notebook-level
            ``addition_train.vocab['<PAD>']``, as before.

    Raises:
        Exception: any error raised while processing a training batch is
            re-raised after diagnostic information has been printed.
    """
    if pad_token_id is None:
        # Backward-compatible default: rely on the notebook's global dataset.
        pad_token_id = addition_train.vocab['<PAD>']

    def _ratio(numerator, denominator):
        # Empty loaders or all-padding batches would otherwise divide by zero.
        return numerator / denominator if denominator else 0.0

    best_accuracy = 0
    for epoch in range(num_epochs):
        # Set the model to training mode
        model.train()
        total_loss = 0
        correct_predictions = 0
        total_predictions = 0
        start_time = time.time()

        # Create a progress bar for each epoch
        progress_bar = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")

        for batch_idx, (inputs, targets) in progress_bar:
            # Reset per batch so the error handler below never reads an
            # unbound name (first batch) or a stale tensor (later batches).
            outputs = None
            try:
                # Reset gradients
                optimizer.zero_grad()

                # Forward pass
                outputs = model(inputs)

                # Flatten (batch, seq, vocab) logits and (batch, seq) targets
                loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))

                # Backward pass
                loss.backward()

                # Update weights
                optimizer.step()

                total_loss += loss.item()

                # Calculate accuracy for this batch, ignoring padded targets
                _, predicted = outputs.max(dim=-1)
                non_pad_mask = targets.ne(pad_token_id)
                correct_predictions += (predicted[non_pad_mask] == targets[non_pad_mask]).sum().item()
                total_predictions += non_pad_mask.sum().item()

                # Update progress bar with current loss and running accuracy
                progress_bar.set_postfix({
                    'loss': f"{loss.item():.4f}",
                    'acc': f"{_ratio(correct_predictions, total_predictions):.4f}"
                })

            except Exception:
                # Error handling and debugging information
                print(f"\nError in batch {batch_idx}")
                print(f"Input shape: {inputs.shape}, max value: {inputs.max().item()}, min value: {inputs.min().item()}")
                print(f"Target shape: {targets.shape}, max value: {targets.max().item()}, min value: {targets.min().item()}")
                if outputs is not None:
                    print(f"Output shape: {outputs.shape}")
                # Bare raise keeps the original traceback intact.
                raise

        # Calculate average loss and accuracy for the epoch
        avg_loss = _ratio(total_loss, len(train_loader))
        train_accuracy = _ratio(correct_predictions, total_predictions)
        epoch_time = time.time() - start_time

        print(f'\nEpoch {epoch+1}/{num_epochs} - Time: {epoch_time:.2f}s')
        print(f'Train Loss: {avg_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

        # Evaluation on test set
        model.eval()  # Set the model to evaluation mode
        correct = 0
        total = 0
        test_loss = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = model(inputs)
                _, predicted = outputs.max(dim=-1)
                non_pad_mask = targets.ne(pad_token_id)
                total += non_pad_mask.sum().item()
                correct += (predicted[non_pad_mask] == targets[non_pad_mask]).sum().item()

                # Calculate test loss
                loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
                test_loss += loss.item()

        # Calculate test accuracy and average test loss
        test_accuracy = _ratio(correct, total)
        avg_test_loss = _ratio(test_loss, len(test_loader))
        print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

        # Save the best model
        if test_accuracy > best_accuracy:
            best_accuracy = test_accuracy
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'New best model saved with accuracy: {best_accuracy:.4f}')

        print('-' * 60)

    print(f'Training completed. Best test accuracy: {best_accuracy:.4f}')

In [85]:
# Model parameters
# Derived from the dataset vocabulary (digits 0-9, '+', '=', <PAD>, <EOS> -> 14)
# so it cannot drift from the tokenizer.
vocab_size = len(addition_train.vocab)
embed_size = 64
num_heads = 2
ff_dim = 128
num_layers = 2
max_length = 20  # maximum operand length in digits

# Training parameters
batch_size = 32
num_epochs = 10
learning_rate = 0.001

# Create dataloaders
train_loader = DataLoader(addition_train, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(addition_test, batch_size=batch_size)

# Longest sequence the dataset produces: two operands plus '+' and '='
max_seq_length = max_length * 2 + 2
model = SmallTransformer(vocab_size, embed_size, num_heads, ff_dim, num_layers, max_seq_length)
# Look the padding id up by name instead of assuming its position in the vocabulary.
criterion = nn.CrossEntropyLoss(ignore_index=addition_train.vocab['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs)

Epoch 1/10:   0%|          | 0/6250 [00:00<?, ?it/s]

Epoch 1/10: 100%|██████████| 6250/6250 [03:46<00:00, 27.65it/s, loss=1.3243, acc=0.3827]



Epoch 1/10 - Time: 226.04s
Train Loss: 1.6537, Train Accuracy: 0.3827
Test Loss: 1.4574, Test Accuracy: 0.4344
New best model saved with accuracy: 0.4344
------------------------------------------------------------


Epoch 2/10: 100%|██████████| 6250/6250 [03:54<00:00, 26.67it/s, loss=1.6127, acc=0.4225]



Epoch 2/10 - Time: 234.32s
Train Loss: 1.5003, Train Accuracy: 0.4225
Test Loss: 1.4118, Test Accuracy: 0.4449
New best model saved with accuracy: 0.4449
------------------------------------------------------------


Epoch 3/10: 100%|██████████| 6250/6250 [03:45<00:00, 27.69it/s, loss=1.3877, acc=0.4366]



Epoch 3/10 - Time: 225.70s
Train Loss: 1.4610, Train Accuracy: 0.4366
Test Loss: 1.3644, Test Accuracy: 0.4725
New best model saved with accuracy: 0.4725
------------------------------------------------------------


Epoch 4/10: 100%|██████████| 6250/6250 [03:49<00:00, 27.28it/s, loss=1.4293, acc=0.4684]



Epoch 4/10 - Time: 229.11s
Train Loss: 1.3960, Train Accuracy: 0.4684
Test Loss: 1.2328, Test Accuracy: 0.5350
New best model saved with accuracy: 0.5350
------------------------------------------------------------


Epoch 5/10: 100%|██████████| 6250/6250 [18:04<00:00,  5.76it/s, loss=1.2539, acc=0.5084]   



Epoch 5/10 - Time: 1084.67s
Train Loss: 1.3149, Train Accuracy: 0.5084
Test Loss: 1.1163, Test Accuracy: 0.5858
New best model saved with accuracy: 0.5858
------------------------------------------------------------


Epoch 6/10: 100%|██████████| 6250/6250 [03:58<00:00, 26.24it/s, loss=1.1787, acc=0.5408]



Epoch 6/10 - Time: 238.23s
Train Loss: 1.2441, Train Accuracy: 0.5408
Test Loss: 1.0412, Test Accuracy: 0.6149
New best model saved with accuracy: 0.6149
------------------------------------------------------------


Epoch 7/10: 100%|██████████| 6250/6250 [03:26<00:00, 30.26it/s, loss=1.3563, acc=0.5738]



Epoch 7/10 - Time: 206.56s
Train Loss: 1.1618, Train Accuracy: 0.5738
Test Loss: 0.9037, Test Accuracy: 0.6607
New best model saved with accuracy: 0.6607
------------------------------------------------------------


Epoch 8/10: 100%|██████████| 6250/6250 [03:28<00:00, 29.99it/s, loss=1.3578, acc=0.5975]



Epoch 8/10 - Time: 208.38s
Train Loss: 1.0946, Train Accuracy: 0.5975
Test Loss: 0.8801, Test Accuracy: 0.6699
New best model saved with accuracy: 0.6699
------------------------------------------------------------


Epoch 9/10: 100%|██████████| 6250/6250 [03:45<00:00, 27.66it/s, loss=0.9276, acc=0.6073]



Epoch 9/10 - Time: 225.94s
Train Loss: 1.0679, Train Accuracy: 0.6073
Test Loss: 0.8410, Test Accuracy: 0.6808
New best model saved with accuracy: 0.6808
------------------------------------------------------------


Epoch 10/10: 100%|██████████| 6250/6250 [03:00<00:00, 34.56it/s, loss=0.9931, acc=0.6154]



Epoch 10/10 - Time: 180.84s
Train Loss: 1.0465, Train Accuracy: 0.6154
Test Loss: 0.8201, Test Accuracy: 0.6943
New best model saved with accuracy: 0.6943
------------------------------------------------------------
Training completed. Best test accuracy: 0.6943


In [11]:
# After training is complete: bundle the weights, the optimizer state and the
# hyperparameters needed to rebuild the architecture into a single checkpoint.
# Requires `model`, `optimizer` and the hyperparameter globals from the
# training cell to exist in the current kernel.
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'vocab_size': vocab_size,
    'embed_size': embed_size,
    'num_heads': num_heads,
    'ff_dim': ff_dim,
    'num_layers': num_layers,
    'max_seq_length': max_seq_length
}, 'trained_addition_model.pth')

print("Model saved successfully!")

NameError: name 'model' is not defined

In [7]:
# Load the saved model checkpoint (a dict of weights plus hyperparameters)
# from the current working directory
checkpoint = torch.load('trained_addition_model.pth')

# Recreate the model architecture from the hyperparameters stored alongside
# the weights; the positional arguments follow SmallTransformer's signature
loaded_model = SmallTransformer(
    checkpoint['vocab_size'],
    checkpoint['embed_size'],
    checkpoint['num_heads'],
    checkpoint['ff_dim'],
    checkpoint['num_layers'],
    checkpoint['max_seq_length']
)

# Load the model weights
loaded_model.load_state_dict(checkpoint['model_state_dict'])

# Set the model to evaluation mode
loaded_model.eval()

print("Model loaded successfully!")

# Function to preprocess input for the model
def preprocess_input(input_str, max_length):
    """Turn a prompt such as ``"12+34="`` into a batch of one token-id row.

    The string is reversed, mapped to ids with the global
    ``addition_train.vocab`` (characters not in the vocabulary are skipped)
    and right-padded with ``<PAD>`` up to ``max_length``. Inputs that are
    already longer than ``max_length`` are neither padded nor truncated.

    Returns:
        torch.Tensor: ``torch.long`` tensor with a leading batch axis of 1.
    """
    vocab = addition_train.vocab
    # Reverse the input string
    reversed_text = input_str[::-1]
    # Tokenize
    tokens = [vocab[c] for c in reversed_text if c in vocab]
    # Pad
    padded = tokens + [vocab['<PAD>']] * (max_length - len(tokens))
    # Explicit dtype: an empty list would otherwise become a float tensor,
    # which an embedding lookup rejects; also matches the dataset tensors.
    return torch.tensor(padded, dtype=torch.long).unsqueeze(0)  # Add batch dimension

# Function to decode model output
def decode_output(output_tensor):
    """Decode the first sequence of a batch of logits into a result string.

    The arg-max token is taken along axis 2. Decoding stops at the first
    ``<EOS>``: targets contain only padding after ``<EOS>`` and the training
    loss ignores padding, so whatever the model emits past that point is
    untrained noise and must not be treated as digits. ``<PAD>`` predictions
    before ``<EOS>`` are skipped. The collected characters are reversed to
    undo the reversal applied to the training data.
    """
    vocab = addition_train.vocab
    pad_id, eos_id = vocab['<PAD>'], vocab['<EOS>']
    _, predicted = output_tensor.max(2)
    chars = []
    for token in predicted[0]:
        token_id = token.item()
        if token_id == eos_id:
            break
        if token_id != pad_id:
            chars.append(addition_train.inv_vocab[token_id])
    return ''.join(chars)[::-1]  # Reverse the output

Model loaded successfully!


In [17]:
# Test the model on a single addition problem
def test_addition(num1, num2):
    """Run the loaded model on ``num1 + num2`` and print a verdict.

    Uses the notebook-level ``loaded_model`` and ``checkpoint``. A prediction
    that cannot be parsed as an integer (empty, or containing ``+``/``=``)
    is reported as incorrect instead of raising ``ValueError``.
    """
    input_str = f"{num1}+{num2}="
    input_tensor = preprocess_input(input_str, checkpoint['max_seq_length'])

    with torch.no_grad():
        output = loaded_model(input_tensor)

    result = decode_output(output)
    expected = num1 + num2
    try:
        # int() comparison tolerates leading zeros in the prediction.
        is_correct = int(result) == expected
    except ValueError:
        is_correct = False

    print(f"{num1} + {num2} = {result}")
    print(f"Correct result: {expected}")
    print(f"Model's prediction is {'correct' if is_correct else 'incorrect'}")

# Test on one example; the commented-out calls are alternative cases left disabled
#test_addition(123, 456)
test_addition(1234,7890)
#test_addition(4, 1000)

1234 + 7890 = 9024
Correct result: 9124
Model's prediction is incorrect


In [9]:
import random

def generate_test_set(num_samples, max_digits):
    """Build ``num_samples`` random addition problems.

    Each operand is drawn uniformly from ``1 .. 10**max_digits - 1``, so
    operands can have anywhere from one to ``max_digits`` digits.

    Returns:
        list[tuple[int, int, int]]: ``(num1, num2, num1 + num2)`` triples.
    """
    def draw_problem():
        # First operand is drawn before the second, one problem at a time.
        first = random.randint(1, 10**max_digits - 1)
        second = random.randint(1, 10**max_digits - 1)
        return first, second, first + second

    return [draw_problem() for _ in range(num_samples)]

# Generate a test set of raw (num1, num2, sum) integer triples
# NOTE(review): test_set does not appear to be used by the later cells visible here
num_test_samples = 1000
max_test_digits = 20  # Maximum number of digits in each operand
test_set = generate_test_set(num_test_samples, max_test_digits)

In [10]:
def evaluate_on_dataset(model, dataloader, dataset_name="Dataset", pad_token_id=None):
    """Compute and print per-token accuracy of ``model`` over ``dataloader``.

    Only positions whose target differs from the padding id are counted, so
    this is token-level accuracy, not whole-answer accuracy.

    Args:
        model: module mapping a batch of token ids to per-position logits.
        dataloader: iterable of ``(inputs, targets)`` batches.
        dataset_name: label used in the printed line.
        pad_token_id: target id to exclude. When ``None`` (default) it is
            read from the notebook-level ``addition_train.vocab['<PAD>']``,
            as before.

    Returns:
        float: correct / counted tokens, or ``0.0`` when nothing was counted.
    """
    if pad_token_id is None:
        # Backward-compatible default: rely on the notebook's global dataset.
        pad_token_id = addition_train.vocab['<PAD>']

    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            outputs = model(inputs)
            _, predicted = outputs.max(2)

            # Create a mask for non-padding tokens
            non_pad_mask = targets.ne(pad_token_id)

            # Count correct predictions
            correct += (predicted[non_pad_mask] == targets[non_pad_mask]).sum().item()
            total += non_pad_mask.sum().item()

    # An empty loader (or all-padding targets) would otherwise divide by zero.
    accuracy = correct / total if total else 0.0
    print(f"{dataset_name} Accuracy: {accuracy:.4f}")
    return accuracy

# Create a DataLoader for the training data (no shuffling needed for evaluation)
train_loader_for_eval = DataLoader(addition_train, batch_size=32, shuffle=False)

# Evaluate on training data
train_accuracy = evaluate_on_dataset(loaded_model, train_loader_for_eval, "Training Data")

# Evaluate on test data for comparison
# `test_loader` is created in the training cell, which must have been run in this kernel
test_accuracy = evaluate_on_dataset(loaded_model, test_loader, "Test Data")

# Print comparison
print(f"\nAccuracy comparison:")
print(f"Training Data: {train_accuracy:.4f}")
print(f"Test Data:     {test_accuracy:.4f}")

Training Data Accuracy: 0.6875


NameError: name 'test_loader' is not defined

In [22]:
def test_larger_additions(model, max_seq_length, num_samples=100):
    """Print whole-answer accuracy on operands longer than the training range.

    Draws ``num_samples`` pairs of 21- to 30-digit integers, runs ``model``
    on each ``"a+b="`` prompt and counts a hit only when the decoded
    prediction parses to exactly ``a + b``. Predictions that are not valid
    integers count as misses.
    """
    low, high = 10**20, 10**30 - 1  # 21 to 30 digit numbers
    hits = 0
    for _ in range(num_samples):
        left = random.randint(low, high)
        right = random.randint(low, high)
        expected_sum = left + right

        encoded = preprocess_input(f"{left}+{right}=", max_seq_length)
        with torch.no_grad():
            logits = model(encoded)
        decoded = decode_output(logits)

        try:
            hits += int(int(decoded) == expected_sum)
        except ValueError:
            # Non-numeric prediction: treated as a miss.
            continue

    accuracy = hits / num_samples
    print(f"Accuracy on {num_samples} large number samples (21-30 digits): {accuracy:.4f}")

# Test on operands longer than anything seen in training (100 samples by default)
test_larger_additions(loaded_model, checkpoint['max_seq_length'])

def test_specific_patterns(model, max_seq_length):
    """Print the model's answers for a fixed set of hand-picked additions.

    The cases probe carry propagation, operand order, large-plus-small and
    round-number sums. A prediction that cannot be parsed as an integer is
    reported as incorrect instead of raising ``ValueError``, matching the
    handling in ``test_larger_additions``.
    """
    test_cases = [
        (999999, 1),  # Testing carry over
        (1, 999999),  # Testing different order
        (10**15 - 1, 1),  # Large number + small number
        (10**15, 10**15),  # Two large, round numbers
        (123456789, 987654321),  # Ascending + descending
    ]

    for num1, num2 in test_cases:
        input_str = f"{num1}+{num2}="
        input_tensor = preprocess_input(input_str, max_seq_length)

        with torch.no_grad():
            output = model(input_tensor)

        predicted_result = decode_output(output)
        true_result = num1 + num2

        try:
            is_correct = int(predicted_result) == true_result
        except ValueError:
            # Empty or non-numeric prediction.
            is_correct = False

        print(f"{num1} + {num2} = {predicted_result} (True: {true_result})")
        print(f"Correct: {is_correct}")
        print()

# Test on the hand-picked addition patterns defined in test_specific_patterns
print("Testing on specific addition patterns:")
test_specific_patterns(loaded_model, checkpoint['max_seq_length'])

Accuracy on 100 large number samples (21-30 digits): 0.0000
Testing on specific addition patterns:
999999 + 1 = 99991999900 (True: 1000000)
Correct: False

1 + 999999 = 11999000 (True: 1000000)
Correct: False

999999999999999 + 1 = 00999990099999990999999999999999900 (True: 1000000000000000)
Correct: False

1000000000000000 + 1000000000000000 = 0000011010002000000000000000 (True: 2000000000000000)
Correct: False

123456789 + 987654321 = 91951616571110877600 (True: 1111111110)
Correct: False



In [11]:
import torchinfo  # Better alternative to torchsummary for transformers

In [14]:
import torch
from torchinfo import torchinfo  # Better alternative to torchsummary for transformers

def summarize_transformer(model, batch_size=32, seq_length=42, vocab_size=None):
    """
    Build a torchinfo summary of ``model`` from a random token-id batch.

    Args:
        model: The transformer model to analyze
        batch_size: Number of samples in the dummy batch
        seq_length: Length of the dummy input sequences
        vocab_size: Exclusive upper bound for the random token ids; when None,
            ``model.vocab_size`` is used if the model has it, otherwise 1000

    Returns:
        The object returned by ``torchinfo.summary``.
    """
    if vocab_size is None:
        # Fall back to 1000 when the model exposes no vocab_size attribute.
        vocab_size = getattr(model, "vocab_size", 1000)

    # Random ids in [0, vocab_size) with an integer dtype.
    token_ids = torch.randint(0, vocab_size, (batch_size, seq_length), dtype=torch.long)

    reported_columns = ["input_size", "output_size", "num_params", "kernel_size", "mult_adds"]
    return torchinfo.summary(
        model,
        input_data=token_ids,
        col_names=reported_columns,
        depth=4,  # how many nesting levels of submodules to list
        device='cpu'  # Change to 'cuda' if using GPU
    )

# Example usage:
# Summarize the loaded model with a 32 x 42 dummy batch, the same batch size
# and sequence length used for training.
summary = summarize_transformer(
    loaded_model,
    batch_size=32,
    seq_length=42,
    vocab_size=checkpoint['vocab_size']
    )

print(summary)

Layer (type:depth-idx)                        Input Shape               Output Shape              Param #                   Kernel Shape              Mult-Adds
SmallTransformer                              [32, 42]                  [32, 42, 14]              33,472                    --                        --
├─AbacusEmbedding: 1-1                        [32, 42]                  [32, 42, 64]              --                        --                        --
│    └─Embedding: 2-1                         [32, 42]                  [32, 42, 64]              896                       --                        28,672
│    └─Embedding: 2-2                         [1, 42]                   [1, 42, 64]               2,688                     --                        2,688
├─TransformerEncoder: 1-2                     [32, 42, 64]              [32, 42, 64]              --                        --                        --
│    └─ModuleList: 2-3                        --                    

# Small Transformer Model for Arithmetic Operations

This is a compact transformer model designed for arithmetic tasks. Here's a breakdown of its architecture:

## Model Overview
- **Total Parameters**: 104,910 (very lightweight!)
- **Memory Footprint**: Only 0.89 MB
- **Max Sequence Length**: 42 tokens
- **Embedding Dimension**: 64
- **Vocabulary Size**: 14 (digits 0-9 plus `+`, `=`, `<PAD>`, and `<EOS>`)

## Architecture Components

### 1. Input Processing (AbacusEmbedding)
- **Token Embedding**: Converts each input number/symbol into a 64-dimensional vector
- **Positional Embedding**: Adds position information to each token to maintain sequence order
- These embeddings combine to give the model understanding of both WHAT each token is and WHERE it appears

### 2. Transformer Encoder
- **Number of Layers**: 2
- Each layer contains:
 - Self-attention mechanism (allows model to weigh importance of different positions)
 - Feed-forward neural network
- Helps model understand relationships between different positions in the input sequence

### 3. Output Layer
- Linear projection layer that converts the 64-dimensional features back to vocabulary size (14)
- Produces predictions for each position in the sequence

This model's architecture suggests it's optimized for tasks like addition or basic arithmetic, where it needs to process sequences of numbers and operators. The small vocabulary size (14) is perfect for digits 0-9 plus a few special tokens (like '+', '=', etc.).