In [5]:
%load_ext tensorboard
import tensorflow as tf
import tensorflow_datasets as tfds
from absl import app
from absl import flags
import numpy as np
import datetime
import sys
sys.path.insert(0, "/home/viktor/PycharmProjects/guided_research/transformer-to-snn-conversion/spiking_tf-master")
from spiking_tf.src import spiking_models, plots, file_handling

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [6]:
timesteps = 1
max_rate = 2

n_in = 28*28
n_hidden = 800
n_out = 10

batch_size = 128
epochs = 1

thr = 0.1
tau = 10.0

output_path = file_handling.get_default_path_str()

### SPIKING NEURAL NETWORK

In [16]:
def spikalize_img(image, label):
    '''Transform image to spikes. Spike with poisson distributed rate proportional to pixel brightness.'''
    flattened = tf.reshape(image, [28*28])
    rand = tf.random.uniform(shape=[timesteps, 28*28])
    return tf.cast(flattened/255*max_rate > rand, tf.float32), label

tf.random.set_seed(1234)

(ds_train, ds_test) = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True
)

ds_train = ds_train.map(
    spikalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(batch_size)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)

ds_test = ds_test.map(
    spikalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)

inputs = tf.keras.layers.Input(shape=(timesteps, 28*28))
mid_z = tf.keras.layers.Layer(spiking_models.LifNeuronCell(n_in, n_hidden, tau=tau, threshold=thr))(inputs)
out_z = tf.keras.layers.Layer(spiking_models.IntegratorNeuronCell(n_hidden, n_out))(mid_z)

model = tf.keras.models.Model(inputs=inputs, outputs=[out_z])

model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    optimizer="adam",
    metrics=["sparse_categorical_accuracy"],
    run_eagerly=False
)

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="logs", histogram_freq=1)

history = model.fit(
    ds_train, epochs=epochs, validation_data=ds_test, callbacks=[tensorboard_callback]
)











In [None]:
%tensorboard --logdir logs/fit

In [17]:
model.evaluate(ds_test)



[20.899494171142578, 0.0]

In [22]:
model.layers[2].get_weights()

[]

### WEIGHT CONVERSION

In [23]:
def flatten(image, label):
    '''Transform image to the flattened version of itself'''
    flattened = tf.reshape(image, [28*28])
    flattened = tf.expand_dims(flattened, 0)
    return tf.cast(flattened, tf.float32), label

tf.random.set_seed(1234) 

(ds_train, ds_test) = tfds.load('mnist', split=['train', 'test'], shuffle_files=True, as_supervised=True)
ds_train = ds_train.map(flatten, num_parallel_calls=tf.data.experimental.AUTOTUNE).cache().shuffle(batch_size).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
ds_test = ds_test.map(flatten, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(batch_size).cache().prefetch(tf.data.experimental.AUTOTUNE)

model = tf.keras.Sequential([
  tf.keras.layers.Dense(800, activation='relu', input_shape=(784,), use_bias=False),
  tf.keras.layers.Dense(10, use_bias=False),
])

# IMPORTANT > Model can be defined in both ways, you need to pass each layer in a different way

# input_ = tf.keras.Input(shape=(784,))
# middle = tf.keras.layers.Dense(800, activation='relu', use_bias=False)(input_)
# out = tf.keras.layers.Dense(10, use_bias=False)(middle)
# model = tf.keras.Model(input_, out)


model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer="adam",
    metrics=["sparse_categorical_accuracy"],
    run_eagerly=False
)

log_dir = "logs/fit/analog_feedforward" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

history = model.fit(
    ds_train, epochs=epochs, validation_data=ds_test, callbacks=[tensorboard_callback]
)



















In [24]:
def get_activations_layer(layer_in, layer_out, x, batch_size=None):
    if batch_size is None:
        batch_size = 10

    if len(x) % batch_size != 0:
        x = x[: -(len(x) % batch_size)]

    return Model(layer_in, layer_out).predict(x, batch_size)

In [25]:
def weight_conversion_model(weights, bias):

    # Get weights from trained network
    converted_weights = weights
    converted_bias = bias

    # model based normalization
    previous_factor = 1
    for l in range(len(converted_weights)):
        max_pos_input = 0
        # Find maximum input for this layer
        for o in range(converted_weights[l].shape[0]):
            input_sum = 0
            for i in range(converted_weights[l].shape[1]):
                input_sum += tf.math.maximum(0, converted_weights[l][o, i])
            if converted_bias is not None and converted_bias[l] is not None:
                input_sum += tf.math.maximum(0, converted_bias[l][o])
            max_pos_input = tf.math.maximum(max_pos_input, input_sum)

        # get the maximum weight in the layer, in case all weights are negative, max_pos_input would be zero, so we use the max weight to rescale instead
        max_wt = tf.math.reduce_max(converted_weights[l])
        if converted_bias is not None and converted_bias[l] is not None:
            max_bias = tf.math.reduce_max(converted_bias[l])
            max_wt = tf.math.maximum(max_wt, max_bias)
        scale_factor = tf.math.maximum(max_wt, max_pos_input)
        # Rescale all weights
        applied_factor = scale_factor/previous_factor
        converted_weights[l] = converted_weights[l] / applied_factor
        if converted_bias is not None and converted_bias[l] is not None:
            converted_bias[l] = converted_bias[l] / scale_factor
        previous_factor = scale_factor
        print(f"Scale factor for this layer is {previous_factor}")
            
    return converted_weights, converted_bias

In [26]:
def weight_conversion_robust_and_data_based(weights, bias, model, data, normalization_method='robust', ppercentile=0.99):

    if normalization_method == 'data':
        ppercentile = 1.0

    # Get weights from trained network
    converted_weights = weights
    converted_bias = bias
    
    # use training set to find max_act for each neuron
            
    activations = []
    for l in range(0, len(converted_weights)):
        activation = get_activations_layer(model.input, model.layers[l].output, data)
        activation_per_neuron = [np.max(activation[:, i]) for i in range(activation.shape[1])]
        activations.append(activation_per_neuron)
        
    previous_factor = 1
    for l in range(len(converted_weights)):
        # get the p-percentile of the activation
        pos_inputs = activations[l]
        pos_inputs.sort()
        max_act = pos_inputs[int(ppercentile * (len(pos_inputs) - 1))]
        # get the maximum weight in the layer
        max_wt = tf.math.reduce_max(converted_weights[l])
        if converted_bias is not None and converted_bias[l] is not None:
            max_bias = tf.math.reduce_max(converted_bias[l])
            max_wt = tf.math.maximum(max_wt, max_bias)
        scale_factor = tf.math.maximum(max_wt, max_act)

        applied_factor = scale_factor / previous_factor
        # rescale weights
        converted_weights[l] = converted_weights[l] / applied_factor

        # rescale bias
        if converted_bias is not None and converted_bias[l] is not None:
            converted_bias[l] = converted_bias[l] / scale_factor
        previous_factor = scale_factor
        print(f"Scale factor for this layer is {previous_factor}")

    return converted_weights, converted_bias

### Testing weight conversion methods

In [30]:
weights = np.load("analog_feedforward.npy", allow_pickle=True)

In [31]:
weights_ = np.array([weights[0], weights[1]])
# bias = np.array([weights[1], weights[3]])


In [27]:
from tensorflow.keras.models import Model

(ds_train, ds_test) = tfds.load('mnist', split=['train', 'test'], shuffle_files=True, as_supervised=True)
ds_train = ds_train.map(flatten, num_parallel_calls=tf.data.experimental.AUTOTUNE)

result = get_activations_layer(model.input, model.layers[1].output, ds_train)

In [32]:
converted_weights_data_or_robust = weight_conversion_robust_and_data_based(weights_, None, model, ds_train)

InternalError: CUB reduce error out of memory [Op:Max]

In [140]:
converted_weights_model = weight_conversion_model(weights_, None)

Scale factor for this layer is 0.9999994039535522
Scale factor for this layer is 3.2180092334747314


### Applying weights to the architecture

In [None]:
converted_weights = weight_conversion(weights_)

expanded_converted_wt_0 = tf.expand_dims(converted_weights[0], 0)
expanded_converted_wt_1 = tf.expand_dims(converted_weights[1], 0)

model.layers[1].set_weights(expanded_converted_wt_0)
model.layers[2].set_weights(expanded_converted_wt_1)

In [9]:
model.evaluate(ds_test)



[2.3025832176208496, 0.09799999743700027]

In [39]:
expanded_wt_0 = tf.expand_dims(weights[0], 0)
expanded_wt_1 = tf.expand_dims(weights[2], 0)

model.layers[1].set_weights(expanded_wt_0)
model.layers[2].set_weights(expanded_wt_1)

In [22]:
model.layers[1].get_weights()[1]

IndexError: list index out of range

In [40]:
model.evaluate(ds_test)



[2.266212224960327, 0.3068000078201294]

In [18]:
random_wt_0 = tf.random.normal(weights[0].shape)
random_wt_0 = tf.expand_dims(random_wt_0, 0)

random_wt_1 = tf.random.normal(weights[2].shape)
random_wt_1 = tf.expand_dims(random_wt_1, 0)

model.layers[1].set_weights(random_wt_0)
model.layers[2].set_weights(random_wt_1)

In [19]:
model.evaluate(ds_test)



[13.528854370117188, 0.10949999839067459]