In [123]:
import pandas
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F

### 1. Build the dataset

In [15]:
# Read every line of names.txt into `names` (line endings still attached).
with open("names.txt", 'r') as f:
    names = list(f)

# Drop leading/trailing whitespace (including the newline) from each line.
words = list(map(str.strip, names))
num_words = len(words)

In [12]:
# build dictionary
# Index 0 is the '.' token (used as padding and end-of-word marker by
# build_dataset); indices 1..26 are the lowercase letters in order.
chars = '.' + 'abcdefghijklmnopqrstuvwxyz'
itoc = dict(enumerate(chars))                    # index -> character
ctoi = {ch: pos for pos, ch in itoc.items()}     # character -> index

In [104]:
def build_dataset(words, block_size=3):
    """
    Turn a list of words into (context, next-character) training pairs.

    Every word is terminated with '.', and a window of the previous
    `block_size` character indices (initially all 0) slides over it. For each
    position the current window is recorded as the input and the index of the
    character at that position as the target.

    Args:
        words: iterable of strings whose characters are all keys of `ctoi`.
        block_size: number of context indices per example.

    Returns:
        (X, y): tensors built from the collected windows and targets; one row
        of X and one entry of y per character of each word plus its '.'.
    """
    contexts, targets = [], []
    for word in words:
        window = [0] * block_size
        for ch in word + '.':
            target = ctoi[ch]
            contexts.append(window)
            targets.append(target)
            # Build a new list so the window stored above is not mutated.
            window = [*window[1:], target]
    return torch.tensor(contexts), torch.tensor(targets)

block_size = 8
X, y = build_dataset(words=words, block_size=block_size)

# split into training, validation and testing set (80% / 10% / 10% of the words)
# The boundaries n1/n2 are counted in words, while X/y hold one row per
# character, so the split is applied to `words` and each subset is encoded
# separately instead of slicing X/y with word-count indices.
# NOTE(review): words are split in file order, without shuffling — confirm that
# names.txt is not ordered in a way that skews the three subsets.
n1, n2 = int(0.8 * num_words), int(0.9 * num_words)
X_train, y_train = build_dataset(words=words[:n1], block_size=block_size)
X_val, y_val = build_dataset(words=words[n1:n2], block_size=block_size)
X_test, y_test = build_dataset(words=words[n2:], block_size=block_size)

### 2. Define the Architecture

In [154]:
class Linear:
    """
    Affine layer: out = x @ weight (+ bias).

    `weight` has shape (features_in, features_out) and is drawn from a standard
    normal scaled by 1 / sqrt(features_in); `bias` starts at zero, or is None
    when `bias=False`.
    """

    def __init__(self, features_in, features_out, bias=True):
        scale = features_in**0.5
        self.weight = torch.randn((features_in, features_out)) / scale
        self.bias = None
        if bias:
            self.bias = torch.zeros(features_out)

    def __call__(self, x):
        """Apply the layer to x, cache the result in `self.out` and return it."""
        result = x @ self.weight
        if self.bias is not None:
            # In-place add on the freshly created product tensor.
            result += self.bias
        self.out = result
        return self.out

    def parameters(self):
        """Return the trainable tensors: the weight, then the bias if present."""
        params = [self.weight]
        if self.bias is not None:
            params.append(self.bias)
        return params
        

# -----------------------------------------------------------------------------

class BatchNorm1D:

    """
    Batch normalization over the last (feature) dimension.

    https://en.wikipedia.org/wiki/Batch_normalization

    In training mode (`self.training = True`) the input is normalized with the
    mean and biased variance of the current batch, and the running statistics
    are updated with an exponential moving average. In evaluation mode the
    running statistics are used instead.

    Args:
        dim: size of the feature dimension (last axis of the input).
        eps: constant added to the variance before the square root.
        momentum: weight of the current batch statistics in the running update.
    """

    def __init__(self, dim, eps=1e-5, momentum=0.1):
        self.eps = eps
        self.momentum = momentum
        self.training = True
        # backprop params
        self.gamma = torch.ones(dim)
        self.beta = torch.zeros(dim)
        # buffers params for momentum update
        self.running_mean = torch.zeros(dim)
        self.running_var = torch.ones(dim)

    def __call__(self, x):
        """
        Normalize x, cache the result in `self.out` and return it.

        Raises:
            ValueError: in training mode, if x is neither 2-D nor 3-D.
        """
        # compute xmean and xvar
        if self.training:
            if x.ndim == 2:
                dim = 0
            elif x.ndim == 3:
                # reduce over every axis except the last (feature) one
                dim = (0, 1)
            else:
                raise ValueError(
                    f"BatchNorm1D expects a 2-D or 3-D input, got {x.ndim}-D"
                )
            xmean = x.mean(dim, keepdim=True)
            xvar = x.var(dim, unbiased=False, keepdim=True)
        else:
            xmean = self.running_mean
            xvar = self.running_var

        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)
        self.out = self.gamma * xhat + self.beta

        # update the buffers
        # Because xmean/xvar keep their reduced axes, the buffers take the
        # keepdim shape after the first update; they still broadcast against x.
        if self.training:
            with torch.no_grad():
                self.running_mean = self.momentum * xmean + (1 - self.momentum) * self.running_mean
                self.running_var = self.momentum * xvar + (1 - self.momentum) * self.running_var

        return self.out

    def parameters(self):
        """Return the trainable tensors [gamma, beta]; the buffers are excluded."""
        return [self.gamma, self.beta]

# -----------------------------------------------------------------------------

class Tanh:
    """Element-wise hyperbolic tangent activation; has no trainable parameters."""

    def __call__(self, x):
        """Apply tanh to x, cache the result in `self.out` and return it."""
        activated = x.tanh()
        self.out = activated
        return self.out

    def parameters(self):
        """Return an empty list: this layer has nothing to train."""
        return list()

# -----------------------------------------------------------------------------

class Embedding:
    """
    Lookup table mapping integer indices to rows of a weight matrix.

    `weight` has shape (num_embeddings, embedding_dim) and is initialized from
    a standard normal distribution.
    """

    def __init__(self, num_embeddings, embedding_dim):
        table_shape = (num_embeddings, embedding_dim)
        self.weight = torch.randn(table_shape)

    def __call__(self, IX):
        """Index the table with IX, cache the result in `self.out` and return it."""
        looked_up = self.weight[IX]
        self.out = looked_up
        return self.out

    def parameters(self):
        """Return the trainable tensors: just the embedding table."""
        return [self.weight]

# -----------------------------------------------------------------------------

class Sequential:
    """Container that applies a list of layers one after another."""

    def __init__(self, layers):
        self.layers = layers

    def __call__(self, x):
        """Feed x through every layer in order; cache and return the final output."""
        activation = x
        for stage in self.layers:
            activation = stage(activation)
        self.out = activation
        return self.out

    def parameters(self):
        """Return the parameters of all layers, concatenated in layer order."""
        collected = []
        for stage in self.layers:
            collected.extend(stage.parameters())
        return collected

# -----------------------------------------------------------------------------

class FlattenConsecutive:
    """
    Concatenate every `n` consecutive positions of a (B, T, C) tensor.

    The result has shape (B, T // n, C * n); when T // n is 1 that axis is
    squeezed away, giving (B, C * n).
    """

    def __init__(self, n):
        self.n = n

    def __call__(self, x):
        """Regroup x, cache the result in `self.out` and return it."""
        batch, steps, channels = x.shape
        merged = x.view(batch, steps // self.n, channels * self.n)
        # Collapse the middle axis once only a single group is left.
        self.out = merged.squeeze(1) if merged.shape[1] == 1 else merged
        return self.out

    def parameters(self):
        """Return an empty list: this layer has nothing to train."""
        return list()


### 3. Train the model

In [135]:
# define the model
# Seed the RNG so weight initialization (and the mini-batch sampling that
# follows) is reproducible from a fresh kernel.
torch.manual_seed(42)

vocab_size = len(chars)
emb_dim = 24 # dim of character embedding
n_hidden = 128 # width of every hidden Linear layer

# Each FlattenConsecutive(2) stage concatenates pairs of adjacent positions, so
# the three stages reduce the block_size = 8 context as 8 -> 4 -> 2 -> 1 before
# the final Linear maps the remaining vector to one logit per character.
model = Sequential([
    Embedding(vocab_size, emb_dim), 
    FlattenConsecutive(2), Linear(emb_dim * 2, n_hidden, bias=False), BatchNorm1D(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2, n_hidden, bias=False), BatchNorm1D(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2, n_hidden, bias=False), BatchNorm1D(n_hidden), Tanh(),
    Linear(n_hidden, vocab_size),
])

# params init
# Scale down the output layer's weights, which shrinks the initial logits.
with torch.no_grad():
    model.layers[-1].weight *= 0.1

# Mark every parameter as trainable so loss.backward() populates p.grad.
parameters = model.parameters()
for p in parameters:
    p.requires_grad = True


In [138]:
# train the model
# Plain SGD on random mini-batches. Note that `num_epochs` counts optimization
# steps (one mini-batch each), not passes over the data.
num_epochs = 200000
mini_batch_size = 32
lossi = []  # per-step loss values as Python floats

for i in range(num_epochs):
    # create mini-batch
    idxs = torch.randint(low=0, high=X_train.shape[0], size=(mini_batch_size,))
    Xb, Yb = X_train[idxs], y_train[idxs]

    # forward pass 
    logits = model(Xb)
    loss = F.cross_entropy(logits, Yb)
    # Store a plain float: appending the tensor itself would keep every step's
    # autograd graph alive for the whole run.
    lossi.append(loss.item())

    # backward pass
    for p in parameters:
        p.grad = None
    loss.backward()

    # update weights
    # NOTE(review): the step-size decay happens at step 15000 of 200000 —
    # confirm this is intended (rather than e.g. 150000).
    lr = 0.1 if i < 15000 else 0.01
    with torch.no_grad():
        for p in parameters:
            p -= lr * p.grad

    # show stats
    if i % 10000 == 0:
        print(f"Epoch {i+1}: {loss}")

Epoch 1: 2.3597769737243652
Epoch 10001: 1.9274364709854126
Epoch 20001: 2.132906436920166
Epoch 30001: 1.6108429431915283
Epoch 40001: 1.511803388595581
Epoch 50001: 1.630881667137146
Epoch 60001: 1.9283009767532349
Epoch 70001: 1.7336162328720093
Epoch 80001: 1.6993306875228882
Epoch 90001: 1.8537205457687378
Epoch 100001: 1.5659072399139404
Epoch 110001: 1.9198318719863892
Epoch 120001: 1.9517216682434082
Epoch 130001: 1.4510877132415771
Epoch 140001: 1.8526337146759033
Epoch 150001: 1.7783403396606445
Epoch 160001: 1.7433059215545654
Epoch 170001: 1.7774038314819336
Epoch 180001: 1.8917286396026611
Epoch 190001: 1.5836371183395386


#### Sanity-check layer output shapes

In [113]:
# Embed one mini-batch of contexts with a fresh (untrained) embedding table and
# inspect the resulting shape.
# NOTE: `idxs` is the index tensor left over from the last training-loop
# iteration, so this cell only runs after the training cell has been executed.
t = Embedding(vocab_size, emb_dim)
p0 = X_train[idxs]
t1 = t(p0)
t1.shape

torch.Size([32, 8, 24])

In [116]:
# Concatenate each pair of adjacent positions of `t1` (the embedded batch from
# the previous cell) and inspect the resulting shape.
t2 = FlattenConsecutive(2)
t3 = t2(t1)
t3.shape

torch.Size([32, 4, 48])

In [119]:
# Build a Linear layer with the same sizes as the model's first hidden layer and
# inspect the shape of its weight matrix (its only parameter, since bias=False).
t4 = Linear(emb_dim * 2, n_hidden, bias=False)
t4.parameters()[0].shape

torch.Size([48, 128])

#### visualize loss

### 4. Evaluate the performance

### 5. Test the model

In [157]:
def generate_word(model):
    """
    Sample one word from the model, one character at a time.

    Starts from a context of `block_size` zeros (the '.' token), repeatedly
    samples the next character index from the softmax of the model's logits,
    and stops once index 0 ('.') is drawn.

    While sampling, every layer in `model.layers` that has a `training` flag is
    switched to evaluation mode, and the previous flags are restored afterwards.
    With a batch of one, BatchNorm1D in training mode would normalize with the
    statistics of that single sample and also overwrite its running buffers.

    Args:
        model: callable mapping a (1, block_size) index tensor to
            (1, vocab_size) logits.

    Returns:
        The sampled characters joined into a string, including the final '.'.
    """
    layers = getattr(model, "layers", [])
    saved_modes = [(layer, layer.training) for layer in layers if hasattr(layer, "training")]
    for layer, _ in saved_modes:
        layer.training = False

    out = []
    context = [0] * block_size # initialize with all ...
    try:
        # No gradients are needed for sampling.
        with torch.no_grad():
            while True:
                # forward pass the neural net
                logits = model(torch.tensor([context]))
                probs = F.softmax(logits, dim=1)
                # sample from the distribution
                ix = torch.multinomial(probs, num_samples=1).item()
                # shift the context window and track the samples
                context = context[1:] + [ix]
                out.append(ix)
                # if we sample the special '.' token, break
                if ix == 0:
                    break
    finally:
        # Put the layers back into whatever mode they were in before.
        for layer, mode in saved_modes:
            layer.training = mode

    return ''.join(itoc[i] for i in out)

In [158]:
# generate_word(model)

In [156]:
# context = [0] * block_size
# model(torch.tensor([context]))

In [153]:
# model.layers[3].running_mean.shape

torch.Size([1, 1, 128])