In [None]:
!pip install tiktoken



In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F

# Default hyperparameters; the model classes in this notebook read their
# sizes from this dict (vocabulary size, context length, embedding width,
# number of layers, number of attention heads).
base_config = dict(
    vocab_size=50257,
    n_ctx=1024,
    n_embd=768,
    n_layer=12,
    n_head=12,
)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

class SelfAttentionBlock(nn.Module):
    """Causal multi-head self-attention.

    Args:
        config: mapping with the keys ``"n_embd"`` (model width) and
            ``"n_head"`` (number of attention heads).

    Raises:
        ValueError: if ``n_embd`` is not divisible by ``n_head``.
    """

    def __init__(self, config):
        super().__init__()
        if config["n_embd"] % config["n_head"] != 0:
            raise ValueError(
                f"n_embd ({config['n_embd']}) must be divisible by n_head ({config['n_head']})"
            )
        # One fused projection produces queries, keys and values together.
        self.c_attn = nn.Linear(config["n_embd"], 3 * config["n_embd"])
        # Output projection applied after the heads are merged back together.
        self.c_proj = nn.Linear(config["n_embd"], config["n_embd"])
        # Marker attribute; an initialiser can look for it with hasattr().
        self.c_proj.NANOGPT_SCALE_INIT = 1
        self.n_head = config["n_head"]
        self.n_embd = config["n_embd"]

    def forward(self, x):
        """Apply masked self-attention.

        Args:
            x: tensor of shape ``(B, T, C)`` with ``C == n_embd``.

        Returns:
            Tensor of shape ``(B, T, C)``.
        """
        B, T, C = x.shape
        head_dim = C // self.n_head

        # Each of q, k, v has shape (B, T, C).
        q, k, v = torch.split(self.c_attn(x), self.n_embd, dim=2)

        # Split the channel axis into heads: (B, T, C) -> (B, n_head, T, head_dim).
        q = q.view(B, T, self.n_head, head_dim).transpose(1, 2)
        k = k.view(B, T, self.n_head, head_dim).transpose(1, 2)
        v = v.view(B, T, self.n_head, head_dim).transpose(1, 2)

        # Scaled dot-product scores. The divisor is sqrt(head_dim), i.e. the
        # length of the vectors being dotted, not the number of heads.
        attn = torch.matmul(q, k.transpose(-2, -1)) / head_dim ** 0.5

        # Causal mask: position t may only attend to positions <= t.
        mask = torch.tril(torch.ones(T, T, device=x.device)).unsqueeze(0).unsqueeze(0)
        attn = attn.masked_fill(mask == 0, float('-inf'))
        attn = F.softmax(attn, dim=-1)

        # (B, n_head, T, T) @ (B, n_head, T, head_dim) -> (B, n_head, T, head_dim)
        out = torch.matmul(attn, v)
        # Merge the heads back into the channel axis: -> (B, T, C).
        out = out.transpose(1, 2).contiguous().view(B, T, C)
        return self.c_proj(out)


class MlpBlock(nn.Module):
    """Two-layer feed-forward network: widen to 4 * n_embd, GELU, project back.

    Args:
        config: mapping providing ``"n_embd"``.
    """

    def __init__(self, config=base_config):
        super().__init__()
        self.config = config
        width = config["n_embd"]
        hidden = 4 * width
        self.c_fc = nn.Linear(width, hidden)
        self.c_proj = nn.Linear(hidden, width)
        # Marker attribute; an initialiser can look for it with hasattr().
        self.c_proj.NANOGPT_SCALE_INIT = 1
        self.gelu = nn.GELU(approximate="tanh")

    def forward(self, x):
        """Return ``c_proj(gelu(c_fc(x)))``; the last dimension of ``x`` is n_embd."""
        hidden = self.c_fc(x)
        activated = self.gelu(hidden)
        return self.c_proj(activated)

class Transformer_Block(nn.Module):
    """One transformer layer: self-attention then MLP, each applied to a
    layer-normalised input and added back onto the residual stream.

    Args:
        config: mapping forwarded to the attention and MLP sub-blocks; this
            class itself reads ``"n_embd"``.
    """

    def __init__(self, config=base_config):
        super().__init__()
        self.config = config
        width = config["n_embd"]
        self.ln_1 = nn.LayerNorm(width)
        self.attn = SelfAttentionBlock(config)
        self.ln_2 = nn.LayerNorm(width)
        self.mlp = MlpBlock(config)

    def forward(self, x):
        """Run both residual branches in order and return the updated tensor."""
        branches = ((self.ln_1, self.attn), (self.ln_2, self.mlp))
        for norm, sublayer in branches:
            x = x + sublayer(norm(x))
        return x


class GPT2(nn.Module):
    """Decoder-only transformer language model.

    Args:
        config: mapping with the keys ``"vocab_size"``, ``"n_ctx"`` (maximum
            sequence length), ``"n_embd"``, ``"n_layer"`` and ``"n_head"``.
    """

    def __init__(self, config=base_config):
        super().__init__()
        self.config = config

        self.transformer = nn.ModuleDict({
            # token embedding table
            "wte": nn.Embedding(config["vocab_size"], config["n_embd"]),
            # learned position embedding table, one row per position < n_ctx
            "wpe": nn.Embedding(config["n_ctx"], config["n_embd"]),
            "h": nn.ModuleList([
                Transformer_Block(self.config)
                for _ in range(config["n_layer"])
            ]),
            # final layer norm applied before the output head
            "ln_f": nn.LayerNorm(config["n_embd"]),
        })

        self.lm_head = nn.Linear(config["n_embd"], config["vocab_size"], bias=False)

        # Weight sharing: the token embedding and the output head use the
        # same parameter tensor.
        self.transformer.wte.weight = self.lm_head.weight

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise one sub-module; called for every module via ``self.apply``.

        Linear weights are drawn from N(0, 0.02^2) and biases are zeroed.
        Linear layers carrying a ``NANOGPT_SCALE_INIT`` attribute use a
        standard deviation further multiplied by ``(2 * n_layer) ** -0.5``.
        Embedding weights are drawn from N(0, 0.02^2).
        """
        if isinstance(module, nn.Linear):
            std = 0.02
            if hasattr(module, 'NANOGPT_SCALE_INIT'):
                std *= (2 * self.config["n_layer"]) ** -0.5
            torch.nn.init.normal_(module.weight, mean=0.0, std=std)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the training loss.

        Args:
            idx: integer tensor of token ids with shape ``(B, T)``.
            targets: optional integer tensor with the same number of elements
                as ``idx`` holding the expected next token for each position.

        Returns:
            ``(logits, loss)`` where ``logits`` has shape
            ``(B, T, vocab_size)`` and ``loss`` is the mean cross-entropy, or
            ``None`` when ``targets`` is not given.

        Raises:
            ValueError: if ``T`` exceeds ``config["n_ctx"]``, because the
                position embedding table has no row for such positions.
        """
        B, T = idx.size()
        if T > self.config["n_ctx"]:
            raise ValueError(
                f"sequence length {T} exceeds the context size {self.config['n_ctx']}"
            )
        pos = torch.arange(T, device=idx.device)
        tok_emb = self.transformer.wte(idx)   # (B, T, n_embd)
        pos_emb = self.transformer.wpe(pos)   # (T, n_embd), broadcast over the batch
        x = tok_emb + pos_emb
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)
        logits = self.lm_head(x)
        loss = None
        if targets is not None:
            # Flatten batch and time so every position is one classification example.
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

        Runs without gradient tracking: sampling never back-propagates, so
        recording the autograd graph would only cost memory and time.

        Args:
            idx: integer tensor of token ids with shape ``(B, T)`` used as the prompt.
            max_new_tokens: number of tokens to append.

        Returns:
            Integer tensor of shape ``(B, T + max_new_tokens)``.
        """
        for _ in range(max_new_tokens):
            # Keep at most the last n_ctx tokens as conditioning context.
            idx_cond = idx[:, -self.config["n_ctx"]:]
            logits, _ = self(idx_cond)
            # Only the prediction for the final position is needed.
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)  # (B, T+1)
        return idx

In [None]:
import requests
import tiktoken
import torch

class Dataloader:
  """Tiny-Shakespeare token loader.

  Downloads the text, saves a copy to ``tiny_shakespeare.txt``, tokenises it
  with the tiktoken ``"gpt2"`` encoding and splits the token stream 90/10
  into ``train_data`` and ``val_data`` (both moved to ``device``).

  Args:
    batch_size: number of sequences per batch (stored as ``B``).
    block_size: number of tokens per sequence (stored as ``T``).
    device: torch device (or device string) the token tensors are moved to.

  Raises:
    requests.HTTPError: if the download returns an error status.
  """
  def __init__(self,batch_size,block_size,device):
    self.B = batch_size
    self.T = block_size
    self.device = device
    url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
    # Fail loudly on network problems instead of tokenising an error page.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    text = response.text
    with open("tiny_shakespeare.txt", "w", encoding="utf-8") as f:
        f.write(text)

    tokenizer = tiktoken.get_encoding("gpt2")
    tokens = torch.tensor(tokenizer.encode(text), dtype=torch.long)
    n = int(0.9*len(tokens))  # first 90% for training, the rest for validation
    self.train_data = tokens[:n].to(device)
    self.val_data = tokens[n:].to(device)
    # Read offset into train_data used by next_batch().
    self.counter = 0

  def next_batch(self):
    """Return the next sequential ``(inputs, targets)`` batch from the training split.

    Consecutive calls walk through ``train_data`` in steps of ``B * T``
    tokens; when fewer than ``B * T + 1`` tokens remain, reading restarts
    from the beginning. ``targets`` is ``inputs`` shifted one token ahead.
    Both tensors have shape ``(B, T)``.

    Raises:
      ValueError: if the training split holds fewer than ``B * T + 1`` tokens.
    """
    data = self.train_data
    span = self.B * self.T
    if len(data) < span + 1:
      raise ValueError(
          f"need at least {span + 1} tokens for one batch, but only {len(data)} are available")
    # One extra token is needed so the targets can be shifted by one position.
    if self.counter + span + 1 > len(data):
      self.counter = 0
    sample = data[self.counter:self.counter + span + 1]
    inputs = sample[:-1].view(self.B, self.T)
    targets = sample[1:].view(self.B, self.T)
    self.counter += span
    return inputs, targets

  def get_batch(self,split):
    """Return a random ``(x, y)`` batch, each of shape ``(B, T)``.

    Args:
      split: ``'train'`` selects the training tokens; any other value
        selects the validation tokens.

    Raises:
      ValueError: if the chosen split holds ``T`` tokens or fewer.
    """
    data = self.train_data if split == 'train' else self.val_data
    if len(data) <= self.T:
      raise ValueError(
          f"split has {len(data)} tokens but block size {self.T} needs at least {self.T + 1}")
    # Random start offsets; the upper bound leaves room for the shifted targets.
    ix = torch.randint(len(data) - self.T, (self.B,))
    x = torch.stack([data[i:i+self.T] for i in ix])
    y = torch.stack([data[i+1:i+self.T+1] for i in ix])
    return x, y

In [None]:
# Data source, model and optimizer shared by the evaluation helper and the
# training loop.
train_loader = Dataloader(batch_size=16, block_size=256, device=device)
model = GPT2().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

# Report losses every `eval_interval` steps, averaging over `eval_iters`
# random batches per split.
eval_interval, eval_iters = 500, 200

@torch.no_grad()
def estimate_loss():
    """Average the model loss over ``eval_iters`` random batches per split.

    Puts ``model`` into eval mode while measuring and back into train mode
    before returning.

    Returns:
        dict mapping ``'train'`` and ``'val'`` to a 0-d tensor holding the
        mean loss for that split.
    """
    model.eval()
    averages = {}
    for split in ('train', 'val'):
        per_batch = torch.zeros(eval_iters, device=device)
        for i in range(eval_iters):
            xb, yb = train_loader.get_batch(split)
            _, batch_loss = model(xb, yb)
            per_batch[i] = batch_loss.item()
        averages[split] = per_batch.mean()
    model.train()
    return averages

# Training loop: each step draws one random training batch and performs one
# optimizer update. Losses are reported every `eval_interval` steps and on
# the final step.
max_iters = 5000  # total optimizer steps; also determines when the last report is printed
for step in range(max_iters):
  if step % eval_interval == 0 or step == max_iters - 1:
      losses = estimate_loss()
      print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

  # forward pass, then clear stale gradients, back-propagate and update
  x,y = train_loader.get_batch("train")
  logits,loss = model(x,y)
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()

In [None]:
# Sample 500 new tokens from the model, starting from a single prompt token
# with id 0, and print the decoded text.
tokenizer = tiktoken.get_encoding("gpt2")
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated = model.generate(context, max_new_tokens=500)
token_ids = generated[0].tolist()
print(tokenizer.decode(token_ids))