In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import math

In [2]:
## Hyperparameters: revisit this cell and update/add hyperparameters as the build progresses

# --- Runtime ---
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Batching ---
BATCH_SIZE = 64        # number of sequences sampled per batch
CONTEXT_LENGTH = 256   # tokens per sampled window; also the side of the causal mask

# --- Model ---
EMBEDDING_DIM = 256    # width of the token embeddings / residual stream
NUM_QUERIES = 512      # total attention width; each head gets NUM_QUERIES // NUM_HEADS
NUM_HEADS = 6          # attention heads per decoder block
NUM_LAYERS = 6         # number of stacked decoder blocks
DROPOUT_PROB = 0.2     # dropout probability used inside the attention and feed-forward layers

# --- Optimisation and evaluation ---
LEARNING_RATE = 1e-4   # AdamW learning rate
MAX_ITERS = 5000       # number of training steps
EVAL_INTERVAL = 500    # evaluate every this many training steps
EVAL_ITERS = 200       # batches averaged per split during evaluation

# Prepare Dataset

In [3]:
# Read the whole corpus into memory and turn any literal backslash-n
# sequences (two characters) into real newline characters.
with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read().replace('\\n', '\n')

# Character-level vocabulary: every distinct character, in sorted order.
chars = sorted(set(text))
vocab_size = len(chars)
# Character -> integer id; the id is the character's position in `chars`.
stoi = dict(zip(chars, range(vocab_size)))

def encode(snippet):
    """Map each character of `snippet` to its integer id via `stoi`.

    Returns a list of ints, one per character. Raises KeyError for a
    character that is not in the vocabulary.
    """
    return list(map(stoi.__getitem__, snippet))

def decode(idx_list):
    """Turn an iterable of integer ids back into a string by looking each id up in `chars`."""
    pieces = []
    for idx in idx_list:
        pieces.append(chars[idx])
    return ''.join(pieces)

## print(encode("wow"), decode(encode("wow")))

# Encode the entire corpus once as a 1-D tensor of integer ids.
data = torch.tensor(encode(text), dtype=torch.long)
# The first 90% of the ids form the training split, the remainder is held out.
n = int(0.9 * data.size(0))
train, test = data[:n], data[n:]

In [4]:
def getBatch(split = 'train'):
    """Sample a random batch of (input, target) windows.

    Args:
        split: 'train' reads from the `train` tensor; any other value
            (e.g. 'val') reads from the held-out `test` tensor.

    Returns:
        (x, y), both of shape (BATCH_SIZE, CONTEXT_LENGTH) and moved to DEVICE.
        `y` is `x` shifted one position to the right, i.e. the next-token targets.
    """
    source = train if split == 'train' else test
    # The upper bound leaves room for the one-step-shifted target window.
    starts = torch.randint(high=len(source) - CONTEXT_LENGTH, size=(BATCH_SIZE,))

    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start : start + CONTEXT_LENGTH])
        targets.append(source[start + 1 : start + 1 + CONTEXT_LENGTH])

    return torch.stack(inputs).to(DEVICE), torch.stack(targets).to(DEVICE)

# x, y = getBatch('val')
# print(x.shape, y.shape)

# Define Loss

In [5]:
def estimate_loss(model):
    """Estimate the mean loss of `model` on the 'train' and 'val' splits.

    For each split, EVAL_ITERS random batches are drawn with `getBatch` and
    their losses averaged.

    Args:
        model: a module whose call `model(X, Y)` returns `(logits, loss)`.

    Returns:
        dict mapping 'train' and 'val' to a 0-dim tensor holding the mean loss.
    """
    out = {}
    # Remember the current mode so evaluation does not silently flip a model
    # that was already in eval mode back into training mode.
    was_training = model.training
    model.eval()
    try:
        # No gradients are needed here; skipping autograd avoids building a
        # graph for every evaluation batch.
        with torch.no_grad():
            for split in ['train', 'val']:
                losses = torch.zeros(EVAL_ITERS)
                for k in range(EVAL_ITERS):
                    X, Y = getBatch(split)
                    _, loss = model(X, Y)
                    losses[k] = loss.item()
                out[split] = losses.mean()
    finally:
        # Restore the mode even if evaluation is interrupted.
        model.train(was_training)
    return out

# Build Model

In [6]:
class decoder_Head(nn.Module):
    """One head of causal (masked) scaled dot-product self-attention.

    Args:
        head_size: output width of the query/key/value projections.

    Shape:
        input  (batch, time-step, EMBEDDING_DIM), with time-step <= CONTEXT_LENGTH
        output (batch, time-step, head_size)
    """

    def __init__(self, head_size):
        super().__init__()
        # Bias-free projections from the embedding space into this head's subspace.
        self.Query = nn.Linear(EMBEDDING_DIM, head_size, bias=False)
        self.Key = nn.Linear(EMBEDDING_DIM, head_size, bias=False)
        self.Value = nn.Linear(EMBEDDING_DIM, head_size, bias=False)

        # Lower-triangular matrix of ones, registered as a buffer so it follows
        # the module across devices without being a trainable parameter.
        lower_triangle = torch.ones(CONTEXT_LENGTH, CONTEXT_LENGTH).tril()
        self.register_buffer('mask', lower_triangle)

        self.dropout = nn.Dropout(DROPOUT_PROB)

    def forward(self, x):
        # The sequence length T may be shorter than CONTEXT_LENGTH (e.g. while generating).
        _, T, _ = x.shape

        queries = self.Query(x)  # (B, T, hs)
        keys = self.Key(x)       # (B, T, hs)
        values = self.Value(x)   # (B, T, hs)

        # Pairwise scores scaled by sqrt(head_size): (B, T, T)
        scores = (queries @ keys.transpose(-2, -1)) / (keys.shape[-1] ** 0.5)

        # Positions above the diagonal are in the future: give them zero weight after softmax.
        is_future = self.mask[:T, :T] == 0
        scores = scores.masked_fill(is_future, float('-inf'))

        weights = self.dropout(F.softmax(scores, dim=-1))

        # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return weights @ values


In [7]:
class MultiHeadAttention(nn.Module):
    """Several `decoder_Head` modules applied to the same input, then projected.

    Args:
        num_heads: number of attention heads.
        head_size: output width of each head.

    Shape:
        input  (B, T, EMBEDDING_DIM)
        output (B, T, EMBEDDING_DIM)
    """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([decoder_Head(head_size=head_size) for _ in range(num_heads)])
        # Projects the concatenated head outputs back to the embedding width.
        self.ffn = nn.Linear(num_heads * head_size, EMBEDDING_DIM)

        self.dropout = nn.Dropout(DROPOUT_PROB)

    def forward(self, x):
        # Each head maps (B, T, C) -> (B, T, head_size); the heads are evaluated
        # one after another and joined on the last axis:
        # (B, T, num_heads * head_size).
        per_head = tuple(head(x) for head in self.heads)
        merged = torch.cat(per_head, dim=-1)

        # Linear projection to (B, T, EMBEDDING_DIM), followed by dropout.
        return self.dropout(self.ffn(merged))

In [8]:
class FeedFoward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Linear -> Dropout.

    Args:
        n_embd: input and output width; the hidden layer is four times wider.
    """

    def __init__(self, n_embd):
        super().__init__()
        hidden_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, n_embd),
            nn.Dropout(DROPOUT_PROB),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # The last dimension is preserved: (..., n_embd) -> (..., n_embd).
        return self.net(x)

In [9]:
class decoderBlock(nn.Module):
    """Pre-norm transformer decoder block: attention and feed-forward, each with a residual.

    Args:
        n_queries: total attention width; each head gets `n_queries // n_head`
            (integer division, so any remainder is dropped).
        n_head: number of attention heads.

    Shape:
        input  (B, T, EMBEDDING_DIM)
        output (B, T, EMBEDDING_DIM)
    """

    def __init__(self, n_queries, n_head):
        super().__init__()

        per_head = n_queries // n_head
        self.multi_head_attn = MultiHeadAttention(num_heads=n_head, head_size=per_head)
        self.layer_norm1 = nn.LayerNorm(EMBEDDING_DIM)
        self.layer_norm2 = nn.LayerNorm(EMBEDDING_DIM)
        self.ffn = FeedFoward(EMBEDDING_DIM)

    def forward(self, x):
        # Sub-layer 1: normalise, attend, then add the block input back (residual).
        attended = self.multi_head_attn(self.layer_norm1(x)) + x

        # Sub-layer 2: normalise, feed-forward, then add the attention result back.
        return self.ffn(self.layer_norm2(attended)) + attended  # (B, T, EMBEDDING_DIM)

In [10]:
## Sinusoidal positional encoding, adapted from a public GitHub repository

class PositionEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for sequence-first inputs.

    Builds a table where, for position `pos` and channel pair index `i`,
        PE[pos, 2i]   = sin(pos / 10000^(2i / d_model))
        PE[pos, 2i+1] = cos(pos / 10000^(2i / d_model))
    and adds the first `seq_len` rows to the input before applying dropout.

    Args:
        d_model: channel width of the inputs (may be odd).
        dropout: dropout probability applied to the sum.
        max_len: largest sequence length the table supports.

    Shape:
        input  (seq_len, batch, d_model) -- positions are indexed along dim 0
        output (seq_len, batch, d_model)

    Raises:
        ValueError: in `forward`, if `seq_len` exceeds `max_len`.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        # One frequency per even channel index: 10000^(-2i / d_model).
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd channel than even channels,
        # so the frequency table is trimmed to match the cosine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # (max_len, 1, d_model): the singleton axis broadcasts over the batch.
        pe = pe.unsqueeze(1)
        # A buffer moves with the module across devices but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)} of the positional table"
            )
        x = x + self.pe[:seq_len]
        return self.dropout(x)

In [11]:
class yetAnotherGPT(nn.Module):
    """Decoder-only transformer language model over integer token ids.

    Pipeline: token embedding -> sinusoidal positional encoding ->
    NUM_LAYERS decoder blocks -> LayerNorm -> linear projection to the vocabulary.

    Args:
        vocab_size: number of distinct token ids.
    """

    def __init__(self, vocab_size):
        super().__init__()
        self.lang_embedding = nn.Embedding(vocab_size, EMBEDDING_DIM)
        self.pos_encoder = PositionEncoding(d_model=EMBEDDING_DIM, max_len=CONTEXT_LENGTH)
        self.decoder_layers = nn.Sequential(*[decoderBlock(NUM_QUERIES, NUM_HEADS) for layer in range(NUM_LAYERS)])
        self.layer_norm = nn.LayerNorm(EMBEDDING_DIM)
        self.ffn_out = nn.Linear(EMBEDDING_DIM, vocab_size)

    def forward(self, x, y = None):
        """Compute logits, and the cross-entropy loss when targets are given.

        Args:
            x: (B, T) tensor of token ids, with T <= CONTEXT_LENGTH. T need not
                equal CONTEXT_LENGTH (generation starts from a single token).
            y: optional (B, T) tensor of target ids.

        Returns:
            (logits, loss). Without targets, logits is (B, T, vocab_size) and
            loss is None. With targets, logits is flattened to
            (B*T, vocab_size) and loss is the mean cross-entropy.
        """
        l_embd = self.lang_embedding(x)  # (B, T, EMBEDDING_DIM)

        # PositionEncoding indexes positions along dim 0, so present the batch
        # as (T, B, C) and swap back afterwards. Its output already contains
        # the token embedding plus the positional signal, so the embedding is
        # not added a second time.
        x = self.pos_encoder(l_embd.transpose(0, 1)).transpose(0, 1)  # (B, T, EMBEDDING_DIM)

        x = self.decoder_layers(x)  # (B, T, EMBEDDING_DIM)
        x = self.layer_norm(x)
        logits = self.ffn_out(x)  # (B, T, vocab_size)

        loss = None

        if y is not None:
            B, T, C = logits.shape  # batch, time-steps, vocab_size
            # cross_entropy expects (N, C) logits and (N,) targets; reshape
            # also copes with tensors that are not contiguous.
            logits = logits.reshape(B * T, C)
            y = y.reshape(B * T)
            loss = F.cross_entropy(logits, y)

        return logits, loss

    def generate(self, context, max_new_tokens = 500):
        """Autoregressively sample `max_new_tokens` tokens after `context`.

        Args:
            context: (B, T) tensor of token ids used as the prompt.
            max_new_tokens: number of tokens to append.

        Returns:
            (B, T + max_new_tokens) tensor: the prompt followed by the samples.
        """
        # Sampling should not use dropout and needs no gradients; the previous
        # train/eval mode is restored afterwards.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # The model only supports CONTEXT_LENGTH positions: keep the tail.
                    logits, _ = self(context[:, -CONTEXT_LENGTH:])
                    # Only the last time step predicts the next token: (B, vocab_size)
                    logits = logits[:, -1, :]
                    probs = F.softmax(logits, dim=-1)
                    # Sample one id per sequence: (B, 1)
                    idx_next = torch.multinomial(probs, num_samples=1)
                    # Append to the running sequence: (B, T+1)
                    context = torch.cat((context, idx_next), dim=1)
        finally:
            self.train(was_training)
        return context


In [12]:
# Build the network and move its parameters and buffers to the selected device.
model = yetAnotherGPT(vocab_size).to(DEVICE)

# AdamW optimizer over every parameter of the model.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=LEARNING_RATE)

  from .autonotebook import tqdm as notebook_tqdm
  _torch_pytree._register_pytree_node(


In [13]:
# Training loop. The loop variable is called `step` rather than `iter` so the
# builtin `iter()` stays usable in later cells.
for step in range(MAX_ITERS):

    # Every EVAL_INTERVAL steps (including step 0), report the averaged
    # train and val losses.
    if step % EVAL_INTERVAL == 0:
        losses = estimate_loss(model)
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # Sample a batch of training data.
    xb, yb = getBatch('train')

    # Forward pass, clear stale gradients, backward pass, parameter update.
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

KeyboardInterrupt: 

In [30]:
# (name, tensor) pairs for every parameter of the model.
params = list(model.named_parameters())


def count_parameters(param_tuple):
    """Return the number of elements in the tensor of a (name, tensor) pair."""
    _name, tensor = param_tuple
    return tensor.nelement()


total_params = sum(count_parameters(entry) for entry in params)
print(f'Total number of parameters in the model: {total_params}')

Total number of parameters in the model: 257089


In [31]:
# generate from the model
# Prompt: one sequence holding a single token with id 0.
context = torch.zeros((1, 1), dtype=torch.long, device=DEVICE)
# Sample 500 further tokens and keep the only sequence in the batch as a list of ids.
generated_ids = model.generate(context, max_new_tokens=500)[0].tolist()
gen = decode(generated_ids)
# print(gen.replace('\\n', '\n'))
print(gen)


NARORI:
Myvexe bevore, l Eyou are anny tha t?
EOMyoro'tely wiyoou t, mt tied she-ours hear wi bl ind, ve ave,
wean ame
Thaccore wnimud hespl ishals dss bengeand arschoonorf d ild;
Gove th 's
Whe loveat Gicosthire toof bes?

lve
TAREUMARENIULO:
AROBTICINHA hatth ITK:
GES:
Fu o-be tooust nower ct weid mour teseze hemy, Insth ak, woromyese! trie sstortsuson we, f oikine the:
Hene
Ove nthe se'tharo nde gur l-y pe lst, put hiserace:
Ame ar yociend o sius alolllillled, s om I we s s: hathaso g ftt.
Fo
