
## GPT from scratch in PyTorch


In [1]:

import torch
import numpy as np
import torch.nn as nn

from torch.nn import functional as F


In [2]:

torch.manual_seed(256)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

block_size        = 40      ## N tokens in sequence
batch_size        = 64 
max_iters         = 6000
eval_interval     = 500     
learning_rate     = 0.0003
eval_iters        = 300
vocab_size        = 65

## every id for a given token is embedded to vector of this size
n_embd            = 512                  
n_head            = 8         ## 8 attention heads
n_layer           = 6         ## 6 eoncoder layers
dropout           = 0.2


In [4]:

text = ''

input_file2 = 'input.txt' ## 'AdventureHuckFinn.txt'

with open(input_file2, 'r', encoding='utf-8') as f:
    text = f.read()


In [6]:

print("length of data in letter or characters")
len(text)




length of data in letter or characters


1115394

In [7]:

list(set(text))


['i',
 'H',
 'Q',
 'k',
 '$',
 ';',
 "'",
 'o',
 'B',
 'G',
 ',',
 'V',
 'z',
 'j',
 'x',
 'U',
 'I',
 't',
 'Z',
 'E',
 'd',
 'F',
 'a',
 'm',
 'D',
 'J',
 's',
 'g',
 'C',
 'u',
 'N',
 'S',
 'Y',
 '?',
 'P',
 ' ',
 'y',
 'r',
 '&',
 'p',
 'c',
 'e',
 'n',
 'f',
 '\n',
 'w',
 'h',
 '.',
 'A',
 'b',
 'K',
 '3',
 'q',
 'l',
 'v',
 ':',
 '!',
 'M',
 'X',
 'O',
 'W',
 'L',
 '-',
 'R',
 'T']

In [8]:

the_chars  = sorted(     list(set(text))     )

vocab_size = len( the_chars )      ## 65

print(  len(the_chars)  )

print(  ''.join(the_chars)  )

## The printed oputput
## !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz



65

 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz


In [9]:

stoi = { ch:i for i, ch in enumerate(the_chars) }
itos = { i:ch for i, ch in enumerate(the_chars) }


In [10]:

print( stoi )
print( itos )


{'\n': 0, ' ': 1, '!': 2, '$': 3, '&': 4, "'": 5, ',': 6, '-': 7, '.': 8, '3': 9, ':': 10, ';': 11, '?': 12, 'A': 13, 'B': 14, 'C': 15, 'D': 16, 'E': 17, 'F': 18, 'G': 19, 'H': 20, 'I': 21, 'J': 22, 'K': 23, 'L': 24, 'M': 25, 'N': 26, 'O': 27, 'P': 28, 'Q': 29, 'R': 30, 'S': 31, 'T': 32, 'U': 33, 'V': 34, 'W': 35, 'X': 36, 'Y': 37, 'Z': 38, 'a': 39, 'b': 40, 'c': 41, 'd': 42, 'e': 43, 'f': 44, 'g': 45, 'h': 46, 'i': 47, 'j': 48, 'k': 49, 'l': 50, 'm': 51, 'n': 52, 'o': 53, 'p': 54, 'q': 55, 'r': 56, 's': 57, 't': 58, 'u': 59, 'v': 60, 'w': 61, 'x': 62, 'y': 63, 'z': 64}
{0: '\n', 1: ' ', 2: '!', 3: '$', 4: '&', 5: "'", 6: ',', 7: '-', 8: '.', 9: '3', 10: ':', 11: ';', 12: '?', 13: 'A', 14: 'B', 15: 'C', 16: 'D', 17: 'E', 18: 'F', 19: 'G', 20: 'H', 21: 'I', 22: 'J', 23: 'K', 24: 'L', 25: 'M', 26: 'N', 27: 'O', 28: 'P', 29: 'Q', 30: 'R', 31: 'S', 32: 'T', 33: 'U', 34: 'V', 35: 'W', 36: 'X', 37: 'Y', 38: 'Z', 39: 'a', 40: 'b', 41: 'c', 42: 'd', 43: 'e', 44: 'f', 45: 'g', 46: 'h', 47: 'i',

In [11]:

encode = lambda s: [ stoi[c]          for c in s   ] 

encode("bahh")


[40, 39, 46, 46]

In [12]:

decode = lambda l: ''.join(   itos[i] for i in l   )    

decode([40, 39, 46, 46])



'bahh'

In [13]:

data = torch.tensor(   encode(text), dtype=torch.long   )

print( data )


tensor([18, 47, 56,  ..., 45,  8,  0])


In [14]:

n          = int(   0.9*len(data)   )

train_data = data[:n]
val_data   = data[n:]


In [15]:

def get_batch(split):
    if split == "train":
        data = train_data
    else:
        data = val_data
        
    ix = torch.randint(   len(data) - block_size, (batch_size,)   )
    
    x  = torch.stack(    [  data[   i : i+block_size ]     for i in ix ]    ) 
    y  = torch.stack(    [  data[ i+1 : i+1+block_size ]   for i in ix ]    )
    
    x, y = x.to(device), y.to(device)

    return x, y


In [16]:

temp_batch_size = 4
temp_block_size = 16

## select random starting points for the 4 sentences
ix = torch.randint(   
            len(data) - block_size, 
            (temp_batch_size,)   
)

print( ix )


tensor([337600,  75432,  15277, 377247])


In [17]:

for index_temp in ix:
    print(  data[index_temp]  )



tensor(53)
tensor(40)
tensor(1)
tensor(52)


In [18]:

x  = torch.stack(    
    [ data[   i : i+  temp_block_size ]   for i in ix ] 
    
) 

y  = torch.stack(    
    [ data[ i+1 : i+1+ temp_block_size ]  for i in ix ]    
)

print(x)
print(y)



tensor([[53, 63, 39, 50,  1, 57, 61, 53, 56, 42,  1, 63, 53, 59, 56,  1],
        [40, 53, 52, 43, 57,  0, 27, 59, 58,  1, 53, 44,  1, 58, 46, 63],
        [ 1, 46, 47, 51, 11,  1, 44, 56, 53, 51,  1, 61, 46, 43, 52, 41],
        [52, 42,  1, 53, 44,  1, 42, 43, 39, 58, 46,  8,  0,  0, 14, 33]])
tensor([[63, 39, 50,  1, 57, 61, 53, 56, 42,  1, 63, 53, 59, 56,  1, 40],
        [53, 52, 43, 57,  0, 27, 59, 58,  1, 53, 44,  1, 58, 46, 63,  1],
        [46, 47, 51, 11,  1, 44, 56, 53, 51,  1, 61, 46, 43, 52, 41, 43],
        [42,  1, 53, 44,  1, 42, 43, 39, 58, 46,  8,  0,  0, 14, 33, 31]])


In [19]:

@torch.no_grad()    ## for efficient processing
def estimate_loss():
    out = {}
    model.eval()   ## set to no training
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()  ## back to training
    return out




## NN Architectures


In [20]:

class Head(nn.Module):

    def __init__(self, head_size):
        super().__init__()
        
        self.key   = nn.Linear(n_embd, head_size, bias=False)  ## [512, 64]
        self.query = nn.Linear(n_embd, head_size, bias=False)  ## [512, 64]
        self.value = nn.Linear(n_embd, head_size, bias=False)  ## [512, 64]

        tril_def = torch.tril( torch.ones(block_size, block_size) )  ## [40, 40]
        
        self.register_buffer(
                  'tril', 
                  tril_def
               )
        
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        
        B, T, E = x.shape   ## [batch_size, 40, 512]
        
        k = self.key(   x )            ## k = (B, T, 64)
        q = self.query( x )            ## q = (B, T, 64)

        E2 = 64     ## I think this is 64 and not 512
        ## (B, T, E) @ (B, E, T)  -> (B, T, T)
        wei = q @ k.transpose(-2, -1) * E2 ** -0.5        
        
        wei = wei.masked_fill(
                      self.tril[:T, :T] == 0, 
                      float('-inf')
        )   
        
        ## (B, T, T)
        wei = F.softmax( wei, dim= -1 )         ## (B, T, T)
        wei = self.dropout(   wei   )
        
        ## perform weighted aggregation of values
        
        v   = self.value(  x  )   ## x = (B, 40, E)
        out = wei @ v             ## (B, T, T) @ (B, T, 64) -> (B, T, 64)
        
        return out
        


In [21]:


class FeedForward(nn.Module):

    def __init__(self, n_embd):         ## 512
        
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),      ## [512, 4*512]
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),      ## [4*512, 512]
            nn.Dropout(dropout),
        )
        
    def forward(self, x):
        return self.net(x)


In [22]:

class MultiHeadAttention(nn.Module):

    def __init__(self, num_heads, head_size):    ## (8, 64)
        super().__init__()
        self.heads = nn.ModuleList(  [ Head(head_size) for _ in range(num_heads) ] )
        self.proj  = nn.Linear(n_embd, n_embd)   ## 512, 512
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        out = torch.cat(   [ h(x) for h in self.heads ], dim = -1   )
        out = self.proj(  out   )
        out = self.dropout(   out   )
        return out



In [23]:

class Block(nn.Module):
    
    def __init__(self, n_embd, n_head):     ## (512, 8)
        super().__init__()
        head_size = n_embd // n_head        ## 64
        self.sa   = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward( n_embd)    ## 512
        self.ln1  = nn.LayerNorm(n_embd)
        self.ln2  = nn.LayerNorm(n_embd)
        
    def forward(self, x):
        x = x + self.sa(     self.ln1(x)      )
        x = x + self.ffwd(   self.ln2(x)      )
        return x


In [24]:

class GPTModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)   ## [65, 512]
        self.pos_emb_table = nn.Embedding(block_size, n_embd)     ## [block, 512]
        
        self.blocks = nn.Sequential(
                *[   Block(n_embd, n_head=n_head) for _ in range(n_layer)    ]
        )
        
        self.ln_f    = nn.LayerNorm(  n_embd    )        
        self.lm_ffw_head = nn.Linear(n_embd, vocab_size)  ## [512, 65] # FFW Layer
        
    def forward(self, idx, targets=None):
        B, T = idx.shape     ## (Batch, 40)
        ## ids and targets are both (B, T) tensors of integers
        
        tok_emb = self.token_embedding_table(idx)      
        pos_emb = self.pos_emb_table(torch.arange(T, device=device))  
        
        x = tok_emb + pos_emb    ## [B, T, E] or [64, 40, 512]

        ## This is the architecture
        x = self.blocks(  x  )   ## (B, T, E)        
        x = self.ln_f(    x  )   ## (B, T, E)   ## norm
        logits = self.lm_ffw_head(x)         ## [B, 40, 65] 
        
        if targets is None:
            loss = None
        else:
            B, T, E  = logits.shape
            logits  = logits.view( B*T, E)
            targets = targets.view(B*T)
            loss    = F.cross_entropy(logits, targets)
        return logits, loss
        
    def generate(self, idx, max_new_tokens):    ## idx is (B, T)
        for _ in range(max_new_tokens):
            ## crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            logits, loss = self(idx_cond)    ## ## get preds
            logits = logits[:, -1, :]    ## focus on last one (B, E)
            probs = F.softmax(logits, dim= -1)    ## (B, E) get probs
            idx_next = torch.multinomial(probs, num_samples=1)     ## (B, 1) selected
            idx = torch.cat(  (idx, idx_next), dim=1  )   ## (B, T+1) append sample to running sequence
        return idx
            


In [25]:

model   = GPTModel()

m       = model.to(device)

optimizer = torch.optim.Adam(  m.parameters(), lr=learning_rate   )



In [26]:


for iter in range(max_iters):
    
    if iter % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    xb, yb = get_batch('train')
    
    ## eval the loss
    logits, loss = m(xb, yb)
    
    optimizer.zero_grad(set_to_none=True)   ## zero out
    loss.backward()
    optimizer.step()


step 0: train loss 4.3397, val loss 4.3431


KeyboardInterrupt: 

In [None]:


## Starting token  id_sos = 0
sos_context = torch.zeros(  (1, 1),  dtype=torch.long, device=device   )   

generated_text = m.generate(sos_context, max_new_tokens=500)[0].tolist()

print(  decode(generated_text)   )

