# Predict characters using 1 cell RNN using Chainer

### Basics

In [1]:
import numpy as np
import sys
import glob
import shutil

### Create the dataset

In [2]:
def create_dataset(ds_path):
"""
Creates dataset from a corpus of texts.
Args :
    ds_path : File containing text or a .txt file.
Returns:
    char_idx : Character-to-Index mapping of the 
        unique characters in the ds_path.
    idx_char : Character-to-Index mapping of the 
        unique characters in the ds_path. 
    idx : Indexes of the sequence of characters in
        the ds_path
    data : Sequence of character in the ds_path
"""
    data = open(ds_path, 'r').read()
    symbols = list(set(data))
    char_idx = {char:idx for idx, char in enumerate(symbols)}
    idx_char = {idx:char for idx, char in enumerate(symbols)}
    idx = [char_idx[char] for idx, char in enumerate(data)]
    idx = np.asarray(idx, dtype=np.int32)
    return char_idx, idx_char, idx, data

### RNN Model

In [3]:
import chainer
import chainer
from chainer import cuda, Function, gradient_check, report, training, utils, Variable
from chainer import datasets, iterators, optimizers, serializers
from chainer import Link, Chain, ChainList
import chainer.functions as F
import chainer.links as L
from chainer.training import extensions

In [4]:
class RNN(chainer.Chain):
"""
    Chainer implementation of a single cell RNN.
    Note : 
        L.EmbedID handles word vectorization
        required by typical NLP tasks. Takes 
        a character ID [1, 1] vector as input
        and returns a vector with dimensions
        [1 , #of items in alphabet of the dataset]
        as output.
"""
    def __init__(self, n_vocab, n_units):
        super(RNN, self).__init__(
            embedID = L.EmbedID(n_vocab, n_units), # This layer generated the typical word vector
            l1 = L.Linear(n_units, n_units), # The input layer
            h1 = L.Linear(n_units, n_units), # hidden Layer
            l2 = L.Linear(n_units, n_vocab) # output layer
        )
        self.r_h1 = None # hidden state for the first hidden layer
    
    
    def reset_state(self):
        self.r_h1 = None
    
    def __call__(self, x):
    """
        This is called each time an input is
        fed forward into the model.
    """
        h = self.embedID(x) # get the word vector.
        if self.r_h1 is None: # if it is the first time step.
            self.r_h1 = F.tanh(self.l1(h)) # Simply feed forward and initialize the recurrent hidden state.
        else:
            self.r_h1 = F.tanh(self.l1(h) + self.h1(self.r_h1)) # recurrent state = f(Wht-1, Wht). general update rule for RNNs.
        y = self.l2(self.r_h1) # get the output.
        return y
            

### Setup the dataset iterator

In [5]:
class ParallelSequentialIterator(chainer.dataset.Iterator):
 
    def __init__(self, dataset, batch_size, repeat=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.epoch = 0
        self.is_new_epoch = False
        self.repeat = repeat
        length = len(dataset)
        self.offsets = [i * length // batch_size for i in range(batch_size)]
        self.iteration = 0
 
    def __next__(self):
        length = len(self.dataset)
        if not self.repeat and self.iteration * self.batch_size >= length:
            raise StopIteration
        cur_words = self.get_words()
        self.iteration += 1
        next_words = self.get_words()
 
        epoch = self.iteration * self.batch_size // length
        self.is_new_epoch = self.epoch < epoch
        if self.is_new_epoch:
            self.epoch = epoch
         
        return list(zip(cur_words, next_words))
 
    @property
    def epoch_detail(self):
        return self.iteration * self.batch_size / len(self.dataset)
 
    def get_words(self):
        return [self.dataset[(offset + self.iteration) % len(self.dataset)]
                for offset in self.offsets]
 
    def serialize(self, serializer):
        self.iteration = serializer('iteration', self.iteration)
        self.epoch = serializer('epoch', self.epoch)

### Setup the updater

In [6]:
class BPTTUpdater(chainer.training.StandardUpdater):
 
    def __init__(self, train_iter, optimizer, bprop_len, device):
        super(BPTTUpdater, self).__init__(
            train_iter, optimizer, device=device)
        self.bprop_len = bprop_len
 
    def update_core(self):
        loss = 0
        train_iter = self.get_iterator('main')
        optimizer = self.get_optimizer('main')
 
        for i in range(self.bprop_len):
            batch = train_iter.__next__()
            x, t = self.converter(batch, self.device)
            loss += optimizer.target(chainer.Variable(x), chainer.Variable(t))
        optimizer.target.cleargrads()
        loss.backward()
        loss.unchain_backward()
        optimizer.update()

### Setup the training script

In [7]:
n_epochs = 10  # epochs
n_units = 100   # neurons in the hidden cell
batch_size = 100
train_test_split = 100000

In [8]:
char_idx, idx_char, idx, train_data = create_dataset('dataset/alllines.txt')
model = RNN(len(char_idx), n_units)
model = L.Classifier(model)

eval_classifier_model = model.copy()
eval_model = model.predictor

optimizer = optimizers.Adam(alpha = 0.0005)
optimizer.setup(model)

train = idx[0:len(idx) - train_test_split] #len(idx) = 4583798
test = idx[len(idx) - train_test_split : -1]

train_iter = ParallelSequentialIterator(train, batch_size)
test_iter = ParallelSequentialIterator(test, batch_size, repeat=False)

updater = BPTTUpdater(train_iter, optimizer, 10, -1)

trainer = chainer.training.Trainer(updater, (n_epochs, 'epoch'), out='results')

trainer.extend(extensions.Evaluator(test_iter, eval_classifier_model, device = -1,eval_hook=lambda _: eval_model.reset_state()))

trainer.extend(extensions.dump_graph('main/loss'))

interval = (1, 'epoch')

trainer.extend(extensions.snapshot_object(model, 'epoch-{.updater.epoch}.model'), trigger=interval)
trainer.extend(extensions.snapshot(), trigger=interval)
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'validation/main/loss', 'main/accuracy', 'validation/main/accuracy', 'elapsed_time']))
trainer.extend(extensions.PlotReport(['main/loss', 'validation/main/loss'], x_key='epoch', file_name='loss.png'))
trainer.extend(extensions.PlotReport(['main/accuracy', 'validation/main/accuracy'], x_key='epoch', file_name='accuracy.png'))

trainer.run()

epoch       main/loss   validation/main/loss  main/accuracy  validation/main/accuracy  elapsed_time
[J1           1.90275     1.72204               0.451338       0.48914                   239.171       
[J2           1.6945      1.65879               0.496474       0.50504                   477.275       
[J3           1.6494      1.63114               0.506748       0.51236                   708.12        
[J4           1.61887     1.61378               0.515134       0.51772                   945.897       
[J5           1.60761     1.60275               0.51791        0.52096                   1201.65       
[J6           1.58992     1.5941                0.522435       0.5246                    1386.39       
[J7           1.58275     1.58818               0.525981       0.5251                    1525.2        
[J8           1.57637     1.5841                0.526068       0.5268                    1651.83       
[J9           1.56942     1.58028               0.529235   

### Prediction

In [171]:
model = RNN(len(char_idx), n_units)

model = L.Classifier(model)
serializers.load_npz('results/epoch-3.model', model)

data = np.asarray([2], dtype=np.int32)

for i in range(0, 100):
    
    pred_model = model.predictor(Variable(data))
    predicted_char = idx_char[np.argmax(pred_model.data)]
    print(predicted_char, end='')
    data = np.asarray([char_idx[predicted_char]],dtype=np.int32)

here are you."
"Where are you."
"Where are you."
"Where are you."
"Where are you."
"Where are you."
