<a href="https://colab.research.google.com/github/omerahmed12345elhussien/GPT_Implementation/blob/main/GPT_Implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import packages

In [None]:
from collections import defaultdict
import torch
from torch import Tensor
from torch import Tensor
from typing import *
import torch.nn as nn
from torch.nn import functional as F
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"

# Full implementation

## Hyper-parameters

In [None]:
# Fraction of the corpus used for training; the remainder is for validation.
split_portion = 0.9
# Number of independent sequences processed in parallel in one batch.
batch_size = 64
# Context length: the number of tokens in each sequence.
block_size = 256
# Width of the token and position embeddings.
num_embed = 384
# Number of heads in each multi-head self-attention layer.
num_head = 6
# Number of stacked decoder blocks.
num_layer = 6
# Dropout probability, used to reduce overfitting.
dropout_value = 0.2
# Number of random batches averaged per split when estimating the loss.
eval_iters = 500
# The loss is estimated every `eval_interval` training steps.
eval_interval = 1000
# Learning rate handed to the optimizer.
learning_rate = 3e-4
# Number of iterations of the training loop (one batch per iteration).
num_epochs = 5000

## Functions + Classes

In [None]:
# Seed PyTorch's random number generator so that runs are repeatable.
torch.manual_seed(40)

# Download the Tiny Shakespeare corpus into input.txt.
# NOTE: the leading "!" is an IPython/Colab shell escape, so this line only
# works inside a notebook, not in a plain Python script.
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

# Read the whole corpus into memory as one string.
with open('input.txt','r',encoding='utf-8') as file:
  content=file.read()

# Determine the unique characters in input.txt, in sorted order.
# A set de-duplicates in a single pass instead of scanning a list for
# every character of the corpus.
unq_char = sorted(set(content))
# The vocabulary size.
voc_size = len(unq_char)

# Lookup tables between characters and integer ids.
# Missing keys fall back to id 0 (char_to_int) and to '' (int_to_char).
char_to_int = defaultdict(int, {char: idx for idx, char in enumerate(unq_char)})
int_to_char = defaultdict(str, {idx: char for idx, char in enumerate(unq_char)})

def encoder(inp):
  """Take a string and return the list of integer ids of its characters."""
  return [char_to_int[element] for element in inp]


def decoder(inp):
  """Take a list of integer ids and return the corresponding string."""
  return ''.join(int_to_char[numb] for numb in inp)


# Encode the whole corpus and store it as a tensor of 64-bit integers.
data_enc = torch.tensor(encoder(content), dtype=torch.long)

# Split the encoded data: the first split_portion of it is the training set,
# the rest is the validation set.
split_size = int(split_portion * len(data_enc))
train_data, valid_data = data_enc[:split_size], data_enc[split_size:]

#Data loader
def gen_batch(inpt: str)->tuple:
  """
  Sample a random batch of contexts and their next-token targets.

  inpt: 'train' selects the training data; any other value (e.g. 'val')
  selects the validation data.
  Returns a tuple (x, y) of tensors moved to `device`. Each holds batch_size
  rows of block_size tokens, and y is x shifted by one position in the data.
  """
  source = valid_data
  if inpt == 'train':
    source = train_data
  # Random start offsets, leaving room for block_size tokens plus one target.
  starts = torch.randint(len(source) - block_size, size=(batch_size,))
  contexts = [source[s:s + block_size] for s in starts]
  targets = [source[s + 1:s + block_size + 1] for s in starts]
  x = torch.vstack(contexts)
  y = torch.vstack(targets)
  return x.to(device), y.to(device)

#Estimate loss function
@torch.no_grad()
def estimate_loss_fun(model)->dict:
  """
  Estimate the mean loss of `model` on the training and validation data.

  For each split, eval_iters random batches are drawn with gen_batch and
  their losses are averaged. The model is put in evaluation mode for the
  measurement, and gradients are disabled by the decorator.

  The mode the model was in on entry (train or eval) is restored on exit,
  including when an exception is raised.

  Returns a dict with the keys 'train' and 'val', each mapping to a
  0-dimensional tensor holding the mean loss.
  """
  # Remember the caller's mode instead of assuming it was training.
  was_training = model.training
  model.eval()
  output = {}
  try:
    for split in ('train', 'val'):
      losses = torch.zeros(eval_iters)
      for num in range(eval_iters):
        context, target = gen_batch(split)
        _, loss = model(context, target)
        losses[num] = loss.item()
      output[split] = losses.mean()
  finally:
    model.train(was_training)
  return output

#Single Head Implementation
class Head(nn.Module):
  """Single head of causal (masked) self-attention."""
  #Class constructor
  def __init__(self,head_size:int)->None:
    """head_size: output width of the key, query and value projections."""
    super().__init__()
    #Linear projections from the embedding width to head_size
    self.key=nn.Linear(num_embed,head_size,bias=False)
    self.query=nn.Linear(num_embed,head_size,bias=False)
    self.value=nn.Linear(num_embed,head_size,bias=False)
    #Lower-triangular mask: position t may only attend to positions <= t
    self.register_buffer('low_tri',torch.tril(torch.ones(block_size,block_size)))
    self.dropout=nn.Dropout(dropout_value)

  def forward(self,tok_emb:Tensor)->Tensor:
    """
    tok_emb: tensor of size (Batch_size, Block_size, num_embed).
    Returns a tensor of size (Batch_size, Block_size, head_size).
    """
    _, seq_len, _ = tok_emb.shape
    #k, q, and v are of size (Batch_size, Block_size, head_size)
    k=self.key(tok_emb)
    q=self.query(tok_emb)
    v=self.value(tok_emb)
    #Attention scores, scaled by 1/sqrt(head_size): the dot products are taken
    #over head_size components, so that is the width that sets their scale.
    #(Batch, Block, head) @ (Batch, head, Block) ----> (Batch, Block, Block)
    weight=q@k.transpose(-2,-1) * k.shape[-1]**-0.5
    #Hide future positions so that softmax gives them zero weight
    weight=weight.masked_fill(self.low_tri[:seq_len,:seq_len]==0,float('-inf'))
    weight=F.softmax(weight,dim=-1) #Of size (Batch, Block, Block)
    #Apply dropout to the attention weights (no-op in eval mode)
    weight=self.dropout(weight)
    #(Batch, Block, Block) @ (Batch, Block, head) ----> (Batch, Block, head)
    output=weight@v
    return output

#Multiple Head class Implementation
class Multi_Head(nn.Module):
  """Multi-head self-attention: parallel heads followed by a linear projection."""
  #class constructor
  def __init__(self,num_head:int, head_size:int)->None:
    """
    num_head: number of attention heads run in parallel.
    head_size: output width of each head.
    """
    super().__init__()
    self.heads= nn.ModuleList([Head(head_size) for _ in range(num_head)])
    #The concatenated heads have width num_head*head_size, which equals
    #num_embed only when num_embed is divisible by num_head. Sizing the
    #projection input from the actual width keeps both cases working.
    self.lin_proj=nn.Linear(num_head*head_size,num_embed)
    self.dropout=nn.Dropout(dropout_value)

  def forward(self,tok_emb:Tensor)->Tensor:
    """Run every head on tok_emb, concatenate on the last dim, then project."""
    output=torch.cat([head(tok_emb) for head in self.heads],dim=-1)
    output=self.dropout(self.lin_proj(output))
    return output

#Feed forward Networks
class FF_Networ(nn.Module):
  """Feed-forward network: Linear -> ReLU -> Linear -> Dropout."""
  #Class constructor
  def __init__(self,num_embed:int)->None:
    """num_embed: input and output width; the hidden layer is 4x wider."""
    super().__init__()
    hidden_width = 4 * num_embed
    expand = nn.Linear(num_embed, hidden_width)
    contract = nn.Linear(hidden_width, num_embed)
    self.network = nn.Sequential(
        expand,
        nn.ReLU(),
        contract,
        nn.Dropout(dropout_value),
    )

  def forward(self,valu:Tensor)->Tensor:
    """Apply the network to valu and return the result."""
    result = self.network(valu)
    return result

#Transformer blocks
class Transf_Block(nn.Module):
  """
  Transformer block: self-attention then feed-forward, each applied to a
  layer-normalized input and added back through a residual connection.
  """
  #Class constructor
  def __init__(self,num_embed:int,num_head:int)->None:
    """
    num_embed: embedding width used by the layer norms and feed-forward net.
    num_head: number of attention heads; each gets num_embed//num_head width.
    """
    super().__init__()
    self.mul_head = Multi_Head(num_head, num_embed // num_head)
    self.ff_net = FF_Networ(num_embed)
    self.ln1 = nn.LayerNorm(num_embed)
    self.ln2 = nn.LayerNorm(num_embed)

  def forward(self, x:Tensor)->Tensor:
    """Return x after the attention and feed-forward residual updates."""
    #Residual connection around the attention sub-layer
    attended = self.mul_head(self.ln1(x))
    x = x + attended
    #Residual connection around the feed-forward sub-layer
    transformed = self.ff_net(self.ln2(x))
    return x + transformed

#GPT Implementation class
class GPT_Imp(nn.Module):
  """Character-level GPT language model built from stacked Transf_Block layers."""
  #Class constructor
  def __init__(self)->None:
    super().__init__()
    #Token lookup table of size (voc_size,num_embed)
    self.token_emb=nn.Embedding(voc_size,num_embed)
    #Position lookup table of size (block_size,num_embed)
    self.position_emb=nn.Embedding(block_size,num_embed)
    self.blocks=nn.Sequential(*[Transf_Block(num_embed,num_head) for _ in range(num_layer)])
    self.f_ln=nn.LayerNorm(num_embed)
    self.lin_head=nn.Linear(num_embed,voc_size)

  def forward(self,context:Tensor, target:Optional[Tensor]=None)->tuple:
    """
    context: token ids of size (Batch_size, Block_size).
    target: optional token ids of the same size as context.

    Returns (logit, loss). Without a target, logit has size
    (Batch_size, Block_size, voc_size) and loss is None. With a target, logit
    is flattened to (Batch_size*Block_size, voc_size) and loss is the
    cross-entropy between logit and target.
    """
    _, seq_len = context.shape
    tok_emb=self.token_emb(context) #Of size (Batch_size, Block_size, num_embed)
    #Build the position indices on the input's own device, so the model keeps
    #working if it is moved somewhere other than the global `device`.
    positions=torch.arange(seq_len,device=context.device)
    pos_emb=self.position_emb(positions) #Of size (Block_size,num_embed)
    sum_tok=tok_emb+pos_emb #Of size (Batch_size, Block_size, num_embed)
    sum_tok=self.blocks(sum_tok)
    sum_tok=self.f_ln(sum_tok)
    logit=self.lin_head(sum_tok) #Of size (Batch_size, Block_size, voc_size)
    if target is None:
      return logit, None
    #cross_entropy requires the class dimension to be the second one, so the
    #batch and sequence dimensions are merged.
    n_batch, n_pos, n_classes=logit.shape
    logit=logit.reshape(n_batch*n_pos, n_classes)
    target=target.reshape(n_batch*n_pos)
    loss=F.cross_entropy(logit,target)
    return logit, loss

  @torch.no_grad()
  def generate(self, context:Tensor, max_new_tok:int)->Tensor:
    """
    Extend context by sampling max_new_tok new tokens, one at a time.

    context: token ids of size (Batch_size, T).
    max_new_tok: number of tokens to append.

    Sampling runs in evaluation mode (dropout disabled) and without gradient
    tracking; the mode the model was in on entry is restored afterwards.
    Returns a tensor of size (Batch_size, T+max_new_tok).
    """
    #The position table only covers this many positions
    max_context=self.position_emb.num_embeddings
    was_training=self.training
    self.eval()
    try:
      for _ in range(max_new_tok):
        #Consider only the last part of context that fits the position table
        context_cond=context[:,-max_context:]
        #Make prediction
        logit,_=self(context_cond)
        #Keep the prediction for the last position: (Batch_size, voc_size)
        logit=logit[:,-1,:]
        #Apply softmax to get probabilities
        prob=F.softmax(logit,dim=-1)
        #Take a sample. We get shape (Batch_size,1)
        context_next=torch.multinomial(prob, num_samples=1)
        #Append context_next to context. The sequence grows by one token
        context=torch.hstack((context,context_next))
    finally:
      self.train(was_training)
    return context

--2023-06-25 20:50:53--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2023-06-25 20:50:53 (158 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [None]:
# Build the model and move it to the selected device.
gpt_ob = GPT_Imp()
gpt_ob = gpt_ob.to(device)
# AdamW optimizer over all of the model's parameters.
optimizer = torch.optim.AdamW(gpt_ob.parameters(), lr=learning_rate)

In [None]:
#Training loop
#The counter is called `step` rather than `iter` so the builtin iter() is not
#shadowed in the notebook namespace after this cell runs.
for step in range(num_epochs):

  #Evaluate the loss on the train and validation datasets, every
  #eval_interval steps and on the final step
  if step % eval_interval == 0 or step == num_epochs - 1:
    #Kept separate from the per-batch `loss` tensor below
    losses = estimate_loss_fun(gpt_ob)
    print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
  #Sample from the data with the given batch_size
  context,target=gen_batch('train')
  #Forward pass to get logit and loss
  logit,loss=gpt_ob(context,target)
  # Clear gradients w.r.t. parameters
  optimizer.zero_grad()
  # Getting gradients w.r.t. parameters
  loss.backward()
  # Updates parameters:
  optimizer.step()

step 0: train loss 4.3686, val loss 4.3661
step 1000: train loss 1.5951, val loss 1.7802
step 2000: train loss 1.3357, val loss 1.5834
step 3000: train loss 1.2107, val loss 1.5167
step 4000: train loss 1.1203, val loss 1.5143
step 4999: train loss 1.0403, val loss 1.5389


In [None]:
# Generate 3000 new tokens, starting from a single token with id 0,
# then decode the first (only) sequence of the batch and print it.
start_context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated = gpt_ob.generate(start_context, 3000)
print(decoder(generated[0].tolist()))


Shepherd by the goods to no need of a king?

BRUTUS:
Neither neighbours, his of my vow; but leave
to meet only.

SICINIUS:
Ineither, sir? us that the true matter was
two beastles of
The triumphant and and his shamed thrusts piece
To thy gainsays, we'll raze a modester of hours!

Third Servingman:
In all, you and the last which wrought of devotion
That I would live,
This were from of hand, somethicable, that I ould:
In Somer Clifford you and Mowbray, hence,
Even from me; thou hast have left the heart of it
Read mile may me. I, there's so ink to the
farmer than stand her honesty 'Work both taken; and if
Dare leanness princess, and that close of him
Which manner came complaints with us.

LORD STERSTEY:
I say, if you unhadame,
Your cannot broud him to back blindards blowled:
Starr'd him, you cousin his segrance years,
And, and s, you play I more than you might shall
To redress that do far marry and all the guilder,
But low'd those and than you are much to-day,
To badgare the provoke man. 

# References
We implement this code following: https://www.youtube.com/watch?v=kCc8FmEb1nY&ab_channel=AndrejKarpathy
