# Neurónová sieť text po znakoch Benchmark

In [83]:
# importy
import torch
import torch.nn as nn
from torch.nn import functional as F
import string
import time

In [63]:
# === trénovanie modelu CPU ===
device = torch.device("cpu")

#nParametre CPU
import psutil
print("CPU")
print ("fyzické jadrá: ",psutil.cpu_count(logical=False))
print ("logické jadrá ",psutil.cpu_count())

CPU
fyzické jadrá:  20
logické jadrá  28


In [84]:
# === trénovanie modelu GPU ===
from torch.cuda import is_available

# Pokiaľ možno na GPU
if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")

In [85]:
# Príprava textu
# ==============

#Načítanie knihy z lokálneho PC 
kniha = open('./Rivers of Babylon.txt', 'r', encoding="utf8")
text = kniha.read().replace('\n', '')
print("Počet znakov textu: ", len(text))
# všetky unikátne znaky v texte
znaky = sorted(list(set(text)))  # kvôli prehľadnosti utriedené
print(''.join(znaky))

#odstránenie neabecedných znakov
neabecedne_znaky = string.punctuation # zoznam nealfanumerických znakov
neabecedne_znaky += '├┬┼'   # ďalšie znaky ktoré vidím že sa mi tam vyskytujú
for znak in neabecedne_znaky:
   text = text.replace(znak, "")
znaky = sorted(list(set(text)))
unikatnych_znakov = len(znaky)   #vocab_size
print(''.join(znaky))
print("Počet unikátnych znakov: ",unikatnych_znakov)

Počet znakov textu:  551955
 !&(),-./0124589:;?ABCDEFGHIJKLMNOPRSTUVWXYZabcdefghijklmnopqrstuvwxyz|ÁÇÔÚáäçéëíóôöúýČčĎďĺĽľŁňŕśŠšŤťŽž├┬┼
 0124589ABCDEFGHIJKLMNOPRSTUVWXYZabcdefghijklmnopqrstuvwxyzÁÇÔÚáäçéëíóôöúýČčĎďĺĽľŁňŕśŠšŤťŽž
Počet unikátnych znakov:  91


In [86]:
# mapovanie znakov na indexy
znaky_na_ix = { ch:i for i,ch in enumerate(znaky) }
znaky_na_ix

# indexy na znaky
ix_na_znaky = { i:ch for i,ch in enumerate(znaky) }
ix_na_znaky

#kodovanie
text_na_cisla = lambda t: [znaky_na_ix[c] for c in t]
#dekodovanie
cisla_na_text = lambda c: ''.join([ix_na_znaky[i] for i in c])

# konverzia textu na tenzory
tenzory_textu = torch.tensor(text_na_cisla(text), dtype=torch.long)
print(tenzory_textu.shape)

# rozdelenie dát na trénovaciu a testovaciu množinu
# tu záleží na postupnosti dát (tenzorov zastupujúcich znaky)
# takže to musí byť rozdelené sekvenčne nie náhodne
n = int(0.9*len(tenzory_textu)) # 90%  textu od začiatku budú trénovacie dáta
train_data = tenzory_textu[:n]
test_data = tenzory_textu[n:]
print("Trénovacia množina:",train_data.shape)
print("Testovacia množina:",test_data.shape)

torch.Size([529722])
Trénovacia množina: torch.Size([476749])
Testovacia množina: torch.Size([52973])


In [87]:
# načítanie dávky vstupných a výstupných údajov začiatok dávky ix náhodne
def nacitanie_davky(split):
    temp_data = train_data if split == 'train' else test_data
    ix = torch.randint(len(temp_data) - block_size, (batch_size,)) #náhodná poloha v texte
    x = torch.stack([temp_data[i:i+block_size] for i in ix])      #vstupy
    y = torch.stack([temp_data[i+1:i+block_size+1] for i in ix])  #výstupy
    x, y = x.to(device), y.to(device)
    return x, y

torch.manual_seed(1337)  #inicializácia generátora náhodných čísel
batch_size = 4   # počet paralelne spracovávaných sekvencií 4
block_size = 8   # maximálna dĺžka kontextu pre predikciu 8

In [88]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = nacitanie_davky(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

# one head of self-attention
class Head(nn.Module): 
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out

# multiple heads of self-attention in parallel
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out
    
# a simple linear layer followed by a non-linearity
class FeedFoward(nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)    

    # Transformer block: communication followed by computation
class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x

In [89]:
# ===  Model neurónovej siete ====
class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(unikatnych_znakov, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, unikatnych_znakov)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

In [90]:
# globálne parametre
batch_size = 64         # počet paralelne spracovávaných sekvencií
block_size = 256         # maximálna dĺžka kontextu pre predikciu (slovo ako blok znakov)
max_iters = 5000        # počet iterácií
eval_interval = 500
learning_rate = 3e-4

eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2

#model = BigramLanguageModel()
model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

10.808923 M parameters


In [91]:
from datetime import timedelta

def format_td(seconds, digits=3):
    isec, fsec = divmod(round(seconds*10**digits), 10**digits)
    return f'{timedelta(seconds=isec)}.{fsec:0{digits}.0f}'

#trénovanie pre daný počet iterácií

start_time = time.time()
n_time = time.time()
for iter in range(max_iters):

    # výpis aktuálnych hodnôt každú minútu
    sec = (time.time()-start_time)
    sec1 = (time.time()-n_time)
    
    if sec1 >= 60 or iter == max_iters - 1:  #každú minutu
        #losses = estimate_loss()       
        print("čas {} ".format(format_td(sec, digits=2)),
               iter,"z",max_iters, "iterácií", 
              
              )      
        n_time = time.time()

    # načítasnie dávky údajov z náhodného miesta textu
    xb, yb = nacitanie_davky('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print("Trénovanie neurónovej siete trvalo {} minút".format(round((start_time - time.time())/60),2))

čas 0:01:00.15  271 z 5000 iterácií
čas 0:02:00.34  542 z 5000 iterácií
čas 0:03:00.47  807 z 5000 iterácií
čas 0:04:00.50  1071 z 5000 iterácií
čas 0:05:00.57  1336 z 5000 iterácií
čas 0:06:00.73  1559 z 5000 iterácií
čas 0:07:00.80  1724 z 5000 iterácií
čas 0:08:01.17  1890 z 5000 iterácií
čas 0:09:01.50  2055 z 5000 iterácií
čas 0:10:01.55  2234 z 5000 iterácií
čas 0:11:01.66  2504 z 5000 iterácií
čas 0:12:01.83  2773 z 5000 iterácií
čas 0:13:01.92  3039 z 5000 iterácií
čas 0:14:02.07  3306 z 5000 iterácií
čas 0:15:02.22  3563 z 5000 iterácií
čas 0:16:02.28  3821 z 5000 iterácií
čas 0:17:02.55  4055 z 5000 iterácií
čas 0:18:02.68  4306 z 5000 iterácií
čas 0:19:02.93  4535 z 5000 iterácií
čas 0:20:02.97  4771 z 5000 iterácií
čas 0:21:03.17  4996 z 5000 iterácií
čas 0:21:03.97  4999 z 5000 iterácií
Trénovanie neurónovej siete trvalo -21 minút


In [92]:

# generovanie textu
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(cisla_na_text(m.generate(context, max_new_tokens=500)[0].tolist()))

#open('more.txt', 'w').write(decode(m.generate(context, max_new_tokens=10000)[0].tolist()))

 énevymienú Vlhkosť je plný rodičkou Semtasí priami leží Robota dolu len vyberie vodky po umývaní sa nad tiPustí na Rácza Rácza Teraz je to radiátorov prázdno hlavu nepriamo v tme a potom sa Vzadu je teda skoro rozň Drží rozhodne Ešte čoskoro ponite Khunt sa do nej dávno si obšmiel Ešte zo všetko robotu záprah Kotolník so zatvorenými očami Brúrkami od zimy Tváre mu však každý pregignál Ten vám neskle mal Venu že hyste tak ľutovať okradol prikáže To je to aby ste si on Hurensson ulaÁšte nie povie 
