### Reber Grammar Detection
#### Notebook Author: Nirupam Purushothama

**About:** Use RNN to verify if the given string belongs to the Reber Grammar or not

What is Reber Grammar? [Read this](http://www.willamette.edu/~gorr/classes/cs449/reber.html)


**Reference Book:** Hands-on Machine Learning with Scikit-Learn and TensorFlow - Aurelien Geron

This is an exercise problem from the book. All code is taken from [handson tutorial code](https://github.com/ageron/handson-ml/)

In [2]:
import tensorflow as tf

In [1]:
# To support both python 2 and python 3
from __future__ import division, print_function, unicode_literals

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

# To plot pretty figures
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "rnn"

def save_fig(fig_id, tight_layout=True):
    path = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID, fig_id + ".png")
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format='png', dpi=300)

#### 1. Generating the following Reber Grammar
<img width="400" src="https://cnl.salk.edu/~schraudo/teach/NNcourse/figs/reber.gif"/>

In [3]:
# Generate the grammar string.

np.random.seed(42)

default_reber_grammar = [
    [("B", 1)],           # (state 0) =B=>(state 1)
    [("T", 2), ("P", 3)], # (state 1) =T=>(state 2) or =P=>(state 3)
    [("S", 2), ("X", 4)], # (state 2) =S=>(state 2) or =X=>(state 4)
    [("T", 3), ("V", 5)], # and so on...
    [("X", 3), ("S", 6)],
    [("P", 4), ("V", 6)],
    [("E", None)]]        # (state 6) =E=>(terminal state)

embedded_reber_grammar = [
    [("B", 1)],
    [("T", 2), ("P", 3)],
    [(default_reber_grammar, 4)],
    [(default_reber_grammar, 5)],
    [("T", 6)],
    [("P", 6)],
    [("E", None)]]

def generate_string(grammar):
    state = 0
    output = []
    while state is not None:
        index = np.random.randint(len(grammar[state]))
        production, state = grammar[state][index]
        if isinstance(production, list):
            production = generate_string(grammar=production)
        output.append(production)
    return "".join(output)

In [4]:
def generate_corrupted_string(grammar, chars="BEPSTVX"):
    good_string = generate_string(grammar)
    index = np.random.randint(len(good_string))
    good_char = good_string[index]
    bad_char = np.random.choice(sorted(set(chars) - set(good_char)))
    return good_string[:index] + bad_char + good_string[index + 1:]

In [5]:
def string_to_one_hot_vectors(string, n_steps, chars="BEPSTVX"):
    char_to_index = {char: index for index, char in enumerate(chars)}
    output = np.zeros((n_steps, len(chars)), dtype=np.int32)
    for index, char in enumerate(string):
        output[index, char_to_index[char]] = 1.
    return output

In [6]:
def generate_dataset(size):
    good_strings = [generate_string(embedded_reber_grammar)
                    for _ in range(size // 2)]
    bad_strings = [generate_corrupted_string(embedded_reber_grammar)
                   for _ in range(size - size // 2)]
    all_strings = good_strings + bad_strings
    
    # THIS IS THE KEY POINT. SEE THAT THE N_STEPS IS SET TO THE MAXIMUM OF THE STRING LENGTHS GENERATED
    # THIS IS IMPORTANT BECAUSE THAT IS THE NUMBER OF RNN TIME LEVELS THAT THE NETWORK WILL HAVE.
    # NOW THIS MEANS THAT FOR A NLP MODEL WE NEED TO SET THE LEVELS TO THE MAXIMUM STRING LENGTH.
    
    n_steps = max([len(string) for string in all_strings])
    X = np.array([string_to_one_hot_vectors(string, n_steps)
                  for string in all_strings])
    seq_length = np.array([len(string) for string in all_strings])
    y = np.array([[1] for _ in range(len(good_strings))] +
                 [[0] for _ in range(len(bad_strings))])
    rnd_idx = np.random.permutation(size)
    return X[rnd_idx], seq_length[rnd_idx], y[rnd_idx]

In [7]:
# Training set

X_train, l_train, y_train = generate_dataset(10000)

In [10]:
# Build the model

reset_graph()

possible_chars = "BEPSTVX"
n_inputs = len(possible_chars)
n_neurons = 30
n_outputs = 1

learning_rate = 0.02
momentum = 0.95

# None for Steps forces the system to determine the Step-size at run-time and it is obviously the longest 
# length strength size
X = tf.placeholder(tf.float32, [None, None, n_inputs], name="X")
seq_length = tf.placeholder(tf.int32, [None], name="seq_length")
y = tf.placeholder(tf.float32, [None, 1], name="y")

gru_cell = tf.nn.rnn_cell.GRUCell(num_units=n_neurons)

# PASS THE SEQUENCE LENGTH HERE AND IT TAKES CARE OF APROPRIATELY PADDING THE STRING OR WEIGHTING IT TO ENSURE 
# THAT THE LOSS FUNCTIONS ARE APPROPRIATELY CALCULATED
outputs, states = tf.nn.dynamic_rnn(gru_cell, X, dtype=tf.float32,
                                    sequence_length=seq_length)

logits = tf.layers.dense(states, n_outputs, name="logits")
y_pred = tf.cast(tf.greater(logits, 0.), tf.float32, name="y_pred")
y_proba = tf.nn.sigmoid(logits, name="y_proba")

xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
loss = tf.reduce_mean(xentropy, name="loss")
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate,
                                       momentum=momentum,
                                       use_nesterov=True)
training_op = optimizer.minimize(loss)

correct = tf.equal(y_pred, y, name="correct")
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32), name="accuracy")

init = tf.global_variables_initializer()
saver = tf.train.Saver()

In [11]:
# Validation set

X_val, l_val, y_val = generate_dataset(5000)

In [12]:
# Training
n_epochs = 50
batch_size = 50

with tf.Session() as sess:
    init.run()
    for epoch in range(n_epochs):
        X_batches = np.array_split(X_train, len(X_train) // batch_size)
        l_batches = np.array_split(l_train, len(l_train) // batch_size)
        y_batches = np.array_split(y_train, len(y_train) // batch_size)
        for X_batch, l_batch, y_batch in zip(X_batches, l_batches, y_batches):
            loss_val, _ = sess.run(
                [loss, training_op],
                feed_dict={X: X_batch, seq_length: l_batch, y: y_batch})
        acc_train = accuracy.eval(feed_dict={X: X_batch, seq_length: l_batch, y: y_batch})
        acc_val = accuracy.eval(feed_dict={X: X_val, seq_length: l_val, y: y_val})
        print("{:4d}  Train loss: {:.4f}, accuracy: {:.2f}%  Validation accuracy: {:.2f}%".format(
            epoch, loss_val, 100 * acc_train, 100 * acc_val))
        saver.save(sess, "./my_reber_classifier")

   0  Train loss: 0.6644, accuracy: 60.00%  Validation accuracy: 48.80%
   1  Train loss: 0.6397, accuracy: 64.00%  Validation accuracy: 56.46%
   2  Train loss: 0.5779, accuracy: 74.00%  Validation accuracy: 71.82%
   3  Train loss: 0.4811, accuracy: 76.00%  Validation accuracy: 74.28%
   4  Train loss: 0.4489, accuracy: 80.00%  Validation accuracy: 77.50%
   5  Train loss: 0.4118, accuracy: 84.00%  Validation accuracy: 75.84%
   6  Train loss: 0.6058, accuracy: 80.00%  Validation accuracy: 67.90%
   7  Train loss: 0.3978, accuracy: 86.00%  Validation accuracy: 75.86%
   8  Train loss: 0.2109, accuracy: 94.00%  Validation accuracy: 86.04%
   9  Train loss: 0.0844, accuracy: 96.00%  Validation accuracy: 89.78%
  10  Train loss: 0.0678, accuracy: 98.00%  Validation accuracy: 94.90%
  11  Train loss: 0.1084, accuracy: 98.00%  Validation accuracy: 93.18%
  12  Train loss: 0.0655, accuracy: 100.00%  Validation accuracy: 94.40%
  13  Train loss: 0.0966, accuracy: 98.00%  Validation accuracy

In [13]:
# Testing

test_strings = [
    "BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE",
    "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE"]
l_test = np.array([len(s) for s in test_strings])
max_length = l_test.max()
X_test = [string_to_one_hot_vectors(s, n_steps=max_length)
          for s in test_strings]

with tf.Session() as sess:
    saver.restore(sess, "./my_reber_classifier")
    y_proba_val = y_proba.eval(feed_dict={X: X_test, seq_length: l_test})

print()
print("Estimated probability that these are Reber strings:")
for index, string in enumerate(test_strings):
    print("{}: {:.2f}%".format(string, 100 * y_proba_val[index][0]))

INFO:tensorflow:Restoring parameters from ./my_reber_classifier

Estimated probability that these are Reber strings:
BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE: 0.72%
BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE: 99.99%
