In [None]:
from keras.src.callbacks import EarlyStopping
from yaml import unsafe_load

from database_io import *
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.utils import Sequence
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping


In [None]:
# path of databases (must exist)
db_path = "db/"

# filenames of databases (this must be sqlite3 databases)
train_fname = "surf17_train.db"
validation_fname = "surf17_validation.db"
test_fname = "surf17_test.db"

dim_syndr = 8
dim_fsyndr = 4
n_steps_net1 = 20
n_steps_net2 = 3

data = DatabaseIO(dim_syndr, dim_fsyndr, n_steps_net1, n_steps_net2)

In [11]:
try:
    data.close_databases()
except:
    pass
data.load_data(db_path + train_fname, db_path + validation_fname, db_path + test_fname)



batch_size = 64
n_batches_train = 1000
n_batches_validation = 100



class DecoderSequence(Sequence):
    def __init__(self, data, batch_size, n_batches, data_type):
        self.data = data
        self.batch_size = batch_size
        self.n_batches = n_batches
        self.data_type = data_type
        self.on_epoch_end()

    def __len__(self):
        return self.n_batches

    def __getitem__(self, idx):
        # return the idx-th batch captured at epoch start
        return self.epoch_batches[idx]

    def on_epoch_end(self):
        """Called automatically by Keras at the end of each epoch."""
        gen = self.data.gen_batches(
            self.batch_size,
            self.n_batches,
            data_type=self.data_type
        )

        self.epoch_batches = []
        for _ in range(self.n_batches):
            batch_x1, batch_x2, batch_fx, batch_l1, batch_l2, batch_y = next(gen)

            # Wrap into Keras multi-input format
            inputs = (batch_x1, batch_x2, batch_fx, batch_l1) #, batch_l1, batch_l2)
            outputs = batch_y
            self.epoch_batches.append((inputs, outputs))

train_seq = DecoderSequence(
    data,
    batch_size=batch_size,
    n_batches=n_batches_train,
    data_type='training'
)

val_seq = DecoderSequence(
    data,
    batch_size=batch_size,
    n_batches=n_batches_validation,
    data_type='validation'
)


loaded databases and checked exclusiveness training, validation, and test keys
N_training=400000, N_validaiton=10000, N_test=5000.


In [15]:
x1 = Input(shape=(None, dim_syndr), name="x1_full")
x2 = Input(shape=(n_steps_net2, dim_syndr), name="x2_recent")
fx = Input(shape=(dim_fsyndr,), name="final_increment")

l1_input = Input(shape=(), dtype=tf.int32, name="seq_len")

x1_masked = layers.Masking(mask_value=0.0)(x1)

dropout_rate = 0.2
layer_depth = 64

mask_layer = layers.Lambda(lambda x: tf.sequence_mask(x[1], maxlen=tf.shape(x[0])[1]), output_shape=(None,))([x1, l1_input])


# Network 1 (full syndrome history)
h1 = layers.LSTM(layer_depth, activation="tanh", return_sequences=True, kernel_regularizer=keras.regularizers.l2(1e-5))(x1_masked)
h1 = layers.Dropout(dropout_rate)(h1)
h1 = layers.LSTM(layer_depth, activation="tanh", kernel_regularizer=keras.regularizers.l2(1e-5))(h1)
h1 = layers.Dropout(dropout_rate)(h1)

p1 = layers.Dense(layer_depth, activation="relu", name="p1", kernel_regularizer=keras.regularizers.l2(1e-5))(h1)
p1 = layers.Dropout(dropout_rate)(p1)
p1 = layers.Dense(1, activation="sigmoid", kernel_regularizer=keras.regularizers.l2(1e-5), name="p1_prob")(p1)



# Network 2 (recent syndrome + final increment)
h2 = layers.LSTM(layer_depth, activation="tanh", kernel_regularizer=keras.regularizers.l2(1e-5), return_sequences=True)(x2)
h2 = layers.Dropout(dropout_rate)(h2)
h2 = layers.LSTM(layer_depth, activation="tanh", kernel_regularizer=keras.regularizers.l2(1e-5))(h2)
h2 = layers.Dropout(dropout_rate)(h2)

p2 = layers.Concatenate()([h2, fx])
p2 = layers.Dense(layer_depth, activation="relu", name="p2", kernel_regularizer=keras.regularizers.l2(1e-5))(p2)
p2 = layers.Dropout(dropout_rate)(p2)
p2 = layers.Dense(1, activation="sigmoid", name="p2_prob", kernel_regularizer=keras.regularizers.l2(1e-5))(p2)



# Final combination p = probabilistic sum
p_final = layers.Lambda(lambda x: x[0]*(1-x[1]) + x[1]*(1-x[0]))([p1, p2]) # XOR Gate

model = Model(inputs=[x1, x2, fx, l1_input], outputs=p_final)
model.summary()

model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(1e-3),
    metrics=["accuracy"]
)


num_epochs = 1000

checkpoint = ModelCheckpoint(
    'best_model.keras',        # file path to save the model
    monitor='val_accuracy',    # metric to monitor
    verbose=1,                 # prints message when saving
    save_best_only=True,       # only save if improved
    mode='max'                 # 'min' for loss, 'max' for accuracy
)

early_stop = EarlyStopping(
    monitor='val_accuracy',    # use validation accuracy to monitor
    patience=50,               # stop after n epochs with no improvement
    min_delta=1e-4,            # threshold for minimum change over epochs -> "no improvement"
    restore_best_weights=True, # keeps weights from the best epoch
    verbose=1
)


results = model.fit(
    train_seq,
    steps_per_epoch=n_batches_train,
    epochs=num_epochs,
    verbose=1,
    validation_data=val_seq,
    validation_steps=n_batches_validation,
    callbacks=[checkpoint]
)

Epoch 1/1000


ValueError: Layer "functional_5" expects 3 input(s), but it received 4 input tensors. Inputs received: [<tf.Tensor 'data:0' shape=(None, 20, 8) dtype=bool>, <tf.Tensor 'data_1:0' shape=(None, 3, 8) dtype=bool>, <tf.Tensor 'data_2:0' shape=(None, 4) dtype=bool>, <tf.Tensor 'data_3:0' shape=(None,) dtype=int32>]

# Model with L1 input

In [None]:
# x1 = Input(shape=(None, dim_syndr), name="x1_full")
# x2 = Input(shape=(n_steps_net2, dim_syndr), name="x2_recent")
# fx = Input(shape=(dim_fsyndr,), name="final_increment")
# #
# l1_input = Input(shape=(), dtype=tf.int32, name="seq_len")
#
#
#
# def make_mask(inputs):
#     x1, lengths = inputs
#     return tf.sequence_mask(lengths, maxlen=tf.shape(x1)[1])
#
# mask = layers.Lambda(make_mask, name="seq_mask")([x1, l1_input])
#
# dropout_rate = 0.2
# layer_depth = 64
#
# # mask_layer = layers.Lambda(lambda x: tf.sequence_mask(x[1], maxlen=tf.shape(x[0])[1]), output_shape=(None,))([x1, l1_input])
#
#
# # Network 1 (full syndrome history)
# h1 = layers.LSTM(layer_depth, activation="tanh", return_sequences=True, kernel_regularizer=keras.regularizers.l2(1e-5))(x1, mask=mask)
# h1 = layers.Dropout(dropout_rate)(h1)
# h1 = layers.LSTM(layer_depth, activation="tanh", kernel_regularizer=keras.regularizers.l2(1e-5))(h1)
# h1 = layers.Dropout(dropout_rate)(h1)
#
# p1 = layers.Dense(layer_depth, activation="relu", name="p1", kernel_regularizer=keras.regularizers.l2(1e-5))(h1)
# p1 = layers.Dropout(dropout_rate)(p1)
# p1 = layers.Dense(1, activation="sigmoid", kernel_regularizer=keras.regularizers.l2(1e-5), name="p1_prob")(p1)
#
#
#
# # Network 2 (recent syndrome + final increment)
# h2 = layers.LSTM(layer_depth, activation="tanh", kernel_regularizer=keras.regularizers.l2(1e-5), return_sequences=True)(x2)
# h2 = layers.Dropout(dropout_rate)(h2)
# h2 = layers.LSTM(layer_depth, activation="tanh", kernel_regularizer=keras.regularizers.l2(1e-5))(h2)
# h2 = layers.Dropout(dropout_rate)(h2)
#
# p2 = layers.Concatenate()([h2, fx])
# p2 = layers.Dense(layer_depth, activation="relu", name="p2", kernel_regularizer=keras.regularizers.l2(1e-5))(p2)
# p2 = layers.Dropout(dropout_rate)(p2)
# p2 = layers.Dense(1, activation="sigmoid", name="p2_prob", kernel_regularizer=keras.regularizers.l2(1e-5))(p2)
#
#
#
# # Final combination p = probabilistic sum
# p_final = p1 * (1 - p2) + p2 * (1 - p1)
#  # XOR Gate
#
# model = Model(inputs=[x1, x2, fx, l1_input], outputs=p_final)
# model.summary()
#
# model.compile(
#     loss="binary_crossentropy",
#     optimizer=tf.keras.optimizers.Adam(1e-3),
#     metrics=["accuracy"]
# )
#
#
# num_epochs = 1000
#
# checkpoint = ModelCheckpoint(
#     'best_model.keras',        # file path to save the model
#     monitor='val_accuracy',    # metric to monitor
#     verbose=1,                 # prints message when saving
#     save_best_only=True,       # only save if improved
#     mode='max'                 # 'min' for loss, 'max' for accuracy
# )
#
# early_stop = EarlyStopping(
#     monitor='val_accuracy',    # use validation accuracy to monitor
#     patience=50,               # stop after n epochs with no improvement
#     min_delta=1e-4,            # threshold for minimum change over epochs -> "no improvement"
#     restore_best_weights=True, # keeps weights from the best epoch
#     verbose=1
# )
#
#
# results = model.fit(
#     train_seq,
#     steps_per_epoch=n_batches_train,
#     epochs=num_epochs,
#     verbose=1,
#     validation_data=val_seq,
#     validation_steps=n_batches_validation,
#     callbacks=[checkpoint]
# )

In [6]:
print("Model inputs:", model.inputs)
print("Model outputs:", model.outputs)
model.summary()

Model inputs: [<KerasTensor shape=(None, None, 8), dtype=float32, sparse=False, ragged=False, name=x1_full>, <KerasTensor shape=(None, 3, 8), dtype=float32, sparse=False, ragged=False, name=x2_recent>, <KerasTensor shape=(None, 4), dtype=float32, sparse=False, ragged=False, name=final_increment>, <KerasTensor shape=(None,), dtype=int32, sparse=False, ragged=False, name=seq_len>]
Model outputs: [<KerasTensor shape=(None, 1), dtype=float32, sparse=False, ragged=False, name=keras_tensor_23>]


In [7]:
model = keras.models.load_model("best_model.keras", safe_mode=False)

print("Model inputs:", model.inputs)
print("Model outputs:", model.outputs)
model.summary()



TypeError: <class 'keras.src.models.functional.Functional'> could not be deserialized properly. Please ensure that components that are Python object instances (layers, models, etc.) returned by `get_config()` are explicitly deserialized in the model's `from_config()` method.

config={'module': 'keras.src.models.functional', 'class_name': 'Functional', 'config': {}, 'registered_name': 'Functional', 'build_config': {'input_shape': None}, 'compile_config': {'optimizer': {'module': 'keras.optimizers', 'class_name': 'Adam', 'config': {'name': 'adam', 'learning_rate': 0.0010000000474974513, 'weight_decay': None, 'clipnorm': None, 'global_clipnorm': None, 'clipvalue': None, 'use_ema': False, 'ema_momentum': 0.99, 'ema_overwrite_frequency': None, 'loss_scale_factor': None, 'gradient_accumulation_steps': None, 'beta_1': 0.9, 'beta_2': 0.999, 'epsilon': 1e-07, 'amsgrad': False}, 'registered_name': None}, 'loss': 'binary_crossentropy', 'loss_weights': None, 'metrics': ['accuracy'], 'weighted_metrics': None, 'run_eagerly': False, 'steps_per_execution': 1, 'jit_compile': False}}.

Exception encountered: <class 'keras.src.layers.core.lambda_layer.Lambda'> could not be deserialized properly. Please ensure that components that are Python object instances (layers, models, etc.) returned by `get_config()` are explicitly deserialized in the model's `from_config()` method.

config={'module': 'keras.layers', 'class_name': 'Lambda', 'config': {'name': 'seq_mask', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None}, 'function': {'module': 'builtins', 'class_name': 'function', 'config': 'make_mask', 'registered_name': 'function'}, 'arguments': {}}, 'registered_name': None, 'build_config': {'input_shape': [[None, None, 8], [None]]}, 'name': 'seq_mask', 'inbound_nodes': [{'args': [[{'class_name': '__keras_tensor__', 'config': {'shape': [None, None, 8], 'dtype': 'float32', 'keras_history': ['x1_full', 0, 0]}}, {'class_name': '__keras_tensor__', 'config': {'shape': [None], 'dtype': 'int32', 'keras_history': ['seq_len', 0, 0]}}]], 'kwargs': {}}]}.

Exception encountered: Could not locate function 'make_mask'. Make sure custom classes and functions are decorated with `@keras.saving.register_keras_serializable()`. If they are already decorated, make sure they are all imported so that the decorator is run before trying to load them. Full object config: {'module': 'builtins', 'class_name': 'function', 'config': 'make_mask', 'registered_name': 'function'}

In [None]:
from scipy import optimize as optim

def decay(x, p_logical, x0):
  """ This functions is used to make a exponential fit to the fidelity
      curves """
  return (1 + (1 - 2 * p_logical)**(x - x0)) / 2.

def calc_stats(data, n_sampling=5000, x0_max=10, verbose=False):
    """ calculates the logical error rate and error bars """

    # since it is possible that the batch does not contain fidelities
    # for all steps, hence we need a list with all steps for which
    # predictions exist (we call it 'steps')
    steps, data_nonzero = [], []
    fids, rs_means_l, plogs_bs = [], [], []

    # in the following we assume that the first step is s = 1
    for s in range(1, len(data) + 1):
        dat = data[s - 1]
        if len(dat) != 0:
            # non-trivial data points
            steps.append(s)
            data_nonzero.append(dat)
            # fidelities
            fids.append(np.mean(dat))

    # fit decay curve to the non-trivial data
    popt, pcov = optim.curve_fit(
        decay, steps, fids, bounds=((0.0001, 0.0001), (.1, x0_max)))
    plog, x0 = popt[0], popt[1]
    if x0 > 0.99 * x0_max:
        print("WARNING, x0 is larger than", x0_max,
            "the fitting algorithm fails")
    if plog > .09:
        print("WARNING, plog is larger than 9%, the fitting algorithm fails")

    

    res_dict = {'steps': steps, 'fids': fids, 'plog': plog, 'x0': x0}
    if verbose:
        print("logical error rate:", round(plog * 100, 5), "%")
        print("x0 offset", round(x0, 3))

    return res_dict


num_samples = 5000
for batch in data.gen_batches(num_samples, 1, data_type='training'):
    errors = 0
    x1, x2, fx, l1, _, y_actual = batch
    y_pred_bool = np.empty_like(y_actual)
    y_prob = model.predict((x1,x2,fx,l1))
    for idx in range(y_actual.size):
        y_pred = y_prob[idx] > 0.5
        y_pred_bool[idx] = y_pred
        # print(f"Predicted Probability: {y_prob[idx]}  Prediction: {y_pred}  Actual: {y_actual[idx]}")
        if (y_pred != y_actual[idx]) :
            errors += 1

    comp_list = np.equal(y_pred_bool, y_actual).astype('float')
    # reshape into a list of lists
    comparison = []
    for n in range(max(l1)):
        comparison.append([])
    for n in range(len(comp_list)):
        idx = l1[n] - 1
        comparison[idx].append(comp_list[n])
    stats_dict = calc_stats(comparison, n_sampling=5000, x0_max=10, verbose=True)

print(f"Logical Fidelity: {1 - errors / num_samples}")