In [3]:
# Dataset: source code for the 'requests' Python library.
# The context manager closes the file handle as soon as the read finishes, and
# the explicit encoding keeps decoding independent of the platform locale.
# NOTE(review): assumes data/requests.txt is UTF-8 encoded - confirm.
with open("data/requests.txt", encoding="utf-8") as dataset_file:
    text = dataset_file.read()

In [4]:
import numpy as np

# Vocabulary: every distinct character that occurs in the text, in sorted order
chars = sorted(set(text))
vocab_size = len(chars)

# character -> integer id
stoi = {char: idx for idx, char in enumerate(chars)}

# integer id -> character
itos = dict(enumerate(chars))


def encode(s):
    """Return the list of integer ids for the characters of `s`."""
    return [stoi[c] for c in s]


def decode(l):
    """Return the string spelled by the sequence of integer ids `l`."""
    return ''.join(itos[i] for i in l)


# The whole text as a numpy array of integer ids
data = np.array(encode(text))

In [5]:
import numpy as np

# Embedding dimension: every character is represented by a vector of n_embd numbers
n_embd = 32

# Max input sequence length
max_seq_len = 1000

# Embedding table with one row per vocabulary entry, drawn from a standard normal
embedding_matrix = np.random.standard_normal((vocab_size, n_embd))

In [6]:
# Standard expansion factor of four
ffwd_expansion_factor = 4

# Initialize hidden layer and output layer.
# Kaiming (He) init scales each layer's random weights by sqrt(2 / fan_in),
# where fan_in is the number of inputs feeding that layer, i.e. the number of
# rows of the weight matrix as it is used in `x @ W`.
#   W1: fan_in = n_embd
#   W2: fan_in = n_embd * ffwd_expansion_factor
W1 = np.random.randn(n_embd, n_embd * ffwd_expansion_factor) * np.sqrt(2.0 / n_embd)
W2 = np.random.randn(n_embd * ffwd_expansion_factor, n_embd) * np.sqrt(2.0 / (n_embd * ffwd_expansion_factor))
 

In [7]:
def cross_entropy_loss(y_pred, y_true):
    """Return the summed cross-entropy between predictions and targets.

    Computes -sum(y_true * log(y_pred + 1e-9)) over all elements. The small
    constant keeps the logarithm finite when a predicted value is exactly 0.
    """
    eps = 1e-9
    log_pred = np.log(y_pred + eps)
    return -np.sum(np.multiply(y_true, log_pred))

In [8]:
class self_attention_block:
    """Single-head causal self-attention with a hand-written backward pass.

    Holds three projection matrices (query, key, value), one gradient buffer of
    matching shape for each, and a cache of forward-pass intermediates that
    `backward` reads.
    """

    def __init__(self, W_query, W_key, W_value):
        """Store the projection matrices and allocate zeroed gradient buffers."""
        self.W_query = W_query
        self.W_key = W_key
        self.W_value = W_value

        self.W_query_grad = np.zeros_like(W_query)
        self.W_key_grad = np.zeros_like(W_key)
        self.W_value_grad = np.zeros_like(W_value)

        # Forward-pass intermediates consumed by backward()
        self.cache = {}

    def forward(self, x):
        """Apply causal self-attention to `x` of shape (B, T, n_embd).

        Returns the attended values, shape (B, T, n_embd), and caches the input,
        the projections, the scores and the attention weights for `backward`.
        """
        self.cache['x'] = x
        _, seq_len, _ = x.shape

        # Project the input three ways; each result is (B, T, n_embd)
        q = x @ self.W_query
        k = x @ self.W_key
        v = x @ self.W_value
        self.cache['queries'] = q
        self.cache['keys'] = k
        self.cache['values'] = v

        # Query/key similarity, divided by sqrt of the key dimension for
        # numerical stability
        scores = (q @ k.transpose(0, 2, 1)) / np.sqrt(k.shape[-1])
        self.cache['attention_scores'] = scores

        # Causal mask: True strictly above the diagonal (the "future" positions).
        # The masking is done in place, so the cached scores array above is the
        # same object and also carries the -inf entries.
        future = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
        scores[:, future] = -np.inf

        # Row-wise softmax; subtracting the row max first keeps exp() bounded
        shifted = scores - scores.max(axis=-1, keepdims=True)
        exp_scores = np.exp(shifted)
        weights = exp_scores / exp_scores.sum(axis=-1, keepdims=True)
        self.cache['attn_weights'] = weights

        # Attention-weighted mix of the values, (B, T, n_embd)
        return weights @ v

    def backward(self, d_output):
        """Backpropagate `d_output` (gradient w.r.t. the forward output).

        Overwrites the three weight-gradient attributes and returns the gradient
        with respect to the forward input `x`.
        """
        weights = self.cache['attn_weights']
        q = self.cache['queries']
        k = self.cache['keys']
        v = self.cache['values']
        x = self.cache['x']

        # output = weights @ values
        d_weights = d_output @ v.transpose(0, 2, 1)
        d_values = weights.transpose(0, 2, 1) @ d_output

        # Softmax Jacobian applied row by row
        row_dot = np.sum(d_weights * weights, axis=-1, keepdims=True)
        d_scores = weights * (d_weights - row_dot)

        # Undo the 1/sqrt(d) scaling applied to the raw scores
        d_raw = d_scores * (1.0 / np.sqrt(k.shape[-1]))

        # raw = queries @ keys^T
        d_queries = d_raw @ k
        d_keys = d_raw.transpose(0, 2, 1) @ q

        # Each projection is `x @ W`; collect weight and input gradients
        self.W_query_grad, dx_q = self.linear_backward(d_queries, self.W_query, x)
        self.W_key_grad, dx_k = self.linear_backward(d_keys, self.W_key, x)
        self.W_value_grad, dx_v = self.linear_backward(d_values, self.W_value, x)

        # x feeds all three projections, so its gradient is the sum of the paths
        return dx_q + dx_k + dx_v

    @staticmethod
    def linear_backward(d_output, W, x_from_cache):
        """Gradients of `y = x @ W`: returns (d_W, d_x) given d_output = dL/dy."""
        # Collapse every leading axis so the weight gradient is one 2-D matmul
        flat_x = x_from_cache.reshape(-1, x_from_cache.shape[-1])
        flat_dy = d_output.reshape(-1, d_output.shape[-1])
        return flat_x.T @ flat_dy, d_output @ W.T

    def optimizer (self, learning_rate):
        """Take one plain gradient-descent step on the three projection matrices."""
        for name in ('W_query', 'W_key', 'W_value'):
            weight = getattr(self, name)
            weight -= (getattr(self, name + '_grad') * learning_rate)
            setattr(self, name, weight)

In [9]:
class Model:
    """Character-level language model with hand-written forward and backward passes.

    Pipeline: token embedding + positional embedding -> causal self-attention
    -> linear / ReLU / linear feed-forward -> projection to vocabulary logits.
    The output projection is the transpose of the embedding matrix, so both
    share one set of weights and one gradient buffer.
    """

    def __init__(self,embedding_matrix, W1, W2, temperature=1.0, max_sequence_length=1000, n_embd=32):
        """Store the supplied weights and create the remaining parameters.

        Args:
            embedding_matrix: (vocab_size, n_embd) token embedding table.
            W1: (n_embd, hidden) first feed-forward weight matrix.
            W2: (hidden, n_embd) second feed-forward weight matrix.
            temperature: divisor applied to the logits when sampling in `pred`.
            max_sequence_length: number of rows in the positional embedding table.
            n_embd: embedding width used for the positional table and the
                attention projections.
        """

        # Initialize weight matrices
        self.embedding_matrix = embedding_matrix
        # transpose() returns a view, so in-place updates to the embedding
        # matrix are seen by the unembedding matrix as well
        self.unembedding_matrix = embedding_matrix.transpose()
        self.position_matrix = np.random.randn(max_sequence_length, n_embd)
        self.W1 = W1
        self.W2 = W2

        # Transformer block
        W_query = np.random.randn(n_embd, n_embd) * 0.02
        W_key = np.random.randn(n_embd, n_embd) * 0.02
        W_value = np.random.randn(n_embd, n_embd) * 0.02
        self.transformer = self_attention_block(W_query, W_key, W_value)

        self.cache = {} # A dictionary to store forward pass values

        # Temperature hyperparameter
        self.temperature = temperature

        # Most recent mean loss computed by calc_loss (None until it is called)
        self.loss = None

        # Gradient buckets
        self.embedding_matrix_grad = np.zeros_like(self.embedding_matrix)
        self.position_matrix_grad = np.zeros_like(self.position_matrix)
        self.W1_grad = np.zeros_like(self.W1)
        self.W2_grad = np.zeros_like(self.W2)


    def forward(self, x_batch):
        """Compute logits for a batch of token-id sequences.

        Args:
            x_batch: integer ids of shape (B, T), or (T,) for a single sequence
                (a batch dimension is added).

        Returns:
            Logits of shape (B, T, vocab_size).

        Raises:
            ValueError: if T exceeds the number of rows in the positional table.
        """

        x_batch = np.array(x_batch)
        if x_batch.ndim == 1:
            x_batch = x_batch[None, :]  # Add batch dimension: (T,) -> (1, T)

        # Fail early with a clear message instead of an opaque broadcasting
        # error when there are not enough positional embeddings
        _, T = x_batch.shape
        max_positions = self.position_matrix.shape[0]
        if T > max_positions:
            raise ValueError(
                f"sequence length {T} exceeds the {max_positions} positions "
                "available in position_matrix"
            )

        self.cache['x_batch'] = x_batch

        # Output shape: (B, T, n_embd)
        embd = self.embedding_matrix[x_batch]
        self.cache['embd'] = embd

        # Positional embeddings, sliced to the sequence length
        pos = self.position_matrix[:T]
        self.cache['pos'] = pos

        # Add position to token embeddings
        attn_input = embd + pos

        # Self-attention
        attn_output = self.transformer.forward(attn_input)
        self.cache['attn_output'] = attn_output

        hidden = attn_output @ self.W1
        self.cache['hidden'] = hidden

        # ReLU
        hidden_activated = np.maximum(0, hidden)
        self.cache['hidden_activated'] = hidden_activated

        processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
        self.cache['processed_vectors'] = processed_vectors

        # Final projection to logits
        logits = processed_vectors @ self.unembedding_matrix

        return logits

    def pred (self, x):
        """Sample the id of the next token following the sequence `x`.

        Runs a forward pass, takes the logits at the last position of the first
        batch entry, divides them by `self.temperature`, and samples from the
        resulting softmax distribution.
        """

        logits = self.forward(x)[0, -1]  # Get batch 0, last position

        scaled_logits = logits / self.temperature

        ## Apply softmax function to logits
        stable_logits = scaled_logits - np.max(scaled_logits) # This ensures the largest logit is 0
        exp_logits = np.exp(stable_logits)
        preds = exp_logits / np.sum(exp_logits)

        # Take the vocabulary size from the model's own output rather than from
        # a notebook-level global, so the model works with any vocabulary
        char_pred = np.random.choice(np.arange(preds.shape[0]), p=preds)

        return char_pred

    def calc_loss (self, logits, y_batch):
        """Mean negative log-likelihood of the targets under softmax(logits).

        Args:
            logits: array of shape (B, T, vocab_size).
            y_batch: integer target ids of shape (B, T).

        Returns:
            (mean_loss, probabilities); probabilities has the shape of `logits`.
            The mean loss is also stored on `self.loss`.
        """

        # Get the dimensions for indexing
        B, T, _ = logits.shape

        # Stable softmax
        max_logits = np.max(logits, axis=-1, keepdims=True)
        stable_logits = logits - max_logits
        exp_logits = np.exp(stable_logits)
        probabilities = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)

        # Get the probabilities for the correct target characters using efficient indexing
        correct_char_probs = probabilities[np.arange(B)[:, None], np.arange(T), y_batch]

        # Calculate negative log likelihood (epsilon avoids log(0))
        loss_array = -np.log(correct_char_probs + 1e-9)

        # Average the loss over the whole batch to get a single number
        mean_loss = np.mean(loss_array)

        self.loss = mean_loss

        # Return probabilities because they are the starting point for backpropagation
        return mean_loss, probabilities


    # Calculates the gradients for a specific layer and its resulting vector
    @staticmethod
    def linear_backward(d_output, W, x_from_cache):
        """Gradients of `y = x @ W`: returns (d_W, d_x) given d_output = dL/dy."""

        # d_W = x.T @ dy
        # d_x = dy @ W.T

        d_x = d_output @ W.T

        # Flatten the leading axes of the input and the upstream gradient so the
        # weight gradient is a single 2-D matrix product
        x_reshaped, dy_reshaped = x_from_cache.reshape(-1, x_from_cache.shape[-1]), d_output.reshape(-1, d_output.shape[-1])
        d_W = x_reshaped.T @ dy_reshaped

        return d_W, d_x


    def backward (self, d_logits):
        """Backpropagate `d_logits` and overwrite every gradient buffer.

        Must be called after `forward`, whose cached intermediates it reads.
        """

        # unembedding layer
        grad_unembed, d_processed = self.linear_backward(d_logits, self.unembedding_matrix, self.cache['processed_vectors'])
        # The unembedding matrix is the transposed embedding matrix, so its
        # gradient is transposed back into embedding layout
        self.embedding_matrix_grad = grad_unembed.transpose()

        # Activated hidden layer
        grad_W2, d_hidden_activated = self.linear_backward(d_processed, self.W2, self.cache['hidden_activated'])
        self.W2_grad = grad_W2

        # ReLU passes gradient only where its input was positive
        d_hidden = d_hidden_activated * (self.cache['hidden'] > 0)

        # Hidden layer
        grad_W1, d_attn = self.linear_backward(d_hidden, self.W1, self.cache['attn_output'])
        self.W1_grad = grad_W1

        # Attention block
        d_attn_input = self.transformer.backward(d_attn)

        # attn_input = embd + pos, so both terms receive the same gradient
        d_embed = d_attn_input
        d_pos = d_attn_input

        # Update position matrix gradients
        _, T = self.cache['x_batch'].shape
        self.position_matrix_grad = np.zeros_like(self.position_matrix)
        self.position_matrix_grad[:T] += np.sum(d_pos, axis=0)  # Sum over batch dimension

        # Reverse the embedding lookup: add each position's gradient to the row
        # of its token id (np.add.at accumulates over repeated ids)
        np.add.at(self.embedding_matrix_grad, self.cache['x_batch'], d_embed)

    def optimizer (self, learning_rate):
        """Take one plain gradient-descent step on every parameter, in place."""

        self.embedding_matrix -= (self.embedding_matrix_grad * learning_rate)
        self.position_matrix -= (self.position_matrix_grad * learning_rate)
        self.W1 -= (self.W1_grad * learning_rate)
        self.W2 -= (self.W2_grad * learning_rate)
        self.transformer.optimizer(learning_rate)


In [10]:
# Build the model from the same hyperparameters that sized the weight matrices,
# so changing n_embd / max_seq_len in the config cell cannot desynchronise the
# positional and attention matrices from the embedding width.
model = Model(embedding_matrix, W1, W2, max_sequence_length=max_seq_len, n_embd=n_embd)

# Smoke test: forward pass on a batch containing one two-token sequence ('d', 'd')
logits = model.forward([[stoi['d'], stoi['d']]])

# Sample a next-character id for the single-character prompt 'r'
model.pred([int(stoi['r'])])

  hidden = attn_output @ self.W1
  hidden = attn_output @ self.W1
  hidden = attn_output @ self.W1
  processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
  processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
  processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
  logits = processed_vectors @ self.unembedding_matrix
  logits = processed_vectors @ self.unembedding_matrix
  logits = processed_vectors @ self.unembedding_matrix
  queries = x @ self.W_query    # (B, T, n_embd)
  queries = x @ self.W_query    # (B, T, n_embd)
  queries = x @ self.W_query    # (B, T, n_embd)
  keys = x @ self.W_key         # (B, T, n_embd)
  keys = x @ self.W_key         # (B, T, n_embd)
  keys = x @ self.W_key         # (B, T, n_embd)
  values = x @ self.W_value     # (B, T, n_embd)
  values = x @ self.W_value     # (B, T, n_embd)
  values = x @ self.W_value     # (B, T, n_embd)


np.int64(126)

In [11]:
import random

def get_batch(data, batch_size, block_size):
    """Sample a random mini-batch of next-token training pairs from `data`.

    Each of the `batch_size` rows is a window of `block_size` consecutive
    elements starting at a random offset; the target row is the same window
    shifted one element to the right.

    Args:
        data: indexable sequence (e.g. a 1-D numpy array) of token ids.
        batch_size: number of windows to draw.
        block_size: length of each window.

    Returns:
        (x_batch, y_batch), two numpy arrays built from the sampled windows.

    Raises:
        ValueError: if `data` has fewer than block_size + 1 elements, so that no
            input/target window pair fits.
    """
    # Exclusive upper bound for a window start: the target window needs one
    # element beyond the end of the input window.
    n_starts = len(data) - block_size
    if batch_size > 0 and n_starts <= 0:
        raise ValueError(
            f"data of length {len(data)} is too short for block_size={block_size}; "
            "need at least block_size + 1 elements"
        )

    x_batch = []
    y_batch = []

    for _ in range(batch_size):
        start = random.randrange(0, n_starts)
        stop = start + block_size
        x_batch.append(data[start:stop])
        y_batch.append(data[start + 1:stop + 1])

    return np.array(x_batch), np.array(y_batch)



In [None]:
# Training hyperparameters
max_iters = 10000
learning_rate = 1e-4 # A common starting point for learning rate
batch_size = 32
block_size = 50
max_grad_norm = 1.0  # Ceiling on the L2 norm of each individual gradient tensor

# NOTE(review): this silences every floating-point warning (overflow, invalid,
# divide-by-zero), so NaN/inf problems during training will go unreported.
np.seterr(all='ignore')

# Training loop
for step in range(max_iters):

    # Get a mini-batch of data
    x_batch, y_batch = get_batch(data, batch_size, block_size)

    # Calculate loss and probabilities
    logits = model.forward(x_batch)
    loss_initial, probabilities = model.calc_loss(logits, y_batch)

    # Backward pass: gradient of softmax cross-entropy w.r.t. the logits.
    # NOTE(review): calc_loss reports the mean over all B*T tokens, while this
    # gradient has no 1/(B*T) factor (it is the gradient of the summed loss).
    # Left unchanged to keep the training dynamics as they were - confirm intent.
    one_hot_array = np.eye(vocab_size)[y_batch]
    initial_gradient = probabilities - one_hot_array

    model.backward(initial_gradient)

    # Gradient clipping: rescale, in place, any gradient tensor whose L2 norm
    # exceeds max_grad_norm
    for grad in [model.embedding_matrix_grad, model.W1_grad, model.W2_grad,
                 model.position_matrix_grad, model.transformer.W_query_grad,
                 model.transformer.W_key_grad, model.transformer.W_value_grad]:
        grad_norm = np.linalg.norm(grad)
        if grad_norm > max_grad_norm:
            grad *= (max_grad_norm / grad_norm)

    # Optimizer
    model.optimizer(learning_rate)

    # Gradient zeroing
    model.embedding_matrix_grad.fill(0)
    model.W1_grad.fill(0)
    model.W2_grad.fill(0)
    model.position_matrix_grad.fill(0)
    model.transformer.W_query_grad.fill(0)
    model.transformer.W_key_grad.fill(0)
    model.transformer.W_value_grad.fill(0)

    if step % 1000 == 0:
        print(f"Step {step}, Loss: {loss_initial}")

print(f"Model loss: {model.loss}")


In [49]:
# Sampling temperature: controls the model's 'creativity'
temperature = 1
model.temperature = temperature

# Prompt character and number of characters to generate after it
initial_char = "p"
generation_length = 50

# Start from the prompt's id, then repeatedly sample the next id from the
# model, feeding it the whole sequence generated so far
charIdxs = [int(stoi[initial_char])]
for i in range(generation_length):
    charIdxs.append(model.pred(charIdxs))

# Map the ids back to characters and show the generated text
char_preds = [itos[idx] for idx in charIdxs]
print("".join(char_preds))

passsapocpysbesotm/drrcts(.an(tpdoknaa(tmint_ss(_hm


  queries = x @ self.W_query    # (B, T, n_embd)
  queries = x @ self.W_query    # (B, T, n_embd)
  queries = x @ self.W_query    # (B, T, n_embd)
  keys = x @ self.W_key         # (B, T, n_embd)
  keys = x @ self.W_key         # (B, T, n_embd)
  keys = x @ self.W_key         # (B, T, n_embd)
  values = x @ self.W_value     # (B, T, n_embd)
  values = x @ self.W_value     # (B, T, n_embd)
  values = x @ self.W_value     # (B, T, n_embd)
  hidden = attn_output @ self.W1
  hidden = attn_output @ self.W1
  hidden = attn_output @ self.W1
  processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
  processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
  processed_vectors = hidden_activated @ self.W2 # Shape: (B, T, n_embd)
  logits = processed_vectors @ self.unembedding_matrix
  logits = processed_vectors @ self.unembedding_matrix
  logits = processed_vectors @ self.unembedding_matrix
  attention_scores = (queries @ keys.transpose(0, 2, 1)) / np.sqrt(keys.shap