## Custom MiniGPT (Basic implementation)
- [Dataset repo](https://www.kaggle.com/datasets)
- [Dataset used --> Drake Lyrics](https://www.kaggle.com/datasets/juicobowley/drake-lyrics?select=drake_lyrics.txt)

In [1]:
# Read the whole lyrics file into memory as one string; the context manager
# closes the file handle as soon as the read finishes.
with open("sample_data/drake_lyrics.txt", "r", encoding="utf-8") as file:
  text = file.read()

In [2]:
# Corpus size in characters.
print(len(text))

772371


In [3]:
# Peek at the first 1000 characters of the raw corpus.
print(text[:1000])

"[Verse]
Put my feelings on ice
Always been a gem
Certified lover boy, somehow still heartless
Heart is only gettin' colder"
"[Verse]
Hands are tied
Someone's in my ear from the other side
Tellin' me that I should pay you no mind
Wanted you to not be with me all night
Wanted you to not stay with me all night
I know, you know, who that person is to me
Doesn't really change things

[Chorus]
I know you're scared of dating, falling for me
Shorty, surely you know me
Right here for you always
You know, I don't ever change
Right here for you always
You know I don't ever change
Right here for you

[Bridge]
In mind you make me want to do things, love you
Like I'm supposed to
You make me want to love you
Like I'm supposed to
You make me want to love you
Like I'm supposed to, remind you
Ayy

[Chorus]
I know you're scared of dating, falling for me
Shorty, by now you know me
Right here for you always
You know, I don't ever change
Right here for you always
You know I don't ever change
Right here for

In [4]:
# Remove every double-quote character (the raw preview shows quotes wrapping
# lyric blocks), then preview the cleaned text.
text = text.replace('"', '')
print(text[:1000])

[Verse]
Put my feelings on ice
Always been a gem
Certified lover boy, somehow still heartless
Heart is only gettin' colder
[Verse]
Hands are tied
Someone's in my ear from the other side
Tellin' me that I should pay you no mind
Wanted you to not be with me all night
Wanted you to not stay with me all night
I know, you know, who that person is to me
Doesn't really change things

[Chorus]
I know you're scared of dating, falling for me
Shorty, surely you know me
Right here for you always
You know, I don't ever change
Right here for you always
You know I don't ever change
Right here for you

[Bridge]
In mind you make me want to do things, love you
Like I'm supposed to
You make me want to love you
Like I'm supposed to
You make me want to love you
Like I'm supposed to, remind you
Ayy

[Chorus]
I know you're scared of dating, falling for me
Shorty, by now you know me
Right here for you always
You know, I don't ever change
Right here for you always
You know I don't ever change
Right here for yo

In [5]:
# Character-level vocabulary: the sorted list of distinct characters in the corpus.
# NOTE(review): the printed vocabulary includes characters such as 'Ã', 'â', '€', '™'
# that look like mis-decoded UTF-8 (mojibake) — confirm the source file's real encoding.
chars = sorted(list(set(text)))
print(chars)

['\n', ' ', '!', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', ']', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '{', '|', '}', '\x81', '\x9d', '¡', '¦', '¨', '©', '«', '±', '³', 'º', 'Ã', 'â', 'œ', 'Ÿ', '˜', '“', '”', '…', '€', '™']


In [6]:
# Vocabulary size = number of distinct characters in the corpus.
vocab_size = len(chars)
print(vocab_size)

106


In [7]:
# Very basic character-level encoding / tokenization:
# every distinct character gets the integer id of its position in `chars`.
char_to_int = {char: i for i, char in enumerate(chars)}
int_to_char = {i: char for i, char in enumerate(chars)}


def encode(s):
  """Map a string to a list of integer token ids, one id per character.

  Raises:
    KeyError: if `s` contains a character that is not in `chars`.
  """
  return [char_to_int[c] for c in s]


def decode(l):
  """Map an iterable of integer token ids back to the string they spell.

  Raises:
    KeyError: if an id is not a key of `int_to_char`.
  """
  return ''.join(int_to_char[i] for i in l)


print(encode("Laying on the beach"))
print(len(encode("Laying on the beach")))

[39, 57, 81, 65, 70, 63, 1, 71, 70, 1, 76, 64, 61, 1, 58, 61, 57, 59, 64]
19


In [8]:
# Round trip: decoding an encoded string should print the original string.
print(decode(encode("Laying on the beach")))

Laying on the beach


## Convert the dataset from a Python list to a PyTorch tensor

In [9]:
import torch

In [10]:
# Text to tensor: encode the whole corpus into one 1-D tensor of int64 token ids
# (torch.long is the integer dtype used here for ids fed to the embedding / loss).
data = torch.tensor(encode(text), dtype=torch.long)
print(data.shape, data.dtype)

torch.Size([770671]) torch.int64


In [11]:
# First 1000 token ids of the encoded corpus.
print(data[:1000])

tensor([54, 49, 61, 74, 75, 61, 55,  0, 43, 77, 76,  1, 69, 81,  1, 62, 61, 61,
        68, 65, 70, 63, 75,  1, 71, 70,  1, 65, 59, 61,  0, 28, 68, 79, 57, 81,
        75,  1, 58, 61, 61, 70,  1, 57,  1, 63, 61, 69,  0, 30, 61, 74, 76, 65,
        62, 65, 61, 60,  1, 68, 71, 78, 61, 74,  1, 58, 71, 81, 11,  1, 75, 71,
        69, 61, 64, 71, 79,  1, 75, 76, 65, 68, 68,  1, 64, 61, 57, 74, 76, 68,
        61, 75, 75,  0, 35, 61, 57, 74, 76,  1, 65, 75,  1, 71, 70, 68, 81,  1,
        63, 61, 76, 76, 65, 70,  6,  1, 59, 71, 68, 60, 61, 74,  0, 54, 49, 61,
        74, 75, 61, 55,  0, 35, 57, 70, 60, 75,  1, 57, 74, 61,  1, 76, 65, 61,
        60,  0, 46, 71, 69, 61, 71, 70, 61,  6, 75,  1, 65, 70,  1, 69, 81,  1,
        61, 57, 74,  1, 62, 74, 71, 69,  1, 76, 64, 61,  1, 71, 76, 64, 61, 74,
         1, 75, 65, 60, 61,  0, 47, 61, 68, 68, 65, 70,  6,  1, 69, 61,  1, 76,
        64, 57, 76,  1, 36,  1, 75, 64, 71, 77, 68, 60,  1, 72, 57, 81,  1, 81,
        71, 77,  1, 70, 71,  1, 69, 65, 

In [12]:
# Hold out the tail of the token stream: the first 90% is used for training,
# the remaining 10% for validation.
train_split = int(0.9 * len(data))
train_data, validation_data = data[:train_split], data[train_split:]
print(train_data.shape, validation_data.shape, sep="\n")

torch.Size([693603])
torch.Size([77068])


In [13]:
# Context length: the number of tokens in one input window for the examples below.
block_size = 8
train_data[:block_size + 1] # +1 for predicting next character

tensor([54, 49, 61, 74, 75, 61, 55,  0, 43])

In [14]:
# A chunk of block_size + 1 tokens yields block_size training examples:
# x is the input window and y is the same window shifted one token ahead,
# so y[i] is the token that follows the context x[:i+1].
x, y = train_data[:block_size], train_data[1:block_size + 1]

for i, target in enumerate(y):
  context = x[:i + 1].numpy()
  print(f"Input to the model: {context} the target should be {target}")

Input to the model: [54] the target should be 49
Input to the model: [54 49] the target should be 61
Input to the model: [54 49 61] the target should be 74
Input to the model: [54 49 61 74] the target should be 75
Input to the model: [54 49 61 74 75] the target should be 61
Input to the model: [54 49 61 74 75 61] the target should be 55
Input to the model: [54 49 61 74 75 61 55] the target should be 0
Input to the model: [54 49 61 74 75 61 55  0] the target should be 43


In [15]:
# Same walk-through as the previous cell, but decoded back to characters for readability.
for i in range(block_size):
  context = decode(x[:i+1].numpy())
  target = decode([y[i].item()]) # y[i] is the token after x[:i+1], because y starts at position 1 instead of 0
  print(f"Input to the model: {context} the target should be {target}")

Input to the model: [ the target should be V
Input to the model: [V the target should be e
Input to the model: [Ve the target should be r
Input to the model: [Ver the target should be s
Input to the model: [Vers the target should be e
Input to the model: [Verse the target should be ]
Input to the model: [Verse] the target should be 

Input to the model: [Verse]
 the target should be P


In [16]:
# Fix the RNG seed so the randomly sampled batches are reproducible.
torch.manual_seed(1337)

# Small demo sizes: 8 windows per batch, 8 tokens per window.
batch_size = 8
block_size = 8

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"

In [17]:
def get_batch_data(dataset_split):
  """Sample a random batch of (input, target) windows from one data split.

  Args:
    dataset_split: 'train' selects `train_data`; any other value (e.g. 'eval')
      selects `validation_data`.

  Returns:
    A pair (x, y) of tensors of shape (batch_size, block_size), both on `device`.
    y is x shifted one token ahead, so y[b, t] is the token that follows x[b, :t+1].
  """
  # Named `source` so the module-level `data` tensor is not shadowed.
  source = train_data if dataset_split == 'train' else validation_data
  # randint's upper bound is exclusive, so the largest start index is
  # len(source) - block_size - 1 and the shifted target window still fits.
  ix = torch.randint(len(source) - block_size, (batch_size, ))
  x = torch.stack([source[i:i+block_size] for i in ix])
  y = torch.stack([source[i+1:i+block_size+1] for i in ix])
  # The model is moved to `device`; its inputs must be on the same device or the
  # forward pass fails with a device mismatch when CUDA is available.
  return x.to(device), y.to(device)

In [18]:
x_batch, y_batch = get_batch_data('train')

In [19]:
print(x_batch.shape, y_batch.shape)

torch.Size([8, 8]) torch.Size([8, 8])


In [20]:
print(x_batch)

tensor([[30, 71, 69, 61,  1, 76, 64, 74],
        [81,  1, 46, 71, 70, 63, 82, 55],
        [76, 65, 70, 63,  1, 69, 71, 70],
        [77, 61, 68, 29, 57, 70, 60,  1],
        [ 1, 69, 61,  1, 68, 65, 67, 61],
        [ 1, 65, 75, 11,  1, 81, 71, 77],
        [ 1, 79, 61,  1, 63, 61, 76,  1],
        [ 1, 70, 71, 79, 11,  1, 63, 65]])


In [21]:
print(y_batch)

tensor([[71, 69, 61,  1, 76, 64, 74, 71],
        [ 1, 46, 71, 70, 63, 82, 55,  0],
        [65, 70, 63,  1, 69, 71, 70, 61],
        [61, 68, 29, 57, 70, 60,  1, 66],
        [69, 61,  1, 68, 65, 67, 61,  1],
        [65, 75, 11,  1, 81, 71, 77,  1],
        [79, 61,  1, 63, 61, 76,  1, 81],
        [70, 71, 79, 11,  1, 63, 65, 74]])


## Model Architecture

In [22]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import math
import numpy as np

In [23]:
# Lower-triangular matrix of ones: row i has ones in columns 0..i and zeros after.
# AttentionHead registers the same matrix as its causal mask.
torch.tril(torch.ones(block_size, block_size))

tensor([[1., 0., 0., 0., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0., 0., 0., 0.],
        [1., 1., 1., 0., 0., 0., 0., 0.],
        [1., 1., 1., 1., 0., 0., 0., 0.],
        [1., 1., 1., 1., 1., 0., 0., 0.],
        [1., 1., 1., 1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1., 1., 1., 1.]])

In [24]:
class AttentionHead(nn.Module):
  """One head of masked (causal) scaled dot-product self-attention.

  Reads the module-level `n_embeddings` (input width) and `block_size`
  (side of the causal mask) at construction time.
  """

  def __init__(self, head_size, dropout=0.1):
    super().__init__()
    self.dropout = nn.Dropout(dropout)
    # Bias-free projections to key / value / query (see "Attention Is All You Need").
    self.key = nn.Linear(n_embeddings, head_size, bias=False)
    self.value = nn.Linear(n_embeddings, head_size, bias=False)
    self.query = nn.Linear(n_embeddings, head_size, bias=False)

    # Causal mask: position t may attend to positions 0..t only.
    # Stored as a buffer, so it is not a trainable parameter.
    causal_mask = torch.tril(torch.ones(block_size, block_size))
    self.register_buffer('tril', causal_mask)

  def forward(self, x):
    """Return (attended values, attention weights) for input `x`.

    Dimension 1 of `x` is used as the sequence length — assumes `x` is
    (batch, time, n_embeddings); TODO confirm against callers.
    """
    k = self.key(x)
    q = self.query(x)
    seq_len = q.size(1)

    # Scale the dot products by sqrt(head_size).
    scale = np.sqrt(q.size(-1))
    scores = torch.matmul(q, k.transpose(-2, -1)) / scale

    # Positions after the current one get -inf, which softmax turns into weight 0.
    hidden = self.tril[:seq_len, :seq_len] == 0
    scores = scores.masked_fill(hidden, float("-inf"))
    weights = self.dropout(F.softmax(scores, dim=-1))

    v = self.value(x)
    return torch.matmul(weights, v), weights

In [52]:
class MultiHeadAttention(nn.Module):
  """Several AttentionHead modules run on the same input, concatenated and projected.

  Args:
    n_head: how many attention heads to create.
    head_size: output width of each head.
    dropout: dropout probability, used inside every head and on the projected output.
  """

  def __init__(self, n_head, head_size, dropout=0.1):
    super().__init__()
    # Forward `dropout` to the heads; otherwise they silently keep their own default.
    self.heads = nn.ModuleList([AttentionHead(head_size, dropout) for _ in range(n_head)])
    # The concatenated head outputs are n_head * head_size wide; sizing the projection
    # from that keeps it valid even when n_head does not divide n_embeddings evenly.
    self.projection = nn.Linear(n_head * head_size, n_embeddings)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Return the projected concatenation of all head outputs (attention weights are dropped)."""
    # Each head returns (output, weights); only the output is concatenated.
    output = torch.cat([h(x)[0] for h in self.heads], dim=-1)
    output = self.projection(output)
    return self.dropout(output)

In [54]:
class FeedForward(nn.Module):
  """Position-wise MLP: Linear(d_model -> 6*d_model), ReLU, Linear(6*d_model -> d_model), dropout.

  Args:
    d_model: input and output width of the block.
    dropout: dropout probability applied to the block's output.
  """

  def __init__(self, d_model, dropout=0.1):
    super().__init__()
    hidden_width = 6 * d_model
    self.linear_1 = nn.Linear(d_model, hidden_width)
    self.dropout = nn.Dropout(dropout)
    self.linear_2 = nn.Linear(hidden_width, d_model)

  def forward(self, x):
    """Apply the two-layer MLP over the last dimension of `x`."""
    hidden = F.relu(self.linear_1(x))
    return self.dropout(self.linear_2(hidden))

In [55]:
class LayerNormalization(nn.Module):
  """Layer normalization over the last dimension with a learnable scale and shift.

  Args:
    d_model: size of the last dimension of the inputs.
    epsilon: small constant added to the variance for numerical stability.
  """

  def __init__(self, d_model, epsilon=1e-5):
    super().__init__()
    self.gamma = nn.Parameter(torch.ones(d_model))   # learnable scale
    self.beta = nn.Parameter(torch.zeros(d_model))   # learnable shift
    self.epsilon = epsilon

  def forward(self, x):
    """Return gamma * normalized(x) + beta, normalizing each vector along the last dim."""
    mean = x.mean(dim=-1, keepdim=True)
    # Biased (population) variance with epsilon inside the square root, matching
    # nn.LayerNorm; it also stays finite when the last dimension has one element.
    var = x.var(dim=-1, unbiased=False, keepdim=True)
    x = (x - mean) / torch.sqrt(var + self.epsilon)
    # Apply the affine transform and return it. Previously the expression was
    # computed and discarded, and it multiplied by beta instead of adding it.
    return self.gamma * x + self.beta

In [56]:
class TransformerBlock(nn.Module):
  """Pre-norm transformer block: x + Attention(LN(x)), then x + FeedForward(LN(x)).

  Args:
    d_model: width of the residual stream.
    n_head: number of attention heads; each head is d_model // n_head wide.
    dropout: dropout probability passed to the attention and feed-forward sub-layers.
  """

  def __init__(self, d_model, n_head, dropout=0.1):
    super().__init__()
    # Derive the head width from this block's own d_model instead of a module-level global.
    head_size = d_model // n_head
    self.multi_head_attention = MultiHeadAttention(n_head, head_size, dropout)
    self.feed_forward = FeedForward(d_model, dropout)
    self.layer_normalization_1 = LayerNormalization(d_model)
    self.layer_normalization_2 = LayerNormalization(d_model)
    # No extra dropout layers here: both sub-layers already end with their own dropout,
    # and the two that used to be created in this constructor were never called.

  def forward(self, x):
    """Run both sub-layers, each wrapped in a residual connection."""
    # Residual connection around the normalized self-attention.
    x_2 = self.layer_normalization_1(x)
    x_2 = self.multi_head_attention(x_2)
    x = x + x_2
    # Residual connection around the normalized feed-forward network.
    x_2 = self.layer_normalization_2(x)
    x_2 = self.feed_forward(x_2)
    x = x + x_2
    return x

## HYPERPARAMETERS

Change these variables to change the total number of parameters

In [57]:
# Vocabulary, rebuilt from the cleaned corpus.
chars = sorted(set(text))
vocab_size = len(chars)

# Batching and training schedule.
batch_size = 32               # windows sampled per batch
block_size = 32               # tokens per window (context length)
max_n_iters = 30000           # optimisation steps in the training loop
evaluation_interval = 100     # report the losses every this many steps
evaluation_iterations = 200   # batches averaged per split when estimating the loss
learning_rate = 5e-3          # learning rate handed to the optimizer

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"
# device = "mps" --> for mac computers (arm64)

# Parameters:
n_embeddings = 64   # embedding width
n_head = 4          # attention heads per transformer block
n_layers = 4        # number of stacked transformer blocks
dropout = 0.1       # dropout probability passed to the transformer blocks

## GPT Model

In [58]:
class MiniFakeGPT(nn.Module):
  """Character-level decoder-only transformer language model.

  Built from the module-level hyperparameters `vocab_size`, `n_embeddings`,
  `block_size`, `n_head`, `n_layers` and `dropout`.
  """

  def __init__(self):
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embeddings)
    # One learned vector per position; sequences longer than block_size cannot be embedded.
    self.position_embedding_table = nn.Embedding(block_size, n_embeddings)
    self.transformer_blocks = nn.Sequential(
        *[TransformerBlock(n_embeddings, n_head, dropout) for _ in range(n_layers)]
    )
    self.layer_normalization_forward = nn.LayerNorm(n_embeddings) # Using pytorch implementation instead of ours
    self.linear_mapping_head = nn.Linear(n_embeddings, vocab_size) # maps each position to logits over the vocabulary

  def forward(self, index, targets=None):
    """Compute next-token logits and, when `targets` is given, the cross-entropy loss.

    Args:
      index: (B, T) tensor of token ids.
      targets: optional (B, T) tensor of the desired next-token ids.

    Returns:
      (logits, loss). Without targets, logits is (B, T, vocab_size) and loss is None.
      With targets, logits is flattened to (B*T, vocab_size) and loss is a scalar tensor.
    """
    B, T = index.shape # batch size and sequence length (time steps)
    token_embedding = self.token_embedding_table(index)
    # Build the position ids on the input's own device, so the forward pass follows
    # wherever the batch lives instead of relying on a module-level global.
    positions = torch.arange(T, device=index.device)
    positional_embedding = self.position_embedding_table(positions)
    x = token_embedding + positional_embedding # (T, C) broadcasts over the batch dimension
    x = self.transformer_blocks(x)
    x = self.layer_normalization_forward(x)
    logits = self.linear_mapping_head(x) # predictions

    if targets is None:
      loss = None
    else:
      # F.cross_entropy takes (N, C) logits and (N,) class ids, so flatten batch and time.
      B, T, C = logits.shape
      logits = logits.view(B*T, C)
      targets = targets.view(B*T)
      loss = F.cross_entropy(logits, targets)
    return logits, loss

  @torch.no_grad() # sampling needs no gradients
  def generate(self, index, max_tokens):
    """Append `max_tokens` sampled tokens to every sequence in `index`.

    Args:
      index: (B, T) tensor of token ids used as the prompt.
      max_tokens: number of tokens to sample.

    Returns:
      (B, T + max_tokens) tensor holding the prompt followed by the sampled tokens.
    """
    # Sample with dropout disabled, then restore whichever mode the caller had.
    was_training = self.training
    self.eval()
    try:
      for _ in range(max_tokens):
        # Keep only the last block_size tokens: the position table has no more entries.
        index_state = index[:, -block_size:]
        logits, _ = self(index_state)
        logits = logits[:, -1, :] # focus on the last time step
        probabilities = F.softmax(logits, dim=-1) # distribution over the next token
        index_next = torch.multinomial(probabilities, num_samples=1)
        index = torch.cat((index, index_next), dim=1)
    finally:
      self.train(was_training)
    return index

In [59]:
# Build the model and move its parameters and buffers to the selected device.
# `m` is the name the generation cell uses; nn.Module.to is documented to return
# the module itself, so `m` and `model` should be the same object.
model = MiniFakeGPT()
m = model.to(device)

## Number of parameters (in millions)

In [60]:
# Total number of elements across all model parameters, reported in millions.
print(sum(p.numel() for p in model.parameters()) / 1e6)

0.281066


## Training Loop

In [61]:
@torch.no_grad() # evaluation only: no gradients are needed
def estimate_loss():
  """Estimate the mean loss on the training and validation splits.

  Puts the model in eval mode, averages the loss over `evaluation_iterations`
  random batches per split, then switches the model back to train mode.

  Returns:
    dict with keys 'train' and 'eval', each a scalar tensor holding the mean loss.
  """
  model.eval()
  output = {}

  # get_batch_data serves validation data for any split name other than 'train'.
  for split in ('train', 'eval'):
    batch_losses = [
        model(*get_batch_data(split))[1].item()
        for _ in range(evaluation_iterations)
    ]
    output[split] = torch.tensor(batch_losses).mean()

  model.train()
  return output

In [62]:
# AdamW over every model parameter, using the learning rate from the hyperparameter cell.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [66]:
# The loop variable is `step`, not `iter`, so the built-in iter() is not shadowed
# for the rest of the notebook.
for step in range(max_n_iters):
  # Report the estimated losses every `evaluation_interval` steps and on the final step.
  if step % evaluation_interval == 0 or step == max_n_iters - 1:
    losses = estimate_loss()
    print(f"step: {step} - training loss: {losses['train']:.4f} - validation loss: {losses['eval']:.4f}")

  x, y = get_batch_data('train')
  logits, loss = model(x, y) # forward pass
  optimizer.zero_grad(set_to_none=True) # clear the previous step's gradients so they do not accumulate
  loss.backward() # backpropagation: compute gradients of the loss w.r.t. the parameters
  optimizer.step() # apply the optimizer update to the weights

KeyboardInterrupt: 

In [67]:
# Generation prompt: a batch of one sequence holding the single token id 0,
# which is '\n' in the vocabulary printed earlier in the notebook.
input_char = torch.zeros((1, 1), dtype=torch.long, device = device)

In [None]:
# Sample 2000 new tokens after the prompt, take the only sequence in the batch,
# and decode its token ids back to text.
output_text = decode(m.generate(input_char, max_tokens=2000)[0].tolist())

In [None]:
# Show the generated lyrics.
print(output_text)