In [1]:
import tiktoken
import torch
from torch.utils.data import DataLoader, Dataset

with open("the-verdict.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()

In [2]:
class TextDataset(Dataset):
    def __init__(self, data, max_length= 1, stride = 1):
        self.input_ids = []
        self.target_ids = []

        tokenizer = tiktoken.encoding_for_model("gpt-2")

        token_ids = tokenizer.encode(data, allowed_special={"<|endoftext|>"})

        # Use a sliding window to create overlapping sequences
        for i in range(0, len(token_ids) - 1, stride):  # Adjusted to prevent IndexError
            input_chunk = token_ids[i:i + max_length]
            target_chunk = token_ids[i + 1: min(len(token_ids), i + max_length + 1)]

            # Ensure chunks have the same length
            if len(input_chunk) == max_length and len(target_chunk) == max_length:
                self.input_ids.append(torch.tensor(input_chunk, dtype=torch.long))
                self.target_ids.append(torch.tensor(target_chunk, dtype=torch.long))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]

In [3]:
with open("the-verdict.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()

# load the data train , validation, test
train_data = raw_text[:int(len(raw_text)*0.8)]
valid_data = raw_text[int(len(raw_text)*0.8):int(len(raw_text)*0.9)]
test_data = raw_text[int(len(raw_text)*0.9):]

# Create datasets
train_dataset = TextDataset(train_data, max_length=4, stride=4)
valid_dataset = TextDataset(valid_data, max_length=4, stride=4)
test_dataset = TextDataset(test_data, max_length=4, stride=4)
# Create data loaders
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
valid_dataloader = DataLoader(valid_dataset, batch_size=8, shuffle=False, num_workers=4)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)

In [4]:
# architecture

In [5]:
GPT_CONFIG_124M = {
    "vocab_size": 50257,
    "emb_dim": 768,
    "n_heads": 2,
    "n_layers": 2,
    "drop_rate": 0.1,
    "context_length": 4,
    "qkv_bias": False
}


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CausalMultiHeadAttention(nn.Module):
    def __init__(self, emb_dim, n_heads, drop_rate=0.1, qkv_bias=False):
        super(CausalMultiHeadAttention, self).__init__()
        
        self.emb_dim = emb_dim  # Embedding dimension
        self.n_heads = n_heads  # Number of attention heads
        self.head_dim = emb_dim // n_heads  # Dimension per head
        self.scale = 1 / (self.head_dim ** 0.5)  # Scaling factor for attention scores
        
        # Linear layer to compute Query, Key, and Value (concatenated as a single tensor)
        self.qkv = nn.Linear(emb_dim, emb_dim * 3, bias=qkv_bias)
        
        # Dropout for attention weights
        self.attn_drop = nn.Dropout(drop_rate)
        
        # Linear layer for output projection
        self.proj = nn.Linear(emb_dim, emb_dim)
        
        # Dropout for the final output
        self.proj_drop = nn.Dropout(drop_rate)

    def forward(self, x):
        B, N, C = x.shape  # B = batch size, N = sequence length, C = embedding dimension
        
        # Compute Q, K, V by projecting input and reshaping
        qkv = self.qkv(x).reshape(B, N, 3, self.n_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]  # Extract Query, Key, and Value tensors
        
        # Compute attention scores: Q @ K^T (scaled dot product)
        attn = (q @ k.transpose(-2, -1)) * self.scale  # Shape: (B, n_heads, N, N)
        
        # Causal mask to prevent attending to future tokens
        mask = torch.triu(torch.ones(N, N, device=x.device), diagonal=1).bool()
        attn.masked_fill_(mask, float('-inf'))  # Set future positions to -inf before softmax
        
        # Apply softmax to normalize attention scores
        attn = attn.softmax(dim=-1)
        
        # Apply dropout to attention weights
        attn = self.attn_drop(attn)
        
        # Compute weighted sum of Value vectors
        out = (attn @ v).transpose(1, 2).reshape(B, N, C)
        
        # Final linear projection of attention output
        out = self.proj(out)
        
        # Apply dropout to the final output
        out = self.proj_drop(out)
        
        return out


In [7]:
class FeedForward(nn.Module):
    def __init__(self, emb_dim, expansion=4, drop_rate=0.1):
        super().__init__()
        self.fc1 = nn.Linear(emb_dim, emb_dim * expansion)  # Expand dimension
        self.fc2 = nn.Linear(emb_dim * expansion, emb_dim)  # Project back
        self.dropout = nn.Dropout(drop_rate)

    def forward(self, x):
        x = F.gelu(self.fc1(x))  # Apply GELU activation
        x = self.dropout(x)
        x = self.fc2(x)  # Project back to original dimension
        x = self.dropout(x)
        return x

class TransformerBlock(nn.Module):
    def __init__(self, emb_dim, n_heads, drop_rate, qkv_bias):
        super().__init__()
        self.attn = CausalMultiHeadAttention(emb_dim, n_heads, drop_rate, qkv_bias)
        self.ff = FeedForward(emb_dim, expansion=4, drop_rate=drop_rate)
        self.norm1 = nn.LayerNorm(emb_dim)
        self.norm2 = nn.LayerNorm(emb_dim)
        self.drop_shortcut = nn.Dropout(drop_rate)

    def forward(self, x):
        # Attention layer with residual connection
        shortcut = x
        x = self.norm1(x)
        x = self.attn(x)
        x = self.drop_shortcut(x)
        x = x + shortcut  # Residual connection

        # Feed-forward layer with residual connection
        shortcut = x
        x = self.norm2(x)
        x = self.ff(x)
        x = self.drop_shortcut(x)
        x = x + shortcut  # Residual connection

        return x

In [8]:
class BetaLLM(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])

        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg["emb_dim"], cfg["n_heads"], cfg["drop_rate"], cfg["qkv_bias"]) for _ in range(cfg["n_layers"])]
        )

        self.final_norm = nn.LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, in_idx):
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
        x = tok_embeds + pos_embeds  # Shape [batch_size, num_tokens, emb_size]
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        return logits


In [9]:
# Initialize the model
model = BetaLLM(GPT_CONFIG_124M)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Initialize the optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
# Initialize the loss function
criterion = nn.CrossEntropyLoss()
# Training loop
num_epochs = 2
for epoch in range(num_epochs):
    model.train()
    for batch_idx, (input_ids, target_ids) in enumerate(train_dataloader):
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)

        optimizer.zero_grad()

        logits = model(input_ids)
        
        # Compute loss using CrossEntropyLoss
        loss = criterion(logits.view(-1, logits.size(-1)), target_ids.view(-1))
        
        loss.backward()
        optimizer.step()

        if batch_idx % 10 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}")
# Save the model
torch.save(model.state_dict(), "beta_llm.pth")

Epoch 0, Batch 0, Loss: 10.988608360290527
Epoch 0, Batch 10, Loss: 10.628263473510742
Epoch 0, Batch 20, Loss: 10.267882347106934
Epoch 0, Batch 30, Loss: 9.852721214294434
Epoch 0, Batch 40, Loss: 8.161416053771973
Epoch 0, Batch 50, Loss: 8.199272155761719
Epoch 0, Batch 60, Loss: 6.844272136688232
Epoch 0, Batch 70, Loss: 7.8677144050598145
Epoch 0, Batch 80, Loss: 7.449978828430176
Epoch 0, Batch 90, Loss: 7.183849334716797
Epoch 0, Batch 100, Loss: 7.593692779541016
Epoch 0, Batch 110, Loss: 7.47831392288208
Epoch 0, Batch 120, Loss: 7.944108486175537
Epoch 1, Batch 0, Loss: 6.20392370223999
Epoch 1, Batch 10, Loss: 6.661401748657227
Epoch 1, Batch 20, Loss: 6.829856872558594
Epoch 1, Batch 30, Loss: 5.997282028198242
Epoch 1, Batch 40, Loss: 6.889233112335205
Epoch 1, Batch 50, Loss: 5.873077392578125
Epoch 1, Batch 60, Loss: 6.793896198272705
Epoch 1, Batch 70, Loss: 6.585855007171631
Epoch 1, Batch 80, Loss: 6.351033687591553
Epoch 1, Batch 90, Loss: 5.165925025939941
Epoch 1,

In [10]:
# add validation and test loop
def evaluate(model, dataloader, criterion):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch_idx, (input_ids, target_ids) in enumerate(dataloader):
            input_ids = input_ids.to(device)
            target_ids = target_ids.to(device)

            logits = model(input_ids)
            
            # Compute loss using CrossEntropyLoss
            loss = criterion(logits.view(-1, logits.size(-1)), target_ids.view(-1))
            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    return avg_loss
# Validation loop
valid_loss = evaluate(model, valid_dataloader, criterion)
print(f"Validation Loss: {valid_loss}")
# Test loop
test_loss = evaluate(model, test_dataloader, criterion)
print(f"Test Loss: {test_loss}")


Validation Loss: 6.97504648566246
Test Loss: 6.4770691254559685


In [11]:
import torch
import tiktoken

# Initialize model and tokenizer
GPT_CONFIG_124M = {
    "vocab_size": 50257,
    "emb_dim": 768,
    "n_heads": 2,
    "n_layers": 2,
    "drop_rate": 0.1,
    "context_length": 4,  # Important: model can only process 4 tokens at a time
    "qkv_bias": False
}

model = BetaLLM(GPT_CONFIG_124M)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.load_state_dict(torch.load("beta_llm.pth"))
model.to(device)  # Move model to the correct device
model.eval()  # Set model to evaluation mode

tokenizer = tiktoken.encoding_for_model("gpt-2")

# Use context_length from model config
MAX_CONTEXT_LENGTH = GPT_CONFIG_124M["context_length"]

def generate_text(model, tokenizer, prompt, max_length=4):
    tokens = tokenizer.encode(prompt, allowed_special={"<|endoftext|>"})
    
    # Ensure input does not exceed model's context length
    if len(tokens) > MAX_CONTEXT_LENGTH:
        print(f"Warning: Prompt is too long. Truncating to {MAX_CONTEXT_LENGTH} tokens.")
        tokens = tokens[:MAX_CONTEXT_LENGTH]

    input_ids = torch.tensor(tokens).unsqueeze(0).to(device)  # Ensure input tensor is on the correct device

    with torch.no_grad():
        for _ in range(max_length):
            outputs = model(input_ids)
            next_token_logits = outputs[:, -1, :]  # Get logits for the last token
            next_token = torch.argmax(next_token_logits, dim=-1)  # Sample the next token
            input_ids = torch.cat((input_ids[:, 1:], next_token.unsqueeze(0)), dim=1)  # Shift context

            # Ensure special tokens are allowed for encoding
            end_token = tokenizer.encode("<|endoftext|>", allowed_special={"<|endoftext|>"})[0]

            # Check if the model has generated the end token
            if next_token.item() == end_token:
                break

    generated_tokens = input_ids.squeeze().tolist()
    generated_text = tokenizer.decode(generated_tokens)
    return generated_text

# Example usage
prompt = "Once upon a time"
generated_text = generate_text(model, tokenizer, prompt, max_length=4)
print(generated_text)


, and I had
