# Neurónová sieť na generovanie textu II

V jednom z predchádzajúcich príkladov sme vytvorili neurónovú sieť, ktorá bola natrénovaná na texte knihy, pričom tréning spočíval v skúmaní slovosledu. Ukážeme iný postup kedy sa neurónová sieť počas trénovania nebude učiť postupnosť slov, ale postupnosť znakov.  Na trénovanie použijeme text knihy Rivers of Babylon od Petra Pišťanka. Na rozdiel od predchádzajúceho príkladu nebudeme text členiť na slová, ale budeme ho vnímať ako postupnosť znakov.

In [1]:
# importy
import torch
import torch.nn as nn
from torch.nn import functional as F
from google.colab import drive
import string

In [3]:
# Ak máte k dispozícii GPU s podporou NVIDIA Cuda výpočty budú rýchlejšie
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

Načítame text z disku Google, pričom vynecháme znaky CR a LF indikujúce prechod na nový riadok

In [4]:
#načítanie knihy z Google drive
drive.mount('/content/drive')
kniha = open('/content/drive/My Drive/ML Neuronova siet/Rivers of Babylon.txt', 'r')
text = kniha.read().replace('\n', '')

Mounted at /content/drive


Zistíme počet znakov a vypíšeme krátku ukážku z úvodu knihy

In [None]:
print("Počet znakov textu: ", len(text))
text[:500]

Nakoľko neurónovú sieť budeme trénovať na úrovni znakov, potrebujeme zistiť počet unikátnych znakov v texte. Necháme si ich vypísať

In [6]:
# všetky unikátne znaky v texte
znaky = sorted(list(set(text)))  # kvôli prehľadnosti utriedené
print(''.join(znaky))

 !&(),-./0124589:;?ABCDEFGHIJKLMNOPRSTUVWXYZabcdefghijklmnopqrstuvwxyz|ÁÇÔÚáäçéëíóôöúýČčĎďĺĽľŁňŕśŠšŤťŽž├┬┼


Budeme pracovať len s čistým textom, takže odstránime všetky nealfanumerické znaky. Ich zoznam získame pomocou funkcie string.punctuation(), pričom do takto získaného reťazca nealfanumerických znakov doplníme ďalšie znaky ktoré vidíme že sa tam vyskytujú.

In [None]:
neabecedne_znaky = string.punctuation # zoznam nealfanumerických znakov
neabecedne_znaky += '├┬┼'   # ďalšie znaky ktoré vidím že sa mi tam vyskytujú
neabecedne_znaky

In [8]:
# vynecháme nealfanumerické znaky
for znak in neabecedne_znaky:
   text = text.replace(znak, "")
znaky = sorted(list(set(text)))
unikatnych_znakov = len(znaky)   #vocab_size
print(''.join(znaky))
print(unikatnych_znakov)

 0124589ABCDEFGHIJKLMNOPRSTUVWXYZabcdefghijklmnopqrstuvwxyzÁÇÔÚáäçéëíóôöúýČčĎďĺĽľŁňŕśŠšŤťŽž
91


Nasleduje takzvaná tokenizácia vstupného textu, čiže konverzia postupnosti znakov na postupnosť celých čísel podľa nejakého slovníka znakov, ktoré sa v texte vyskytujú. Pre jednoduchosť vytvoríme jazykový model na úrovni znakov, takže jednoducho pretransformujeme jednotlivé znaky na celé čísla. V predchádzajúcom príklade sme použili jazykový model na úrovni slov, kedy sme číslo priradili každému unikátnemu slovu. Bolo ich niekoľko tisíc. Pri modeli na úrovni znakov ich máme 91. Teraz znaky nahradíme číslami

In [None]:
# mapovanie znakov na indexy
znaky_na_ix = { ch:i for i,ch in enumerate(znaky) }
znaky_na_ix

Mapovanie indexov späť na znaky

In [None]:
# indexy na znaky
ix_na_znaky = { i:ch for i,ch in enumerate(znaky) }
ix_na_znaky

Funkcie (lambda rekurzívne )na zakódovanie textu na čísla a na konverziu čísel späť na znaky. Vytvoríme kód aj na spätné mapovanie, takže môžeme zadať zoznam čísel a dekódovať ho, aby sme dostali textový reťazec

In [11]:

#kodovanie
text_na_cisla = lambda t: [znaky_na_ix[c] for c in t]
#dekodovanie
cisla_na_text = lambda c: ''.join([ix_na_znaky[i] for i in c])



In [None]:
# --- kód na vysvetlenie ---
print(text_na_cisla("ML v Pythone"))

[20, 19, 0, 54, 0, 23, 57, 52, 40, 47, 46, 37]


In [None]:
# --- kód na vysvetlenie ---
print(cisla_na_text([20, 19, 0, 54, 0, 23, 57, 52, 40, 47, 46, 37]))

ML v Pythone


Text pretransformovaný na čísla prekonvertujeme na tenzor knižnice torch.

In [13]:
# konverzia textu na tenzory
tenzory_textu = torch.tensor(text_na_cisla(text), dtype=torch.long)
print(tenzory_textu.shape)
tenzory_textu

torch.Size([529722])


tensor([24, 63, 46,  ..., 50, 75, 68])

Vstupné údaje, v našom prípade text v podobe tenzora rozdelíme na trénovaciu a validačnú množinu, pričom trénovacia množina bude obsahovať  90 % údajov. Zvyšných 10%  budú overovacie údaje.

In [14]:
# rozdelenie dát na trénovaciu a testovaciu množinu
# tu záleží na postupnosti dát (tenzorov zastupujúcich znaky)
# takže to musí byť rozdelené sekvenčne nie náhodne
n = int(0.9*len(tenzory_textu)) # 90%  textu od začiatku budú trénovacie dáta
train_data = tenzory_textu[:n]
test_data = tenzory_textu[n:]
train_data

tensor([24, 63, 46,  ..., 73, 35, 40])

Pre zaujímavosť zistíme veľkosť každej množiny

In [15]:
print("Trénovacia množina:",train_data.shape)
print("Testovacia množina:",test_data.shape)

Trénovacia množina: torch.Size([476749])
Testovacia množina: torch.Size([52973])


Ani v tomto prípade nezavedieme pri trénovaní neurónovej siete naraz celý text. To by bolo výpočtovo veľmi náročné. Preto text rozdelíme na malé bloky a tie budeme z textu vyberať náhodne. Inak povedané hodnotu indexu označujúceho začiatok každej vzorky vygeneruje generátor náhodných čísel. Nebudeme teda trénovať kontinuálne, ale po náhodne vybraných úsekoch. Veľkosť bloku bude parameter.
Na ilustráciu ukážeme spracovanie bloku prvých 16-tich znakov od začiatku textu


In [None]:
# --- kód na vysvetlenie ---
# pre lepšiu názornosť nie s tenzormi, ale s textom
blok = 16
tx1 = text[:blok]
tx2 = text[1:blok+1]
for t in range(blok):
    vstup = tx1[:t+1]
    výstup = tx2[t]
    print(f"pre {vstup} je výstup: {výstup}")

Neurónová sieť bude toto isté robiť s tenzormi

In [None]:
# --- kód na vysvetlenie ---
# s tenzorom číselných hodnôt
blok = 16
td1 = train_data[:blok]
td2 = train_data[1:blok+1]
for t in range(blok):
    vstup = td1[:t+1]
    výstup = td2[t]
    print(f"pre {vstup} je výstup: {výstup}")

Pre názornosť jednorozmerné tenzory usporiadame do riadkov. Získame štruktúru 4 x 8 čiže 4 riadky s ôsmimi stĺpcami. Vstup X sú tenzory 4 x 8, pričom každý z nich je kúskom tréningovej sady.

Ciele sú v pridruženom poli Y.  Poskytnú správnu odpoveď pre každú jednu pozíciu vo vnútri X Vstupy X a Y sú zavedené na spracovanie do NS aby vytvorili stratovú funkciu. V každom riadku je 8 prípadov, takže dávka so štruktúrou 4 x 8 obsahuje celkom 32 prípadov na trénovanie, ktoré sú úplne nezávislé.

In [None]:
torch.manual_seed(1337)  #inicializácia generátora náhodných čísel
batch_size = 16   # počet paralelne spracovávaných sekvencií 4
block_size = 12   # maximálna dĺžka kontextu pre predikciu 8


In [16]:
# načítanie dávky vstupných a výstupných údajov začiatok dávky ix náhodne
def nacitanie_davky(split):
    temp_data = train_data if split == 'train' else test_data
    ix = torch.randint(len(temp_data) - block_size, (batch_size,)) #náhodná poloha v texte
    x = torch.stack([temp_data[i:i+block_size] for i in ix])      #vstupy
    y = torch.stack([temp_data[i+1:i+block_size+1] for i in ix])  #výstupy
    x, y = x.to(device), y.to(device)
    return x, y



In [17]:
# --- kód na vysvetlenie ---
xb, yb = nacitanie_davky('train')
print('vstupy:')
print(xb.shape)
print(xb)
print('ciele:')
print(yb.shape)
print(yb)


NameError: ignored

In [18]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = nacitanie_davky(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out


In [19]:
class Head(nn.Module):
    """ one head of self-attention """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # input of size (batch, time-step, channels)
        # output of size (batch, time-step, head size)
        B,T,C = x.shape
        k = self.key(x)   # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1) * k.shape[-1]**-0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,hs)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out



In [20]:
class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

In [21]:
#a simple linear layer followed by a non-linearity
class FeedFoward(nn.Module):
    """ a simple linear layer followed by a non-linearity """

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)

In [22]:
# Transformer block: communication followed by computation
class Block(nn.Module):
    """ Transformer block: communication followed by computation """

    def __init__(self, n_embd, n_head):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


In [None]:
# super simple bigram model
class BigramLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(unikatnych_znakov, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, unikatnych_znakov)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

In [25]:
class GPTLanguageModel(nn.Module):

    def __init__(self):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(unikatnych_znakov, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, unikatnych_znakov)

        # better init, not covered in the original GPT video, but important, will cover in followup video
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        B, T = idx.shape

        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx

In [26]:
# globálne parametre
batch_size = 64         # počet paralelne spracovávaných sekvencií
block_size = 256         # maximálna dĺžka kontextu pre predikciu (slovo ako blok znakov)
max_iters = 1000        # počet iterácií
eval_interval = 500
learning_rate = 3e-4

eval_iters = 200
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2

#model = BigramLanguageModel()
model = GPTLanguageModel()
m = model.to(device)
# print the number of parameters in the model
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

# create a PyTorch optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

10.808923 M parameters


In [27]:
#trénovanie
for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = nacitanie_davky('train')

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

step 0: train loss 4.4405, val loss 4.4361
step 500: train loss 2.1601, val loss 2.1775
step 999: train loss 1.5966, val loss 1.7531


In [28]:

# generovanie textu
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(cisla_na_text(m.generate(context, max_new_tokens=2000)[0].tolist()))

#open('more.txt', 'w').write(decode(m.generate(context, max_new_tokens=10000)[0].tolist()))

 jej medz môcť rozhodne riaditeľ sa pravedomDroda kňútenými je bezpáčuným nočne zdradskom povie z kotolne si tely Ale zahra Urban hy mu nič nevie Haslom On Potom nestsko je jasná Trocha to eštric dobrých dostatnostieafní dred kdel Za bežou mitý a vám troch fukách sn Vabstavíce Dá onáku mohmta škodov chápe Zalijúce sa potisy s utere S majazadaným čod S jaršak potePolokár upovie prídu riaditeľou pošenýom a dolu rasne som stežiarAm bez pápadne pridahom Baja ma plecami Hej on trpevší Si chnuť kobyPravé dvajstiptok a už tlhke Kedy bavo viac mu do láme nestá služby a stancom ma hravená po vločiek ak stlami pozorovená námysli reccedia Daje naší chanuté aj pomôže hysá na stranutácia zvykročí Špás bez o svojaných reečlených zvernarhých nuseje či naplatícia dokov ajsné ľadocove Ešte Ráczovi počku neschochytiť droh čaká je uže ľúbia vyhráva si zaplácajúcej cid pritiku svrcajok vidí deborkýka právycí pivák a sprodiči Ani červené Rácz ceste do zrknúť po sú výrozlačím a stola panke a zimyčíKe nemôže