<a href="https://colab.research.google.com/github/isapome/GainModulation/blob/main/Gain_modulation_2denselayers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [183]:
try:
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf

import numpy as np
import time

In [184]:
!nvidia-smi -L

GPU 0: Tesla P100-PCIE-16GB (UUID: GPU-87bd67be-9af1-ce83-ac58-9a9caf3bfebe)


In [185]:
# Abstract layer class, do not initialize
class Layer:
    def __init__(self, shape_in, shape_out, s, w_std=0.02, activation=tf.identity, dropout_rate=0., flatten=False):
        # Arguments:
          # shape_in, shape_out := lists containing dimensions of layer input and output
          # s, w_std := weight initialization seed (integer) and standard deviation (float)
        if isinstance(shape_in, list):
          self.shape_in = shape_in
        else:
          self.shape_in = list(shape_in)
        self.shape_out = shape_out

        # Weights are sampled from a normal distribution, bias and attentional gain are initialized to 0
        self.ff_weights = tf.Variable(tf.random.normal(self.get_weights(), stddev=w_std, seed=s))
        self.bias = tf.Variable(tf.zeros(shape_out, dtype=tf.float32))
        self.attention = tf.Variable(tf.zeros(shape_out, dtype=tf.float64))

        if flatten is True:
          self.flatten = flatten
        else:
          self.flatten = np.shape(shape_in) > np.shape(shape_out)

        self.activation = activation
        self.dropout_rate, self.mask = dropout_rate, 0. #is self.mask the seed for a dropout of a forward pass? Is it the same for each layer? 
        #Does it mean that all the dropout layers will have the same mask in one go?
        self.layer_in, self.layer_out, self.cum_input = None, None, None

    # General forward pass operation to compute layer output
    def forward_pass(self, inputs, training_phase):
        shape_in = inputs.shape[1:]

        if shape_in != self.shape_in:
            raise Exception('Wrong input size: {}, required input: {}'.format(shape_in, self.shape_in))
            
        if self.flatten:
            inputs = tf.reshape(inputs, [-1, tf.reduce_prod(shape_in)])

        # Implementation in subclass (Dense, Conv, ...), returns layer-specific input
        self.layer_in, layer_out = self.get_ff_output(inputs)
        layer_out = tf.add(layer_out, self.bias)
        
        # Cumulative input, corresponds to y_in in report
        self.cum_input = tf.cast(layer_out, tf.float64)
        
        # Perform operation using 64 bits to prevent rounding errors (why? makes everything slower and is it really necessary to have 15 instead of 6 precision? 
        #Then why don't you use the same in APPROX? line 37 here vs line 33 in APPROX) 
        # Rounding errors not necessary in APP
        att_gain = tf.cast(tf.multiply(self.cum_input, self.attention), tf.float32)
        layer_out = self.activation(tf.add(layer_out, att_gain))

        if self.dropout_rate > 0 and training_phase:
          layer_out = tf.nn.dropout(layer_out, self.dropout_rate, seed=self.mask)

        self.layer_out = layer_out
        return layer_out

    # General backward pass to update attention and compute feedback
    def backward_pass(self, prev_feedback):

        # Activation function derivative over feedback
        gated_feedback = self.feedback_gating(prev_feedback)

        # Implementation in subclass, returns layer-specific feedback
        curr_feedback = self.get_fb_output(gated_feedback)

        # Compute attention update, cap attentional gain at 50%

        ####is this purely for bio reasons? Different caps not tested yet. In bio 10% gain on average, 50 is a bit too large

        delta_att = tf.reduce_sum(tf.multiply(tf.cast(gated_feedback, tf.float64), self.cum_input), axis=0) #how you change the betas
        self.attention.assign(tf.clip_by_value(tf.add(self.attention, delta_att), -0.5, 0.5))

        if self.flatten:
            curr_feedback = tf.reshape(curr_feedback, [-1] + self.shape_in)

        return curr_feedback

    def weight_update(self, rpe):
        attended_rpe = tf.multiply(tf.cast(rpe, tf.float64), self.attention)

        # Implementation in subclass, returns weight and bias update according to HEB
        delta_bias, delta_weights = self.get_weight_update(attended_rpe)

        # Reset attention and mask
        self.attention.assign(tf.zeros(self.shape_out, dtype=tf.float64))
        self.mask += 1. #???????

        self.bias.assign_add(delta_bias)
        self.ff_weights.assign_add(delta_weights)

    # Computes activation function derivative over feedback
    def feedback_gating(self, feedback):
        if self.activation == tf.nn.relu:
            zeros = tf.zeros_like(self.layer_out)
            return tf.where(tf.greater(self.layer_out, zeros), feedback, zeros)
        elif self.activation == tf.nn.softmax or self.activation == tf.nn.sigmoid:  #why? in the output layer do we still update only one set of weights?
            return tf.multiply(tf.multiply(self.layer_out, 1 - self.layer_out), feedback)
        elif self.activation == tf.math.tanh:
            return tf.multiply(tf.multiply(1 + self.layer_out, 1 - self.layer_out), feedback)
        else:
            return feedback

In [186]:
# Fully connected layer, see Layer descriptions
class Dense(Layer):
    def get_weights(self):
        return [tf.reduce_prod(self.shape_in), self.shape_out[0]]

    def get_ff_output(self, inputs):
        return inputs, tf.matmul(inputs, self.ff_weights)

    def get_fb_output(self, feedback):
        delta_weights = self.get_weight_update(self.attention)[1]
        new_weights = tf.add(delta_weights, self.ff_weights)
        return tf.matmul(feedback, tf.transpose(new_weights)) #can just return new_weights and do the matmul outside:)
    
    def get_weight_update(self, attention):
        norm_fb = tf.math.divide_no_nan(attention, self.cum_input)
        norm_fb = tf.cast(norm_fb, tf.float32)
        return tf.reduce_sum(norm_fb, axis=0), tf.matmul(tf.transpose(self.layer_in), norm_fb)

# Convolutional layer
class Conv(Layer):
    def __init__(self, shape_in, filters, s, padding, stride, w_std=0.02, activation=tf.nn.relu, dropout_rate=0.):
        # Arguments:
          # shape_in, filters := lists containing dimensions of layer input and filter width, height and count
          # s, w_std := weight initialization seed (integer) and standard deviation (float)
          # padding := string 'VALID' or 'SAME'
          # stride := list containing the horizonal and vertical step size

        # Number of weights equals: (width filters * length filters * depth of previous layer) over the number of filters
        self.get_weights = lambda : [int(filters[0] * filters[1] * shape_in[2]), filters[2]]

        # Returns a 4-dimensional Tensor containing patches from the input layer
        self.get_patches = lambda x: tf.image.extract_patches(x, [1, filters[0], filters[1], 1],
                                                              [1, stride[0], stride[1], 1], [1, 1, 1, 1], padding)
        self.get_patches_inv = None

        # Initialize remaining parameters
        n_patches = self.get_n_patches(filters[:2], padding, stride, shape_in[:2])
        super().__init__(shape_in, n_patches + filters[-1:], s, w_std, activation, dropout_rate)

    def get_n_patches(self, filters, padding, stride, shape_in):
        if padding == 'SAME':  # the size of output is the same as the input when stride=1
            return [int(tf.math.ceil(x / y)) for (x, y) in zip(shape_in, stride)]
        elif padding == 'VALID':  # no padding
            return [(x - y) // z + 1 for (x, y, z) in zip(shape_in, filters, stride)]
        else:
            raise Exception("Padding unknown")

    def get_ff_output(self, inputs):

        # If different sized baches, set to -> if True:
        if self.get_patches_inv is None: # compute inverse of get_patches
          with tf.GradientTape(persistent=True) as t:
              t.watch(inputs)
              patches = self.get_patches(inputs)

          # Number of patches per input node
          num_patches = t.gradient(patches, inputs)[0]
          self.get_patches_inv = lambda fb: t.gradient(patches, inputs, output_gradients=fb) / num_patches
        else:
          patches = self.get_patches(inputs)
        return patches, tf.einsum('abcd, de -> abce', patches, self.ff_weights)

    def get_fb_output(self, feedback):
        delta_weights = self.get_weight_update(self.attention)[1]
        new_weights = tf.add(delta_weights, self.ff_weights)
        curr_feedback = tf.einsum('abcd, de -> abce', feedback, tf.transpose(new_weights))
        curr_feedback = self.get_patches_inv(curr_feedback)
        return tf.where(tf.math.is_nan(curr_feedback), tf.zeros_like(curr_feedback), curr_feedback)

    def get_weight_update(self, attention):
        norm_fb = tf.math.divide_no_nan(attention, self.cum_input)
        norm_fb = tf.cast(norm_fb, tf.float32)
        return tf.reduce_sum(norm_fb, axis=0), tf.einsum('abcd,abce->de', self.layer_in, norm_fb)


# Layer initialization functions
def conv(filters, padding, stride, w_std=0.02, activation=tf.nn.relu, dropout_rate=0.):
    return lambda x, y: Conv(x, filters, y, padding, stride, w_std, activation, dropout_rate)

def dense(size_layer, w_std=0.02, activation=tf.nn.relu, dropout_rate=0., flatten=False):
    return lambda x, y: Dense(x, [size_layer], y, w_std, activation, dropout_rate, flatten=flatten)

In [187]:
# Layer management class
class Network:
    def __init__(self, shape_in, architecture, s):
        # Arguments:
          # shape_in := list containing the dimensions of the input
          # architecture := list of layer initialization functions
          # s := weight initialization seed (integer)
          
        #Initialize all layers
        ff_network = []
        for layer in architecture:
            new_layer = layer(shape_in, tf.random.set_seed(s))
            ff_network.append(new_layer)
            shape_in = ff_network[-1].shape_out
        self.ff_network = ff_network
        self.s = s

        for i in ff_network:
            print('Layer {}: {} -> {}, activation: {}, flatten: {}, dropout: {}%'.format(i.__class__.__name__, i.shape_in, i.shape_out, i.activation, i.flatten, i.dropout_rate))

    # Define or change learning parameters
    def compile(self, lr_weights=1e-0, lr_att=1e-2, exploration_rate=2e-2):
        self.lr_w = lr_weights
        self.lr_a = lr_att
        self.exploration_rate = exploration_rate

    # Fit network output to prediction using cross entropy loss over all output units
    def fit(self, inputs, y_pred, batch_size):
        output_network = self.forward_pass(inputs)
        delta = tf.subtract(y_pred, output_network) 
        self.backward_pass(delta/ batch_size * self.lr_a)

    # Attention-modulated Hebbian learning algorithm
    @tf.function
    def fit_attention(self, inputs, y_true):
        # 1. Action-selection
        batch_size = inputs.shape[0]
        output_network = self.forward_pass(inputs)
        y_pred = self.get_prediction(output_network, batch_size)
        rpe, rewards, loss = self.get_loss(output_network, y_true, y_pred, batch_size)

        # 2. Attentional phase
        for epoch in range(attention_span):
          self.fit(inputs, y_pred, batch_size)    

        # 3. Weight update
        for layer in self.ff_network[::-1]:
          layer.weight_update(rpe)

        return rewards, tf.math.reduce_mean(loss)
    
    def forward_pass(self, layer_out, trainingphase=True):
        for layer in self.ff_network:
            layer_out = layer.forward_pass(layer_out, trainingphase)
        return tf.nn.softmax(layer_out, axis=-1)
    
    def backward_pass(self, feedback):
        for layer in self.ff_network[::-1]:
            feedback = layer.backward_pass(feedback)

    def get_loss(self, output_network, y_true, y_pred, batch_size):
        # Compute training accuracy
        rewards = tf.math.count_nonzero(tf.multiply(y_true, y_pred), axis=1, dtype=tf.float32)

        # Compute prediction error according to cross entropy loss derivative (HERE it probably meant SQE derivative) (NO the two derivatives are the same for both CE and SQE)
        deltas = tf.subtract(rewards, tf.reduce_sum(tf.multiply(output_network, y_pred), axis=1))
        deltas = tf.clip_by_value(deltas, -2, 4) / batch_size * self.lr_w

        # Compute cross entropy loss
        clipped_out = tf.clip_by_value(output_network, 1e-15, 1-1e-15)
        loss = tf.reduce_sum(-tf.reduce_sum(y_true * tf.math.log(clipped_out), axis=-1)) #mean, in case of a minibatch?
        # should it be loss = -zy (activation of the neuron corresponding to the correct class) + log(sumj exp(activation)j)
        return tf.math.reduce_mean(deltas), tf.reduce_mean(rewards), loss

    def get_prediction(self, output_network, batch_size):
        argmax_vector = tf.argmax(output_network, axis=1)
        random_vector = tf.random.uniform([batch_size], minval=0, maxval=1, seed=self.s)                                                                                                                                                                                                                      
        multinomial_vector = tf.reshape(tf.random.categorical(output_network, 1, seed=self.s), [-1])
        selected_classes = tf.where(tf.greater(random_vector, self.exploration_rate), argmax_vector, multinomial_vector)
        return tf.one_hot(selected_classes, output_network.shape[1])

    @tf.function
    def evaluate(self, inputs, y_true):
        fb_out = self.forward_pass(inputs, False)
        equality = tf.equal(tf.argmax(fb_out, 1), tf.argmax(y_true, 1))
        accuracy = tf.reduce_mean(tf.cast(equality, tf.float32))
        return accuracy

In [188]:
def get_dataset(dataset, batch_size, batch_size_test):
    print("Dataset: {}".format(dataset))
    if dataset == "CIFAR10":
      from keras.datasets import cifar10
      (X_train, y_train), (X_test, y_test) = cifar10.load_data()
    elif dataset == "MNIST":
      from keras.datasets import mnist
      (X_train, y_train), (X_test, y_test) = mnist.load_data()
    else:
      raise Exception("Unknown dataset")
    if len(np.shape(X_train)) < 4:
      X_train = tf.expand_dims(X_train, -1).numpy()
      X_test = tf.expand_dims(X_test, -1).numpy()

    image_shape = np.shape(X_train)[1:]
    print('Dataset size: {}'.format(len(X_train)))
    print('Test size: {}'.format(len(X_test)))
    # X_train = tf.divide(tf.cast(X_train, tf.float32), 255.0)
    # X_test = tf.divide(tf.cast(X_test, tf.float32), 255.0)
    X_train = tf.divide(tf.cast(X_train, tf.float32), 255.0)
    X_test = tf.divide(tf.cast(X_test, tf.float32), 255.0)
    n_classes = np.max(y_test) + 1
    # n_classes = tf.cast(tf.reduce_max(y_test)+1, dtype=tf.int32)
    y_train = tf.reshape(tf.one_hot(y_train, n_classes), (-1, n_classes))
    y_test = tf.reshape(tf.one_hot(y_test, n_classes), (-1, n_classes))

    train_set = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(X_train)).batch(batch_size)
    test_set = tf.data.Dataset.from_tensor_slices((X_test, y_test)).shuffle(len(X_train)).batch(batch_size_test)
    return train_set, test_set, image_shape, n_classes

In [189]:
# Retrieve data set, define validation set
batch_size = 1 #what happens with a bigger batch? Have you tried? Basicalyy you would have to split your network, because you have to update the betas only in one direction
batch_size_test = 100
train_set, test_set, input_shape, n_classes = get_dataset("MNIST", batch_size, batch_size_test)
train_set.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
n_val= 1000//batch_size #number of batches to validate on
validation = train_set.take(n_val).unbatch().batch(batch_size_test)
train = train_set.skip(n_val)

# print(validation)
# print(train)

Dataset: MNIST
Dataset size: 60000
Test size: 10000


In [190]:
def train_network(attention_span, lr_a, lr_w, w_init, s):
    print('HEB_T{}_a{}_w{}_s{}'.format(attention_span, lr_a, lr_w, s))

    network_architecture = [
        # conv(filters=[3, 3, 32], padding='VALID', stride=[1, 1], w_std=w_init),
        # conv(filters=[3, 3, 32], padding='SAME', stride=[2, 2],  w_std=w_init, dropout_rate=0.8),
        dense(1500,  w_std=w_init),
        dense(1000,  w_std=w_init),
        dense(500,  w_std=w_init),
        dense(n_classes,  w_std=w_init, activation=tf.identity)
    ]

    network = Network(input_shape, network_architecture, s)
    network.compile(lr_weights=lr_w, lr_att=lr_a)
    counter=0

    old_acc=0.
    start=time.time()
    for epoch in range(150):  #n_epochs is not passed?
        tic = time.time()
        loss, rewards = 0, 0
        for (batch, (X, Y)) in enumerate(train):
            curr_reward, curr_loss = network.fit_attention(X, Y)
            loss+=curr_loss
            rewards += curr_reward

        avg_acc = 0.
        # for X_test, Y_test in iter(validation):
        for (b,(X_test, Y_test)) in enumerate(test_set):
            # print(tf.shape(X_test))
            curr_acc = network.evaluate(X_test, Y_test)
            avg_acc += curr_acc
        print('Epoch: {} Time elapsed: {:.2f}: Training loss: {:.4f} Training accuracy: {:.2f}% Validation accuracy: {:.2f}%'.format(epoch, time.time()-tic, loss/(batch+1), rewards/(batch+1) * 100, avg_acc/(b+1) * 100))
    avg_acc = 0.
    for (batch, (X_test, Y_test)) in enumerate(test_set):
        curr_acc = network.evaluate(X_test, Y_test)
        avg_acc += curr_acc
    print('Test accuracy: {}'.format(avg_acc / (batch + 1)))
    print('Time elapsed: {}'.format(time.time()-start))


In [191]:
n_epochs = 150
# input_shape = [32,32,3]
# input_shape = [28,28,1]
# n_classes = 10
attention_span = 1  #only one attention update per weight update? Why is it so slow then (current settings approx 2.5 minutes per epoch)? Because batch ==1 probably.
lr_attention = 5e-3
# attention_span * lr_attention constant to have the same global update. The max was approx 20
lr_weights = 1e-0
weight_seed=0
std_weights = 0.02

train_network(attention_span, lr_attention, lr_weights, std_weights, weight_seed)   


## NB QAGREL was compared with batch_size = 1. If you change the batch size you have to change also the learning rate! instead you used 1e-2, i.e. the same as I use for bs100. 
## NNB Too small validation set. Overestimates generalization capability of the model.

HEB_T1_a0.005_w1.0_s0
Layer Dense: [28, 28, 1] -> [1500], activation: <function relu at 0x7fc266b0d8c8>, flatten: True, dropout: 0.0%
Layer Dense: [1500] -> [1000], activation: <function relu at 0x7fc266b0d8c8>, flatten: False, dropout: 0.0%
Layer Dense: [1000] -> [500], activation: <function relu at 0x7fc266b0d8c8>, flatten: False, dropout: 0.0%
Layer Dense: [500] -> [10], activation: <function identity at 0x7fc266d17b70>, flatten: False, dropout: 0.0%
Epoch: 0 Time elapsed: 104.81: Training loss: 0.8871 Training accuracy: 68.95% Validation accuracy: 92.88%
Epoch: 1 Time elapsed: 104.42: Training loss: 0.2045 Training accuracy: 92.95% Validation accuracy: 95.80%
Epoch: 2 Time elapsed: 104.65: Training loss: 0.1396 Training accuracy: 94.75% Validation accuracy: 96.50%
Epoch: 3 Time elapsed: 104.48: Training loss: 0.1086 Training accuracy: 95.69% Validation accuracy: 96.93%
Epoch: 4 Time elapsed: 104.83: Training loss: 0.0890 Training accuracy: 96.25% Validation accuracy: 97.33%
Epoch: 

KeyboardInterrupt: ignored