# Recurrent Neural Networks Basics #

In this tutorial, you will explore the basic concepts behind Recurrent Neural Networks and reproduce two of Jürgen Schmidhuber's classic experiments.

In [None]:
import os
import numpy as np
import tensorflow as tf

# Hide all CUDA devices from this process so that no GPU is used;
# remove or edit this line if you want to run on a GPU instead.
os.environ["CUDA_VISIBLE_DEVICES"]=""

## Experiment 1: Noise-Free and Noisy Sequences ##

In [None]:
def gen_sequence(time_steps=10, prob=0.5):
    """Generate one random one-hot sequence for experiment 1.

    The result is a square array with ``time_steps + 1`` rows (steps) and
    ``time_steps + 1`` columns (symbols). Rows ``1 .. time_steps - 1`` hold
    the fixed symbols ``0 .. time_steps - 2``. The first and the last row
    both hold the same marker symbol:

    * seq A - (y, a(1), a(2), ..., a(p-1), y): column ``time_steps - 1``,
      chosen when a uniform draw from [0, 1) is below ``prob``;
    * seq B - (x, a(1), a(2), ..., a(p-1), x): column ``time_steps``,
      chosen otherwise.

    Args:
        time_steps: controls the sequence length (``time_steps + 1`` rows).
        prob: probability of generating seq A.

    Returns:
        A float NumPy array of shape ``(time_steps + 1, time_steps + 1)``.
    """
    size = time_steps + 1
    sequence = np.zeros((size, size))

    # middle part of the sequence: row k + 1 is the one-hot code of symbol k
    middle = np.arange(time_steps - 1)
    sequence[middle + 1, middle] = 1

    # a single random draw decides which marker opens and closes the sequence
    if np.random.uniform(0, 1) < prob:
        marker = time_steps - 1
    else:
        marker = time_steps

    sequence[[0, size - 1], marker] = 1

    return sequence

In [None]:
def simple_RNN(sequence, hidden_units, uniform_maxval, use_biases):
    """Build a single-layer tanh RNN unrolled over every row of `sequence`.

    Creates the variables "W" (input-to-hidden), "U" (hidden-to-hidden) and,
    if requested, "b" in the current variable scope.

    Args:
        sequence: 2-D tensor with a statically known shape; row `t` is the
            input of step `t` and is fed as a [1, num_symbols] slice.
        hidden_units: size of the hidden state.
        uniform_maxval: W and U are initialized uniformly in
            [-uniform_maxval, uniform_maxval].
        use_biases: if true, add a zero-initialized trainable bias "b";
            otherwise the constant 0 is added instead.

    Returns:
        The hidden states of all steps concatenated along axis 0
        (one [1, hidden_units] row per step).
    """

    num_steps = sequence.shape[0].value
    num_symbols = sequence.shape[1].value

    # initialize variables
    W = tf.get_variable("W", shape=[num_symbols, hidden_units], initializer=
                        tf.random_uniform_initializer(- uniform_maxval, uniform_maxval))
    U = tf.get_variable("U", shape=[hidden_units, hidden_units], initializer=
                        tf.random_uniform_initializer(- uniform_maxval, uniform_maxval))

    if use_biases:
        b = tf.get_variable("b", shape=[hidden_units], initializer=tf.constant_initializer(0.))
    else:
        b = 0

    # define function that will be performed each step
    def step(input_tensor, hidden_state):
        return tf.tanh(tf.matmul(input_tensor, W) + tf.matmul(hidden_state, U) + b)

    # unroll the RNN for N time steps, starting from an all-zero hidden state
    hidden_state = tf.zeros([1, hidden_units], dtype=tf.float32)
    outputs = []

    for step_idx in range(num_steps):
        # slice (not index) so the step input keeps its leading dimension of 1
        hidden_state = step(sequence[step_idx : step_idx + 1], hidden_state)
        outputs.append(hidden_state)

    return tf.concat(outputs, 0)

In [None]:
def LSTM(sequence, hidden_units, uniform_maxval, use_biases, output_function, state_function):
    """Build an LSTM with input and output gates, unrolled over `sequence`.

    Creates the variables "U" (input weights) and "W" (recurrent weights),
    each stacking three matrices (index 0: input gate, 1: output gate,
    2: cell input), plus the optional biases "bi", "bo" and "bc".

    Args:
        sequence: 2-D tensor with a statically known shape; row `t` is the
            input of step `t`.
        hidden_units: size of the output state and of the cell state.
        uniform_maxval: U and W are initialized uniformly in
            [-uniform_maxval, uniform_maxval].
        use_biases: if true, create zero-initialized biases; otherwise the
            constant 0 is added instead.
        output_function: callable applied to the cell state before it is
            multiplied by the output gate.
        state_function: callable applied to the cell input before it is
            multiplied by the input gate.

    Returns:
        The output states of all steps concatenated along axis 0.
    """
    static_shape = sequence.shape
    num_steps = static_shape[0].value
    num_symbols = static_shape[1].value

    def uniform_init():
        return tf.random_uniform_initializer(- uniform_maxval, uniform_maxval)

    # parameters
    U = tf.get_variable("U", shape=[3, num_symbols, hidden_units], initializer=uniform_init())
    W = tf.get_variable("W", shape=[3, hidden_units, hidden_units], initializer=uniform_init())

    if use_biases:
        bi, bo, bc = [
            tf.get_variable(bias_name, shape=[hidden_units], initializer=tf.constant_initializer(0.))
            for bias_name in ("bi", "bo", "bc")
        ]
    else:
        bi = bo = bc = 0

    def pre_activation(input_tensor, state, idx, bias):
        # affine map shared by both gates and the cell input
        return tf.matmul(input_tensor, U[idx]) + tf.matmul(state, W[idx]) + bias

    def step(input_tensor, hidden_state):
        # hidden_state packs [previous output state, previous cell state]
        state, cell = tf.unstack(hidden_state)

        # gates
        input_gate = tf.sigmoid(pre_activation(input_tensor, state, 0, bi))
        output_gate = tf.sigmoid(pre_activation(input_tensor, state, 1, bo))

        # new internal cell state: the old cell plus the gated cell input
        cell_input = pre_activation(input_tensor, state, 2, bc)
        cell = cell + state_function(cell_input) * input_gate

        # output state
        return tf.stack([output_function(cell) * output_gate, cell])

    # unroll LSTM for N time steps, starting from all-zero output and cell states
    packed_state = tf.zeros([2, 1, hidden_units], dtype=tf.float32)
    step_outputs = []

    for t in range(num_steps):
        packed_state = step(sequence[t:t + 1], packed_state)
        step_outputs.append(packed_state[0])

    return tf.concat(step_outputs, 0)

In [None]:
def dense(sequence, num_outputs, uniform_maxval, use_bias, name="dense"):
    """Apply a linear layer to every row of `sequence`.

    Variables "W" and (optionally) "b" are created inside the variable
    scope `name`.

    Args:
        sequence: 2-D tensor whose second dimension is statically known.
        num_outputs: number of output features.
        uniform_maxval: W is initialized uniformly in
            [-uniform_maxval, uniform_maxval].
        use_bias: if true, add a zero-initialized trainable bias; otherwise
            the constant 0 is added instead.
        name: variable scope name.

    Returns:
        `sequence` multiplied by W, plus the bias.
    """
    with tf.variable_scope(name):
        num_inputs = sequence.shape[1].value
        weight_init = tf.random_uniform_initializer(- uniform_maxval, uniform_maxval)
        weights = tf.get_variable("W", shape=[num_inputs, num_outputs], initializer=weight_init)

        bias = (
            tf.get_variable("b", shape=[num_outputs], initializer=tf.constant_initializer(0.))
            if use_bias
            else 0
        )

        projected = tf.matmul(sequence, weights)
        return projected + bias

In [None]:
def multi_layer_RNN(sequence, net_builder, hidden_units=10, layers=3):
    """Stack `layers` recurrent layers, each built by `net_builder`.

    Layer `k` (1-based) is built inside the variable scope "layer<k>" by
    calling `net_builder(sequence, hidden_units=hidden_units)` on the output
    of the previous layer. The output of every layer except the last is
    stacked and sliced with `[:, 0, :]` before being fed to the next layer.

    NOTE(review): the `[:, 0, :]` slice needs the builder's result to stack
    into a rank-3 tensor (e.g. a list of [1, hidden_units] tensors) - confirm
    against the builders that are actually passed in.

    Args:
        sequence: input to the first layer.
        net_builder: callable accepting `(sequence, hidden_units=...)`.
        hidden_units: forwarded to every call of `net_builder`.
        layers: number of layers to stack.

    Returns:
        The output of the last layer, exactly as returned by `net_builder`.
    """
    for layer_number in range(1, layers + 1):

        with tf.variable_scope("layer{}".format(layer_number)):
            sequence = net_builder(sequence, hidden_units=hidden_units)

            is_last_layer = layer_number == layers
            if not is_last_layer:
                # drop the middle axis so the next layer sees a 2-D input
                sequence = tf.stack(sequence)[:, 0, :]

    return sequence

In [None]:
def RNN_loss(sequence, logits_list):
    """Average next-step softmax cross-entropy over the sequence.

    For every step `t` except the last, the logits at step `t` are scored
    against the row of `sequence` at step `t + 1`.

    Args:
        sequence: 2-D tensor with a statically known first dimension; its
            rows are used as the labels.
        logits_list: logits, sliced per step as `logits_list[t:t + 1]`.

    Returns:
        The per-step losses summed and divided by the number of predicted
        steps (number of rows of `sequence` minus one).
    """
    num_steps = sequence.shape[0].value - 1

    def loss_at(step_idx):
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
            labels=sequence[step_idx + 1:step_idx + 2],
            logits=logits_list[step_idx:step_idx + 1])
        return tf.reduce_mean(cross_entropy)

    # accumulate from 0, one step at a time
    total = sum(loss_at(step_idx) for step_idx in range(num_steps))

    return total / num_steps
    
def RNN_abs_error(sequence, logits_list):
    """Largest absolute prediction error over all predicted steps.

    For every step `t` except the last, the softmax of `logits_list[t]` is
    compared with the row of `sequence` at step `t + 1`.

    Args:
        sequence: 2-D tensor with a statically known first dimension.
        logits_list: logits, indexed per step as `logits_list[t]`.

    Returns:
        The maximum, over steps and symbols, of
        |softmax(logits) - next row of `sequence`|.
    """
    num_steps = sequence.shape[0].value - 1

    # worst symbol-wise error of each step
    per_step_errors = [
        tf.reduce_max(tf.abs(tf.nn.softmax(logits_list[step_idx]) - sequence[step_idx + 1]))
        for step_idx in range(num_steps)
    ]

    # worst error over the whole sequence
    return tf.reduce_max(per_step_errors)
            
def RNN_train(loss, learning_rate=1, grad_max_value=5):
    """Create a gradient-descent training op with optional gradient clipping.

    Args:
        loss: scalar tensor to minimize.
        learning_rate: step size passed to `tf.train.GradientDescentOptimizer`.
        grad_max_value: if not None, every gradient is clipped element-wise
            to [-grad_max_value, grad_max_value]; pass None to disable
            clipping.

    Returns:
        The op that applies the (possibly clipped) gradients.
    """
    # create training op
    opt = tf.train.GradientDescentOptimizer(learning_rate)
    grads = opt.compute_gradients(loss)

    if grad_max_value is not None:
        # compute_gradients yields None for variables the loss does not
        # depend on; tf.clip_by_value cannot handle None, so pass those
        # pairs through untouched and let apply_gradients deal with them.
        grads = [
            (grad if grad is None
             else tf.clip_by_value(grad, -grad_max_value, grad_max_value), var)
            for grad, var in grads
        ]

    return opt.apply_gradients(grads)

In [None]:
def run_experiment_1(sequence_pl, abs_error, train_op, max_training_steps=100000, 
                     stop_threshold=0.25, threshold_steps=1000):
    """Train on freshly generated sequences until the error stays low.

    Opens a new session, initializes all global variables and then runs one
    training step per generated sequence. Training counts as successful at
    the first step that lies at least `threshold_steps` steps after the last
    step whose error exceeded `stop_threshold`.

    Args:
        sequence_pl: placeholder fed with the output of `gen_sequence`; its
            static first dimension determines the generated sequence length.
        abs_error: tensor evaluated at every step and compared with
            `stop_threshold`.
        train_op: training op run at every step.
        max_training_steps: upper bound on the number of training steps.
        stop_threshold: errors strictly above this value reset the counter.
        threshold_steps: required distance to the last step above threshold.

    Returns:
        The index of the training step at which success was detected, or
        None if it was not reached within `max_training_steps`.
    """
    with tf.Session() as session:

        session.run(tf.global_variables_initializer())

        last_bad_step = 0

        # training and evaluation
        for current_step in range(max_training_steps):
            sample = gen_sequence(time_steps=sequence_pl.shape[0].value - 1)
            error_value, _ = session.run([abs_error, train_op],
                                         feed_dict={sequence_pl: sample})

            if error_value > stop_threshold:
                last_bad_step = current_step

            # success: the error has been low for long enough
            if current_step - last_bad_step >= threshold_steps:
                return current_step

        # ran out of training steps without reaching the criterion
        return None

In [None]:
# Experiment 1 driver: for every sequence length, train a plain RNN and an
# LSTM `num_trials` times each and record the step index returned by
# run_experiment_1 (None when a trial did not succeed).
time_step_trials = [4, 5, 6, 7, 8, 9, 10, 20, 50, 100]

num_trials = 10

# results[time_steps][trial_idx] -> value returned by run_experiment_1
RNN_results = {}
LSTM_results = {}

for time_steps in time_step_trials:

    for trial_idx in range(num_trials):

        # RNN: every trial starts from a fresh graph
        tf.reset_default_graph()
        sequence_pl = tf.placeholder(dtype=tf.float32, shape=[time_steps, time_steps])

        outputs = simple_RNN(sequence_pl, hidden_units=20, uniform_maxval=0.2, use_biases=False)
        logits = dense(outputs, num_outputs=time_steps, uniform_maxval=0.2, use_bias=False)

        loss = RNN_loss(sequence_pl, logits)
        abs_error = RNN_abs_error(sequence_pl, logits)
        train_op = RNN_train(loss)

        res = run_experiment_1(sequence_pl, abs_error, train_op)
        RNN_results.setdefault(time_steps, {})[trial_idx] = res

        # LSTM: same setup, only the recurrent layer differs
        tf.reset_default_graph()
        sequence_pl = tf.placeholder(dtype=tf.float32, shape=[time_steps, time_steps])

        outputs = LSTM(sequence_pl, hidden_units=20, uniform_maxval=0.2, use_biases=False,
                       output_function=tf.tanh,
                       state_function=lambda x: tf.tanh(x) * 2)
        logits = dense(outputs, num_outputs=time_steps, uniform_maxval=0.2, use_bias=False)

        loss = RNN_loss(sequence_pl, logits)
        abs_error = RNN_abs_error(sequence_pl, logits)
        train_op = RNN_train(loss)

        res = run_experiment_1(sequence_pl, abs_error, train_op)
        LSTM_results.setdefault(time_steps, {})[trial_idx] = res

    # report everything collected so far after each sequence length
    print("RNN results")
    print(RNN_results)
    print("LSTM results")
    print(LSTM_results)