<a href="https://colab.research.google.com/github/n1teshy/Algorithms/blob/main/Makemore.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:

import os
import torch
import torch.nn as nn
from torch.nn import functional as F

TRAINING_PORTION = 0.9
DEVICE = "cuda" if torch.cuda.is_available() else "CPU"
DEVICE = torch.device(DEVICE)

In [6]:
with open("shakespeare.txt", "r") as file:
    content = file.read()

chars = sorted(list(set(content)))
vocab_size = len(chars)
s_to_i = {char: idx for idx, char in enumerate(chars)}
i_to_s = {val: fld for fld, val in s_to_i.items()}
encode = lambda text: [s_to_i[char] for char in text]
decode = lambda codes: "".join(i_to_s[code] for code in codes)
content = torch.tensor(encode(content), dtype=torch.long, device=DEVICE)
content_split_idx = int(TRAINING_PORTION * len(content))
training_split = content[:content_split_idx]
validation_split = content[content_split_idx:]

In [7]:
block_size = 32
batch_size = 32
embedding_size = 32
epochs = 5000
eval_interval = epochs / 20
eval_iters = 100
learning_rate = 30e-4
num_heads = 4
num_layers = 4
dropout = 0.2
torch.manual_seed(1337)

<torch._C.Generator at 0x7db9ef7258b0>

In [9]:

def get_batch(batch="train"):
    data = training_split if batch == "train" else validation_split
    idxs = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[idx : idx + block_size] for idx in idxs]).to(DEVICE)
    y = torch.stack([data[idx + 1 : idx + block_size + 1] for idx in idxs]).to(DEVICE)
    return x, y

In [8]:
@torch.no_grad()
def estimate_losses():
    split_loss = {}
    model.eval()
    for split in ["train", "val"]:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            x_b, y_b = get_batch()
            logits, loss = model(x_b, y_b)
            losses[k] = loss.item()
        split_loss[split] = losses.mean()
    model.train()
    return split_loss

In [18]:
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(embedding_size, head_size, bias=False)
        self.query = nn.Linear(embedding_size, head_size, bias=False)
        self.value = nn.Linear(embedding_size, head_size, bias=False)
        self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)  # (B, T, C) @ (C, C) -> (B, T, C)
        q = self.query(x)  # (B, T, C) @ (C, C) -> (B, T, C)
        wei = q @ k.transpose(-1, -2) * C**-0.5  # (B, T, C) @ (B, C, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)  # (B, T, C) @ (C, C) -> (B, T, C)
        out = wei @ v  # (B, T, T) @ (B, T, C) -> (B, T, C)
        return out

In [17]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = [Head(head_size) for _ in range(num_heads)]
        self.proj = nn.Linear(embedding_size, embedding_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [16]:
class FeedForward(nn.Module):
    def __init__(self, embedding_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(embedding_size, 4 * embedding_size),
            nn.ReLU(),
            nn.Linear(4 * embedding_size, embedding_size),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [14]:
class Block(nn.Module):
    def __init__(self, embedding_size, num_heads):
        super().__init__()
        if embedding_size % num_heads != 0:
            print(
                f"num_heads {num_heads} is not compatible with embedding size {embedding_size}"
            )
            exit()
        head_size = embedding_size // num_heads
        self.sa = MultiHeadAttention(num_heads, head_size)
        self.ffwd = FeedForward(embedding_size)
        self.ln1 = nn.LayerNorm(embedding_size)
        self.ln2 = nn.LayerNorm(embedding_size)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [27]:
class BigramModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, embedding_size)
        self.position_embedding_table = nn.Embedding(block_size, embedding_size)
        self.blocks = nn.Sequential(
            *(Block(embedding_size, num_heads) for _ in range(num_layers))
        )
        self.ln = nn.LayerNorm(embedding_size)
        self.lm_head = nn.Linear(embedding_size, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        token_embeddings = self.token_embedding_table(idx)
        position_embeddings = self.position_embedding_table(torch.arange(T, device=DEVICE))
        x = token_embeddings * position_embeddings
        x = self.blocks(x)
        x = self.ln(x)
        logits = self.lm_head(x)
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            # pytorch expects the input dimensionality to be (B, C) or (C, )
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, idx, max_new_chars=100):
        result = idx.clone()
        for _ in range(max_new_chars):
            idx = idx[:, -block_size:]
            logits, loss = self(idx)
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=1)
            next_idx = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_idx), dim=1)
            result = torch.cat((result, next_idx), dim=1)
        return result

In [28]:
model = BigramModel().to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [29]:
try:
    for i in range(epochs):
        x_b, y_b = get_batch()
        logits, loss = model(x_b, y_b)
        if i % eval_interval == 0:
            losses = estimate_losses()
            print(
                f"iter: {i}, training loss: {losses['train']}, val loss: {losses['val']}"
            )
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
except KeyboardInterrupt:
    pass

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu! (when checking argument for argument mat2 in method wrapper_CUDA_mm)

In [None]:
encoded_chars = model.generate(
    torch.zeros((1, 1), dtype=torch.long, device=DEVICE), max_new_chars=1000
)
for idx, row in enumerate(encoded_chars, start=1):
    print(f"{idx}: {decode(row.tolist())}")