W tym notatniku będziemy mogli wytrenować lub korzystać z już wytrenowanego modelu.
Tutaj model jest zbudowany z warstw wbudowanych w PyTorch, aby przyspieszyć jego trening.

Po dokładny opis krok-po-kroku jak ten model powstał, odsyłam do notatnika `GPT.ipynb`


In [71]:
# imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import gdown
import os

# hyperparameters
batch_size = 64
context_size = 192
train_iterations = 5_000
learning_rate = 5e-4 
device = 'cuda' if torch.cuda.is_available() else 'cpu' # wyćwiczenie tego modelu na cpu jest niemalże niemożliwe, inferencja jeszcze "ujdzie"
embedding_size = 192
amount_of_heads = 6
amount_of_blocks = 4
dropout = 0.1  # prawdopodobieństwo dropoutu
MODEL_PATH = "GPT_model.pth"
MODEL_LINK = f"https://drive.google.com/uc?id=1ZQPHKPPda6mXOwuBaJZfcu142X1tYiQ-&export=download"
rng = torch.Generator()
rng.manual_seed(42)

# downloading the dataset

<torch._C.Generator at 0x73ce5eb6efd0>

In [72]:
inference_or_train = input("Do you want to inference (I) already trained model or to train (T) a model?")
if inference_or_train.upper() == 'I':
    inference_or_train = 'I'
elif inference_or_train.upper() == 'T':
    temp = input("Do you want to train new (N) model from scratch or an already existing (E) model?")
    if temp.upper() == 'N':
        inference_or_train = "TN"
    elif temp.upper() == "E":
        inference_or_train = "TE"
    else: raise ValueError(f' choice "{temp}" not recognized')
else: raise ValueError(f' choice "{inference_or_train}" not recognized')

dataset_link = "https://drive.google.com/uc?id=1TQjhbN1jrQx7eMgySFkMfwahh7IZy2a8"
dataset_path = "data/Shakespeare.txt"
if not os.path.isfile(dataset_path):
    print("Downloading dataset...")
    gdown.download(dataset_link, dataset_path)
    print("Done!")
else:
    print("Dataset is already downloaded")

with open(dataset_path, "r", encoding="utf-8") as f:
    whole_text = f.read() 

Dataset is already downloaded


In [73]:
# vocabulary generation and vocab-to-token encodings
def get_random_batch(split: str, batch_size:int, block_size: int, my_rng: torch.Generator) -> (torch.Tensor, torch.Tensor):
    """
    Returns a random batch of data - tensor of shape (batch_size, block_size). 
    :param split: can be either "train" or "test". When train the train dataset is used
    :returns: A tuple of two tensors - first with training data and second with labels/targets
    """
    if split == "train": my_data = train_data
    elif split == "test": my_data = test_data
    else: raise ValueError(f"Expected either `train` or `test` for the split argument, got {split} instead")
    idx = torch.randint(len(my_data) - block_size, size=(batch_size,), generator=my_rng)
    # randint jest end-exclusive, dlatego nie trzeba modyfikować indeksów mimo że target jest i+1
    x = torch.stack([data[i: i+block_size] for i in idx])
    y = torch.stack([data[i+1: i+block_size+1] for i in idx])
    return x, y

# train and evaluate model
@torch.no_grad()
def evaluate_model(m, split: str, n_samples: int) -> float:
    xb, yb = get_random_batch(split, n_samples, context_size, rng)
    m.eval()
    logits = m(xb)
    logits = logits.view(batch_size * context_size, vocab_size)
    yb = yb.view(batch_size * context_size)
    loss = F.cross_entropy(logits, yb)
    m.train()
    return loss.item()

def train_model(m, n_iter, context_size=context_size):
    optimizer = torch.optim.AdamW(m.parameters(), lr=0.001)
    for i in range(n_iter):
        xb, yb = get_random_batch("train", batch_size, context_size, rng)
        logits = m(xb)
        # musimy nieco zmienić kształt, ponieważ funkcja cross_entropy oczekuje kształtu (Batch, Channels) dla Inputu oraz (Batch,) dla Targetu
        # obecnie nasz input ma kształt (Batch, context_size, embedding_dim) a target (Batch, context_size). Musimy złączyć wymiary Batch oraz context w jeden
        logits = logits.view(batch_size * context_size, vocab_size)
        yb = yb.view(batch_size * context_size)
        loss = F.cross_entropy(logits, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if i  % (n_iter // 10) == 0:
            train_loss = evaluate_model(m, "train", batch_size)
            test_loss = evaluate_model(m, "test", batch_size)
            print(f"Iteration {i + 1} train loss: {train_loss:.3f} | test loss: {test_loss:.3f}")
        elif i == n_iter - 1:
            train_loss = evaluate_model(m, "train", batch_size)
            test_loss = evaluate_model(m, "test", batch_size)
            print(f"Final iteration train loss: {train_loss:.3f} | test loss: {test_loss:.3f}")
            
from math import sqrt
class Head(nn.Module):
    def __init__(self, embedding_size, head_size, context_size):
        super().__init__()
        self.sqrt_head_size = sqrt(head_size)
        self.keys = nn.Linear(embedding_size, head_size, bias=False)
        self.queries = nn.Linear(embedding_size, head_size, bias=False)
        self.values = nn.Linear(embedding_size, head_size, bias=False)
        # wykorzystujemy register_buffer, aby jednocześnie zapisać ten Tensor w modelu i wykluczyć go z optymalizacji podczas treningu
        self.register_buffer("tril", torch.tril(torch.ones(context_size, context_size))) 
        
    def forward(self, x):
        k = self.keys(x)
        q = self.queries(x)
        wei = q @ k.transpose(-1, -2)
        # przed wykonaniem softmaxu skalujemy dzieląc przez pierwiastek z head_size
        wei = wei / self.sqrt_head_size
        wei = wei.masked_fill(self.tril == 0, float("-inf"))  
        wei = F.softmax(wei, dim=-1)
        
        v = self.values(x)
        out = wei @ v
        return out
        
class MultiHeadedAttention(nn.Module):
    def __init__(self, embedding_size:int, head_size: int, context_size: int, num_heads: int):
        super().__init__()
        self.heads = nn.ModuleList([Head(embedding_size, head_size, context_size) for _ in range(num_heads)])
        
    def forward(self,x):
        # łączymy rezultaty w ostatnim wymiarze. Tworząc wiele głów, na przykład 4, zmniejszamy proporcjonalnie wymiar
        # każdej z nich. Wtedy łącząc ich rezultaty w ostatnim wymiarze uzyskamy taki sam wymiar jak przy pojedyńczej Head
        return torch.cat([single_head(x) for single_head in self.heads], dim=-1)
            
def try_model_generation(model_class, n_embd = 32, train_iterations=2500, context_size=7):
    model_instance = model_class(vocab_size=vocab_size, n_embd=n_embd, head_size=18)
    model_instance = model_instance.to(device)
    xb, _ = get_random_batch("train", batch_size, context_size, rng)
    xb = xb.to(device)
    train_model(model_instance, train_iterations, context_size=context_size)
    generated_output = model_instance.generate(xb, 500)[0].cpu()
    print(2 * "\n")
    print(f"Generated 500 new tokens, output shape: {generated_output.shape}")
    print(20*"-" + "\n" + f"Generated text: {decode(generated_output.tolist())}")   
    
class FeedForward(nn.Module):
    def __init__(self, n:int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n, n * 4),
            nn.ReLU(),
            nn.Linear(n * 4, n)
        )
    def forward(self, x):
        return self.net(x)
        
class Block(nn.Module):
    def __init__(self, n_embd, n_head, context_size):
        super().__init__()
        if n_embd % n_head != 0:
            error_msg = (f"Class Block expected n_embd to be divisible by n_head, but got {n_embd = } and {n_head = }"
                         f"which leaves remainder of {n_embd % n_head}.")
            raise ValueError(error_msg)
        single_head_size = n_embd//n_head
        self.sa = MultiHeadedAttention(n_embd, single_head_size, context_size, num_heads=n_head)
        self.ffwd = FeedForward(n_embd)
        self.layer_norm1 = nn.LayerNorm([n_embd])
        self.layer_norm2 = nn.LayerNorm([n_embd])
        self.drop = nn.Dropout(p=dropout)
    def forward(self, x):
        x = x + self.drop(self.sa(self.layer_norm1(x)))  # Res connection: x + F(x)
        x = x + self.drop(self.ffwd(self.layer_norm2(x)))  # Res connection: x + F(x)
        return x

class BigramModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, embedding_size)
        self.position_embedding_table = nn.Embedding(context_size, embedding_size)
        blocks_list = [Block(embedding_size, amount_of_heads, context_size) for _ in range(amount_of_blocks)]
        blocks_list.append(nn.LayerNorm([embedding_size]))
        self.blocks = nn.Sequential(*blocks_list)
        self.model_head = nn.Linear(embedding_size, vocab_size)
        self.drop = nn.Dropout(dropout)
        
    def forward(self, input_indices):
        B, T = input_indices.shape
        token_emb = self.token_embedding_table(input_indices) # B, T, C
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # T, C
        # automatyczny broadcasting pozwala dodać tensory (B,T,C) i (T,C)
        x = token_emb + pos_emb
        # dodajemy dropout po policzeniu sumy token embeddings i positional embeddings
        x = self.drop(x)
        # przeprowadzamy self-attention
        x = self.blocks(x)
        # dodajemy dropout przed wejściem do ostatniej warstwy Linear
        x = self.drop(x)
        # zamieniamy embeddingi na logity
        logits = self.model_head(x)
        return logits
         
    def generate(self, current_context: torch.Tensor, max_new_tokens: int):
        """
        Metoda przyjmuje kontekst i na jego podstawie generuje `max_new_tokens` nowych tokenów.
        :param current_context: Tensor o wymiarach (Batch, czas) gdzie 'czas' to kolejne znaki kontekstu
        :return: Tensor o wymiarach (Batch, czas + max_new_tokens) zawierający podany kontekst, a do niego "doklejoną"
            wygenerowaną treść
        """
        for _ in range(max_new_tokens):
            # korzystamy z tabeli Embeddingów, więc musimy podać co najwyżej ostatnie context_size znaków
            logits = self(current_context[:, -context_size:])
            logits = logits[:, -1, :]  
            probabilities = F.softmax(logits, dim=-1)  
            idx_next = torch.multinomial(probabilities, num_samples=1)
            current_context = torch.cat((current_context, idx_next), dim=1) # (Batch, czas + 1)
        return current_context
    
def generate_using_model(m: BigramModel, max_new_tokens):
    xb, _ = get_random_batch("test", 2, context_size, rng)
    generated_output = m.generate(xb, max_new_tokens)[0].cpu()
    decoded_output = decode(generated_output.tolist())
    # print(f"Model was given context:\n{decode((xb.cpu()).tolist())}")
    print(f"\n\nModel generated:\n{decoded_output}")
    

In [74]:
vocab = sorted(list(set(whole_text)))  # set zapewnia unikalność znaków, lista daje się posortować
vocab_size = len(vocab)
print(f"Vocab: {"".join(vocab)}")
print(f"Vocab len: {len(vocab)}")

stoi = {char: i for i, char in enumerate(vocab)}
itos = {i: char for i, char in enumerate(vocab)}
encode = lambda s: [stoi[c] for c in s]  # zamienia string na listę liczb
decode = lambda l: "".join(itos[i] for i in l)

data = torch.tensor(encode(whole_text))
data = data.to(device)

train_to_all_ratio = 0.85
n = int(train_to_all_ratio * data.numel())
train_data, test_data = data[:n], data[n:]
print(f"{len(train_data) = }, {len(test_data) = }")

if inference_or_train[0] == "T":
    if inference_or_train[1] == "E": 
        model = None
        raise NotImplementedError("Dotrenowywanie modeli jeszcze nie jest zaimplementowane, upsss")
    elif inference_or_train[1] == "N":
        model = BigramModel().to(device)
        model.train()
        train_model(model, train_iterations, context_size=context_size)
        print("saving model...", end="")
        torch.save(model.state_dict(), MODEL_PATH)
        print("model saved")
        
elif inference_or_train[0] == "I":
    model = BigramModel().to(device)
    should_download = input("Do you want to download (Y) the model, or is it already downloaded?")
    if should_download.upper() == "Y":
        gdown.download(MODEL_LINK, MODEL_PATH)
    else:
        print("Not downloading the model then, accessing locally stored model...", end="")
    model.load_state_dict(torch.load(MODEL_PATH, weights_only=True, map_location=device))
    model.eval()
    print("model loaded")
    how_many_tokens = int(input("How many tokens do you wish to generate during inference?"))
    generate_using_model(model, how_many_tokens)

    

        

Vocab: 
 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
Vocab len: 65
len(train_data) = 948084, len(test_data) = 167310


Downloading...
From: https://drive.google.com/uc?id=1ZQPHKPPda6mXOwuBaJZfcu142X1tYiQ-&export=download
To: /home/uxert/projekty/ProjektLLM/GPT_model.pth
100%|██████████| 10.4M/10.4M [00:00<00:00, 18.4MB/s]


model loaded


Model generated:
 less expected: he replied,
It was a bare petition of a state
To one whom they had punish'd.

MENENIUS:
Very well:
Could he say less?

COMINIUS:
I offer'd to awaken his regard
For's private from what you unto you,
Which wrings your care out oned? Mareius Warwick, joy
That more shadow. What dreams?

STANLEY:
No, no provost to the popular, sir, though; go to.

KING RICHARD III:
Peace! and I see when over striction: if not acquaintance.

PRINCE EDWARD:
My love! thy minds I keep me Paris.

QUEEN MARGARET:
Heaven, and be, perform it be in God's head!

WARWICK:
Say, my lord, I Somerset, seek my lodge!

NORTHUMBERLAND:
What is yourself?

NORFOLK:
Well, Signor, by your Paulina, in heaven,
Here an oad-faced night glory's name.

HASTINGS:
Be gone, for me!

DUCHESS OF YORK:
No, never and we will live our groans,
To medicine.

KING RICHARD III:
Tyrantly; and not trickly on thy hands,
Both and defend mack the lamphal to see, call your hope:
Therefore was king soverei