**Task 2** | 
1.Rotary embeddings - so in RoFormer  the limitation of direction is reduced by introducing a rotated attention mechanism. 
In this mechanism, the attention is allowed to focus not only on positions to the left and right but also on positions above and below the current position. 
conclusion -- This modification enables the model to capture diagonal relationships in the input sequence, which can be particularly useful for tasks where such dependencies are crucial.

In [5]:
import sys
import os

import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.nn import functional as F

#for testing of my implementation
from transformers import GPT2Model, GPT2Config, GPT2Tokenizer

from math import pi, log
from torch.cuda.amp import autocast
from torch import nn, einsum, broadcast_tensors
from einops import rearrange, repeat


import warnings
from dataclasses import dataclass
from typing import Optional, Tuple, Union


import torch.utils.checkpoint
from torch.cuda.amp import autocast
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

In [4]:
import os
os.environ['CURL_CA_BUNDLE'] = ''

Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
     -------------------------------------- 44.6/44.6 kB 731.0 kB/s eta 0:00:00
Installing collected packages: einops
Successfully installed einops-0.7.0
Note: you may need to restart the kernel to use updated packages.


Errors faced in TASK 2:

Can't test the implementation due to time constraints.


In [6]:
# learned rotation helpers

def apply_learned_rotations(rotations, t, start_index = 0, freq_ranges = None):
    if exists(freq_ranges):
        rotations = einsum('..., f -> ... f', rotations, freq_ranges)
        rotations = rearrange(rotations, '... r f -> ... (r f)')

    rotations = repeat(rotations, '... n -> ... (n r)', r = 2)
    return apply_rotary_emb(rotations, t, start_index = start_index)

@autocast(enabled = False)
def apply_rotary_emb(freqs, t, start_index = 0, scale = 1., seq_dim = -2):
    rot_dim, seq_len = freqs.shape[-1], t.shape[seq_dim]
    freqs = freqs[-seq_len:].to(t)

    end_index = start_index + rot_dim
    assert rot_dim <= t.shape[-1], f'feature dimension {t.shape[-1]} is not of sufficient size to rotate in all the positions {rot_dim}'
    t_left, t, t_right = t[..., :start_index], t[..., start_index:end_index], t[..., end_index:]
    t = (t * freqs.cos() * scale) + (rotate_half(t) * freqs.sin() * scale)
    return torch.cat((t_left, t, t_right), dim = -1)




In [14]:
def exists(val):
    return val is not None

def default(val, d):
    return val if exists(val) else d

def broadcat(tensors, dim = -1):
    broadcasted_tensors = broadcast_tensors(*tensors)
    return torch.cat(broadcasted_tensors, dim = dim)


def rotate_half(x):
    x = rearrange(x, '... (d r) -> ... d r', r = 2)
    x1, x2 = x.unbind(dim = -1)
    x = torch.stack((-x2, x1), dim = -1)
    return rearrange(x, '... d r -> ... (d r)')

In [16]:
class RotaryEmbedding(nn.Module):
    def __init__(
        self,
        dim,
        custom_freqs = None,
        freqs_for = 'lang',
        theta = 10000,
        max_freq = 10,
        num_freqs = 1,
        learned_freq = False,
        use_xpos = False,
        xpos_scale_base = 512,
        interpolate_factor = 1.,
        theta_rescale_factor = 1.,
        seq_before_head_dim = False
    ):
        super().__init__()

        theta *= theta_rescale_factor ** (dim / (dim - 2))

        if exists(custom_freqs):
            freqs = custom_freqs
        elif freqs_for == 'lang':
            freqs = 1. / (theta ** (torch.arange(0, dim, 2)[:(dim // 2)].float() / dim))
        elif freqs_for == 'pixel':
            freqs = torch.linspace(1., max_freq / 2, dim // 2) * pi
        elif freqs_for == 'constant':
            freqs = torch.ones(num_freqs).float()
        else:
            raise ValueError(f'unknown modality {freqs_for}')

        self.cache = dict()
        self.cache_scale = dict()
        self.freqs = nn.Parameter(freqs, requires_grad = learned_freq)

        self.learned_freq = learned_freq

        # default sequence dimension

        self.seq_before_head_dim = seq_before_head_dim
        self.default_seq_dim = -3 if seq_before_head_dim else -2

        # interpolation factors

        assert interpolate_factor >= 1.
        self.interpolate_factor = interpolate_factor

        # xpos

        self.use_xpos = use_xpos
        if not use_xpos:
            self.register_buffer('scale', None)
            return

        scale = (torch.arange(0, dim, 2) + 0.4 * dim) / (1.4 * dim)
        self.scale_base = xpos_scale_base
        self.register_buffer('scale', scale)

    def get_seq_pos(self, seq_len, device, dtype, offset = 0):
        return (torch.arange(seq_len, device = device, dtype = dtype) + offset) / self.interpolate_factor

    def rotate_queries_or_keys(self, t, seq_dim = None, offset = 0, freq_seq_len = None):
        seq_dim = default(seq_dim, self.default_seq_dim)

        assert not self.use_xpos, 'you must use `.rotate_queries_and_keys` method instead and pass in both queries and keys, for length extrapolatable rotary embeddings'

        device, dtype, seq_len = t.device, t.dtype, t.shape[seq_dim]

        if exists(freq_seq_len):
            assert freq_seq_len >= seq_len
            seq_len = freq_seq_len

        freqs = self.forward(lambda: self.get_seq_pos(seq_len, device = device, dtype = dtype, offset = offset), cache_key = f'freqs:{seq_len}|offset:{offset}')

        if seq_dim == -3:
            freqs = rearrange(freqs, 'n d -> n 1 d')

        return apply_rotary_emb(freqs, t, seq_dim = seq_dim)

    def rotate_queries_with_cached_keys(self, q, k, seq_dim = None, offset = 0):
        seq_dim = default(seq_dim, self.default_seq_dim)

        q_len, k_len = q.shape[seq_dim], k.shape[seq_dim]
        assert q_len <= k_len
        rotated_q = self.rotate_queries_or_keys(q, seq_dim = seq_dim, freq_seq_len = k_len)
        rotated_k = self.rotate_queries_or_keys(k, seq_dim = seq_dim)

        rotated_q = rotated_q.type(q.dtype)
        rotated_k = rotated_k.type(k.dtype)

        return rotated_q, rotated_k

    def rotate_queries_and_keys(self, q, k, seq_dim = None):
        seq_dim = default(seq_dim, self.default_seq_dim)

        assert self.use_xpos
        device, dtype, seq_len = q.device, q.dtype, q.shape[seq_dim]

        seq = self.get_seq_pos(seq_len, dtype = dtype, device = device)
        freqs = self.forward(lambda: seq, cache_key = f'freqs:{seq_len}')
        scale = self.get_scale(lambda: seq, cache_key = f'scale:{seq_len}').to(dtype)

        if seq_dim == -3:
            freqs = rearrange(freqs, 'n d -> n 1 d')
            scale = rearrange(scale, 'n d -> n 1 d')

        rotated_q = apply_rotary_emb(freqs, q, scale = scale, seq_dim = seq_dim)
        rotated_k = apply_rotary_emb(freqs, k, scale = scale ** -1, seq_dim = seq_dim)

        rotated_q = rotated_q.type(q.dtype)
        rotated_k = rotated_k.type(k.dtype)

        return rotated_q, rotated_k

    def get_scale(self, t, cache_key = None):
        assert self.use_xpos

        if exists(cache_key) and cache_key in self.cache:
            return self.cache[cache_key]

        if callable(t):
            t = t()

        scale = 1.
        if self.use_xpos:
            power = (t - len(t) // 2) / self.scale_base
            scale = self.scale ** rearrange(power, 'n -> n 1')
            scale = torch.cat((scale, scale), dim = -1)

        if exists(cache_key):
            self.cache[cache_key] = scale

        return scale

    @autocast(enabled = False)
    def forward(self, t, cache_key = None):
        should_cache = not self.learned_freq and exists(cache_key)

        if should_cache and cache_key in self.cache:
            return self.cache[cache_key]

        if callable(t):
            t = t()

        freqs = self.freqs

        freqs = einsum('..., f -> ... f', t.type(freqs.dtype), freqs)
        freqs = repeat(freqs, '... n -> ... (n r)', r = 2)

        if should_cache:
            self.cache[cache_key] = freqs

        return freqs

In [18]:
rotary_emb = RotaryEmbedding(dim = 32)

In [13]:
class MHSelfAttention(nn.Module):
    def __init__(self,config):
        super(MHSelfAttention, self).__init__()
        
        d_model = config.n_embd
        n_head = config.n_head
        
        bias = config.bias
        dropout= config.dropout
        
        
        assert d_model % n_head == 0
        
        self.n_head = n_head
        self.head_dim = d_model // n_head
        self.b_bias = bias
        self.dropout = config.dropout

        self.c_attn = nn.Linear(3 * d_model,d_model)
        self.c_proj = nn.Linear(d_model, d_model)
        
        self.attn_dropout = nn.Dropout(dropout)
        self.resid_dropout = nn.Dropout(dropout)
        
        #adding flash attention referenced from NANOGPT
        
        # flash attention make GPU go brrrrr but support is only in PyTorch >= 2.0
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # causal mask to ensure that attention is only applied to the left in the input sequence
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                        .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        batch_size, seq_len, d_model = x.size()
        q = self.query(x).view(batch_size, seq_len, self.n_head, self.head_dim).transpose(1, 2)
        k = self.key(x).view(batch_size, seq_len, self.n_head, self.head_dim).transpose(1, 2)
        v = self.value(x).view(batch_size, seq_len, self.n_head, self.head_dim).transpose(1, 2)
        
        q = rotary_emb.rotate_queries_or_keys(q)
        k = rotary_emb.rotate_queries_or_keys(k)
        
        # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        if self.flash:
            # efficient attention using Flash Attention CUDA kernels
            out = torch.nn.functional.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=self.dropout if self.training else 0, is_causal=True)
        else:
            attn_ = q @ k.transpose(-2, -1) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))
            attn_ = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf'))
            attn_weights = F.softmax(attn_, dim=-1)
            attn_weights = self.att_dropout(attn_weights) 
            out = attn_weights @ v
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)
        out = self.projection(out)
        out = self.att_dropout(out)
        return out

PositionwiseFeedForward layer (  as pointwise in assignment)

In [5]:
class PointwiseFeedForward(nn.Module):
    def __init__(self, config):
        super(PointwiseFeedForward, self).__init__()
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)  #fc layer
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.dropout)
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)  #projection layer

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        x = self.dropout(x)
        return x


Normalization layer is defined . We can also use Functional's layernorm for this task

In [6]:
class LayerNorm(nn.Module):
    def __init__(self, d_model,bias, eps=1e-5):
        super(LayerNorm, self).__init__()
        self.weight = nn.Parameter(torch.ones(d_model))
        self.bias = nn.Parameter(torch.zeros(d_model)) if bias else none
        self.eps = eps

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)


Defining a data class decorator so that it can handle input values as well as pre trained values properly

In [7]:
from dataclasses import dataclass
@dataclass
class GPTConfig:
    block_size: int = 1024
    vocab_size: int = 50304 # GPT-2 vocab_size of 50257, padded up to nearest multiple of 64 for efficiency
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    dropout: float = 0.0
    bias: bool = True # True: bias in Linears and LayerNorms, like GPT-2. False: a bit better and faster


In [8]:
class GPT2Layer(nn.Module):
    def __init__(self, config):
        super(GPT2Layer, self).__init__()
        self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
        self.attn = MHSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
        self.mlp = PointwiseFeedForward(config)

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.ff(self.norm2(x))
        return x

In [19]:
class GPT2(nn.Module):
    def __init__(self,config):
        super(GPT2, self).__init__()
        
        
        
        
        self.wte = nn.Embedding(config.vocab_size, config.n_embd),
        self.wpe = nn.Embedding(config.block_size, config.n_embd),
        self.drop = nn.Dropout(config.dropout),
        self.h = nn.ModuleList([GPT2Layer(config) for _ in range(config.n_layer)]),
        self.ln_f = LayerNorm(config.n_embd, bias=config.bias),
        
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        
        self.apply(self._init_weights)
        
    
    def forward(self, x):
        
        positions = torch.arange(0, t, dtype=torch.long, device=device) 

        # forward the GPT model itself
        
        tok_emb = self.wte(x) 
        
        pos_emb = self.wpe(pos) 
        
        x = self.drop(tok_emb + pos_emb)
        
        for layer in self.h:
            x = layer(x)
        
        x = self.ln_f(x)
        x = self.lm_head(x)
        return x
    


    
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            torch.nn.init.ones_(module.weight)
            torch.nn.init.zeros_(module.bias)

    
    def load_pretrained_weights(self, model_name='gpt2'):
        # Load pretrained weights from Hugging Face model
        
        config_ = dict(bias= True, n_layer=12, n_head=12, n_embd=768, vocab_size=50257 , block_size= 1024 )
        config = GPTConfig(**config_)
        pretrained_model = GPT2Model.from_pretrained(model_name,resume_download=True)
        
        state_dict = GPT2(config).state_dict()
        sd_keys = state_dict.keys()
        
        sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')]
        

        state_dict_pretrained = pretrained_model.state_dict()
        sdt_keys = state_dict_pretrained.keys()
        
        
        
        sdt_keys = [k for k in sdt_keys if not k.endswith('.attn.masked_bias')] 
        sdt_keys = [k for k in sdt_keys if not k.endswith('.attn.bias')]
        
        
        transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
        print(sd_keys)
        print(sdt_keys)
        
        for k in sdt_keys:
            if any(k.endswith(w) for w in transposed):
                # special treatment for the Conv1D weights we need to transpose
                # assert state_dict_pretrained[k].shape[::-1] == sd[k].shape
                with torch.no_grad():
                    state_dict[k].copy_(state_dict_pretrained[k].t())
            else:
                # vanilla copy over the other parameters
               # assert state_dict_pretrained[k].shape == state_dict[k].shape
                with torch.no_grad():
                    state_dict[k].copy_(state_dict_pretrained[k])

        return model
        
        

In [21]:
# Example usage

config= GPTConfig()
my_gpt_model = GPT2(config)

# Load pretrained weights from Hugging Face GPT-2
my_gpt_model.load_pretrained_weights()

# Perform a sample prediction
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
input_sequence = tokenizer.encode("Hello, how are you today?", return_tensors="pt")
output_hidden_states = my_gpt_model(input_sequence)

print("Input Sequence:", input_sequence)
print("Output Hidden States Shape:", output_hidden_states.shape)





KeyboardInterrupt: 

keys mismatch between gpt2-small and my implementation