# GPT Transformer from Scratch using PyTorch

#### The goal is to create a GPT model trained on Andrew Huberman's podcast transcripts and create resonable output tokens. We will be using the self-attention architecture.

<img src="./images/transformer_architecture.png" alt="architecture_image" width="600" height="600"/>

In [1]:
#print python version being used
import sys
sys.version

'3.8.7 (default, Jan 25 2021, 11:14:52) \n[GCC 5.5.0 20171010]'

In [2]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import mmap
import random
import pickle
import re

device = 'cuda' if torch.cuda.is_available() else 'cpu'

  return torch._C._cuda_getDeviceCount() > 0


<b>First, we will declare the variables and parameters necessary to model the data and train the model. Batch is the number of training samples processed per iteration, block is the length of sequences used. All of these parameters may be changed and optimized.</b>

In [3]:
#parameters
batch = 32
block = 128
max_iters = 1000
learning_rate = 2e-5
eval_iters = 100 
n_embd = 384 #number of features
n_head = 4 
n_layer = 4
dropout = 0.2

tot_ep = 95
episode_nos = range(1, tot_ep)

#train/val split
val = 0.2
val_eps = random.sample(episode_nos, int(val * tot_ep))
train_eps = list(set(episode_nos) - set(val_eps))

In [4]:
def _tokenizer(text):
    #re to find words
    pattern = r"\b\w+\b"
    
    tokens = re.findall(pattern, text)
    return tokens

In [5]:
def load_episodes_word(split, words):
    eps = train_eps if split == 'train' else val_eps
    #setup for data read
    path = 'transcripts//Episode-'
    extension = '.txt'
    
    split_len = 0
    
    episodes = []
    for ep in eps:
        with open(path + str(ep) + extension) as f:
            #combine the episode into one string
            content = [line.split(' ', 1)[-1].strip().lower() for line in f.readlines()] #list of lines
            content = re.sub(r'[^A-Za-z0-9 ]+', '', ' '.join(content)) #one string
            
            #add any new tokens to words
            tokens = _tokenizer(content)
            length = len(tokens)
            #add length
            split_len += length
            
            words = list(set(words).union(set(tokens)))
            
            episodes.append(content)
    return episodes, split_len, words
            

In [6]:
#train_content = ' '.join(load_episodes_char(train_eps))
#val_content = ' '.join(load_episodes_char(val_eps))
words = []
train_content, train_len, words = load_episodes_word('train', words)
val_content, val_len, words = load_episodes_word('val', words)

train_content = _tokenizer(' '.join(train_content))
val_content = _tokenizer(' '.join(val_content))

In [7]:
train_content[:20]

['welcome',
 'to',
 'the',
 'huberman',
 'lab',
 'podcast',
 'where',
 'we',
 'discuss',
 'science',
 'and',
 'sciencebased',
 'tools',
 'for',
 'everyday',
 'life',
 'im',
 'andrew',
 'huberman',
 'and']

<b>Because language models are computations done on numbers, we must convert any characters into values to use in these computations. We will stick to lowercase alphanumeric characters for this simple model, but the characterset may be as large as needed. We use a simple enumeration on the characters present, but more advanced models may use tokenizers and word embeddings.</b>

In [8]:
#character encoding
#chars = sorted(list(set(train_content).union(set(val_content))))

#create mappings

#character-level
#chtoi = {ch : i for i, ch in enumerate(chars)}
#itoch = {i : ch for i, ch in enumerate(chars)}

#word_level
wtoi = {w : i for i, w in enumerate(words)}
itow = {i : w for w, i in wtoi.items()}


#simple encodings
encode = lambda chunk : [wtoi[word] for word in chunk]
decode = lambda code : ' '.join([itow[i] for i in code])

<b>To get the tensors of encoded characters, we create the following functions that obtain random chunks from the data then draw batch size number of random samples from these chunks of length block size.</b>

In [20]:
def get_random_chunk(split):
    #returns random chunk of data from split using word tokens
    data = train_content if split == 'train' else val_content
    size = train_len if split == 'train' else val_len
    
    start = random.randint(0, size - block * batch) #must start far enough back
    
    chunk = data[start : start + (block*batch)]
    data = torch.tensor(encode(chunk), dtype=torch.long)
    
    return data

def get_batch(split):
    #get random batches with expected outputs
    chunk = get_random_chunk(split)
    ix = torch.randint(len(chunk) - block, (batch,)) #tensor of random starting points
    x = torch.stack([chunk[i : i+block] for i in ix]) #tensor of sequences
    y = torch.stack([chunk[i+1 : i+block+1] for i in ix]) #tensor of sequences plus next char
    x, y = x.to(device), y.to(device)
    return x, y

<b>Now we move on to the transformer architecture. The head module is the basic building block of the multihead attention architecture, which we will discuss in more detail in a moment. The head computes attention weights between the input encodings and produces a context vector. 

To do this, we apply linear transformations to the data before masking it. Masking via torch.tril essentially prevents lookahead bias, so the model can only look at previous time steps but not future ones. Softmax then creates probabilities from the output that the model can then use for generating values.</b>

In [10]:
class Head(nn.Module):
    '''one head of self-attention'''
    
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        
        self.register_buffer('tril', torch.tril(torch.ones(block, block)))
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        
        weight = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5
        weight = weight.masked_fill(self.tril[: T, : T] == 0, float('-inf'))
        weight = F.softmax(weight, dim=-1)
        weight = self.dropout(weight)
        
        v = self.value(x)
        out = weight @ v
        return out

<b>The most prevelant part of this architecture is the multihead attention module. The core idea of this is that each head is learning different representations of the input from different perspectives. These outputs are then concatenated and we set 20% of the inputs to 0 for regularization to introduce noise and make the model more robust via the nn.Dropout method.</b>

In [11]:
class MultiHeadAttention(nn.Module):
    """multiple heads of self-attn in parallel"""
    
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [12]:
class FeedForward(nn.Module):
    """Linear layer followed by nonlinearity"""
    
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
                                nn.Linear(n_embd, 4 * n_embd),
                                nn.ReLU(),
                                nn.Linear(4 * n_embd, n_embd),
                                nn.Dropout(dropout)
                                )
        
    def forward(self, x):
        return self.net(x)

<b>Within the block, we have our multihead attention function as well as a residual connection with a feedforward network. Essentially what this does is retain some of the data without transformations and add it to the transformed data. This allows for the model to "remember" some information as most information is actually lost in the transformations. We see this take place in the 'forward' method of the Block class.

The feedforward method, as we see above, is a sequentially layered transformation that applies two linear functions outside of a ReLU function. For those who do not know, ReLU zeros out negative entries and retains positive ones. This is useful in introducing nonlinearity and mitigating the vanishing gradient problem. The vanishing gradient problem is an issue where in training the model, the sequence length increases, which prevents the weights from being updated effectively.</b>

In [13]:
class Block(nn.Module):
    """Transformer Block"""
    
    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        #normalizations
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        
    def forward(self, x):
        y = self.sa(x)
        x = self.ln1(x + y)
        y = self.ffwd(x)
        x = self.ln2(x + y)
        return x

In [14]:
class GPTHuberman(nn.Module):
    def __init__(self, charset_size):
        super().__init__()
        self.token_emb_tbl = nn.Embedding(charset_size, n_embd)
        self.pos_emb_tbl = nn.Embedding(block, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, charset_size)
        
        #initialize weights for each submodel
        self.apply(self._init_weights)
        
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
                
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            
    def forward(self, index, targets=None):
        B, T = index.shape
        
        #index and target are (B, T) tensor of ints
        tok_emb = self.token_emb_tbl(index)
        pos_emb = self.pos_emb_tbl(torch.arange(T, device=device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        
        if targets is None:
            loss = None
            
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
            
        return logits, loss
    
    def generate(self, index, new_tok):
        for _ in range(new_tok):
            index_crop = index[:, -block :]
            #get predictions
            logits, loss = self.forward(index_crop)
            logits = logits[:, -1, :] #last time step
            probs = F.softmax(logits, dim=-1)
            index_next = torch.multinomial(probs, num_samples=1)#next token
            index = torch.cat((index, index_next), dim=1)
            
        return index

In [15]:
model = GPTHuberman(len(words))
m = model.to(device)

In [16]:
@torch.no_grad()
def est_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model.forward(X, targets=Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

In [21]:
#optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for _iter in range(max_iters):
    if _iter % eval_iters == 0:
        losses = est_loss()
        print(f"Iteration: {_iter}, Train Loss: {losses['train']:.3f}, Val Loss: {losses['val']:.3f}")
    
    xb, yb = get_batch(train_content)
    
    logits, loss = model.forward(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print(loss.item())

Iteration: 0, Train Loss: 10.273, Val Loss: 10.271
Iteration: 100, Train Loss: 9.419, Val Loss: 9.391
Iteration: 200, Train Loss: 7.915, Val Loss: 7.832
Iteration: 300, Train Loss: 7.220, Val Loss: 7.116
Iteration: 400, Train Loss: 6.861, Val Loss: 6.772
Iteration: 500, Train Loss: 6.730, Val Loss: 6.597
Iteration: 600, Train Loss: 6.652, Val Loss: 6.534
Iteration: 700, Train Loss: 6.642, Val Loss: 6.496
Iteration: 800, Train Loss: 6.610, Val Loss: 6.471
Iteration: 900, Train Loss: 6.610, Val Loss: 6.417
6.364140510559082


In [26]:
#prompt
user_input = ''
print("Welcome to the GPTHuberman Prompt. Enter 'STOP' to quit.")
while user_input != 'STOP':
    user_input = input("Prompt: ")
    
    if user_input == 'STOP':
        break
        
    data = [word.lower() for word in user_input.split()]
    context = torch.tensor(encode(data), dtype=torch.long, device=device)
    generated_chars = decode(m.generate(context.unsqueeze(0), new_tok=25)[0].tolist())
    print(generated_chars)

Welcome to the GPTHuberman Prompt. Enter 'STOP' to quit.
Prompt: cell
cell great ingesting with some of for off mechanism newborn to a pretty wakes actually old and who back next is the per makes the i
Prompt: STOP


<b>So, what are some improvements that we can make on this model? For one, you will notice that if you do not enter a word that has been tokenized, the generate function throws an error. To remedy this, we could introduce a second model that tokenizes and trains at the character level. There are several options to choose from here and more research is required to determine the best method. Furthermore, we see that the sentences generated are gramatically incorrect. Improvement here requires establishing rules, assigning words to parts of the sentence (verb, noun, adjective), and training the model to follow these rules. More research is needed to determine how to add this to the algorithm.</b>

### Sources
- [Huberman Podcast Transcript Data](https://www.kaggle.com/datasets/piyusharma/andrew-huberman-podcast-transcripts-95-episodes?resource=download)
- [LLM Transformer Architecture Overview](https://www.youtube.com/watch?v=UU1WVnMk4E8)
- [Base Model Repo](https://github.com/Infatoshi/fcc-intro-to-llms/blob/main/gpt-v1.ipynb)