In [1]:
%pip install torch

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [63]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import pickle

# TODO: ukljuci cudu
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# TODO: pogledat autotuning da optimiziram hiperparametre
block_size = 64
batch_size = 128
max_iterations = 25
learning_rate = 3e-3
evaluation_iterations = 100
n_embd = 384
n_layer = 8
n_head = 8
dropout = 0.2

In [48]:
with open('data/data_for_learning.txt', 'r', encoding='utf-8') as file:
    text = file.read()

# Getting all the characters used in the text
chars = sorted(set(text))

print(chars)
print(len(chars))

vocab_size = len(chars)

['\n', ' ', '!', '"', "'", '(', ')', ',', '-', '.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'Y', 'Z', '[', ']', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '\ufeff']
78


In [49]:
# (!! Character level tokenizer !!)
# Encoder and decoder for converting characters into numbers (tokens) and vice versa
string_to_int = { ch:i for i,ch in enumerate(chars) }
int_to_string = { i:ch for i,ch in enumerate(chars) }

encode = lambda list: [string_to_int[ch] for ch in list]
decode = lambda list: ''.join([int_to_string[i] for i in list])

encoded_text = encode('hello')
print(encoded_text)
decoded_text = decode(encoded_text)
print(decoded_text)

data = torch.tensor(encode(text), dtype=torch.long)

print(data)

[58, 55, 62, 62, 65]
hello
tensor([77, 42, 58,  ..., 70,  9,  0])


In [50]:
# Data splitting
train_len = int(len(data) * 0.8)

train_data = data[:train_len]
val_data = data[train_len:]

def get_batch(split):
    data = train_data if split == 'train' else val_data

    ix = torch.randint(len(data) - block_size, (batch_size,))
    
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])

    x, y = x.to(device), y.to(device)

    return x, y

x, y = get_batch('train')

In [51]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()

    for split in ['train', 'val']:
        losses = torch.zeros(evaluation_iterations)

        for k in range(evaluation_iterations):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()

        out[split] = losses.mean()

    model.train()

    return out

In [62]:
class Head(nn.Module):
    """ Single head of the multi-head attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('trill', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Input of size (batch, time-step, channels)
        # Output of size (batch, time-step, head size)
        B, T, C = x.shape
        k = self.key(x) # (B, T, hs)
        q = self.query(x) # (B, T, hs)

        # Compute attention scores ("affinities")
        wei = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) = (B, T, T)
        wei = wei.masked_fill(self.trill[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)

        # Perform the weighted aggregation of the values
        v = self.value(x) # (B, T, hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) = (B, T, hs)
        return out

# Self attention layer
class MultiHeadAttention(nn.Module):
    def __init__(self, n_head, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(n_head)])
        self.proj = nn.Linear(n_head * head_size, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1) # (B, T, F) -> (B, T, [h1, h1, h1, h1, h2, h2, h2, h2, ...])
        out = self.dropout(self.proj(out))
        return out

# Feed forward layer
class FeedForward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)

# Decoder layer
class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size) # self attention
        self.ffwd = FeedForward(n_embd) # feed forward
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        y = self.sa(x)
        x = self.ln1(x + y)
        y = self.ffwd(x)
        y = self.ln2(x + y)
        return x

# Using nn.Module so that parameters, for example, from nn-Linear are learnable
class GptLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.positional_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)]) # 4 decoder layers
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if not isinstance(module, (nn.Linear, nn.Embedding)):
            return
        
        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

        if isinstance(module, nn.Linear) and module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def forward(self, index, targets=None):
        B, T = index.shape
        
        tok_emb = self.token_embedding_table(index) # (B, T, C)
        pos_emb = self.positional_embedding_table(torch.arange(T, device=device)) # (T, C)
        x = tok_emb + pos_emb # (B, T, C)
        x = self.blocks(x) # (B, T, C)
        x = self.ln_f(x) # (B, T, C)
        logits = self.lm_head(x) # (B, T, vocab_size)

        if targets is None:
            loss = None
        else:
            # batch, time, channels (vocab size)
            B, T, C = logits.shape

            logits = logits.view(B * T, C) # Configuring the shape, because the input is N,C, so the N = B*T

            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss
    
    def generate(self, index, max_new_tokens):
        # Index is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # Get the predictions
            logits, loss = self.forward(index)

            # Focus only on the last time step
            logits = logits[:, -1, :] # Is now (B, C)

            # Apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1)

            # Sample from the distribution
            index_next = torch.multinomial(probs, num_samples=1) # Is now (B, 1)

            # Append sampled index to the running sequence
            index = torch.cat((index, index_next), dim=1) # Is now (B, T+1)

        return index
    
model = GptLanguageModel(vocab_size)
m = model.to(device)

# context = torch.zeros((1, 1), dtype=torch.long, device=device)
# generated_chars = decode(m.generate(context, max_new_tokens=500)[0].tolist())
# print(generated_chars)

In [64]:
# Create a pytorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iteration in range(max_iterations):
    # losses = estimate_loss()
    print(f'step: {iteration}')
    # TODO: popraviti estimate loss funkciju
    # if iteration % evaluation_iterations == 0:
    #     losses = estimate_loss(model)
    #     print(f'step: {iteration}, train loss: {losses['train']:.3f}, val loss: {losses['val']:.3f}')

    # Sample a batch of data
    xb, yb = get_batch('train')

    # Evaluate the loss
    logits, loss = model.forward(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print(loss.item())

step: 0
step: 1
step: 2
step: 3
step: 4
step: 5
step: 6
step: 7
step: 8
step: 9
step: 10
step: 11
step: 12
step: 13
step: 14
step: 15
step: 16
step: 17
step: 18
step: 19
step: 20
step: 21
step: 22
step: 23
step: 24
3.1439876556396484


In [72]:
with open('model/model-01.pkl', 'wb') as file:
    pickle.dump(model, file)

In [None]:
# This is how we load it now
with open('model/model-01.pkl', 'rb') as file:
    model = pickle.load(file)

In [70]:
data = torch.tensor(encode(text), dtype=torch.long, device=device).unsqueeze(1)
context = torch.zeros((1, 1), dtype=torch.long, device=device)

print(data)
print(context)

tensor([[77],
        [42],
        [58],
        ...,
        [70],
        [ 9],
        [ 0]])
tensor([[0]])


In [71]:
# context = torch.zeros((1, 1), dtype=torch.long, device=device)
prompt = "What is the book of OZ?"
context = torch.tensor(encode(prompt), dtype=torch.long, device=device).unsqueeze(1)

generated_chars = decode(model.generate(context, max_new_tokens=block_size)[0].tolist())

print(generated_chars)

W tai ruehoroa yeath nsemaakinbssefswo  etdat  ow 
 .Ieae"oh odlp
