# Recurrent Neural Networks with ``BigDL Keras``


With BigDL, we can now train recurrent neural networks (RNNs), such as the long short-term memory (LSTM) and the gated recurrent unit (GRU), more concisely. To demonstrate the end-to-end RNN training and prediction pipeline, we take a classic problem in language modeling as a case study: predicting the distribution of the next word given a sequence of previous words.

## Import packages

To begin with, we make the following imports.

In [1]:
from __future__ import print_function
import math
import os
import time
import numpy as np

## Define classes for indexing words of the input document

In a language modeling problem, we define the following classes to handle the routine steps of loading document data. The ``Dictionary`` class performs word indexing: it converts each word in the documents from its string form to an integer index.

In this example, we use consecutive integers to index words of the input document.

In [2]:
class Dictionary(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = []

    def add_word(self, word):
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)
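
As a quick illustration of the indexing behaviour, the sketch below runs the same class on a toy sentence. The class body is repeated only so that the snippet runs on its own; the sentence and the variable names `d` and `ids` are illustrative assumptions.

```python
class Dictionary(object):
    """Same word-indexing class as above, repeated so the snippet is self-contained."""
    def __init__(self):
        self.word2idx = {}
        self.idx2word = []

    def add_word(self, word):
        # A new word receives the next consecutive integer; a known word keeps its index
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)

d = Dictionary()
ids = [d.add_word(w) for w in "the cat saw the dog".split()]
print(ids)            # [0, 1, 2, 0, 3]
print(len(d))         # 4
print(d.idx2word[2])  # saw
```

The repeated word "the" maps to the same index both times, so the vocabulary size is 4 although the sentence has 5 tokens.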

The ``Dictionary`` class is used by the ``Corpus`` class to index the words of the input document.

In [3]:
class Corpus(object):
    def __init__(self, path):
        self.dictionary = Dictionary()
        self.train = self.tokenize(path + 'train.txt')
        self.valid = self.tokenize(path + 'valid.txt')
        self.test = self.tokenize(path + 'test.txt')

    def tokenize(self, path):
        """Tokenizes a text file."""
        assert os.path.exists(path)
        # Add words to the dictionary
        with open(path, 'r') as f:
            tokens = 0
            for line in f:
                words = line.split() + ['<eos>']
                tokens += len(words)
                for word in words:
                    self.dictionary.add_word(word)

        # Tokenize file content
        with open(path, 'r') as f:
            ids = np.zeros((tokens,), dtype='int32')
            token = 0
            for line in f:
                words = line.split() + ['<eos>']
                for word in words:
                    ids[token] = self.dictionary.word2idx[word]
                    token += 1

        return ids
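
The two-pass logic of ``tokenize`` can be tried without any files. The following is a minimal sketch that mirrors it on an in-memory list of lines; `tokenize_lines` and `vocab` are hypothetical names introduced for illustration, and a plain dict stands in for the ``Dictionary`` class.

```python
import numpy as np

def tokenize_lines(lines, word2idx):
    """Two-pass tokenization over an in-memory list of lines (illustrative helper)."""
    # Pass 1: register every word and count tokens, appending '<eos>' to each line
    tokens = 0
    for line in lines:
        words = line.split() + ['<eos>']
        tokens += len(words)
        for word in words:
            word2idx.setdefault(word, len(word2idx))

    # Pass 2: fill a preallocated integer array with the word indices
    ids = np.zeros((tokens,), dtype='int32')
    pos = 0
    for line in lines:
        for word in line.split() + ['<eos>']:
            ids[pos] = word2idx[word]
            pos += 1
    return ids

vocab = {}
ids = tokenize_lines(["a b a", "b c"], vocab)
print(ids)    # [0 1 0 2 1 3 2]
print(vocab)  # {'a': 0, 'b': 1, '<eos>': 2, 'c': 3}
```

Counting the tokens in a first pass lets the second pass write into a fixed-size array instead of growing a Python list.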

## Load data as batches

We load the document data by leveraging the aforementioned ``Corpus`` class. 

To speed up the subsequent data flow in the RNN model, we pre-process the loaded data into batches. This procedure is defined in the following ``batchify`` function.

Note: The dataset used below can be downloaded from http://goo.gl/vT4cEw. Extract the zip file under the "/tmp" directory so that the code below runs successfully.

In [4]:
data_path = "/tmp/nlp/ptb."

corpus = Corpus(data_path)

def batchify(data, batch_size):
    """Trim data to a multiple of batch_size and reshape it into (nbatch, 1, batch_size)."""
    nbatch = data.shape[0] // batch_size
    data = data[:nbatch * batch_size]
    data = data.reshape((batch_size, 1, nbatch)).T
    return data

batch_size = 32
train_data = batchify(corpus.train, batch_size)
val_data = batchify(corpus.valid, batch_size)
test_data = batchify(corpus.test, batch_size)
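
To see what the reshape and transpose in ``batchify`` produce, the sketch below applies the same function to a toy array of seven consecutive integers. The array `toy` and the batch size of 2 are illustrative assumptions chosen to keep the output small.

```python
import numpy as np

def batchify(data, batch_size):
    """Same logic as above: trim, reshape, then reverse the axes with .T."""
    nbatch = data.shape[0] // batch_size
    data = data[:nbatch * batch_size]          # drop the remainder that does not fill a batch
    return data.reshape((batch_size, 1, nbatch)).T

toy = np.arange(7)
batches = batchify(toy, 2)
print(batches.shape)     # (3, 1, 2)
print(batches[:, 0, :])  # rows: [0 3], [1 4], [2 5]
```

The last element (6) is discarded because it does not fill a complete batch, and each column of the result walks through one contiguous chunk of the original sequence.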

## Define different RNN models

We can make different RNN models available through the following single ``RNNModel`` class.

Users can select their preferred RNN model, or compare different RNN models, by setting the ``mode`` argument of the ``RNNModel`` constructor. An example follows the definition of the ``RNNModel`` class.

In [5]:
from bigdl.nn.keras.layer import *
from bigdl.nn.keras.topology import Sequential

class RNNModel(object):
    """A model with a recurrent layer followed by a dense decoder."""

    # The default input shape uses the global batch_size (32) defined above.
    # Note: the dropout argument is accepted but not used in this example.
    def __init__(self, mode, vocab_size, num_hidden,
                 arg_input_shape=(1, batch_size), dropout=0.5):
        self.model = Sequential()
        # Select the recurrent layer according to the requested mode
        if mode == 'rnn_relu':
            self.model.add(SimpleRNN(num_hidden, activation="relu",
                                     input_shape=arg_input_shape))
        elif mode == 'rnn_tanh':
            self.model.add(SimpleRNN(num_hidden, input_shape=arg_input_shape))
        elif mode == 'lstm':
            self.model.add(LSTM(num_hidden, input_shape=arg_input_shape))
        elif mode == 'gru':
            self.model.add(GRU(num_hidden, input_shape=arg_input_shape))
        else:
            raise ValueError("Invalid mode %s. Options are rnn_relu, "
                             "rnn_tanh, lstm, and gru" % mode)

        # Decoder: maps the hidden state to one score per vocabulary word
        self.decoder = Dense(vocab_size, activation="tanh")
        self.model.add(self.decoder)
        self.num_hidden = num_hidden

## Build the model

We go on to build the model, initialize model parameters, and configure the optimization algorithms for training the RNN model.

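For demonstration, LSTM is the chosen RNN model type. For other RNN options, one can replace the 'lstm' string with 'rnn_relu', 'rnn_tanh', or 'gru'. Note that the variable ``LSTM`` assigned below shadows the imported ``LSTM`` layer class, so the layers need to be re-imported before another 'lstm' model is constructed in the same session.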

In [6]:
ntokens = len(corpus.dictionary)
model_type = 'lstm'
num_hid = 100
LSTM = RNNModel(model_type, ntokens, num_hid)

creating: createKerasSequential
creating: createKerasLSTM
creating: createKerasDense


In [7]:
print(LSTM.model.get_input_shape())
print(LSTM.model.get_output_shape())

(None, 1, 32)
(None, 10000)


## Train the model and evaluate on validation and testing data sets

Before training, we compile the model with an optimizer, a loss function, and an evaluation metric. Here we use stochastic gradient descent (SGD), the cross-entropy criterion, and top-1 accuracy.

In [8]:
from bigdl.nn.criterion import *

LSTM.model.compile(optimizer='sgd', loss=CrossEntropyCriterion(), metrics=['accuracy'])

creating: createCrossEntropyCriterion
creating: createDefault
creating: createSGD
creating: createTop1Accuracy


## Execute training

In [10]:
LSTM.model.fit(train_data, corpus.train[:len(train_data)], batch_size=8, nb_epoch=100,
validation_data=(val_data[:500], corpus.valid[:500]))

Py4JJavaError: An error occurred while calling o37.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46012.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46012.0 (TID 23016, localhost, executor driver): java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: Boxed Error
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$8.apply(DistriOptimizer.scala:262)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$8.apply(DistriOptimizer.scala:262)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5.apply(DistriOptimizer.scala:262)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5.apply(DistriOptimizer.scala:202)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
	at scala.concurrent.impl.Promise$.resolver(Promise.scala:55)
	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:47)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:244)
	at scala.concurrent.Promise$class.complete(Promise.scala:55)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
	at com.intel.analytics.bigdl.utils.ThreadPool$$anon$1.execute(ThreadPool.scala:203)
	at scala.concurrent.impl.Future$.apply(Future.scala:31)
	at scala.concurrent.Future$.apply(Future.scala:494)
	at com.intel.analytics.bigdl.utils.ThreadPool.invoke(ThreadPool.scala:168)
	at com.intel.analytics.bigdl.nn.ClassNLLCriterion.updateOutput(ClassNLLCriterion.scala:130)
	at com.intel.analytics.bigdl.nn.CrossEntropyCriterion.updateOutput(CrossEntropyCriterion.scala:40)
	at com.intel.analytics.bigdl.nn.CrossEntropyCriterion.updateOutput(CrossEntropyCriterion.scala:31)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractCriterion.forward(AbstractCriterion.scala:73)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$6$$anonfun$apply$2.apply$mcI$sp(DistriOptimizer.scala:251)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$6$$anonfun$apply$2.apply(DistriOptimizer.scala:243)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$$anonfun$5$$anonfun$6$$anonfun$apply$2.apply(DistriOptimizer.scala:243)
	at com.intel.analytics.bigdl.utils.ThreadPool$$anonfun$1$$anon$4.call(ThreadPool.scala:112)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: java.lang.AssertionError: assertion failed: curTarget 0 is out of range 1 to 10000
	at scala.Predef$.assert(Predef.scala:170)
	at com.intel.analytics.bigdl.nn.ClassNLLCriterion$$anonfun$updateOutput$5.apply(ClassNLLCriterion.scala:132)
	at com.intel.analytics.bigdl.nn.ClassNLLCriterion$$anonfun$updateOutput$5.apply(ClassNLLCriterion.scala:130)
	at com.intel.analytics.bigdl.utils.ThreadPool$$anonfun$invoke$2.apply(ThreadPool.scala:162)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at com.intel.analytics.bigdl.optim.DistriOptimizer$.optimize(DistriOptimizer.scala:312)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.optimize(DistriOptimizer.scala:914)
	at com.intel.analytics.bigdl.nn.keras.KerasModel.fit(Topology.scala:106)
	at com.intel.analytics.bigdl.nn.keras.KerasModel.fit(Topology.scala:119)
	at com.intel.analytics.bigdl.python.api.PythonBigDLKeras.fit(PythonBigDLKeras.scala:697)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
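
The root cause reported in the trace is the assertion `curTarget 0 is out of range 1 to 10000`: the criterion rejects a target of 0 and expects class indices between 1 and 10000, whereas ``Dictionary`` assigns word indices starting from 0. One plausible remedy, sketched here with NumPy only and not verified against a BigDL run, is to shift the labels by one before passing them to ``fit``. The `to_one_based` helper and the toy `word_ids` array are illustrative assumptions, not part of the original notebook.

```python
import numpy as np

def to_one_based(labels):
    """Shift 0-based word indices into the 1-based range named in the assertion."""
    labels = np.asarray(labels)
    return labels + 1

# Toy 0-based indices covering both ends of a 10000-word vocabulary (assumed values)
word_ids = np.array([0, 3, 9999], dtype='int32')
shifted = to_one_based(word_ids)
print(shifted.min(), shifted.max())
```

In the notebook, this would correspond to passing `corpus.train[:len(train_data)] + 1` as the training targets, and shifting the validation targets in the same way.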


Recall that RNN model training is based on maximizing the likelihood of the observations. For evaluation, we use the following two measures:

* Loss: the loss function is defined as the average negative log likelihood of the target words (ground truth) under the model's predictions: $$\text{loss} = -\frac{1}{N} \sum_{i = 1}^N \log p_{\text{target}_i},$$ where $N$ is the number of predictions and $p_{\text{target}_i}$ is the predicted probability of the $i$-th target word.

* Perplexity: the average per-word perplexity is $\exp(\text{loss})$.
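
The two measures can be computed directly from the probabilities that a model assigns to the target words. The following is a minimal sketch using only the standard library; the function names `average_nll` and `perplexity` and the toy list `probs` are illustrative assumptions, not part of the BigDL API.

```python
import math

def average_nll(target_probs):
    """Average negative log likelihood of the target words (natural logarithm)."""
    return -sum(math.log(p) for p in target_probs) / len(target_probs)

def perplexity(target_probs):
    """Per-word perplexity, defined as exp(loss)."""
    return math.exp(average_nll(target_probs))

# Toy probabilities assigned by a hypothetical model to four target words
probs = [0.5, 0.25, 0.125, 0.125]
print(average_nll(probs))
print(perplexity(probs))
```

Because the loss averages log probabilities, a single poorly predicted word raises the perplexity more than a uniformly mediocre prediction of the same average probability would.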

To orient the reader using concrete examples, let us illustrate the idea of the perplexity measure as follows.

* Consider the perfect scenario where the model always assigns probability 1 to the target word. In this case, for every $i$ we have $p_{\text{target}_i} = 1$, so the loss is 0 and the perplexity of the perfect model is 1.

* Consider a baseline scenario where the model always assigns a uniform probability to every word in the given word set $W$. In this case, for every $i$ we have $p_{\text{target}_i} = 1 / |W|$. As a result, the perplexity of a uniformly random prediction model is always $|W|$.

* Consider the worst-case scenario where the model always assigns probability 0 to the target word. In this case, for every $i$ we have $p_{\text{target}_i} = 0$, so the loss diverges and the perplexity of the worst model is positive infinity.
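
To make the baseline case explicit, substituting $p_{\text{target}_i} = 1 / |W|$ into the loss defined above gives

$$\text{loss} = -\frac{1}{N} \sum_{i = 1}^N \log \frac{1}{|W|} = \log |W|, \qquad \text{perplexity} = \exp(\log |W|) = |W|.$$

For the vocabulary of 10000 words used in this example, a uniform guess therefore has a perplexity of 10000.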


Therefore, the closer a model's perplexity is to 1, the more effective the model generally is. Any useful model has to achieve a perplexity lower than $|W|$, the cardinality of the target word set.
 