<a href="https://colab.research.google.com/github/shusank8/GPT/blob/main/GPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
print("GPT BY SHUSANKET BASYAL")

GPT BY SHUSANKET BASYAL


In [2]:
# GPT BASICS USING SIMPLE NUMBERS

In [3]:
import torch
import torch.nn as nn
from torch.nn import functional as F

In [4]:
# LOADING THE DATASET
# DATASET IS THE SHORT JOKES FROM KAGGLE
import kagglehub
path = kagglehub.dataset_download("abhinavmoudgil95/short-jokes")

print("Path to dataset files:", path)

Path to dataset files: /root/.cache/kagglehub/datasets/abhinavmoudgil95/short-jokes/versions/1


In [5]:
# IMPORTING THE NECESSARY LIBARIES

import os
import pandas as pd
import torch
import torch.nn as nn
from torch.nn import functional as F

In [6]:
# LOOKING WHERE THE FILES HAS BEEN DOWNLOADED
os.listdir(path)

['shortjokes.csv']

In [7]:
# LOADING THE FILE INTO DF
df = pd.read_csv(path+"/shortjokes.csv")
# GETTING ALL THE VALUES IN JOKE COLUMN => RETURNS A LIST
text = df['Joke'].values
# JOINING ALL THE STR VAL IN THE LIST TO GET A SINGLE STR
text = "".join(text)
# GETTING THE UNIQUE CHAR PRESENT IN THE DATASET AND CREATING A VARIABLE VOCAB_SIZE THAT STORES THE LEN OF THE UNIQUE ELEMENTS
char = sorted(list(set(text)))
vocab_size = len(char)
# SIMPLE ENCODER, DECODER
# CREATING A HASMAP THAT MAPS STRING TO ID AND VICE VERSA
stringtoid = {sti:i for i,sti in enumerate(char)}
idtostring = {i:sti for i, sti in enumerate(char)}
# USING THE CREATED HASMAP TO CREATER ENCODER AND DECODER
encode = lambda x : [stringtoid[i] for i in x]
decode = lambda x: "".join([idtostring[i] for i in x])
# ENCODING THE TEXT
text = torch.tensor(encode(text), dtype=torch.long)
# CREATING TRAIN AND VAL SIZE
n = int(0.8*len(text))
train = text[0:n]
val = text[n:]

In [8]:
# This function creates batches of data for training or validation.
# It selects random starting points, extracts sequences of a given length (block_size), and prepares input (x) and target (y) tensors for a model.

def generate_batch(split, batch_size, block_size):
  data = train if split =='train' else val
  idx = torch.randint(0, len(data)-block_size, (batch_size, ))
  x = torch.stack([data[i:i+block_size] for i in idx])
  y = torch.stack([data[i+1:i+1+block_size] for i in idx])
  return x,y


In [9]:
# This function estimates the model's loss on the validation set by running 64 mini-batches through it.
# It calculates cross-entropy loss for each batch and returns the average loss, temporarily switching the model to evaluation mode for accurate assessment.

def estimate_loss(model, vocab_size, batch_size, block_size):

  model.eval()

  losses = torch.zeros(64)
  for _ in range(64):
    x,y = generate_batch('val', batch_size, block_size)
    x = x.to('cuda')
    y = y.to('cuda')
    logits = model(x)
    logits = logits.reshape(-1, vocab_size)
    y = y.view(-1)
    loss = F.cross_entropy(logits, y)
    losses[_] = loss.item()
  model.train()
  return losses.mean()


In [10]:
# global_variables

block_size = 64
batch_size = 64
epoch = 5000
embdim = 64
no_of_head = 8
head_dim = embdim//no_of_head
device = 'cuda'



In [11]:
class Head(nn.Module):
  def __init__(self):
    super().__init__()
    self.query = nn.Linear(embdim, head_dim, bias = False)
    self.key = nn.Linear(embdim, head_dim, bias = False)
    self.value = nn.Linear(embdim, head_dim, bias = False)
    self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))

  def forward(self, x):
    B,T,C = x.shape
    q = self.query(x)
    k = self.key(x)
    v = self.value(x)
    wei = q@k.transpose(-2,-1)

    wei = wei.masked_fill(self.tril[:T, :T]==0, float("-inf"))
    wei = F.softmax(wei, dim=-1)
    v = self.value(x)
    out = wei@v

    return out


In [12]:
class MultiHeadAttention(nn.Module):

  def __init__(self):
    super().__init__()
    self.heads = nn.ModuleList([Head() for _ in range(no_of_head)])
    self.proj = nn.Linear(embdim, embdim)

  def forward(self, x):
    out =  torch.cat([h(x) for h in self.heads], dim=-1 )
    out = self.proj(out)
    return out

In [13]:
class FeedForward(nn.Module):

  def __init__(self):
    super().__init__()
    self.net = nn.Sequential(
        nn.Linear(embdim, 3*embdim),
        nn.ReLU(),
        nn.Linear(3*embdim, embdim)
    )

  def forward(self,x):
    return self.net(x )


In [14]:
class Block(nn.Module):

  def __init__(self):
    super().__init__()
    self.sa = MultiHeadAttention()
    self.ffwd = FeedForward()
    self.ln1 = nn.LayerNorm(embdim)
    self.ln2 = nn.LayerNorm(embdim)

  def forward(self, x):
    x = x+self.sa(self.ln1(x))
    x = x+self.ffwd(self.ln2(x))
    return x


In [15]:
class GPT(nn.Module):

  def __init__(self):
    super().__init__()
    self.embeddings = nn.Embedding(vocab_size, embdim)
    self.positionalEmbeddings = nn.Embedding(block_size, embdim)
    # self.sa_heads = MultiHeadAttention()
    # self.ffd = FeedForward()
    self.blocks = nn.Sequential(
        Block(),
        Block(),
        Block(),
        nn.LayerNorm(embdim)
    )
    self.lm_head = nn.Linear(embdim, vocab_size)

  def forward(self, x):
    # x shape => (B,T)
    tok_emb = self.embeddings(x)
    B,T,C = tok_emb.shape
    # x shape => (B,T,C)
    pos_emb = self.positionalEmbeddings(torch.arange(T, device = device))
    x = tok_emb+pos_emb
    # x = self.sa_heads(x)
    # x = self.ffd(x)
    x = self.blocks(x)
    x = self.lm_head(x)
    return x

In [16]:
model = GPT()
for p in model.parameters():
  if p.dim()>=2:
    torch.nn.init.xavier_uniform_(p)
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr = 1e-4)

In [17]:
# This function trains the GPT for a given number of epochs.
# It generates training batches, computes the loss using cross-entropy, updates the model’s weights using backpropagation, and periodically estimates and
# prints the validation loss every 10 epochs.


def train_model(epochs, batch_size):
  for _ in range(epochs):
    x,y = generate_batch('train', batch_size = batch_size, block_size=block_size)
    x = x.to('cuda')
    y = y.to("cuda")
    logits = model(x)
    logits = logits.view(-1, vocab_size)
    y = y.view(-1)
    loss = F.cross_entropy(logits, y)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
    if _ % 500==0:
      lo = estimate_loss(model, vocab_size, batch_size, block_size)
      print(lo.item())

In [27]:
train_model(epoch, batch_size)

2.0781712532043457
2.0694496631622314
2.044644832611084
2.0426087379455566
2.0376267433166504
2.0250625610351562
2.019266128540039
2.017360210418701
2.014068365097046
1.9938020706176758


In [28]:
# This function generates tokens using the trained model.
# Starting from a given input, it predicts the next token, samples from the probability distribution, appends it to the sequence,
# and continues for max_tok steps without updating gradients.

def generatetok(model, start, max_tok):
  with torch.no_grad():
    for _ in range(max_tok):
      starti = start[:, -block_size:]
      logits = model(starti)
      prob = logits[:,-1,:]
      prob = F.softmax(prob, dim=-1)
      lo = torch.multinomial(prob, num_samples=1)
      start = torch.cat([start, lo], dim=1)
  return start


In [29]:
import os
# os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# INITIALIZING THE START AS 0
start = torch.zeros([1,1], device='cuda',dtype=torch.long)

In [30]:
# GENERATING FROM THE MODEL
out = generatetok(model, start, 200)

In [31]:
# output
decode(out[0].tolist())

'\x08y che red.I dold fave and day. "I car an a swell a clups look? Butes ut to get 6 just friends aggh new don smicub? He drig-mard mrong decy! Yessicent today? Tomes bick up peops Turr new *kider" stans '

In [32]:
print("NEXT THING: SCALE UP; ADD DROP OUT")

NEXT THING: SCALE UP; ADD DROP OUT
