In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as f

In [2]:
GPT_CONFIG_124M = {
    'vocab_size': 50257,    # vocabulary size
    'context_length': 1024,  # Context Length
    'emb_dim': 768,         # Embedding dimension
    'n_heads': 12,          # Number of attention heads
    'n_layers': 12,         # Number of layers
    'drop_rate': 0.1,       # Dropout rate
    'qkv_bias': False       # Query-Key-Value bias
}

## GPT2 Model From Scratch

#### Utils

In [3]:
class LayerNorm(nn.Module):
    
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
    
    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

In [4]:
class GELU(nn.Module):
    
    def __init__(self):
        super().__init__()
    
    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(
            torch.sqrt(torch.tensor(2/torch.pi)) * 
            (x + 0.44715 * torch.pow(x, 3))
        ))

In [5]:
class FeedForward(nn.Module):
    
    def __init__(self, cfg):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(cfg['emb_dim'], 4 * cfg['emb_dim']), ## Expansion
            GELU(),                                        ## Activation
            nn.Linear(4 * cfg['emb_dim'], cfg['emb_dim'])    ## Contraction
        )
    
    def forward(self, x):
        return self.layers(x)

In [6]:
class MultiHeadAttention(nn.Module):
    
    def __init__(self, d_in, d_out, context_length, num_heads, dropout, qkv_bias=False):
        super().__init__()
        assert(d_out % num_heads == 0), 'd_out must be divisible by num_heads'
        
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        
        self.Wq = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.Wk = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.Wv = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_in) # Linear layer to combine head outputs
        self.droput = nn.Dropout(dropout)
        self.register_buffer(
            'mask',
            torch.triu(torch.ones(context_length, context_length), diagonal=1)
        )
        
    def forward(self, x):
        b, num_tokens, d_in = x.shape
        
        # (b, num_tokens, d_out)
        queries = self.Wq(x)
        keys = self.Wk(x)
        values = self.Wv(x)
        
        # (b, num_tokens, num_heads, head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        
        # (b, num_heads, num_tokens, head_dim)
        queries = queries.transpose(1, 2)
        keys = keys.transpose(1, 2)
        values = values.transpose(1, 2)
        
        attn_scores = queries @ keys.transpose(2, 3) # (b, num_heads, num_tokens, num_tokens)
        
        attn_scores = attn_scores.masked_fill_(self.mask.bool()[:num_tokens, :num_tokens], -torch.inf)
        attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)
        
        context_vec = attn_weights @ values
        context_vec = context_vec.transpose(1, 2) # (b, num_tokens, num_heads, head_dim)
        
        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out) # (b, num_tokens, d_out)
        context_vec = self.out_proj(context_vec) # (b, num_tokens, d_in)
        
        return context_vec

#### Transformer Block

In [7]:
class TransformerBlock(nn.Module):
    
    def __init__(self,cfg):
        super().__init__()
        self.att = MultiHeadAttention(
            d_in = cfg['emb_dim'],
            d_out = cfg['emb_dim'],
            context_length = cfg['context_length'],
            num_heads = cfg['n_heads'],
            dropout = cfg['drop_rate'],
            qkv_bias = cfg['qkv_bias']
        )
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(cfg['emb_dim'])
        self.norm2 = LayerNorm(cfg['emb_dim'])
        self.drop_shortcut = nn.Dropout(cfg['drop_rate'])
    
    def forward(self, x):
        # x.shape: [B, num_tokens, emb_dim]
        shortcut = x 
        x = self.norm1(x)           
        x = self.att(x)            
        x = self.drop_shortcut(x)   
        x = x + shortcut            # Shortcut connection 
        
        shortcut = x
        x = self.norm2(x)
        x = self.ff(x)
        x = self.drop_shortcut(x)
        x = x + shortcut
        
        return x

### GPT-2 Model

In [8]:
class GPTModel(nn.Module):
    
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'], cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_length'], cfg['emb_dim'])
        self.drop_emb = nn.Dropout(cfg['drop_rate'])
        
        self.transformer_block = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg['n_layers'])]
        )
        
        self.final_norm = LayerNorm(cfg['emb_dim'])
        self.out_head = nn.Linear(
            cfg['emb_dim'], cfg['vocab_size'], bias=False
        )
    
    def forward(self, in_idx):
        b, seq_len = in_idx.shape
        tok_embed = self.tok_emb(in_idx)                                        # Token Embeddings
        pos_embed = self.pos_emb(torch.arange(seq_len, device=in_idx.device))   # Positional Embeddings
        x = tok_embed + pos_embed                                               # Input Embeddings
        
        x = self.drop_emb(x)
        x = self.transformer_block(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits

## Predicting the next token

In [9]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    
    for _ in range(max_new_tokens):
        
        # Crop current context if it exceeds the supported context size,
        # E.g., if LLm support only 5 tokens, and the context size is 10
        # then only the last 5 token are used as context
        idx_cond = idx[:, -context_size:]
        
        with torch.no_grad():
            logits = model(idx_cond) # (B, seq_len, vocab_size)
        
        # Focus only on the last time step
        # (batch, n_tokens, vocab_size) becomes (batch, vocabz_size)
        logits = logits[:, -1, :]
        
        # apply softmax to get probabilities
        probas = torch.softmax(logits, dim=-1)
        
        # Get the idx of the vocab entry with the highest probability value
        idx_next = torch.argmax(probas, dim=-1, keepdim=True) # (batch, 1)
        
        # Append sampled index to the running sequence
        idx = torch.cat((idx, idx_next), dim=1) # (batch, n_tokens+1)
    return idx

In [10]:
import tiktoken
tokenizer = tiktoken.get_encoding('gpt2')

In [11]:
start_context = 'Hello, I am'
encoded = tokenizer.encode(start_context)
print('encoded:', encoded)
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
print('encoded_tensor.shape:', encoded_tensor.shape)

encoded: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])


In [12]:
model = GPTModel(GPT_CONFIG_124M)
model.eval()

out = generate_text_simple(model=model,
                           idx=encoded_tensor,
                           max_new_tokens=6,
                           context_size=GPT_CONFIG_124M['context_length'])

print('Output:', out)
print('Output length:', len(out[0]))

Output: tensor([[15496,    11,   314,   716, 45855, 44925, 50082, 14349, 12995, 21102]])
Output length: 10


In [13]:
decoded_text = tokenizer.decode(out.squeeze(0).tolist())
decoded_text

'Hello, I ampowder WHITE migraine stabil AlphaMel'

## Coding the LLM Model Outputs

In [14]:
GPT_CONFIG_124M = {
    'vocab_size': 50257,    # vocabulary size
    'context_length': 256,  # Context Length
    'emb_dim': 768,         # Embedding dimension
    'n_heads': 12,          # Number of attention heads
    'n_layers': 12,         # Number of layers
    'drop_rate': 0.1,       # Dropout rate
    'qkv_bias': False       # Query-Key-Value bias
}

torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)
model.eval();

In [15]:
def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0) # add batch dimension
    return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
    flat = token_ids.squeeze(0) # remove batch dimension
    return tokenizer.decode(flat.tolist())

start_context = 'Every effort moves you'
tokenizer = tiktoken.get_encoding('gpt2')

token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(start_context, tokenizer),
    max_new_tokens=10,
    context_size=GPT_CONFIG_124M['context_length']
)

print('Ouptut text:\n', token_ids_to_text(token_ids, tokenizer))

Ouptut text:
 Every effort moves you rentingetic wasnم refres RexAngel infieldcigans


As, we can see above, the model does not produce good text because it has not trained yet.

How do we measue or capture what "good text" is, in a numeric form, to track it during training? `Loss Function`

### Calculating the text generation loss: cross-entropy and perplexity

In [16]:
inputs = torch.tensor([[16833, 3626, 6100],  # ["every effort moves",
                       [40, 1107, 588]])     #  "I really like"]

targets = torch.tensor([[3626, 6100, 345],   # [" effort moves you",
                        [1107, 588, 11311]]) #  " really like chocolate"

In [17]:
with torch.no_grad():
    logits = model(inputs)

probas = torch.softmax(logits, dim=-1)
print(probas.shape)

torch.Size([2, 3, 50257])


In [18]:
token_ids = torch.argmax(probas, dim=-1, keepdim=True)
print('Token IDs:\n', token_ids)
print('\nToken IDs shape:', token_ids.shape)

Token IDs:
 tensor([[[16657],
         [  339],
         [42826]],

        [[49906],
         [29669],
         [41751]]])

Token IDs shape: torch.Size([2, 3, 1])


In [19]:
print(f"Targets batch 1: {token_ids_to_text(targets[0], tokenizer)}")
print(f"Output batch 1: {token_ids_to_text(token_ids[0].flatten(), tokenizer)}")

Targets batch 1:  effort moves you
Output batch 1:  Armed heNetflix


### Cross-entropy loss

In [20]:
probas.shape

torch.Size([2, 3, 50257])

In [21]:
# Extract proba vectors for each probabiltiy score predicted by the initial GPT model given the first text of our inputs
# Then, from those probability vectors get the probabilty scores given to the real target values by the model.
# Our goal is to make those probabilities as close to 1 as possible

text_idx = 0 # Batch No
target_probas_1 = probas[text_idx, [0, 1, 2], targets[text_idx]]
print('Text 1:', target_probas_1)

text_idx = 1
target_probas_2 = probas[text_idx, [0, 1, 2], targets[text_idx]]
print('Text 2:', target_probas_2)

Text 1: tensor([7.2671e-05, 3.1046e-05, 1.1696e-05])
Text 2: tensor([1.0426e-05, 5.4604e-05, 4.7716e-06])


In [22]:
# Compute logarithm of all token probabilities
log_probas = torch.log(torch.cat((target_probas_1, target_probas_2)))
print(log_probas)

tensor([ -9.5296, -10.3800, -11.3563, -11.4712,  -9.8154, -12.2528])


In [23]:
# Calculate the average probability for each token
avg_log_probas = torch.mean(log_probas)
print(avg_log_probas)

tensor(-10.8009)


- The goal is to make this average log probability as large as possible by optimizing the model parameters.
- Due to the log, the largest possible value is 0, and we're currently far away from 0.
- In deep learning, instead of maximing the average log-probability, it's a standard convention to minimize the negative average log-probability.

In [24]:
neg_avg_log_probas = avg_log_probas * -1
print(neg_avg_log_probas)

tensor(10.8009)


In [25]:
print('Logits shape:', logits.shape)
print('Targets shape:', targets.shape)

Logits shape: torch.Size([2, 3, 50257])
Targets shape: torch.Size([2, 3])


In [26]:
logits_flat = logits.flatten(0, 1)
targets_flat = targets.flatten()

print('Flattened logits:', logits_flat.shape)
print('Flattened targets:', targets_flat.shape)

Flattened logits: torch.Size([6, 50257])
Flattened targets: torch.Size([6])


In [27]:
loss = f.cross_entropy(logits_flat, targets_flat)
print(loss)

tensor(10.8009)


#### Perplexity Score

In [28]:
perplexity = torch.exp(loss)
print(perplexity)

tensor(49064.1641)
