<a href="https://colab.research.google.com/github/pashok3d/RemarqueGPT/blob/main/Lets_build_GPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Preparation

In [None]:
# Installs

!pip install tiktoken -q
!pip install ipdb -q

In [None]:
# Imports

import tiktoken
from typing import List, Tuple

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [None]:
%pdb on

### Dataset loading and processing

In [None]:
# The corpus is Cyrillic text; pin the encoding so that decoding does not
# depend on the platform's locale default.
with open("/content/the_dream_room.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()

In [None]:
# Normalise every line: turn non-breaking spaces into regular spaces and trim
# both ends. Lines that are blank (whitespace only) are dropped.
lines = [
    raw_line.replace('\xa0', ' ').strip()
    for raw_line in lines
    if raw_line.strip()
]

In [None]:
text = "\n".join(lines)

In [None]:
text[:200]

'В цветущих садах веял майский ветерок От веток сирени, нависавших над оградой старой кладки, доносился густой сладкий аромат. Художник Фриц Шрамм медленно бродил по старинным переулкам городка. Время '

In [None]:
enc = tiktoken.encoding_for_model("gpt-4o")

In [None]:
tokens = enc.encode_ordinary(text)

In [None]:
dataset_token_len = len(tokens)

In [None]:
# Compact vocabulary: map each distinct tiktoken id found in the corpus to a
# small local id. Local ids start at 1, so 0 is never assigned to a token.
gpt_token_id_to_local_id = {
    gpt_id: local_id for local_id, gpt_id in enumerate(set(tokens), start=1)
}

In [None]:
# Reverse lookup: local id -> original tiktoken id.
local_id_to_gpt_token_id = {
    local_id: gpt_id for gpt_id, local_id in gpt_token_id_to_local_id.items()
}

In [None]:
def encode(text) -> List[int]:
    """Tokenize ``text`` with ``enc`` and return the matching local ids.

    Raises:
        KeyError: if a produced token id has no entry in
            ``gpt_token_id_to_local_id``.
    """
    return [
        gpt_token_id_to_local_id[gpt_id]
        for gpt_id in enc.encode_ordinary(text)
    ]

def decode(tokens: List[int]) -> str:
    """Turn a sequence of local ids back into text.

    Each local id is mapped to its tiktoken id through
    ``local_id_to_gpt_token_id`` and the result is decoded with ``enc``.

    Raises:
        KeyError: if a local id has no entry in ``local_id_to_gpt_token_id``.
    """
    gpt_ids = [local_id_to_gpt_token_id[local_id] for local_id in tokens]
    return enc.decode(gpt_ids)

In [None]:
# Size of the embedding table indexed by local id. Local ids start at 1, so
# the largest id equals the number of distinct tokens; the table therefore
# needs (largest id + 1) rows, otherwise looking up the highest id is out of
# range.
vocab_size = max(gpt_token_id_to_local_id.values(), default=0) + 1

### Dataloader

In [None]:
class TextDataset(Dataset):
    """Sliding-window next-token dataset built from a text.

    The text is tokenized with ``enc`` and mapped to local ids through
    ``gpt_token_id_to_local_id``. Sample ``i`` is the pair
    ``(tokens[i:i + W], tokens[i + 1:i + W + 1])`` where ``W`` is
    ``context_window_size``, i.e. the target is the input shifted by one token.
    """

    def __init__(self, text, context_window_size):
        self.gpt_tokens = enc.encode_ordinary(text)
        self.tokens = [gpt_token_id_to_local_id[gpt_id] for gpt_id in self.gpt_tokens]

        # One window per start offset that still leaves room for the shifted
        # target; the range is empty when the text is not longer than a window.
        starts = range(len(self.tokens) - context_window_size)
        self.x = [self.tokens[s:s + context_window_size] for s in starts]
        self.y = [self.tokens[s + 1:s + context_window_size + 1] for s in starts]

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx) -> Tuple:
        inputs, targets = self.x[idx], self.y[idx]
        return torch.tensor(inputs), torch.tensor(targets)

In [None]:
T = 6

In [None]:
ds = TextDataset(text, T)

In [None]:
next(iter(ds))

(tensor([1540,  703, 1282, 2069, 1551,  331]),
 tensor([ 703, 1282, 2069, 1551,  331, 1663]))

In [None]:
B = 5

In [None]:
dataloader = DataLoader(ds, batch_size=B, shuffle=True)

In [None]:
next(iter(dataloader))

[tensor([[ 170,  421,  496, 2857,    6, 1989],
         [2771,  124,  581,  613,  345,  503],
         [ 375, 1423, 2265,    6, 2083, 2011],
         [1292,  763,    8, 3217, 2711, 3045],
         [3208, 2503,    6, 1278,  128, 2967]]),
 tensor([[ 421,  496, 2857,    6, 1989,  264],
         [ 124,  581,  613,  345,  503,    6],
         [1423, 2265,    6, 2083, 2011, 2375],
         [ 763,    8, 3217, 2711, 3045,  299],
         [2503,    6, 1278,  128, 2967, 1239]])]

### Model

In [None]:
class Embedding:
    """Token-embedding lookup backed by an ``nn.Embedding`` table.

    Args:
        vocab_size: number of rows in the table.
        emb_dim: width of each embedding vector.
    """

    def __init__(self, vocab_size, emb_dim):
        table = nn.Embedding(num_embeddings=vocab_size, embedding_dim=emb_dim)
        self.embedding_table = table

    def __call__(self, input):
        """Return the embedding vectors for the integer ids in ``input``."""
        embedded = self.embedding_table(input)
        return embedded

In [None]:
class MultiHeadAttention:
    """Causal multi-head self-attention.

    A single linear layer produces queries, keys and values; they are split
    into ``head_n`` heads, each position attends only to itself and earlier
    positions, and the per-head results are concatenated back to ``emb_dim``.

    Args:
        emb_dim: embedding width of the input and the output.
        head_n: number of attention heads; must divide ``emb_dim``.

    Raises:
        ValueError: if ``emb_dim`` is not divisible by ``head_n``.
    """

    def __init__(self, emb_dim, head_n):
        # Validate once at construction instead of on every forward call.
        if emb_dim % head_n != 0:
            raise ValueError(
                f"emb_dim ({emb_dim}) must be divisible by head_n ({head_n})"
            )
        self.emb_dim = emb_dim
        self.QKV = nn.Linear(emb_dim, emb_dim * 3)
        self.head_n = head_n

    def __call__(self, input):
        """Apply causal self-attention.

        Args:
            input: tensor of shape (batch, seq_len, emb_dim).

        Returns:
            Tensor of shape (batch, seq_len, emb_dim).
        """
        # Take the sizes from the input itself rather than from notebook
        # globals, so any batch size / sequence length works (e.g. the last,
        # smaller batch of a DataLoader).
        batch_size, seq_len, _ = input.shape
        head_size = self.emb_dim // self.head_n

        # Calculate q, k, v and split them into heads: (B, nh, T, hs)
        q, k, v = self.QKV(input).split(self.emb_dim, dim=-1)
        q = q.view(batch_size, seq_len, self.head_n, head_size).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.head_n, head_size).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.head_n, head_size).transpose(1, 2)

        # Scaled dot-product scores: (B, nh, T, T). Dividing by sqrt(head_size)
        # keeps the softmax from saturating as the head size grows.
        scores = (q @ k.transpose(-2, -1)) / head_size ** 0.5

        # Causal mask: position t may only attend to positions <= t.
        causal = torch.tril(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=scores.device)
        )
        scores = scores.masked_fill(~causal, float("-inf"))

        # Softmax over the key dimension
        weights = scores.softmax(dim=-1)

        # Weighted sum of values: (B, nh, T, hs)
        mixed = weights @ v

        # Combine new values from multiple heads: (B, T, C)
        output = mixed.transpose(1, 2).contiguous().view(
            batch_size, seq_len, self.emb_dim
        )

        return output


In [None]:
class FeedForward:
    """Two-layer MLP applied to the last dimension.

    Expands from ``emb_dim`` to twice that width, applies ReLU, and projects
    back to ``emb_dim``.
    """

    def __init__(self, emb_dim):
        hidden_dim = emb_dim*2
        self.m_1 = nn.Linear(emb_dim, hidden_dim)
        self.a = nn.ReLU()
        self.m_2 = nn.Linear(hidden_dim, emb_dim)

    def __call__(self, input):
        """Return ``m_2(relu(m_1(input)))``."""
        return self.m_2(self.a(self.m_1(input)))

In [None]:
class DecoderBlock:
    """One decoder layer: multi-head attention followed by a feed-forward net.

    The two sub-layers are applied back to back; no residual connection or
    normalization is applied between them.
    """

    def __init__(self, emb_dim, head_n):
        self.mha = MultiHeadAttention(emb_dim=emb_dim, head_n=head_n)
        self.ff = FeedForward(emb_dim=emb_dim)

    def __call__(self, x):
        """Run ``x`` through attention, then through the feed-forward net."""
        attended = self.mha(x)
        transformed = self.ff(attended)
        return transformed

In [None]:
class Decoder:
    """Stack of decoder blocks on top of a token embedding.

    Args:
        vocab_size: number of rows of the embedding table.
        emb_dim: embedding width used throughout the stack.
        num_blocks: how many ``DecoderBlock`` layers to stack.
        head_n: number of attention heads in every block.
    """

    def __init__(self, vocab_size, emb_dim, num_blocks, head_n):
        self.emb = Embedding(vocab_size, emb_dim)
        self.blocks = []
        for _ in range(num_blocks):
            self.blocks.append(DecoderBlock(emb_dim, head_n))

    def __call__(self, x):
        """Embed ``x`` and pass it through every block in order."""
        hidden = self.emb(x)
        for layer in self.blocks:
            hidden = layer(hidden)
        return hidden

In [None]:
d = Decoder(vocab_size=vocab_size, emb_dim=16, num_blocks=3, head_n=4)

In [None]:
input = next(iter(dataloader))

In [None]:
x = input[0]

In [None]:
output = d(x)

In [None]:
output.shape

torch.Size([5, 6, 16])