In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
# Load the whole training corpus into memory as one string.
# The encoding is passed explicitly so decoding does not depend on the
# platform's locale default.
# NOTE(review): assumes the corpus file is UTF-8 (or plain ASCII) - confirm.
with open('data/anna.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [3]:
# Character vocabulary: every distinct character of the corpus gets an integer id.
# The characters are sorted so the char <-> int mapping is the same on every run;
# iterating a bare set of strings depends on string hashing, which Python
# randomizes per process, so an unsorted mapping would change between kernel restarts.
chars = tuple(sorted(set(text)))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# Encode the full text as an array of integer ids, one per character.
encoded = np.array([char2int[ch] for ch in text])

### Pré-processamento

In [5]:
def one_hot_encode(arr, n_labels):
    """One-hot encode an integer array.

    Arguments
    ---------
    arr: NumPy array of integer labels, of any shape
    n_labels: Length of each one-hot vector (number of distinct labels)

    Returns a float32 array of shape ``arr.shape + (n_labels,)`` that holds
    1.0 at the position of each label and 0.0 everywhere else.
    """
    flat_labels = arr.flatten()
    n_items = arr.size

    # One row per element of arr, all zeros to begin with
    hot = np.zeros((n_items, n_labels), dtype=np.float32)

    # Switch on the column selected by each label
    row_ids = np.arange(n_items)
    hot[row_ids, flat_labels] = 1.

    # Restore the input's shape, with the one-hot axis appended last
    return hot.reshape(arr.shape + (n_labels,))

In [11]:
# testar one_hot 

In [12]:
def get_batches(arr, batch_size, seq_length):
    '''Generate (input, target) mini-batches of shape
       batch_size x seq_length from arr.

       Arguments
       ---------
       arr: 1-D array of encoded data to cut into batches
       batch_size: Number of sequences per batch
       seq_length: Number of encoded characters in each sequence

       Yields
       ------
       x: Slice of the data of shape (batch_size, seq_length)
       y: Same shape as x, holding x shifted one step to the left. The last
          column is the element that follows the slice in the same row; for
          the final slice it wraps around to the first element of that row.
    '''

    chars_per_batch = batch_size * seq_length
    whole_batches = len(arr) // chars_per_batch

    # Drop the tail that does not fill a complete batch, then lay the data
    # out as batch_size parallel rows.
    rows = arr[:whole_batches * chars_per_batch].reshape((batch_size, -1))
    row_len = rows.shape[1]

    for start in range(0, row_len, seq_length):
        stop = start + seq_length
        x = rows[:, start:stop]

        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        if stop < row_len:
            # The next character is still inside the row
            y[:, -1] = rows[:, stop]
        else:
            # Past the end of the row: wrap around to its first column
            y[:, -1] = rows[:, 0]

        yield x, y

### Define a arquitetura

In [13]:
class CharRNN(nn.Module):
    """Character-level language model: stacked LSTM -> dropout -> linear layer.

    Arguments
    ---------
    tokens: Sequence of the distinct characters the model works with; its
        length is both the input (one-hot) size and the output size
    n_hidden: Number of hidden units in each LSTM layer
    n_layers: Number of stacked LSTM layers
    drop_prob: Dropout probability, used between LSTM layers and on the
        LSTM output
    lr: Learning rate; only stored on the instance, not used by the class
    """

    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # Vocabulary and the two lookup tables between characters and ids
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        n_tokens = len(self.chars)

        # LSTM over one-hot inputs; batch_first means inputs are
        # (batch, seq, n_tokens)
        self.lstm = nn.LSTM(n_tokens, n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)

        # Dropout applied to the LSTM output
        self.dropout = nn.Dropout(drop_prob)

        # Fully connected layer mapping hidden units to per-character scores
        self.fc = nn.Linear(n_hidden, n_tokens)

    def forward(self, x, hidden):
        """Run one forward pass.

        x is a float tensor of shape (batch, seq, n_tokens) and hidden is the
        (h, c) tuple of LSTM states. Returns the unnormalized scores with
        shape (batch * seq, n_tokens) together with the new hidden state.
        """
        # LSTM layer
        r_output, hidden = self.lstm(x, hidden)

        # Dropout
        out = self.dropout(r_output)

        # Stack all time steps of all sequences into rows for the linear layer
        out = out.contiguous().view(-1, self.n_hidden)

        # Fully connected layer
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed (h, c) states, each n_layers x batch_size x n_hidden."""
        # Allocate from an existing parameter so the states share its dtype
        # and device
        weight = next(self.parameters()).data
        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_())

        return hidden

In [18]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    """Train the network with Adam and cross-entropy loss, printing progress.

    Arguments
    ---------
    net: Model to train; must provide ``chars``, ``init_hidden(batch_size)``
        and be callable as ``net(inputs, hidden) -> (output, hidden)``
    data: 1-D array of encoded characters
    epochs: Number of passes over the training split
    batch_size: Number of sequences per mini-batch
    seq_length: Number of characters per sequence
    lr: Learning rate for the Adam optimizer
    clip: Maximum gradient norm used for gradient clipping
    val_frac: Fraction of ``data`` (taken from its end) held out for validation
    print_every: Report training and validation loss every this many steps
    """
    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Training / validation split: the last val_frac of the data is held out
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode the inputs; targets stay as integer ids
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            # Detach the hidden state so backprop stops at the batch boundary
            # instead of running through the whole training history
            h = tuple(each.detach() for each in h)

            net.zero_grad()

            # Model output
            output, h = net(inputs, h)

            loss = criterion(output, targets.reshape(-1).long())
            loss.backward()

            # Clip gradients to keep them from exploding
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            if counter % print_every == 0:
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                # No gradients are needed for validation, so skip building
                # the autograd graph
                with torch.no_grad():
                    for val_x, val_y in get_batches(val_data, batch_size, seq_length):
                        val_x = one_hot_encode(val_x, n_chars)
                        val_inputs = torch.from_numpy(val_x)
                        val_targets = torch.from_numpy(val_y)

                        val_output, val_h = net(val_inputs, val_h)
                        val_loss = criterion(val_output, val_targets.reshape(-1).long())

                        val_losses.append(val_loss.item())

                net.train()

                # The validation split may be too small to yield a batch;
                # report nan rather than taking the mean of an empty list
                mean_val_loss = np.mean(val_losses) if val_losses else float('nan')

                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(mean_val_loss))

### Treinamento

In [15]:
# Model size
n_hidden = 64
n_layers = 2

# Build the network over the corpus vocabulary and print its layer summary
net = CharRNN(chars, n_hidden=n_hidden, n_layers=n_layers)
print(net)

CharRNN(
  (lstm): LSTM(83, 64, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=64, out_features=83, bias=True)
)


In [19]:
# Training configuration
batch_size, seq_length, n_epochs = 128, 100, 5

# Train the model, reporting the losses every 10 steps
train(
    net,
    encoded,
    epochs=n_epochs,
    batch_size=batch_size,
    seq_length=seq_length,
    lr=0.001,
    print_every=10,
)

Epoch: 1/5... Step: 10... Loss: 4.3304... Val Loss: 4.3099
Epoch: 1/5... Step: 20... Loss: 3.7938... Val Loss: 3.6659
Epoch: 1/5... Step: 30... Loss: 3.3357... Val Loss: 3.2176
Epoch: 1/5... Step: 40... Loss: 3.2312... Val Loss: 3.1474
Epoch: 1/5... Step: 50... Loss: 3.2396... Val Loss: 3.1303
Epoch: 1/5... Step: 60... Loss: 3.1969... Val Loss: 3.1266
Epoch: 1/5... Step: 70... Loss: 3.1740... Val Loss: 3.1248
Epoch: 1/5... Step: 80... Loss: 3.1828... Val Loss: 3.1229
Epoch: 1/5... Step: 90... Loss: 3.1814... Val Loss: 3.1222
Epoch: 1/5... Step: 100... Loss: 3.1681... Val Loss: 3.1211
Epoch: 1/5... Step: 110... Loss: 3.1701... Val Loss: 3.1203
Epoch: 1/5... Step: 120... Loss: 3.1486... Val Loss: 3.1195
Epoch: 1/5... Step: 130... Loss: 3.1591... Val Loss: 3.1184
Epoch: 2/5... Step: 140... Loss: 3.1566... Val Loss: 3.1178
Epoch: 2/5... Step: 150... Loss: 3.1592... Val Loss: 3.1172
Epoch: 2/5... Step: 160... Loss: 3.1404... Val Loss: 3.1165
Epoch: 2/5... Step: 170... Loss: 3.1161... Val Lo

### Teste

In [23]:
def predict(net, char, h=None, top_k=None):
    """Sample the character that follows ``char``.

    Arguments
    ---------
    net: Model providing ``chars``, ``char2int``, ``int2char``,
        ``init_hidden`` and callable as ``net(inputs, hidden)``
    char: Current input character; must be a key of ``net.char2int``
    h: Hidden state to continue from; a fresh state for a batch of one is
        created when None
    top_k: When given, sample only among the ``top_k`` most likely
        characters; otherwise sample from the full distribution

    Returns the sampled character and the updated hidden state.
    """
    if h is None:
        h = net.init_hidden(1)

    # Single character as a (1, 1, n_chars) one-hot input
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x)

    # Inference only: no autograd graph is built, so the returned hidden
    # state carries no history
    with torch.no_grad():
        out, h = net(inputs, h)
        p = F.softmax(out, dim=1)

    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) keeps a 1-D array even for top_k == 1, where squeeze()
        # would produce a 0-d array that np.random.choice rejects
        top_ch = top_ch.numpy().reshape(-1)

    # Renormalize, because the kept probabilities need not sum to one
    p = p.numpy().reshape(-1)
    char = np.random.choice(top_ch, p=p/p.sum())

    return net.int2char[int(char)], h

In [24]:
def sample(net, size, prime='The', top_k=None):
    """Generate text from the model, starting from a priming string.

    Arguments
    ---------
    net: Trained model; it is moved to the CPU and put in eval mode
    size: Number of sampling steps to run after the priming string
    prime: Non-empty string used to warm up the hidden state; every
        character must be in the model's vocabulary
    top_k: Passed to ``predict`` to restrict sampling to the most likely
        characters

    Returns ``prime`` followed by ``size + 1`` generated characters: one
    predicted while consuming the last priming character, then one per step.

    Raises ValueError if ``prime`` is empty, because there is then no
    character to start predicting from.
    """
    if not prime:
        raise ValueError("prime must contain at least one character")

    net.cpu()

    net.eval()

    # Run the priming characters through the model to build up the hidden
    # state; only the prediction after the last one is kept
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)

    # Feed each generated character back in to get the next one
    for _ in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [25]:
# Generate text primed with 'Anna', sampling among the 5 most likely characters
print(
    sample(net, size=500, prime='Anna', top_k=5)
)

Annas wilt an se to ne an to sos to hhas. "hire wil hhit ose
nhang an tot as the toed wot ate
tis she as the win nesis nit on sot to sithed the, tal het tois
tang thas antetang ho terans tor
wet atee no nen he wolo tint, af sit on whe as shar hir whos. "he tare se sererinte his hh afithe wot seed,
we sor and so tas setos al sire tor wand af hos,,
he til tee her singas
tor terrand at hor,
af nat sin he teisd sans sin tore af hor sot tare he and she sere tos tito al setile tan san sis ate sas werer. "n
