## 1. Import libraries

In [1]:
import numpy as np
import pandas as pd
import math

## 2a. Create the RNN Basic Unit

Here's the basic idea of the cell. The RNN is basically a repetition of the cell built over the time steps (e.g. `10` time steps).

<img src="https://datascience-enthusiast.com/figures/rnn_step_forward.png" width="500" height="auto" />
<img src="https://datascience-enthusiast.com/figures/rnn.png" width="800" height="auto" />

(Source: [Fisseha Berhane, n.d.](https://datascience-enthusiast.com/DL/Building_a_Recurrent_Neural_Network-Step_by_Step_v1.html))

Basic idea of backward,
<img src="https://datascience-enthusiast.com/figures/rnn_cell_backprop.png" width="700" height="auto" />

(Source: [Fisseha Berhane, n.d.](https://datascience-enthusiast.com/DL/Building_a_Recurrent_Neural_Network-Step_by_Step_v1.html))

Credits: [Fisseha Berhane](https://datascience-enthusiast.com/DL/Building_a_Recurrent_Neural_Network-Step_by_Step_v1.html) _(course)_, and [@brunoklein99](https://github.com/brunoklein99/deep-learning-notes/blob/master/rnn_utils.py) _(for some code)_.

In [45]:
def softmax(z):
    e_z = np.exp(z - np.max(z))
    return e_z / e_z.sum(axis=0)


class BasicUnit:
    
    def __init__(self, x_size=3, y_size=2, a_size=5):
        # Weight for multiplying current input
        self.ax_weights = np.random.randn(a_size, x_size)
        # Weight for multiplying past input (hidden-state)
        self.aa_weights = np.random.randn(a_size, a_size)
        # Weight for relating the hidden-state to output
        self.ya_weights = np.random.randn(y_size, a_size)
        # Bias for activation function
        self.a_bias = np.random.randn(a_size, 1)
        # Bias for relating the hidden-state to output
        self.y_bias = np.random.randn(y_size, 1)

    def cell_forward(self, x_values, a_prev):
        a_next = np.tanh(np.dot(self.aa_weights, a_prev) + np.dot(self.ax_weights, x_values) + self.a_bias)
        y_pred = softmax(np.dot(self.ya_weights, a_next) + self.y_bias)

        return dict(a_prev=a_prev, a_next=a_next, y_pred=y_pred)

    def cell_backward(self, x_values, hs, a_next_d):
        # Gradient of tanh respect to a_next derivative
        tanh_d = (1 - hs['a_next'] ** 2) * a_next_d

        # Gradient of loss respect to W_ax
        x_values_d = np.dot(self.ax_weights.T, tanh_d)
        ax_weights_d = np.dot(tanh_d, x_values.T)

        # Gradient with respect to W_aa
        a_prev_d = np.dot(self.aa_weights.T, tanh_d)
        aa_weights_d = np.dot(tanh_d, hs['a_prev'].T)

        # Gradient with respect to b_a
        a_bias_d = np.sum(tanh_d, 1, keepdims=True)

        return dict(x_values_d=x_values_d, 
                    a_prev_d=a_prev_d, 
                    ax_weights_d=ax_weights_d, 
                    aa_weights_d=aa_weights_d, 
                    a_bias_d=a_bias_d)

    def forward_pass(self, x_time_values, a_init=None):
        y_size, a_size = self.ya_weights.shape
        _, input_size, no_of_time_steps = x_time_values.shape

        a = np.zeros((a_size, input_size, no_of_time_steps))
        y_pred = np.zeros((y_size, input_size, no_of_time_steps))
        hss = []

        a_next = a_init if a_init is not None else np.zeros((a_size, input_size))

        for t in range(no_of_time_steps):
            cell = self.cell_forward(x_time_values[:,:,t], a_next)
            a_next = cell['a_next']
            a[:,:,t] = cell['a_next']
            y_pred[:,:,t] = cell['y_pred']
            hss.append(cell)

        return dict(a=a, y_pred=y_pred, hss=hss)

    def backward_pass(self, x_time_values, hss, a_init_d=None):
        _, input_size, no_of_time_steps = x_time_values.shape
        a_size = self.ya_weights.shape[1]

        x_time_values_d = np.zeros(x_time_values.shape)
        ax_weights_d_cum = np.zeros(self.ax_weights.shape)
        aa_weights_d_cum = np.zeros(self.aa_weights.shape)
        a_bias_d_cum = np.zeros(self.a_bias.shape)

        a_d = a_init_d if a_init_d is not None else np.zeros((a_size, input_size))
        a_prev_d = np.zeros((a_size, input_size))

        for t in reversed(range(no_of_time_steps)):
            cell_gradient = self.cell_backward(x_time_values[:,:,t], hss[t], a_prev_d + a_d[:,:, t])

            x_time_values_d[:,:,t] = cell_gradient['x_values_d']
            a_prev_d = cell_gradient['a_prev_d']

            ax_weights_d_cum += cell_gradient['ax_weights_d']
            aa_weights_d_cum += cell_gradient['aa_weights_d']
            a_bias_d_cum += cell_gradient['a_bias_d']

        return dict(x_values_d=x_time_values_d, 
                    a_prev_d=a_prev_d,
                    ax_weights_d=ax_weights_d_cum, 
                    aa_weights_d=aa_weights_d_cum, 
                    a_bias_d=a_bias_d_cum)


## 2b. Preview pass-through

In [46]:
np.random.seed(1)

no_of_time_steps = 5
x_train = np.random.randn(3, 10, no_of_time_steps)

unit = BasicUnit(x_size=x_train.shape[0], y_size=2, a_size=5)

print('=== Forward pass (peak @ [0][0]) ===')

fp_result = unit.forward_pass(x_train)

for t in range(x_train.shape[2]):
    print(f"""Value at time {t+1}:
    a(t-1) = {fp_result['hss'][t]['a_prev'][0][0]}, 
    a(t) = {fp_result['a'][:,:,t][0][0]}, 
    y_pred = {fp_result['y_pred'][:,:,t][0][0]}
""")
    
print(f"""Values for weights and biases:
    W_ax = {unit.ax_weights[0][0]},
    W_aa = {unit.aa_weights[0][0]},
    W_ya = {unit.ya_weights[0][0]},
    b_a = {unit.a_bias[0][0]},
    b_y = {unit.y_bias[0][0]}
""")

print('=== Backward pass (peak @ [0][0]) ===')

bp = unit.backward_pass(x_train, fp_result['hss'], a_init_d=np.random.randn(*fp_result['a'].shape))

for t in reversed(range(x_train.shape[2])):
    print(f"""Derivative at time {t+1}:
    x(t) = {bp['x_values_d'][:,:,t][0][0]}
""")

print(f"""Derivatives for weights and biases:
    W_ax = {bp['ax_weights_d'][0][0]},
    W_aa = {bp['aa_weights_d'][0][0]},
    b_a = {bp['a_bias_d'][0][0]}""")

=== Forward pass (peak @ [0][0]) ===
Value at time 1:
    a(t-1) = 0.0, 
    a(t) = -0.9704501385758645, 
    y_pred = 0.026321497913298138

Value at time 2:
    a(t-1) = -0.9704501385758645, 
    a(t) = 0.9999597278867156, 
    y_pred = 0.9882791423021918

Value at time 3:
    a(t-1) = 0.9999597278867156, 
    a(t) = 0.998689763217528, 
    y_pred = 0.8845671129063392

Value at time 4:
    a(t-1) = 0.998689763217528, 
    a(t) = 0.9998818752509105, 
    y_pred = 0.9865400974634416

Value at time 5:
    a(t-1) = 0.9998818752509105, 
    a(t) = 0.9185594012804403, 
    y_pred = 0.8291876750233819

Values for weights and biases:
    W_ax = -0.31011677351806,
    W_aa = -0.17470315974250095,
    W_ya = 1.1603385699937696,
    b_a = -0.40087819178892664,
    b_y = -1.7606885603987834

=== Backward pass (peak @ [0][0]) ===
Derivative at time 5:
    x(t) = -0.1528142638110492

Derivative at time 4:
    x(t) = -0.05833336493553153

Derivative at time 3:
    x(t) = 0.7024169887436428

Derivati

## 3a. Create the LSTM Unit

Here's the basic idea of the cell. For illustration sake, we assume our task as reading words in a piece of text.

- **Forget gate**: Keep track of grammatical structures (e.g. if subject is singular or plural). Gets rid of previously stored memory when subject has changed. Values: `0` (forget) - `1` (keep).

- **Update gate**: Find a way to update to reflect that new subject is plural. Values: `0` - `1`.

- **Output gate**: Decide which outputs to use.

<img src="https://datascience-enthusiast.com/figures/LSTM.png" width="800" height="auto" />
<img src="https://datascience-enthusiast.com/figures/LSTM_rnn.png" width="1000" height="auto" />

(Source: [Fisseha Berhane, n.d.](https://datascience-enthusiast.com/DL/Building_a_Recurrent_Neural_Network-Step_by_Step_v1.html))

In [61]:
def sigmoid(z):
    return 1 / (1 + np.exp(-z))


class LSTMUnit:
    
    def __init__(self, x_size=3, y_size=2, a_size=5):
        # Weight for forget gate
        self.f_weights = np.random.randn(a_size, a_size + x_size)
        # Weight for update gate
        self.i_weights = np.random.randn(a_size, a_size + x_size)
        # Weight for first "tanh"
        self.c_weights = np.random.randn(a_size, a_size + x_size)
        # Weight for output gate
        self.o_weights = np.random.randn(a_size, a_size + x_size)
        # Weight for relating the hidden-state to output
        self.y_weights = np.random.randn(y_size, a_size)
        # Bias for forget gate
        self.f_bias = np.random.randn(a_size, 1)
        # Bias for update gate
        self.i_bias = np.random.randn(a_size, 1)
        # Bias for first "tanh"
        self.c_bias = np.random.randn(a_size, 1)
        # Bias for output gate
        self.o_bias = np.random.randn(a_size, 1)
        # Bias for relating the hidden-state to output
        self.y_bias = np.random.randn(y_size, 1)

    def cell_forward(self, x_values, a_prev, c_prev):
        x_size, input_size = x_values.shape
        y_size, a_size = self.y_weights.shape
        
        # Concatenate a_prev with x_values
        concat = np.zeros((a_size + x_size, input_size))
        concat[:a_size,:] = a_prev
        concat[a_size:,:] = x_values
        
        forget_v = sigmoid(np.dot(self.f_weights, concat) + self.f_bias)
        update_v = sigmoid(np.dot(self.i_weights, concat) + self.i_bias)
        tanh_v = np.tanh(np.dot(self.c_weights, concat) + self.c_bias)
        output_v = sigmoid(np.dot(self.o_weights, concat) + self.o_bias)

        c_next = forget_v * c_prev + update_v * tanh_v
        a_next = output_v * np.tanh(c_next)

        y_pred = softmax(np.dot(self.y_weights, a_next) + self.y_bias)

        return dict(a_prev=a_prev, 
                    a_next=a_next, 
                    c_prev=c_prev, 
                    c_next=c_next,
                    forget=forget_v,
                    update=update_v,
                    tanh=tanh_v,
                    output=output_v,
                    y_pred=y_pred)

    def cell_backward(self, x_values, hs, a_next_d, c_next_d):
        a_size = hs['a_next'].shape[0]

        output_v_d = a_next_d * np.tanh(hs['c_next']) * \
                     hs['output'] * (1 - hs['output'])
        
        tanh_v_d = (c_next_d * hs['update'] + hs['output'] * (1 - np.square(np.tanh(hs['c_next']))) * \
                     hs['update'] * a_next_d) * \
                   (1 - np.square(hs['tanh']))
        
        update_v_d = (c_next_d * hs['tanh'] + hs['output'] * (1 - np.square(np.tanh(hs['c_next']))) * \
                       hs['tanh'] * a_next_d) * \
                     hs['update'] * (1 - hs['update'])
        
        forget_v_d = (c_next_d * hs['c_prev'] + hs['output'] * (1 - np.square(np.tanh(hs['c_next']))) * \
                       hs['c_prev'] * a_next_d) * \
                     hs['forget'] * (1 - hs['forget'])

        concat = np.concatenate((hs['a_prev'], x_values), axis=0)

        f_weights_d = np.dot(forget_v_d, concat.T)
        i_weights_d = np.dot(update_v_d, concat.T)
        c_weights_d = np.dot(tanh_v_d, concat.T)
        o_weights_d = np.dot(output_v_d, concat.T)

        f_bias_d = np.sum(forget_v_d, axis=1 ,keepdims=True)
        i_bias_d = np.sum(update_v_d, axis=1, keepdims=True)
        c_bias_d = np.sum(tanh_v_d, axis=1,  keepdims=True)
        o_bias_d = np.sum(output_v_d, axis=1, keepdims=True)

        a_prev_d = np.dot(self.f_weights[:, :a_size].T, forget_v_d) + \
                   np.dot(self.i_weights[:, :a_size].T, update_v_d) + \
                   np.dot(self.c_weights[:, :a_size].T, tanh_v_d) + \
                   np.dot(self.o_weights[:, :a_size].T, output_v_d)

        c_prev_d = c_next_d * hs['forget'] + hs['output'] * \
                   (1 - np.square(np.tanh(hs['c_next']))) * hs['forget'] * a_next_d

        x_values_d = np.dot(self.f_weights[:, a_size:].T, forget_v_d) + \
                     np.dot(self.i_weights[:, a_size:].T, update_v_d) + \
                     np.dot(self.c_weights[:, a_size:].T, tanh_v_d) + \
                     np.dot(self.o_weights[:, a_size:].T, output_v_d)

        return dict(output_d=output_v_d,
                    tanh_d=tanh_v_d,
                    update_d=update_v_d,
                    forget_d=forget_v_d,
                    f_weights_d=f_weights_d,
                    i_weights_d=i_weights_d,
                    c_weights_d=c_weights_d,
                    o_weights_d=o_weights_d,
                    f_bias_d=f_bias_d,
                    i_bias_d=i_bias_d,
                    c_bias_d=c_bias_d,
                    o_bias_d=o_bias_d,
                    a_prev_d=a_prev_d,
                    c_prev_d=c_prev_d,
                    x_values_d=x_values_d)

    def forward_pass(self, x_time_values, a_init=None):
        y_size, a_size = self.y_weights.shape
        _, input_size, no_of_time_steps = x_time_values.shape

        a = np.zeros((a_size, input_size, no_of_time_steps))
        c = a
        y_pred = np.zeros((y_size, input_size, no_of_time_steps))
        hss = []
    
        a_next = a_init if a_init is not None else np.zeros((a_size, input_size))
        c_next = np.zeros(a_next.shape)

        for t in range(no_of_time_steps):
            uc = self.cell_forward(x_time_values[:,:,t], a_next, c_next)
            a_next = uc['a_next']
            c_next = uc['c_next']
            a[:,:,t] = uc['a_next']
            c[:,:,t] = uc['c_next']
            y_pred[:,:,t] = uc['y_pred']
            hss.append(uc)

        return dict(a=a, c=c, y_pred=y_pred, hss=hss)

    def backward_pass(self, x_time_values, hss, a_init_d=None, c_init_d=None):
        _, input_size, no_of_time_steps = x_time_values.shape
        a_size = self.y_weights.shape[1]

        x_time_values_d = np.zeros(x_time_values.shape)

        f_weights_d_cum = np.zeros(self.f_weights.shape)
        i_weights_d_cum = np.zeros(self.i_weights.shape)
        c_weights_d_cum = np.zeros(self.c_weights.shape)
        o_weights_d_cum = np.zeros(self.o_weights.shape)

        f_bias_d_cum = np.zeros(self.f_bias.shape)
        i_bias_d_cum = np.zeros(self.i_bias.shape)
        c_bias_d_cum = np.zeros(self.c_bias.shape)
        o_bias_d_cum = np.zeros(self.o_bias.shape)

        a_d = a_init_d if a_init_d is not None else np.zeros((a_size, input_size))
        c_d = c_init_d if c_init_d is not None else np.zeros(a_d.shape)

        a_prev_d = np.zeros((a_size, input_size))
        c_prev_d = np.zeros(a_prev_d.shape)

        for t in reversed(range(no_of_time_steps)):
            cell_gradient = self.cell_backward(x_time_values[:,:,t], hss[t], a_prev_d + a_d[:,:,t], c_prev_d + c_d[:,:,t])
            
            x_time_values_d[:,:,t] = cell_gradient['x_values_d']
            f_weights_d_cum += cell_gradient['f_weights_d']
            i_weights_d_cum += cell_gradient['i_weights_d']
            c_weights_d_cum += cell_gradient['c_weights_d']
            o_weights_d_cum += cell_gradient['o_weights_d']
            
            
            f_bias_d_cum += cell_gradient['f_bias_d']
            i_bias_d_cum += cell_gradient['i_bias_d']
            c_bias_d_cum += cell_gradient['c_bias_d']
            o_bias_d_cum += cell_gradient['o_bias_d']

            a_prev_d = cell_gradient['a_prev_d']
            c_prev_d = cell_gradient['c_prev_d']
        
        return dict(f_weights_d=f_weights_d_cum,
                    i_weights_d=i_weights_d_cum,
                    c_weights_d=c_weights_d_cum,
                    o_weights_d=o_weights_d_cum,
                    f_bias_d=f_bias_d_cum,
                    i_bias_d=i_bias_d_cum,
                    c_bias_d=c_bias_d_cum,
                    o_bias_d=o_bias_d_cum,
                    a_prev_d=a_prev_d,
                    c_prev_d=c_prev_d,
                    x_values_d=x_time_values_d)


## 3b. Preview pass-through

In [62]:
np.random.seed(1)

no_of_time_steps = 5
x_train = np.random.randn(3, 10, no_of_time_steps)

unit = LSTMUnit(x_size=x_train.shape[0], y_size=2, a_size=5)

print('=== Forward pass (peak @ [0][0]) ===')

fp_result = unit.forward_pass(x_train)

for t in range(x_train.shape[2]):
    print(f"""Value at time {t+1}:
    a(t-1) = {fp_result['hss'][t]['a_prev'][0][0]}, 
    a(t) = {fp_result['a'][:,:,t][0][0]}, 
    c(t-1) = {fp_result['hss'][t]['c_prev'][0][0]}, 
    c(t) = {fp_result['c'][:,:,t][0][0]},
    y_pred = {fp_result['y_pred'][:,:,t][0][0]},

    forget gate={fp_result['hss'][t]['forget'][0][0]},
    update gate={fp_result['hss'][t]['update'][0][0]},
    tanh={fp_result['hss'][t]['tanh'][0][0]},
    output gate={fp_result['hss'][t]['output'][0][0]}
""")
    
print(f"""Values for weights and biases:
    W_f = {unit.f_weights[0][0]}
    W_o = {unit.o_weights[0][0]}
    W_i = {unit.i_weights[0][0]}
    W_c = {unit.c_weights[0][0]}
    W_y = {unit.y_weights[0][0]}
    b_f = {unit.f_bias[0][0]}
    b_o = {unit.o_bias[0][0]}
    b_i = {unit.i_bias[0][0]}
    b_c = {unit.c_bias[0][0]}
    b_y = {unit.y_bias[0][0]}
""")


print('=== Backward pass (peak @ [0][0]) ===')

bp = unit.backward_pass(x_train, fp_result['hss'], a_init_d=np.random.randn(*fp_result['a'].shape), 
                                                   c_init_d=np.random.randn(*fp_result['c'].shape))

for t in reversed(range(x_train.shape[2])):
    print(f"""Derivative at time {t+1}:
    x(t) = {bp['x_values_d'][:,:,t][0][0]}
""")

print(f"""Derivatives for weights and biases:
    W_f = {bp['f_weights_d'][0][0]},
    W_c = {bp['c_weights_d'][0][0]},
    W_i = {bp['i_weights_d'][0][0]},
    W_o = {bp['o_weights_d'][0][0]},
    b_f = {bp['f_bias_d'][0][0]},
    b_c = {bp['c_bias_d'][0][0]},
    b_i = {bp['o_bias_d'][0][0]},
    b_o = {bp['o_bias_d'][0][0]}""")

=== Forward pass (peak @ [0][0]) ===
Value at time 1:
    a(t-1) = 0.0, 
    a(t) = -0.14353153875994148, 
    c(t-1) = 0.0, 
    c(t) = -0.14353153875994148,
    y_pred = 0.17163846940788854,

    forget gate=0.42888833696800316,
    update gate=0.15518207490544242,
    tanh=-0.9249234413665368,
    output gate=0.04692662136616572

Value at time 2:
    a(t-1) = -0.006689575154730827, 
    a(t) = 0.354065631568152, 
    c(t-1) = -0.14353153875994148, 
    c(t) = 0.354065631568152,
    y_pred = 0.19492958598119411,

    forget gate=0.5484609360623877,
    update gate=0.8762617752533526,
    tanh=0.4939015781508593,
    output gate=0.7998679960399565

Value at time 3:
    a(t-1) = 0.27193609117543166, 
    a(t) = 1.0045654211001764, 
    c(t-1) = 0.354065631568152, 
    c(t) = 1.0045654211001764,
    y_pred = 0.21383870432464613,

    forget gate=0.5077195324369859,
    update gate=0.8434860404996112,
    tanh=0.9778459210775385,
    output gate=0.8975712697590178

Value at time 4:
    a