This notebook was used to create a small transformer model and develop my own tokenizer needed for the task of predicting the next symbol in the code.

In [9]:
import pandas as pd
import numpy as np
import nltk
from nltk.util import ngrams
from collections import defaultdict, Counter
import torch.nn.functional as F
import re
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim


Data processing and tokenizer development part


In [10]:
df = pd.read_parquet('props-proofs.parquet').head(110)


In [11]:
df['combined'] = df['proposition'] + ' ' + df['proof']
df.drop(columns=['proposition', 'proof', 'imports', 'filename', 'symbolic_name'], inplace=True)
df_test = df.iloc[100:]
df = df.iloc[:100]

In [12]:
def find_unique_symbols(df, column_name):
    """
    This function finds all unique symbols occuring in our dataset and changes it to a set. 
    """
    all_text = ''.join(df[column_name].dropna().astype(str))
    unique_symbols = set(all_text)
    
    return unique_symbols


In [13]:

def create_tokenizer(symbol_set):
    """
    This function creates a tokenizer that changes a string into words, symbols, and whitespace tokens.
    An exception is added for variable names so that they are not split into individual characters. i.e. 'var_name' is one token instead of 'var' '_' 'name'.

    """
    filtered_symbols = {s for s in symbol_set if not s.isalpha()}

    symbol_pattern = '|'.join(re.escape(s) for s in sorted(filtered_symbols, key=len, reverse=True))

    def tokenizer(input_string):
        tokens = re.findall(rf'\b\w+(?:_\w+)*\b|{symbol_pattern}|\s|[^\w\s]', input_string)
        return tokens

    return tokenizer


symbol_set = find_unique_symbols(df, 'combined')
tokenizer = create_tokenizer(symbol_set)

In [14]:
data = df.apply(lambda x: tokenizer(x['combined']), axis=1).tolist()

Model development part

In [15]:
class TokenDataset(Dataset):
    """
    This class created the dataset for the model from our given cleaned and tokenized data.
    """
    def __init__(self, data, vocab=None, max_vocab_size=30000):
        self.data = data
        
        if vocab is None:
            token_counts = Counter(token for line in data for token in line)
            most_common_tokens = token_counts.most_common(max_vocab_size - 2)
            self.vocab = {token: idx + 2 for idx, (token, _) in enumerate(most_common_tokens)}
            self.vocab['<PAD>'] = 0
            self.vocab['<UNK>'] = 1
        else:
            self.vocab = vocab

        self.vocab_size = len(self.vocab)
        self.pad_token = self.vocab['<PAD>']

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        tokens = self.data[idx]
        # before the token text
        input_tokens = tokens[:-1] 
        # token to predict
        target_token = tokens[1:] 
        
        input_indices = [self.vocab.get(token, self.vocab['<UNK>']) for token in input_tokens]
        target_indices = [self.vocab.get(token, self.vocab['<UNK>']) for token in target_token]

        return torch.tensor(input_indices, dtype=torch.long), torch.tensor(target_indices, dtype=torch.long)

def collate_fn(batch):
    """
    This function prepares the batches for the model.
    """
    inputs, targets = zip(*batch)
    inputs_padded = nn.utils.rnn.pad_sequence(inputs, batch_first=True, padding_value=0)
    targets_padded = nn.utils.rnn.pad_sequence(targets, batch_first=True, padding_value=0)
    return inputs_padded, targets_padded

class TransformerModel(nn.Module):
    """
    This is our transformer model class
    """
    def __init__(self, vocab_size, embedding_dim=512, nhead=8, num_layers=6, dim_feedforward=2048, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.transformer = nn.Transformer(
            d_model=embedding_dim,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )
        self.fc_out = nn.Linear(embedding_dim, vocab_size)
        
    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        src_emb = self.embedding(src)  # [seq_length, batch_size, embedding_dim]
        tgt_emb = self.embedding(tgt)
        
        transformer_output = self.transformer(src_emb, tgt_emb, src_mask=src_mask, tgt_mask=tgt_mask)
        output = self.fc_out(transformer_output)  # [seq_length, batch_size, vocab_size]
        
        return output

def generate_square_subsequent_mask(size):
    """
    This function ensures that the model does not look into the future tokens.
    """
    mask = torch.triu(torch.ones(size, size) * float('-inf'), diagonal=1)
    return mask


def train_model(model, dataloader, criterion, optimizer, epochs=5):
    """
    This function trains the given model using the given dataloader, criterion, and optimizer.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for src, tgt in dataloader:
            src, tgt = src.transpose(0, 1), tgt.transpose(0, 1)
            
            src_mask = generate_square_subsequent_mask(src.size(0))
            tgt_mask = generate_square_subsequent_mask(tgt.size(0) - 1)
            
            output = model(src, tgt[:-1], src_mask=src_mask, tgt_mask=tgt_mask)
            output_dim = output.shape[-1]
            
            output = output.reshape(-1, output_dim)
            tgt = tgt[1:].reshape(-1)
            loss = criterion(output, tgt)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader)}")


dataset = TokenDataset(data, max_vocab_size=30000)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
embedding_dim = 512
model = TransformerModel(vocab_size=dataset.vocab_size, embedding_dim=embedding_dim, num_layers=3)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index=dataset.pad_token)

train_model(model, dataloader, criterion, optimizer, 25)
model_path = 'transformer_model.pth'
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')



Epoch 1/25, Loss: 4.294771194458008
Epoch 2/25, Loss: 3.3761261105537415
Epoch 3/25, Loss: 3.1697906851768494
Epoch 4/25, Loss: 2.9279643297195435
Epoch 5/25, Loss: 2.71120285987854
Epoch 6/25, Loss: 2.5502926111221313
Epoch 7/25, Loss: 2.445933520793915
Epoch 8/25, Loss: 2.3825116753578186
Epoch 9/25, Loss: 2.3142624497413635
Epoch 10/25, Loss: 2.2764819264411926
Epoch 11/25, Loss: 2.146627187728882
Epoch 12/25, Loss: 2.090774178504944
Epoch 13/25, Loss: 1.9795898497104645
Epoch 14/25, Loss: 1.9832974970340729
Epoch 15/25, Loss: 1.8855700194835663
Epoch 16/25, Loss: 1.8477830588817596
Epoch 17/25, Loss: 1.8249483406543732
Epoch 18/25, Loss: 1.759735107421875
Epoch 19/25, Loss: 1.685219556093216
Epoch 20/25, Loss: 1.683346927165985
Epoch 21/25, Loss: 1.700977474451065
Epoch 22/25, Loss: 1.6306636333465576
Epoch 23/25, Loss: 1.512267678976059
Epoch 24/25, Loss: 1.5150374472141266
Epoch 25/25, Loss: 1.491888552904129
Model saved to transformer_model.pth


In [16]:
def predict_next_token(model, sequence, vocab, max_len=50):
    """
    This function predicts the next token based on the given sequence.
    
    Inputs:
    - model: trained transformer model
    - sequence: base sequence before the predictied token
    - vocab: dictionalry mapping tokens to indexes
    - max_len: maximum amount of tokens to generate
    
    Outputs
    - Next given tokens
    """
    model.eval()
    
    input_indices = [vocab.get(token, vocab['<UNK>']) for token in sequence]
    input_tensor = torch.tensor(input_indices, dtype=torch.long).unsqueeze(1)
    src_mask = generate_square_subsequent_mask(input_tensor.size(0))
    
    with torch.no_grad():
        output = model(input_tensor, input_tensor, src_mask=src_mask, tgt_mask=src_mask)
        output = output[-1, 0, :]
        
        probabilities = F.softmax(output, dim=-1)
        
        # most probable next token is chosen
        predicted_index = torch.argmax(probabilities).item()
        # print(predicted_index)
        idx_to_token = {idx: token for token, idx in vocab.items()}
        # print(idx_to_token)
        predicted_token = idx_to_token.get(predicted_index, '<UNK>')
    
    return predicted_token
test_sequence = ['Lemma', ' ', 'scons_comp', ' ', 'x', ' ', 'f', ' ', 'g', ' ', ':', ' ', '(', 'x', ' ', '.', ':', ' ', 'f', ')', ' ', '>', '>', '>', ' ', 'g', ' ', '=', ' ', '(', 'g', ' ', 'x', ')', ' ', '.', ':', ' ', 'f', ' ', '>', '>', '>', ' ', 'g', '.', ' ', 'Proof', '.', ' ', 'f_ext', ';', ' ', 'let', ' ', 'x', ' ', ':', '=', ' ', 'fresh', ' ', 'in', ' ', 'intros', ' ', 'x', ';', ' ', 'now', ' ', 'destruct', ' ', 'x', '.', ' ', 'Qed']


predicted_token = predict_next_token(model, test_sequence, dataset.vocab)
print("Predicted token: '" + predicted_token+"'")

Predicted token: '.'


Model testing part

In [19]:
pd.set_option('display.max_colwidth', None)
data_test = df_test.apply(lambda x: tokenizer(x['combined']), axis=1).tolist()


Tests token lists were taken from data_test

In [18]:
tests_og = [
['Lemma', ' ', 'p_impl_p', ' ', ':', ' ', 'forall', ' ', '(', 'p', ':', 'Prop', ')', ',', ' ', 'p', ' ', '/', '\\', ' ', 'True', ' ', '-', '>', ' ', 'p', '.', ' ', 'Proof', '.', ' ', 'intros', '.', ' ', 'itauto', ' ', 'idtac', '.', ' ', 'Qed', '.'],
['Lemma', ' ', 'x_andb_true', ' ', ':', ' ', 'forall', ' ', 'x', ',', ' ', 'Is_true', ' ', '(', 'x', ' ', '&', '&', ' ', 'true', ')', ' ', '-', '>', ' ', 'Is_true', ' ', 'x', '.', ' ', 'Proof', '.', ' ', 'intros', '.', ' ', 'itauto', ' ', 'idtac', '.', ' ', 'Qed', '.'],
['Lemma', ' ', 'l2', ' ', ':', ' ', 'forall', ' ', '(', 'x', ':', 'Z', ')', ',', ' ', 'x', ' ', '>', '=', ' ', '0', ' ', '-', '>', ' ', 'x', ' ', '<', '=', ' ', '0', ' ', '-', '>', ' ', 'x', ' ', '<', '>', ' ', '0', ' ', '-', '>', ' ', 'False', '.', ' ', 'Proof', '.', ' ', 'intros', '.', ' ', 'itauto', ' ', 'lia', '.', ' ', 'Qed', '.'],
]
tests = [
['Lemma', ' ', 'p_impl_p', ' ', ':', ' ', 'forall', ' ', '(', 'p', ':', 'Prop', ')', ',', ' ', 'p', ' ', '/', '\\'],
['Lemma', ' ', 'x_andb_true', ' ', ':', ' ', 'forall', ' ', 'x', ',', ' ', 'Is_true', ' ', '(', 'x', ' ', '&'],
['Lemma', ' ', 'l2', ' ', ':', ' ', 'forall', ' ', '(', 'x', ':', 'Z', ')', ',', ' ', 'x', ' ', '>', '=', ' ', '0', ' ', '-', '>', ' ', 'x', ' ', '<', '=', ' ', '0', ' ', '-']
]
for i in range(3):
    print('Predicted tokens for transformer model:')
    print("'" + predict_next_token(model, tests[i], dataset.vocab)+ "'")
    print('Correct token:')
    x = len(tests[i])
    print("'"+ tests_og[i][x]+"'")


Predicted tokens for transformer model:
' '
Correct token:
' '
Predicted tokens for transformer model:
' '
Correct token:
'&'
Predicted tokens for transformer model:
'>'
Correct token:
'>'
