# Generování textu znakovou RNN

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import tqdm

from IPython.core.debugger import set_trace

In [None]:
import torch
import torch.nn.functional as F
from torch import nn

In [None]:
import ans

%load_ext autoreload
%autoreload 2

V tomto cvičení nebudeme používat GPU, protože budeme zpracovávat znaky po jednom a v takto malých dávkách overhead způsobený neustálými přesuny dat mezi GPU a RAM výpočty pouze zpomalí.

# Data

Namísto obrazu tentokrát použijeme textová data. Konkrétně se jedná o novinové nadpisy, které se budeme snažit generovat automaticky. Všechna data jsou v jediném souboru, který si stáhněte [odsud](https://1drv.ms/t/s!AotVPA94wWKxoWLULaBqvPXiNS5t) a uložte jako `data/headlines.txt`.

Z textu byly odstraneny hacky, carky a vsechny nestandardni znaky. Neni tedy potreba resit kodovani apod.

In [None]:
data = open('data/headlines.txt').read()
lines = [line.strip() for line in data.split('\n') if line]

Ukázka dat:

In [None]:
for i in range(10):
    print(i, random.choice(lines))

Sada znaků = náš slovník:

In [None]:
chars = list(sorted(set(data)))
print(len(chars), chars)

Následující tabulka (`dict`) nám usnadní převod znaku na index.

In [None]:
chr2idx = {c: i for i, c in enumerate(chars)}

Podíváme se na statistické rozložení prvních znaků ve větách.

In [None]:
counts = {c: 0 for c in chars}
for line in lines:
    counts[line[0]] += 1
counts = np.array([counts[c] for c in chars], dtype=np.float)
p0 = counts / counts.sum()

In [None]:
plt.figure(figsize=(16, 8))
rects = plt.bar(range(len(chars)), 100. * p0)
plt.xticks(range(len(chars)), ['{}'.format(repr(c)) for c in chars])
for r in rects:
    x, w, h = r.get_x(), r.get_width(), r.get_height()
    plt.text(x + w / 2., h + 0.1, '{:.1f}'.format(h), ha='center', va='bottom', fontsize=8)
plt.ylabel('počet')
plt.show()

# Sekvenční data a PyTorch

## Embedding

Následující funkce převede řetězec na sekvenci čísel odpovídajících indexům znaků v tabulce. Pokud např. `chars = ['a', 'b', 'c']`, pak řetězec `'acba'` převede na `[0, 2, 1, 0]`. Výsledek vrátí jako PyTorch `Variable`.

In [None]:
def char_tensor(string):
    tensor = torch.zeros(len(string)).long()
    for c in range(len(string)):
        tensor[c] = chr2idx[string[c]]
    return tensor

In [None]:
x = char_tensor('abca')
x

Další funkce bude dělat opak: převede sekvenci indexů na řetězec.

In [None]:
def to_string(indices):
    if isinstance(indices, torch.Tensor):
        indices = indices.data
    return ''.join([chars[i] for i in indices])

In [None]:
to_string(x)

Sekvenci čísel potřebujeme převést na vektory jednotlivých znaků. Tento proces se v anglické literatuře označuje jako embedding a PyTorch ho implementuje jako vrstvu třídou `Embedding`. Vyjádřením této operace diferencovatelnou vrstvou umožňuje učení vektorů, které tedy nemusejí být fixní. O tom ale až příště.

In [None]:
# velikost slovniku je `len(chars)`
# dimenze znakoveho vektoru bude napr. 30
emb = nn.Embedding(len(chars), 30)

# dopredny pruchod
e = emb(x)
e.shape

In [None]:
e

`Embedding` nedělá nic jiného, než že na výstup pro znak s indexem $i$ vrátí $i$-tý řádek své váhové matice `weight`, která drží vektory slov. Defaultně je tato matice inicializována náhodně. Pokud první písmeno v příkladu bylo 'a', jehož index ve "slovníku" `chars` je 12, první řádek embeddingu `e` bude odpovídat 13. řádku (index 12) matice `emb.weight`.

In [None]:
bool(torch.all(e[0] == emb.weight[12]))

## RNN v PyTorch

PyTorch implementuje tři z nejrozšířenějších typů sítí třídami `RNN`, `LSTM` a `GRU`. API je pro všechny stejné: dopředný průchod `forward` očekává "zespodu" nějaký vstup `input` a "zleva" minulý stav `h0`. U `LSTM` je tento stav dvouvektorový. Výstupem je `output`, což je vlastně sekvence skrytých stavů poslední vrstvy rekurentní sítě pro jednotlivé kroky v čase, a nový stav `hn` po provedení celého průchodu. Vše vystihuje následující obrázek.

![](https://i.stack.imgur.com/SjnTl.png)

Zdroj: https://stackoverflow.com/a/48305882/9418551

V nejjednoušším případě máme pouze jednu vrstvu sítě a jeden krok. Potom `output` a `hn` jsou stejné. `output` tedy **neprochází žádnou lineární vrstvou**, jak by se mohlo na první pohled zdát. Transformaci na skóre/pravděpodobnost jednotlivých znaků tedy musíme provést sami.

**Příklad:** porovnejme `output` a `hidden`.
tensory by měly být tvaru `(seq, batch, dim)`
- `seq` ... jak jdou znaky ve "věte" za sebou
- `batch` ... počet paralelně zpracovávaných sekvencí, nezávisle na sobě
- `dim` ... příznaky na vstupu

Například tedy: `(10, 3, 5)` by znamenalo:
- 3 paralelně zpracovávané
- 10-znakové věty,
- kde každý znak reprezentuje 5dimenzionální vektor

In [None]:
# do site posleme pouze jeden znak
e0 = e[0].reshape(1, 1, -1)
e0.shape

In [None]:
# RNN ocekava na vstupu vektor o rozmeru 6 a skryty stav bude mit rozmer 8
rnn = nn.RNN(30, 8)

# inicializace skryteho stavu a vstupu
# tensory by mely byt tvaru (seq, batch, dim)
h = torch.rand(8)
o, h = rnn(e0)

print(o)
print(h)

Nyní už více samostatně. Zadefinujeme vlastní třídu, která bude řešit jednotlivé kroky sama ve svém dopředném průchodu. Vstupem tedy bude sekvence čísel, výstupem skóre jednotlivých kroků a skrytý stav z posledního kroku.

In [None]:
class RNN(nn.Module):
    def __init__(self, voc_size, emb_dim, hidden_size, output_size, n_layers=1):
        super(RNN, self).__init__()

        #################################################################
        # ZDE DOPLNIT
        
        self.emb = ...
        self.rnn = ...
        self.fc = ...
        
        #################################################################

    def forward(self, x, hidden):
        
        #################################################################
        # ZDE DOPLNIT
        
        ...
        
        #################################################################
        
        return score, hidden

    def init_hidden(self):
        
        #################################################################
        # ZDE DOPLNIT
        # funkce vrati skryty vektor nainicalizovany na nuly
        
        ...
        
        #################################################################
        
        return hidden

In [None]:
#################################################################
# ZDE DOPLNIT

voc_size = ...
emb_dim = ...
hidden_dim = ...
output_dim = ...

#################################################################

rnn = RNN(voc_size, emb_dim, hidden_dim, output_dim, n_layers=1)
stats = ans.Stats()

Vytvoříme si také funkci pro samplování z naší sítě. Funkce přijme model `rnn`, nějaký inicializační text `init_text`, příp. i inicializační `hidden`, a vygeneruje text - vrací tedy string.

In [None]:
def sample(rnn, init_text='', hidden=None, maxlen=150, mode='multinomial', temperature=0.6):
    """
    generuje text pomoci modelu `rnn`
    
    vstupy:
        rnn ... rekurentni sit odvozena z `nn.Module`, ktera po zavolani vraci dvojici (vyst_skore, skryta_rep)
        init_text ... inicializacni text, na ktery generovani textu navaze
        hidden ... inicializace skryte reprezentace
        maxlen ... maximalni delka generovaneho textu
        mode ... zpusob vyberu nasledujiciho znaku, viz komentare v kodu
        temperature ... vyhlazeni multinomialniho rozlozeni, viz komentare v kodu
    """
    # vystupni text bude pole (na konci prevedeme zpet na str)
    out_text = list(init_text)
    
    # pokud nezadan, inicializujeme nahodne, dle rozlozeni prvnich znaku
    if not out_text:
        s = np.random.choice(len(chars), p=p0)
        out_text = [chars[s]]
    
    # to same hidden
    if hidden is None:
        hidden = rnn.init_hidden()
        
        # vstup projedeme siti, abychom ziskali aktualni hidden stav
        x = char_tensor(out_text)
        for i in range(len(out_text)):
            score, hidden = rnn(x[i], hidden)
    
    # nasledujici znak je posledni znak prozatimniho vystupu
    x = char_tensor(out_text[-1])
    
    # pravdepodobnosti muzeme pocitat softmaxem
    softmax = nn.Softmax(dim=2)

    while True:
        # dopredny pruchod
        score, hidden = rnn(x, hidden)
        
        # pravdepodobnosti znaku
        p = softmax(score).detach().numpy().squeeze()
        
        # vyberem index `k` nasleduciho znaku
        if mode == 'multinomial':
            # nasledujici znak bude vybran dle ad hoc multinomialniho rozlozeni
            # parametr `temperature` ... vyssi hodnota znamena nahodnejsi vysledky
            # viz https://github.com/karpathy/char-rnn#sampling
            k = torch.multinomial(score.view(-1).div(temperature).exp(), 1)[0]
        elif mode == 'argmax':
            #################################################################
            # ZDE DOPLNIT
            
            # nasledujici znak bude ten, jehoz pravdepodobnost vysla maximalni
            k = ...
            
            #################################################################
        elif mode == 'proportional':
            #################################################################
            # ZDE DOPLNIT
            
            # nasl. znak se vybere nahodne, ale s pravdepodobnosti proporcionalni k vystupu softmaxu
            # napr. pokud znak 'x' ma dle softmaxu 84 %, bude s pravdepodobnosti 84 % vybran jako vstup do dalsi iterace
            k = ...
            
            #################################################################
        
        #################################################################
        # ZDE DOPLNIT
        
        # zastavit, pokud end-token
        ...
        
        # pridat znak na vystup
        ...
        
        # zastavit, pokud text je moc dlouhy
        ...
        
        # pripravit vstupni vektor `x` pro dalsi iteraci
        ...
        
        #################################################################
    
    return ''.join(out_text)

In [None]:
print(sample(rnn, init_text='prezident', mode='multinomial'))

# Trénování

V každé iteraci pomocí funkce `char_tensor` vytvoříme trénovací data $x_i$, $y_i$, což budou číselné indexy znaků tak, jak je definuje tabulka `chr2idx`. Budeme trénovat generování znaků, tzn. že požadovaným výstupem $y_i$ (label, target) pro vstup $x_i$ je vždy následující znak $y_i=x_{i+1}$. Vektor `y_train` je tedy v tomto případě stejného rozměru jako `X_train`. Poslední znak má jako label `\n`, značící konec sekvence.

Vyzkoušejte si na příkladu:

In [None]:
line = random.choice(lines)
print(line)

In [None]:
#################################################################
# ZDE DOPLNIT

x = ...
y = ...

#################################################################

In [None]:
print('data:   {} ... {}'.format(to_string(x[:10]), to_string(x[-10:])))
print('label:  {} ... {}'.format(to_string(y[:10]), to_string(y[-10:])))

In [None]:
optimizer = torch.optim.Adam(rnn.parameters(), lr=...)
criterion = nn.CrossEntropyLoss()

In [None]:
example = sample(rnn, mode='argmax')
max_per_epoch = ...

for epoch in range(1):
    # data budou nahodne prehazena
    train_ids = np.random.permutation(len(lines))[:max_per_epoch]

    # progressbar
    pb = tqdm.tqdm_notebook(train_ids, desc='ep {:03d}'.format(epoch))
    
    stats.new_epoch()
    
    for it, idx in enumerate(pb):
        hidden = rnn.init_hidden()
        rnn.zero_grad()
        loss = 0.
    
        #################################################################
        # ZDE DOPLNIT
        
        x = ...
        y = ...
        
        for ic, c in enumerate(lines[idx]):
            # dopredny pruchod pro `ic`-ty znak
            ...
            loss += ...
        
        loss /= len(x)
        
        #################################################################

        loss.backward()
        optimizer.step()
        
        if it % 100 == 0:
            example = sample(rnn)
        
        stats.append_batch_stats('train', loss=float(loss))
        pb.set_postfix(loss='{:.3f}'.format(stats.ravg('train', 'loss')), ex=example[:40])
    
# pripadne ulozit model
# torch.save(rnn.state_dict(), f'lstm-{epoch:02d}.pth')

In [None]:
stats.plot_by_batch(block_len=10, right_metric=None)

In [None]:
for i in range(5):
    print(sample(rnn, init_text='prezident', mode='multinomial'))