In [151]:
import numpy as np
import random
from numpy.random import randn

In [152]:
positives = 50
negatives = 50
signal_length = 10
pulse_length = 2

input_signals = np.zeros((positives + negatives,signal_length,1))
targets = np.zeros(positives + negatives,dtype=int)
for i in range(positives):
    targets[i] = 1
    pulse_start = random.randint(0,signal_length - pulse_length)
    for j in range(pulse_length):
        input_signals[i][pulse_start+j][0] = 1


In [153]:
class LeakyRNN:
    
    def __init__(self, input_size, output_size, hidden_size=64):
        # Weights
        self.Whh = randn(hidden_size, hidden_size) / 1000
        self.Wxh = randn(hidden_size, input_size) / 1000
        self.Why = randn(output_size, hidden_size) / 1000
    
        # Biases
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))

        # Neuronal time constant
        self.ntc = 0.1
    
    def forward(self, inputs, neuromod, delta_ts):
        '''
        Perform a forward pass of the RNN using the given inputs.
        Returns the final output and hidden state.
        - inputs is an array of one-hot vectors with shape (input_size, 1).
        '''
        inputs = np.reshape(inputs,(inputs.shape[0],inputs.shape[1],1))
        h = np.zeros((self.Whh.shape[0], 1))
        self.last_inputs = inputs
        self.last_hs = { 0: h }
        # Perform each step of the RNN
        for i, x in enumerate(inputs):
            # Added leak and neuromodulator parameterised activation function
            h = h + self.ntc * (-h + neuro_activation(self.Wxh @ x + self.Whh @ h + self.bh,neuromod,np.tanh))
            #h = np.tanh(self.Wxh @ x + self.Whh @ h + self.bh)
            self.last_hs[i + 1] = h
        
        # Compute the output
        y = self.Why @ h + self.by
        return y, h

    def backprop(self, d_y, learn_rate=2e-2):
        '''
        Perform a backward pass of the RNN.
        - d_y (dL/dy) has shape (output_size, 1).
        - learn_rate is a float.
        '''
        n = len(self.last_inputs)
    
        # Calculate dL/dWhy and dL/dby.
        d_Why = d_y @ self.last_hs[n].T
        d_by = d_y
    
        # Initialize dL/dWhh, dL/dWxh, and dL/dbh to zero.
        d_Whh = np.zeros(self.Whh.shape)
        d_Wxh = np.zeros(self.Wxh.shape)
        d_bh = np.zeros(self.bh.shape)
    
        # Calculate dL/dh for the last h.
        d_h = self.Why.T @ d_y
    
        # Backpropagate through time.
        for t in reversed(range(n)):
            # An intermediate value: dL/dh * (1 - h^2)
            temp = ((1 - self.last_hs[t + 1] ** 2) * d_h)
            
            # dL/db = dL/dh * (1 - h^2)
            d_bh += temp
            
            # dL/dWhh = dL/dh * (1 - h^2) * h_{t-1}
            d_Whh += temp @ self.last_hs[t].T
            
            # dL/dWxh = dL/dh * (1 - h^2) * x
            d_Wxh += temp @ self.last_inputs[t].T
            
            # Next dL/dh = dL/dh * (1 - h^2) * Whh
            d_h = self.Whh @ temp
    
        # Clip to prevent exploding gradients.
        for d in [d_Wxh, d_Whh, d_Why, d_bh, d_by]:
            np.clip(d, -1, 1, out=d)
    
        # Update weights and biases using gradient descent.
        self.Whh -= learn_rate * d_Whh
        self.Wxh -= learn_rate * d_Wxh
        self.Why -= learn_rate * d_Why
        self.bh -= learn_rate * d_bh
        self.by -= learn_rate * d_by

def neuro_activation(x, n, func):
    return func(x * n)

In [154]:
def processData(input_data, targets, backprop=True):
    '''
    Returns the RNN's loss and accuracy for the given data.
    - data is a dictionary mapping text to True or False.
    - backprop determines if the backward phase should be run.
    '''
    
    loss = 0
    num_correct = 0
    
    for i in range(len(input_data)):
        inputs = input_data[i]
        target = targets[i]
    
        # Forward
        out, _ = rnn.forward(inputs,np.ones((50,1),dtype=float),np.ones(len(input_data[i]),dtype=float))
        probs = softmax(out)
        
        # Calculate loss / accuracy
        loss -= np.log(probs[target])
        num_correct += int(np.argmax(probs) == target)
        
        if backprop:
            # Build dL/dy
            d_L_d_y = probs
            d_L_d_y[target] -= 1
            
            # Backward
            rnn.backprop(d_L_d_y)
    
    return loss / len(input_data), num_correct / len(input_data)

In [155]:
rnn = LeakyRNN(1,2,hidden_size=50)

def softmax(xs):
  # Applies the Softmax Function to the input array.
    return np.exp(xs) / sum(np.exp(xs))

for epoch in range(1000):
  train_loss, train_acc = processData(input_signals,targets,backprop=True)

  if epoch % 100 == 99:
      print('--- Epoch %d' % (epoch + 1))
      print('Train:\tLoss %.3f | Accuracy: %.3f' % (train_loss, train_acc))

    # test_loss, test_acc = processData(test_data, backprop=False)
    # print('Test:\tLoss %.3f | Accuracy: %.3f' % (test_loss, test_acc))

--- Epoch 100
Train:	Loss 0.688 | Accuracy: 0.560
--- Epoch 200
Train:	Loss 0.624 | Accuracy: 0.660
--- Epoch 300
Train:	Loss 0.647 | Accuracy: 0.640
--- Epoch 400
Train:	Loss 0.646 | Accuracy: 0.640
--- Epoch 500
Train:	Loss 0.647 | Accuracy: 0.640
--- Epoch 600
Train:	Loss 0.646 | Accuracy: 0.640
--- Epoch 700
Train:	Loss 0.638 | Accuracy: 0.640
--- Epoch 800
Train:	Loss 0.492 | Accuracy: 0.780
--- Epoch 900
Train:	Loss 0.092 | Accuracy: 0.980
--- Epoch 1000
Train:	Loss 0.023 | Accuracy: 1.000
