In [1]:
import tensorflow as tf
from tensorflow.contrib import rnn
import numpy as np
# to make this notebook's output stable across runs
def reset_graph(seed=42):
    """Start over with an empty default graph and re-seed the random generators.

    Keeps the notebook's output stable across runs.

    Args:
        seed: value passed to both ``np.random.seed`` and ``tf.set_random_seed``.
    """
    np.random.seed(seed)
    # Reset before seeding TensorFlow, as the original did: the seed call
    # must come after the old default graph has been discarded.
    tf.reset_default_graph()
    tf.set_random_seed(seed)

  from ._conv import register_converters as _register_converters


# Bi-directional Recurrent Neural Network 

# Example 1

In [None]:
import tensorflow as tf
import numpy as np

tf.reset_default_graph()

# Create input data:
# 2 instances with 10 time steps and 8 inputs (features) each.
X = np.random.randn(2, 10, 8)

# The second instance is only 6 steps long (variable-length sequences):
# zero out its padding and pass the true length of every instance to the RNN.
X[1, 6:] = 0
X_lengths = [10, 6]

# Use one LSTM cell object per direction. Passing the same instance as both
# cell_fw and cell_bw either fails with a variable-scope reuse error or makes
# the two directions share a single set of weights, depending on the
# TensorFlow 1.x release.
cell_fw = tf.nn.rnn_cell.LSTMCell(num_units=64, state_is_tuple=True)
cell_bw = tf.nn.rnn_cell.LSTMCell(num_units=64, state_is_tuple=True)

outputs, states = tf.nn.bidirectional_dynamic_rnn(
    cell_fw=cell_fw,
    cell_bw=cell_bw,
    dtype=tf.float64,
    sequence_length=X_lengths,
    inputs=X)

# outputs is a tuple (output_fw, output_bw) holding the outputs of the forward
# and backward LSTM layers.
# Each of these outputs has shape [batch_size, time_steps, num_units].
output_fw, output_bw = outputs

# states is a tuple (states_fw, states_bw) holding the final state of the
# forward and backward LSTM layers.
# Each state is an LSTMStateTuple (c, h); c and h have shape [batch_size, num_units].
states_fw, states_bw = states

# Evaluate everything in one run. Session.run returns the same nested
# structure (dict / tuple / LSTMStateTuple) with tensors replaced by arrays.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    result = sess.run(
        {"outputs": outputs, "output_fw": output_fw, "output_bw": output_bw,
         "states_fw": states_fw, "states_bw": states_bw})

print(result["outputs"])
print("--------------------------------------------------------------------------------")
print(result["output_fw"])
print("--------------------------------------------------------------------------------")
print(result["output_bw"])
print("--------------------------------------------------------------------------------")
print(result["output_fw"].shape)
#(2, 10, 64)
print(result["output_bw"].shape)
#(2, 10, 64)
print(result["states_fw"].h.shape)
#(2, 64)
print(result["states_bw"].h.shape)
#(2, 64)

# Example 2

In [None]:
import tensorflow as tf
import numpy as np

# Start from an empty graph, as the other examples do, so this cell can be
# re-run (or run after the previous example) without trying to create RNN
# variables that already exist in the default graph.
tf.reset_default_graph()

# Network Parameters
n_steps = 2
n_inputs = 3
num_hidden = 5

# tf Graph input: [batch_size, n_steps, n_inputs]
X = tf.placeholder(tf.float32, [None, n_steps, n_inputs])

# A separate cell for each direction.
lstm_fw_cell = tf.contrib.rnn.LSTMCell(num_hidden)
lstm_bw_cell = tf.contrib.rnn.LSTMCell(num_hidden)
outputs, states = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell, lstm_bw_cell, X, dtype=tf.float32)

X_batch = np.array([
    [[0,1,2],[9,8,7]],#instance 0
    [[3,4,5],[0,0,0]],#instance 1
    [[6,7,8],[6,5,4]],#instance 2
    [[9,0,1],[3,2,1]]])#instance 3

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    outputs_val, states_val = sess.run([outputs, states], feed_dict={X: X_batch})

# states_val: a tuple (output_state_fw, output_state_bw) containing the forward
# and the backward final states of the bidirectional rnn; each state consists
# of the c and h matrices.

# outputs_val: a tuple (output_fw, output_bw) containing the forward and the
# backward rnn output.
output_fw, output_bw = outputs_val

#output_fw is the same as outputs_val[0]
#output_bw is the same as outputs_val[1]

print(output_fw.shape)
print(output_bw.shape)
print(outputs_val[0].shape) #output_fw (4, 2, 5)
print(outputs_val[1].shape) #output_bw (4, 2, 5)

# Example 3

In [None]:
import numpy as np
# Toy input: batch size = 2, sequence length = 3, number of features = 1.
# Written as a (2, 3) table of scalars and given a trailing feature axis,
# which yields shape (2, 3, 1).
values231 = np.array([[1, 2, 3],
                      [2, 3, 4]])[..., np.newaxis]

# Alternative input, kept for reference:
# batch size = 3, sequence length = 5, number of features = 2, shape=(3, 5, 2)
#values352 = np.array([
#    [[1, 4], [2, 5], [3, 6], [4, 7], [5, 8]],
#    [[2, 5], [3, 6], [4, 7], [5, 8], [6, 9]],
#    [[3, 6], [4, 7], [5, 8], [6, 9], [7, 10]]
#])

import tensorflow as tf
tf.reset_default_graph()

tf_values231 = tf.constant(values231, dtype=tf.float32)

# The backward cell gets 105 units instead of 100 purely so that the two
# directions can be told apart in the printed shapes.
lstm_cell_fw = tf.contrib.rnn.LSTMCell(100)
lstm_cell_bw = tf.contrib.rnn.LSTMCell(105)

(output_fw, output_bw), (output_state_fw, output_state_bw) = tf.nn.bidirectional_dynamic_rnn(
    inputs=tf_values231,
    cell_fw=lstm_cell_fw,
    cell_bw=lstm_cell_bw,
    dtype=tf.float32)

# Print the symbolic tensors (not their values), in this order:
#   output_fw          tf.Tensor 'bidirectional_rnn/fw/fw/transpose:0' shape=(2, 3, 100) dtype=float32
#   output_bw          tf.Tensor 'ReverseV2:0' shape=(2, 3, 105) dtype=float32
#   output_state_fw.c  tf.Tensor 'bidirectional_rnn/fw/fw/while/Exit_2:0' shape=(2, 100) dtype=float32
#   output_state_fw.h  tf.Tensor 'bidirectional_rnn/fw/fw/while/Exit_3:0' shape=(2, 100) dtype=float32
#   output_state_bw.c  tf.Tensor 'bidirectional_rnn/bw/bw/while/Exit_2:0' shape=(2, 105) dtype=float32
#   output_state_bw.h  tf.Tensor 'bidirectional_rnn/bw/bw/while/Exit_3:0' shape=(2, 105) dtype=float32
for tensor in (output_fw, output_bw,
               output_state_fw.c, output_state_fw.h,
               output_state_bw.c, output_state_bw.h):
    print(tensor)

# Evaluate outputs and final states together in a single run.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    output_run, state_run = sess.run(
        [(output_fw, output_bw), (output_state_fw, output_state_bw)])

## static_bidirectional_rnn

In [None]:
import tensorflow as tf
from tensorflow.contrib import rnn
import numpy as np
# Silence TensorFlow's warnings while the MNIST helper is imported and the data
# is loaded, and always put the previous verbosity back afterwards, even if the
# import or the download fails.
old_v = tf.logging.get_verbosity()
tf.logging.set_verbosity(tf.logging.ERROR)
try:
    # Import MNIST data
    from tensorflow.examples.tutorials.mnist import input_data
    mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
finally:
    tf.logging.set_verbosity(old_v)

#This example uses the MNIST handwritten digits dataset, which contains 60,000 examples for training and 10,000 examples
#for testing. The digits have been size-normalized and centered in a fixed-size image (28x28 pixels) with values
#from 0 to 1. For simplicity, each image has been flattened and converted to a 1-D numpy array of 784 features (28*28).
#Because each MNIST image is 28x28 pixels, every sample is treated as one sequence of 28 timesteps (the image rows),
#with each timestep carrying 28 input features (the pixels of that row).

In [None]:
# --- Optimisation settings ---
learning_rate = 0.001
batch_size = 128
training_steps = 10000  # iterations of the outer training loop
display_step = 200      # report loss/accuracy every this many steps

# --- Model dimensions ---
timesteps = 28     # sequence length: one timestep per image row
num_input = 28     # features per timestep (img shape: 28*28)
num_hidden = 128   # units in each LSTM cell
num_classes = 10   # MNIST total classes (0-9 digits)

# --- Graph inputs ---
X = tf.placeholder(tf.float32, shape=[None, timesteps, num_input], name='input_placeholder')
Y = tf.placeholder(tf.float32, shape=[None, num_classes], name='labels_placeholder')

In [None]:
# Parameters of the output (classification) layer.
# The weight matrix has 2*num_hidden rows because the forward and backward
# cell outputs are both fed into it.
weights = dict(out=tf.Variable(tf.random_normal([2 * num_hidden, num_classes])))
biases = dict(out=tf.Variable(tf.random_normal([num_classes])))

In [None]:
# X has shape [batch_size, timesteps, num_input], but static_bidirectional_rnn
# expects a Python list of `timesteps` tensors of shape [batch_size, num_input],
# so split X along the time axis.
# (Named x_steps rather than `input` so the Python builtin is not shadowed.)
x_steps = tf.unstack(X, timesteps, 1)

# Define lstm cells with tensorflow
# Forward direction cell
lstm_fw_cell = rnn.BasicLSTMCell(num_hidden, forget_bias=1.0)
# Backward direction cell
lstm_bw_cell = rnn.BasicLSTMCell(num_hidden, forget_bias=1.0)

# Get lstm cell output: `outputs` is a list with one tensor per timestep;
# the two final states are returned separately.
outputs, output_state_fw, output_state_bw = rnn.static_bidirectional_rnn(
    lstm_fw_cell, lstm_bw_cell, x_steps, dtype=tf.float32)
# Linear activation, using the output of the last timestep
logits = tf.matmul(outputs[-1], weights['out']) + biases['out']
prediction = tf.nn.softmax(logits)

# Define loss and optimizer
loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=Y))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss_op)

# Evaluate model: fraction of samples whose most probable class matches the label
correct_pred = tf.equal(tf.argmax(prediction, 1), tf.argmax(Y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Initialize the variables (i.e. assign their default value)
init = tf.global_variables_initializer()

# Start training
with tf.Session() as sess:

    # Run the initializer
    sess.run(init)
    # One minibatch per training step. The previous version ran a complete pass
    # over the training set inside every step, turning `training_steps` into an
    # epoch count while still reporting "Step" / "Minibatch Loss".
    for step in range(1, training_steps+1):
        batch_x, batch_y = mnist.train.next_batch(batch_size)
        # Reshape data to get 28 seq of 28 elements
        batch_x = batch_x.reshape((-1, timesteps, num_input))
        # Run optimization op (backprop)
        sess.run(train_op, feed_dict={X: batch_x, Y: batch_y})
        if step % display_step == 0 or step == 1:
            # Calculate batch loss and accuracy
            loss, acc = sess.run([loss_op, accuracy], feed_dict={X: batch_x,
                                                                 Y: batch_y})
            print("Step " + str(step) + ", Minibatch Loss= " + \
                  "{:.4f}".format(loss) + ", Training Accuracy= " + \
                  "{:.3f}".format(acc))

    print("Optimization Finished!")

    # Calculate accuracy for 128 mnist test images
    test_len = 128
    test_data = mnist.test.images[:test_len].reshape((-1, timesteps, num_input))
    test_label = mnist.test.labels[:test_len]
    print("Testing Accuracy:", \
        sess.run(accuracy, feed_dict={X: test_data, Y: test_label}))

## Multilayered Bidirectional LSTM using nn.bidirectional_dynamic_rnn

<img src="../images/Picture1.png" />

In [2]:
reset_graph()

# Remember the current log level, then let only errors through.
old_v = tf.logging.get_verbosity()
tf.logging.set_verbosity(tf.logging.ERROR)

# Network parameters
num_layers = 2
rnn_size = 128   # units in each LSTM cell
timesteps = 28   # sequence length
num_input = 28   # features per timestep (MNIST img shape: 28*28)

# Graph input: [batch_size, timesteps, num_input]
X = tf.placeholder(tf.float32, shape=[None, timesteps, num_input], name='input_placeholder')
        
def bidirectional_lstm(X, num_layers, rnn_size):
    """Build a stack of ``num_layers`` bidirectional LSTM layers on top of ``X``.

    Each layer runs a forward and a backward LSTM over its input and
    concatenates the two outputs along the last axis; that concatenation is
    the input of the next layer.

    Args:
        X: input tensor passed to the first layer as the ``inputs`` of
            ``tf.nn.bidirectional_dynamic_rnn``.
        num_layers: number of bidirectional layers to stack (at least 1).
        rnn_size: number of units of every LSTM cell.

    Returns:
        The concatenated forward/backward output of the last layer; its last
        axis has size ``2 * rnn_size``.

    Raises:
        ValueError: if ``num_layers`` is smaller than 1.
    """
    if num_layers < 1:
        raise ValueError('num_layers must be at least 1, got {}'.format(num_layers))

    # Input of the layer currently being built: X for the first layer, the
    # previous layer's output afterwards. Feeding X to every layer (as before)
    # builds independent layers and discards all but the last one.
    layer_input = X
    for layer in range(num_layers):
        with tf.variable_scope('encoder_{}'.format(layer), reuse=tf.AUTO_REUSE):
            # NOTE(review): the initializer used to be called positionally as
            # (-0.1, 0.1), which truncated_normal_initializer reads as
            # mean=-0.1, stddev=0.1. A zero-mean init is assumed to be the
            # intent; confirm.
            cell_fw = tf.contrib.rnn.LSTMCell(
                rnn_size,
                initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.1, seed=2))
            cell_bw = tf.contrib.rnn.LSTMCell(
                rnn_size,
                initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.1, seed=2))

            (forward_output, backward_output), _ = tf.nn.bidirectional_dynamic_rnn(
                cell_fw, cell_bw, layer_input, dtype=tf.float32)
            layer_input = tf.concat(values=[forward_output, backward_output], axis=2)
    return layer_input

output = bidirectional_lstm(X, num_layers, rnn_size)

init = tf.global_variables_initializer()
# Random batch of 2 sequences, just to check the output shape.
X_batch = np.random.rand(2, timesteps, num_input)

with tf.Session() as sess:
    init.run()
    output_val = sess.run(output, feed_dict={X: X_batch})

# Put back the log verbosity that was saved at the top of this cell.
tf.logging.set_verbosity(old_v)

# The forward and backward outputs are concatenated on the last axis, hence
# 2 * rnn_size features per timestep.
assert output_val.shape == (X_batch.shape[0], timesteps, 2 * rnn_size)

print(output_val.shape)
#(2, 28, 256)
#After concatenation, the output will consist of the output of the last forward layer AND
#of the output of the last backward layer

(2, 28, 256)
