In [None]:
import os
import re
from itertools import chain

#Candidate project roots. The first one that exists on this machine becomes the
# working directory, so the same cell runs on both the Windows and the mac checkout.
fpath_win = 'C:\\Users\\rwmil\\Documents\\GitHub\\llm\\llm\\'
fpath_mac = '/Users/richardmiller/Documents/GitHub/llm/'
for project_root in (fpath_mac, fpath_win):
    if os.path.isdir(project_root):
        os.chdir(project_root)
        break
else:
    #Neither checkout is present: fail loudly instead of running in the wrong folder.
    raise FileNotFoundError(
        f'None of the project directories exist: {fpath_mac!r}, {fpath_win!r}'
    )
print(os.getcwd())

In [None]:
#Read the training text. The with-block closes the file, so no explicit close() is needed.
with open('./LLMs-from-scratch-main/ch02/01_main-chapter-code/the-verdict.txt','r',encoding='utf-8') as f:
    raw_text = f.read()

#Split every whitespace-separated word further on punctuation, keeping the punctuation as tokens.
split_text = [re.split(r'([,.:;?_!"()\']|--|\s)',i) for i in raw_text.split()]
split_text = [i.strip() for item in split_text for i in item if i not in ['',' ']]
#Special tokens: document separator and the placeholder for out-of-vocabulary words.
split_text.extend(['<|endoftext|>','<|unk|>'])

#Vocabulary: unique tokens in sorted order mapped to consecutive integer ids.
split_text = sorted(set(split_text))
vocab = {token:i for i,token in enumerate(split_text)}

In [None]:
class SimpleTokenizer:
    '''Regex-based word/punctuation tokenizer backed by a fixed vocabulary.'''

    def __init__(self,vocab):
        ''' The vocab object must be a dictionary or 
        dictionary-like object and must be defined 
        outside of the tokenizer class. It also must
        have the form of {string:integer} and must contain
        the '<|unk|>' token, which encoder() uses for unknown words.'''
        self.string_to_int = vocab
        self.int_to_string = {i:s for s,i in vocab.items()}
        #Kept as attributes so the patterns can be swapped later if the texts become more complex.
        self.re_string_encoder = r'([,.:;?_!"()\']|--|\s)'
        self.re_string_decoder = r'\s+([,.:;?!"()\'])'

    def encoder(self,text):
        '''Convert text into a list of integer token ids.

        The text is split on punctuation, '--' and whitespace. Pieces that are
        empty or whitespace-only are dropped, and any remaining piece that is
        not in the vocabulary is encoded as '<|unk|>'.
        '''
        pieces = re.split(self.re_string_encoder,text)
        #Strip first, then filter: '\n' or '\t' would otherwise survive the filter,
        # strip down to '' and be encoded as a spurious '<|unk|>'.
        stripped = (piece.strip() for piece in pieces)
        tokens = [tok for tok in stripped if tok]
        tokens = [tok if tok in self.string_to_int else '<|unk|>' for tok in tokens]

        return [self.string_to_int[tok] for tok in tokens]

    def decoder(self,ids):
        '''Convert a list of token ids back into text.

        Tokens are joined with single spaces, then the whitespace in front of
        punctuation matched by re_string_decoder is removed.
        '''
        text = ' '.join([self.int_to_string[i] for i in ids])
        text = re.sub(self.re_string_decoder,r'\1',text)
        return text

#Round-trip two sample sentences, joined by the end-of-text marker, through the tokenizer.
tokenizer = SimpleTokenizer(vocab)
text1 = "Hello, do you like tea?"
text2 = "In the sunlit terraces of the palace."
te = " <|endoftext|> ".join((text1, text2))
out = tokenizer.encoder(te)  #list of integer token ids
out2 = tokenizer.decoder(out)  #ids mapped back to text; out-of-vocabulary words come back as <|unk|>

In [None]:
#Show the text recovered from the token ids of the previous cell.
print(out2)

In [None]:
from importlib.metadata import version
import tiktoken
#Record the installed tiktoken version so the run can be reproduced.
print("tiktoken version:", version("tiktoken"))

In [None]:
#Switch to the GPT-2 byte pair encoding. Note this rebinds `tokenizer`, which
# previously held the SimpleTokenizer instance.
tokenizer = tiktoken.get_encoding('gpt2')
text = ('Hello, do you like tea? <|endoftext|> In the sunlit terraces of someunknownPlace.')
#The special token has to be explicitly allowed to appear in the input text.
integers = tokenizer.encode(text,allowed_special={'<|endoftext|>'})
print(integers)

In [None]:
#Decode the ids from the previous cell back into text.
strings = tokenizer.decode(integers)
print(strings)

## Section 2 Using Byte Pair Encoding

In [None]:
import os
import tiktoken

#Candidate project roots. The first one that exists on this machine becomes the
# working directory, so the same cell runs on both the Windows and the mac checkout.
fpath_win = 'C:\\Users\\rwmil\\Documents\\GitHub\\llm\\llm\\'
fpath_mac = '/Users/richardmiller/Documents/GitHub/llm/'
for project_root in (fpath_mac, fpath_win):
    if os.path.isdir(project_root):
        os.chdir(project_root)
        break
else:
    #Neither checkout is present: fail loudly instead of running in the wrong folder.
    raise FileNotFoundError(
        f'None of the project directories exist: {fpath_mac!r}, {fpath_win!r}'
    )
print(os.getcwd())

In [None]:
#Get text data. The with-block closes the file; the encoding is pinned so the
# result does not depend on the platform default.
with open('./LLMs-from-scratch-main/ch02/01_main-chapter-code/the-verdict.txt','r',encoding='utf-8') as f:
    raw_text = f.read()

#Initialize tokenizer and encode raw text from file
tokenizer = tiktoken.get_encoding('gpt2')
encoded_text = tokenizer.encode(raw_text)

#Keep only the first 50 token ids as a small working sample
encoded_sample = encoded_text[:50]


In [None]:
#Split encoded sample into x and y targets

context_size = 4 #Determines number of tokens included in input
x = encoded_sample[:context_size]
y = encoded_sample[1:context_size+1] #Same window shifted right by one token

#Each growing prefix of the sample is a context; the token right after it is the target.
for i in range(1,context_size+1):
    context = encoded_sample[:i]
    desired = encoded_sample[i]
    print(context,'----->',desired)
    

In [None]:
#Same context/target pairs as the previous cell, decoded back to text for readability.
for i in range(1,context_size+1):
    context = encoded_sample[:i]
    desired = encoded_sample[i]
    print(tokenizer.decode(context),'----->',tokenizer.decode([desired]))

## Section 3

In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
import tiktoken


In [None]:

class GPTDatasetV1(Dataset):
    '''Sliding-window dataset of (input, target) token-id tensors built from one text.'''

    def __init__(self,txt,tokenizer,max_length,stride):
        '''
        Tokenize txt and cut it into overlapping training windows.

        txt: text to tokenize.
        tokenizer: already-instantiated object with an encode(txt) method that returns token ids.
        max_length: number of tokens in every input and target window.
        stride: number of tokens the window start advances between samples.
        '''
        
        self.input_ids = []
        self.target_ids = []
        
        #Get token id's
        self.token_ids = tokenizer.encode(txt)
        
        #The target window is the input window shifted right by one token, so the
        # last usable start index is len(token_ids)-max_length-1.
        for i in range(0,len(self.token_ids)-max_length,stride):
            input_chunk = self.token_ids[i:i+max_length]
            target_chunk = self.token_ids[i+1:i+max_length+1]
            self.input_ids.append(torch.tensor(input_chunk))
            self.target_ids.append(torch.tensor(target_chunk))
            
    def __len__(self):
        '''Number of (input, target) windows.'''
        return len(self.input_ids)
    
    def __getitem__(self,idx):
        '''Return the idx-th (input, target) pair of tensors.'''
        #Index into both lists: returning the whole lists would hand every sample
        # to the DataLoader for each index.
        return self.input_ids[idx], self.target_ids[idx]
    
    
        

In [None]:
def create_dataloader_v1(
    txt,
    batch_size=25,
    max_length=255,
    stride=128,
    shuffle=True,
    drop_last=True,
    num_workers=0,
):
    '''
    Build a DataLoader over GPTDatasetV1 windows of txt, tokenized with the GPT-2 encoding.

    max_length and stride are forwarded to GPTDatasetV1; batch_size, shuffle,
    drop_last and num_workers are forwarded to DataLoader.
    '''
    bpe = tiktoken.get_encoding('gpt2')
    loader_options = {
        'batch_size': batch_size,
        'shuffle': shuffle,
        'drop_last': drop_last,
        'num_workers': num_workers,
    }
    return DataLoader(GPTDatasetV1(txt,bpe,max_length,stride), **loader_options)



In [None]:
#The with-block closes the file; the encoding is pinned so the result does not
# depend on the platform default.
with open('./LLMs-from-scratch-main/ch02/01_main-chapter-code/the-verdict.txt','r',encoding='utf-8') as f:
    raw_data = f.read()

#max_length and stride can be changed, using a larger stride will
#result in less overfitting

#batch_size is a tunable hyperparameter to make the model better.
#Higher batch_size means more memory usage and less noise
#Lower batch_size means less memory usage but more noise in training

dataloader = create_dataloader_v1(
    raw_data,
    batch_size=1,
    max_length=4,
    stride=1,
    shuffle=False
)

#Pull a single batch to inspect what the loader yields.
data_iter = iter(dataloader)
first_batch = next(data_iter)


In [None]:
#Toy embedding example: a 6-entry vocabulary embedded into 3 dimensions.
input_ids = torch.tensor([2,3,5,1])
vocab_size = 6
output_dim = 3
torch.manual_seed(123) #For repeatability
embedding_layer = torch.nn.Embedding(vocab_size,output_dim)



In [None]:
#Token embedding table sized with the vocab_size and output_dim set below.
vocab_size = 50257
output_dim = 256
token_embedding_layer = torch.nn.Embedding(vocab_size,output_dim)
max_length = 4
dataloader = create_dataloader_v1(raw_data,batch_size=8,max_length=max_length,stride=max_length,shuffle=False)
data_iter = iter(dataloader)
inputs,targets = next(data_iter)

#NOTE(review): only element 1 of the batch is embedded here - confirm this is intended
# rather than embedding the whole `inputs` batch.
token_embeddings = token_embedding_layer(inputs[1])

#One learned vector per position 0..context_length-1.
context_length = max_length
pos_embedding_layer = torch.nn.Embedding(context_length,output_dim)
pos_embedding = pos_embedding_layer(torch.arange(context_length))
#The variable is `token_embeddings`; `token_embedding` is undefined and raised a NameError.
print(pos_embedding.shape,token_embeddings.shape)

In [None]:
#Model input: token embeddings plus positional embeddings from the previous cell.
input_embeddings = token_embeddings+pos_embedding

# Chapter 3 - Self-Attention

In [None]:
import torch
#Six toy token embeddings, one per row, three dimensions each.
inputs = torch.tensor([
    [0.43,0.15,0.89],
    [0.55,0.87,0.66],
    [0.57,0.85,0.64],
    [0.22,0.58,0.33],
    [0.77,0.25,0.10],
    [0.05,0.80,0.55],
])

#Use the second token as the query and score it against every token with a dot product.
query = inputs[1]
attn_scores_2 = torch.empty(inputs.shape[0])
for i, x_i in enumerate(inputs):
    attn_scores_2[i] = torch.dot(x_i,query)
    
print(attn_scores_2)

In [None]:
#Simplest normalization: divide by the sum so the weights add up to 1.
attn_weights_2_tmp = attn_scores_2/attn_scores_2.sum()
print('Attention Weights: ',attn_weights_2_tmp)
print('Sum: ',attn_weights_2_tmp.sum())

In [None]:
#Softmax-normalize the scores into attention weights, then build the context vector
# for the query token as the attention-weighted sum of all input vectors.
attn_weights_2 = torch.softmax(attn_scores_2, dim=0)
query = inputs[1]
context_vec_2 = torch.zeros(query.shape)
for i, x_i in enumerate(inputs):
    context_vec_2 += attn_weights_2[i]*x_i
    
print(context_vec_2)

In [None]:
#Computing attention scores for every pair of tokens by dot product.
# The 6x6 size is hard-coded to match the six rows of `inputs`.
attn_scores = torch.empty(6,6)
for i, x_i in enumerate(inputs):
    for j, x_j in enumerate(inputs):
        attn_scores[i,j] = torch.dot(x_i,x_j)
        
print(attn_scores)

In [None]:
#Same pairwise scores as the nested loop above, computed with one matrix multiplication.
attn_scores = inputs@inputs.T
print(attn_scores)

In [None]:
#Normalize each row of scores so every token's attention weights sum to 1.
attn_weights = torch.softmax(attn_scores,dim=-1)
print(attn_weights)

## Chapter 3 Section 4.1

In [None]:
import torch
#Six toy token embeddings, one per row, three dimensions each.
inputs = torch.tensor([
    [0.43,0.15,0.89],
    [0.55,0.87,0.66],
    [0.57,0.85,0.64],
    [0.22,0.58,0.33],
    [0.77,0.25,0.10],
    [0.05,0.80,0.55],
])

#Define input element, input embedding size, and output embedding size
x_2 = inputs[1]
d_in = inputs.shape[1]
d_out = 2

#Initialize Weight tensors
#requires_grad is set to false to reduce output clutter, but during model training it needs to be true
torch.manual_seed(123) #For repeatability
W_query = torch.nn.Parameter(torch.rand(d_in,d_out), requires_grad=False)
W_key = torch.nn.Parameter(torch.rand(d_in,d_out), requires_grad=False)
W_value = torch.nn.Parameter(torch.rand(d_in,d_out), requires_grad=False)


#Project the second token into query, key and value space.
query_2 = x_2@W_query
key_2 = x_2@W_key
value_2 = x_2@W_value

#Weight Tensors are optimized during training and are used to make predictions
#Attention Weights are tensors that give the amount of attention to different parts of
# the input. Attention weights are like saying since this word directly precedes the unknown I
# will pay more attention to it than I will to another word that is 40 tokens away.

#Get all keys and values:
keys = inputs@W_key
values = inputs@W_value


In [None]:
#Computing an individual attention score (query 2 against key 2)
keys_2 = keys[1]
attn_score_22 = query_2.dot(keys_2)
print('Attention score for position 2: ',attn_score_22)

#Compute the scores of query 2 against all keys with one matrix product
attn_scores_2 = query_2@keys.T
print('Attention Scores: ',attn_scores_2)

#Notice that the second value in attn_scores_2 matches the value of attn_score_22

#Calculate attention weights from attention scores
#To help normalize we are dividing by the square root of the key dimension
#I think this is going to be like the standard deviation where
# you divide by sqrt(N) for a population.
d_k = keys.shape[-1]
attn_weights_2 = torch.softmax(attn_scores_2/d_k**0.5, dim=-1)
print('Scaled Attention Weights: ',attn_weights_2)

#Calculate context vector
context_vect_2 = attn_weights_2 @ values
print('Context Vector: ',context_vect_2)

#Query refers to the current item (token, word, sentence, etc.)
#Key is the indexing value of a token and is used to match the query
#Value the actual content or input values

## Chapter 3 Section 4.2

In [None]:
#It's useful to organize all the code in Chapter 3 Section 4.1 into a single
# succinct python class for use in the LLM architecture.

import torch.nn as nn
class SelfAttention_v1(nn.Module):
    '''Single-head scaled dot-product self-attention with raw nn.Parameter weights.'''

    def __init__(self,d_in,d_out):
        '''
        Create random query, key and value weight matrices of shape (d_in, d_out).
        '''
        super().__init__()
        weight_shape = (d_in, d_out)
        #Draw order (query, key, value) is kept so seeded runs give the same weights.
        self.W_query = nn.Parameter(torch.rand(*weight_shape))
        self.W_key = nn.Parameter(torch.rand(*weight_shape))
        self.W_value = nn.Parameter(torch.rand(*weight_shape))
        
    def forward(self,x):
        '''
        Return one context vector per row of x.

        x is multiplied (matrix product) with each weight matrix to get queries,
        keys and values. Query-key scores are divided by sqrt(key dimension),
        softmax-normalized per row, and used to average the values.
        '''
        q = x@self.W_query
        k = x@self.W_key
        v = x@self.W_value

        scale = k.shape[-1]**0.5
        weights = torch.softmax((q@k.T)/scale, dim=-1)
        return weights@v
    
    
#Seed before constructing the module so its random weights are repeatable.
torch.manual_seed(123)
sa_v1 = SelfAttention_v1(d_in,d_out)
print(sa_v1(inputs))

#NOTE: nn.Linear layers perform the matrix multiplication and come with an optimized weight initialization.
# This should result in quicker and more reliable training versus using randomized weights.

#Will implement this in SelfAttention_v2

In [None]:
class SelfAttention_v2(nn.Module):
    '''Single-head scaled dot-product self-attention using nn.Linear projections.'''

    def __init__(self, d_in, d_out, qkv_bias=False):
        '''
        Create the query, key and value projections from d_in to d_out.
        qkv_bias switches the bias term of all three layers on or off.
        '''
        super().__init__()

        #Creation order (query, key, value) is kept so seeded runs give the same weights.
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

    def forward(self, x):
        '''Return one context vector per row of x.'''
        q = self.W_query(x)
        k = self.W_key(x)
        v = self.W_value(x)

        #Scale by sqrt(key dimension) before the row-wise softmax.
        scaled_scores = (q@k.T)/k.shape[-1]**0.5
        weights = torch.softmax(scaled_scores, dim=-1)

        return weights@v
    
    
#Seed before constructing the module so its initial weights are repeatable.
torch.manual_seed(789)
sa_v2 = SelfAttention_v2(d_in,d_out)

print(sa_v2(inputs))

# Chapter 3 Section 5

In [None]:
#Causal Self-Attention (AKA Masked Attention): Prevents the model from attending to tokens that come after the current token.
#Because the model has no access to information after the current token, it can only learn
# from the current token and the tokens that precede it.

#Multi-Head Self-Attention: Splits the Self-Attention mechanism into multiple sub-units or 'heads', this
# splits up the learning process. Each head learns from different parts of the input text or different contexts.
# This allows or increases the model's ability to perform complex tasks.

#Consider the case of data represented as a tensor, T. In 2-dimensions, the current token data is the diagonal
# elements of the input array. So, the values above the diagonal are masked. In a multi-dimensional tensor, this
# would correspond to the diagonal hyper-plane. All elements in the volume above the diagonal hyper-plane
# will be masked. This prevents the model from accessing data after a given token and restricts it to data from
# before the given token.

#The unmasked weights are normalized to 1.



## Chapter 3 Section 5.1 - Applying a Causal Attention Mask

In [None]:
#Class from Chapter 3 Section 4.2

class SelfAttention_v2(nn.Module):
    '''Single-head scaled dot-product self-attention using nn.Linear projections.'''

    def __init__(self, d_in, d_out, qkv_bias=False):
        '''
        Create the query, key and value projections from d_in to d_out.
        qkv_bias switches the bias term of all three layers on or off.
        '''
        super().__init__()

        #Creation order (query, key, value) is kept so seeded runs give the same weights.
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

    def forward(self, x):
        '''Return one context vector per row of x.'''
        q = self.W_query(x)
        k = self.W_key(x)
        v = self.W_value(x)

        #Scale by sqrt(key dimension) before the row-wise softmax.
        scaled_scores = (q@k.T)/k.shape[-1]**0.5
        weights = torch.softmax(scaled_scores, dim=-1)

        return weights@v
    
    
#Re-create sa_v2 with the same seed used in Section 4.2.
torch.manual_seed(789)
sa_v2 = SelfAttention_v2(d_in,d_out)

In [None]:
#Compute attention weights with the SelfAttention_v2 Class from
# Chapter 3 Section 4.2
queries = sa_v2.W_query(inputs)
keys = sa_v2.W_key(inputs)
attn_scores = queries@keys.T
attn_weights = torch.softmax(
    attn_scores/keys.shape[-1]**0.5,
    dim=-1
)

#Print the tensor we want to mask:
print('Attention Weights Tensor: ',attn_weights)

#Use tril() function to create a masking tensor with 0's in the upper triangle
context_length = attn_scores.shape[0]
mask_simple = torch.tril(torch.ones(context_length, context_length))

print('Simple Masking Tensor: ',mask_simple)

#Normalize Masking Tensor so each row sums to 1
# NOTE(review): the mask itself is normalized here; it is never multiplied into
# attn_weights in this cell - confirm whether that step was intended.
row_sums = mask_simple.sum(dim=-1,keepdim=True)
mask_simple_norm = mask_simple/row_sums
print('Normalized Masking Tensor: ', mask_simple_norm)

#Masking and renormalizing will prevent upper triangular data from the softmax from leaking into
# the masking.

In [None]:
#Improving on the masking function

#Softmax exponentiates every score and divides by the row sum, so each row becomes a
# probability distribution that sums to 1. Since e^-inf = 0, replacing the upper triangle
# of the scores with -inf before the softmax gives exactly zero weight to the masked
# positions, and the remaining weights are already normalized.

#Define an improved masking tensor:
mask = torch.triu(torch.ones(context_length,context_length),diagonal=1) #diagonal=1 starts at the line above the diagonal
masked = attn_scores.masked_fill(mask.bool(),-torch.inf) #Uses the masked_fill method in pytorch to fill the upper triangle with -infinity

print('Masking Tensor: ',masked)

#Make Attention Weights tensor
attn_weights = torch.softmax(masked/keys.shape[-1]**0.5, dim=1)
print('Attention Weights: ',attn_weights)

#Calculate Context Tensor:
# The values must come from the same sa_v2 module as the queries and keys. Without this
# line `values` is whatever an earlier cell left behind (the Section 4.1 W_value product).
values = sa_v2.W_value(inputs)
context_vect = attn_weights@values
print('Context Tensor: ', context_vect)

## Chapter 3 Section 5.2

In [None]:
#Dropout can be used to help prevent overfitting to available data and increase general usefulness of
# your model. It does this by randomly ignoring hidden layer units during training.

#Dropout is only used during training and is disabled for model predictions.

#For transformer architectures, dropout is typically employed at two points in the training process.
# 1. After calculating attention weights
# 2. After applying attention weights to value vectors

#Here we will use a dropout rate of 50%, but typically dropout rates are 10% to 20% (0.1 to 0.2)

#Apply the dropout to a 6x6 tensor for illustration purposes:
torch.manual_seed(123) #For repeatability of the random drop pattern
dropout = torch.nn.Dropout(0.5)
example = torch.ones(6,6)
print(dropout(example))

#To account for half of the values being dropped out, the other values are doubled.

In [None]:
#Apply the dropout layer from the previous cell to the attention weights:
torch.manual_seed(123) #For repeatability of the random drop pattern
print(dropout(attn_weights))

## Chapter 3 Section 5.3 - Implementing a Compact Causal Attention Class

In [None]:
#Impliment a CasualAttention class by modifying the SelfAttention class. This will then be used
# later as a template for the MultiHeadAttion class that we will code.
import torch.nn as nn
class CasualAttention(nn.Module):
    '''Single-head causal (masked) self-attention over batched input with dropout.'''

    def __init__(
        self,
        d_in,
        d_out,
        context_length,
        dropout,
        qkv_bias = False,
    ):
        '''
        d_in, d_out: input and output size of the query/key/value projections.
        context_length: side length of the square mask stored on the module.
        dropout: drop probability applied to the attention weights.
        qkv_bias: whether the three projections carry a bias term.
        '''
        super().__init__()

        self.d_out = d_out
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.dropout = nn.Dropout(dropout)

        #Ones strictly above the diagonal mark the positions that must be hidden.
        # Stored as a buffer rather than a parameter.
        future_positions = torch.triu(
            torch.ones(context_length, context_length), diagonal=1
        )
        self.register_buffer('mask', future_positions)

    def forward(self,x):
        '''Return the context vectors for x, which must unpack into three dimensions.'''
        _, seq_len, _ = x.shape
        q, k, v = self.W_query(x), self.W_key(x), self.W_value(x)

        #Swap the last two axes of the keys so each batch entry yields a (seq_len, seq_len) score grid.
        scores = q@k.transpose(1,2)
        #Crop the stored mask to the actual sequence length, then hide masked positions in place.
        hidden = self.mask.bool()[:seq_len,:seq_len]
        scores.masked_fill_(hidden, -torch.inf)

        weights = torch.softmax(scores/k.shape[-1]**0.5, dim=-1)
        weights = self.dropout(weights)

        return weights@v
    

In [None]:
#Make a batched set of identical data inputs:
inputs = torch.tensor([
    [0.43,0.15,0.89],
    [0.55,0.87,0.66],
    [0.57,0.85,0.64],
    [0.22,0.58,0.33],
    [0.77,0.25,0.10],
    [0.05,0.80,0.55],
])
d_in = inputs.shape[1]
d_out = 2
batch = torch.stack((inputs,inputs,), dim=0) #Two copies stacked along a new leading axis

torch.manual_seed(123)
context_length = batch.shape[1] #Number of tokens per sequence


#Dropout of 0.0 keeps the result deterministic.
ca = CasualAttention(d_in, d_out, context_length, 0.0)
context_vect = ca(batch)
print('Context Tensor: ',context_vect.shape)

## Chapter 3 Section 6 - Extending Single-Head Attention to Multi-Head Attention

## Chapter 3 Section 6.1 - Stacking Multiple Single-Head Attention Layers

In [None]:
class MultiHeadAttentionWrapper(nn.Module):
    '''Multi-head attention built by stacking independent CasualAttention heads.'''

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        '''Create num_heads CasualAttention modules that all receive the same arguments.'''
        super().__init__()

        self.heads = nn.ModuleList()
        for _ in range(num_heads):
            self.heads.append(
                CasualAttention(d_in, d_out, context_length, dropout, qkv_bias)
            )

    def forward(self, x):
        '''Run every head on x and concatenate the results along the last axis.'''
        per_head = []
        for head in self.heads:
            per_head.append(head(x))
        return torch.cat(per_head, dim=-1)
    
#The heads in self.heads are evaluated one after another in forward(), which is not
# very efficient. This class has very compact code, but every head runs its own separate
# matrix multiplications. If we combine the MultiHeadAttentionWrapper class with the
# CasualAttention class, we can compute all heads in parallel and make it more efficient.

In [None]:
#Same toy inputs as before, batched twice.
inputs = torch.tensor([
    [0.43,0.15,0.89],
    [0.55,0.87,0.66],
    [0.57,0.85,0.64],
    [0.22,0.58,0.33],
    [0.77,0.25,0.10],
    [0.05,0.80,0.55],
])
d_in = inputs.shape[1]
d_out = 2
batch = torch.stack((inputs,inputs,), dim=0)

torch.manual_seed(123)
context_length = batch.shape[1] #Number of tokens
d_in,d_out = 3,1 #Overrides the d_in/d_out assigned above; each head now outputs 1 value
mha = MultiHeadAttentionWrapper(d_in, d_out, context_length, 0.0, num_heads=2)

#Two heads of width d_out are concatenated along the last axis.
context_vecs = mha(batch)
print(context_vecs)
print(context_vecs.shape)

## Chapter 3 Section 6.2 - Implementing Multi-Head Attention with Weight Splits

In [None]:
class MultiHeadAttention(nn.Module):
    '''Causal multi-head self-attention that computes all heads in one batched pass.'''

    def __init__(
        self, d_in, d_out,
        context_length, dropout,
        num_heads, qkv_bias=False,
    ):
        '''
        d_in: size of each input token vector.
        d_out: total output size across all heads; must be divisible by num_heads.
        context_length: side length of the square causal mask stored on the module.
        dropout: drop probability applied to the attention weights.
        num_heads: number of attention heads d_out is split into.
        qkv_bias: whether the query/key/value projections carry a bias term.
        '''
        super().__init__()
        
        assert (d_out%num_heads==0), "d_out must be divisible by num_heads with no remainder."
        
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        #Ones strictly above the diagonal mark the future positions to hide.
        self.register_buffer(
            "mask",
            torch.triu(
                torch.ones(
                        context_length,
                        context_length
                ),
                diagonal = 1
            )
        )
        
    def forward(self,x):
        '''
        Return the context vectors for x.

        x must unpack into (b, num_tokens, d_in); the result has shape
        (b, num_tokens, d_out).
        '''
        #Project the inputs into key, query and value tensors
        b, num_tokens, _ = x.shape
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        #Split the last axis into heads (the opposite of flattening).
        # Tensors start with shape (b,num_tokens, d_out) and end with a shape of
        # (b, num_tokens, num_heads, head_dim)
        # where head_dim = d_out/num_heads
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)

        #Move the head axis forward: (b, num_heads, num_tokens, head_dim)
        keys = keys.transpose(1,2)
        queries = queries.transpose(1,2)
        values = values.transpose(1,2)

        #Perform matrix multiplication to get attention scores:
        # This is analogous to having a tensor, T with shape (1,2,3,4) and performing 2 dot products:
        #  first = T[0,0,:,:]
        #  first_result = first@first.T
        #
        # and then performing:
        #
        #  second = T[0,1,:,:]
        #  second_result = second@second.T
        attn_scores = queries@keys.transpose(2,3)

        #Mask attention scores (mask cropped to the actual number of tokens)
        mask_bool = self.mask.bool()[:num_tokens,:num_tokens]
        attn_scores.masked_fill_(mask_bool, -torch.inf)
        
        #Normalize attention scores
        attn_weights = torch.softmax(attn_scores/keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)
        
        #Weighted sum of the values, then move heads back next to head_dim
        context_vec = (attn_weights@values).transpose(1,2)
        
        #Reshape outputs back to size (b,num_tokens,d_out)
        context_vec = context_vec.contiguous().view(
            b, num_tokens, self.d_out
        )

        #Projection Layer - Not strictly necessary, but often used in LLM architecture.
        # It applies one more learned linear map (d_out -> d_out) to the concatenated head outputs.
        context_vec = self.out_proj(context_vec)

        #Without this return the module produced None.
        return context_vec
        
        

In [None]:
#Same toy inputs as before, batched twice.
inputs = torch.tensor([
    [0.43,0.15,0.89],
    [0.55,0.87,0.66],
    [0.57,0.85,0.64],
    [0.22,0.58,0.33],
    [0.77,0.25,0.10],
    [0.05,0.80,0.55],
])
d_in = inputs.shape[1]
d_out = 2
batch = torch.stack((inputs,inputs,), dim=0)
torch.manual_seed(123)

#Re-derive the sizes from the batch itself (d_out is assigned a second time, same value).
batch_size, context_length, d_in = batch.shape
d_out = 2

mha = MultiHeadAttention(d_in,d_out,context_length, 0.0, num_heads=2)
context_vecs = mha(batch)



# Chapter 4 - Implementing a GPT Model from Scratch to Generate Text

## Chapter 4 Section 1 - Coding an LLM Architecture

In [None]:
#GPT Model Configuration Variables
GPT_CONFIG_124M = {
            'vocab_size':50257,       #Rows of the token embedding, width of the output head
            'context_length':1024,    #Rows of the positional embedding
            'emb_dim':768,            #Width of every embedding vector
            'n_heads':12,             #Presumably attention heads per block - not read by the dummy model
            'n_layers':12,            #Number of transformer blocks stacked in the model
            'drop_rate':0.1,          #Dropout probability applied to the embeddings
            'qkv_bias':False,         #Presumably the bias switch for query/key/value layers - not read by the dummy model
}


In [None]:
#Listing 4.1 - A Placeholder GPT Model Architecture Class
import torch
import torch.nn as nn

class DummyGPTModel(nn.Module):
    '''Skeleton GPT model: real embeddings and output head, placeholder blocks and norm.'''

    def __init__(self,cfg):
        '''
        cfg is a dict providing 'vocab_size', 'context_length', 'emb_dim',
        'drop_rate' and 'n_layers'.
        '''
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'],cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_length'],cfg['emb_dim'])
        self.drop_emb = nn.Dropout(cfg['drop_rate'])
        #Placeholder for transformer blocks
        self.trf_blocks = nn.Sequential(
            *[
                DummyTransformerBlock(cfg)
                for _ in range(cfg['n_layers'])
            ]
        )
        
        #Placeholder for LayerNorm
        self.final_norm = DummyLayerNorm(cfg['emb_dim'])
        self.out_head = nn.Linear(
            cfg['emb_dim'],cfg['vocab_size'],bias=False
        )
        
    def forward(self,in_idx):
        '''
        Map token ids to logits.

        in_idx must unpack into (batch_size, seq_len); the result carries one
        vector of cfg['vocab_size'] logits per input token.
        '''
        #The debug print of in_idx.shape was removed: it wrote to stdout on every forward pass.
        _, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        #Positions are created on the same device as the token ids.
        pos_embeds = self.pos_emb(
            torch.arange(seq_len, device=in_idx.device)
        )
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        
        return logits
    
#Use pass through code placeholders for transformer and
#Layer norm blocks.
class DummyTransformerBlock(nn.Module):
    '''Stand-in for a transformer block: accepts a config and passes input through.'''

    def __init__(self, cfg):
        #cfg is accepted for interface compatibility only; nothing is read from it.
        super().__init__()

    def forward(self, x):
        '''Return x unchanged.'''
        passthrough = x
        return passthrough

class DummyLayerNorm(nn.Module):
    '''Stand-in for layer normalization: same constructor shape, no computation.'''

    def __init__(self, normalized_shape, eps=1e-5):
        #Both arguments are accepted for interface compatibility only and are not stored.
        super().__init__()

    def forward(self,x):
        '''Return x unchanged.'''
        passthrough = x
        return passthrough
        
        

In [None]:
import tiktoken



def encoder(txt):
    '''Encode txt with the module-level `tokenizer` and return the ids as a tensor.'''
    token_ids = tokenizer.encode(txt)
    return torch.tensor(token_ids)


#encoder() above reads this module-level tokenizer at call time.
tokenizer = tiktoken.get_encoding('gpt2')

#torch.stack needs both encoded texts to have the same number of tokens.
txts = ['Every effort moves you','Every day holds a']
batch = [encoder(tx) for tx in txts]
batch = torch.stack(batch,dim=0)

print(batch)

#Seed before building the model so its initial weights are repeatable.
torch.manual_seed(123)
model = DummyGPTModel(GPT_CONFIG_124M)
logits = model(batch)
print("Output shape:", logits.shape)
print(logits)

In [None]:
#Small example layer whose outputs are normalized in the next cells.
torch.manual_seed(123)

batch_example = torch.randn(2, 5) #2 samples with 5 features each
layer = nn.Sequential(nn.Linear(5,6), nn.ReLU())
out = layer(batch_example)
print(out)

In [None]:
#Per-sample mean and variance over the feature axis; keepdim keeps the shape broadcastable.
mean = out.mean(dim=-1, keepdim=True)
var = out.var(dim=-1, keepdim=True)
print(mean, var)

In [None]:
#Normalize each sample with the mean/variance from the previous cell, then recompute
# the statistics of the normalized output.
out_norm = (out-mean)/torch.sqrt(var)
mean = out_norm.mean(dim=-1, keepdim=True)
var = out_norm.var(dim=-1, keepdim=True)
print(out_norm)
print(mean, var)

## Chapter 4 Section 2 - Normalizing Activations with Layer Normalization

In [None]:
#Listing 4.2 - A Layer Normalization Class
#Code is implimented below the hashed line.
import torch
import torch.nn as nn

class DummyGPTModel(nn.Module):
    '''Skeleton GPT model: real embeddings, LayerNorm and output head, placeholder blocks.'''

    def __init__(self,cfg):
        '''
        cfg is a dict providing 'vocab_size', 'context_length', 'emb_dim',
        'drop_rate' and 'n_layers'.
        '''
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'],cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_length'],cfg['emb_dim'])
        self.drop_emb = nn.Dropout(cfg['drop_rate'])
        #Placeholder for transformer blocks
        self.trf_blocks = nn.Sequential(
            *[
                DummyTransformerBlock(cfg)
                for _ in range(cfg['n_layers'])
            ]
        )
        
        #Final normalization uses the LayerNorm class defined in this cell
        self.final_norm = LayerNorm(cfg['emb_dim'])
        self.out_head = nn.Linear(
            cfg['emb_dim'],cfg['vocab_size'],bias=False
        )
        
    def forward(self,in_idx):
        '''
        Map token ids to logits.

        in_idx must unpack into (batch_size, seq_len); the result carries one
        vector of cfg['vocab_size'] logits per input token.
        '''
        #The debug print of in_idx.shape was removed: it wrote to stdout on every forward pass.
        _, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        #Positions are created on the same device as the token ids.
        pos_embeds = self.pos_emb(
            torch.arange(seq_len, device=in_idx.device)
        )
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)
        logits = self.out_head(x)
        
        return logits
    
#Use pass through code placeholders for Transformer Block.
class DummyTransformerBlock(nn.Module):
    '''Stand-in for a transformer block: accepts a config and passes input through.'''

    def __init__(self, cfg):
        #cfg is accepted for interface compatibility only; nothing is read from it.
        super().__init__()

    def forward(self, x):
        '''Return x unchanged.'''
        passthrough = x
        return passthrough
    
###############################################################################
class LayerNorm(nn.Module):
    '''Layer normalization over the last axis with a learnable scale and shift.'''

    def __init__(self, emb_dim):
        '''emb_dim is the size of the last axis of the tensors to be normalized.'''
        super().__init__()
        self.eps = 1e-5 #Added to the variance to avoid division by zero
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
        
    def forward(self, x):
        '''Normalize x to zero mean and unit variance along the last axis, then scale and shift.'''
        mean = x.mean(dim=-1, keepdim=True)
        #Layer normalization uses the biased (divide by n) variance. torch's default is the
        # unbiased n-1 estimator, which leaves the output variance at (n-1)/n instead of 1.
        var = x.var(dim=-1,keepdim=True,unbiased=False)
        norm_x = (x-mean)/torch.sqrt(var+self.eps)
        return self.scale*norm_x+self.shift

## Chapter 4 Section 3 - Implementing a Feed Forward Network with GELU Activations

In [None]:
#Listing 4.3 - An Implementation of the GELU Activation Function
class GELU(nn.Module):
    '''GELU activation computed with the tanh approximation.'''

    def forward(self, x):
        '''Return 0.5*x*(1+tanh(sqrt(2/pi)*(x+0.044715*x^3))) element-wise.'''
        coeff = torch.sqrt(torch.tensor(2.0/torch.pi))
        inner = coeff*(x+0.044715*torch.pow(x,3))
        return 0.5*x*(1+torch.tanh(inner))

In [None]:
import matplotlib.pyplot as plt
#Evaluate both activations on the same 100 points in [-3, 3] and plot them side by side.
gelu,relu = GELU(), nn.ReLU()
x = torch.linspace(-3,3,100)
y_gelu, y_relu = gelu(x), relu(x)
plt.figure(figsize=(8,3))

#enumerate starts at 1 because subplot indices are 1-based.
for i, (y,label) in enumerate(zip([y_gelu,y_relu],['GELU','ReLU']), 1):
    plt.subplot(1,2,i)
    plt.plot(x,y)
    plt.title(f'{label} activation function')
    plt.xlabel('x')
    plt.ylabel(f'{label}(x)')
    plt.grid(True)
    
plt.tight_layout()
plt.show()

In [None]:
#Listing 4.4 - A Feed Forward Neural Network Module
class FeedForward(nn.Module):
    '''Two-layer feed-forward network: expand to 4x the embedding width, GELU, project back.'''

    def __init__(self, cfg):
        '''cfg is a dict providing 'emb_dim'.'''
        super().__init__()
        width = cfg['emb_dim']
        hidden = 4*width
        expand = nn.Linear(width, hidden)
        contract = nn.Linear(hidden, width)
        self.layers = nn.Sequential(expand, GELU(), contract)

    def forward(self,x):
        '''Apply the three layers to x; the last axis keeps size cfg['emb_dim'].'''
        return self.layers(x)

In [None]:
#The feed-forward block keeps the shape of its input: last axis 768 in, 768 out.
ffn = FeedForward(GPT_CONFIG_124M)
x = torch.rand(2,3,768)
out = ffn(x)
print(out.shape)

## Chapter 4 Section 4 - Adding Shortcut Connections

In [None]:
#Listing 4.5 - A Neural Network to Illustrate Shortcut Connections

class ExampleDeepNeuralNetwork(nn.Module):
    '''Five Linear+GELU layers with optional shortcut (skip) connections.'''

    def __init__(self, layer_sizes, use_shortcut):
        '''
        layer_sizes: sequence whose first six entries give the widths; layer i maps
            layer_sizes[i] to layer_sizes[i+1].
        use_shortcut: when True, a layer's input is added to its output wherever
            the two shapes match.
        '''
        super().__init__()
        self.use_shortcut = use_shortcut
        self.layers = nn.ModuleList([
            nn.Sequential(nn.Linear(layer_sizes[i], layer_sizes[i+1]), GELU())
            for i in range(5)
        ])

    def forward(self, x):
        '''Run x through the layers in order, adding shortcuts where enabled and possible.'''
        for layer in self.layers:
            y = layer(x)
            can_skip = self.use_shortcut and x.shape == y.shape
            x = x + y if can_skip else y
        return x
    
#Four 3->3 layers followed by one 3->1 layer; only the equal-width layers can take a shortcut.
layer_sizes = [3, 3, 3, 3, 3, 1]
sample_input = torch.tensor([[1., 0., -1.]])
torch.manual_seed(123) #Seed so the initial weights are repeatable
model_without_shortcut = ExampleDeepNeuralNetwork(
    layer_sizes, use_shortcut=False
)

In [None]:
#Build a function that prints the loss.
def print_gradients(model,x):
    """Run one forward/backward pass and print each weight's mean absolute gradient.

    The loss is the mean squared error between ``model(x)`` and a fixed
    target of ``[[0.]]``.

    Args:
        model: The ``nn.Module`` to inspect.
        x: Input tensor passed to ``model``.
    """
    # Clear gradients left over from a previous call; otherwise backward()
    # would accumulate into them and repeated calls would report inflated values.
    model.zero_grad()

    output = model(x)
    target = torch.tensor([[0.]])
    loss_fn = nn.MSELoss()
    loss = loss_fn(output, target)
    loss.backward()

    for name, param in model.named_parameters():
        # Skip parameters that took no part in the forward pass (grad is None).
        if 'weight' in name and param.grad is not None:
            print(f'{name} has a gradient of {param.grad.abs().mean().item()}')





In [None]:
#Print the per-layer weight gradients of the model built without skip connections.
#This has a vanishing gradient problem
print_gradients(model_without_shortcut, sample_input)

In [None]:
#Instantiate a model with skip connections (shortcuts).
# Re-seed with the same value used for the no-shortcut model so both models
# are initialised from the same random state and their gradients are comparable.
torch.manual_seed(123)
model_with_shortcut = ExampleDeepNeuralNetwork(
    layer_sizes, use_shortcut=True
)
print_gradients(model_with_shortcut, sample_input)

## Chapter 4 Section 6 - Coding the GPT Model

### 1.a Import modules

In [7]:
import torch
import torch.nn as nn

### 1.b Define the MultiHeadAttention Class

In [8]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Total output width across all heads; must be divisible by
            ``num_heads``.
        context_length: Side length of the square causal mask, i.e. the
            longest sequence the mask can cover.
        dropout: Dropout probability applied to the attention weights.
        num_heads: Number of attention heads.
        qkv_bias: Whether the query/key/value projections use a bias term.
    """

    def __init__(
        self, d_in, d_out,
        context_length, dropout,
        num_heads, qkv_bias=False,
    ):
        super().__init__()

        assert (d_out%num_heads==0), "d_out must be divisible by num_heads with no remainder."

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # Ones strictly above the diagonal mark the "future" positions that a
        # token must not attend to. Registered as a buffer so it follows the
        # module across devices without being a trainable parameter.
        self.register_buffer(
            "mask",
            torch.triu(
                torch.ones(
                        context_length,
                        context_length
                ),
                diagonal = 1
            )
        )

    def forward(self,x):
        """Compute causally masked multi-head attention.

        Args:
            x: Tensor of shape ``(b, num_tokens, d_in)``.

        Returns:
            Tensor of shape ``(b, num_tokens, d_out)``.
        """
        # Project the input into keys, queries and values.
        b, num_tokens, d_in = x.shape
        keys = self.W_key(x)
        queries = self.W_query(x)
        values = self.W_value(x)

        # Split the last dimension into heads (the opposite of flattening):
        # (b, num_tokens, d_out) -> (b, num_tokens, num_heads, head_dim)
        # where head_dim = d_out / num_heads.
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)

        # Move the head axis forward: (b, num_heads, num_tokens, head_dim).
        keys = keys.transpose(1,2)
        queries = queries.transpose(1,2)
        values = values.transpose(1,2)

        # Batched matrix multiplication gives one score matrix per head.
        # This is analogous to having a tensor T with shape (1,2,3,4) and
        # performing two separate products:
        #   first = T[0,0,:,:];  first_result = first@first.T
        #   second = T[0,1,:,:]; second_result = second@second.T
        attn_scores = queries@keys.transpose(2,3)

        # Mask future positions with -inf so softmax assigns them zero weight.
        mask_bool = self.mask.bool()[:num_tokens,:num_tokens]
        attn_scores.masked_fill_(mask_bool, -torch.inf)

        # Scale by sqrt(head_dim) and normalise each row into attention weights.
        attn_weights = torch.softmax(attn_scores/keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Weighted sum of values, then back to (b, num_tokens, num_heads, head_dim).
        context_vec = (attn_weights@values).transpose(1,2)

        # Merge the heads again: (b, num_tokens, d_out).
        context_vec = context_vec.contiguous().view(
            b, num_tokens, self.d_out
        )

        # Output projection: a learned linear map over the concatenated head
        # outputs (d_out -> d_out), which lets the results of the individual
        # heads be mixed together. It does not change the tensor's shape.
        context_vec = self.out_proj(context_vec)

        return context_vec
        

### 1.c Define Feed Forward Class

In [None]:
# NOTE(review): FeedForward is also defined earlier in this notebook
# (Listing 4.4); this later definition shadows that one when both cells run.
class FeedForward(nn.Module):
    """Two-layer feed-forward network with a GELU activation in between.

    The first linear layer widens the last dimension from ``cfg['emb_dim']``
    to four times that size; the second one projects it back.

    Args:
        cfg: Mapping that provides the key ``'emb_dim'``.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg['emb_dim']
        hidden = 4 * width
        # Built in the same order as they are applied: expand -> GELU -> project.
        stack = [
            nn.Linear(width, hidden),
            GELU(),
            nn.Linear(hidden, width),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the expand -> GELU -> project stack to ``x``."""
        return self.layers(x)

### 1.d Define LayerNorm Class

In [9]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learnable scale and shift.

    Args:
        emb_dim: Size of the last dimension of the inputs; ``scale`` and
            ``shift`` each hold one value per feature.
    """

    def __init__(self, emb_dim):
        super().__init__()
        # Small constant added to the variance to avoid division by zero.
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Normalise ``x`` along its last dimension, then scale and shift it."""
        mean = x.mean(dim=-1, keepdim=True)
        # Use the biased (population) variance, i.e. divide by N rather than
        # N-1. This matches torch.nn.LayerNorm and stays finite when the last
        # dimension has a single element.
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x-mean)/torch.sqrt(var+self.eps)
        return self.scale*norm_x+self.shift

### 2. Define Transformer Block

In [10]:
class TransformerBlock(nn.Module):
    """Attention and feed-forward sub-layers, each normalised first and wrapped in a shortcut.

    Args:
        cfg: Mapping that provides ``'emb_dim'``, ``'context_length'``,
            ``'n_heads'``, ``'drop_rate'`` and ``'qkv_bias'``; it is also
            handed unchanged to ``FeedForward``.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg['emb_dim']
        drop_rate = cfg['drop_rate']

        # Sub-modules are created in a fixed order: att, ff, norm1, norm2, dropout.
        self.att = MultiHeadAttention(
            d_in=emb_dim,
            d_out=emb_dim,
            context_length=cfg['context_length'],
            dropout=drop_rate,
            num_heads=cfg['n_heads'],
            qkv_bias=cfg['qkv_bias'],
        )
        self.ff = FeedForward(cfg)
        self.norm1 = LayerNorm(emb_dim)
        self.norm2 = LayerNorm(emb_dim)
        self.drop_shortcut = nn.Dropout(drop_rate)

    def forward(self, x):
        """Apply both sub-layers to ``x`` and return the result."""
        # Sub-layer 1: norm -> attention -> dropout, then add the input back.
        attn_out = self.drop_shortcut(self.att(self.norm1(x)))
        x = attn_out + x

        # Sub-layer 2: norm -> feed-forward -> dropout, then add the input back.
        ff_out = self.drop_shortcut(self.ff(self.norm2(x)))
        return ff_out + x

### 3. Define the GPTModel Class

In [11]:
class GPTModel(nn.Module):
    """GPT-style model: embeddings, a stack of transformer blocks, a final norm and an output head.

    Args:
        cfg: Mapping that provides ``'vocab_size'``, ``'context_length'``,
            ``'emb_dim'``, ``'drop_rate'`` and ``'n_layers'``; it is also
            handed unchanged to every ``TransformerBlock``.
    """

    def __init__(self, cfg):
        # nn.Module must be initialised before any sub-module is assigned.
        super().__init__()
        self.tok_emb = nn.Embedding(cfg['vocab_size'],cfg['emb_dim'])
        self.pos_emb = nn.Embedding(cfg['context_length'],cfg['emb_dim'])
        # Dropout on the summed embeddings (drop_rate is a probability, not a size).
        self.drop_emb = nn.Dropout(cfg['drop_rate'])

        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg['n_layers'])]
        )

        self.final_norm = LayerNorm(cfg['emb_dim'])
        self.out_head = nn.Linear(
            cfg['emb_dim'],cfg['vocab_size'],bias=False
        )

    def forward(self,in_idx):
        """Map token ids to logits over the vocabulary.

        Args:
            in_idx: Integer tensor of token ids with shape ``(batch_size, seq_len)``.

        Returns:
            Tensor of logits with shape ``(batch_size, seq_len, vocab_size)``.
        """
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        # One embedding per position 0..seq_len-1, created on the input's device;
        # it broadcasts over the batch dimension when added below.
        pos_embeds = self.pos_emb(
            torch.arange(seq_len, device=in_idx.device)
        )

        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.final_norm(x)

        logits = self.out_head(x)

        return logits

In [None]:
import tiktoken

# Hyper-parameters read by GPTModel and its sub-modules.
GPT_CONFIG_124M = {
    'vocab_size':50257,
    'context_length':1024,
    'emb_dim':768,
    'n_heads':12,
    'n_layers':12,
    'drop_rate':0.1,
    'qkv_bias':False,
}

# NOTE(review): this rebinds `tokenizer`, which an earlier cell uses for the
# SimpleTokenizer instance.
tokenizer = tiktoken.get_encoding('gpt2')


def encoder(txt):
    """Encode ``txt`` with the module-level ``tokenizer`` and return the ids as a 1-D tensor."""
    tensor = torch.tensor(tokenizer.encode(txt))
    return tensor


# Build the input batch first: the model call below needs it to exist.
# torch.stack requires every text to encode to the same number of tokens.
txts = ['Every effort moves you','Every day holds a']
batch = [encoder(tx) for tx in txts]
batch = torch.stack(batch,dim=0)

# Seed right before construction so the initial weights are reproducible.
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)

out = model(batch)