Embedded Reber grammars were used by Hochreiter and Schmidhuber in their paper about LSTMs. They are artificial grammars that produce strings such as “BPBTSXXVPSEPE”. Check out Jenny Orr’s nice introduction to this topic, then choose a particular embedded Reber grammar (such as the one represented on Orr’s page), then train an RNN to identify whether a string respects that grammar or not. You will first need to write a function capable of generating a training batch containing about 50% strings that respect the grammar, and 50% that don’t.

https://www.willamette.edu/~gorr/classes/cs449/reber.html

In [136]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorrt
import tensorflow as tf

In [129]:
x = set('abc')
x.add('d')
x

{'a', 'b', 'c', 'd'}

In [221]:
vocabulary = set(list('BTSXPVE'))

class ReberNode:
    def __init__(self) -> None:
        self.links = []

    def add(self, letter: str, node = None):
        if node is None:
            node = ReberNode()
        self.links.append(dict(letter=letter, node=node))
        return node
    
    def get_invalid_letter(self):
        invalid_letters = list(vocabulary - {x['letter'] for x in self.links})
        return np.random.choice(invalid_letters)
    
    # We assume for simplicity that a given letter can only be sent to one node
    def get_valid_node(self, letter):
        for node in self.links:
            if node['letter'] == letter:
                return node['node']
        return None

    def generate(self, error_chance=0, result=None):
        if result is None:
            result = []

        if len(self.links) == 0:
            return ''.join(result)
        
        next = self.links[np.random.randint(len(self.links))]
        if np.random.random() < error_chance:
            result.append(self.get_invalid_letter())
        else:
            result.append(next['letter'])
        return next['node'].generate(error_chance, result)
    
    def is_valid(self, text):
        if text == '' and len(self.links) == 0:
            return True
        first_letter, *rest = list(text)
        next_node = self.get_valid_node(first_letter)
        if next_node is None:
            return False
        return next_node.is_valid(''.join(rest))
    
    def is_partial_valid(self, text, result=None):
        if result is None:
            result = []
        if text == '' and len(self.links) == 0:
            return result
        first_letter, *rest = list(text)
        next_node = self.get_valid_node(first_letter)
        if next_node is None:
            return result + [0] * len(text)
        result.append(1)
        return next_node.is_partial_valid(''.join(rest), result)

def create_inner():
    n0 = ReberNode()
    n1 = n0.add('B')
    n2 = n1.add('T')
    n2.add('S', n2)
    n3 = n2.add('X')
    n4 = n3.add('S')

    n5 = n1.add('P')
    n5.add('T', n5)
    n6 = n5.add('V')
    n6.add('V', n4)

    n3.add('X', n5)
    n6.add('P', n3)

    n_end = n4.add('E')
    return (n0, n_end)

def create_outer():
    i1_start, i1_end = create_inner()
    i2_start, i2_end = create_inner()

    n0 = ReberNode()
    n1 = n0.add('B')

    n1.add('T', i1_start)
    n2 = i1_end.add('T')

    n1.add('P', i2_start)
    i2_end.add('P', n2)

    n_end = n2.add('E')
    return n0

def gen(grammar, error_rate=0):
    string = grammar.generate(error_rate)
    is_valid = grammar.is_valid(string)
    return string, is_valid

grammar = create_outer()

grammar.is_partial_valid('BTIT')


[1, 1, 0, 0]

In [None]:
valid_set = set()
invalid_set = set()
target = 20_000

for _ in range(target * 100):
    text, valid = gen(grammar)
    if not valid:
        print('PROBLEM!')
        break
    if len(valid_set) < target:
        valid_set.add(text)
    else:
        break

for _ in range(target * 100):
    text, valid = gen(grammar, 0.1)
    if not valid:
        if len(invalid_set) < target:
            invalid_set.add(text)
        else:
            break

print(len(valid_set))
print(len(invalid_set))

In [224]:
# validation: 2000
# test: 2000
# training: 12000
valid_list = list(valid_set)
invalid_list = list(invalid_set)

training_ok = valid_list[:12000]
valid_ok = valid_list[12000:14000]
test_ok = valid_list[14000:]

training_bad = invalid_list[:12000]
valid_bad = invalid_list[12000:14000]
test_bad = invalid_list[14000:]

In [227]:
print(np.max(np.array([len(x) for x in valid_list])))
print(np.max(np.array([len(x) for x in invalid_list])))

60
43


In [170]:
def create_dataset(valid, invalid, shuffle=False, batch_size=32):
    valid_ds = tf.data.Dataset.from_tensor_slices(tf.constant(valid, dtype=tf.string)).map(lambda x: (x, 1.0))
    invalid_ds = tf.data.Dataset.from_tensor_slices(tf.constant(invalid, dtype=tf.string)).map(lambda x: (x, 0.0))
    ds = valid_ds.concatenate(invalid_ds)
    if shuffle:
        ds = ds.shuffle(100_000)
    return ds.cache().batch(batch_size).prefetch(1)

training_ds = create_dataset(training_ok, training_bad, shuffle=True)
valid_ds = create_dataset(valid_ok, valid_bad)
test_ds = create_dataset(test_ok, test_bad)

In [178]:
for x, y in training_ds.take(1):
    print(x[:10])
    print(y[:10])

tf.Tensor(
[b'BPBTSXXTTTTTTTTVPXVPSEPE' b'BTBPVPPTVVBTE'
 b'BTBPTTTTTVPXVPXTTTVPXVVETE' b'BTBTSSXXTTVPXTVPXVPXTTVPSETE'
 b'BEBTXXTTVVETE' b'BTBTSSXXXPXTVVETE' b'BPBPTTTTTTTTVPXTTTTTTVPSEPE'
 b'BPBTSSSSSSSXXVPXTTTTTVPSEPE' b'BEBPTVTSEPE' b'BPSTSBXVVEPE'], shape=(10,), dtype=string)
tf.Tensor([1. 0. 1. 1. 0. 0. 1. 1. 0. 0.], shape=(10,), dtype=float32)


In [213]:
vocab = [x.lower() for x in vocabulary]
text_vec_layer = tf.keras.layers.TextVectorization(vocabulary=vocab, split='character', max_tokens=60, pad_to_max_tokens=True)
text_vec_layer.get_vocabulary()
for x in training_ds.map(lambda x, y: x).take(1):
    print(text_vec_layer(x))

tf.Tensor(
[[5 2 5 ... 0 0 0]
 [5 7 5 ... 0 0 0]
 [5 2 5 ... 0 0 0]
 ...
 [5 7 5 ... 0 0 0]
 [5 7 5 ... 0 0 0]
 [5 7 5 ... 0 0 0]], shape=(32, 40), dtype=int64)


In [214]:
vocab_size = text_vec_layer.vocabulary_size()
model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Embedding(vocab_size, vocab_size, mask_zero=True),
    tf.keras.layers.GRU(128),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Nadam(),
    metrics=[tf.keras.metrics.binary_accuracy]
)
hist = model.fit(training_ds, epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [215]:
vocab_size = text_vec_layer.vocabulary_size()
model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Embedding(vocab_size, vocab_size, mask_zero=True),
    tf.keras.layers.LSTM(512),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Nadam(),
    metrics=[tf.keras.metrics.binary_accuracy]
)
hist = model.fit(training_ds, epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


# With partial

In [233]:
# validation: 2000
# test: 2000
# training: 12000
full_list = list([(x, grammar.is_partial_valid(x)) for x in valid_set | invalid_set])
np.random.shuffle(full_list)
print(len(full_list))

training_arr = full_list[:28_000]
validation_arr = full_list[28_000:32_000]
test_arr = full_list[32_000:]

36811


In [255]:
strings, is_partial_valid = zip(*test_arr[100:200])
strings = tf.constant(strings)
labels = tf.ragged.constant(is_partial_valid)
print(np.max([len(x) for x in is_partial_valid]))
labels.to_tensor()

39


<tf.Tensor: shape=(100, 39), dtype=int32, numpy=
array([[1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       [1, 0, 0, ..., 0, 0, 0],
       ...,
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0]], dtype=int32)>

In [319]:
def create_dataset_2(array, shuffle=False, batch_size=32):
    strings, is_partial_valid = zip(*array)
    inputs = tf.constant(strings, dtype=tf.string)
    labels = tf.ragged.constant(is_partial_valid, dtype=tf.float32).to_tensor()
    ds = tf.data.Dataset.from_tensor_slices((inputs, labels))
    if shuffle:
        ds = ds.shuffle(100_000)
    return ds.cache().batch(batch_size).map(lambda x, y: (x, y[:, :tf.reduce_max(tf.strings.length(x)), tf.newaxis])).prefetch(1)

training_ds = create_dataset_2(training_arr, shuffle=True)
valid_ds = create_dataset_2(validation_arr)
test_ds = create_dataset_2(test_arr)

In [320]:
for x, y in test_ds.take(2):
    print(x)
    print(y)

tf.Tensor(
[b'BTBPVPXTVPXTTTVPXVPXTVVETE' b'BPBTSSSSSSSSXXVPXVPXTVVEPE'
 b'BTBTSXXTTTVPXTTTTTVVETE' b'BTBPTTTVPXTTTTTTTVPXVVETE'
 b'BPBTSSSSXEVVEPE' b'BXBPVPVEPE' b'BTBTSSSXXTTTTVPXVPXVPXVVETE'
 b'BTBTXXVPSEEE' b'BTBTXXTTTVPXVPXVVETE' b'BTXPTTVPXTTTVPSETS'
 b'BPBPTSVEPS' b'BPETVSXPE' b'BPBPVPXVPXTVPXTVPXTVPXTVPXTVVEPE'
 b'BPBPVPXTTVPXTTVPXVPXTVVEPE' b'BPBPVPXVBSPS'
 b'BPBPTTVPXTTTVPXTTTVPXVVEPE' b'BPBPPPXVPSEPB' b'BSBPTVPSEPE'
 b'BVBPTVPETVPSETE' b'BTBPTTTTTTTTVPXVPSETE'
 b'BPBPTTVPXVPXTTTTVPXTVPXTTVVEPE' b'BTBPVPXTTPVPSETE'
 b'BTBPTTTVPXVPXTTVPXTTVPXVVETE' b'BPBTSSSXXVPXVPXTTVPXTTVPXTVPSEPE'
 b'BTBTSXXTTTTTTVPXVPXTTVVETE' b'BPBPTBTTTVPSEPE'
 b'BTBTXXTVPXTVPXTVPXVPXVPSETE' b'BTBPTTTTTTTVPXTVPXTTVPSETE' b'BVBPVVEPE'
 b'BPBTXXVPXVPXVPXVPXTTVVEPE' b'TTVTSBXXSVVETE' b'BPBPPTTVVEPB'], shape=(32,), dtype=string)
tf.Tensor(
[[[1.]
  [1.]
  [1.]
  ...
  [0.]
  [0.]
  [0.]]

 [[1.]
  [1.]
  [1.]
  ...
  [0.]
  [0.]
  [0.]]

 [[1.]
  [1.]
  [1.]
  ...
  [0.]
  [0.]
  [0.]]

 ...

 [[1.]
  [1.]
 

In [345]:
vocab_size = text_vec_layer.vocabulary_size()
model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Embedding(vocab_size, vocab_size, mask_zero=True),
    # output ~ (32, sequence length, 128)
    tf.keras.layers.GRU(128, return_sequences=True),
    # output ~ (32, sequence length, 1)
    tf.keras.layers.Dense(1, activation='sigmoid'),
])

model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Nadam(),
    metrics=[tf.keras.metrics.binary_accuracy]
)
hist = model.fit(training_ds, epochs=20, validation_data=valid_ds)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [346]:
model.evaluate(test_ds)

  1/151 [..............................] - ETA: 5s - loss: 7.7568e-04 - binary_accuracy: 1.0000



[0.0013762167654931545, 0.9997318983078003]

In [352]:
test_strings = tf.constant(["BPXXTTVPXVPXTTTTTVVETE", "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE"])
# test_strings

y_proba = model.predict(test_strings)
y_proba[:, -1, 0]
# print()
# print("Estimated probability that these are Reber strings:")
# for index, string in enumerate(test_strings):
#     print("{}: {:.2f}%".format(string, 100 * y_proba[index][0]))



array([0.43238285, 0.9999465 ], dtype=float32)

In [335]:
tf.ragged.constant([[1, 2, 3, 4, 5], [1,2,3]], ragged_rank=1)

<tf.RaggedTensor [[1, 2, 3, 4, 5], [1, 2, 3]]>