In [1]:
import numpy as np
import tensorflow as tf
import keras

for g in tf.config.list_physical_devices("GPU"):
    tf.config.experimental.set_memory_growth(g, True)

print(tf.config.list_physical_devices())

np.random.seed(42)
tf.random.set_seed(42)

2025-09-17 13:37:38.972229: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-09-17 13:37:39.536611: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-09-17 13:37:41.883288: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


## Data Preparation

In [2]:
nodes = [
    {"current_node" : 0, "next_node" : [(1, "T"), (2, "P")]},
    {"current_node" : 1, "next_node" : [(1, "S"), (4, "X")]},
    {"current_node" : 2, "next_node" : [(2, "T"), (3, "V")]},
    {"current_node" : 3, "next_node" : [(4, "P"), (5, "V")]},
    {"current_node" : 4, "next_node" : [(2, "X"), (5, "S")]},
    {"current_node" : 5, "next_node" : [(6, "E"), (6, "E")]}
]

In [3]:
import pandas as pd

df = pd.DataFrame(nodes)
df # to visualize the graph of nodes

Unnamed: 0,current_node,next_node
0,0,"[(1, T), (2, P)]"
1,1,"[(1, S), (4, X)]"
2,2,"[(2, T), (3, V)]"
3,3,"[(4, P), (5, V)]"
4,4,"[(2, X), (5, S)]"
5,5,"[(6, E), (6, E)]"


In [53]:
from scipy.stats import norm

def pick_random_length(max_char_count, mu=7.5, sigma=2.0):
    lengths = np.arange(max_char_count)
    # mu = 7.5 center around 7-8
    # sigma = 2.0 adjust for spread; try 2.0 for a moderate peak

    probabilities = norm.pdf(lengths, loc=mu, scale=sigma)
    probabilities /= probabilities.sum()  # normalize

    reber_lenght = np.random.choice(lengths, p=probabilities)
    return reber_lenght

def pick_path(randomized_node=False):
    path_or_node = np.random.randint(0, 2) if randomized_node == False else np.random.randint(0, 6)
    return path_or_node


def generate_reber_string(nodes, is_reber=True, is_generator=False, **kwargs):
    """This method creates one instance of reber string, returns a string a tuple (reber, is_reber) or
    yields a tuple (reber, is_reber). Returns string only and only if is_reber isn't random
    and is_generator=False"""

    def create_reber_string(nodes, is_reber):
        node = 0
        reber = "B"
        max_char_count = kwargs.get("max_char_count", 16)
        
        # if is_reber true
        if is_reber:
            while node < 6:
                selected_path = pick_path()
                label = nodes[node]["next_node"][selected_path][1]
                node = nodes[node]["next_node"][selected_path][0] 
                if isinstance(label, list):
                    # this string is for inside the embedded reber
                    inner_reber = generate_reber_string(label)
                    reber += inner_reber
                else:
                    reber += label
        # else scope is quite overkill, I wont use
        else:
            try:            
                mistake_count = 0
                char_count = 0
                while (node < 6 or mistake_count == 0) and char_count <= max_char_count:
                    selected_path = pick_path()
                    label = nodes[node]["next_node"][selected_path][1] # either list or string
                    if isinstance(label, list):
                        # this string is for inside the embedded reber
                        inner_reber = generate_reber_string(label)
                        reber += inner_reber
                    else: # if string just add to reber
                        reber += label
                        if node != 6:
                            random_node = pick_path(True)
                            mistake_count += 1 if random_node != node else 0
                        else:
                            node = pick_path(True)
                        char_count += 1
                reber = reber[:pick_random_length(max_char_count)]
            except IndexError:
                print(f"IndexError : {node}, {reber}, {mistake_count}")
        
        return reber
    
    if not is_generator:
        if is_reber == "random":
            is_reber = bool(pick_path()) 
            return create_reber_string(nodes, is_reber), is_reber
        else:
            return create_reber_string(nodes, is_reber)
    else:
        dataset_size = kwargs.get("dataset_size", 10000)
        if is_reber == "random":
            is_reber = (bool(pick_path()) for i in range(dataset_size))
        return ((create_reber_string(nodes, is_reber), is_reber) for _ in range(dataset_size))

In [54]:
for i in range(5):
    print(generate_reber_string(nodes))

BTXSE
BPVVE
BPVPSE
BPVPXTTVVE
BPVVE


In [55]:
for i in range(10):
    is_reber = bool(pick_path())
    print(generate_reber_string(nodes, is_reber), is_reber)

BTXSE True
BPTTVPXTVPSE True
BTPP False
BPPPTTTTPP False
BTSSSXXVVE True
BPVPSE True
BTPPTP False
BTSXSE True
BPTVPSE True
BPTTVVE True


In [68]:
embedded_reber_nodes = [
    {"current_node" : 0, "next_node" : [(1, "T"), (2, "P")]},
    {"current_node" : 1, "next_node" : [(4, nodes), 
                                        (4, nodes)]},
    {"current_node" : 2, "next_node" : [(3, nodes), 
                                        (3, nodes)]},
    {"current_node" : 3, "next_node" : [(5, "P"), (5, "P")]},
    {"current_node" : 4, "next_node" : [(5, "T"), (5, "T")]},
    {"current_node" : 5, "next_node" : [(6, "E"), (6, "E")]}
]

def create_embedded_reber(embedded_nodes, can_corrupt=True, **kwargs):
    dataset_size = kwargs.get("dataset_size", 10000)
    POSSIBLE_CHARS = "BEPSTVX"

    for i in range(dataset_size):
        embedded_reber_str = ""
        if can_corrupt is True:
            corrupt = bool(pick_path()) # Decide this single instance will be corrupted or not
            embedded_reber_str += generate_reber_string(embedded_nodes)

            if corrupt is True:
                corrupter_char = np.random.choice(list(POSSIBLE_CHARS))
                idx = np.random.randint(0, len(embedded_reber_str))
                embedded_reber_str = embedded_reber_str[:idx] + corrupter_char + embedded_reber_str[idx+1:]
            yield embedded_reber_str, corrupt # which is label 
        else:
            embedded_reber_str += generate_reber_string(embedded_nodes)
            yield embedded_reber_str, True


In [None]:
for i in range(426):
    print(generate_reber_string(embedded_reber_nodes))

In [69]:
for s, l in create_embedded_reber(embedded_reber_nodes):
    print(s, l)

BTBPVVETE False
BTBTSXXTVPSETE False
BTBPTTVPXTVVETE False
BPBPTTTTVSSEPE True
BPBPVPSEPE False
BTBPTVVETE True
BTBBXSETE True
BTBTSSSXXTTTVPXETE True
BPBTSSXXVPSEPE False
BTBTXXTVVETE False
BTBTXXVVETE True
BPBPVPXTTTVVEPE False
BPBPVVEPE False
BPBTXXTTTTVVEPE False
VTBTXXTTTTVVETE True
XTBTSXSETE True
BPBTXXTVPSEPE False
BTBTSXXVVETE False
BPBPTTVPXVPXTTVVEPE False
BTBPTTBPSETE True
BPBTXXVVEBE True
BTBPTVPXTTVPSEXE True
BTBTSSSXXVVETE False
BTBTSSXXVPSETE True
BTBSVPSETE True
BTBPVPXVVETE False
BPBPVVPPE True
BTBPTTTTVPXTVVPXTTVPSETE True
BTBPTVVETE False
BPBPVBEPE True
BTBTSSSXXVPXTTVVETE False
BTBTXXTTTTVVETE False
BTBPTTVVETE False
BPBPTSTVPXVVEPE True
BPBTSXXTVVEPE False
BPBSTTTTTTVPXTTTVVEPE True
BPBPTVVBPE True
BPBPVPSEPT True
BPBTXSEPE True
XTBPTTTVVETE True
BTBTXXVVETE False
BPBPTTVXXVVEPE True
BPBTSSSXXTVBEPE True
BTBPTVPSETE True
BPBTXSEPE False
BTBTSSSXSETE False
BTBTXSETV True
BTBPTTVPVETE True
BPBPTTVPXTVPSEPE False
BTBPVPSESE True
BPBTSXXTVVEPE False
BPBPTVVEPE False
B

In [34]:
dataset = tf.data.Dataset.from_generator(
    create_embedded_reber,
    args=(embedded_reber_nodes),
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.bool)
    )
).shuffle(10000)

ValueError: Attempt to convert a value ({'current_node': 0, 'next_node': [(1, 'T'), (2, 'P')]}) with an unsupported type (<class 'dict'>) to a Tensor.

In [8]:
for x, y in dataset.take(3):
    print(x, y)

tf.Tensor(b'BPVPXTTTVPSE', shape=(), dtype=string) tf.Tensor(True, shape=(), dtype=bool)
tf.Tensor(b'BPTTVPSE', shape=(), dtype=string) tf.Tensor(True, shape=(), dtype=bool)
tf.Tensor(b'BPTTTTVPXVPXTTTVVE', shape=(), dtype=string) tf.Tensor(True, shape=(), dtype=bool)


2025-09-17 13:37:49.294784: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [9]:
def preprocess(example):
    x, y = example
    x = tf.strings.unicode_split(x, 'UTF-8')

## Model Training

In [10]:
from keras import layers

# input shape : [batch_size, str, label]
inputs = layers.Input(shape=[])