## Tokenizer

In [29]:
class Tokenizer:
    """Character-level tokenizer that maps each symbol of a vocabulary to an integer id.

    Ids are assigned by enumeration order of ``vocab``; ``vocab_index`` maps
    symbol -> id and ``index_vocab`` maps id -> symbol.
    """

    def __init__(self, vocab=None):
        """Build both lookup tables from ``vocab`` (an empty dict when omitted)."""
        if vocab is None:
            vocab = {}
        self.vocab = vocab

        # Fill the forward and reverse tables in a single pass.
        self.vocab_index = {}
        self.index_vocab = {}
        for position, symbol in enumerate(self.vocab):
            self.vocab_index[symbol] = position
            self.index_vocab[position] = symbol

        self.vocab_size = len(self.vocab)

    @classmethod
    def from_corpus(cls, corpus: str):
        """Create a tokenizer whose vocabulary is the sorted set of characters in ``corpus``."""
        unique_symbols = set(corpus)
        return cls(sorted(unique_symbols))

    def encode(self, text):
        """Return the list of ids for ``text``; raises ``KeyError`` on an unknown symbol."""
        ids = []
        for symbol in text:
            ids.append(self.vocab_index[symbol])
        return ids

    def decode(self, tokens):
        """Return the string spelled by ``tokens``; raises ``KeyError`` on an unknown id."""
        symbols = [self.index_vocab[token_id] for token_id in tokens]
        return ''.join(symbols)




## Causal attention block

In [30]:
from torch import nn
import torch.nn.functional as F
import torch

class MultiHeadAttention(nn.Module):
    """Multi-head causal self-attention.

    Each position attends only to itself and to earlier positions; later
    positions are masked out with a lower-triangular matrix before the softmax.

    Args:
        embedding_size: Size of the last dimension of the input and output.
        num_heads: Number of attention heads; must divide ``embedding_size``.
        dropout: Dropout probability applied to the attention weights and to
            the projected output.
        context_length: Longest sequence the causal mask supports.

    Raises:
        ValueError: If ``embedding_size`` is not divisible by ``num_heads``.
    """

    def __init__(self, embedding_size, num_heads, dropout=0.1, context_length=1024):
        super().__init__()
        if embedding_size % num_heads != 0:
            raise ValueError(
                f"embedding_size ({embedding_size}) must be divisible by "
                f"num_heads ({num_heads})"
            )
        self.embedding_size = embedding_size
        self.num_heads = num_heads
        self.head_size = embedding_size // num_heads
        self.context_length = context_length

        self.query = nn.Linear(embedding_size, embedding_size)
        self.key = nn.Linear(embedding_size, embedding_size)
        self.value = nn.Linear(embedding_size, embedding_size)
        self.out = nn.Linear(embedding_size, embedding_size)

        self.attn_dropout = nn.Dropout(dropout)   # applied to the attention weights
        self.embed_dropout = nn.Dropout(dropout)  # applied to the projected output
        # Lower-triangular matrix: entry (i, j) is 1 when position i may attend to j.
        self.register_buffer('tril', torch.tril(torch.ones(context_length, context_length)))

    def _split_heads(self, projected, batch, steps):
        """Reshape (B, T, C) into (B, num_heads, T, head_size)."""
        return projected.view(batch, steps, self.num_heads, self.head_size).transpose(1, 2)

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape (B, T, embedding_size).

        Returns a tensor of the same shape.

        Raises:
            ValueError: If T exceeds ``context_length``.
        """
        B, T, _ = x.shape
        if T > self.context_length:
            raise ValueError(
                f"sequence length {T} exceeds the supported context length "
                f"{self.context_length}"
            )

        q = self._split_heads(self.query(x), B, T)
        k = self._split_heads(self.key(x), B, T)
        v = self._split_heads(self.value(x), B, T)

        # Scaled dot-product scores, shape (B, num_heads, T, T).
        attn = q @ k.transpose(-2, -1) / (self.head_size ** 0.5)
        # -inf at future positions becomes a weight of zero after the softmax.
        attn = attn.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        attn = F.softmax(attn, dim=-1)
        attn = self.attn_dropout(attn)

        attended_values = attn @ v
        # Merge the heads back into a single embedding dimension.
        attended_values = attended_values.transpose(1, 2).contiguous().view(B, T, self.embedding_size)
        output = self.out(attended_values)
        return self.embed_dropout(output)


In [None]:
# Smoke test: MultiHeadAttention must return a tensor shaped like its input.
embedding_size, num_heads = 512, 8
batch_size, seq_len = 32, 10

# Random activations of shape (batch_size, seq_len, embedding_size)
x = torch.randn(batch_size, seq_len, embedding_size)

# Build the layer and run a single forward pass
mha = MultiHeadAttention(num_heads=num_heads, embedding_size=embedding_size)
output = mha(x)

print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
assert output.shape == x.shape, f"Expected shape {x.shape}, got {output.shape}"
print("Shape test passed!")

# Summary statistics of the output activations
print(f"Output mean: {output.mean().item():.3f}")
print(f"Output std: {output.std().item():.3f}")


In [33]:
class TransformerBlock(nn.Module):
    """Transformer block: self-attention and a feed-forward net, each with a residual.

    Layer normalisation is applied to the input of each sub-layer, and the
    sub-layer output is added back onto the un-normalised input.
    """

    def __init__(self, embedding_size, num_heads):
        super().__init__()
        hidden_size = 4 * embedding_size  # width of the feed-forward hidden layer

        self.attention = MultiHeadAttention(embedding_size, num_heads)
        self.layer_norm_1 = nn.LayerNorm(embedding_size)
        self.layer_norm_2 = nn.LayerNorm(embedding_size)

        feed_forward_layers = [
            nn.Linear(embedding_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, embedding_size),
            nn.Dropout(0.1),
        ]
        self.ff_net = nn.Sequential(*feed_forward_layers)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = self.attention(self.layer_norm_1(x))
        x = x + attended
        transformed = self.ff_net(self.layer_norm_2(x))
        return x + transformed



In [None]:
# Smoke test: TransformerBlock must preserve the input shape.
# Relies on x, embedding_size and num_heads defined in an earlier cell.
mha = TransformerBlock(num_heads=num_heads, embedding_size=embedding_size)

# Single forward pass through the block
output = mha(x)

print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
assert output.shape == x.shape, f"Expected shape {x.shape}, got {output.shape}"
print("Shape test passed!")

# Summary statistics of the output activations
print(f"Output mean: {output.mean().item():.3f}")
print(f"Output std: {output.std().item():.3f}")

## GPT

In [35]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class GPT(nn.Module):
    """Decoder-only language model: token embedding, transformer blocks, LM head.

    Args:
        vocab_size: Number of distinct token ids.
        embedding_size: Width of the token embeddings and of every block.
        num_blocks: Number of stacked ``TransformerBlock`` layers.
        context_length: Number of rows in the position-embedding table.
        num_heads: Attention heads per block. This used to be read from a
            module-level global; 8 is the value every cell of this notebook uses.
        use_position_embedding: When True, learned position embeddings are
            added to the token embeddings and inputs are limited to
            ``context_length`` tokens. Off by default, which matches the
            behaviour of the model before this flag existed.
    """

    def __init__(self, vocab_size, embedding_size, num_blocks, context_length=1024,
                 num_heads=8, use_position_embedding=False):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, embedding_size)
        # Always created so that parameter names and shapes do not depend on the flag.
        self.position_embedding_table = nn.Embedding(context_length, embedding_size)
        self.blocks = nn.Sequential(
            *[TransformerBlock(embedding_size, num_heads) for _ in range(num_blocks)]
        )
        self.final_layer_norm = nn.LayerNorm(embedding_size)
        self.lm_head = nn.Linear(embedding_size, vocab_size)
        self.context_length = context_length
        self.use_position_embedding = use_position_embedding

    def forward(self, x, targets=None):
        """Compute next-token logits and, when ``targets`` is given, the loss.

        Args:
            x: Integer tensor of token ids with shape (B, T).
            targets: Optional integer tensor with B*T target ids.

        Returns:
            ``(logits, loss)`` where ``logits`` has shape (B, T, vocab_size) and
            ``loss`` is the cross-entropy over all positions, or ``None`` when
            ``targets`` is ``None``.

        Raises:
            ValueError: If position embeddings are enabled and T exceeds
                ``context_length``.
        """
        B, T = x.shape

        x = self.token_embedding_table(x)
        if self.use_position_embedding:
            if T > self.context_length:
                raise ValueError(
                    f"sequence length {T} exceeds context_length {self.context_length}"
                )
            pos = torch.arange(T, device=x.device)
            # (T, E) broadcasts over the batch dimension of (B, T, E).
            x = x + self.position_embedding_table(pos)
        x = self.blocks(x)
        x = self.final_layer_norm(x)
        logits = self.lm_head(x)

        if targets is None:
            loss = None
        else:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, num_tokens, temperature=1.0, do_sample=False, top_k=None):
        """Append ``num_tokens`` generated ids to ``idx`` and return the result.

        Args:
            idx: Integer tensor of prompt ids with shape (B, T).
            num_tokens: Number of ids to append.
            temperature: Divisor applied to the logits before the softmax.
            do_sample: Sample from the distribution when True; otherwise take
                the most probable id.
            top_k: When set, only the ``top_k`` highest logits stay eligible;
                values above the vocabulary size are clamped to it.

        Returns:
            Tensor of shape (B, T + num_tokens).
        """
        # Generation must not apply dropout; restore the caller's mode afterwards.
        was_training = self.training
        self.eval()
        try:
            for _ in range(num_tokens):
                # The position table only covers context_length positions.
                context = idx[:, -self.context_length:] if self.use_position_embedding else idx
                logits, _ = self(context)
                logits = logits[:, -1, :] / (temperature + 1e-8)
                if top_k is not None:
                    k = min(top_k, logits.size(-1))
                    kth_largest = torch.topk(logits, k)[0][:, [-1]]
                    logits = logits.masked_fill(logits < kth_largest, float("-inf"))
                probs = F.softmax(logits, dim=-1)
                if do_sample:
                    idx_next = torch.multinomial(probs, num_samples=1)
                else:
                    idx_next = torch.topk(probs, k=1, dim=-1)[1]
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx



In [None]:
# Smoke test for the GPT model: one forward pass with a loss, then a short generation.

# Model hyperparameters
embedding_size, num_heads = 512, 8
vocab_size, num_blocks = 100, 4

# Batch and sequence sizes
seq_len = 10
context_length = 10
batch_size = 10
seq_length = 10

# Build the model
model = GPT(
    vocab_size=vocab_size,
    embedding_size=embedding_size,
    num_blocks=num_blocks,
    context_length=context_length,
)

# Random token ids as inputs and targets
token_shape = (batch_size, seq_length)
x = torch.randint(0, vocab_size, token_shape)
targets = torch.randint(0, vocab_size, token_shape)

print(x.shape)
print(targets.shape)

# Forward pass with targets, so a loss is returned as well
logits, loss = model(x, targets)

print(f"Input shape: {x.shape}")
print(f"Logits shape: {logits.shape}")
print(f"Loss: {loss}")

# Generate 10 tokens from a random 5-token prompt
start_tokens = torch.randint(0, vocab_size, (1, 5))
generated = model.generate(start_tokens, num_tokens=10)

print(f'\ninput tokens: {start_tokens.squeeze().tolist()}')
print(f"Generated tokens: {generated.squeeze().tolist()}")


## Trainer

In [53]:
from tqdm import tqdm
import torch

class Trainer:
    """Runs AdamW optimisation of ``model`` over ``train_loader``.

    ``val_loader`` is stored on the instance but is not read by ``train``.
    """

    def __init__(self, model, train_loader, val_loader, learning_rate=3e-4, num_epochs=1):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.learning_rate = learning_rate
        self.num_epochs = num_epochs
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    def train(self):
        """Train for ``num_epochs`` epochs, showing per-batch and running-average loss."""
        self.model.train()
        for epoch in range(self.num_epochs):
            running_loss = 0
            progress = tqdm(
                enumerate(self.train_loader),
                desc=f"Epoch {epoch+1}/{self.num_epochs}",
                total=len(self.train_loader),
            )
            for step, (x, y) in progress:
                # The model returns (logits, loss); only the loss is needed here.
                _, loss = self.model(x, y)
                loss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()

                running_loss += loss.item()
                avg_loss = running_loss / (step + 1)
                progress.set_postfix({
                    'loss': f'{loss.item():.4f}',
                    'avg_loss': f'{avg_loss:.4f}'
                })

In [None]:

# Hyperparameters for the training run
embedding_size = 512
num_heads = 8
seq_len = 10

# NOTE(review): the tokenizer built below determines the real vocabulary size.
vocab_size = 100
num_blocks = 4

context_length = 100
batch_size = 32
seq_length = 100


# Load and preprocess data. The encoding is explicit so the corpus is read the
# same way on every platform.
with open("../input.txt", "r", encoding="utf-8") as f:
    corpus = f.read()
# Lower-casing shrinks the character vocabulary.
corpus = corpus.lower()

# Create tokenizer and encode corpus
tokenizer = Tokenizer.from_corpus(corpus)
encoded = torch.tensor(tokenizer.encode(corpus), dtype=torch.long)





# # Create training dataset
class TextDataset(torch.utils.data.Dataset):
    """Sliding-window dataset of next-token prediction pairs.

    Item ``i`` is ``(x, y)`` where ``x`` is ``data[i : i + context_length]`` and
    ``y`` is the same window shifted one position to the right.

    Args:
        data: 1-D sequence of token ids supporting ``len`` and slicing.
        context_length: Number of tokens in each window.
    """

    def __init__(self, data, context_length):
        self.data = data
        self.context_length = context_length

    def __len__(self):
        """Number of full windows; 0 when the data is too short for even one."""
        return max(0, len(self.data) - self.context_length)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` pair at ``idx``; negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is out of range. Without this, plain
                iteration over the dataset would never terminate.
        """
        num_windows = len(self)
        if idx < 0:
            idx += num_windows
        if not 0 <= idx < num_windows:
            raise IndexError(f"index {idx} out of range for {num_windows} windows")
        x = self.data[idx:idx + self.context_length]
        y = self.data[idx + 1:idx + self.context_length + 1]
        return x, y

# Create data loaders
# NOTE(review): only the first 10% of the corpus is used for training and the
# remaining 90% for validation — confirm this split is intended.
train_size = int(0.1 * len(encoded))
train_data = encoded[:train_size]
val_data = encoded[train_size:]

train_dataset = TextDataset(train_data, context_length)
val_dataset = TextDataset(val_data, context_length)

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
)

# Report the number of batches the loader will actually yield per epoch.
print(f"Train data batches: {len(train_loader)}")

# Build a fresh model sized for this tokenizer and context length, instead of
# reusing whatever `model` an earlier cell left behind.
model = GPT(
    vocab_size=tokenizer.vocab_size,
    embedding_size=embedding_size,
    num_blocks=num_blocks,
    context_length=context_length,
)

# Initialize trainer
num_epochs = 1
trainer = Trainer(model, train_loader, val_loader, learning_rate=3e-4, num_epochs=num_epochs)

# Train model. Trainer.train() loops over the epochs itself, so it is called once.
trainer.train()


In [None]:
# Generate a continuation of a short prompt and print it as text.
# The prompt is not called `input`, so the builtin stays usable.
prompt = "hello world"
tokens = torch.tensor(tokenizer.encode(prompt)).unsqueeze(0)  # add a batch dimension
# Disable dropout for generation; training leaves the model in train mode.
model.eval()
output = model.generate(tokens, num_tokens=100)
# Take the single batch row explicitly rather than squeezing.
print(tokenizer.decode(output[0].tolist()))