In [1]:
import tiktoken
import torch
import torch.nn as nn
from torch.nn import functional as F
from GPT import GPT

# Hyperparameters
NUMBER_OF_FILES = 10  # Number of ./data/data{i}.txt shards; the last one is used for validation only
batch_size = 4  # Number of token sequences sampled per batch
context_length = 16  # Number of tokens in each sampled sequence
d_model = 64  # The size of our model token embeddings
num_blocks = 8  # Number of transformer blocks
num_heads = 4  # Number of heads in Multi-head attention
learning_rate = 1e-3  # 0.001
dropout = 0.1  # Dropout rate
max_iters = 100  # Total of training iterations <- Change this to smaller number for testing
eval_interval = 50  # How often to evaluate
eval_iters = 20  # Number of batches to average for each evaluation
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Use GPU if it's available.

# Seed PyTorch's RNG so weight initialisation and random batch sampling are
# repeatable after "Restart Kernel -> Run All".
SEED = 1337
torch.manual_seed(SEED)



In [2]:
# Tokenize every raw text shard once and cache the token ids on disk as tensors.
encoding = tiktoken.get_encoding("cl100k_base")
# Running value of (largest token id seen + 1) across all shards.
max_token_value = 0

for i in range(NUMBER_OF_FILES):
    fpath = f'./data/data{i}.txt'
    with open(fpath, 'r', encoding='utf-8') as f:
        text = f.read()
    tokenized_text = encoding.encode(text)
    if not tokenized_text:
        # max() on an empty list would fail with an unhelpful message.
        raise ValueError(f"{fpath} produced no tokens; every shard must contain text")
    max_token_value = max(max_token_value, max(tokenized_text) + 1)
    # Keep the cached tensor on the CPU: a tensor saved from a CUDA device is
    # restored onto CUDA by default, which breaks loading on CPU-only machines.
    tokenized_text = torch.tensor(tokenized_text, dtype=torch.long)
    torch.save(tokenized_text, f"./data/tensor{i}.pt")

# NOTE: max_token_value is already (largest id + 1), so the table has one spare
# row. The size is left as-is so existing checkpoints keep the same shape.
embed_table = nn.Embedding(num_embeddings=max_token_value + 1, embedding_dim=d_model)

In [8]:
def get_training_batch():
    """Yield random (input, target) training batches, one shard at a time.

    Shards ``./data/tensor0.pt`` .. ``./data/tensor{NUMBER_OF_FILES-2}.pt`` are
    visited in order (the last shard is read only by ``get_validation_batch``).
    For each shard, ``(2*max_iters + eval_iters) // (NUMBER_OF_FILES-1) + 1``
    batches are drawn before moving on, so the generator is finite: callers
    using ``next()`` get ``StopIteration`` once that budget is used up.

    Yields:
        tuple: ``(x, y)`` tensors of shape ``(batch_size, context_length)`` on
        ``device``; ``y`` is ``x`` shifted one token to the right in the shard.

    Raises:
        ValueError: if a shard does not hold more than ``context_length``
            tokens, so no (input, target) window can be cut from it.
    """
    num_train_files = NUMBER_OF_FILES - 1
    for file_idx in range(num_train_files):
        fpath = f'./data/tensor{file_idx}.pt'
        with open(fpath, 'rb') as f:
            # map_location lets shards that were saved from a CUDA tensor load
            # on a CPU-only machine (and vice versa).
            data = torch.load(f, map_location=device)
        if len(data) <= context_length:
            raise ValueError(
                f"{fpath} has {len(data)} tokens; need more than context_length={context_length}"
            )
        batches_per_file = (2 * max_iters + eval_iters) // num_train_files + 1
        for _ in range(batches_per_file):
            # randint's upper bound is exclusive, so idx + context_length + 1 <= len(data).
            idxs = torch.randint(low=0, high=len(data) - context_length, size=(batch_size,))
            x = torch.stack([data[idx:idx + context_length] for idx in idxs]).to(device)
            y = torch.stack([data[idx + 1:idx + context_length + 1] for idx in idxs]).to(device)
            yield x, y


In [10]:
def get_validation_batch():
    """Yield random (input, target) batches from the held-out validation shard.

    Reads ``./data/tensor{NUMBER_OF_FILES-1}.pt`` once and then draws
    ``max_iters + eval_iters + 1`` random batches from it, so the generator is
    finite: callers using ``next()`` get ``StopIteration`` once it is used up.

    Yields:
        tuple: ``(x, y)`` tensors of shape ``(batch_size, context_length)`` on
        ``device``; ``y`` is ``x`` shifted one token to the right in the shard.

    Raises:
        ValueError: if the shard does not hold more than ``context_length``
            tokens, so no (input, target) window can be cut from it.
    """
    fpath = f'./data/tensor{NUMBER_OF_FILES-1}.pt'
    with open(fpath, 'rb') as f:
        # map_location lets a shard that was saved from a CUDA tensor load on a
        # CPU-only machine (and vice versa).
        data = torch.load(f, map_location=device)
    if len(data) <= context_length:
        raise ValueError(
            f"{fpath} has {len(data)} tokens; need more than context_length={context_length}"
        )
    for _ in range(max_iters + eval_iters + 1):
        # randint's upper bound is exclusive, so idx + context_length + 1 <= len(data).
        idxs = torch.randint(low=0, high=len(data) - context_length, size=(batch_size,))
        x = torch.stack([data[idx:idx + context_length] for idx in idxs]).to(device)
        y = torch.stack([data[idx + 1:idx + context_length + 1] for idx in idxs]).to(device)
        yield x, y

In [11]:
# Build the GPT model from the hyperparameters and the embedding table created
# earlier, and move it to the target device in the same expression.
model = GPT(
    d_model,
    context_length,
    num_heads,
    num_blocks,
    embed_table,
    dropout,
).to(device)

# Batch generators are lazy: no shard is read until the first next() call.
valid_data_generator = get_validation_batch()
train_data_generator = get_training_batch()

In [12]:
# Calculate loss
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['training', 'validation']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            x_batch, y_batch = next(train_data_generator) if split=='training' else next(valid_data_generator)
            logits, loss = model(x_batch, y_batch)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

In [2]:
# Use AdamW optimizer
optimizer = torch.optim.AdamW(params=model.parameters(), lr=learning_rate)
tracked_losses = []  # one {'training': ..., 'validation': ...} dict per evaluation
for step in range(max_iters):
    # Evaluate every eval_interval steps and once more on the final step.
    # (eval_iters is the number of batches averaged per evaluation, not the
    # evaluation frequency.) Each evaluation pulls eval_iters batches from both
    # the training and the validation generator.
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        tracked_losses.append(losses)
        print('Step:', step, 'Training Loss:', round(losses['training'].item(), 3), 'Validation Loss:',
              round(losses['validation'].item(), 3))

    # One optimisation step on a fresh training batch.
    xb, yb = next(train_data_generator)
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

In [None]:
# Persist the trained weights (state dict only, not the whole model object).
torch.save(obj=model.state_dict(), f='modelGPT.pt')
