In [19]:
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

# 1. Load the dataset
words = open('names.txt', 'r').read().splitlines()

# 2. Build Vocabulary
chars = sorted(list(set(''.join(words))))
stoi = {s:i+1 for i,s in enumerate(chars)}
stoi['.'] = 0
itos = {i:s for s,i in stoi.items()}
vocab_size = len(itos)

# 3. Build the Dataset (Rolling Window)
def build_dataset(words, block_size):
    X, Y = [], []
    for w in words:
        context = [0] * block_size
        for ch in w + '.':
            ix = stoi[ch]
            X.append(context)
            Y.append(ix)
            context = context[1:] + [ix]
    return torch.tensor(X), torch.tensor(Y)

# Use a context length of 8 (WaveNet style)
block_size = 8 
Xtr, Ytr = build_dataset(words, block_size)

print(f"Dataset shape: {Xtr.shape}") # Expected:

Dataset shape: torch.Size([228146, 8])


In [20]:
# -----------------------------------------------------------------------------
# 1. Linear Layer
class Linear:
    def __init__(self, fan_in, fan_out, bias=True):
        # Kaiming Initialization for stability
        self.weight = torch.randn((fan_in, fan_out)) / (fan_in**0.5) 
        self.bias = torch.zeros(fan_out) if bias else None
  
    def __call__(self, x):
        self.out = x @ self.weight
        if self.bias is not None:
            self.out += self.bias
        return self.out
  
    def parameters(self):
        return [self.weight] + ( [] if self.bias is None else [self.bias])

# 2. Embedding Layer
class Embedding:
    def __init__(self, num_embeddings, embedding_dim):
        self.weight = torch.randn((num_embeddings, embedding_dim))
    
    def __call__(self, IX):
        self.out = self.weight[IX]
        return self.out
    
    def parameters(self):
        return [self.weight]

# 3. Tanh Activation
class Tanh:
    def __call__(self, x):
        self.out = torch.tanh(x)
        return self.out
    def parameters(self):
        return []
        
# 4. Sequential Container
class Sequential:
    def __init__(self, layers):
        self.layers = layers
    def __call__(self, x):
        for layer in self.layers:
            x = layer(x)
        self.out = x
        return self.out
    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]

In [21]:
class FlattenConsecutive:
    def __init__(self, n):
        self.n = n # Group size (usually 2)
        
    def __call__(self, x):
        B, T, C = x.shape
        # Reshape: Group 'n' elements together in the channel dim
        x = x.view(B, T // self.n, C * self.n)
        
        # If we have reduced to a single time step, squeeze it out
        if x.shape[1] == 1:
            x = x.squeeze(1)
        
        self.out = x
        return self.out
    
    def parameters(self):
        return []

In [22]:
class BatchNorm1d:
    def __init__(self, dim, eps=1e-5, momentum=0.1):
        self.eps = eps
        self.momentum = momentum
        self.training = True
        self.gamma = torch.ones(dim)
        self.beta = torch.zeros(dim)
        self.running_mean = torch.zeros(dim)
        self.running_var = torch.ones(dim)
        
    def __call__(self, x):
        if self.training:
            # FIX: Determine dimensions to reduce based on input rank
            if x.ndim == 2:
                dim = 0
            elif x.ndim == 3:
                dim = (0, 1) # Reduce across Batch AND Time
            
            xmean = x.mean(dim, keepdim=True)
            xvar = x.var(dim, keepdim=True, unbiased=True)
        else:
            xmean = self.running_mean
            xvar = self.running_var
            
        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)
        self.out = self.gamma * xhat + self.beta
        
        if self.training:
            with torch.no_grad():
                self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * xmean
                self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar
        return self.out
    
    def parameters(self):
        return [self.gamma, self.beta]

In [23]:
n_embd = 24 # Dimensionality of character embeddings
n_hidden = 128 # Neurons in hidden layers

model = Sequential([
    Embedding(vocab_size, n_embd),
    FlattenConsecutive(2), Linear(n_embd * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
    Linear(n_hidden, vocab_size)
])

# Initialize parameters
with torch.no_grad():
    model.layers[-1].weight *= 0.1 # Make last layer less confident

parameters = model.parameters()
print(f"Total parameters: {sum(p.nelement() for p in parameters)}")
for p in parameters:
    p.requires_grad = True

Total parameters: 76579


In [None]:
max_steps = 200000
batch_size = 32
lossi = []

for i in range(max_steps):
    # 1. Minibatch Construction
    ix = torch.randint(0, Xtr.shape[0], (batch_size,))
    Xb, Yb = Xtr[ix], Ytr[ix] 
    
    # 2. Forward Pass
    logits = model(Xb)
    loss = F.cross_entropy(logits, Yb) 
    
    # 3. Backward Pass
    for p in parameters:
        p.grad = None
    loss.backward()
    
    # 4. Update
    lr = 0.1 if i < 150000 else 0.01 # Learning rate decay
    for p in parameters:
        p.data += -lr * p.grad

    # Track stats
    if i % 10000 == 0:
        print(f"{i:7d}/{max_steps:7d}: {loss.item():.4f}")
    lossi.append(loss.log10().item())

plt.plot(torch.tensor(lossi).view(-1, 1000).mean(1))
plt.title("Training Loss (Log Scale)")
plt.show()

      0/ 200000: 3.2997
  10000/ 200000: 2.2513
  20000/ 200000: 2.0683
  30000/ 200000: 1.9112


In [None]:
# Sampling from the model
# Set model to eval mode (use running stats for BatchNorm)
for layer in model.layers:
    if isinstance(layer, BatchNorm1d):
        layer.training = False

for _ in range(10):
    out = []
    context = [0] * block_size
    while True:
        # Forward pass through the model
        logits = model(torch.tensor([context]))
        
        # Ensure logits are 2D: [batch_size, vocab_size]
        if logits.ndim > 2:
            logits = logits.view(-1, logits.shape[-1])
        
        # Clamp logits to prevent overflow in softmax
        logits = logits.clamp(min=-50, max=50)
        
        probs = F.softmax(logits, dim=-1)
        
        # Ensure probabilities are valid (no NaN or Inf)
        probs = torch.nan_to_num(probs, nan=1e-8)
        probs = probs / probs.sum(dim=-1, keepdim=True)  # Renormalize
        
        # Sample from the distribution
        ix = torch.multinomial(probs, num_samples=1).item()
        
        context = context[1:] + [ix]
        out.append(ix)
        if ix == 0: break
    
    print(''.join(itos[i] for i in out[:-1]))