# Constructing and Running ANNs for RF Mutual Information Investigation
## Thomas Possidente

### Imports and Value Initialization

In [1]:
# ANN Building Imports
import keras
from keras import backend as K
from keras import optimizers
from keras.engine.topology import Layer
from keras.models import Sequential
from keras.layers import Dense
import tensorflow as tf

# Standard Imports
import pandas as pd
import numpy as np

# Value Inits - Specify as needed
num_inputs = int(1000)  # number of dummy images in set
size = int(16)          # Dimension of each dummy image should be size*size
RF_size = int(4)        # Dimensions of the RF to be analyzed should be RF_size*RF_size

# Value Inits - Leave these alone
num_of_RFs = int((size*size) / (RF_size*RF_size))
inputs_by_RF = np.empty([16000, 16])


  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


### Loading Inputs

In [2]:
inputs = pd.read_csv('test.csv')
inputs = inputs.drop('X1', axis = 0) # Taking out col names
inputs = inputs.apply(pd.to_numeric)  # converting to floats

inputs = inputs.values # convert to np ndarray
inputs = inputs.reshape(1000,16,16) # reshape to desired dims (1000 examples, 16*16)

### Preprocessing Inputs

In [3]:
# Loop through num_inputs and extract each RF input, flatten, and store in ndarray

origin_x = 0
origin_y = 0
count = 0

for n in range(0, num_of_RFs): # For each RF field in dummy image
    for i in range(0, num_inputs): # For each dummy image in set
        single_RF_input = inputs[i, origin_y:(origin_y + RF_size), origin_x:(origin_x + RF_size)] # Extract 1 RF 
        single_RF_input_flat = single_RF_input.reshape((RF_size*RF_size)) 
        inputs_by_RF[count,] = single_RF_input_flat 
        count += 1
    if(origin_x == (size - RF_size)):  # Changes RF field over image
        origin_x = 0
        origin_y += RF_size
    else:
        origin_x += RF_size
        
inputs_by_RF = inputs_by_RF.reshape(num_of_RFs, num_inputs, (RF_size*RF_size)) 
# ^Reshaping to (RF Location, Image number, flattened RF) - this makes it easier to select inputs for each separate ANN 

### Building Network

In [4]:
np.set_printoptions(threshold=np.nan)

sess = tf.Session()


class Hebbian(Layer):
    
    
    def __init__(self, output_dim, lmbda=1.0, eta=0.0005, connectivity='random', connectivity_prob=0.25, **kwargs):
        '''
        Constructor for the Hebbian learning layer.

        args:
            output_dim - The shape of the output / activations computed by the layer.
            lambda - A floating-point valued parameter governing the strength of the Hebbian learning activation.
            eta - A floating-point valued parameter governing the Hebbian learning rate.
            connectivity - A string which determines the way in which the neurons in this layer are connected to
                the neurons in the previous layer.
        '''
        self.output_dim = output_dim
        self.lmbda = lmbda
        self.eta = eta
        self.connectivity = connectivity
        self.connectivity_prob = connectivity_prob

        super(Hebbian, self).__init__(**kwargs)
        
    
    
    def random_conn_init(self, shape, dtype=None):
        A = np.random.normal(0, 1, shape)
        A[self.B] = 0
        return tf.constant(A, dtype=tf.float32)


    def zero_init(self, shape, dtype=None):
        return np.zeros(shape)


    def build(self, input_shape):
        # create weight variable for this layer according to user-specified initialization
        if self.connectivity == 'random':
            self.B = np.random.random(input_shape[0]) < self.connectivity_prob
        elif self.connectivity == 'zero':
            self.B = np.zeros(self.output_dim)
            
        if self.connectivity == 'all':
            self.kernel = self.add_weight(name='kernel', shape=(np.prod(input_shape[1:]), \
                        np.prod(self.output_dim)), initializer='uniform', trainable=False)
        elif self.connectivity == 'random':
            self.kernel = self.add_weight(name='kernel', shape=(np.prod(input_shape[1:]), \
                        np.prod(self.output_dim)), initializer=self.random_conn_init, trainable=False)
        elif self.connectivity == 'zero':
            self.kernel = self.add_weight(name='kernel', shape=(np.prod(input_shape[1:]), \
                        np.prod(self.output_dim)), initializer=self.zero_init, trainable=False)
        else:
            raise NotImplementedError


        # call superclass "build" function
        super(Hebbian, self).build(input_shape)


    def call(self, x):
        x_shape = tf.shape(x)
        batch_size = tf.shape(x)[0]

        # reshape to (batch_size, product of other dimensions) shape
        x = tf.reshape(x, (tf.reduce_prod(x_shape[1:]), batch_size))

        # compute activations using Hebbian-like update rule
        activations = x + self.lmbda * tf.matmul(self.kernel, x)  # why "x +", should this be removed?

        # compute outer product of activations matrix with itself
        outer_product = tf.matmul(tf.expand_dims(x, 1), tf.expand_dims(x, 0))  # Why outer product? Should this line be removed?

        # update the weight matrix of this layer
        self.kernel = self.kernel + tf.multiply(self.eta, tf.reduce_mean(outer_product, axis=2)) # Still why outer prod?
        self.kernel = tf.multiply(self.kernel, self.B) # zeroing node connections that are meant to be zeros
        return K.reshape(activations, x_shape)

        
        


In [5]:
model = Sequential()
model.add(Hebbian(input_shape = (RF_size,1), output_dim = 4))

In [9]:
                                                # TODO how to compile model without a real optimizer/loss function?
                                                # then how to get values out and make sure weights are changing (probably
                                                # just model.predict)
            
            # dummy_opt seems to be working (maybe), need to figure out how to dummy loss correctly. Maybe need to
            # pass fake y_true values?
            
def dummy_loss(): pass  
#def dummy_loss(y_true, y_pred): return y_pred             

class dummy_opt(keras.optimizers.Optimizer): pass
    
dummyLoss = dummy_loss()
dummyOpt = dummy_opt()

model.compile(optimizer= dummyOpt,loss = dummyLoss)

  **kwargs)


ValueError: The model cannot be compiled because it has no loss to optimize.

In [7]:

model.fit(inputs_by_RF[0])

RuntimeError: The model needs to be compiled before being used.