# Minimal character RNN

![Character sequence](images/charseq.jpeg)

Related paper by Andrej Karpathy: [Karpathy, Andrej, Justin Johnson, and Li Fei-Fei. "Visualizing and understanding recurrent networks." arXiv preprint arXiv:1506.02078 (2015).](https://arxiv.org/abs/1506.02078)

Related blogpost by Andrej Karpathy: [The Unreasonable Effectiveness of Recurrent Neural Networks](http://karpathy.github.io/2015/05/21/rnn-effectiveness/)

Original code by Andrej Karpathy: [gist](https://gist.github.com/karpathy/d4dee566867f8291f086)

In [None]:
from __future__ import print_function

import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim

from torch.autograd import Variable

import numpy as np

from tqdm import tqdm

import sys

## Load the data

A Shakespeare sample can be downloaded from [here](https://github.com/karpathy/char-rnn/raw/master/data/tinyshakespeare/input.txt). Save it as `data/tinyshakespeare.txt`, which is the path the next cell reads.

In [None]:
# read the whole corpus into memory as one string
with open("data/tinyshakespeare.txt", mode="r") as corpus_file:
    data = corpus_file.read()

Show the number of characters in the text:

In [None]:
# length of the text in characters; the training cell derives the number
# of batches from it
data_size = len(data)
print("Number of symbols in text:", data_size)

Build an alphabet from the text:

In [None]:
# the alphabet is the set of distinct characters that occur in the text;
# its size is the width of the one-hot vectors and of the model's output
alphabet = set(data)
alphabet_size = len(alphabet)
print("Alphabet size:", alphabet_size)

Assign a number to every symbol in the alphabet:

In [None]:
# sort the alphabet so that the symbol <-> id mapping is the same on every run
ordered_symbols = sorted(alphabet)

# forward (symbol -> id) and inverse (id -> symbol) lookup tables
symbol_to_id = {symbol: index for index, symbol in enumerate(ordered_symbols)}
id_to_symbol = dict(enumerate(ordered_symbols))

Transform a symbol into a one-hot-encoded vector:

In [None]:
def one_hot_encoding(symbol):
    """Return the one-hot vector that represents `symbol`.

    The result is a one-dimensional float tensor of length `alphabet_size`
    that is zero everywhere except for a 1 at the position given by
    `symbol_to_id[symbol]`.

    Raises:
        KeyError: if `symbol` is not a key of `symbol_to_id`.
    """
    encoded = torch.zeros(alphabet_size)
    encoded[symbol_to_id[symbol]] = 1
    return encoded

Transform a sequence of symbols into a one-dimensional tensor of symbol IDs:

In [None]:
def labels_tensor(symbols):
    """Return the ids of `symbols` as a one-dimensional int64 tensor.

    Args:
        symbols: an iterable of symbols (for example a slice of the text);
            every element must be a key of `symbol_to_id`.

    Returns:
        A `torch.long` tensor with one id per symbol, in the same order.

    Raises:
        KeyError: if a symbol is not a key of `symbol_to_id`.
    """
    ids = [symbol_to_id[symbol] for symbol in symbols]
    # build the integer tensor directly instead of creating a float tensor
    # and casting it afterwards
    return torch.tensor(ids, dtype=torch.long)

## Model

In [None]:
hidden_size = 100


class MinCharRNN(nn.Module):
    """Single-layer vanilla recurrent cell over one-hot encoded symbols.

    One call to the module processes exactly one symbol: it combines the
    symbol with the previous hidden state into a new hidden state and maps
    that new state to one score (logit) per symbol of the alphabet.
    """

    def __init__(self):
        super(MinCharRNN, self).__init__()

        # the attribute names double as state_dict keys of saved models
        self.input_to_hidden = nn.Linear(alphabet_size, hidden_size)
        self.hidden_to_hidden = nn.Linear(hidden_size, hidden_size)
        self.hidden_to_output = nn.Linear(hidden_size, alphabet_size)

    def forward(self, input_symbol, hidden_state):
        """Advance the network by one symbol.

        Args:
            input_symbol: tensor whose last dimension is `alphabet_size`
                (the callers in this file pass a one-hot vector).
            hidden_state: tensor whose last dimension is `hidden_size`
                (the callers in this file pass shape `(1, hidden_size)`).

        Returns:
            A tuple `(output, hidden_state)`: the unnormalized scores for
            the next symbol and the updated hidden state.
        """
        from_input = self.input_to_hidden(input_symbol)
        from_previous_state = self.hidden_to_hidden(hidden_state)
        next_hidden_state = torch.tanh(from_input + from_previous_state)
        return self.hidden_to_output(next_hidden_state), next_hidden_state

## Training

Function to initialize the weights of every linear module (layer) of our model:

In [None]:
def initialize_weights(m):
    """Re-draw the weights of a linear layer from U(-0.01, 0.01), in place.

    Meant to be passed to `nn.Module.apply`, which calls it once per
    submodule. Modules that are not `nn.Linear` are left untouched, and so
    is the bias of a linear layer.
    """
    if not isinstance(m, nn.Linear):
        return
    nn.init.uniform_(m.weight, a=-0.01, b=0.01)

Initialize the model, the loss function and the optimization algorithm:

In [None]:
learning_rate = 1e-1

# training objective: cross entropy between the predicted scores and the
# id of the symbol that actually comes next
loss_function = nn.CrossEntropyLoss()

# the network and the optimizer that updates its parameters
model = MinCharRNN()
optimizer = optim.Adagrad(params=model.parameters(), lr=learning_rate)

# re-draw the weights of the linear layers; this happens in place, so the
# optimizer created above keeps pointing at the same parameters
model.apply(initialize_weights)

Uncomment to load a previously saved model:

In [None]:
# model.load_state_dict(torch.load("models/min-char-rnn.torch"))

Function to print a sample text with a fixed number of characters:

In [None]:
sample_size = 200
first_symbol = "\n"
symbol_ids = list(range(alphabet_size))


def print_sample():
    """Generate `sample_size` symbols with the model and print them.

    Generation starts from `first_symbol` and an all-zero hidden state. At
    every step the next symbol is drawn at random from the softmax
    distribution over the model's scores and is fed back as the next input.
    `first_symbol` itself is not part of the printed text.
    """
    symbols = []

    # sampling needs no gradient bookkeeping
    with torch.no_grad():
        input_symbol = one_hot_encoding(first_symbol)
        hidden_state = torch.zeros((1, hidden_size))

        for _ in range(sample_size):
            logits, hidden_state = model(input_symbol, hidden_state)

            # the scores carry the leading dimension of the hidden state;
            # drop it to get the flat probability vector numpy expects
            probabilities = F.softmax(logits, dim=1).squeeze(0).numpy()

            symbol_id = np.random.choice(symbol_ids, p=probabilities)
            symbol = id_to_symbol[symbol_id]
            symbols.append(symbol)

            # the sampled symbol is the input of the next step
            input_symbol = one_hot_encoding(symbol)

    print("".join(symbols))

Initial sample without training:

In [None]:
# sample from the model in its current state, before the training loop below runs
print_sample()

In [None]:
epochs = 10
sequence_size = 25
# consecutive batches start `sequence_size` symbols apart and every batch
# needs one extra symbol as the label of its last input, so this is the
# number of full batches that fit into the text
batches = max((data_size - 1) // sequence_size, 0)
gradient_clipping = 5

initial_state = torch.zeros((1, hidden_size))

for epoch_id in range(epochs):
    # reset the state before every epoch
    last_hidden_state = initial_state

    epoch_accumulated_loss = 0.0

    # train
    model.train()

    with tqdm(total=batches) as progress_bar:
        for batch_id in range(batches):
            batch_start = batch_id * sequence_size

            # reuse the hidden state from last batch
            hidden_state = last_hidden_state

            # clear the gradient information from the past batch
            optimizer.zero_grad()

            # for every symbol in the batch
            # try to predict the next symbol
            # and measure the loss
            predictions = []
            for sequence_id in range(sequence_size):
                input_symbol = one_hot_encoding(data[batch_start + sequence_id])

                prediction, hidden_state = model(input_symbol, hidden_state)

                predictions.append(prediction)

            # create all labels: the text shifted by one symbol
            labels = labels_tensor(data[batch_start + 1:batch_start + sequence_size + 1])

            # create all predictions: one row of scores per input symbol
            all_predictions = torch.cat(predictions)

            # backpropagate through time
            batch_loss = loss_function(all_predictions, labels)
            batch_loss.backward()

            # gradient clipping to avoid exploding gradients: every gradient
            # element is clamped to [-gradient_clipping, gradient_clipping]
            nn.utils.clip_grad_value_(model.parameters(), gradient_clipping)

            # update parameters
            optimizer.step()

            # batch logging
            loss = batch_loss.item()
            epoch_accumulated_loss += loss

            progress_bar.set_postfix(loss="{:.03f}".format(loss))
            progress_bar.update()

            # detach the hidden state from the graph
            # to avoid backpropagating the next batch to this one
            last_hidden_state = hidden_state.detach()

    # epoch logging (the max() guards against a text too short for one batch)
    mean_loss = epoch_accumulated_loss / float(max(batches, 1))
    print("Epoch {:d}/{:d} Mean Loss: {:.03f} Sample:".format(epoch_id + 1, epochs, mean_loss))
    print()
    model.eval()
    print_sample()
    sys.stdout.flush()

Uncomment to save the model:

In [None]:
# torch.save(model.state_dict(), "models/min-char-rnn.torch")