# Recurrent Neural Network Example: Predict Embedded Reber Grammar

A Reber grammar is a set of rules for generating strings: which characters may come next depends on the preceding characters.
That makes Reber strings a good learning problem for models that need memory of past events.

<img src="data/Artificial_grammar_learning_example.jpg">



## Generate data

In [1]:
import random
import numpy as np
np.random.seed(17)
random.seed(17)

In [2]:
# Alphabet of the grammar.
vocabulary = "TPXVS"

# Transition table of the grammar.
#   key   -> the last one or two characters of the string built so far
#   value -> the two characters that may follow (position 0 / position 1)
# Each pair below is written as (context, "<option 0><option 1>").
prev_to_next_mapping = {
    context: tuple(successors)
    for context, successors in (
        ("T", "PT"),
        ("TP", "PT"),
        ("PP", "PT"),
        ("PT", "SX"),
        ("TX", "XV"),
        ("TT", "XS"),
        ("XX", "XV"),
        ("V", "XV"),
        ("VX", "XV"),
        ("VV", "PS"),
        ("VP", "SS"),
        ("XV", "PS"),
    )
}

def create_reber_string():
    """Sample one (non-embedded) Reber string.

    The string starts with 'V' or 'T' (equal probability). Afterwards the last
    one or two characters are looked up in ``prev_to_next_mapping`` and one of
    the two allowed successors is appended, until an 'S' has been appended.

    Returns:
        The generated string; its last character is always 'S'.
    """
    symbols = ['V' if np.random.random() < 0.5 else 'T']
    while symbols[-1] != "S":
        take_second = random.random() < 0.5
        # Context is the trailing two characters (or the single first one).
        context = "".join(symbols[-2:])
        first_option, second_option = prev_to_next_mapping[context]
        symbols.append(second_option if take_second else first_option)
    return "".join(symbols)

def embedded_reber_string(max_size=30):
    """Sample an embedded Reber string of at most ``max_size`` characters.

    An embedded Reber string is a Reber string (see ``create_reber_string``)
    wrapped in one extra leading and one extra trailing character; both are the
    same symbol, 'V' or 'T', chosen with equal probability.

    Args:
        max_size: upper bound on the length of the returned string.

    Returns:
        The embedded Reber string.

    Raises:
        ValueError: if ``max_size`` is smaller than 5.
    """
    if max_size<5:
        raise ValueError("Embedded reber strings are at least of size 5")
    wrapper='V' if np.random.random()<0.5 else 'T'
    # Rejection sampling: redraw the inner string until it leaves room for the
    # two wrapper characters.
    middle=create_reber_string()
    while len(middle)>max_size-2:
        middle=create_reber_string()
    return wrapper+middle+wrapper

def not_reber_string(max_size=30,vocab:set=set(vocabulary)):
    """Create a corrupted embedded Reber string.

    A valid embedded Reber string is generated and the character at one random
    position is replaced by a different character from ``vocab``.

    NOTE: this function does not itself verify that the result is invalid; the
    notebook's sanity check is what confirms that no corrupted string coincides
    with a valid one.

    Args:
        max_size: maximum length, forwarded to ``embedded_reber_string``.
        vocab: set of characters to draw the replacement from (never mutated).

    Returns:
        The string with exactly one character changed.
    """
    correct=embedded_reber_string(max_size)
    swap_index=random.randint(0,len(correct)-1)
    # random.sample() rejects sets on Python 3.11+ (deprecated since 3.9).
    # Sorting also makes the seeded draw independent of the per-process hash
    # order of the set.
    candidates=sorted(vocab.difference(correct[swap_index]))
    replacement_char=random.sample(candidates,1)[0]
    return correct[:swap_index]+replacement_char+correct[swap_index+1:]

In [3]:
# Map every grammar symbol to its position in `vocabulary`; that position is
# the column set to 1 in the one-hot encoding.
vocab_lookup={symbol:position for position,symbol in enumerate(vocabulary)}
reber_vocabulary_size=len(vocab_lookup)

def one_hot_encode(string,vocab_lookup,max_size):
    """One-hot encode ``string`` and pad it with all-zero rows up to ``max_size``.

    Args:
        string: text to encode; every character must be a key of ``vocab_lookup``.
        vocab_lookup: dict mapping a character to its column index.
        max_size: number of rows to return; must be >= len(string).

    Returns:
        A list of ``max_size`` NumPy vectors of length ``len(vocab_lookup)``.
        Row i has a single 1 at the column of ``string[i]``; rows beyond the end
        of the string stay all zero.

    Raises:
        IndexError: if ``string`` is longer than ``max_size``.
        KeyError: if a character is missing from ``vocab_lookup``.
    """
    width=len(vocab_lookup)
    rows=[np.zeros(width) for _ in range(max_size)]
    for position,symbol in enumerate(string):
        row=rows[position]
        row[vocab_lookup[symbol]]=1
    return rows
    
def generate_batch(max_size=30,batch_size=32):
    """Generate a random, labelled batch of valid and corrupted strings.

    Args:
        max_size: maximum string length; also the padded length of every encoding.
        batch_size: number of examples to generate.

    Returns:
        A tuple ``(labels, strings, one_hot_string)``:
        labels is a NumPy integer array of 0/1 values (1 = valid embedded Reber
        string, 0 = corrupted string), strings holds the raw strings, and
        one_hot_string holds the padded one-hot encoding of every string.
    """
    labels=np.random.randint(2, size=[batch_size])
    strings=[]
    for label in labels:
        make_string=embedded_reber_string if label == 1 else not_reber_string
        strings.append(make_string(max_size))
    one_hot_string=[one_hot_encode(text,vocab_lookup,max_size) for text in strings]
    return labels, strings, one_hot_string

# Smoke test: draw a tiny batch and display the (labels, strings, one-hot encodings) triple.
generate_batch(batch_size=2,max_size=5)


(array([1, 1]),
 ['TTTST', 'TVVST'],
 [[array([1., 0., 0., 0., 0.]),
   array([1., 0., 0., 0., 0.]),
   array([1., 0., 0., 0., 0.]),
   array([0., 0., 0., 0., 1.]),
   array([1., 0., 0., 0., 0.])],
  [array([1., 0., 0., 0., 0.]),
   array([0., 0., 0., 1., 0.]),
   array([0., 0., 0., 1., 0.]),
   array([0., 0., 0., 0., 1.]),
   array([1., 0., 0., 0., 0.])]])

In [4]:
# Sanity check of the generator: draw a large batch and compare the two classes.
labels, words,_= generate_batch(batch_size=10000,max_size=50)
# Split the generated strings by label (0 = corrupted, 1 = valid embedded Reber string).
nonr=np.array(words)[labels==0]
reb=np.array(words)[labels==1]
# Class balance: number of corrupted vs. valid strings in the batch.
print(len(nonr),len(reb))
# (unique corrupted strings, unique valid strings, strings that occur under both labels)
len(set(nonr)),len(set(reb)),len(set(nonr).intersection(set(reb)))
# Expected: roughly equal class sizes and an empty intersection (good).
# Corrupting strings yields more unique strings than the valid grammar produces.
# NOTE(review): the earlier remark "1500 that are valid reber strings" does not match the
# recorded output (195 unique valid strings) - presumably it predates the generator fix; confirm.

4993 5007


(1589, 195, 0)

**Learning:**
- never skip sanity checks on the input data (and, in general, write unit tests :)
-> I did not originally have the sanity check, and the non-Reber strings I created contained valid Reber strings


## Build model

In [5]:
import tensorflow as tf

def reset_graph(seed=42):
    """Make this notebook's output stable across runs.

    Replaces the default TensorFlow graph with an empty one and reseeds both
    TensorFlow's graph-level RNG and NumPy's global RNG with ``seed``.
    """
    np.random.seed(seed)
    tf.reset_default_graph()
    tf.set_random_seed(seed)


In [6]:
class ReberStringPredictor():
    """GRU-based binary classifier: is a one-hot encoded string an embedded Reber string?

    Create an instance, call ``construct_model()`` to build the TensorFlow 1.x
    graph, then run the exposed tensors/ops (``training_op``, ``loss``,
    ``probability``, ``y_pred``, ``accuracy``) in a session.

    Args:
        vocab_size: width of the one-hot encoding (number of distinct symbols).
        num_neurons: number of units of every GRU layer.
        learning_rate: learning rate handed to the Adam optimizer.
        num_layers: number of stacked GRU layers.
        init: ``"he"`` selects the variance-scaling initializer; any other value
            selects the Xavier initializer.
        dropout_rate_rnn: dropout rate applied to the inputs of every GRU layer
            while ``is_training`` is true.
        dropout_rate_fc: dropout rate applied to the features fed into the
            output layer while ``is_training`` is true.
    """

    def __init__(self, vocab_size,num_neurons=100,learning_rate=1e-4,num_layers=1,init="he",
                dropout_rate_rnn=0.2,dropout_rate_fc=0.):

        # TODO validate params
        self.vocab_size=vocab_size
        self.num_neurons=num_neurons
        self.learning_rate=learning_rate
        self.num_layers=num_layers
        self.init=init
        self.dropout_rate_fc=dropout_rate_fc
        self.dropout_rate_rnn=dropout_rate_rnn


    def construct_model(self):
        """Build the model in a fresh default graph.

        Calls ``reset_graph()`` first, so anything previously built in the
        default graph is discarded and the TensorFlow/NumPy seeds are reset.

        Sets the attributes ``X`` (input placeholder "inputs"), ``target``
        (label placeholder "targets"), ``is_training`` (defaults to True),
        ``word_lenghts``, ``outputs``, ``final_state``, ``loss``,
        ``training_op``, ``probability`` (tensor "y_proba"), ``y_pred`` and
        ``accuracy``.
        """
        reset_graph()

        self.X=tf.placeholder(tf.float32,[None,None,self.vocab_size],name="inputs")
        # Every real character contributes exactly one 1 and padding rows are all
        # zero, so summing a whole example yields its unpadded length.
        self.word_lenghts=tf.reduce_sum(self.X,axis=[1,2],name="word_lenghts")
        self.target=tf.placeholder(tf.float32,[None],name="targets")
        self.is_training=tf.placeholder_with_default(True,shape=(),name="is_training")

        init=tf.contrib.layers.variance_scaling_initializer() if self.init=="he" else tf.contrib.layers.xavier_initializer()
        with tf.variable_scope("rnn", initializer=init):
            cells=[tf.contrib.rnn.GRUCell(num_units=self.num_neurons,
                                                activation=tf.nn.relu) for _ in range(self.num_layers)]

            # Input dropout is only active while is_training is true.
            layers=[tf.contrib.rnn.DropoutWrapper(cell, input_keep_prob=tf.cond(self.is_training,
                                                lambda: 1-self.dropout_rate_rnn,
                                                lambda: 1.0)) for cell in cells]
            rnn=tf.contrib.rnn.MultiRNNCell(layers)
            # 'final_state' holds one state per stacked layer; the state of the
            # top layer is selected with [-1] below.
            self.outputs, self.final_state=tf.nn.dynamic_rnn(rnn,self.X,
                                                       sequence_length=self.word_lenghts,
                                                       dtype=tf.float32)

        num_outputs=1 # is_rebber_string
        # Fix: the dropout result used to be assigned to an unused variable, so
        # dropout_rate_fc had no effect. Apply it to the features that feed the
        # output layer instead (no-op for the default rate of 0).
        top_state=tf.layers.dropout(self.final_state[-1],self.dropout_rate_fc,training=self.is_training)
        logits=tf.layers.dense(top_state,num_outputs)
        logits=tf.reshape(logits,[-1])


        self.loss=tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=self.target,logits=logits))

        self.training_op= tf.train.AdamOptimizer(self.learning_rate).minimize(self.loss)

        # evaluation-only tensors (not needed in training)
        self.probability=tf.sigmoid(logits,name="y_proba")
        self.y_pred=tf.cast(tf.greater(self.probability,0.5),tf.float32)
        # Predictions and targets are both 0/1, so the mean absolute difference is the error rate.
        self.accuracy=1-tf.reduce_mean(tf.abs(self.y_pred-self.target))

    def get_training_operation(self):
        """Return the optimizer step created by ``construct_model()``."""
        return self.training_op
    


In [7]:
# visualize single example string
max_string_size=10
targets, words, one_hot_words=generate_batch(max_size=max_string_size,batch_size=1)
m=ReberStringPredictor(vocab_size=len(vocab_lookup),num_neurons=3,num_layers=1)
m.construct_model()
print("original: ",words)
# Inspect the model in inference mode and fetch everything in a single run, so
# that all printed values belong to the same forward pass. Previously only the
# final state was computed with dropout disabled, which made it inconsistent
# with the printed outputs and probability.
feed={m.X:one_hot_words,m.is_training:False}
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    inputs_val,lengths_val,outputs_val,state_val,proba_val=sess.run(
        [m.X,m.word_lenghts,m.outputs,m.final_state,m.probability],feed_dict=feed)
    print("inputs",inputs_val)
    print("input lenghts (without padding)",lengths_val)
    print("outputs",outputs_val)
    print("last neural state",state_val)
    print("Output",proba_val)

# when using different string lengths the outputs of the padded parts are 0


The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
This class is equivalent as tf.keras.layers.GRUCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
This class is equivalent as tf.keras.layers.StackedRNNCells, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Please use `layer.add_weight` method instead.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Call initializer instance with the dtype argument instead 

## Train model

In [15]:
# NOTE(review): this value is never read - it is overwritten by max_string_size=30 below.
max_string_size=50

# Training budget and the hyper-parameter grid (a single configuration here).
num_training_examples=30000
batch_sizes=[64]
dropouts=[0.]
max_string_size=30
for batch_size in batch_sizes:
    print("batch_size",batch_size)
    for dropout in dropouts:
        print("dropout",dropout)
        # The same dropout rate is used for the RNN inputs and the output layer.
        m=ReberStringPredictor(reber_vocabulary_size,num_neurons=30,
                   num_layers=1,learning_rate=1e-2,init="he",dropout_rate_fc=dropout,dropout_rate_rnn=dropout)
        # construct_model() rebuilds the default graph; the Saver is created afterwards.
        m.construct_model()
        saver=tf.train.Saver()
        with tf.Session() as sess:
            
            sess.run(tf.global_variables_initializer())
            training_op=m.get_training_operation()
            # Every step trains on a freshly generated batch.
            for num_batches in range(num_training_examples//batch_size):
                targets, words, encoded_words=generate_batch(max_size=max_string_size,batch_size=batch_size)
                sess.run([training_op],feed_dict={m.X:encoded_words,m.target:targets})
                # Every 50th step: evaluate on 100 newly generated strings with is_training=False.
                if num_batches%50==0:
                    targets, words, encoded_words=generate_batch(max_size=max_string_size,batch_size=100)
                    loss,acc,pr,pred=sess.run([m.loss,m.accuracy,m.probability,m.y_pred],feed_dict={m.X:encoded_words,m.target:targets,
                                                                                       m.is_training:False})
                    print("Accuracy: ", acc)
            # Every configuration writes to the same checkpoint path, so only the last one is kept.
            saver.save(sess,"./reber_string_predictor")

batch_size 64
dropout 0.0
Accuracy:  0.53999996
Accuracy:  0.56
Accuracy:  0.8
Accuracy:  0.95
Accuracy:  0.95
Accuracy:  1.0
Accuracy:  1.0
Accuracy:  0.99
Accuracy:  1.0
Accuracy:  1.0


### Notes:
- get the network to overfit -> I did not get above an accuracy of 90%, why? -> bug in the input data (see sanity check)
- after the fix was implemented we easily get 100% accuracy, even on newly created test/validation data

## Final demo on difficult example



In [16]:
# How often do long strings occur? Sample 30000 valid strings with the default max_size
# and collect those longer than 20 and longer than 27 characters.
num_examples_during_learning=30000
valid_reber_strings=[embedded_reber_string() for _ in range(num_examples_during_learning)]

gt20=[s for s in valid_reber_strings if len(s)>20]
gt27=[s for s in valid_reber_strings if len(s)>27]
# Displays (number of strings longer than 20, number of strings longer than 27).
len(gt20),len(gt27)

(3, 0)

In [27]:
# idea taken from https://github.com/ageron/handson-ml/blob/master/14_recurrent_neural_networks.ipynb

# Hand-written probes: the empty string, then pairs of a valid string and the
# same string with a changed last character.
test_strings = [
    "",
    "TTPPTXXXXXXXXXXXXXVPST",#22 long
    "TTPPTXXXXXXXXXXXXXVPSS",
    "TTPPPPPPPPPPPPPPPTXXXXXXXXXVST",#30 long -> reason: happens too rarely, most likely not in training set
    "TTPPPPPPPPPPPPPPPTXXXXXXXXXVSP"]
X_test = [one_hot_encode(s, vocab_lookup,max_size=38)
          for s in test_strings]

with tf.Session() as sess:
    saver.restore(sess, "./reber_string_predictor")
    graph = tf.get_default_graph()
    y_proba = graph.get_tensor_by_name("y_proba:0")
    inputs = graph.get_tensor_by_name("inputs:0")
    is_training = graph.get_tensor_by_name("is_training:0")
    # Fix: is_training defaults to True, so dropout would stay active during
    # inference for any model trained with a non-zero dropout rate.
    y_proba_val = y_proba.eval(feed_dict={inputs: X_test, is_training: False},session=sess)

print(y_proba_val)
print("Estimated probability that these are Reber strings:")
for string, proba in zip(test_strings, y_proba_val):
    print("{}: {:.3f}%".format(string, 100 * proba))

INFO:tensorflow:Restoring parameters from ./reber_string_predictor
[0.44921276 0.9995407  0.00312981 0.99518853 0.00112865]
Estimated probability that these are Reber strings:
: 44.921%
TTPPTXXXXXXXXXXXXXVPST: 99.954%
TTPPTXXXXXXXXXXXXXVPSS: 0.313%
TTPPPPPPPPPPPPPPPTXXXXXXXXXVST: 99.519%
TTPPPPPPPPPPPPPPPTXXXXXXXXXVSP: 0.113%


**Learning**

As usual, looking at examples is useful. 
- Start from trivial to complex.
- Define beforehand your expectations (similar to tests)
- Define the expected behavior before modeling. Example: What should the model do when an empty string is passed (none were included in the training set)? What is the maximum string size allowed? (Currently, a string longer than the padded size raises an exception.)