<a href="https://colab.research.google.com/github/prapti2024/LLM_from_scratch/blob/main/gpt_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## TOKENIZATION


Import PyTorch, then install and import tiktoken


In [None]:
import torch

In [None]:
!pip3 install tiktoken

Collecting tiktoken
  Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading tiktoken-0.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m14.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tiktoken
Successfully installed tiktoken-0.9.0


In [None]:
# A bare `import importlib` does not guarantee that the `metadata` submodule is loaded;
# importing the submodule explicitly avoids an AttributeError on a fresh kernel
# (and still binds the top-level name `importlib`).
import importlib.metadata
import tiktoken

# Report the installed tiktoken version so the environment is recorded in the notebook output.
print("tiktoken version:", importlib.metadata.version("tiktoken"))

tiktoken version: 0.9.0


In [None]:
# Load tiktoken's "gpt2" encoding; this global is reused later to encode the generation prompt.
tokenizer = tiktoken.get_encoding("gpt2")

Implementing a DataLoader

In [None]:
from torch.utils.data import Dataset, DataLoader


class GPTDatasetV1(Dataset):
    """Sliding-window dataset of (input, target) token-ID pairs for next-token prediction.

    The text is tokenized once. Every window of ``max_length`` token IDs becomes an
    input sample, and the same window shifted one token to the right becomes its target.

    Args:
        txt: Raw text to tokenize.
        tokenizer: Object exposing ``encode(text, allowed_special=...)`` that returns
            a list of integer token IDs.
        max_length: Number of tokens in every input/target window.
        stride: Offset between the starts of consecutive windows.
    """

    def __init__(self, txt, tokenizer, max_length, stride):
        # Encode the whole text up front; "<|endoftext|>" is allowed as a special token.
        ids = tokenizer.encode(txt, allowed_special={"<|endoftext|>"})

        # Window start offsets. Stopping before len(ids) - max_length guarantees the
        # target window (shifted by one) still contains max_length tokens.
        starts = range(0, len(ids) - max_length, stride)

        self.input_ids = [torch.tensor(ids[s:s + max_length]) for s in starts]
        self.target_ids = [torch.tensor(ids[s + 1:s + max_length + 1]) for s in starts]

    def __len__(self):
        """Return the number of windows extracted from the text."""
        return len(self.input_ids)

    def __getitem__(self, idx):
        """Return the ``(input_ids, target_ids)`` tensor pair for window ``idx``."""
        return self.input_ids[idx], self.target_ids[idx]

In [None]:
def create_dataloader_v1(txt, batch_size=4, max_length=256,
                         stride=128, shuffle=True, drop_last=True,
                         num_workers=0):
    """Build a DataLoader of sliding-window (input, target) batches from raw text.

    Args:
        txt: Raw text to tokenize with tiktoken's "gpt2" encoding.
        batch_size: Number of windows per batch.
        max_length: Number of tokens per window.
        stride: Offset between the starts of consecutive windows.
        shuffle: Forwarded to ``DataLoader``.
        drop_last: Forwarded to ``DataLoader``.
        num_workers: Forwarded to ``DataLoader``.

    Returns:
        A ``DataLoader`` wrapping a ``GPTDatasetV1`` built from ``txt``.
    """
    # A fresh "gpt2" tokenizer is obtained on every call and handed to the dataset.
    windows = GPTDatasetV1(
        txt,
        tiktoken.get_encoding("gpt2"),
        max_length,
        stride,
    )

    return DataLoader(
        windows,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
    )

Creating Token Embeddings

In [None]:
# Sizes for the small embedding demo below.
vocab_size = 50257  # number of rows in the token-embedding table
output_dim = 256  # length of each embedding vector
context_length = 4  # number of positions embedded in this demo

# Lookup table with one trainable output_dim-sized vector per token ID in [0, vocab_size).
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)

In [None]:
# Inspect the embedding weight matrix (vocab_size rows, output_dim columns).
print(token_embedding_layer.weight)


Parameter containing:
tensor([[-0.4384, -0.6734, -0.5318,  ..., -1.6842,  0.0568, -1.7278],
        [-1.8346,  0.2576, -1.6789,  ...,  0.1348, -1.0571,  1.2703],
        [-0.5268,  1.3099,  0.5383,  ...,  0.0892,  0.2141, -1.4680],
        ...,
        [ 1.1078, -0.1039,  1.0237,  ...,  0.4691, -0.0714,  1.6100],
        [ 1.8363,  0.3762, -0.9555,  ...,  0.7549, -1.0260, -0.2873],
        [-0.2544, -0.3655,  0.8160,  ...,  0.0540, -0.6048, -0.2044]],
       requires_grad=True)


In [None]:
# Look up the placeholder IDs 0..context_length-1 produced by torch.arange;
# these are not token IDs taken from real text.
token_embeddings = token_embedding_layer(torch.arange(context_length))
print(token_embeddings.shape)

torch.Size([4, 256])


Positional embeddings


In [None]:
# NOTE(review): this re-creates the layer as a new instance. `token_embeddings` from the
# earlier cell was produced by the previous instance and is not recomputed afterwards.
token_embedding_layer = torch.nn.Embedding(vocab_size, output_dim)

In [None]:
# Lookup table with one trainable output_dim-sized vector per position index in [0, context_length).
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)

In [None]:
# Look up the vectors for positions 0..context_length-1.
pos_embeddings = pos_embedding_layer(torch.arange(context_length))
print(pos_embeddings.shape)

torch.Size([4, 256])


In [None]:
# Element-wise sum of the token and positional embeddings; both printed as [4, 256] above,
# so the result has the same shape.
input_embeddings = token_embeddings + pos_embeddings
print(input_embeddings.shape)

torch.Size([4, 256])


Creating an instance of data loader.

# GPT-2 BASED TRANSFORMER BLOCK

In [None]:
# Hyperparameters consumed by the model classes defined in this notebook.
GPT_CONFIG_124M = dict(
    vocab_size=50257,     # size of the token-embedding table and of the output logits
    context_length=1024,  # side length of the causal attention mask
    emb_dim=768,          # width of every token representation
    n_heads=12,           # attention heads per transformer block
    n_layers=12,          # number of stacked transformer blocks
    drop_rate=0.1,        # dropout probability used throughout the model
    qkv_bias=False,       # whether the query/key/value projections carry a bias term
)

**Layer Normalization class**

In [None]:
import torch
from torch import nn

In [None]:
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with a learnable scale and shift.

    Args:
        emb_dim: Size of the last dimension of the inputs.
    """

    def __init__(self, emb_dim):
        super().__init__()
        # Small constant added to the variance so the division never hits zero.
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Normalize ``x`` along its last dimension, then apply scale and shift."""
        mu = x.mean(dim=-1, keepdim=True)
        # Biased (population) variance: divide by N rather than N - 1.
        sigma_sq = x.var(dim=-1, keepdim=True, unbiased=False)
        normalized = (x - mu) / torch.sqrt(sigma_sq + self.eps)
        return self.scale * normalized + self.shift




**GELU ACTIVATION FUNCTION**

In [None]:
class GELU(nn.Module):
    """GELU activation computed with the tanh approximation.

    Evaluates ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x^3)))`` element-wise.
    The module holds no parameters.
    """

    def forward(self, x):
        """Apply the activation element-wise to ``x``."""
        coeff = torch.sqrt(torch.tensor(2 / torch.pi))
        inner = coeff * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))

**Feed Forward Layer**

In [None]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: expand to 4x width, apply GELU, project back.

    Args:
        cfg: Mapping that provides ``"emb_dim"``, the input and output width.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg["emb_dim"]
        hidden = 4 * width
        self.layers = nn.Sequential(
            nn.Linear(width, hidden),
            GELU(),
            nn.Linear(hidden, width),
        )

    def forward(self, x):
        """Run ``x`` through the expand -> GELU -> project stack."""
        return self.layers(x)


**Multi-Head Attention class**

In [None]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention.

    Queries, keys and values are projected to ``d_out`` features, split into
    ``num_heads`` heads of ``d_out // num_heads`` features each, attended with an
    upper-triangular (future-masking) mask, merged again and passed through an
    output projection.

    Args:
        d_in: Feature size of the input.
        d_out: Feature size of the output; must be divisible by ``num_heads``.
        context_length: Side length of the precomputed causal mask.
        dropout: Dropout probability applied to the attention weights.
        num_heads: Number of attention heads.
        qkv_bias: Whether the query/key/value projections use a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        # Per-head width, so that all heads together add up to d_out features.
        self.head_dim = d_out // num_heads

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        # Mixes the concatenated head outputs.
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # Ones strictly above the diagonal mark the "future" positions to hide.
        future = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer("mask", future)

    def forward(self, x):
        """Attend over ``x`` of shape (batch, tokens, d_in); returns (batch, tokens, d_out)."""
        batch, seq_len, _ = x.shape

        def split_heads(t):
            # (batch, tokens, d_out) -> (batch, num_heads, tokens, head_dim)
            return t.view(batch, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        k = split_heads(self.W_key(x))
        q = split_heads(self.W_query(x))
        v = split_heads(self.W_value(x))

        # Per-head dot products between every query and every key.
        scores = q @ k.transpose(2, 3)

        # Crop the stored mask to the current sequence length and hide future positions;
        # -inf entries become exactly zero weight after the softmax.
        causal = self.mask.bool()[:seq_len, :seq_len]
        scores.masked_fill_(causal, -torch.inf)

        weights = torch.softmax(scores / k.shape[-1] ** 0.5, dim=-1)
        weights = self.dropout(weights)

        # (batch, num_heads, tokens, head_dim) -> (batch, tokens, num_heads, head_dim),
        # then flatten the heads back into a single d_out-sized feature axis.
        merged = (weights @ v).transpose(1, 2)
        merged = merged.contiguous().view(batch, seq_len, self.d_out)

        return self.out_proj(merged)


**TRANSFORMER BLOCK**

In [None]:
class TransformerBlock(nn.Module):
    """One transformer block: attention and feed-forward sub-layers with residual connections.

    Each sub-layer normalizes its input first, then applies the transformation
    and dropout, and finally adds the sub-layer's input back.

    Args:
        cfg: Mapping providing ``"emb_dim"``, ``"context_length"``, ``"drop_rate"``,
            ``"n_heads"`` and ``"qkv_bias"``.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg["emb_dim"]
        self.att = MultiHeadAttention(
            d_in=width,
            d_out=width,
            context_length=cfg["context_length"],
            dropout=cfg["drop_rate"],
            num_heads=cfg["n_heads"],
            qkv_bias=cfg["qkv_bias"],
        )
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(width)
        self.norm2 = LayerNorm(width)
        # The same dropout module is applied after both sub-layers.
        self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

    def forward(self, x):
        """Apply the attention and feed-forward sub-layers; the shape of ``x`` is preserved."""
        # Attention sub-layer with residual connection.
        residual = x
        x = self.drop_shortcut(self.att(self.norm1(residual))) + residual

        # Feed-forward sub-layer with residual connection.
        residual = x
        x = self.drop_shortcut(self.ff(self.norm2(residual))) + residual
        return x


In [None]:
# Smoke test: a transformer block must return a tensor of the same shape as its input.
torch.manual_seed(123)  # fixed seed so the random input and weights are reproducible
x = torch.rand(2, 4, 768)  # last dimension matches GPT_CONFIG_124M["emb_dim"]
block = TransformerBlock(GPT_CONFIG_124M)
output = block(x)
print("Input size: \n",x.shape)
print("Output size: \n",output.shape)

Input size: 
 torch.Size([2, 4, 768])
Output size: 
 torch.Size([2, 4, 768])


# CODING THE ENTIRE GPT-2 MODEL

In [None]:
class GPTModel(nn.Module):
    """GPT-2 style decoder-only language model.

    Token and positional embeddings are summed, passed through dropout and a stack
    of ``n_layers`` transformer blocks, normalized, and projected to vocabulary logits.

    Args:
        cfg: Mapping providing ``"vocab_size"``, ``"context_length"``, ``"emb_dim"``,
            ``"n_layers"``, ``"drop_rate"`` and the keys required by ``TransformerBlock``.
    """

    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        # One learned vector per *position*: the table needs context_length rows.
        # Sizing it by vocab_size would allocate rows for positions that can never occur.
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])

        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )

        self.final_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, in_idx):
        """Compute next-token logits.

        Args:
            in_idx: Integer tensor of token IDs with shape (batch, seq_len).

        Returns:
            Tensor of logits with shape (batch, seq_len, vocab_size).

        Raises:
            ValueError: If ``seq_len`` exceeds the configured context length.
        """
        _, seq_len = in_idx.shape
        max_len = self.pos_emb.num_embeddings
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the context length {max_len}"
            )

        tok_embeddings = self.tok_emb(in_idx)
        # Position IDs are created on the same device as the input to avoid a device mismatch.
        pos_embeddings = self.pos_emb(torch.arange(seq_len, device=in_idx.device))

        # (seq_len, emb_dim) broadcasts over the batch dimension of (batch, seq_len, emb_dim).
        x = tok_embeddings + pos_embeddings
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        return self.out_head(x)


In [None]:
# Two sequences of four token IDs each -> shape (2, 4), i.e. (batch, tokens).
batch = torch.tensor([[6109,3626,6100,345],
                      [6109,1110,6622,257]])

In [None]:
# Build the full model and run the example batch through it.
torch.manual_seed(123)  # fixed seed so the randomly initialized weights are reproducible
model = GPTModel(GPT_CONFIG_124M)
# NOTE(review): model.eval() is only called in a later cell, so dropout is presumably
# still active for this forward pass.
out = model(batch)
print("Input: ",batch)
# One logit vector per input token: (batch, tokens, vocab_size).
print("Output Shape : ",out.shape)
print(out)

Input:  tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])
Output Shape :  torch.Size([2, 4, 50257])
tensor([[[-0.0658,  0.2695,  0.2692,  ...,  1.3852, -0.0546, -0.5503],
         [ 0.3035, -0.1901, -0.7108,  ...,  0.3340,  0.5741, -0.2450],
         [-0.1009,  0.3720, -0.3529,  ..., -0.2624, -0.1344,  0.6451],
         [ 0.2338,  0.9179,  0.4180,  ...,  0.6700, -1.1723, -0.1790]],

        [[-0.1588,  0.3306,  0.4211,  ...,  1.3506,  0.4324, -0.5578],
         [ 0.5375,  0.0084, -0.0601,  ...,  0.7958,  0.5095, -0.1663],
         [ 0.0115, -0.1516, -0.1547,  ..., -0.0059, -1.0677,  0.2764],
         [ 0.3840,  0.5324,  0.4590,  ...,  0.8971, -0.8554, -0.2125]]],
       grad_fn=<UnsafeViewBackward0>)


## GENERATING TEXT FROM OUTPUT TOKENS

In [None]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily extend ``idx`` by ``max_new_tokens`` tokens.

    Args:
        model: Callable mapping a (batch, n_tokens) tensor of token IDs to logits
            of shape (batch, n_tokens, vocab_size).
        idx: Integer tensor of shape (batch, n_tokens) holding the current context.
        max_new_tokens: Number of tokens to append to every sequence.
        context_size: Maximum number of trailing tokens fed to the model per step.

    Returns:
        Integer tensor of shape (batch, n_tokens + max_new_tokens).
    """
    for _ in range(max_new_tokens):
        # Keep only the last `context_size` tokens of every sequence so the input
        # never exceeds what the model supports.
        idx_cond = idx[:, -context_size:]

        # Inference only: no gradients are needed.
        with torch.no_grad():
            logits = model(idx_cond)

        # Only the prediction at the last position matters:
        # (batch, n_tokens, vocab_size) -> (batch, vocab_size).
        logits = logits[:, -1, :]

        # Convert logits to probabilities and pick the most likely token.
        probas = torch.softmax(logits, dim=-1)
        idx_next = torch.argmax(probas, dim=-1, keepdim=True)  # (batch, 1)

        # Append the new token ID to every sequence: (batch, n_tokens + 1).
        idx = torch.cat((idx, idx_next), dim=1)

    return idx




In [None]:
# Encode the prompt with the global `tokenizer` into a list of token IDs.
start_context = "Hello, I am"
encoded = tokenizer.encode(start_context)
print("encoded: ", encoded)
# unsqueeze(0) adds a leading batch dimension: (n_tokens,) -> (1, n_tokens).
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
print("encoded_tensor: ", encoded_tensor)


In [None]:
# Switch to evaluation mode so dropout is disabled during generation.
model.eval()

# Call the function (do not rebind its name) and keep the generated token IDs in `out`.
out = generate_text_simple(
    model=model,
    idx=encoded_tensor,
    max_new_tokens=6,
    context_size=GPT_CONFIG_124M["context_length"],
)
print("Output: ", out)
# Prompt tokens plus the 6 newly generated tokens.
print("OutputLength: ", len(out[0]))