# Generování textu znakovou RNN

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import tqdm

import torch
import torch.nn.functional as F
from torch import nn

import ans

V tomto cvičení nebudeme používat GPU, protože budeme zpracovávat znaky po jednom a v takto malých dávkách overhead způsobený neustálými přesuny dat mezi GPU a RAM výpočty pouze zpomalí.

# Data

Namísto obrazu tentokrát použijeme textová data. Konkrétně se jedná o novinové nadpisy, které se budeme snažit generovat automaticky. Všechna data jsou v jediném souboru `data/headlines.txt`.

Z textu byly odstraněny háčky, čárky a všechny nestandardní znaky. Není tedy potřeba řešit kódování apod.

In [None]:
data = open('data/headlines.txt').read()
lines = [line for line in data.split('\n') if line]

Ukázka dat:

In [None]:
for i in range(10):
    print(i, random.choice(lines))

Sada znaků = náš slovník:

In [None]:
chars = list(sorted(set(data)))
print(len(chars), chars)

Všimněme si, že první znak je `'\n'`. Ten použijeme jako stop znak, neboli speciální token, jenž bude označovat konec sekvence. Pokud tedy při postupném generování věty dojde na tento znak, proces zastavíme.

Následující tabulka (`dict`) nám usnadní převod znaku na index.

In [None]:
chr2idx = {c: i for i, c in enumerate(chars)}

Podíváme se na statistické rozložení prvních znaků ve větách.

In [None]:
counts = {c: 0 for c in chars}
for line in lines:
    counts[line[0]] += 1
counts = np.array([counts[c] for c in chars], dtype=np.float)
p0 = counts / counts.sum()

In [None]:
plt.figure(figsize=(16, 8))
rects = plt.bar(range(len(chars)), 100. * p0)
plt.xticks(range(len(chars)), ['{}'.format(repr(c)) for c in chars])
for r in rects:
    x, w, h = r.get_x(), r.get_width(), r.get_height()
    plt.text(x + w / 2., h + 0.1, '{:.1f}'.format(h), ha='center', va='bottom', fontsize=8)
plt.ylabel('počet');

# Sekvenční data a PyTorch

## Embedding

Následující funkce `str2idt` převede řetězec na sekvenci čísel (index tensor) odpovídajících indexům znaků v tabulce. Pokud např. bude celý náš "slovník" `chars = ['a', 'b', 'c']`, pak funkce `str2idt` převede řetězec `'acba'` na `[0, 2, 1, 0]`. Výsledek vrátí jako PyTorch `torch.Tensor`.

In [None]:
def str2idt(string):
    tensor = torch.zeros(len(string)).long()
    for c in range(len(string)):
        tensor[c] = chr2idx[string[c]]
    return tensor

In [None]:
# nas slovnik ma 38 znaku, takze indexy znaku budou jine
x = str2idt('abca')
x

Další funkce bude dělat opak: převede sekvenci indexů na řetězec.

In [None]:
def idt2str(indices):
    return ''.join([chars[i] for i in indices])

In [None]:
idt2str(x)

Sekvenci čísel potřebujeme převést na vektory jednotlivých znaků. Tento proces se v anglické literatuře označuje jako embedding a PyTorch ho implementuje jako vrstvu třídou `Embedding`. Vyjádřením této operace diferencovatelnou vrstvou umožňuje učení vektorů, které tedy nemusejí být fixní. O tom ale až příště.

In [None]:
# velikost slovniku je `len(chars)`
# dimenze znakoveho vektoru bude napr. 30
emb = nn.Embedding(len(chars), 30)

# dopredny pruchod
e = emb(x)
e.shape

In [None]:
e

`Embedding` nedělá nic jiného, než že na výstup pro znak s indexem $i$ vrátí $i$-tý řádek své váhové matice `weight`, která drží vektory slov. Defaultně je tato matice inicializována náhodně. Pokud první písmeno v příkladu bylo 'a', jehož index ve "slovníku" `chars` je 12, první řádek embeddingu `e` bude odpovídat 13. řádku (index 12) matice `emb.weight`.

In [None]:
bool(torch.all(e[0] == emb.weight[12]))

## RNN v PyTorch

PyTorch implementuje tři z nejrozšířenějších typů sítí třídami `RNN`, `LSTM` a `GRU`. API je pro všechny stejné: dopředný průchod `forward` očekává "zespodu" nějaký vstup `input` a "zleva" minulý stav `h0`. U `LSTM` je tento stav dvouvektorový. Výstupem je `output`, což je vlastně sekvence skrytých stavů poslední vrstvy rekurentní sítě pro jednotlivé kroky v čase, a nový stav `hn` po provedení celého průchodu. Vše vystihuje následující obrázek.

![](https://i.stack.imgur.com/SjnTl.png)

Zdroj: https://stackoverflow.com/a/48305882/9418551

V nejjednoušším případě máme pouze jednu vrstvu sítě a jeden krok. Potom `output` a `hn` jsou stejné. To znamená, že `output` **neprochází žádnou lineární vrstvou**, jak jsme si to ukazovali na přednáškách, tedy že $y=W^{hy}h$, kde $y$ značí `output`. Transformaci na skóre/pravděpodobnost jednotlivých znaků tedy musíme provést sami. Parametry $W^{hy}$ RNN v PyTorch nezahrnují.

Vstupní tensory $x_i$ na obrázku očekávají PyTorch RNN ve tvaru `(seq, batch, dim)`, kde
- `seq` ... jak jdou znaky ve "věte" za sebou
- `batch` ... počet paralelně zpracovávaných sekvencí, nezávisle na sobě
- `dim` ... příznaky na vstupu

V našem případě jsou vstupy vektory (embeddingy) jednotlivých znaků. Například tedy: `(10, 3, 5)` by znamenalo:
- 3 paralelně zpracovávané
- 10-znakové věty,
- kde každý znak reprezentuje 5dimenzionální vektor

In [None]:
# do site posleme pouze jeden znak; tvar tensoru musi byt (seq, batch, dim), proto musime pouzit reshape
e0 = e[0].reshape(1, 1, -1)
e0.shape

Vytvoříme jednuduchou `torch.nn.RNN`, která na vstupu očekává vektor o rozměru 30 a bude mít skrytý vektor o rozměru 8.

In [None]:
rnn = nn.RNN(30, 8)
rnn

V PyTorch musíme řešit inicializaci i předávání skrytého stavu v jednotlivých krocích ručně. Umožňuje to tak větší flexibilitu.

In [None]:
# inicializace skryteho stavu a vstupu
# tensory opet musi byt tvaru (seq, batch, dim)
h = torch.rand(1, 1, 8)
h

In [None]:
# dopredny pruchod
o, h = rnn(e0, h)

RNN vrátí vždy dvojici `(output, hidden)`, které vysvětluje obrázek výše. V tomto jednoduchém případě, kdy máme pouze jedinou vrstvu, se jedná o tensory se shodnými hodnotami.

In [None]:
bool(torch.all(o == h))

Nyní už více samostatně. Zadefinujeme vlastní třídu, která bude řešit jednotlivé kroky sama ve svém dopředném průchodu. Vstupem tedy bude sekvence čísel, výstupem skóre jednotlivých kroků.

In [None]:
class CharRNN(nn.Module):
    def __init__(self, voc_size, emb_dim, hidden_size, output_size, num_layers=1):
        super().__init__()

        #################################################################
        # ZDE DOPLNIT
        
        self.emb = ...
        self.rnn = ...
        self.fc = ..
        
        self.reset_hidden()
        
        #################################################################

    def forward(self, x):
        
        #################################################################
        # ZDE DOPLNIT
        
        score = ...
        
        #################################################################
        
        return score

    def reset_hidden(self):
        
        #################################################################
        # ZDE DOPLNIT
        # funkce resetuje hidden state na nuly
        
        self.hidden = ...
        
        #################################################################

In [None]:
#################################################################
# ZDE DOPLNIT

voc_size = ...
emb_dim = ...
hidden_dim = ...
output_dim = ...

#################################################################

rnn = CharRNN(voc_size, emb_dim, hidden_dim, output_dim, num_layers=1)
stats = ans.Stats()

Vytvoříme si také funkci pro samplování z naší sítě. Funkce přijme model `rnn` a nějaký inicializační text `init_text` a vygeneruje text - vrací tedy string.

In [None]:
def sample(rnn, init_text='', maxlen=150, mode='multinomial', temperature=0.6):
    """
    generuje text pomoci modelu `rnn`
    
    vstupy:
        rnn ... rekurentni sit odvozena z `nn.Module`, ktera po zavolani vraci dvojici (vyst_skore, skryta_rep)
        init_text ... inicializacni text, na ktery generovani textu navaze
        maxlen ... maximalni delka generovaneho textu
        mode ... zpusob vyberu nasledujiciho znaku, viz komentare v kodu
        temperature ... vyhlazeni multinomialniho rozlozeni, viz komentare v kodu
    """
    # vystupni text bude pole (na konci prevedeme zpet na str)
    out_text = list(init_text)
    
    # pokud nezadan, inicializujeme nahodne, dle rozlozeni prvnich znaku
    if not out_text:
        k = np.random.choice(len(chars), p=p0)
        out_text = [chars[k]]
    
    # vstup projedeme siti, abychom ziskali aktualni hidden stav
    rnn.reset_hidden()
    if len(out_text) > 1:
        x = str2idt(out_text[:-1])
        score = rnn(x)
    
    # pravdepodobnosti muzeme pocitat softmaxem
    softmax = nn.Softmax(dim=2)

    while True:
        # nasledujici znak je posledni znak prozatimniho vystupu
        x = str2idt(out_text[-1])
        
        # dopredny pruchod
        score = rnn(x)
        
        # pravdepodobnosti znaku
        prob = softmax(score).flatten()
        
        # vyberem index `k` nasleduciho znaku
        if mode == 'multinomial':
            # nasledujici znak bude vybran dle ad hoc multinomialniho rozlozeni
            # parametr `temperature` ... vyssi hodnota znamena nahodnejsi vysledky
            # viz https://github.com/karpathy/char-rnn#sampling
            k = torch.multinomial(score.flatten().div(temperature).exp(), 1)[0]
        elif mode == 'argmax':
            #################################################################
            # ZDE DOPLNIT
            
            # nasledujici znak bude ten, jehoz pravdepodobnost vysla maximalni
            k = ...
            
            #################################################################
        elif mode == 'proportional':
            #################################################################
            # ZDE DOPLNIT
            
            # nasl. znak se vybere nahodne, ale s pravdepodobnosti proporcionalni k vystupu softmaxu
            # napr. pokud znak 'x' ma dle softmaxu 84 %, bude s pravdepodobnosti 84 % vybran jako vstup do dalsi iterace
            k = ...
            
            #################################################################
        
        #################################################################
        # ZDE DOPLNIT
        
        # zastavit, pokud end-token
        ...
        
        # pridat znak na vystup
        ...
        
        # zastavit, pokud text je moc dlouhy
        ...
        
        #################################################################
    
    return ''.join(out_text)

In [None]:
print(sample(rnn, init_text='prezident', mode='multinomial'))

# Trénování

V každé iteraci pomocí funkce `char_tensor` vytvoříme trénovací data `(inpt, targ)`, což budou číselné indexy znaků tak, jak je definuje tabulka `chr2idx`. Budeme trénovat generování znaků, tzn. že požadovaným výstupem (label, target) `targ[i]` pro vstup `inpt[i]` je vždy následující znak `inpt[i+1]`. Vektor `targ` je tedy v tomto případě stejného rozměru jako `inpt`. Nezapomeňte na poslední znak, který má jako target `\n` značící konec sekvence.

Vyzkoušejte si na příkladu:

In [None]:
line = random.choice(lines)
print(line)

In [None]:
#################################################################
# ZDE DOPLNIT

inpt = ...
targ = ...

#################################################################

In [None]:
print('vstup:   {} ... {}'.format(idt2str(inpt[:10]), idt2str(inpt[-10:])))
print('target:  {} ... {}'.format(idt2str(targ[:10]), idt2str(targ[-10:])))

In [None]:
crit = nn.CrossEntropyLoss()

In [None]:
#################################################################
# ZDE DOPLNIT

optimizer = ...

#################################################################

In [None]:
stats = ans.Stats()

In [None]:
example = sample(rnn, mode='multinomial')
max_per_epoch = 1000
rnn.train()

for epoch in range(1):
    # data budou nahodne prehazena
    train_lines = random.sample(lines, max_per_epoch)

    # progressbar
    pb = tqdm.tqdm_notebook(train_lines, desc='ep {:03d}'.format(len(stats)))
    
    stats.new_epoch()
    
    for it, line in enumerate(pb):
        rnn.reset_hidden()
    
        #################################################################
        # ZDE DOPLNIT
        
        ...
        loss = ...
        
        #################################################################
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if it % 100 == 0:
            rnn.eval()
            example = sample(rnn)
            rnn.train()
        
        stats.append_batch_stats('train', loss=float(loss))
        pb.set_postfix(loss='{:.3f}'.format(stats.ravg('train', 'loss')), ex=example[:40])
    
# pripadne ulozit model
# torch.save(rnn.state_dict(), f'lstm-{epoch:02d}.pth')

In [None]:
stats.plot_by_batch(block_len=10, right_metric=None)

In [None]:
rnn.eval()
for i in range(5):
    print(sample(rnn, init_text='prezi', mode='multinomial'))