In [None]:
from pathlib import Path
from dataclasses import dataclass 
import requests
import numpy as np
import torch
import torch.nn as nn
from jaxtyping import Float, Int

import torch.optim as optim
import torch.nn.functional as F
from typing import List
import requests

import unicodedata
import json
from collections import Counter, defaultdict
import base64
from IPython.display import Image, display

from torch.utils.data import Dataset, DataLoader, TensorDataset


@dataclass 
class Config:
    """Hyperparameters shared by the model components defined in this notebook.

    Fields are declared without defaults, so every one of them must be
    supplied when constructing a Config.
    """
    d_model : int  # width of the token/position embeddings and of the MLP input/output
    d_vocab : int  # number of rows in the token embedding table
    d_hidden : int # hidden width of the MLP
    n_context_max : int # number of rows in the position embedding table (max "slice" size)
    n_context: int  # passed to Book_Dataset as the training window length
    n_layers : int  # number of TransformerBlock modules stacked in the model
    
    # TODO: d_head : int # for Attn (if separate wq and wk) -- not used yet
    # field syntax reminder -> name : type

# Gutenberg dataset code is in existing notebooks

### Dimensions

\begin{align*}
    &d_m = \text{d-model} & : & \text{model dimension (num neurons)} \\
    &d_v = \text{d-vocab} & : & \text{vocab dimension} \\
    &n_c = \text{n-context} & : & \text{context window (len of seq entered)}
\end{align*}

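Where $d_n \ll d_m$ ($d_n$ is the inner dimension of $\mathbf{W}_Q$ and $\mathbf{W}_K$, see the Attention Head section).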

## Getting Data

In [None]:
# Download the book from Project Gutenberg and keep only the story text
# (everything between the licence header/footer, starting at "CHAPTER I").
url = "https://www.gutenberg.org/files/67098/67098-0.txt"
response = requests.get(url, timeout=30)
# Fail loudly on HTTP errors instead of silently tokenizing an error page.
response.raise_for_status()
text = response.text

start_marker = "*** START OF THE PROJECT GUTENBERG EBOOK"
end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK"

start_idx = text.find(start_marker)
end_idx = text.find(end_marker)
# str.find returns -1 when a marker is missing; using -1 as a slice bound
# would silently produce a truncated or empty text.
if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
    raise ValueError("Project Gutenberg start/end markers not found in the downloaded text")

# Drop the remainder of the start-marker line itself.
content_area = text[start_idx:end_idx].partition("\n")[2]

chapter_idx = content_area.upper().find("CHAPTER I")
if chapter_idx == -1:
    # No chapter heading found: keep the whole body rather than slicing from -1.
    chapter_idx = 0

raw_text = content_area[chapter_idx:]
# Show only a short preview; printing the whole book floods the notebook output.
print(f"Loaded {len(raw_text)} characters. Preview:")
print(raw_text[:500])


## Tokenize Text

In [None]:
class Tokenizer:
    """Character-level tokenizer built from a reference text.

    The text is lower-cased and reduced to alphabetic characters plus the
    characters in " .!?"; every distinct remaining character becomes one
    token. The sorted set of those characters is the vocabulary (d_vocab).

    Attributes:
        chars: sorted list of vocabulary characters.
        vocab_size: number of distinct tokens.
        encode: dict mapping character -> token id.
        decode: dict mapping token id -> character.
        vocab: alias for ``encode``; assigning a char -> id mapping to it
            rebuilds ``decode``, ``chars`` and ``vocab_size`` to match.

    Args:
        text: reference text to build the vocabulary from. ``None`` or an
            empty string yields an empty vocabulary (useful when the
            vocabulary is restored afterwards through ``vocab``).
        verbose: when True (default) print sanity-check output.
    """

    # Class-level default so instances created without __init__ still work.
    verbose = True
    # Longest sequence echoed in full by the sanity prints.
    preview_len = 50

    def __init__(self, text, verbose=True):
        self.verbose = verbose
        # Guard against None/"" so an empty tokenizer can be constructed.
        cleaned = self.clean_text(text) if text else []
        self.chars = sorted(set(cleaned))
        self.vocab_size = len(self.chars)

        # vocab maps (char -> id and id -> char)
        self.encode = {ch: idx for idx, ch in enumerate(self.chars)}
        self.decode = {idx: ch for idx, ch in enumerate(self.chars)}

        # sanity check
        if self.verbose:
            print("Vocab chars:", self.chars)
            print("Vocab size:", self.vocab_size)

    @property
    def vocab(self):
        """The char -> id mapping (same object as ``encode``)."""
        return self.encode

    @vocab.setter
    def vocab(self, mapping):
        # Keep all derived attributes consistent with the new mapping.
        self.encode = dict(mapping)
        self.decode = {idx: ch for ch, idx in self.encode.items()}
        self.chars = sorted(self.encode, key=self.encode.get)
        self.vocab_size = len(self.encode)

    def _preview(self, label, values):
        """Print ``values`` (truncated when long) if verbose output is enabled."""
        if not self.verbose:
            return
        if len(values) <= self.preview_len:
            print(label, values)
        else:
            print(label, values[: self.preview_len], f"... ({len(values)} total)")

    def clean_text(self, text: str) -> list[str]:
        """Lower-case ``text`` and keep only letters and the characters " .!?"."""
        return [x for x in text.lower() if x.isalpha() or x in " .!?"]

    def tokenize(self, text):
        """Encode ``text`` into a list of token ids.

        Characters that are not part of the vocabulary are skipped.
        """
        tokens = [self.encode[ch] for ch in self.clean_text(text) if ch in self.encode]
        self._preview("Encoded tokens:", tokens)
        return tokens

    def detokenize(self, tokens):
        """Inverse of ``tokenize``: turn token ids back into a string.

        Ids that are not part of the vocabulary are skipped.
        """
        final = "".join(self.decode[token] for token in tokens if token in self.decode)
        self._preview("Decoded text:", final)
        return final
        

In [None]:
# Smoke test for the Tokenizer class: encode a sample sentence and decode it back.
tokenizer = Tokenizer(raw_text)

sample_text = "Chapter I. The starting of the journey!"

encoded = tokenizer.tokenize(sample_text)
decoded = tokenizer.detokenize(encoded)

# Round-trip check: decoding must give back the cleaned sample, minus any
# characters that are not in the vocabulary (those are skipped when encoding).
expected_round_trip = "".join(
    ch for ch in tokenizer.clean_text(sample_text) if ch in tokenizer.encode
)
assert decoded == expected_round_trip, "tokenize/detokenize round trip failed"

## Multilayer Perceptron
$$
    \texttt{MLP}(\mathbf{X}) = W_d \cdot \sigma_{\texttt{ReLU}} (W_u \cdot \mathbf{X} + b_u) + b_d, \qquad \texttt{MLP} : \mathbb{R}^{d_m} \to \mathbb{R}^{d_m}
$$

In [None]:
class MLP(nn.Module):
    """Feed-forward block: d_model -> d_hidden -> ReLU -> d_model."""

    def __init__(self, config: Config):
        super().__init__()
        model_width, hidden_width = config.d_model, config.d_hidden
        # Up-projection is created first, then the down-projection.
        self.linear_up: nn.Linear = nn.Linear(model_width, hidden_width)
        self.linear_down: nn.Linear = nn.Linear(hidden_width, model_width)

    def forward(self, x: Float[torch.Tensor, "* d_model"]) -> Float[torch.Tensor, "* d_model"]:
        """Apply linear_up, ReLU and linear_down to the last dimension of ``x``."""
        hidden = torch.relu(self.linear_up(x))
        return self.linear_down(hidden)

## Attention Head
### Weight Matrix
$$
    \mathbf{W}_{QK} := \mathbf{W}_{Q} \cdot \mathbf{W}_{K}^T, \qquad \mathbf{W}_{Q}, \mathbf{W}_{K} \in \mathbb{R}^{d_m \times d_n}
$$

### Autoregressive Masking (M) Matrix
$$
    M_{i,j} = 
    \begin{cases}
        0 & j \leq i \\
        -\infty & j > i
    \end{cases}
$$

### Forward Pass
$$A(\mathbf{X}) = \sigma_{\text{softmax}} (\mathbf{X} \; \mathbf{W}_{QK} \; \mathbf{X}^\text{T} + \mathbf{M}) \; \mathbf{X} \; \mathbf{W}_{OV}$$

In [None]:
class AttentionHead(nn.Module):
    """Single causal self-attention head with fused QK and OV matrices.

    Computes ``softmax(X W_QK X^T + M) X W_OV`` where ``M`` is the
    autoregressive mask: 0 on and below the diagonal, -inf above it, so a
    position can only attend to itself and to earlier positions.
    """

    def __init__(self, config: Config):
        super().__init__()
        self.config = config
        # nn.Parameter registers the matrices so their gradients are tracked.
        # Scaling by 1/sqrt(d_model) keeps the initial attention logits from
        # growing with d_model and saturating the softmax.
        init_scale = config.d_model ** -0.5
        self.wqk = nn.Parameter(torch.randn(config.d_model, config.d_model) * init_scale)
        self.wov = nn.Parameter(torch.randn(config.d_model, config.d_model) * init_scale)

    def M_matrix(self, n, device=None, dtype=None):
        """Return the (n, n) causal mask.

        Args:
            n: sequence length.
            device: device to create the mask on (default: torch default).
            dtype: floating dtype of the mask (default: torch default).

        Returns:
            Tensor with 0 on and below the diagonal and -inf above it.
        """
        above_diagonal = torch.triu(
            torch.ones((n, n), dtype=torch.bool, device=device), diagonal=1
        )
        mask = torch.zeros((n, n), device=device, dtype=dtype)
        return mask.masked_fill(above_diagonal, float("-inf"))

    def forward(self, x: Float[torch.Tensor, "* d_model"]) -> Float[torch.Tensor, "* d_model"]:
        """Apply causal attention.

        Args:
            x: tensor of shape (n_seq, d_model) or (batch, n_seq, d_model).

        Returns:
            Tensor of the same shape as ``x``.
        """
        # The sequence axis is second to last, so batched input works as well.
        n_seq = x.shape[-2]
        mask = self.M_matrix(n_seq, device=x.device, dtype=x.dtype)
        scores = x @ self.wqk @ x.transpose(-2, -1) + mask
        # Normalise over the key axis (last dim): each query row sums to 1.
        attention_pattern = torch.softmax(scores, dim=-1)
        return attention_pattern @ x @ self.wov

### Transformer Block
$$
    \text{TB}(X) = X + A(X) + \text{MLP}(X), \qquad \text{TB}: \mathbb{R}^{n_c \times d_m} \to \mathbb{R}^{n_c \times d_m}
$$

In [None]:
class TransformerBlock(nn.Module):
    """Residual block: output = x + MLP(x) + Attention(x)."""

    def __init__(self, config: Config):
        super().__init__()
        self.config = config
        # NOTE: no LayerNorm is applied in this block.
        self.mlp = MLP(config)
        self.attention = AttentionHead(config)

    def forward(self, x: Float[torch.Tensor, "* d_model"]) -> Float[torch.Tensor, "* d_model"]:
        """Add the MLP and attention branches to the residual stream ``x``."""
        mlp_branch = self.mlp(x)
        attention_branch = self.attention(x)
        return x + mlp_branch + attention_branch

In [None]:
class transformer(nn.Module):
    """Decoder-only transformer with tied embedding / unembedding weights.

    Token ids are embedded, position embeddings are added, the result goes
    through ``config.n_layers`` TransformerBlocks and is mapped back to
    vocabulary logits with the transposed token-embedding matrix.
    """

    def __init__(self, config: Config):
        super().__init__()
        self.config = config
        self.token_embedding = nn.Embedding(config.d_vocab, config.d_model)
        self.pos_embedding = nn.Embedding(config.n_context_max, config.d_model)
        self.transformerblocks = nn.ModuleList([TransformerBlock(config) for _ in range(config.n_layers)])

    def forward(self, x: Int[torch.Tensor, "n_context"]) -> Float[torch.Tensor, "n_context d_vocab"]:
        """Return next-token logits for every position.

        Args:
            x: integer token ids of shape (n_context,) or (batch, n_context);
                n_context must not exceed ``config.n_context_max``.

        Returns:
            Logits of shape (..., n_context, d_vocab).
        """
        # The sequence axis is the LAST axis of the id tensor; using shape[0]
        # would pick the batch size for batched input.
        seq_len = x.shape[-1]
        positions = torch.arange(seq_len, device=x.device)
        x = self.token_embedding(x) + self.pos_embedding(positions)  # x = E + P
        for block in self.transformerblocks:
            x = block(x)
        # Unembedding: reuse the token embedding matrix (d_model -> d_vocab).
        return x @ self.token_embedding.weight.T

    def generator(self, num_tokens=10, input_text="", tokenizer=None):
        """Sample ``num_tokens`` new tokens after ``input_text``.

        Args:
            num_tokens: number of tokens to append.
            input_text: prompt; must contain at least one in-vocabulary character.
            tokenizer: object with ``tokenize``/``detokenize``. Defaults to a
                ``Tokenizer`` built from the notebook-level ``raw_text``.

        Returns:
            The detokenized prompt followed by the sampled continuation.

        Raises:
            ValueError: if the prompt encodes to zero tokens.
        """
        if tokenizer is None:
            tokenizer = Tokenizer(raw_text)
        prompt_ids = tokenizer.tokenize(input_text)
        if not prompt_ids:
            raise ValueError("input_text must contain at least one character from the vocabulary")

        device = self.token_embedding.weight.device
        input_tensor = torch.tensor(prompt_ids, dtype=torch.long, device=device)
        max_context = self.config.n_context_max
        # Sampling needs no gradients.
        with torch.no_grad():
            for _ in range(num_tokens):
                # Only the last n_context_max tokens have a position embedding.
                context = input_tensor[-max_context:]
                logits = self.forward(context)
                # logits[-1] is the vocabulary distribution for the LAST position.
                probabilities = torch.softmax(logits[-1], dim=-1)
                new_token = torch.multinomial(probabilities, num_samples=1)
                input_tensor = torch.cat([input_tensor, new_token], dim=-1)

        return tokenizer.detokenize(input_tensor.tolist())
   
# use nn.ModuleList for the TB sequence & MHA (to create a list of TBs)
# print(f"{x.shape = }") for debugging

# pick a unique dataset to train data on

# if training models: aim for < 10 million parameters for now
#   sum(x.numel() for x in mymodel.parameters())


In [None]:
#Generate function based on user input for n_layers : int, d_model : int, d_vocab : int, d_hidden : int
# def Generator(User_n_layers : int, User_d_model : int, User_d_vocab : int, User_d_hidden : int):
#     configuration = Config(User_n_layers, User_d_model, User_d_vocab, user_d_hidden)
#     Tranformer_model = transformer(configuration)
#     return Tranformer_model

#model_initialize_something 

# Save and Load Models

In [None]:
def save_model(model, tokens, config, path="my_first_transformer.pt"):
    """Save model weights, config and vocabulary to ``path``.

    Args:
        model: module whose ``state_dict()`` is stored.
        tokens: the tokenizer (its ``encode`` char -> id dict is stored) or
            such a dict directly.
        config: the Config instance; stored as a plain dict of its fields so
            the checkpoint contains only tensors and builtin types.
        path: destination file.
    """
    # Use the argument, not a notebook-level global, as the vocabulary source.
    vocab = tokens if isinstance(tokens, dict) else tokens.encode
    torch.save({
        "model_state": model.state_dict(),
        "config": dict(vars(config)),
        "vocab": dict(vocab),
    }, path)
    print(f"Model saved to {path}")

def load_model(path="my_first_transformer.pt"):
    """Load a checkpoint written by ``save_model``.

    Returns:
        (model, tokenizer, config), with the saved weights loaded into the
        model and the tokenizer vocabulary restored.
    """
    load_in = torch.load(path, map_location="cpu")

    saved_config = load_in["config"]
    # Checkpoints store the config as a dict; accept a Config object as well.
    config = Config(**saved_config) if isinstance(saved_config, dict) else saved_config

    model_loaded = transformer(config)
    # Without this the returned model would keep its random initial weights.
    model_loaded.load_state_dict(load_in["model_state"])

    # Rebuild the tokenizer from the saved mapping instead of from a text;
    # __new__ skips __init__, which expects a reference text.
    vocab = load_in["vocab"]
    tokenizer = Tokenizer.__new__(Tokenizer)
    tokenizer.encode = dict(vocab)
    tokenizer.decode = {idx: ch for ch, idx in vocab.items()}
    tokenizer.chars = sorted(vocab, key=vocab.get)
    tokenizer.vocab_size = len(vocab)
    tokenizer.vocab = tokenizer.encode
    print(f"Model loaded from {path}")
    return model_loaded, tokenizer, config


In [None]:
class Book_Dataset(torch.utils.data.Dataset):
    """Sliding-window next-token dataset over a character-tokenized text.

    Item ``i`` is the pair ``(tokens[i : i + n], tokens[i + 1 : i + n + 1])``
    with ``n = train_len``, i.e. the target is the input shifted by one.

    Args:
        raw_text: text to tokenize (also used to build the vocabulary).
        train_len: window length; must be at least 1.

    Raises:
        ValueError: if ``train_len`` is smaller than 1.
    """

    def __init__(self, raw_text, train_len):
        if train_len < 1:
            raise ValueError("train_len must be at least 1")
        self.tokenizer = Tokenizer(raw_text)
        # Encode through the lookup table directly: tokenizer.tokenize() echoes
        # its result, which for a whole book floods the notebook output.
        lookup = self.tokenizer.encode
        token_ids = [lookup[ch] for ch in self.tokenizer.clean_text(raw_text) if ch in lookup]
        self.tokens = torch.tensor(token_ids, dtype=torch.long)
        self.n_context = train_len

    def __len__(self):
        # Clamp at 0: a text shorter than the window has no samples, and a
        # negative length makes len() raise.
        return max(0, len(self.tokens) - self.n_context)

    def __getitem__(self, index):
        window_end = index + self.n_context
        x = self.tokens[index:window_end]
        y = self.tokens[index + 1:window_end + 1]
        return x, y

In [None]:
# training loop 

def train_loop(samples, batchsize, model, epochs, lr=1e-5):
    """Train ``model`` for next-token prediction.

    Args:
        samples: dataset yielding ``(x, y)`` pairs of token-id tensors of
            equal shape, ``y`` being the per-position targets for ``x``.
        batchsize: number of samples per batch.
        model: module mapping ``(batch, seq)`` ids to ``(batch, seq, vocab)`` logits.
        epochs: number of full passes over ``samples``.
        lr: AdamW learning rate.

    Returns:
        List with the mean batch loss of each epoch.
    """
    # wrap an iterable to enable easy access to samples
    data_loader = DataLoader(samples, batch_size=batchsize, shuffle=True)

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    device = next(model.parameters()).device

    print("<<<< Training Started >>>>")

    model.train()
    epoch_losses = []
    for epoch in range(epochs):
        running_loss = 0.0
        n_batches = 0
        for x, y in data_loader:
            x = x.to(device)
            # Targets are already class indices, one per position; they only
            # need the right dtype/device (no argmax).
            y = y.to(device).long()

            optimizer.zero_grad()
            logits = model(x)
            vocab = logits.shape[-1]
            # CrossEntropyLoss expects (N, vocab) logits and (N,) targets.
            loss = criterion(logits.reshape(-1, vocab), y.reshape(-1))

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            n_batches += 1

        # Report the mean so the number does not depend on the batch count.
        mean_loss = running_loss / max(n_batches, 1)
        epoch_losses.append(mean_loss)
        print(f"Epoch {epoch + 1}: Loss = {mean_loss:.4f}")

    print("<<<< Training Complete >>>>")
    return epoch_losses

### Implement Training 

#### Already run at the beginning of the notebook
1) import raw text 
2) tokenizer = Tokenizer(raw_text)


#### Now: 
Set up config, run tokenized text through dataset (update to tensor), train loop  


In [None]:
# Build the dataset first so the vocabulary size comes from the data instead
# of a hard-coded number.
N_CONTEXT = 64
samples = Book_Dataset(raw_text, N_CONTEXT)

# Config has no defaults, so every field (including n_context_max) is required.
cfg = Config(
    d_model=64,
    d_vocab=samples.tokenizer.vocab_size,
    d_hidden=128,
    n_context_max=N_CONTEXT,
    n_context=N_CONTEXT,
    n_layers=2,
)
model = transformer(cfg)
train_loop(samples, batchsize=32, model=model, epochs=5)


In [None]:
# Continue training the model INSTANCE created above (`model`), not the
# `transformer` class itself.
train_loop(samples, batchsize=32, model=model, epochs=50)


Initialize model:
1. Training loop on model and generation on model.
2. (model_stupid) no training loop, generation on model.

# Statistical Analysis
(Using markov chains?)