In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
SPIKE_PROB = 0.8
TRAINING_BIAS = 0.01
POTENTIAL_DECAY = 0.75
CURRENT_DECAY = 0.5
INPUT_VALUES=[0,1]

In [5]:
# This class takes lists of input currents, membrane potentials, and spikes and plots them.

class Spike_Plotter:
    def __init__(self):
        pass
        
    def Plot_Spiking_Behavior(self,input_current,membrane_potential,spikes,title):
        fig,axes = plt.subplots(3,1,sharex=True)
        t = np.arange(len(input_current))
        self.Plot_Current(axes[0],t,input_current)
        self.Plot_Membrane_Potential(axes[1],t,membrane_potential)
        self.Plot_Spikes(axes[2],t,spikes)
        axes[0].set_title(title)
        plt.show()
    
    def Plot_Current(self,ax,t,input_current):
        ax.plot(t,input_current)
        ax.set_ylabel("I (mV)")
    
    def Plot_Membrane_Potential(self,ax,t,membrane_potential):
        ax.plot(t,membrane_potential)
        ax.set_ylabel("V (mV)")
    
    def Plot_Spikes(self,ax,t,spikes):
        ax.plot(t,spikes)
        ax.set_ylabel("Spikes")
        ax.set_xlabel("Time (ms)")




In [6]:
# The Neuron Class manages all the parameters and models that are used for testing.  
# The class requires the models are first initialized by giving the model parameters before running
# Then the proper model's membrane potential function can be called for a given input current
# The returned voltage and spiking behavior across the duration of the current will be returned

#POTENTIAL_DECAY = 0.8
#CURRENT_DECAY = 0.8

#class Neuron:
#    def __init__(self):
#        self.LIF_initialilzed = False

#    def Initialize_LIF(self,neuron_resting_voltage,neuron_firing_threshold,C_m, R_m):
#        self.C_m = C_m
#        self.R_m = R_m
#        self.resting_voltage=neuron_resting_voltage
#        self.voltage_threshold=neuron_firing_threshold
#        self.LIF_initialized = True
        
#    def LIF_Neuron_Voltage_Current_Change(self,t,I, V_m, spike_trains, trained_weights,spike_occurrence_array):
#        if self.LIF_initialized:
#            I[t] = CURRENT_DECAY*I[t-1] + np.sum(spike_trains[:,t]*trained_weights)
#            if V_m[t-1] < self.voltage_threshold:
#                V_m[t] = POTENTIAL_DECAY*V_m[t-1] + I[t]
#                spike_occurrence_array[t-1] = 0
#            else:
#                V_m[t] = self.resting_voltage
#                spike_occurrence_array[t-1] = 1
#
#            return I, V_m
#        else:
#            raise Exception('LIF was not initialized')
#        
#    def LIF_Membrane_Potential(self,time, spike_trains, trained_weights):
#
#        spike_occurrence_array = np.zeros(time)
#        current_voltage = self.resting_voltage
#        V_m = np.zeros(time)
#        input_current = np.zeros(time)
#        for t in range(1,time):
#            input_current,V_m = self.LIF_Neuron_Voltage_Current_Change(t,input_current, \
#                                       V_m,spike_trains, trained_weights,spike_occurrence_array)
#
#        return input_current, V_m,spike_occurrence_array
        
    

In [None]:
class Simplified_Neuron:
    def __init__(self,layer_number,parameters):
        self.parameters = parameters
        self.layer_number=layer_number
        
        self.spike_fired=0
        self.current_potential = resting_potential
        self.time_since_spike=0
        self.presynaptic_neurons = None
        self.weights = None
        
    def Update_Membrane_Potential(self):
        self.spike_fired=0
        self.time_since_spike+=1
        new_potential = self.parameters.resting_potential
        if self.parameters.minimum_potential<self.current_potential and self.current_potential < self.parameters.threshold_potential:
            incoming_spikes = self.Check_Presynaptic_Spikes()
            new_potential = self.current_potential+np.sum(incoming_spikes*self.weights)-self.parameters.delay
        elif self.parameters.threshold_potential <= self.current_potential:
            self.spike_fired=1
            self.time_since_spike=0
            new_potential = self.parameters.resting_potential
        elif self.current_potential <= self.parameters.minimum_potential:
            new_potential = self.resting_potential
        self.current_potential = new_potential
    
    def Update_Weights(self):
        for i in range(len(self.presynaptic_neurons)):
            new_weight=self.New_Weight(self.presynaptic_neurons[i],self.weights[i])
            self.weights[i]=new_weight
    
    
    def Check_Presynaptic_Spikes(self):
        spikes = []
        for neuron in self.presynaptic_neurons:
            spikes.append(neuron.Check_If_Spike_Fired())
        return np.array(spikes)
            
    def Check_If_Spike_Fired(self):
        return self.spike_fired
    
    def Get_Time_Since_Spike(self):
        return self.time_since_spike
    
    def Set_Presynaptic_Neurons(self,presynaptic_neurons)
        self.presynaptic_nerons = presynaptic_neurons
        weights = []
        for i in range(len(presynaptic_neurons)):
            weights.append(0.5)
        self.weights = np.array(weights)
    
    def Recursive_Weight_Updates(self):
        if self.presynaptic_neurons not None:
            for i in range(len(self.presynaptic_neurons)):
                neuron = self.presynaptic_neurons[i]
                neuron.Recursive_Weight_Updates()
                self.weights[i]=self.New_Weight(neuron,self.weights[i])
    
    def Recursive_Spike_Propagation(self):
        if self.presynaptic_neurons not None:
            for i in range(len(self.presynaptic_neurons)):
                neuron = self.presynaptic_neurons[i]
                neuron.Recursive_Spike_Propagation()
                self.Update_Membrane_Potential()
        
    
    def Get_Delta_T(self,presynaptic_neuron):
        presynaptic_spike=presynaptic_neuron.Check_If_Spike_Fired()
        presynaptic_t = presynaptic_neuron.Get_Time_Since_Spike()
        if presynaptic_spike==1 or self.spike_fired==1:
            delta_t=presynaptic_t - self.time_since_spike
        else:
            delta_t=0
        return delta_t
        
    def Get_Delta_Weight(self,presynaptic_neuron):
        delta_t=self.Get_Delta_T(presynaptic_neuron)
        delta_weight=0
        if delta_t<=-2:
            delta_weight=self.A_minus*np.exp(delta_t/self.tau_minus)
        elif delta_t>=2:
            delta_weight=self.A_plus*np.exp(self.delta_t/tau_plus)
        return delta_weight
    
    def New_Weight(self,presynaptic_neuron,old_weight):
        delta_weight=Get_Delta_Weight(presynaptic_neuron)
        new_weight=old_weight
        if delta_weight>0:
            new_weight = old_weight + self.parameters.learning_rate*(self.parameters.max_w-old_weight)
        elif delta<=0:
            new_weight = old_weight + self.parameters.learning_rate*(old_weight-self.parameters.min_w)
        return new_weight


In [None]:
class Network_Parameters():
     def __init__(learning_rate,minimum_potential,threshold_potential,resting_potential,delay,A_minus,A_plus,tau_minus,tau_plus,w_max,w_min):
        self.threshold_potential = threshold_potential
        self.refractory_potential = refractory_potential
        self.resting_potential = resting_potential
        self.minimum_potential = minimum_potential
        self.delay = delay
        self.learning_rate = learning_rate
        self.A_minus=A_minus
        self.A_plus=A_plus
        self.tau_minus=tau_minus
        self.tau_plus=tau_plus
        self.w_max = w_max
        self.w_min = w_min

class Spiking_Neural_Network:
    def __init__(self,num_of_input_neurons,parameters):
        self.parameters=parameters
        self.input_neurons = []
        self.output_neurons = []
        for i in range(num_of_input_neurons):
            neuron=Simplified_Neuron(1,self.parameters)
            self.input_neurons.append(neuron)
            self.output_neurons.append(neuron)
        self.num_of_layers = 1
        
    def Add_New_Layer(self,num_of_neurons):
        self.num_of_layers+=1
        new_output_neurons = []
        for i in range(len(num_of_neurons)):
            neuron=Simplified_Neuron(self.num_of_layers,self.parameters)
            neuron.Set_Presynaptic_Neurons(self.output_neurons)
            new_output_neurons.append(neuron)
        self.output_neurons=new_output_neurons
        
    def Update_Network_Weights(self):
        for neuron in self.output_neurons:
            neuron.Recursive_Weight_Updates()
            
    def Update_Membrane_Potential(self):
        for neuron in self.output_neurons:
            neuron.Recursive_Spike_Propogation()
            
    
    

In [7]:
class Data:
    def __init__(self, size, num_neurons, gate):
        self.spike_train_size = size
        self.gate = gate
        if len(INPUT_VALUES) == 0 or len(INPUT_VALUES) == 1 or num_neurons == 0 or num_neurons == 1:
            raise Exception("Input values/number of neurons cannot be 0 or 1")
        elif len(INPUT_VALUES) == num_neurons:
            self.num_neurons = num_neurons
        else:
            raise Exception("Input values length and number of neurons should be same")

        
    """
    Based on inputs, get spike probabilities
    """
    def get_spike_prob(self,neuron_input):
        if neuron_input == 1:
            return 1-SPIKE_PROB, SPIKE_PROB
        else:
            return SPIKE_PROB,1-SPIKE_PROB
    
    """
    Usage of AND/OR/XOR operators on multiple spike trains
    """
    def and_gate(self,spike_trains):
        result = np.logical_and(spike_trains[0],spike_trains[1])
        for i in range(2, len(spike_trains)):
            result = np.logical_and(result, spike_trains[i])
        return result    
    def or_gate(self,spike_trains):
        result = np.logical_and(spike_trains[0],spike_trains[1])
        for i in range(2, len(spike_trains)):
            result = np.logical_or(result, spike_trains[i])
        return result
    def xor_gate(self,spike_trains):
        result = np.logical_and(spike_trains[0],spike_trains[1])
        for i in range(2, len(spike_trains)):
            result = np.logical_xor(result, spike_trains[i])
        return result
    
    """
    Generate spike trains
    """
    def encode(self,neurons):
        #Need to change this to someother distribution
        spike_trains = []
        for neuron_value in neurons:
            prob_dist = self.get_spike_prob(neuron_value)
            spike_trains.append(np.random.choice(INPUT_VALUES, self.spike_train_size,p=prob_dist))
        self.spike_trains = np.asarray(spike_trains)
        return self.spike_trains
    
    """
    Generate teacher neuron based on input gate and spike trains
    """
    def teacher_neuron(self):
        spike_trains = self.spike_trains
        gate = self.gate
        if gate == "AND":
            self.teacher = self.and_gate(spike_trains)
        if gate == "OR":
            self.teacher = self.or_gate(spike_trains)
        if gate == "XOR":
            self.teacher = self.xor_gate(spike_trains)
        return self.teacher.astype(int)
    
    """
    Generate input, encoded data of data size - data_size
    """
    def generate_data(self,data_size):
        
        generated_data = []
        neurons_input = [] 
        for _ in range(self.num_neurons):
            neurons_input.append(np.random.choice([0,1],data_size))
        neurons_input = np.asarray(neurons_input).T

        for each_neuron_set in neurons_input:
            generated_data.append((each_neuron_set, self.encode(each_neuron_set), self.teacher_neuron()))
            
        return generated_data


class Train:
    def __init__(self,spike_train,teacher):
        self.bias = TRAINING_BIAS
        self.teacher = teacher
        self.spike_trains = spike_train
        self.len_spike_train = len(spike_train)
        self.weigths = np.random.normal(0,0.1,self.len_spike_train)
    
    def get_a_corr(self,spike_train, teacher):
        #This needs to be changed
        return np.logical_and(spike_train,teacher).astype(int)
        
    
    def fit(self):
        for i,spike_train in enumerate(self.spike_trains):
            for j,(n,t) in enumerate(zip(spike_train,teacher)):
                a_corr = self.get_a_corr(spike_train, teacher)

                dw = a_corr[j]*n*t - TRAINING_BIAS
                self.weigths[i] = abs(self.weigths[i] + dw -1)
        return self.weigths
        
        

In [8]:
def get_accuracy(teacher,output):
    return sum(output)/sum(teacher)

In [9]:
#LIP Parameters
neuron_resting_voltage=0
neuron_firing_threshold=0.5
C_m=2
R_m=1.5



neuron=Neuron()
neuron.Initialize_LIF(neuron_resting_voltage,neuron_firing_threshold,C_m,R_m)
plotter = Spike_Plotter()


time = 20
data = Data(time,2,"AND")
generated_data = data.generate_data(5)


for (neuron_input, spike_train, teacher) in generated_data:
    train_model = Train(spike_train,teacher)
    trained_weights = train_model.fit()
    input_current, membrane_potential,spike_occurrence = neuron.LIF_Membrane_Potential(time,spike_train,trained_weights)
#     plotter.Plot_Spiking_Behavior(input_current,membrane_potential,spike_occurrence,"LIF Neuron")
    accuracy = get_accuracy(teacher,spike_occurrence)
    print("Accuracy for neurons set: ", neuron_input, "is ", accuracy)


Accuracy for neurons set:  [1 0] is  3.0
Accuracy for neurons set:  [0 1] is  3.0
Accuracy for neurons set:  [0 1] is  3.0
Accuracy for neurons set:  [1 1] is  0.0
Accuracy for neurons set:  [0 1] is  3.0


In [10]:
sum(np.logical_and(spike_train[0],spike_train[1]))

3

In [11]:
sum(spike_occurrence)

9.0

In [12]:
trained_weights

array([0.97551509, 0.95895063])

In [13]:
# #Testingbed
# data = Data(10,2,"AND")
# spike_train, teacher = data.generate_data()
# # teacher = data.teacher_neuron()

# train = Train(spike_train,teacher)
# weights = train.fit()
# print(spike_train[0])
# print(spike_train[1])
# print(teacher)
# print(weights)
