In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import requests
# Pick the best available accelerator instead of assuming Apple-Silicon MPS:
# a hard-coded 'mps' makes every later `.to(device)` fail on machines without it.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
# Seed the global RNG so sampling and weight initialisation are repeatable.
torch.manual_seed(0)

<torch._C.Generator at 0x13c101890>

In [2]:
import re

# Download the raw corpus (The Adventures of Sherlock Holmes).
res = requests.get(
    'https://assets.datacamp.com/production/repositories/3937/datasets/213ca262bf6af12428d42842848464565f3d5504/sherlock.txt',
    timeout=30,  # do not hang the kernel forever on a stalled connection
)
res.raise_for_status()  # fail loudly rather than tokenizing an HTTP error page
# Decode the body to text. `str(res.content)` would give the *repr* of the bytes
# object (leading "b'", escaped quotes, literal backslash-n sequences), which
# would leak into the training text.
dat = res.text


def replace_newlines(text):
    """Return `text` with every line break replaced by a single space.

    Both "\\n" and "\\r\\n" line endings are handled; each line break becomes
    exactly one space, so a blank line yields two consecutive spaces.
    """
    return re.sub(r'\r?\n', ' ', text)


text = replace_newlines(dat)
print(text[:500])

b'The Project Gutenberg EBook of The Adventures of Sherlock Holmes by Sir Arthur Conan Doyle (#15 in our series by Sir Arthur Conan Doyle)  Copyright laws are changing all over the world. Be sure to check the copyright laws for your country before downloading or redistributing this or any other Project Gutenberg eBook.  This header should be the first thing seen when viewing this Project Gutenberg file.  Please do not remove it.  Do not change or edit the header without written permission.  Plea


In [3]:
from transformers import AutoTokenizer

# Load the pretrained tokenizer identified by 'openai-community/gpt2'.
tokz = AutoTokenizer.from_pretrained('openai-community/gpt2')

# Encode the whole corpus with truncation enabled and overflowing tokens kept.
# NOTE(review): get_batch indexes `tokenized_text[i]` as one token-id sequence, so
# this is presumably a list of chunks (each at most the tokenizer's max length,
# the final one possibly shorter) rather than a flat list of ids -- confirm.
tokenized_text = tokz.encode(text,truncation=True,return_overflowing_tokens=True)


In [4]:
# Split point: the leading 90% of the tokenized entries are used for training,
# the remaining tail for validation.
n = int(len(tokenized_text) * 0.9)
train_data, val_data = tokenized_text[:n], tokenized_text[n:]

In [5]:
def get_batch(split, batch_size=32):
    """Sample a random batch of next-token-prediction examples.

    Args:
        split: 'train' draws from ``train_data``; any other value draws from
            ``val_data``.
        batch_size: number of sequences to draw (sampled with replacement).

    Returns:
        A pair ``(x, y)`` of long tensors on ``device``, each of shape
        ``(batch_size, seq_len - 1)``, where ``y[:, t]`` is the token that
        follows ``x[:, t]`` in the sampled sequence.

    Raises:
        ValueError: if the selected split contains no sequences.
    """
    data = train_data if split == 'train' else val_data

    # Only sequences of the full (maximum) length can be stacked into one
    # tensor; a shorter trailing chunk would otherwise make stacking fail
    # whenever it happens to be sampled.
    seq_len = max(len(seq) for seq in data)
    full_sequences = [seq for seq in data if len(seq) == seq_len]

    # Random sequence indices for this batch.
    ix = torch.randint(len(full_sequences), (batch_size,))
    batch = torch.tensor([full_sequences[i] for i in ix.tolist()], dtype=torch.long)

    # Inputs are all tokens but the last, targets are the same tokens shifted by
    # one. The previous wrap-around (appending the first token as the target of
    # the last position) trained the model on a target that is not a real
    # continuation.
    x = batch[:, :-1]
    y = batch[:, 1:]

    return x.to(device), y.to(device)

# Draw one training batch (default batch size) to exercise get_batch once.
x,y = get_batch('train')

In [16]:
# --- optimisation schedule ---
max_iters = 5000       # number of optimizer steps
eval_interval = 500    # estimate train/val loss every this many steps
eval_iters = 200       # batches averaged per loss estimate
learning_rate = 3e-4

# --- device ---
# Fall back gracefully instead of re-pinning 'mps', which fails on machines
# without Apple-Silicon MPS support.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# --- model size ---
n_embed = 384  # has to be divisible (without remainder) by n_head, given head_size definition further below
n_head = 6
assert n_embed % n_head == 0, "n_embed must be divisible by n_head"
n_layer = 7
dropout = 0.3
vocab_size = len(tokz.get_vocab())  # one embedding row / logit per tokenizer entry
block_size = 1024                   # maximum context length the model can attend over

# single head of self attention
class Head(nn.Module):
    """A single causal, scaled dot-product self-attention head."""

    def __init__(self, head_size):
        super().__init__()
        # Bias-free projections from the embedding width to the head width.
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)
        # The lower-triangular mask is not a learnable parameter, so it is
        # registered as a buffer.
        causal_mask = torch.tril(torch.ones(block_size, block_size))
        self.register_buffer('tril', causal_mask)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over `x` of shape (B, T, n_embed); returns (B, T, head_size)."""
        _, T, _ = x.shape
        keys = self.key(x)        # (B, T, hs)
        queries = self.query(x)   # (B, T, hs)

        # Scaled affinities between every query and every key: (B, T, T).
        scale = keys.shape[-1] ** -0.5
        scores = torch.matmul(queries, keys.transpose(-2, -1)) * scale

        # Block attention to later positions, then normalise each row.
        is_future = self.tril[:T, :T] == 0
        scores = scores.masked_fill(is_future, float('-inf'))
        attn = self.dropout(F.softmax(scores, dim=-1))

        # Weighted sum of the value vectors: (B, T, T) @ (B, T, hs) -> (B, T, hs).
        values = self.value(x)
        return torch.matmul(attn, values)

class MultiHeadAttention(nn.Module):
    """Several attention heads run in parallel, concatenated and projected.

    Args:
        num_heads: number of independent `Head` modules.
        head_size: output width of each head.
    """

    def __init__(self, num_heads = 6, head_size = 64):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        # The concatenated head outputs are num_heads * head_size wide; project
        # from that width (not from a hard-coded n_embed) so the module also
        # works when the product differs from n_embed.
        self.proj = nn.Linear(num_heads * head_size, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map `x` of shape (B, T, n_embed) to a tensor of the same shape."""
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # (B, T, num_heads * head_size)
        out = self.proj(out)     # linear projection back into the residual pathway
        out = self.dropout(out)  # final dropout
        return out

# Smoke test: push a random (batch, time, embedding) tensor through a
# default-configured attention module and display the resulting shape.
x = torch.rand(size=(32, 1024, 384))
mod = MultiHeadAttention()
mod(x).shape

torch.Size([32, 1024, 384])

In [None]:
class Head(nn.Module):
    """One causal self-attention head with an optional compressive-memory read.

    Besides ordinary masked dot-product attention over the current input, the
    head can read from an associative memory built from earlier inputs and blend
    the two results with a learned scalar gate ``sigmoid(beta)``.
    """
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)
        self.beta = nn.Parameter(torch.tensor(0.0))  # learnable gate logit; sigmoid(0) = 0.5

    def forward(self, x, a_mem_prev=None, z_prev=None):
        """Attend over `x`, optionally mixing in a read from a previous memory.

        Args:
            x: input of shape (B, T, n_embed).
            a_mem_prev: memory matrix of shape (B, hs, hs) in the format of the
                returned ``m_s``, or None to use local attention only.
            z_prev: normalisation vector of shape (B, hs) in the format of the
                returned ``z_s``, or None to use local attention only.

        Returns:
            ``(out, m_s, z_s)`` where ``out`` is (B, T, hs), ``m_s`` is the
            (B, hs, hs) memory contribution of this input and ``z_s`` its
            (B, hs) normalisation term. The returned memory terms cover this
            input only; they are not added to ``a_mem_prev`` / ``z_prev`` here.
        """
        B, T, C = x.shape
        k = self.key(x)   # (B, T, hs)
        q = self.query(x) # (B, T, hs)
        v = self.value(x) # (B, T, hs)

        # Local causal dot-product attention.
        wei = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5  # (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))  # (B, T, T)
        wei = F.softmax(wei, dim=-1)  # (B, T, T)
        wei = self.dropout(wei)
        a_dot = wei @ v  # (B, T, hs)

        # Memory contribution of this input, using the positive feature map
        # sigma(.) = elu(.) + 1.
        sigma_k = F.elu(k) + 1               # (B, T, hs)
        m_s = sigma_k.transpose(-2, -1) @ v  # (B, hs, hs)
        z_s = sigma_k.sum(dim=-2)            # (B, hs)

        if a_mem_prev is None or z_prev is None:
            # No memory to read from: plain attention output.
            out = a_dot
        else:
            # Memory read: (sigma(q) @ M) / (sigma(q) @ z). Both operands are
            # matrix products, so the shapes line up for any T (the previous
            # element-wise form only broadcast when T happened to equal hs).
            sigma_q = F.elu(q) + 1                     # (B, T, hs)
            numer = sigma_q @ a_mem_prev               # (B, T, hs)
            denom = sigma_q @ z_prev.unsqueeze(-1)     # (B, T, 1)
            a_mem = numer / denom.clamp_min(1e-6)      # guard against a vanishing normaliser

            # Blend in value space: both terms are (B, T, hs). Mixing the memory
            # read with the (B, T, T) weight matrix is dimensionally invalid.
            gate = torch.sigmoid(self.beta)
            out = gate * a_mem + (1 - gate) * a_dot

        return out, m_s, z_s

class MultiHeadAttention(nn.Module):
    """Parallel memory-capable attention heads, concatenated and projected.

    Args:
        num_heads: number of independent `Head` modules.
        head_size: output width of each head.
    """
    def __init__(self, num_heads=6, head_size=64):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        # Concatenated head outputs are num_heads * head_size wide.
        self.proj = nn.Linear(num_heads * head_size, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, memories=None, return_memories=False):
        """Run every head on `x` and project the concatenated result.

        Each head only ever receives its *own* memory. Feeding one head's
        memory of the current input into the next head (as was done before)
        hands that head a summary of all T positions, including future ones,
        which defeats the causal mask.

        Args:
            x: input of shape (B, T, n_embed).
            memories: optional list with one ``(m_prev, z_prev)`` pair per head
                (e.g. as returned by an earlier call with
                ``return_memories=True``). None means no head reads from memory.
            return_memories: when True, also return the list of per-head
                ``(m_s, z_s)`` pairs produced by this call.

        Returns:
            The (B, T, n_embed) output tensor, or ``(output, new_memories)``
            when ``return_memories`` is True.

        Raises:
            ValueError: if ``memories`` does not have one entry per head.
        """
        if memories is None:
            memories = [(None, None)] * len(self.heads)
        elif len(memories) != len(self.heads):
            raise ValueError(
                f"expected {len(self.heads)} memory entries, got {len(memories)}"
            )

        head_outputs = []
        new_memories = []
        for head, (m_prev, z_prev) in zip(self.heads, memories):
            out, m_s, z_s = head(x, m_prev, z_prev)
            head_outputs.append(out)
            new_memories.append((m_s, z_s))

        out = torch.cat(head_outputs, dim=-1)  # (B, T, num_heads * head_size)
        out = self.proj(out)
        out = self.dropout(out)

        if return_memories:
            return out, new_memories
        return out

# Smoke test for the memory-capable attention module: feed a random
# (batch, time, embedding) tensor through it and display the output shape.
x = torch.rand(size=(32, 1024, 384))
mod = MultiHeadAttention()
mod(x).shape

In [None]:

class FeedForward(nn.Module):
    """Position-wise MLP: widen, apply ReLU, project back, then apply dropout."""

    def __init__(self, n_embed):
        super().__init__()
        hidden = 4 * n_embed  # inner layer is four times the embedding width
        layers = [
            nn.Linear(n_embed, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_embed),  # projection back into the residual pathway
            nn.Dropout(dropout),         # dropout is applied after that projection
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of `x`; the shape is unchanged."""
        return self.net(x)


class Block(nn.Module):
    """Transformer block: self-attention (communication) followed by an MLP (computation).

    Each sub-layer sees a layer-normalised copy of its input and its output is
    added back onto the residual stream.
    """

    def __init__(self, n_embed, n_head):
        super().__init__()
        # Every head gets an equal share of the embedding width.
        self.sa = MultiHeadAttention(n_head, n_embed // n_head)
        self.ffwd = FeedForward(n_embed)
        self.ln1 = nn.LayerNorm(n_embed)
        self.ln2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        """Return `x` after the attention and feed-forward residual updates."""
        attended = self.sa(self.ln1(x))
        x = x + attended
        transformed = self.ffwd(self.ln2(x))
        return x + transformed


class LanguageModel(nn.Module):
    """Decoder-only transformer language model.

    Token and learned position embeddings are summed, passed through
    ``n_layer`` transformer blocks and a final layer norm, and mapped to
    vocabulary logits by a linear head.
    """

    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        # One learned vector per position, up to the maximum context length.
        self.position_embedding_table = nn.Embedding(block_size, n_embed)
        self.blocks = nn.Sequential(*[Block(n_embed, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embed)
        self.lm_head = nn.Linear(n_embed, vocab_size)

    def forward(self, idx, targets=None):
        """Compute logits and, if targets are given, the cross-entropy loss.

        Args:
            idx: (B, T) integer tensor of token ids.
            targets: optional (B, T) integer tensor of target token ids.

        Returns:
            ``(logits, loss)``. Without targets, ``logits`` is
            (B, T, vocab_size) and ``loss`` is None. With targets, ``logits``
            is flattened to (B*T, vocab_size) and ``loss`` is a scalar tensor.
        """
        B, T = idx.shape

        token_emb = self.token_embedding_table(idx) # (B,T,C)
        # Build the position indices on the same device as the input rather than
        # on the global `device`, so the model works wherever it (and its input)
        # actually lives.
        positions = torch.arange(T, device=idx.device)
        pos_emb = self.position_embedding_table(positions) # (T, C)
        x = token_emb + pos_emb # (B, T, C)
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """Autoregressively sample `max_new_tokens` tokens after `idx`.

        Sampling runs without gradient tracking and in eval mode (so dropout
        does not perturb the predicted distribution); the module's previous
        train/eval mode is restored afterwards.

        Args:
            idx: (B, T) integer tensor holding the current context.
            max_new_tokens: number of tokens to append.

        Returns:
            (B, T + max_new_tokens) integer tensor: the context followed by the
            sampled tokens.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # The position table only covers block_size positions, so feed
                # at most the last block_size tokens.
                idx_cond = idx[:, -block_size:]
                logits, _ = self(idx_cond)
                # Only the prediction at the final time step is needed.
                logits = logits[:, -1, :] # (B, C)
                probs = F.softmax(logits, dim=-1) # (B, C)
                # Sample the next token from the predicted distribution.
                idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
                idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        finally:
            self.train(was_training)
        return idx

# Instantiate the language model using the module-level hyperparameters.
model = LanguageModel()

@torch.no_grad()
def estimate_loss():
    """Average the model's loss over `eval_iters` random batches per split.

    The model is put in eval mode for the measurement and switched back to
    train mode before returning.

    Returns:
        dict mapping 'train' and 'val' to the mean loss (a 0-dim tensor).
    """
    model.eval()
    averages = {}
    for split in ('train', 'val'):
        batch_losses = torch.zeros(eval_iters)
        for i in range(eval_iters):
            inputs, targets = get_batch(split)
            _, batch_loss = model(inputs, targets)
            batch_losses[i] = batch_loss.item()
        averages[split] = batch_losses.mean()
    model.train()
    return averages

# Move the model's parameters and buffers onto the selected device.
m = model.to(device)

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# `step` (not `iter`) is used as the loop variable so the builtin `iter` is not
# shadowed for the rest of the kernel session.
for step in range(max_iters):

    # Every eval_interval steps (and on the final step) report a loss averaged
    # over many batches; the per-step loss is too noisy to be worth printing.
    # This estimate is for reporting only and does not affect training.
    if (step % eval_interval == 0) or (step == max_iters-1):
        losses = estimate_loss()
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss and take one optimizer step
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# generate from the model, starting from a single token with id 0
context = torch.zeros((1, 1), dtype=torch.long, device=device)
# Sample in eval mode and without gradient tracking: dropout would otherwise
# distort the sampling distribution and autograd would retain needless state.
model.eval()
with torch.no_grad():
    generated = m.generate(context, max_new_tokens=500)
output = tokz.decode(generated[0].tolist())
print(output)