# Using Local Learning Rules to perform simple tasks in a Feedforward Pytorch Network
Code by Rishika Mohanta

In [2]:
# Import necessary packages
import torch
import torch.nn as nn
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

Here we implement a simple toy feedforward network with simple learning rules.

In [3]:
# Truth table of the AND problem, with logic levels encoded as -1 (false) / +1 (true)
input = torch.tensor(
    [
        [-1, -1],
        [-1, 1],
        [1, -1],
        [1, 1],
    ],
    dtype=torch.float32,
)
# Only the (+1, +1) pattern maps to +1; stored as a column vector, one row per pattern
output = torch.tensor([-1, -1, -1, 1], dtype=torch.float32).unsqueeze(1)


In [4]:
from networks import FeedForwardNetwork, FeedForwardLearningRule
import torch
import torch.nn as nn
import torch.nn.functional as F

In [104]:
import torch
import torch.nn as nn

class FeedForwardNetwork(nn.Module):
    """
    Simple Feed-Forward Neural Network with arbitrary number of hidden layers

    Every layer is a bias-free ``nn.Linear`` followed by a sigmoid. Besides the
    plain forward pass, the class offers two ways of changing the weights
    without backpropagation: a Hebbian rule (``hebbian_forward``) and an
    arbitrary per-synapse rule (``arbitrary_forward``).
    """
    def __init__(self, n_input : int, hidden_layer_sizes : list, n_output: int):
        """
        Initialize the network
        :param n_input: number of input neurons
        :param hidden_layer_sizes: list of number of hidden neurons in each layer
            (may be empty, which gives a single input -> output layer)
        :param n_output: number of output neurons
        """
        super().__init__()
        # Consecutive pairs of sizes define the layers: input -> hidden... -> output
        sizes = [n_input, *hidden_layer_sizes, n_output]
        self.layers = nn.ModuleList(
            [nn.Linear(n_in, n_out, bias = False) for n_in, n_out in zip(sizes, sizes[1:])]
        )

    def forward(self, x):
        """
        Forward pass through the network (does not modify the weights)
        :param x: network input
        :return: sigmoid activation of the last layer
        """
        for layer in self.layers:
            x = torch.sigmoid(layer(x)) # apply sigmoid activation function
        return x

    def reset_parameters(self):
        """
        Reset the parameters of the network
        """
        for layer in self.layers: # reset all layers
            layer.reset_parameters()

    def hebbian_forward(self,input,lr,max_weight=1):
        """
        Perform forward Hebbian update of the network
        :param input: input to the network, a 2D tensor with one sample per row
        :param lr: learning rate
        :param max_weight: if not None, each update is scaled by
            ``max_weight - |w|`` so that it shrinks as the weight magnitude
            approaches ``max_weight``; if None the raw Hebbian update is applied
        """
        x = input
        # The rule is gradient-free, so there is no need to record an autograd graph
        with torch.no_grad():
            for layer in self.layers:
                y = torch.sigmoid(layer(x))
                # Post/pre-synaptic activity product; torch.mm sums over the batch rows
                hebbian_term = lr * torch.mm(y.t(), x)
                if max_weight is not None:
                    layer.weight.data += hebbian_term * (max_weight - torch.abs(layer.weight.data))
                else:
                    layer.weight.data += hebbian_term
                x = y # activity computed before the update feeds the next layer

    def get_weights(self):
        """
        Get the weights of the network
        :return: list with one numpy array per layer; the arrays are copies, so
            they stay unchanged when the network is updated afterwards
        """
        # Tensor.numpy() shares memory with the tensor, hence the explicit copy
        return [layer.weight.detach().cpu().numpy().copy() for layer in self.layers]

    def set_weights(self, weights):
        """
        Set the weights of the network
        :param weights: list of weights (one numpy array per layer)
        """
        for layer, weight in zip(self.layers, weights):
            # clone() so that later in-place updates do not write into the caller's arrays
            layer.weight.data = torch.from_numpy(weight).clone()

    def arbitrary_forward(self, input, synapse_updater):
        """
        Update the weights of the network using an arbitrary update function
        :param input: input to the network, a 2D tensor with one sample per row
        :param synapse_updater: callable applied to every synapse; it receives the
            1D concatenation of the pre-synaptic activity column, the
            post-synaptic activity column and the current weight, and its
            return value is stored as the new weight
        """
        x = input
        for layer in self.layers:
            y = torch.sigmoid(layer(x)) # activity computed with the weights before the update
            for i in range(y.shape[1]):
                for j in range(x.shape[1]):
                    synapse_state = torch.cat([x[:,j],y[:,i],layer.weight.data[i,j].view(1)])
                    layer.weight.data[i,j] = synapse_updater(synapse_state)
            x = y

    # Aliases for the names under which other cells of this notebook call the update rules
    hebbian_update = hebbian_forward
    arbitrary_update = arbitrary_forward

class FeedForwardLearningRule(nn.Module):
    """
    Arbitrary local, memoryless learning rule expressed as a small ReLU network.
    """
    def __init__(self, layers):
        """
        Store the layers that make up the rule
        :param layers: iterable of layers applied one after another in forward()
        """
        super().__init__()
        self.layers = layers
        # NOTE(review): fixed values, not derived from `layers` - confirm they match the layers passed in
        self.input_size, self.output_size = 2, 1

    def forward(self, x):
        """
        Run the rule on its input
        :param x: input of the rule
        :return: output of the last layer after ReLU
        """
        activation = x
        for stage in self.layers:
            # every layer, including the last one, is followed by a ReLU
            activation = stage(activation).relu()
        return activation

In [105]:
# Build a network and record three (input, output) pairs while it is changed by Hebbian updates
temp = FeedForwardNetwork(2, [5,10], 1)
# get_weights may hand back arrays that share memory with the live weights; copy to keep a real snapshot
init_weights = [weight.copy() for weight in temp.get_weights()]
with torch.no_grad(): # no gradients are needed while generating the samples
    temp.reset_parameters()
    input_tensor = []
    output_tensor = []
    for steps in range(3):
        random_input = torch.randn(1,2)
        # named step_output so that the global `output` (task targets) is not overwritten
        step_output = temp(random_input)
        temp.hebbian_forward(random_input, 0.1) # the update method is called hebbian_forward
        input_tensor.append(random_input)
        output_tensor.append(step_output)
    # stack the recorded samples into (3, n_input) and (3, n_output) tensors
    input_tensor = torch.cat(input_tensor)
    output_tensor = torch.cat(output_tensor)

In [106]:
# Fresh network: 2 inputs, hidden layers of 5 and 10 units, 1 output
temp = FeedForwardNetwork(n_input=2, hidden_layer_sizes=[5, 10], n_output=1)
# Optional, currently disabled: temp.set_weights(init_weights)

In [107]:
# Prediction for the first recorded sample, reshaped into a batch of one
first_sample = input_tensor[0].view(1, -1)
temp(first_sample)

tensor([[0.4087]], grad_fn=<SigmoidBackward0>)

In [103]:
# Layers of the learning rule: 3 inputs (pre-synaptic activity, post-synaptic activity, weight) -> 1 output
hidden_layer_sizes = [5,5]
shared_layers = nn.ModuleList([nn.Linear(3, hidden_layer_sizes[0], bias = False)])
shared_layers.extend([nn.Linear(h1, h2, bias = False) for h1, h2 in zip(hidden_layer_sizes, hidden_layer_sizes[1:])])
shared_layers.append(nn.Linear(hidden_layer_sizes[-1], 1, bias = False))
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(temp.parameters(), lr=0.1)

# Fit `temp` to the recorded samples with ordinary backpropagation
for i in range(100):
    optimizer.zero_grad()
    prediction = temp(input_tensor) # not named `output`, which holds the task targets
    loss = loss_function(prediction, output_tensor)
    loss.backward()
    optimizer.step()

# Apply the learning rule once on the first sample (the method is called arbitrary_forward)
temp.arbitrary_forward(input_tensor[0].view(1,-1),FeedForwardLearningRule(shared_layers))
# Evaluate on the second sample; clear the gradients left over from the training loop first
optimizer.zero_grad()
loss = loss_function(temp(input_tensor[1].view(1,-1)), output_tensor[1].view(1,-1))
# NOTE(review): arbitrary_forward writes into weight.data, so this backward pass
# presumably does not reach the parameters of shared_layers - confirm this is intended
loss.backward()

In [None]:
# Implement a Neural Network with Hebbian learning rules
# (restored from commented-out code: the experiment cells below instantiate ToyNetwork)
class ToyNetwork(nn.Module):
    """
    Simple Feed-Forward Neural Network with arbitrary number of hidden layers and no biases
    """
    def __init__(self, n_input : int, hidden_layer_sizes : list, n_output: int):
        """
        Initialize the network
        :param n_input: number of input neurons
        :param hidden_layer_sizes: list of number of hidden neurons in each layer
        :param n_output: number of output neurons
        """
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(n_input, hidden_layer_sizes[0],bias=False)]) # first layer
        self.layers.extend([nn.Linear(h1, h2, bias=False) for h1, h2 in zip(hidden_layer_sizes, hidden_layer_sizes[1:])]) # hidden layers
        self.layers.append(nn.Linear(hidden_layer_sizes[-1], n_output, bias=False)) # output layer

    def forward(self, x):
        """
        Forward pass through the network
        :param x: network input
        :return: tanh activation of the last layer, i.e. values in [-1,1]
        """
        for layer in self.layers[:-1]: # skip the last layer
            x = torch.relu(layer(x)) # apply ReLU activation function
        x = torch.tanh(self.layers[-1](x)) # apply tanh activation function to have output in [-1,1]
        return x

    def reset_parameters(self):
        """
        Reset the parameters of the network
        """
        for layer in self.layers: # reset all layers
            layer.reset_parameters()

    def supervised_hebbian_update(self, input, output, learning_rate):
        """
        Perform a Hebbian update on the network in the backward direction given the output and input
        :param input: input to the network
        :param output: target output
        :param learning_rate: learning rate
        """
        y = output # target output
        # NOTE(review): the range stops before index 0, so the first layer is never updated - confirm this is intended
        for layer_no in range(len(self.layers)-1, 0, -1): # go through all layers in reverse order
            # forward pass through the network up to before the current layer
            x = input
            for layer in self.layers[:layer_no]: # go through all layers up to the current layer
                x = torch.relu(layer(x)) # apply ReLU activation function

            deltaw = learning_rate * torch.mm(y.t(), x) # calculate Hebbian update
            self.layers[layer_no].weight.data += deltaw # update weights
            self.layers[layer_no].weight.data = torch.clamp(self.layers[layer_no].weight.data, min=-1, max=1) # clip weights to [-1,1]
            y = x # the input of this layer becomes the target of the layer below

    def unsupervised_hebbian_update(self, input, learning_rate):
        """
        Perform a Hebbian update on the network in the forward direction given only the input
        :param input: input to the network
        :param learning_rate: learning rate
        """
        x = input # start with input to the network
        for layer in self.layers: # go through all layers
            y = layer(x) # forward step through the network (no activation function is applied here)
            deltaw = learning_rate * x.t().mm(y).t() # calculate Hebbian update
            layer.weight.data += deltaw # update weights
            layer.weight.data  = layer.weight.data.clamp(-1, 1) # clip weights to [-1,1]
            x = layer(x) # output recomputed with the updated weights becomes input for the next layer

    def semisupervised_hebbian_update(self, input, output, learning_rate):
        """
        Perform a Hebbian update on the network in the forward direction given the input but some information about error is available as a global context
        :param input: input to the network
        :param output: target output
        :param learning_rate: learning rate
        """
        binarized_prediction = (self.forward(input)>0).float()
        binarized_target = (output>0).float()
        loss = F.mse_loss(binarized_prediction, binarized_target) # calculate MSE loss

        global_context = loss.item()
        # Interpretation: 0 if correct, 1 if incorrect
        # Scale to be consistent with the sign convention of the Hebbian update
        global_context = 1-2*global_context
        # Interpretation: -1 if incorrect, 1 if correct
        # Condition the sign of the global reward on the state of the network
        global_context = -(2*binarized_prediction-1)*global_context
        # Interpretation (as described by the original author):
        # -1 if incorrect, 1 if correct (neuron is active);
        # -1 if correct, 1 if incorrect (both inactive)

        x = input
        for layer in self.layers:
            y = layer(x)
            deltaw = learning_rate * x.t().mm(y).t() * global_context
            layer.weight.data += deltaw
            layer.weight.data  = layer.weight.data.clamp(-1, 1)
            x = y

In [5]:
# Supervised Hebbian learning on the AND problem with one hidden layer of 10 units
n_replications = 100 # number of replications to estimate the average performance
print_every = None # set to an integer to print the predictions every `print_every` iterations
hidden_layer_sizes = [10] # number of hidden neurons in each layer


success_rate = [] 
for _ in tqdm(range(n_replications)): # run the weight training
    model = ToyNetwork(2, hidden_layer_sizes, 1) # initialize the network
    for iters in range(400): # train the network for 400 iterations
        random_index = np.random.randint(0, 4) # randomly select an example from the dataset to train on
        model.supervised_hebbian_update(input[random_index].unsqueeze(0), output[random_index].unsqueeze(0), 0.01) # perform a Hebbian update
        if print_every is not None and iters % print_every == 0: 
            out = (model.forward(input)>0).float().detach().numpy().T[0] # get the output of the network
            print(f'Iteration {iters}: AND = {out}') # the dataset in scope here is the AND truth table
    
    out = (model.forward(input)>0).float().detach().numpy().T[0] # get the output of the network
    expected_out = (output>0).float().detach().numpy().T[0] # get the expected output of the network
    # a replication counts as a success only if all four patterns are classified correctly
    if np.all(out == expected_out):
        success_rate.append(1)
    else:
        success_rate.append(0)

success_rate = np.mean(success_rate) # fraction of successful replications
print(f'Success rate: {success_rate}')


  0%|          | 0/100 [00:00<?, ?it/s]

Success rate: 0.73


In [6]:
# Supervised Hebbian learning on the AND problem with one hidden layer of 100 units
n_replications = 100 # number of replications to estimate the average performance
print_every = None # set to an integer to print the predictions every `print_every` iterations
hidden_layer_sizes = [100] # number of hidden neurons in each layer


success_rate = [] 
for _ in tqdm(range(n_replications)): # run the weight training
    model = ToyNetwork(2, hidden_layer_sizes, 1) # initialize the network
    for iters in range(400): # train the network for 400 iterations
        random_index = np.random.randint(0, 4) # randomly select an example from the dataset to train on
        model.supervised_hebbian_update(input[random_index].unsqueeze(0), output[random_index].unsqueeze(0), 0.01) # perform a Hebbian update
        if print_every is not None and iters % print_every == 0: 
            out = (model.forward(input)>0).float().detach().numpy().T[0] # get the output of the network
            print(f'Iteration {iters}: AND = {out}') # the dataset in scope here is the AND truth table
    
    out = (model.forward(input)>0).float().detach().numpy().T[0] # get the output of the network
    expected_out = (output>0).float().detach().numpy().T[0] # get the expected output of the network
    # a replication counts as a success only if all four patterns are classified correctly
    if np.all(out == expected_out):
        success_rate.append(1)
    else:
        success_rate.append(0)

success_rate = np.mean(success_rate) # fraction of successful replications
print(f'Success rate: {success_rate}')


  0%|          | 0/100 [00:00<?, ?it/s]

Success rate: 0.98


In [8]:
# Supervised Hebbian learning on the AND problem with two hidden layers of 10 units each
n_replications = 100 # number of replications to estimate the average performance
print_every = None # set to an integer to print the predictions every `print_every` iterations
hidden_layer_sizes = [10,10] # number of hidden neurons in each layer


success_rate = [] 
for _ in tqdm(range(n_replications)): # run the weight training
    model = ToyNetwork(2, hidden_layer_sizes, 1) # initialize the network
    for iters in range(1000): # train the network for 1000 iterations
        random_index = np.random.randint(0, 4) # randomly select an example from the dataset to train on
        model.supervised_hebbian_update(input[random_index].unsqueeze(0), output[random_index].unsqueeze(0), 0.01) # perform a Hebbian update
        if print_every is not None and iters % print_every == 0: 
            out = (model.forward(input)>0).float().detach().numpy().T[0] # get the output of the network
            print(f'Iteration {iters}: AND = {out}') # the dataset in scope here is the AND truth table
    
    out = (model.forward(input)>0).float().detach().numpy().T[0] # get the output of the network
    expected_out = (output>0).float().detach().numpy().T[0] # get the expected output of the network
    # a replication counts as a success only if all four patterns are classified correctly
    if np.all(out == expected_out):
        success_rate.append(1)
    else:
        success_rate.append(0)

success_rate = np.mean(success_rate) # fraction of successful replications
print(f'Success rate: {success_rate}')


  0%|          | 0/100 [00:00<?, ?it/s]

Success rate: 0.0


In [9]:
# Truth table of the XOR problem, with logic levels encoded as -1 (false) / +1 (true)
input = torch.tensor(
    [
        [-1, -1],
        [-1, 1],
        [1, -1],
        [1, 1],
    ],
    dtype=torch.float32,
)
# Patterns whose two entries differ map to +1; stored as a column vector, one row per pattern
output = torch.tensor([-1, 1, 1, -1], dtype=torch.float32).unsqueeze(1)


In [288]:
# Experiment settings: XOR with one hidden layer of 10 units
n_replications = 100  # independent runs that are averaged
print_every = None  # logging interval in iterations (None disables logging)
hidden_layer_sizes = [10]  # widths of the hidden layers

success_rate = []
for _ in tqdm(range(n_replications)):
    # every replication starts from a freshly initialized network
    model = ToyNetwork(2, hidden_layer_sizes, 1)
    for iters in range(400):
        # Hebbian update on one randomly drawn training pattern
        random_index = np.random.randint(0, 4)
        model.supervised_hebbian_update(
            input[random_index].unsqueeze(0),
            output[random_index].unsqueeze(0),
            0.01,
        )
        if print_every is None or iters % print_every != 0:
            continue
        out = (model.forward(input) > 0).float().detach().numpy().T[0]
        print(f'Iteration {iters}: XOR = {out}')

    # binarize predictions and targets; the run succeeds only if all patterns match
    out = (model.forward(input) > 0).float().detach().numpy().T[0]
    expected_out = (output > 0).float().detach().numpy().T[0]
    success_rate.append(1 if np.all(out == expected_out) else 0)

# fraction of successful replications
success_rate = np.mean(success_rate)
print(f'Success rate: {success_rate}')


  0%|          | 0/100 [00:00<?, ?it/s]

Success rate: 0.29


In [289]:
# Experiment settings: XOR with two hidden layers of 10 units each
n_replications = 100  # independent runs that are averaged
print_every = None  # logging interval in iterations (None disables logging)
hidden_layer_sizes = [10, 10]  # widths of the hidden layers

success_rate = []
for _ in tqdm(range(n_replications)):
    # every replication starts from a freshly initialized network
    model = ToyNetwork(2, hidden_layer_sizes, 1)
    for iters in range(400):
        # Hebbian update on one randomly drawn training pattern
        random_index = np.random.randint(0, 4)
        model.supervised_hebbian_update(
            input[random_index].unsqueeze(0),
            output[random_index].unsqueeze(0),
            0.01,
        )
        if print_every is None or iters % print_every != 0:
            continue
        out = (model.forward(input) > 0).float().detach().numpy().T[0]
        print(f'Iteration {iters}: XOR = {out}')

    # binarize predictions and targets; the run succeeds only if all patterns match
    out = (model.forward(input) > 0).float().detach().numpy().T[0]
    expected_out = (output > 0).float().detach().numpy().T[0]
    success_rate.append(1 if np.all(out == expected_out) else 0)

# fraction of successful replications
success_rate = np.mean(success_rate)
print(f'Success rate: {success_rate}')


  0%|          | 0/100 [00:00<?, ?it/s]

Success rate: 0.0


In [290]:
# Experiment settings: XOR with one hidden layer of 100 units
n_replications = 100  # independent runs that are averaged
print_every = None  # logging interval in iterations (None disables logging)
hidden_layer_sizes = [100]  # widths of the hidden layers

success_rate = []
for _ in tqdm(range(n_replications)):
    # every replication starts from a freshly initialized network
    model = ToyNetwork(2, hidden_layer_sizes, 1)
    for iters in range(400):
        # Hebbian update on one randomly drawn training pattern
        random_index = np.random.randint(0, 4)
        model.supervised_hebbian_update(
            input[random_index].unsqueeze(0),
            output[random_index].unsqueeze(0),
            0.01,
        )
        if print_every is None or iters % print_every != 0:
            continue
        out = (model.forward(input) > 0).float().detach().numpy().T[0]
        print(f'Iteration {iters}: XOR = {out}')

    # binarize predictions and targets; the run succeeds only if all patterns match
    out = (model.forward(input) > 0).float().detach().numpy().T[0]
    expected_out = (output > 0).float().detach().numpy().T[0]
    success_rate.append(1 if np.all(out == expected_out) else 0)

# fraction of successful replications
success_rate = np.mean(success_rate)
print(f'Success rate: {success_rate}')

  0%|          | 0/100 [00:00<?, ?it/s]

Success rate: 0.99


In [291]:
# model = ToyNetwork(2, [100,100], 1)
# print_every = 10
# for iters in range(200):
#     model.hebbian_update(input, output, 0.01)
#     if iters % print_every == 0:
#         print(f'Iteration {iters}: XOR = {model.forward(input).detach().numpy()}')

In [10]:
# Simple LSTM that predicts the next change in synaptic weight locally
class weightUpdater(nn.Module):
    """
    Recurrent module followed by a fully connected read-out.

    :param lstm: recurrent module; must expose `input_size` and `hidden_size`
    :param fc: read-out layer; must expose `out_features`
    """
    def __init__(self, lstm, fc):
        super().__init__()
        self.lstm, self.fc = lstm, fc
        # sizes are taken over from the wrapped modules
        self.input_size, self.hidden_size = lstm.input_size, lstm.hidden_size
        self.output_size = fc.out_features

    def forward(self, x):
        """
        Feed `x` through the recurrent module and apply the read-out to its output
        :param x: input passed to the recurrent module
        :return: read-out of the recurrent output (the recurrent state is discarded)
        """
        recurrent_out, _state = self.lstm(x)
        return self.fc(recurrent_out)

    def init_hidden(self, batch_size):
        """
        Create an all-zero tensor of shape (1, batch_size, hidden_size)
        :param batch_size: size of the second dimension
        """
        shape = (1, batch_size, self.hidden_size)
        return torch.zeros(shape)

In [11]:
# Modules of the synapse model: a single-layer LSTM (2 inputs, 10 hidden units) and a linear read-out to 1 value
shared_lstm = nn.LSTM(input_size=2, hidden_size=10, num_layers=1)
shared_fc = nn.Linear(in_features=10, out_features=1)

# Wrap both modules into one weight updater
synapse = weightUpdater(lstm=shared_lstm, fc=shared_fc)

In [12]:
# Example input of shape (1, 3, 2) to check that the updater runs
example_sequence = torch.tensor([[[0., 1.], [1., 0.], [1., 0.]]])
synapse(example_sequence)

tensor([[[-0.2215],
         [-0.2007],
         [-0.2007]]], grad_fn=<AddBackward0>)