In [1]:
import numpy as np
from random import shuffle, seed

The following string generator is known as a Embedded Reber Grammar:
<img src="images/embreber.gif" style="width:700px;height:500px;">

In [2]:
def generate_embedded_reber_string(random_seed = 0):
    np.random.seed(random_seed)
    edge_char = np.random.choice(['T', 'P'])
    reber_string = 'B' + edge_char + 'B'
    states_dict = {'0': ['1', '2'], '1': ['1', '3'], '2': ['2', '4'], '3': ['2', '5'], '4': ['3', '5'], '5': ['6']}
    output_dict = {'0->1': 'T', '0->2': 'P', '1->1': 'S', '1->3': 'X',  '2->2': 'T', 
                   '2->4': 'V', '3->2': 'X', '3->5': 'S', '4->3': 'P', '4->5': 'V', '5->6': 'E'}
    current_state = '0'
    ch = ''
    while(ch != 'E'):
        previous_state = current_state
        current_state = np.random.choice(states_dict[current_state])
        transition = previous_state + "->" + current_state
        ch = output_dict[transition]
        reber_string = reber_string + ch
    reber_string = reber_string + edge_char + 'E'
    return reber_string

def generate_incorrect_reber_string(random_seed = 0):
    alphabet = set(['B', 'T', 'P', 'S', 'X', 'V', 'E'])
    reber_string = generate_embedded_reber_string(random_seed)
    make_error_flg = 1
    string_len = len(reber_string)
    correct_elements_indices = set(np.arange(string_len))
    while(make_error_flg == 1 and len(correct_elements_indices) >= 1):
        err_index = np.random.choice(list(correct_elements_indices))
        correct_elements_indices = correct_elements_indices - set([err_index])
        incorrect_letters = list(alphabet - set([reber_string[err_index]]))
        reber_string = reber_string[:err_index] + np.random.choice(incorrect_letters) + reber_string[err_index+1:]
        make_error_flg = np.random.choice([0, 1])
    return reber_string

In [3]:
print("correct string:   ", generate_embedded_reber_string())
print("incorrect string: ", generate_incorrect_reber_string())

correct string:    BTBPVPSETE
incorrect string:  BTBTVPPETE


In [4]:
def gen_n_strings(n, start_seed, func):
    X = []
    for i in np.arange(n):
        X.append(func(i+start_seed))
        if(i % 25000 == 0):
            print(i,'/', n)
    print(n, '/', n)
    return X

def seq_lengths_from_X(X):
    seq_lengths = [len(x) for x in X]
    return seq_lengths

def pad(X):
    zeros_seq = [[0,0,0,0,0,0,0]]
    max_len = max([len(x) for x in X])
    for i in range(len(X)):
        n_pads =  max_len - len(X[i]) 
        X[i].extend(zeros_seq*n_pads)
    return X

In [5]:
n_correct_train = 15000
n_incorrect_train = 15000
n_correct_test = 15000
n_incorrect_test = 15000
n_correct = n_correct_train + n_correct_test
n_incorrect = n_incorrect_train + n_incorrect_test
n_train = n_correct_train + n_incorrect_train
y_all = np.hstack((np.ones(n_correct), np.zeros(n_incorrect)))
X_all = []

print('generating', n_correct_train, 'correct strings...')
seed_idx = 0
X_all.extend(gen_n_strings(n_correct, seed_idx, generate_embedded_reber_string))
print('generating', n_incorrect_train, 'incorrect strings...')
seed_idx = n_correct
X_all.extend(gen_n_strings(n_incorrect, seed_idx, generate_incorrect_reber_string))
seed(0)            #shuffle data
shuffle(X_all)
seed(0)
shuffle(y_all)
X_train, y_train = X_all[:n_train], y_all[:n_train]
X_test, y_test = X_all[n_train:], y_all[n_train:]
print('\nX_train length:', len(X_train), '\ny_train shape:', y_train.shape)
print('X_test length:', len(X_test), '\ny_test shape:', y_test.shape)

generating 15000 correct strings...
0 / 30000
25000 / 30000
30000 / 30000
generating 15000 incorrect strings...
0 / 30000
25000 / 30000
30000 / 30000

X_train length: 30000 
y_train shape: (30000,)
X_test length: 30000 
y_test shape: (30000,)


In [6]:
for i in range(10):
    print(X_all[i], y_all[i])

BTBTSXSETE 1.0
BPXPTVVEPE 0.0
BPBTSSXSEPE 1.0
BTBTSSSPXSETE 0.0
BPBTXSEPE 1.0
PTBPTTVVETE 0.0
BPBPVPXVVEPE 1.0
BTBPTVPETE 0.0
BPEPVPXTVXXXE 0.0
BPBTSXXTTTVPXVVEPE 1.0


In [7]:
seq_length_train = seq_lengths_from_X(X_train)
seq_length_test = seq_lengths_from_X(X_test)

In [8]:
one_hot_dict = {'B': [1,0,0,0,0,0,0], 'T': [0,1,0,0,0,0,0], 'P': [0,0,1,0,0,0,0],
                'S': [0,0,0,1,0,0,0], 'X': [0,0,0,0,1,0,0], 'V': [0,0,0,0,0,1,0],
                'E': [0,0,0,0,0,0,1]}
#X_one_hot_all = np.zeros((len(X_all), max_len, len(one_hot_dict))) 
for i in range(len(X_train)):
    X_train[i] = [one_hot_dict[ch] for ch in list(X_train[i])]
for i in range(len(X_test)):
    X_test[i] = [one_hot_dict[ch] for ch in list(X_test[i])]
    
#X_train = np.array(pad(X_train))
#X_test = np.array(pad(X_test))
y_train = np.reshape(y_train, (y_train.shape[0], 1))
y_test = np.reshape(y_test, (y_test.shape[0], 1))
#print('X_train shape:', X_train.shape, 'X_test shape:', X_test.shape)
print('y_train shape:', y_train.shape, 'y_test shape:', y_test.shape)

y_train shape: (30000, 1) y_test shape: (30000, 1)


In [9]:
import tensorflow as tf

# to make this notebook's output stable across runs
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)
reset_graph()

In [10]:
n_inputs = len("BTPSXVE")
n_neurons = 64
n_outputs = 1

n_epochs = 35
lr = 1e-3
batch_size = 128
n_batches = int(np.ceil(len(X_train) / batch_size))

g = tf.Graph()
with g.as_default(): 
    with tf.name_scope("LSTM"):
        X = tf.placeholder(tf.float32, [None, None, n_inputs])
        y = tf.placeholder(tf.float32, [None, 1])
        seq_length = tf.placeholder(tf.int32, [None])
        learning_rate = tf.placeholder(tf.float32)
        lstm_cell = tf.nn.rnn_cell.LSTMCell(num_units=n_neurons)
        outputs, states = tf.nn.dynamic_rnn(lstm_cell, X, dtype=tf.float32, sequence_length= seq_length, swap_memory= True)
        logits = tf.layers.dense(states[0], n_outputs)
        xentropy= tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
        loss = tf.reduce_mean(xentropy)
        optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
        y_pred = tf.cast(tf.greater(logits, 0.), tf.float32, name="y_pred")
        y_proba = tf.nn.sigmoid(logits, name="y_proba")
        equality = tf.equal(y_pred, y)
        accuracy = tf.reduce_mean(tf.cast(equality, tf.float32))
        training_op = optimizer.minimize(loss)
        init = tf.global_variables_initializer()
        saver = tf.train.Saver()
        
with tf.Session(graph = g) as sess:
    sess.run(init)
    for epoch in range(n_epochs):
        for batch_index in range(n_batches):
            batch_start_idx = batch_index * batch_size
            batch_end_idx = batch_start_idx + batch_size
            X_batch, y_batch, seq_length_batch = X_train[batch_start_idx: batch_end_idx],\
                                                 y_train[batch_start_idx: batch_end_idx],\
                                                 seq_length_train[batch_start_idx: batch_end_idx]
            X_batch = np.array(pad(X_batch))
            loss_val, acc_train, _=  sess.run([loss, accuracy, training_op],\
                                        feed_dict={X: X_batch, y: y_batch, seq_length: seq_length_batch, learning_rate: lr})

        print("Epoch:", epoch+1, "\tLoss:", loss_val)
        acc_val = sess.run(accuracy, feed_dict = {X: np.array(pad(X_test)), y: y_test, seq_length: seq_length_test} )
        print("Accuracy on test data:", acc_train, "Accuracy on validation data:", acc_val)
    saver.save(sess, "./my_reber_classifier")

Epoch: 1 	Loss: 0.560031
Accuracy on test data: 0.770833 Accuracy on validation data: 0.795933
Epoch: 2 	Loss: 0.430693
Accuracy on test data: 0.833333 Accuracy on validation data: 0.8529
Epoch: 3 	Loss: 0.347088
Accuracy on test data: 0.875 Accuracy on validation data: 0.8983
Epoch: 4 	Loss: 0.305636
Accuracy on test data: 0.895833 Accuracy on validation data: 0.918833


KeyboardInterrupt: 

In [36]:
random_number = np.random.randint(100000, high = 99999999)
y_sample = [np.random.randint(2)]
print('y_sample =', y_sample)
if y_sample == [1]:
    init_string_sample = generate_embedded_reber_string(random_number)
else: 
    init_string_sample = generate_incorrect_reber_string(random_number)
X_sample = np.array([one_hot_dict[ch] for ch in list(init_string_sample)])
X_sample = np.reshape(X_sample, (1, -1, n_inputs))
seq_length_sample = seq_lengths_from_X(X_sample)
with tf.Session(graph = g) as sess:
    saver.restore(sess, "./my_reber_classifier");
    y_proba_sample = y_proba.eval(feed_dict={X: X_sample, seq_length: np.array(seq_length_sample)})
print("Estimated probability that", init_string_sample, "is an Embedded Reber strings: {:.2f}%".format(y_proba_sample[0][0]*100))

y_sample = [0]
INFO:tensorflow:Restoring parameters from ./my_reber_classifier
Estimated probability that BTBPTTVVEPE is an Embedded Reber strings: 0.56%
