## Building a GPT
In this project, we build a GPT-style model from scratch and train it as a language model on text tokenized with the GPT-2 byte-pair encoding (via tiktoken).
In the end, our model will be able to generate text given one or more starting tokens.

![GPT](https://lh5.googleusercontent.com/4I7UBohLcfxXF-p-ioudPoHPGuUB-tu0A4gjRm7jsN-QGSBFbKSeSCRATK2l_QBNDLWcHmi1cKa2TxLJIhy-c-NQ4fqys0jkj8gupXmIWHdFoymkq4m-o86dC85BAX3w9wHDwIWZY68Ae_6MT5A22yQ)

In [6]:
import torch
import torch.nn as nn
from torch.nn import functional as F
!pip install -q tiktoken
import tiktoken

In [7]:
enc = tiktoken.get_encoding('gpt2')
print("Vocab size:", enc.n_vocab)
vocab_size = enc.n_vocab

Vocab size: 50257


In [8]:
# hyperparameters
batch_size = 32 # how many independent sequences will we process in parallel?
block_size = 256 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 250
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 50
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2
# ------------

We download the training data from our GitHub repository.

Our data is a transcript of Supreme Court case arguments.

In [9]:
! wget https://raw.githubusercontent.com/pythonpypy/Character_Level_Language_Model/main/supreme.txt

'wget' is not recognized as an internal or external command,
operable program or batch file.


In [10]:
# read it in to inspect it
with open('/content/supreme.txt', 'r', encoding='utf-8') as f:
    text = f.read()

FileNotFoundError: [Errno 2] No such file or directory: '/content/supreme.txt'

In [None]:
print("length of dataset in characters: ", len(text))

In [None]:
# let's look at the first 1000 characters
print(text[:1000])

In [None]:
# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
print(''.join(chars))
print(vocab_size)

In [None]:
torch.manual_seed(1337)
with open('/content/supreme.txt', 'r', encoding='utf-8') as f:
    text = f.read()

data = enc.encode(text)
tokens = sorted(list(set(data)))
vocab_size = len(tokens)

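# map the GPT-2 token ids that occur in the text to a compact range 0..vocab_size-1
ttoi = { t:i for i,t in enumerate(tokens) }
itot = { i:t for i,t in enumerate(tokens) }

# encode: GPT-2 token ids -> compact ids; decode: compact ids -> text
encode = lambda t: [ttoi[it] for it in t]
decode = lambda l: enc.decode([itot[i] for i in l])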
print(vocab_size)
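
The remapping above squeezes the sparse GPT-2 token ids that actually occur in the text into a dense range, so the embedding table and output layer only need one row per token that appears. A minimal sketch of the same idea, using made-up token ids (the values in raw_ids are hypothetical, not real tiktoken output):

```python
# Hypothetical token ids standing in for the output of enc.encode(text)
raw_ids = [50000, 11, 262, 11, 50000, 1722]

# Sorted unique ids define the compact vocabulary
tokens = sorted(set(raw_ids))                 # [11, 262, 1722, 50000]
ttoi = {t: i for i, t in enumerate(tokens)}   # original id -> compact id
itot = {i: t for i, t in enumerate(tokens)}   # compact id -> original id

compact = [ttoi[t] for t in raw_ids]
restored = [itot[i] for i in compact]

print(compact)              # [3, 0, 1, 0, 3, 2]
print(restored == raw_ids)  # True
```

One consequence of this design: a token id that never occurs in the training text has no entry in ttoi, so encoding new text that contains it would raise a KeyError.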


In [None]:
# Train and test splits
data = torch.tensor(encode(data), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]
print(train_data[:20])

Here, we define a get_batch function that generates a small batch of inputs and targets from the training or validation set. The function picks batch_size random starting positions and, for each one, takes a block of block_size consecutive tokens as the input. The target is the same block shifted by one token, so the target at every position is the token that follows the corresponding input. The function returns the inputs and targets as PyTorch tensors of shape (batch_size, block_size) on the selected device.

In [None]:
# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y
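
To make the one-token shift concrete, here is a small sketch of the same slicing on a toy stream of consecutive integers. The sizes are toy values chosen for readability, not the notebook's hyperparameters:

```python
import torch

torch.manual_seed(0)
data = torch.arange(10)         # toy "token" stream: 0, 1, ..., 9
block_size, batch_size = 4, 2   # toy values, much smaller than the notebook's

# Random starting offsets, chosen so that i + block_size + 1 stays in range
ix = torch.randint(len(data) - block_size, (batch_size,))
x = torch.stack([data[i:i + block_size] for i in ix])
y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])

print(x)
print(y)
# Because the stream is 0, 1, 2, ..., each target equals its input plus one
print(torch.equal(y, x + 1))
```

Every row of y is the matching row of x moved one position along the stream, which is exactly the next-token prediction target.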

Here we define an estimate_loss function that is used to estimate the loss on the training and validation sets.

For each split, the function draws eval_iters random batches with get_batch and computes the loss with the global model, which is an instance of the BigramLanguageModel class defined further down. Averaging over eval_iters batches gives a less noisy estimate than a single batch. The model.eval() call switches the model to evaluation mode (which disables dropout) before measuring, and model.train() switches it back to training mode afterwards. The @torch.no_grad() decorator turns off gradient tracking, since no backward pass is needed here.

In [None]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out
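
The effect of switching between model.train() and model.eval() can be seen on a single dropout layer. This is a small illustrative sketch, separate from the notebook's model; the tensor of ones is just a convenient input:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
drop = nn.Dropout(0.2)   # same rate as the notebook's dropout hyperparameter
x = torch.ones(1000)

drop.train()
train_out = drop(x)      # some entries zeroed, survivors scaled by 1 / (1 - 0.2)

drop.eval()
eval_out = drop(x)       # dropout is the identity in evaluation mode

print((train_out == 0).float().mean())   # fraction dropped, roughly 0.2
print(torch.equal(eval_out, x))          # True
```

This is why the loss estimate is taken in evaluation mode: with dropout active, the measured loss would include the noise of randomly dropped activations.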

In [None]:
class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs), where hs = head_size
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities"), scaled by 1/sqrt(head_size)
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        # causal mask: each position may only attend to itself and earlier positions
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out
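
The masking and softmax steps in the head can be checked on a tiny example. The sketch below uses random queries and keys with toy sizes (T = 4, head_size = 8, both chosen only for illustration) and omits the learned projections and dropout:

```python
import torch
from torch.nn import functional as F

torch.manual_seed(0)
T, head_size = 4, 8                       # toy sizes
q = torch.randn(1, T, head_size)          # stand-in for self.query(x)
k = torch.randn(1, T, head_size)          # stand-in for self.key(x)

# Scaled dot-product scores, shape (1, T, T)
wei = q @ k.transpose(-2, -1) * head_size ** -0.5

# Lower-triangular mask: position t may only look at positions <= t
tril = torch.tril(torch.ones(T, T))
wei = wei.masked_fill(tril == 0, float('-inf'))
wei = F.softmax(wei, dim=-1)

print(wei[0])
```

In the printed matrix, every entry above the diagonal is zero and each row sums to one, so the first position attends only to itself and later positions average over everything seen so far.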


In [None]:
class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out


In [None]:
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)


The Block class is the main Transformer block that combines communication (self-attention) and computation (the feedforward network).

The Block class takes two arguments: n_embd is the embedding dimension, and n_head is the number of attention heads; each head gets a size of n_embd // n_head. The block first applies multi-head self-attention and then a feedforward network. Each sub-layer is wrapped in a residual connection, and LayerNorm is applied to the input of each sub-layer (the "pre-norm" arrangement) rather than to its output.

In [None]:
class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
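
The pre-norm residual pattern used in forward can be sketched in isolation. Here a plain linear layer stands in for the attention or feedforward sub-layer, and the sizes are toy values; both are assumptions made for illustration:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
n_embd = 8                              # toy embedding size
ln = nn.LayerNorm(n_embd)
sublayer = nn.Linear(n_embd, n_embd)    # stand-in for self.sa or self.ffwd

x = torch.randn(2, 5, n_embd)           # (B, T, C)
normed = ln(x)                          # normalize each token's features
out = x + sublayer(normed)              # add the sub-layer output back onto x

print(out.shape)                                # torch.Size([2, 5, 8])
print(normed.mean(-1).abs().max())              # close to 0
print(normed.var(-1, unbiased=False).mean())    # close to 1
```

Because the sub-layer output is added to x rather than replacing it, the shape is unchanged and blocks can be stacked freely with nn.Sequential.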



Here we define the full language model. Despite the class name BigramLanguageModel, it is a transformer-based (GPT-style) model, not a bigram model.

The model takes as input a sequence of tokens represented as integer indices, and outputs logits for the next token in the sequence at each position. 

The architecture consists of a token embedding layer, a position embedding layer, a stack of transformer blocks, a final layer normalization layer, and a linear projection layer to obtain logits.

The forward method takes as input the input indices and optional target indices, and returns the logits and loss if target indices are provided. The loss is calculated using cross-entropy loss between the logits and target indices.

The generate method takes as input an initial sequence of indices and a maximum number of new tokens to generate. The method repeatedly predicts the next token given the current context, samples from the predicted distribution, and appends the sampled token to the context until the maximum number of tokens is reached. The final sequence of indices is returned.

Overall, this is a small GPT-style decoder rather than a true bigram model. Each prediction can attend to up to block_size (256) previous tokens, not only the immediately preceding one; the generate method crops the context to the last block_size tokens because the position embedding table has no entries beyond that length.

In [None]:
# GPT-style transformer language model (despite the class name, not a bigram model)
class BigramLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # token and position embeddings, a stack of transformer blocks,
        # a final layer norm, and a linear projection to vocabulary logits
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx
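
The reshape in forward exists because F.cross_entropy expects logits of shape (N, C) and targets of shape (N,). The sketch below shows that reshape on toy sizes, and also a useful sanity check: a model that predicts a uniform distribution has a loss of ln(C), so an untrained model should start somewhere near ln(vocab_size). The sizes are hypothetical.

```python
import math
import torch
from torch.nn import functional as F

torch.manual_seed(0)
B, T, C = 2, 3, 50                  # toy batch, time, and vocabulary sizes
logits = torch.zeros(B, T, C)       # all-equal logits = uniform prediction
targets = torch.randint(C, (B, T))  # arbitrary target token ids

# Flatten batch and time so every position is one classification example
loss = F.cross_entropy(logits.view(B * T, C), targets.view(B * T))

print(loss.item(), math.log(C))     # both approximately 3.912
```

If the first reported training loss is far above ln(vocab_size), that usually points to a problem in initialization or data handling rather than in training itself.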


In [None]:
model = BigramLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
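
The parameter count printed above can be cross-checked by hand. The sketch below tallies the layers defined in this notebook with the default hyperparameters; count_params is a helper introduced here for illustration, and the vocab_size of 5000 is a hypothetical value, since the real one depends on the dataset:

```python
def count_params(vocab_size, n_embd=384, n_head=6, n_layer=6, block_size=256):
    head_size = n_embd // n_head
    attn = n_head * 3 * n_embd * head_size      # key, query, value (no bias)
    proj = n_embd * n_embd + n_embd             # attention output projection
    ffwd = (n_embd * 4 * n_embd + 4 * n_embd) + (4 * n_embd * n_embd + n_embd)
    norms = 2 * 2 * n_embd                      # two LayerNorms, weight + bias
    block = attn + proj + ffwd + norms

    embeddings = vocab_size * n_embd + block_size * n_embd
    final = 2 * n_embd + (n_embd * vocab_size + vocab_size)  # ln_f + lm_head
    return n_layer * block + embeddings + final, block

total, per_block = count_params(vocab_size=5000)  # hypothetical vocab size
print(per_block)   # 1773312
print(total)       # 14583944
```

The tril mask is registered as a buffer, not a parameter, so it does not appear in the count. With these settings the six blocks contribute about 10.6M parameters regardless of vocabulary size; the rest scales with vocab_size.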


In [None]:
for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        print("---------------------")
        context = torch.zeros((1, 1), dtype=torch.long, device=device)
        print(decode(m.generate(context, max_new_tokens=200)[0].tolist()))
        print("---------------------")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()


In [None]:
# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))
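# save a longer sample to a separate file so the training data is not overwritten
# ('generated.txt' is a placeholder name)
with open('/content/generated.txt', 'w', encoding='utf-8') as f:
    f.write(decode(m.generate(context, max_new_tokens=10000)[0].tolist()))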


In [None]:
context = torch.ones((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))

In [None]:
context = torch.randint(low=0, high=10, size=(1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=500)[0].tolist()))

**CONCLUSION**

After training the GPT model on transcripts of Supreme Court arguments, our model was able to generate new transcript-style text.