In [None]:
# Move the kernel's working directory up one level before the imports below run.
# NOTE(review): presumably this is so `import KittyLM` resolves from the repository root - confirm.
# NOTE(review): the path is relative, so every re-run of this cell climbs one more directory;
# execute it once per kernel session (Restart Kernel -> Run All).
%cd ..

import numpy as np
import random

import KittyLM
from KittyLM.layers import Attention, KarpathyCausalSelfAttention
import torch
import torch.nn as nn

# Report which KittyLM build this notebook is exercising.
print(KittyLM.__version__)

# Pin every random number generator used in this notebook to the same seed so that
# the random inputs and freshly initialised weights are repeatable between runs.
random.seed(a=42)             # Python's built-in `random` module
np.random.seed(seed=42)       # NumPy's global generator
torch.manual_seed(seed=42)    # PyTorch default generator
# torch.cuda.manual_seed(42)    # PyTorch GPU seed (if using CUDA)



class KittyLMConfig:
    """
    Hyperparameters matching the GPT-2 weights published on huggingface.

    The vocabulary size is rounded up to a multiple of 64 to speed up the processing.
    """
    # --- sequence / vocabulary -------------------------------------------
    block_size: int = 1024
    vocab_size: int = 50304  # 50257 in the original and hf implementation weights

    # --- architecture ----------------------------------------------------
    n_layer: int = 12
    n_heads: int = 12
    d_model: int = 768

    # --- regularisation / parameterisation -------------------------------
    dropout: float = 0.0
    bias: bool = True

def parity_check_attn(config, input_B, input_T):
    """
    Push one random batch through three attention implementations and report
    how far the custom `Attention` output is from `torch.nn.MultiheadAttention`.

    The three outputs (custom `Attention`, `KarpathyCausalSelfAttention`,
    `nn.MultiheadAttention`) are printed for visual inspection.

    Args:
        config: object exposing `d_model`, `n_heads`, `dropout` and `bias`; it is
            also handed unchanged to the `Attention` and
            `KarpathyCausalSelfAttention` constructors.
        input_B: batch size of the random input.
        input_T: sequence length of the random input.

    Returns:
        A `(label, value)` tuple where `value` is the largest absolute
        element-wise difference between the custom `Attention` output and the
        `nn.MultiheadAttention` output, as a Python float.

    Raises:
        AssertionError: if the outputs being compared do not have the same shape.
    """
    # create random input tensor
    B, T, dim, n_heads = input_B, input_T, config.d_model, config.n_heads
    input_tensor = torch.randn(B, T, dim)

    # Nothing here is trained, so skip autograd bookkeeping for all forward passes.
    with torch.no_grad():
        # Calculate attention on input tensor using custom implemented attention class
        attention_layer = Attention(config)
        custom_output = attention_layer(input_tensor)
        print(custom_output)

        k_attention = KarpathyCausalSelfAttention(config)
        k_output = k_attention(input_tensor)
        print(k_output)

        # Calculate attention using torch.nn.MultiheadAttention
        # https://pytorch.org/docs/stable/generated/torch.nn.MultiheadAttention.html
        # NOTE(review): each module draws its own random weights and no attn_mask is
        # passed here, so the reported difference is not expected to be ~0 unless the
        # weights are shared and a causal mask is supplied - confirm the intent.
        multihead_attn = nn.MultiheadAttention(
            embed_dim=dim,
            num_heads=n_heads,
            dropout=config.dropout,
            bias=config.bias,
            batch_first=True,
        )
        # Self-attention: query, key and value are all the same input.
        attn_output, _ = multihead_attn(input_tensor, input_tensor, input_tensor)
        print(attn_output)

    assert k_output.size() == custom_output.size(), f"custom attn output and karpathy attn output not same size: {custom_output.size()} vs. {k_output.size()}"
    # Compare the tensors the message (and the returned label) actually talk about.
    assert custom_output.size() == attn_output.size(), f"custom attn output and pytorch attn output not same size: {custom_output.size()} vs. {attn_output.size()}"

    diff = torch.max(torch.abs(custom_output - attn_output))

    return "diff btwn custom implemented and pytorch multihead attn", diff.item()
    
# Smoke-run the parity check on a single sequence of 10 tokens and show the (label, value) tuple.
print(parity_check_attn(config=KittyLMConfig, input_B=1, input_T=10))



In [11]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class MLP(nn.Module):
    """
    Feed-forward sub-layer: widen `d_model` to `4 * d_model`, apply GELU,
    project back to `d_model`, then apply dropout.

    Args:
        config: object exposing `d_model`, `bias` and `dropout`.
    """

    def __init__(self, config):
        super().__init__()
        model_width = config.d_model
        hidden_width = 4 * model_width
        use_bias = config.bias

        self.c_fc = nn.Linear(model_width, hidden_width, bias=use_bias)
        self.c_proj = nn.Linear(hidden_width, model_width, bias=use_bias)
        # GELU is smooth, so gradients are not abruptly zeroed out
        self.activation = nn.GELU()
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, input):
        """Apply expand -> GELU -> project -> dropout to the last dimension of `input`."""
        hidden = self.c_fc(input)
        hidden = self.activation(hidden)
        projected = self.c_proj(hidden)
        return self.dropout(projected)
        

class Attention(nn.Module):
    """
    Multi-head causal self-attention, computed explicitly (matmul, mask, softmax).

    Queries, keys and values come from three separate linear projections of the
    input. Each position may only attend to itself and to earlier positions.

    Args:
        config: object exposing `d_model`, `n_heads`, `block_size`, `dropout`
            and `bias`.

    Raises:
        ValueError: if `d_model` is not divisible by `n_heads`.
    """

    def __init__(self, config):
        super().__init__()
        if config.d_model % config.n_heads != 0:
            # Otherwise the per-head reshape in forward() cannot work.
            raise ValueError(
                f"d_model ({config.d_model}) must be divisible by n_heads ({config.n_heads})"
            )

        self.q_proj = nn.Linear(config.d_model, config.d_model, bias=config.bias)
        self.k_proj = nn.Linear(config.d_model, config.d_model, bias=config.bias)
        self.v_proj = nn.Linear(config.d_model, config.d_model, bias=config.bias)
        # NOTE: the fused `c_attn` projection was never used by forward(); it only
        # added 3 * d_model^2 dead weights to every layer, so it is no longer created.

        # final projection after attention
        self.projection = nn.Linear(config.d_model, config.d_model, bias=config.bias)

        # dropout on the attention weights and on the projected output
        self.attention_dropout = nn.Dropout(config.dropout)
        self.residual_dropout = nn.Dropout(config.dropout)

        self.n_heads = config.n_heads
        self.d_model = config.d_model
        self.dropout = config.dropout
        self.head_size = self.d_model // self.n_heads

        # Lower-triangular ones, shaped (1, 1, block_size, block_size) so it
        # broadcasts over the batch and head dimensions.
        self.register_buffer(
            'causal_mask',
            torch.tril(torch.ones(config.block_size, config.block_size))
            .view(1, 1, config.block_size, config.block_size)
        )

    def forward(self, input):
        """
        Args:
            input: tensor of shape (B, T, D) with D == d_model and T <= block_size.

        Returns:
            Tensor of shape (B, T, D).

        Raises:
            ValueError: if T exceeds the size of the causal mask (block_size).
        """
        B, T, D = input.size()  # batch, length, dimension

        max_len = self.causal_mask.size(-1)
        if T > max_len:
            raise ValueError(f"sequence length {T} exceeds block_size {max_len}")

        # (B, T, D) -> (B, T, nh, hs) -> (B, nh, T, hs)
        # view() only splits the last dimension; the swap of the sequence and head
        # axes must be a separate transpose, otherwise the data gets scrambled.
        q = self.q_proj(input).view(B, T, self.n_heads, self.head_size).transpose(1, 2)
        k = self.k_proj(input).view(B, T, self.n_heads, self.head_size).transpose(1, 2)
        v = self.v_proj(input).view(B, T, self.n_heads, self.head_size).transpose(1, 2)

        # scaled dot-product scores, (B, nh, T, T)
        e = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # hide future positions: only the top-left T x T corner of the mask is needed
        e = e.masked_fill(self.causal_mask[:, :, :T, :T] == 0, float('-inf'))
        alpha = F.softmax(e, dim=-1)
        alpha = self.attention_dropout(alpha)

        attention = alpha @ v  # (B, nh, T, hs)
        # put the heads back side by side: (B, nh, T, hs) -> (B, T, D)
        attention = attention.transpose(1, 2).contiguous().view(B, T, D)
        attention = self.projection(attention)
        attention = self.residual_dropout(attention)

        return attention


class LayerNorm(nn.Module):
    """
    Layer normalisation over the last dimension with an optional bias term.

    Args:
        d_model: size of the last (normalised) dimension.
        bias: truthy to learn an additive bias, falsy (`False` / `None`) to
            normalise with a scale only.
    """

    def __init__(self, d_model, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(d_model))
        # Scale starts at 1 and shift at 0, so the freshly built layer is a plain
        # normalisation. `bias` is a boolean flag in the configs used here, so test
        # its truthiness; always define the attribute so forward() can pass it on.
        self.bias = nn.Parameter(torch.zeros(d_model)) if bias else None

    def forward(self, input):
        """Normalise `input` over its last dimension and apply the learned scale (and bias, if any)."""
        return F.layer_norm(
            input=input,
            normalized_shape=self.weight.shape,
            weight=self.weight,
            bias=self.bias,
        )




In [None]:
import math
#import torch
#import torch.nn as nn
#import torch.nn.functional as F

#from layers import MLP, Attention, LayerNorm

class KittyLMConfig:
    """
    Model hyperparameters, set according to the GPT-2 weights on huggingface.

    `vocab_size` is a multiple of 64 (instead of the original 50257) to speed up
    the processing.
    """
    # maximum sequence length the model accepts
    block_size: int = 1024
    # 50257 in the original and hf implementation weights
    vocab_size: int = 50304

    # number of stacked blocks, attention heads per block, embedding width
    n_layer: int = 12
    n_heads: int = 12
    d_model: int = 768

    # dropout probability, and whether Linear / LayerNorm layers get a bias
    dropout: float = 0.0
    bias: bool = True

class KittyLMBlock(nn.Module):
    """
    One transformer block: attention and MLP sub-layers, each preceded by a
    LayerNorm and wrapped in a residual (skip) connection.

    Args:
        config: object exposing `d_model` and `bias`, plus whatever `Attention`
            and `MLP` read from it.
    """

    def __init__(self, config):
        super().__init__()
        self.preln = LayerNorm(config.d_model, bias=config.bias)
        self.attention = Attention(config)
        self.postln = LayerNorm(config.d_model, bias=config.bias)
        self.mlp = MLP(config)

    def forward(self, input):
        """
        Args:
            input: tensor whose last dimension is `d_model`.

        Returns:
            Tensor of the same shape as `input`.
        """
        # Each sub-layer adds its output onto the running stream instead of
        # replacing it. Without the skip connections the un-normalised signal is
        # discarded at every LayerNorm and gradients must pass through every
        # sub-layer of a deep stack.
        hidden = input + self.attention(self.preln(input))
        output = hidden + self.mlp(self.postln(hidden))
        return output

# NOTE(review): this class rebinds the name `KittyLM`, which the first cell also uses
# for the imported package (`import KittyLM`); after this cell runs, `KittyLM.__version__`
# no longer refers to the package until it is imported again.
class KittyLM(nn.Module):
    """
    Decoder-only language model: token + learned position embeddings, a stack of
    `KittyLMBlock`s, a final LayerNorm and a linear head producing vocabulary logits.

    The token-embedding matrix and the output head share one weight tensor.

    Args:
        config: object exposing `vocab_size`, `block_size`, `n_layer`, `d_model`,
            `dropout` and `bias` (plus whatever `KittyLMBlock` reads from it).
    """

    # Standard deviation of the normal initialiser; 0.02 is the GPT-2 value
    # (a std of 0.2 makes activations grow by orders of magnitude per layer).
    _INIT_STD = 0.02

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.token_embeddings = nn.Embedding(num_embeddings=config.vocab_size, embedding_dim=config.d_model)
        # One row per position, so the table is sized by the context length,
        # not by the vocabulary.
        self.position_embeddings = nn.Embedding(num_embeddings=config.block_size, embedding_dim=config.d_model)
        self.blocks = nn.ModuleList([KittyLMBlock(config) for _ in range(config.n_layer)])
        self.dropout = nn.Dropout(config.dropout)
        self.ln_f = LayerNorm(config.d_model, bias=config.bias)
        self.lm_head = nn.Linear(config.d_model, config.vocab_size, bias=False)

        # weight tying
        self.token_embeddings.weight = self.lm_head.weight

        # init weights
        self.apply(self._init_weights)
        # Projections that write back into the residual stream (attention
        # `projection` and MLP `c_proj`) get a smaller std: there are two of them
        # per block, hence the 2 * n_layer factor.
        residual_std = self._INIT_STD / math.sqrt(2 * max(config.n_layer, 1))
        for name, parameter in self.named_parameters():
            if name.endswith(('projection.weight', 'c_proj.weight')):
                nn.init.normal_(parameter, mean=0.0, std=residual_std)

        print(" parameter count : %.2fM" % (self._get_parameter_count() / 1e6))

    def _init_weights(self, module):
        """Initialise Linear / Embedding weights from N(0, _INIT_STD^2) and zero Linear biases."""
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=self._INIT_STD)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=self._INIT_STD)

    def _get_parameter_count(self, non_embedding=True):
        """
        Count the model's parameters.

        Args:
            non_embedding: when True (default) the position-embedding table is
                left out of the total.

        Returns:
            Number of scalar parameters as an int.
        """
        nparams = sum(param.numel() for param in self.parameters())
        if non_embedding:
            nparams -= self.position_embeddings.weight.numel()
        return nparams

    def forward(self, input_ids):
        """
        Args:
            input_ids: integer tensor of shape (B, T) with T <= config.block_size.

        Returns:
            Logits of shape (B, T, vocab_size).

        Raises:
            AssertionError: if T exceeds `config.block_size`.
        """
        B, T = input_ids.size()
        assert T <= self.config.block_size, "Sequence length cannnot be greater than model capacity"

        token_embeddings = self.token_embeddings(input_ids)
        # positions 0..T-1, shaped (1, T) so they broadcast across the batch
        position_ids = torch.arange(0, T, dtype=torch.long, device=input_ids.device).unsqueeze(0)
        position_embeddings = self.position_embeddings(position_ids)

        x = token_embeddings + position_embeddings
        x = self.dropout(x)
        for block in self.blocks:
            x = block(x)

        x = self.ln_f(x)
        logits = self.lm_head(x)
        return logits

