# CharTransformers: next character prediction using Transformers

In [1]:
import numpy as np
import pandas as pd
import json
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import time
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple
import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset
import torch.nn.functional as F

# Preprocess data

In [2]:
file_path = "../data/alpaca_data_cleaned_subset.json"
context_length = 10

In [3]:
def read_json(file_path):
    with open(file_path, 'r') as f:
        return json.load(f)

def preprocess_data(data):
    sequences = []
    for item in data:
        sequence = list(item['instruction']) + ['\n'] + list(item['input']) + ['\n'] + list(item['output']) + ['[EOS]']
        sequences.append(sequence)
    return sequences

valid_chars = list('abcdefghijklmnopqrstuvwxyz ?!.,\n')
def preprocess_data_simple(data):
    sequences = []
    for item in data:
        instruction_alpha = ''.join([char for char in item['instruction'].lower() if char in valid_chars])
        input_alpha = ''.join([char for char in item['input'].lower() if char in valid_chars])
        output_alpha = ''.join([char for char in item['output'].lower() if char in valid_chars])

        sequence = list(instruction_alpha) + ['\n'] + list(input_alpha) + ['\n'] + list(output_alpha) + ['[EOS]']
        sequences.append(sequence)
    return sequences

def create_vocab(sequences):
    chars = [char for seq in sequences for char in seq]
    return sorted(set(chars))

def one_hot_encode(sequence, char_to_idx):
    one_hot = np.zeros((len(sequence), len(char_to_idx)), dtype=np.int32)
    for i, char in enumerate(sequence):
        one_hot[i, char_to_idx[char]] = 1
    return one_hot

def numerical_encode(sequence, char_to_idx):
    numerical_encoding = np.zeros(len(sequence), dtype=np.int32)
    for i, char in enumerate(sequence):
        numerical_encoding[i] = char_to_idx[char]
    return numerical_encoding

def create_training_examples(sequences, char_to_idx, input_length=10):
    x = []
    y = []

    for seq in sequences:
        encoded_seq = one_hot_encode(seq, char_to_idx)
        numerical_encode_seq = numerical_encode(seq, char_to_idx)
        total_chars = len(seq)

        for i in range(total_chars - input_length):
            x.append(numerical_encode_seq[i:i+input_length])
            y.append(numerical_encode_seq[i+input_length])

    return np.array(x), np.array(y)

# Main script
data = read_json(file_path)
#sequences = preprocess_data(data)
sequences = preprocess_data_simple(data)

vocab = create_vocab(sequences)
char_to_idx = {char: idx for idx, char in enumerate(vocab)}
idx_to_char = {idx: char for char, idx in char_to_idx.items()}

X, Y = create_training_examples(sequences, char_to_idx, input_length=context_length)
X = X.reshape(X.shape[0], -1)

print(f"Number of training examples: {X.shape[0]}")
print(f"Example input shape: {X[0].shape}")
print(f"output shape: {Y.shape}")

Number of training examples: 19237
Example input shape: (10,)
output shape: (19237,)


In [4]:
idx_to_char

{0: '\n',
 1: ' ',
 2: ',',
 3: '.',
 4: '?',
 5: '[EOS]',
 6: 'a',
 7: 'b',
 8: 'c',
 9: 'd',
 10: 'e',
 11: 'f',
 12: 'g',
 13: 'h',
 14: 'i',
 15: 'j',
 16: 'k',
 17: 'l',
 18: 'm',
 19: 'n',
 20: 'o',
 21: 'p',
 22: 'q',
 23: 'r',
 24: 's',
 25: 't',
 26: 'u',
 27: 'v',
 28: 'w',
 29: 'x',
 30: 'y',
 31: 'z'}

In [5]:
X.shape, Y.shape

((19237, 10), (19237,))

In [6]:
# Split the data
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2)

# Model

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Define the Positional Encoding class
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, 2 * d_model, 2).float() * -(math.log(10000.0) / d_model))
        self.positional_encoding = nn.Parameter(torch.sin(position * div_term), requires_grad=False)

    def forward(self, x):
        pos_encoding = self.positional_encoding[:x.size(1), :x.size(2)].unsqueeze(0).expand(x.size(0), -1, -1)
        x = x + pos_encoding
        return x

# Define the Transformer model for sequence-to-sequence prediction
class CharTransformer(nn.Module):
    def __init__(self, input_size, output_size, embed_size, num_heads, num_layers):
        super(CharTransformer, self).__init__()
        self.embedding = nn.Embedding(input_size, embed_size)
        self.pos_encoder = PositionalEncoding(d_model=embed_size)
        self.transformer = nn.Transformer(
            d_model=embed_size,
            nhead=num_heads,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=dim_feedforward
        )
        self.fc = nn.Linear(embed_size, output_size)

    def forward(self, x):
        x = self.embedding(x)
        x = self.pos_encoder(x)
        x = self.transformer(x, x)

        # Take the representation of the last token from each sequence
        x_last_token = x[:, -1, :]

        output = self.fc(x_last_token)

        return output

# Define a custom dataset
class CharDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.Y = Y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        input_vector = self.X[idx]
        target_index = self.Y[idx]
        return input_vector, target_index

# Hyperparameters
input_size = 32 # size of the vocabulary, specifically the number of unique characters in your dataset
output_size = 32
embed_size = 64
num_heads = 2 # 2
num_layers = 2 # 2
batch_size = 32
learning_rate = 0.001 # 0.01
weight_decay = 1e-5  # Adjust the weight decay value as needed
num_epochs = 20 # 10
dim_feedforward = 2048

# Create model, loss, and optimizer
model = CharTransformer(input_size, output_size, embed_size, num_heads, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Create the dataset and DataLoader
dataset = CharDataset(X_train, Y_train)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Training loop
start_time = time.time()  # Record the start time
for epoch in range(num_epochs):
    total_loss = 0
    total_correct = 0
    for batch in tqdm(dataloader, desc=f'Epoch {epoch + 1}/{num_epochs}'):
        inputs, targets = batch

        # Convert inputs to PyTorch tensor
        input_tensor = torch.as_tensor(inputs, dtype=torch.long).clone().detach()
        target_tensor = torch.as_tensor(targets, dtype=torch.long).clone().detach()

        # Add batch and sequence dimensions
        optimizer.zero_grad()
        outputs = model(input_tensor)

        # Check if batch sizes match before calculating the loss
        assert outputs.size(0) == target_tensor.size(0), "Batch sizes do not match"

        # Calculate the loss
        loss = criterion(outputs, target_tensor)
        loss.backward()
        optimizer.step()

        # Calculate accuracy on this batch
        # Get predicted characters (argmax along the second dimension)
        predicted_chars = torch.argmax(outputs, dim=1)  # Use dim=1 for the second dimension
        true_chars = targets

        # Count correct predictions
        correct_predictions = (predicted_chars == true_chars)
        total_correct += correct_predictions.sum().item()
        
        # Add to loss
        total_loss += loss.item()

    average_loss = total_loss / len(dataloader)
    accuracy = total_correct / len(dataloader) / batch_size
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {average_loss}, Accuracy: {accuracy * 100:.2f}%')

end_time = time.time()  # Record the end time
elapsed_time = end_time - start_time
print(f'Training took {elapsed_time} seconds.')

# Save the trained model
torch.save(model.state_dict(), 'char_transformer_model.pth')


Epoch 1/20:  52%|█████▏    | 252/481 [00:17<00:23,  9.78it/s]

In [None]:
# Load the trained model
model = CharTransformer(input_size, output_size, embed_size, num_heads, num_layers)
model.load_state_dict(torch.load('char_transformer_model.pth'))
model.eval()

# Create the test dataset and DataLoader
test_dataset = CharDataset(X_test, Y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Evaluation loop
total_correct = 0
total_samples = 0

with torch.no_grad():
    for batch in test_dataloader:
        inputs, targets = batch

        # Convert inputs to PyTorch tensor
        input_tensor = torch.as_tensor(inputs, dtype=torch.long).clone().detach()
        target_tensor = torch.as_tensor(targets, dtype=torch.long).clone().detach()

        # Forward pass
        outputs = model(input_tensor)

        # Get predicted characters (argmax along the second dimension)
        predicted_chars = torch.argmax(outputs, dim=1)  # Use dim=1 for the second dimension
        true_chars = targets

        # Count correct predictions
        correct_predictions = (predicted_chars == true_chars)
        total_correct += correct_predictions.sum().item()
        total_samples += correct_predictions.size(0)

# Calculate accuracy
accuracy = total_correct / total_samples
print(f'Test Accuracy: {accuracy * 100:.2f}%')

Test Accuracy: 26.85%
