In [7]:
import numpy as np
import pandas as pd
import math

In [2]:
# @title Extra Functions
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def tanh(x):
    return np.tanh(x)


def sigmoid_derivative(x):
    return x * (1 - x)

def tanh_derivative(x):
    return 1 - x ** 2


def softmax(x):
    e_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return e_x / np.sum(e_x, axis=-1, keepdims=True)

In [5]:
class RecurrentNeuralNetwork:
    def __init__ (self, input, output, recurrences, expected_output, learning_rate):
        #initial input 
        self.x = np.zeros(input)
        #input size 
        self.input = input
        #expected output 
        self.y = np.zeros(output)
        #output size
        self.output = output
        #weight matrix 
        self.w = np.random.random((output, output))
        #matrix used in RMSprop in order to decay the learning rate
        self.G = np.zeros_like(self.w)
        #length of the recurrent network
        self.recurrences = recurrences
        #learning rate 
        self.learning_rate = learning_rate
        #array for storing inputs
        self.ia = np.zeros((recurrences+1,input))
        #array for storing cell states
        self.ca = np.zeros((recurrences+1,output))
        #array for storing outputs
        self.oa = np.zeros((recurrences+1,output))
        #array for storing hidden states
        self.ha = np.zeros((recurrences+1,output))
        #forget gate 
        self.af = np.zeros((recurrences+1,output))
        #input gate
        self.ai = np.zeros((recurrences+1,output))
        #cell state
        self.ac = np.zeros((recurrences+1,output))
        #output gate
        self.ao = np.zeros((recurrences+1,output))
        #array of expected output values
        self.expected_output = np.vstack((np.zeros(expected_output.shape[0]), expected_output.T))
        #declare LSTM cell 
        self.LSTM = LSTM(input, output, recurrences, learning_rate)
    
    #sigmoid activation function
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    #derivative of sigmoid 
    def dsigmoid(self, x):
        return self.sigmoid(x) * (1 - self.sigmoid(x))  
    
    #Forward Propagation
    def forwardProp(self):
        for i in range(1, self.recurrences+1):
            self.LSTM.x = np.hstack((self.ha[i-1], self.x))
            cs, hs, f, c, o = self.LSTM.forwardProp()
            #store cell state from the forward propagation
            self.ca[i] = cs #cell state
            self.ha[i] = hs #hidden state
            self.af[i] = f #forget state
            self.ai[i] = inp #inpute gate
            self.ac[i] = c #cell state
            self.ao[i] = o #output gate
            self.oa[i] = self.sigmoid(np.dot(self.w, hs)) #activate the weight*input
            self.x = self.expected_output[i-1]
        return self.oa

    # Back propagation
def backProp(self):
        totalError = 0
        #cell state
        dfcs = np.zeros(self.output)
        #hidden state,
        dfhs = np.zeros(self.output)
        #weight matrix
        tu = np.zeros((self.output,self.output))
        #forget gate
        tfu = np.zeros((self.output, self.input+self.output))
        #input gate
        tiu = np.zeros((self.output, self.input+self.output))
        #cell unit
        tcu = np.zeros((self.output, self.input+self.output))
        #output gate
        tou = np.zeros((self.output, self.input+self.output))
        for i in range(self.recurrences, -1, -1):
            error = self.oa[i] - self.expected_output[i]
            tu += np.dot(np.atleast_2d(error * self.dsigmoid(self.oa[i])), np.atleast_2d(self.ha[i]).T)
            error = np.dot(error, self.w)
            self.LSTM.x = np.hstack((self.ha[i-1], self.ia[i]))
            self.LSTM.cs = self.ca[i]
            fu, iu, cu, ou, dfcs, dfhs = self.LSTM.backProp(error, self.ca[i-1], self.af[i], self.ai[i], self.ac[i], self.ao[i], dfcs, dfhs)
            totalError += np.sum(error)
            #forget gate
            tfu += fu
            #input gate
            tiu += iu
            #cell state
            tcu += cu
            #output gate
            tou += ou   
        self.LSTM.update(tfu/self.recurrences, tiu/self.recurrences, tcu/self.recurrences, tou/self.recurrences)  
        self.update(tu/self.recurrences)
        return totalError
    
    def update(self, u):
        self.G = 0.95 * self.G + 0.1 * u**2  
        self.w -= self.learning_rate/np.sqrt(self.G + 1e-8) * u
        return
    
    def sample(self):
        for i in range(1, self.recurrences+1):
            self.LSTM.x = np.hstack((self.ha[i-1], self.x))
            cs, hs, f, inp, c, o = self.LSTM.forwardProp()
            maxI = np.argmax(self.x)
            self.x = np.zeros_like(self.x)
            self.x[maxI] = 1
            self.ia[i] = self.x 
            #store cell states
            self.ca[i] = cs
            #store hidden state
            self.ha[i] = hs
            #forget gate
            self.af[i] = f
            #input gate
            self.ai[i] = inp
            #cell state
            self.ac[i] = c
            #output gate
            self.ao[i] = o
            self.oa[i] = self.sigmoid(np.dot(self.w, hs))
            maxI = np.argmax(self.oa[i])
            newX = np.zeros_like(self.x)
            newX[maxI] = 1
            self.x = newX
        return self.oa

In [8]:
# @title LSTM
class LSTMCell:
    def __init__(self, input_size, hidden_size):
        self.hidden_size = hidden_size
        self.input_size = input_size

        # Initialize weights for the input gate
        self.W_i = np.random.randn(input_size + hidden_size, hidden_size) * 0.1
        self.b_i = np.zeros((1, hidden_size))

        # Initialize weights for the forget gate
        self.W_f = np.random.randn(input_size + hidden_size, hidden_size) * 0.1
        self.b_f = np.zeros((1, hidden_size))

        # Initialize weights for the output gate
        self.W_o = np.random.randn(input_size + hidden_size, hidden_size) * 0.1
        self.b_o = np.zeros((1, hidden_size))

        # Initialize weights for the cell state
        self.W_c = np.random.randn(input_size + hidden_size, hidden_size) * 0.1
        self.b_c = np.zeros((1, hidden_size))

        # Initialize the internal states to zero
        self.h_prev = np.zeros((1, hidden_size))
        self.C_prev = np.zeros((1, hidden_size))

        self.current_weights = [self.W_i, self.b_i, 
                                self.W_f, self.b_f,
                                self.W_o, self.b_o,
                                self.W_c, self.b_c]

    def concatenate_input_hidden(self, input_t):
        # Ensure input_t is a 2D array with shape (1, input_size)
        input_t = input_t.reshape(1, -1)  # Reshaping to make it 2D
        # Concatenate the current input with the previous hidden state
        combined = np.hstack((input_t, self.h_prev))
        return combined

    def forward_step(self, input_t):
        combined = self.concatenate_input_hidden(input_t)
        # Compute gate activations
        self.i_t = sigmoid(np.dot(combined, self.W_i) + self.b_i)
        self.f_t = sigmoid(np.dot(combined, self.W_f) + self.b_f)
        self.o_t = sigmoid(np.dot(combined, self.W_o) + self.b_o)
        self.g_t = tanh(np.dot(combined, self.W_c) + self.b_c)
        # Update cell state
        self.C_t = self.f_t * self.C_prev + self.i_t * self.g_t
        # Compute hidden state
        C_tanh = tanh(self.C_t)
        self.h_t = self.o_t * C_tanh
        # Store intermediates for backward pass
        self.cache = (combined, self.i_t, self.f_t, self.o_t, self.g_t, self.C_prev, C_tanh)
        # Update previous states
        self.h_prev = self.h_t
        self.C_prev = self.C_t
        return self.h_t, self.C_t

    def forward_sequence(self, input_sequence):
        # Assuming input_sequence is shaped (time_steps, input_size)
        # Initialize outputs
        h_sequence = np.zeros((input_sequence.shape[0], self.hidden_size))
        C_sequence = np.zeros((input_sequence.shape[0], self.hidden_size))

        for t in range(input_sequence.shape[0]):
            h_t, C_t = self.forward_step(input_sequence[t])
            h_sequence[t] = h_t
            C_sequence[t] = C_t

        return h_sequence, C_sequence

    def compute_loss_and_gradient(self, y_true, y_pred, loss_type='MSE', gradient=False):
        """ 
        Compute the loss and its gradient.

        Parameters:
        - y_true: np.array, true labels or target values
        - y_pred: np.array, predicted labels or values from the last layer of the network
        - loss_type: str, type of the loss function ('MSE' for mean squared error or 'CE' for cross-entropy)

        Returns:
        - loss: float, the computed loss value
        - grad_loss: np.array, gradient of the loss with respect to y_pred
        """
        if loss_type == 'MSE':
            # Compute MSE loss
            loss = np.mean((y_true - y_pred) ** 2)
            # Compute gradient w.r.t. y_pred for MSE
            if gradient:
                grad_loss = -2 * (y_true - y_pred) / y_true.size
        elif loss_type == 'CE':
            # Compute Cross-Entropy loss
            # Assuming y_true is one-hot encoded for classification tasks
            loss = -np.sum(y_true * np.log(y_pred + 1e-9)) / y_true.shape[0]
            # Compute gradient w.r.t. y_pred for CE
            if gradient:
                grad_loss = -(y_true / (y_pred + 1e-9)) / y_true.shape[0]
        else:
            raise ValueError("Invalid loss type specified. Choose 'MSE' or 'CE'.")
        if gradient:
            return loss, grad_loss
        else:
            return loss

    def bakward_stochastic(self, input_t, y_true, learning_rate=0.01, h_range=(1e-5, 1e-2), loss_type='MSE'):
        # Input Gate Loss
        y_pred, _ = self.forward_step(input_t)
        h_W = np.random.uniform(h_range[0], h_range[1], self.W_i.shape)
        h_b = np.random.uniform(h_range[0], h_range[1], self.b_i.shape)
        
        original_loss = self.compute_loss_and_gradient(y_true, y_pred, loss_type)

        original_W = self.W_i.copy()
        original_b = self.b_i.copy()
        self.W_i = self.W_i + h_W
        d_loss_W = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.W_i = original_W

        self.b_i = self.b_i + h_b
        d_loss_b = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.b_i = original_b

        dW_i = (d_loss_W - original_loss) / h_W
        db_i = (d_loss_b - original_loss) / h_b
        self.W_i -= learning_rate * dW_i
        self.b_i -= learning_rate * db_i

        # Forget Gate Loss
        h_W = np.random.uniform(h_range[0], h_range[1], self.W_f.shape)
        h_b = np.random.uniform(h_range[0], h_range[1], self.b_f.shape)
        
        original_W = self.W_f.copy()
        original_b = self.b_f.copy()
        self.W_f = self.W_f + h_W
        d_loss_W = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.W_f = original_W

        self.b_f = self.b_f + h_b
        d_loss_b = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.b_f = original_b

        dW_f = (d_loss_W - original_loss) / h_W
        db_f = (d_loss_b - original_loss) / h_b
        self.W_f -= learning_rate * dW_f
        self.b_f -= learning_rate * db_f

        # Output Gate Loss
        h_W = np.random.uniform(h_range[0], h_range[1], self.W_o.shape)
        h_b = np.random.uniform(h_range[0], h_range[1], self.b_o.shape)
        
        original_W = self.W_o.copy()
        original_b = self.b_o.copy()
        self.W_o = self.W_o + h_W
        d_loss_W = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.W_o = original_W

        self.b_o = self.b_o + h_b
        d_loss_b = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.b_o = original_b

        dW_o = (d_loss_W - original_loss) / h_W
        db_o = (d_loss_b - original_loss) / h_b
        self.W_o -= learning_rate * dW_o
        self.b_o -= learning_rate * db_o

        # Cell Gate Loss
        h_W = np.random.uniform(h_range[0], h_range[1], self.W_c.shape)
        h_b = np.random.uniform(h_range[0], h_range[1], self.b_c.shape)
        
        original_loss = self.compute_loss_and_gradient(y_true, y_pred, loss_type)

        original_W = self.W_c.copy()
        original_b = self.b_c.copy()
        self.W_c = self.W_c + h_W
        d_loss_W = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.W_c = original_W

        self.b_c = self.b_c + h_b
        d_loss_b = self.compute_loss_and_gradient(y_true, y_pred, loss_type)
        self.b_c = original_b

        dW_c = (d_loss_W - original_loss) / h_W
        db_c = (d_loss_b - original_loss) / h_b
        self.W_c -= learning_rate * dW_c
        self.b_c -= learning_rate * db_c

        return dW_i, db_i, dW_f, db_f, dW_c, db_c, dW_o, db_o

    def backward_step(self, dh, dC, cache):
        combined, i, f, o, g, C_prev, C_tanh = cache
        # Derivatives of the activation functions
        do = dh * C_tanh * sigmoid_derivative(o)
        dC_tanh = dh * o * tanh_derivative(C_tanh)
        dC += dC_tanh
        di = dC * g * sigmoid_derivative(i)
        dg = dC * i * tanh_derivative(g)
        df = dC * C_prev * sigmoid_derivative(f)
        dC_prev = dC * f
        
        dcombined_i = np.dot(di, self.W_i.T)
        dcombined_f = np.dot(df, self.W_f.T)
        dcombined_o = np.dot(do, self.W_o.T)
        dcombined_g = np.dot(dg, self.W_c.T)
        dcombined = dcombined_i + dcombined_f + dcombined_o + dcombined_g
        
        # Gradients with respect to parameters
        dW_i = np.dot(combined.T, di)
        dW_f = np.dot(combined.T, df)
        dW_o = np.dot(combined.T, do)
        dW_c = np.dot(combined.T, dg)
        
        db_i = np.sum(di, axis=0, keepdims=True)
        db_f = np.sum(df, axis=0, keepdims=True)
        db_o = np.sum(do, axis=0, keepdims=True)
        db_c = np.sum(dg, axis=0, keepdims=True)
        
        # Update LSTM parameters (shown separately in the next step)
        return dW_i, dW_f, dW_o, dW_c, db_i, db_f, db_o, db_c, dcombined[:, :self.input_size], dC_prev

    def backward_sequence(self, input_sequence, dh_sequence, learning_rate=0.01):
        dC_next = np.zeros((1, self.hidden_size))
        dh_next = np.zeros((1, self.hidden_size))
        gradients = np.zeros_like(self.W_i)  # Placeholder for accumulated gradients
        
        for t in reversed(range(len(input_sequence))):
            dh = dh_sequence[t] + dh_next
            cache = self.cache[t]  # Assuming you've stored each timestep's cache during the forward pass
            grad_values = self.backward_step(dh, dC_next, cache)
            dW_i, dW_f, dW_o, dW_c, db_i, db_f, db_o, db_c, dh_next, dC_next = grad_values
            
            # Accumulate gradients from all timesteps
            gradients += np.array([dW_i, dW_f, dW_o, dW_c, db_i, db_f, db_o, db_c])
            
            # Update parameters here or after accumulating all gradients
            
        # Example parameter update after accumulating gradients
        self.W_i -= learning_rate * gradients[0]
        self.W_f -= learning_rate * gradients[1]
        self.W_o -= learning_rate * gradients[2]
        self.W_c -= learning_rate * gradients[3]
        self.b_i -= learning_rate * gradients[4]
        self.b_f -= learning_rate * gradients[5]
        self.b_o -= learning_rate * gradients[6]
        self.b_c -= learning_rate * gradients[7]

    def train_on_loss(self, input_sequence, pseudo_loss_grad, learning_rate=0.01):
        # Assuming pseudo_loss_grad is a gradient of loss w.r.t. LSTM output
        # Forward pass through the sequence
        self.forward_sequence(input_sequence)

        # Create a dummy gradient sequence based on pseudo_loss_grad
        # Assuming the loss gradient is the same for each timestep for simplification
        dh_sequence = np.array([pseudo_loss_grad] * len(input_sequence))
        dC_sequence = np.zeros_like(dh_sequence)  # Assuming no direct gradient for cell state

        # Perform backward pass and update parameters
        self.backward_sequence(dh_sequence, dC_sequence, learning_rate)


In [9]:
# Define LSTM input and model parameters
input_size = 3  # Size of input vector
hidden_size = 4  # Size of LSTM's hidden state
time_steps = 5  # Length of the sequence

# Generate a random sequence of inputs
input_sequence = np.random.rand(time_steps, input_size)

# Define a simple loss derivative as a placeholder
# In practice, this would come from the derivative of the loss function
# Assuming the loss is with respect to the hidden state of the last timestep
dh_sequence = np.zeros((time_steps, hidden_size))
dh_sequence[-1] = np.array([0.5] * hidden_size)  # Simple gradient from loss

In [10]:
# Initialize the LSTM model
lstm_model = LSTMCell(input_size, hidden_size)

# Perform the forward pass
h_sequence, _ = lstm_model.forward_sequence(input_sequence)

In [11]:
dh_sequence

array([[0. , 0. , 0. , 0. ],
       [0. , 0. , 0. , 0. ],
       [0. , 0. , 0. , 0. ],
       [0. , 0. , 0. , 0. ],
       [0.5, 0.5, 0.5, 0.5]])

In [12]:
# Placeholder for simplicity; in practice, use actual loss derivatives
dC_sequence = np.zeros_like(dh_sequence)  # Assuming no direct loss from cell states for simplicity

# Backward pass through the sequence
gradients = lstm_model.backward_sequence(input_sequence, dh_sequence)

# Normally, you'd compute the actual gradients here based on the loss
# For this example, we directly use dh_sequence as a placeholder for loss gradients

# Update LSTM parameters (assuming the method is correctly implemented within LSTMCell)
learning_rate = 0.01  # Placeholder learning rate
lstm_model.update_parameters(gradients, learning_rate=learning_rate)

ValueError: not enough values to unpack (expected 7, got 1)