# Encoders mini-project:
## Implementation of a numerical sequence generator

### I. Introduction

As seen in the explanation.ipnyb notebook, enven though decoders as implemented in this repository were introduced as part of a bigger architecture - the *transformer* architecture - they can be used as a standalone architecture for sequences generation. 

In this notebook, we will implement a simple sequence generator and make different tests and observation to illustrate what was said in the explanations. 

### II. Implementation of the model

First, let's import the code from model.py. The containt of this file if precisely what was done in the explanations notebook:

In [163]:
from model import SelfAttention, TransformerBlock, StandaloneDecoderBlock, StandaloneDecoder
import torch
import torch.nn as nn
import torch.functional as F

### III. Configuration

In [164]:
BATCH_SIZE = 16



### IV. Creation of the dataset

For this mini-project, we will make the dataset ourself so that the data is both very simple, in on-demand quantities and completely mastered. We will make the ArithmeticSequenceDataset so that we will be able to generate two simple different types of arithmetic sequences:

Arithmetic: $U_n = a + nb$

Geometric: $U_n = a(b)^n$

In [203]:
from torch.utils.data import Dataset
import random as rd
import numpy as np

class ArithmeticSequenceDataset(Dataset):
    def __init__(self, size=1000, length=6, proportions=(50,50), max_length=30):
        super().__init__()

        self.size = size
        self.length = length
        self.proportions = proportions
        self.max_length = max_length

        self.max_value = self.make_sequences()

        # We could make different choices for the vocabulary, but we will choose to limit as
        # much as possible its size
        self.vocab = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                      '<SEP>',
                      '<PAD>',
                      '<EOS>']

        self.vocab2idx = {c: i for i, c in enumerate(self.vocab)}
        self.idx2vocab = {i: c for c, i in self.vocab2idx.items()}

        # Convert sequences to tokens
        self.tokenized_sequences = [self.tokenize_sequence(seq) for seq in self.sequences]

    def tokenize_sequence(self, sequence):
        """
        Input:
            sequence (list): example: [1, 23, 456]
          
        Returns:
            (list): example: ['1', '<SEP>', '2', '3', '<SEP>', '4', '5', '6', '<EOS>']
        """
        tokens = []
        
        for i, num in enumerate(sequence):
            for digit in str(num):
                tokens.append(digit)
            
            if i < len(sequence) - 1:
                tokens.append('<SEP>')
        
        tokens.append('<EOS>')
        
        return tokens
    
    def detokenize_sequence(self, tokens):
        """
        Input:
            sequence (list): example: ['1', '<SEP>', '2', '3', '<SEP>', '4', '5', '6', '<EOS>']
          
        Returns:
            (list): example: [1, 23, 456]
        """

        numbers = []
        current_number = ""
        
        for token in tokens:
            if token in ['<SEP>', '<EOS>', '<PAD>']:
                if current_number:
                    numbers.append(int(current_number))
                    current_number = ""
                if token == '<EOS>':
                    break
            elif token.isdigit():
                current_number += token
        
        if current_number:
            numbers.append(int(current_number))
            
        return numbers

    def generate_random_arithmetic_sequence(self):
        """
        Makes an array of size length containing the length first values
        for U_n = a + b*n with a and b chosen randomly

        Input:
            length: number of values generated

        Returns:
            (np.array)
        """

        a = rd.randint(0, 5)
        b = rd.randint(1, 5)

        _seq = [] 
        for n in range(self.length):

            _seq.append(a + n*b)

        return np.array(_seq)
    
    def generate_random_geometric_sequence(self):
        """
        Makes an array of size length containing the length first values
        for U_n = a*b^n with a and b chosen randomly

        Input:
            length: number of values generated

        Returns:
            (np.array)
        """

        a = rd.randint(1, 4)
        b = rd.randint(2, 4)

        _seq = [] 
        for n in range(1, self.length+1):

            _seq.append(a*(b**n))

        return np.array(_seq)
    
    def make_sequences(self):
        self.sequences = []

        num_arithmetic = self.size*self.proportions[0]//100
        num_geometric = self.size*self.proportions[1]//100

        max_value = 0

        # Generate the arithmetic sequences:
        for k in range(num_arithmetic):
            generated_seq = self.generate_random_arithmetic_sequence()
            self.sequences.append(generated_seq)

            if generated_seq[-1] > max_value:
                max_value = generated_seq[-1]

        # Generate the geometric sequences:
        for k in range(num_geometric):
            generated_seq = self.generate_random_geometric_sequence()
            self.sequences.append(generated_seq)

            if generated_seq[-1] > max_value:
                max_value = generated_seq[-1]

        self.sequences = np.array(self.sequences)

        return max_value
    
    def pad_sequence(self, tokens):
        """Pad sequence to max_length"""
        if len(tokens) > self.max_length:
            return tokens[:self.max_length]  # Truncate
        
        # Pad
        padded = tokens + ['<PAD>'] * (self.max_length - len(tokens))
        return padded
    

    def __getitem__(self, idx):
        sequence = self.sequences[idx]  # [2, 4, 8, 16, 32]
        
        # Context size to decide how much context is needed.
        # In our, case, we need the first 3 numbers + 3 seperators
        context_size = 3  # Minimum to get the pattern
        predict_size = len(sequence) - context_size
        
        # Split context and target
        context = sequence[:context_size]    # [2, 4, 8]
        target = sequence[context_size:]     # [16, 32]
        
        # Tokenize
        context_tokens = self.tokenize_sequence(context.tolist())[:-1] + ['<SEP>'] # ['2', '<SEP>', '4', '<SEP>', '8']
        target_tokens = self.tokenize_sequence(target.tolist()) # ['<SEP>', '8', '<SEP>', '1', '6', '<SEP>', '3', '2', '<EOS>']
        
        full_sequence = context_tokens + target_tokens
        
        # Pad to max_length
        padded_sequence = self.pad_sequence(full_sequence)

        context_len = len(context_tokens)
        
        # Create input and target tensors (shifted by 1)
        input_ids = torch.tensor([self.vocab2idx[token] for token in padded_sequence[:-1]], dtype=torch.long)
        target_ids = torch.tensor([self.vocab2idx[token] for token in padded_sequence[context_len:]], dtype=torch.long)
        
        # Create loss mask (only predict on target tokens, not context)
        
        loss_mask = torch.cat([
            torch.zeros(context_len, dtype=torch.float),  # Don't compute loss on context
            torch.ones(len(target_tokens), dtype=torch.float),  # Compute loss on predictions
            torch.zeros(self.max_length - len(full_sequence), dtype=torch.float)  # Don't compute loss on padding
        ])
        
        return input_ids, target_ids, loss_mask


            
    def __len__(self):
        return len(self.sequences)
    

test = ArithmeticSequenceDataset()
test.__getitem__(999)


(tensor([ 6, 10,  1,  2, 10,  2,  4, 10,  4,  8, 10,  9,  6, 10,  1,  9,  2, 12,
         11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11]),
 tensor([ 4,  8, 10,  9,  6, 10,  1,  9,  2, 12, 11, 11, 11, 11, 11, 11, 11, 11,
         11, 11, 11, 11]),
 tensor([0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]))

We will also need to implement the associated dataloader

In [201]:
from torch.utils.data import DataLoader

train_dataset = ArithmeticSequenceDataset(
    size=2000,
    length=6,
    proportions=(50, 50)
)

test_dataset = ArithmeticSequenceDataset(
    size=400,
    length=6,
    proportions=(50, 50)
)


train_loader = DataLoader(
    train_dataset, 
    batch_size=BATCH_SIZE, 
    shuffle=True
)

test_loader = DataLoader(
    test_dataset, 
    batch_size=BATCH_SIZE, 
    shuffle=True
)