In [None]:
import os
import requests
import numpy as np
import torch
import torch.nn.functional as F

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# download the tiny shakespeare dataset
input_file_path = 'shakespeare.txt'
if not os.path.exists(input_file_path):
    data_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
    with open(input_file_path, 'w', encoding='utf-8') as f:
        f.write(requests.get(data_url).text)

with open(input_file_path, 'r', encoding='utf-8') as f:
    data = f.read()
n = len(data)
train_data = data[:int(n*0.9)]
val_data = data[int(n*0.9):]

In [None]:
stoi = {ch: i for i, ch in enumerate(sorted(set(data)))}
itos = {i: ch for ch, i in stoi.items()}

In [None]:
train_data_encoded = np.array([stoi[ch] for ch in train_data], dtype=np.uint16)
val_data_encoded = np.array([stoi[ch] for ch in val_data], dtype=np.uint16)
def encode(s):
    return [stoi[c] for c in s] # encoder: take a string, output a list of integers
def decode(l):
    return ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

In [None]:
vocab_size = len(stoi)
n_embd = 384
batch_size = 64
block_size = 256
head_size = 16
n_head = 6
n_layer = 6
dropout = 0.2
learning_rate = 3e-4

In [None]:
def get_batch(split):
    if split == 'train':
        data = train_data_encoded
    elif split == 'val':
        data = val_data_encoded
    else:
        raise ValueError('split must be either train or val')
    start_idx = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in start_idx])
    y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in start_idx])
    return x.to(device), y.to(device)


In [None]:
get_batch('train')

In [None]:
class Head(torch.nn.Module):
    def __init__(self, input_size, head_size):
        super().__init__()
        self.key = torch.nn.Linear(input_size, head_size)
        self.query = torch.nn.Linear(input_size, head_size)
        self.value = torch.nn.Linear(input_size, head_size)
        self.register_buffer('mask', torch.tril(torch.ones(block_size, block_size)) == 0)
        self.dropout = torch.nn.Dropout(dropout)
    
    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)
        wei = q @ k.transpose(-2, -1) * (head_size**-0.5)
        wei = wei.masked_fill(self.mask[:T, :T], float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        return wei @ v        

In [None]:
class MultiHeadAttention(torch.nn.Module):
    def __init__(self, n_heads, head_size, input_size):
        super().__init__()
        self.heads = torch.nn.ModuleList([Head(input_size, head_size) for _ in range(n_heads)])
        self.proj = torch.nn.Linear(n_heads*head_size, input_size)
        self.dropout = torch.nn.Dropout(dropout)
    
    def forward(self, x):
        x = torch.cat([h(x) for h in self.heads], dim=-1)
        x = self.proj(x)
        x = self.dropout(x)
        return x

In [None]:
class FeedForward(torch.nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(n_embd, 4 * n_embd),
            torch.nn.ReLU(),
            torch.nn.Linear(4 * n_embd, n_embd),
            torch.nn.Dropout(dropout),
        )
    
    def forward(self, x):
        return self.net(x)

In [None]:
class Block(torch.nn.Module):
    def __init__(self, n_embd, n_heads):
        super().__init__()
        head_size = n_embd // n_heads
        self.sa = MultiHeadAttention(n_heads, head_size, n_embd)
        self.ffwd = FeedForward(n_embd)
        self.ln1 = torch.nn.LayerNorm(n_embd)
        self.ln2 = torch.nn.LayerNorm(n_embd)
        
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [None]:
class Model(torch.nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.token_embedding_table = torch.nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = torch.nn.Embedding(block_size, n_embd)
        self.blocks = torch.nn.Sequential(
            *[Block(n_embd, n_head) for _ in range(n_layer)],
            torch.nn.LayerNorm(n_embd)
        )
        self.lm_head = torch.nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx)
        pos_emb = self.position_embedding_table(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        logits = self.lm_head(x)
        
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss
    
    def generate(self, idx, max_new_tokens):
        for i in range(max_new_tokens):
            logits, _ = self(idx[:, -block_size:])
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            new_token = torch.multinomial(probs, num_samples=1)
            idx = torch.cat([idx, new_token], dim=-1)
        return idx

In [None]:
lm = Model().to(device)
optimizer = torch.optim.AdamW(lm.parameters(), lr=0.001)

In [None]:
for step in range(5000):
    x, y = get_batch('train')
    logits, loss = lm(x, y)
    if step % 100 == 0:
        print(f'step {step}, loss {loss.item()}')
    lm.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
x = torch.zeros((1, 1), dtype=torch.int64).to(device)
y_ = lm.generate(x, 1000)
print(decode(y_[0].tolist()))

In [None]:
x = torch.tensor([encode('Hi M')]).to(device)
y_ = lm.generate(x, 30)
print(decode(y_[0].tolist()))