In [118]:
import torch
from torch.nn import functional as F

# hyperparameters
batch_size = 64 # how many independent sequences will we process in parallel?
block_size = 150 # what is the maximum context length for predictions?
max_iters = 5000
eval_interval = 500
learning_rate = 3e-4
device = 'cuda' if torch.cuda.is_available() else 'cpu'
eval_iters = 200
n_embd = 66
n_head = 6
n_layer = 4
# ------------

torch.manual_seed(1337)

# wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
with open('data/input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
# create a mapping from characters to integers
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s] # encoder: take a string, output a list of integers
decode = lambda l: ''.join([itos[i] for i in l]) # decoder: take a list of integers, output a string

# Train and test splits
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]

# data loading
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y



In [119]:
x, y = get_batch('train')
print(x[:5])
print(y[:5])

tensor([[42,  1, 21,  1, 46, 39, 60, 43,  1, 61, 43, 50, 50,  1, 42, 43, 57, 43,
         56, 60, 43, 42,  1, 47, 58,  8,  0, 15, 53, 51, 43,  1, 53, 52,  6,  1,
         41, 53, 51, 43,  1, 53, 52, 11,  1, 61, 46, 43, 56, 43,  1, 47, 57,  1,
         63, 53, 59, 56,  1, 40, 53, 39, 56,  7, 57, 54, 43, 39, 56,  6,  1, 51,
         39, 52, 12,  0, 18, 43, 39, 56,  1, 63, 53, 59,  1, 58, 46, 43,  1, 40,
         53, 39, 56,  6,  1, 39, 52, 42,  1, 45, 53,  1, 57, 53,  1, 59, 52, 54,
         56, 53, 60, 47, 42, 43, 42, 12,  0,  0, 31, 32, 13, 26, 24, 17, 37, 10,
          0, 25, 63,  1, 50, 53, 56, 42,  6,  1, 45, 53, 53, 42,  1, 51, 53, 56,
         56, 53, 61, 11,  1, 45],
        [ 1, 53, 44,  1, 22, 59, 50, 47, 43, 58,  5, 57,  1, 42, 43, 39, 58, 46,
         11,  0, 13, 52, 42,  1, 58, 46, 43, 52,  1, 47, 52,  1, 54, 53, 57, 58,
          1, 46, 43,  1, 41, 39, 51, 43,  1, 44, 56, 53, 51,  1, 25, 39, 52, 58,
         59, 39,  0, 32, 53,  1, 58, 46, 47, 57,  1, 57, 39, 51, 43,  1, 54

In [120]:
#Sequential model
class Sequential:
    def __init__(self, layers):
        self.layers = layers

    def __call__(self, x):
        for layer in self.layers:
            x = layer(x)
        self.out = x
        return self.out
    
    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]

    def append(self, layer):
        self.layers.append(layer)

#Embedding layer
class Embedding:
    def __init__(self, num_embeddings, embedding_dim):
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.weight = torch.randn(num_embeddings, embedding_dim)

    def __call__(self, x):
        self.out = self.weight[x]
        return self.out
    
    def parameters(self):
        return [self.weight]

#Linear layer
class Linear:
    def __init__(self, fan_in, fan_out, bias=True):
        self.fan_in = fan_in
        self.fan_out = fan_out
        self.weight = torch.randn(fan_in, fan_out)/fan_in**0.5 #kaiming initialization
        self.bias = torch.zeros(fan_out) if bias else None

    def __call__(self, x):
        self.out = x @ self.weight
        if self.bias is not None:
            self.out += self.bias
        return self.out
    def parameters(self):
        return [self.weight] + ([] if self.bias is None else [self.bias])

#Tanh activation function
class Tanh:
    def __init__(self):
        pass
    def __call__(self, x):
        self.out = torch.tanh(x)
        return self.out
    def parameters(self):
        return []

#softmax activation function
class Softmax:
    def __init__(self, dim = 1):
        self.dim = dim
    def __call__(self, x):
        self.out = torch.softmax(x, dim=self.dim)
        return self.out
    def parameters(self):
        return []

#ReLU activation function
class ReLU:
    def __init__(self):
        pass
    def __call__(self, x):
        self.out = torch.relu(x)
        return self.out
    def parameters(self):
        return []

#Flatten layer
class Flatten:
    def __init__(self, start_dim=1, end_dim=-1):
        self.start_dim = start_dim
        self.end_dim = end_dim
    def __call__(self, x):
        #(d_1, d_2, ..., d_n) -> (d_1, d_2*...*d_)
        shape = list()
        if self.end_dim == -1:
            for i, dim in enumerate(x.shape):
                if (i <= self.start_dim) or (len(shape) == 0):
                    shape.append(dim)
                else:
                    shape[-1] *= dim
        else:
            for i, dim in enumerate(x.shape):
                if (i <= self.start_dim) or (len(shape) == 0):
                    shape.append(dim)
                elif i > self.end_dim:
                    shape.append(dim)
                else:
                    shape[-1] *= dim
        self.out = x.view(shape)
        return self.out
    def parameters(self):
        return []

#Vanila RNN layer
class RNN:
    def __init__(self, n_input, n_hidden):
        self.n_input = n_input
        self.n_hidden = n_hidden
        #self.non_linearity = non_linearity
        self.W_xh = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hh = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_h = torch.zeros(n_hidden)
        
    def __call__(self, x):
        #(N, L, H_in) -> (N, L, H_out)
        #h_t -> (N, H_out)
        batch_size, seq_len, _ = x.size()
        h_t = torch.zeros(batch_size, self.n_hidden)
        output = []
        for i in range(seq_len):
            h_t = torch.tanh(x[:, i, :] @ self.W_xh + h_t @ self.W_hh + self.b_h)
            output.append(h_t)

        self.h = torch.stack(output, dim=1)
        return self.h
    def parameters(self):
        return [self.W_xh, self.W_hh, self.b_h]

#LSTM layer
class LSTM:
    def __init__(self, n_input, n_hidden):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.W_ii = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hi = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_i = torch.zeros(n_hidden)
        
        self.W_if = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hf = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_f = torch.zeros(n_hidden)
        
        self.W_ig = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hg = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_g = torch.zeros(n_hidden)
        
        self.W_io = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_ho = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_o = torch.zeros(n_hidden)

        
    def __call__(self, x):
        #(N, L, H_in) -> (N, L, H_out)
        #h_t -> (N, H_out)
        batch_size, seq_len, _ = x.size()
        h_t = torch.zeros(batch_size, self.n_hidden)
        c_t = torch.zeros(batch_size, self.n_hidden)
        output = []
        for i in range(seq_len):
            i_t = torch.sigmoid(x[:, i, :] @ self.W_ii + h_t @ self.W_hi + self.b_i)
            f_t = torch.sigmoid(x[:, i, :] @ self.W_if + h_t @ self.W_hf + self.b_f)
            g_t = torch.tanh(x[:, i, :] @ self.W_ig + h_t @ self.W_hg + self.b_g)
            o_t = torch.sigmoid(x[:, i, :] @ self.W_io + h_t @ self.W_ho + self.b_o)
            c_t = f_t * c_t + i_t * g_t
            h_t = o_t * torch.tanh(c_t)
            output.append(h_t)
        self.h = torch.stack(output, dim=1)
        self.c = c_t
        return self.h
    def parameters(self):
        return [self.W_ii, self.W_hi, self.b_i, self.W_if, self.W_hf, self.b_f, self.W_ig, self.W_hg, self.b_g, self.W_io, self.W_ho, self.b_o]

#GRU layer
class GRU:
    def __init__(self, n_input, n_hidden):
        self.n_input = n_input
        self.n_hidden = n_hidden
        
        self.W_ir = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hr = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_r = torch.zeros(n_hidden)
        
        self.W_iz = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hz = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_z = torch.zeros(n_hidden)
        
        self.W_in = torch.randn(n_input, n_hidden) / n_input**0.5 #kaiming initialization
        self.W_hn = torch.randn(n_hidden, n_hidden) / n_hidden**0.5 #kaiming initialization
        self.b_in = torch.zeros(n_hidden)
        self.b_hn = torch.zeros(n_hidden)
    
    def parameters(self):
        return [self.W_ir, self.W_hr, self.b_r, self.W_iz, self.W_hz, self.b_z, self.W_in, self.W_hn, self.b_in, self.b_hn]
    
    def __call__(self, x):
        batch_size, seq_len, _ = x.size()
        h_t = torch.zeros(batch_size, self.n_hidden)
        output = []
        
        for i in range(seq_len):
            r_t = torch.sigmoid(x[:, i, :] @ self.W_ir + h_t @ self.W_hr + self.b_r)
            z_t = torch.sigmoid(x[:, i, :] @ self.W_iz + h_t @ self.W_hz + self.b_z)
            n_t = torch.tanh(x[:, i, :] @ self.W_in + self.b_in + r_t * (h_t @ self.W_hn + self.b_hn))
            h_t = (1 - z_t) * n_t + z_t * h_t
            output.append(h_t)
        self.h = torch.stack(output, dim=1)
        
        return self.h

#Batch normalization
class BatchNorm1d:
    def __init__(self, num_features, eps=1e-5, momentum=0.1, training=True):
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum
        self.training = training
        self.running_mean = torch.zeros(num_features)
        self.running_var = torch.ones(num_features)
        self.gamma = torch.ones(num_features)
        self.beta = torch.zeros(num_features)
        
    def __call__(self, x):
        if self.training:
            if x.ndim == 2:
                dim = 0
            elif x.ndim == 3:
                dim = (0, 1)
            x_mean = x.mean(dim=dim, keepdim=True)
            x_var = x.var(dim=dim, keepdim=True)
            with torch.no_grad():
                self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * x_mean
                self.running_var = self.momentum * self.running_var + (1 - self.momentum) * x_var
        else:
            x_mean = self.running_mean
            x_var = self.running_var
        x_std = torch.sqrt(x_var + self.eps)
        x_hat = (x - x_mean) / x_std
        self.out = x_hat * self.gamma + self.beta
        return self.out
    
    def parameters(self):
        return [self.gamma, self.beta]

#Layer normalization
class LayerNorm:
    def __init__(self, normalized_shape, eps=1e-5):
        self.num_features = normalized_shape
        self.eps = eps
        self.gamma = torch.ones(normalized_shape)
        self.beta = torch.zeros(normalized_shape)
        
    def __call__(self, x):
        x_mean = x.mean(dim=-1, keepdim=True)
        x_var = x.var(dim=-1, keepdim=True, unbiased=False)
        self.out = (x - x_mean) / torch.sqrt(x_var + self.eps) * self.gamma + self.beta
        return self.out
    
    def parameters(self):
        return [self.gamma, self.beta]


In [121]:
#SGD optimizer
class SGD:
    def __init__(self, parameters, lr=0.001, momentum=0):
        self.parameters = parameters
        self.lr = lr
        self.momentum = momentum
        self.v = [torch.zeros_like(p) for p in parameters]
        #self.dampening = dampening
        #self.weight_decay = weight_decay
    
    def zero_grad(self):
        for p in self.parameters:
            p.grad = None
        
    def step(self):
        for (i, p) in enumerate(self.parameters):
            if p.grad is not None:
                self.v[i] = self.momentum * self.v[i] + (1 - self.momentum) * p.grad
                p.data -= self.lr * self.v[i]

#Adam optimizer
class Adam:
    def __init__(self, parameters, lr = 0.001, betas = (0.9, 0.999), eps = 1e-8):
        self.parameters = parameters
        self.lr = lr
        self.betas = betas
        self.eps = eps
        self.m = [torch.zeros_like(p) for p in parameters]
        self.v = [torch.zeros_like(p) for p in parameters]
        self.t = 0
        
    def zero_grad(self):
        for p in self.parameters:
            p.grad = None
    
    def step(self):
        self.t += 1
        for (i, p) in enumerate(self.parameters):
            if p.grad is not None:
                self.m[i] = self.betas[0] * self.m[i] + (1 - self.betas[0]) * p.grad
                self.v[i] = self.betas[1] * self.v[i] + (1 - self.betas[1]) * p.grad**2
                m_hat = self.m[i] / (1 - self.betas[0]**self.t)
                v_hat = self.v[i] / (1 - self.betas[1]**self.t)
                p.data -= self.lr * m_hat / (torch.sqrt(v_hat) + self.eps)

In [122]:
#Residual block
class ResidualBlock():
    def __init__(self, n_hidden):
        self.linear1 = Linear(n_hidden, n_hidden, bias=False)
        self.linear2 = Linear(n_hidden, n_hidden, bias=True)
        self.relu = ReLU()
        self.batchnorm = BatchNorm1d(n_hidden)

    def __call__(self, x):
        input = x
        x = self.linear1(x)
        x = self.batchnorm(x)
        x = self.relu(x)
        x = self.linear2(x)
        return input + x #residual connection
    def parameters(self):
        return self.linear1.parameters() + self.linear2.parameters() + self.batchnorm.parameters()
    
#TakeLayer
class TakeLayer:
    def __init__(self, index):
        self.index = index

    def __call__(self, x):
        return x[:, self.index, :]
    
    def parameters(self):
        return []

#SelfAttention layer
class SelfAttention:
    def __init__(self, n_embd, head_size):
        self.n_embd = n_embd
        self.head_size = head_size
        self.query = Linear(n_embd, head_size, bias=False)
        self.key = Linear(n_embd, head_size, bias=False)
        self.value = Linear(n_embd, head_size, bias=False)

    def parameters(self):
        return self.query.parameters() + self.key.parameters() + self.value.parameters()
    
    def __call__(self, x):
        #input shape: (N, L, H_in)
        #output shape: (N, L, H_out)
        N, L, _ = x.size()
        H_out = self.head_size
        q = self.query(x) #(N, L, H_out)
        k = self.key(x) #(N, L, H_out)
        v = self.value(x) #(N, L, H_out)
        
        wei = q @ k.transpose(-2, -1) * H_out**-0.5 #(N, L, L)
        tril = torch.tril(torch.ones(L, L)) #(L, L)
        wei = wei.masked_fill(tril == 0, float('-inf')) #(N, L, L)
        wei = F.softmax(wei, dim=-1) #(N, L, L)
        
        out = wei @ v #(N, L, H_out)
        return out
    
    def parameters(self):
        return self.query.parameters() + self.key.parameters() + self.value.parameters()

#MultiHeadAttention layer
class MultiHeadAttention:
    def __init__(self, n_embd, head_size, n_heads):
        self.n_embd = n_embd
        self.head_size = head_size
        self.n_heads = n_heads
        self.heads = [SelfAttention(n_embd, head_size) for _ in range(n_heads)]
        self.project = Linear(head_size * n_heads, n_embd)
    
    def parameters(self):
        return [param for head in self.heads for param in head.parameters()] + self.project.parameters()
    
    def __call__(self, x):
        out = torch.cat([head(x) for head in self.heads], dim=-1)
        out = self.project(out)
        return out
    
#FeedForward layer
class FeedForward:
    def __init__(self, n_embd):
        self.n_embd = n_embd
        self.net = Sequential(
            [
            Linear(n_embd, 4*n_embd, bias=True),
            ReLU(),
            Linear(4*n_embd, n_embd, bias=True)
            ]
        )
    
    def parameters(self):
        return self.net.parameters()
    
    def __call__(self, x):
        return self.net(x)

#ResidualTransformerBlock
class ResidualTransformerBlock:
    def __init__(self, n_embd, n_head):
        self.n_embd = n_embd
        head_size = n_embd // n_head
        self.n_head = n_head
        self.attention = MultiHeadAttention(n_embd, head_size, n_head)
        self.norm1 = LayerNorm(n_embd)
        self.feedforward = FeedForward(n_embd)
        self.norm2 = LayerNorm(n_embd)
    
    def parameters(self):
        return self.attention.parameters() + self.norm1.parameters() + self.norm2.parameters() + self.feedforward.parameters()
    
    def __call__(self, x):
        x = x + self.attention(self.norm1(x))
        x = x + self.feedforward(self.norm2(x))
        return x
    
#Token + position embedding
class TokenEmbedding:
    def __init__(self, vocab_size, n_embd):
        self.vocab_size = vocab_size
        self.n_embd = n_embd
        self.token_embedding_table = Embedding(vocab_size, n_embd)
        self.position_embedding_table = Embedding(block_size, n_embd)

    def parameters(self):
        return self.token_embedding_table.parameters() + self.position_embedding_table .parameters()
    
    def __call__(self, idx):
        B, T = idx.shape
        
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T)) # (T,C)

        return tok_emb + pos_emb
    
class GPT:
    def __init__(self, vocab_size, n_embd, n_head, n_layer, block_size):
        self.token_embedding = TokenEmbedding(vocab_size, n_embd)
        self.layernorm = LayerNorm(n_embd)
        self.lm_head = Linear(n_embd, vocab_size)
        self.transformer_blocks = Sequential([ResidualTransformerBlock(n_embd, n_head) for _ in range(n_layer)])
        self.block_size = block_size
        self.vocab_size = vocab_size
        self.n_embd = n_embd
        self.n_head = n_head
        self.n_layer = n_layer
    
    def parameters(self):
        return self.token_embedding.parameters() + self.lm_head.parameters() + self.transformer_blocks.parameters() + self.layernorm.parameters()
    
    def __call__(self, idx, targets=None):
        x = self.token_embedding(idx)
        x = self.transformer_blocks(x)
        x = self.layernorm(x)
        x = self.lm_head(x)
        
        if targets is None:
            loss = None
        else:
            B, T, C = x.shape
            x = x.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(x, targets)
        return (x, loss)
    
    def generate(self, context, max_new_tokens):
        for _ in range(max_new_tokens):
            logits, loss = self(context[:, -self.block_size:])
            logits = logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            context = torch.cat((context, idx_next), dim=1)
        return context
    

In [123]:
model = GPT(vocab_size, n_embd, n_head, n_layer, block_size)

In [124]:
@torch.no_grad()
def estimate_loss():
    out = {}
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    return out

In [125]:
for p in model.parameters():
    p.requires_grad = True

In [126]:
optimizer = Adam(model.parameters())

In [127]:
for iter in range(max_iters):
    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(model.generate(context, max_new_tokens=500)[0].tolist()))

step 0: train loss 4.6709, val loss 4.6674
step 500: train loss 2.2630, val loss 2.2892
step 1000: train loss 1.9158, val loss 2.0253
step 1500: train loss 1.7432, val loss 1.8937
step 2000: train loss 1.6424, val loss 1.8172
step 2500: train loss 1.5861, val loss 1.7763
step 3000: train loss 1.5445, val loss 1.7443
step 3500: train loss 1.5218, val loss 1.7282
step 4000: train loss 1.5104, val loss 1.7176
step 4500: train loss 1.4842, val loss 1.6960
step 4999: train loss 1.4591, val loss 1.6753

She first in lips my story lady?
Whost haste there, my lords, husband,--
Alll the conceatene's gross requites's burbuous bott
Rutger'd to with his isterimate much'd.

YORK:
Say forthip tendern prebear by hid?

Third Servant;
He, longmiebners the of sistent,
The satifict me: you would shols but your may boy!
Our knownes, and mother, you cattaion propes?
What hate to prease in inverence, had or the fear's!

KING HENRY VI:
For him, whush's stroke up your all not mock
With but worlds!

Servings, 

In [128]:
# generate from the model
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(model.generate(context, max_new_tokens=500)[0].tolist()))


Belsh! when how brothing from me which him;
And wink heaveng me that a no their virtues chames
With Richard him bring-hereademeness, whipe chierby,
And at her that the life; yet srew to fawar;
Thereof wild, bawd, you comming at the counself;
But presery the loved atttene orning frow thee is,
That news he whoses thing the what found;
My father'd firle to wold our changed;
See you prose no lives to not news be my forth please,
That is and what hataths and news.

PROTE:
Viraly husme to be sead goot
