<a href="https://colab.research.google.com/github/neuralsrg/SequenceModels/blob/main/manual_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import numpy as np

def softmax(x):
    """
    Computes a numerically stable softmax along axis 0.

    Arguments:
    x -- numpy array of scores; every slice along axis 0 (e.g. every column of a
         2-D array) is normalized independently.

    Returns:
    numpy array with the shape of x whose entries along axis 0 sum to 1.
    """
    # Shift by the maximum of each column rather than the global maximum:
    # a column lying far below the global maximum would underflow to all
    # zeros and the normalization would produce 0 / 0 = nan.
    e_x = np.exp(x - np.max(x, axis=0, keepdims=True))
    return e_x / e_x.sum(axis=0)


def sigmoid(x):
    """
    Applies the logistic function 1 / (1 + exp(-x)) element-wise.

    Arguments:
    x -- scalar or numpy array.

    Returns:
    value(s) in (0, 1) with the same shape as x.
    """
    denominator = 1 + np.exp(-x)
    return 1 / denominator


def initialize_adam(parameters) :
    """
    Builds the zero-initialized Adam state for a stack of layers.

    Arguments:
    parameters -- python dictionary holding, for every layer l = 1..L,
                    parameters["W" + str(l)] = Wl
                    parameters["b" + str(l)] = bl

    Returns:
    v -- python dictionary for the moving average of the gradients, with keys
         "dW1", "db1", ..., "dWL", "dbL" and zero arrays shaped like the
         matching parameter.
    s -- python dictionary for the moving average of the squared gradients,
         same keys and shapes as v.
    """

    # Each layer contributes one weight and one bias entry.
    num_layers = len(parameters) // 2
    v, s = {}, {}

    for layer in range(1, num_layers + 1):
        for prefix in ("W", "b"):
            shape = parameters[prefix + str(layer)].shape
            grad_key = "d" + prefix + str(layer)
            # Separate arrays for v and s so later updates never alias.
            v[grad_key] = np.zeros(shape)
            s[grad_key] = np.zeros(shape)

    return v, s


def update_parameters_with_adam(parameters, grads, v, s, t, learning_rate = 0.01,
                                beta1 = 0.9, beta2 = 0.999,  epsilon = 1e-8):
    """
    Update parameters using Adam

    Arguments:
    parameters -- python dictionary containing your parameters:
                    parameters['W' + str(l)] = Wl
                    parameters['b' + str(l)] = bl
    grads -- python dictionary containing your gradients for each parameters:
                    grads['dW' + str(l)] = dWl
                    grads['db' + str(l)] = dbl
    v -- Adam variable, moving average of the first gradient, python dictionary
    s -- Adam variable, moving average of the squared gradient, python dictionary
    t -- number of Adam steps taken so far, counting this one (must be >= 1);
         used for the bias correction of v and s
    learning_rate -- the learning rate, scalar.
    beta1 -- Exponential decay hyperparameter for the first moment estimates
    beta2 -- Exponential decay hyperparameter for the second moment estimates
    epsilon -- hyperparameter preventing division by zero in Adam updates

    Returns:
    parameters -- python dictionary containing your updated parameters
    v -- Adam variable, moving average of the first gradient, python dictionary
    s -- Adam variable, moving average of the squared gradient, python dictionary

    Raises:
    ValueError -- if t < 1 (the bias correction 1 - beta**t would be zero).

    The three dictionaries are updated in place and also returned.
    """

    if t < 1:
        raise ValueError("t must be >= 1 for Adam bias correction")

    L = len(parameters) // 2                 # number of layers in the neural networks

    # The bias-correction denominators depend only on t, so compute them once.
    v_correction = 1 - beta1 ** t
    s_correction = 1 - beta2 ** t

    # Perform Adam update on all parameters
    for l in range(1, L + 1):
        for name in ("W", "b"):
            param_key = name + str(l)
            grad_key = "d" + param_key
            grad = grads[grad_key]

            # Moving averages of the gradient and of the squared gradient.
            v[grad_key] = beta1 * v[grad_key] + (1 - beta1) * grad
            s[grad_key] = beta2 * s[grad_key] + (1 - beta2) * (grad ** 2)

            # Bias-corrected moment estimates.
            v_corrected = v[grad_key] / v_correction
            s_corrected = s[grad_key] / s_correction

            # Adam step: epsilon is added to the square root (not inside it),
            # so it bounds the denominator by epsilon rather than sqrt(epsilon).
            parameters[param_key] = parameters[param_key] - learning_rate * v_corrected / (np.sqrt(s_corrected) + epsilon)

    return parameters, v, s

In [4]:
import numpy as np

def rnn_cell_forward_tests(target):
    """
    Checks a single-step RNN forward implementation by isolating one term of
    a_next = tanh(Waa a_prev + Wax xt + ba) at a time.

    Arguments:
    target -- callable invoked as target(xt, a_prev, parameters) that must return
              (a_next, yt_pred, cache). cache[0], cache[1], cache[2] are checked
              against the shapes of a_next, a_prev and xt; cache[3] must be a
              dictionary with 5 entries.

    Raises:
    AssertionError -- when a shape or value check fails.

    Relies on the softmax helper defined at module level.
    """
    # Only bias in expression
    a_prev_tmp = np.zeros((5, 10))
    xt_tmp = np.zeros((3, 10))
    parameters_tmp = {
        'Waa': np.random.randn(5, 5),
        'Wax': np.random.randn(5, 3),
        # Zero output weights: yt_pred must then depend on by alone.
        'Wya': np.zeros((2, 5)),
        'ba': np.random.randn(5, 1),
        'by': np.random.randn(2, 1),
    }

    a_next_tmp, yt_pred_tmp, cache_tmp = target(xt_tmp, a_prev_tmp, parameters_tmp)

    assert a_next_tmp.shape == (5, 10), f"Wrong shape for a_next. Expected (5, 10) != {a_next_tmp.shape}"
    assert yt_pred_tmp.shape == (2, 10), f"Wrong shape for yt_pred. Expected (2, 10) != {yt_pred_tmp.shape}"
    assert cache_tmp[0].shape == (5, 10), "Wrong shape in cache->a_next"
    assert cache_tmp[1].shape == (5, 10), "Wrong shape in cache->a_prev"
    assert cache_tmp[2].shape == (3, 10), "Wrong shape in cache->x_t"
    assert len(cache_tmp[3].keys()) == 5, "Wrong number of parameters in cache. Expected 5"

    assert np.allclose(np.tanh(parameters_tmp['ba']), a_next_tmp), "Problem 1 in a_next expression. Related to ba?"
    assert np.allclose(softmax(parameters_tmp['by']), yt_pred_tmp), "Problem 1 in yt_pred expression. Related to by?"

    # Only xt in expression
    a_prev_tmp = np.zeros((5,10))
    xt_tmp = np.random.randn(3,10)
    parameters_tmp['Wax'] = np.random.randn(5,3)
    # Non-zero output weights from here on; with Wya == 0 the yt_pred checks
    # below would pass for any a_next and test nothing.
    parameters_tmp['Wya'] = np.random.randn(2,5)
    parameters_tmp['ba'] = np.zeros((5,1))
    parameters_tmp['by'] = np.zeros((2,1))

    a_next_tmp, yt_pred_tmp, cache_tmp = target(xt_tmp, a_prev_tmp, parameters_tmp)

    assert np.allclose(np.tanh(np.dot(parameters_tmp['Wax'], xt_tmp)), a_next_tmp), "Problem 2 in a_next expression. Related to xt?"
    assert np.allclose(softmax(np.dot(parameters_tmp['Wya'], a_next_tmp)), yt_pred_tmp), "Problem 2 in yt_pred expression. Related to a_next?"

    # Only a_prev in expression
    a_prev_tmp = np.random.randn(5,10)
    xt_tmp = np.zeros((3,10))
    parameters_tmp['Waa'] = np.random.randn(5,5)
    parameters_tmp['ba'] = np.zeros((5,1))
    parameters_tmp['by'] = np.zeros((2,1))

    a_next_tmp, yt_pred_tmp, cache_tmp = target(xt_tmp, a_prev_tmp, parameters_tmp)

    assert np.allclose(np.tanh(np.dot(parameters_tmp['Waa'], a_prev_tmp)), a_next_tmp), "Problem 3 in a_next expression. Related to a_prev?"
    assert np.allclose(softmax(np.dot(parameters_tmp['Wya'], a_next_tmp)), yt_pred_tmp), "Problem 3 in yt_pred expression. Related to a_next?"

    print("\033[92mAll tests passed")
    

def rnn_forward_test(target):
    """
    Checks a full RNN forward pass against reference values for seed 17.

    Arguments:
    target -- callable invoked as target(x, a0, parameters) that must return
              (a, y_pred, caches), where caches[0] has one entry per time step
              and caches[1] equals the input x.

    Raises:
    AssertionError -- when a shape or value check fails.
    """
    np.random.seed(17)
    n_x, n_a, n_y = 4, 7, 3
    m, T_x = 8, 13

    # The random draws below must stay in this exact order: the reference
    # values further down were produced from this sequence.
    x_tmp = np.random.randn(n_x, m, T_x)
    a0_tmp = np.random.randn(n_a, m)
    parameters_tmp = {
        'Waa': np.random.randn(n_a, n_a),
        'Wax': np.random.randn(n_a, n_x),
        'Wya': np.random.randn(n_y, n_a),
        'ba': np.random.randn(n_a, 1),
        'by': np.random.randn(n_y, 1),
    }

    a, y_pred, caches = target(x_tmp, a0_tmp, parameters_tmp)

    # Shapes first, so value checks never index into a wrongly shaped array.
    a_shape = (n_a, m, T_x)
    y_shape = (n_y, m, T_x)
    assert a.shape == a_shape, f"Wrong shape for a. Expected: ({a_shape}) != {a.shape}"
    assert y_pred.shape == y_shape, f"Wrong shape for y_pred. Expected: ({y_shape}) != {y_pred.shape}"
    assert len(caches[0]) == T_x, f"len(cache) must be T_x = {T_x}"

    expected_a = [0.99999291, 0.99332189, 0.9921928, 0.99503445]
    expected_y = [0.19428, 0.14292, 0.24993, 0.00119]
    assert np.allclose(a[5, 2, 2:6], expected_a), "Wrong values for a"
    assert np.allclose(y_pred[2, 1, 1:5], expected_y, atol=1e-4), "Wrong values for y_pred"
    assert np.allclose(caches[1], x_tmp), "Fail check: cache[1] != x_tmp"

    print("\033[92mAll tests passed")
    
def lstm_cell_forward_test(target):
    """
    Checks a single-step LSTM forward implementation against reference values
    for seed 212.

    Arguments:
    target -- callable invoked as target(xt, a_prev, c_prev, parameters) that must
              return (a_next, c_next, y_pred, cache) with a 10-element cache laid
              out as (a_next, c_next, a_prev, c_prev, ft, it, cct, ot, xt, ...).

    Raises:
    AssertionError -- when a shape or value check fails.
    """
    np.random.seed(212)
    m, n_x, n_a, n_y = 8, 4, 7, 3

    # Draw order matters: the reference values below come from this sequence.
    x = np.random.randn(n_x, m)
    a0 = np.random.randn(n_a, m)
    c0 = np.random.randn(n_a, m)
    params = {}
    for gate in ('f', 'i', 'o', 'c'):
        params['W' + gate] = np.random.randn(n_a, n_a + n_x)
        params['b' + gate] = np.random.randn(n_a, 1)
    params['Wy'] = np.random.randn(n_y, n_a)
    params['by'] = np.random.randn(n_y, 1)

    a_next, c_next, y_pred, cache = target(x, a0, c0, params)

    assert len(cache) == 10, "Don't change the cache"

    # (cache index, name used in the message, expected shape)
    hidden_shape = (n_a, m)
    cache_shapes = (
        (4, "ft", hidden_shape),
        (5, "it", hidden_shape),
        (6, "cct", hidden_shape),
        (1, "c_next", hidden_shape),
        (7, "ot", hidden_shape),
        (0, "a_next", hidden_shape),
        (8, "xt", (n_x, m)),
        (2, "a_prev", hidden_shape),
        (3, "c_prev", hidden_shape),
    )
    for idx, label, expected in cache_shapes:
        assert cache[idx].shape == expected, f"Wrong shape for cache[{idx}]({label}). {cache[idx].shape} != {expected}"

    assert a_next.shape == hidden_shape, f"Wrong shape for a_next. {a_next.shape} != {hidden_shape}"
    assert c_next.shape == hidden_shape, f"Wrong shape for c_next. {c_next.shape} != {hidden_shape}"
    assert y_pred.shape == (n_y, m), f"Wrong shape for y_pred. {y_pred.shape} != {(n_y, m)}"

    # (cache index, first two reference values of row 0, failure message)
    cache_values = (
        (4, [0.32969833, 0.0574555], "wrong values for ft"),
        (5, [0.0036446, 0.9806943], "wrong values for it"),
        (6, [0.99903873, 0.57509956], "wrong values for cct"),
        (1, [0.1352798, 0.39884899], "wrong values for c_next"),
        (7, [0.7477249, 0.71588751], "wrong values for ot"),
        (0, [0.10053951, 0.27129536], "wrong values for a_next"),
    )
    for idx, reference, message in cache_values:
        assert np.allclose(cache[idx][0, 0:2], reference), message

    expected_y_row = [0.417098, 0.449528, 0.223159, 0.278376,
                      0.68453, 0.419221, 0.564025, 0.538475]
    assert np.allclose(y_pred[1], expected_y_row), "Wrong values for y_pred"

    print("\033[92mAll tests passed")
    
def lstm_forward_test(target):
    """
    Checks a full LSTM forward pass against reference values for seed 45.

    Arguments:
    target -- callable invoked as target(x, a0, parameters) that must return
              (a, y, c, caches), where caches[0] holds one 10-element cache
              per time step.

    Raises:
    AssertionError -- when a shape or value check fails.
    """
    np.random.seed(45)
    n_x, n_a, n_y = 4, 3, 2
    m, T_x = 13, 16

    # Draw order matters: the reference values below come from this sequence.
    x_tmp = np.random.randn(n_x, m, T_x)
    a0_tmp = np.random.randn(n_a, m)
    parameters_tmp = {}
    for gate in ('f', 'i', 'o', 'c'):
        parameters_tmp['W' + gate] = np.random.randn(n_a, n_a + n_x)
        parameters_tmp['b' + gate] = np.random.randn(n_a, 1)
    parameters_tmp['Wy'] = np.random.randn(n_y, n_a)
    parameters_tmp['by'] = np.random.randn(n_y, 1)

    a, y, c, caches = target(x_tmp, a0_tmp, parameters_tmp)

    state_shape = (n_a, m, T_x)
    output_shape = (n_y, m, T_x)
    assert a.shape == state_shape, f"Wrong shape for a. {a.shape} != {state_shape}"
    assert c.shape == state_shape, f"Wrong shape for c. {c.shape} != {state_shape}"
    assert y.shape == output_shape, f"Wrong shape for y. {y.shape} != {output_shape}"

    step_caches = caches[0]
    assert len(step_caches) == T_x, f"Wrong shape for caches. {len(step_caches)} != {T_x} "
    assert len(step_caches[0]) == 10, "length of caches[0][0] must be 10."

    # Spot-check two consecutive time steps of one unit / one example.
    assert np.allclose(a[2, 1, 4:6], [-0.01606022, 0.0243569]), "Wrong values for a"
    assert np.allclose(c[2, 1, 4:6], [-0.02753855, 0.05668358]), "Wrong values for c"
    assert np.allclose(y[1, 1, 4:6], [0.70444592, 0.70648935]), "Wrong values for y"

    print("\033[92mAll tests passed")

# Manual implementation of RNN in numpy

In [7]:
import numpy as np 
import scipy

## RNN

### RNN unit
![image](https://i.imgur.com/7PAU2pP.png)

In [39]:
def rnn_unit(x, a_prev, parameters):
  '''
  Implements a single RNN unit forward pass:
    a = tanh(Waa @ a_prev + Wax @ x + ba)
    y_pred = softmax(Wya @ a + by), normalized over axis 0

  Args:
  x -- input for the current time step (rnn() passes a (n_x, m) slice)
  a_prev -- activation from previous RNN unit
  parameters -- python dictionary with keys 'Waa', 'Wax', 'ba', 'Wya', 'by'

  Returns:
  y_pred -- softmax activation outputs
  a -- current RNN unit hidden state
  cache -- tuple containing (a, a_prev, x, parameters)
  '''
  # Import the submodule explicitly: a bare `import scipy` does not guarantee
  # that `scipy.special` is loaded on every SciPy version.
  from scipy.special import softmax

  Waa, Wax, ba = parameters['Waa'], parameters['Wax'], parameters['ba']
  Wya, by = parameters['Wya'], parameters['by']

  a = np.tanh(Waa @ a_prev + Wax @ x + ba)

  # axis=0 normalizes over the output units, independently per example.
  y_pred = softmax(Wya @ a + by, axis=0)

  cache = (a, a_prev, x, parameters)

  return y_pred, a, cache

### RNN forward pass

In [35]:
def rnn(x, a_init, parameters):
  '''
  Implements RNN forward pass by applying rnn_unit() to every time step

  Args:
  x -- tensor of shape (n_x, m, T_x), where n_x - num of features, m - batch size,
      T_x - num of time steps
  a_init -- initial rnn hidden state
  parameters -- python dictionary with keys 'Waa', 'Wax', 'ba', 'Wya', 'by'

  Returns:
  Y_pred -- tensor of shape (n_y, m, T_x)
  A -- tensor of hidden states of shape (n_a, m, T_x)
  caches -- (list_of_caches, x), one rnn_unit() cache per time step
  '''
  _, m, T_x = x.shape
  n_y, n_a = parameters['Wya'].shape

  # Preallocate the outputs: growing them with np.append would copy the
  # whole tensor again at every time step.
  Y_pred = np.zeros((n_y, m, T_x))
  A = np.zeros((n_a, m, T_x))
  caches = []

  a = a_init
  for time_step in range(T_x):
    y_pred, a, cache = rnn_unit(x[:, :, time_step], a, parameters)

    Y_pred[:, :, time_step] = y_pred
    A[:, :, time_step] = a
    caches.append(cache)

  return Y_pred, A, (caches, x)

### RNN backpropagation

![image](https://i.imgur.com/x2jxcSq.png)

$$
\begin{align}
\displaystyle a^{\langle t \rangle} &= \tanh(W_{ax} x^{\langle t \rangle} + W_{aa} a^{\langle t-1 \rangle} + b_{a})\tag{-} \\[8pt]
\displaystyle \frac{\partial \tanh(x)} {\partial x} &= 1 - \tanh^2(x) \tag{-} \\[8pt]
\displaystyle {dtanh} &= da_{next} * ( 1 - \tanh^2(W_{ax}x^{\langle t \rangle}+W_{aa} a^{\langle t-1 \rangle} + b_{a})) \tag{0} \\[8pt]
\displaystyle  {dW_{ax}} &= dtanh \cdot x^{\langle t \rangle T}\tag{1} \\[8pt]
\displaystyle dW_{aa} &= dtanh \cdot a^{\langle t-1 \rangle T}\tag{2} \\[8pt]
\displaystyle db_a& = \sum_{batch}dtanh\tag{3} \\[8pt]
\displaystyle dx^{\langle t \rangle} &= { W_{ax}}^T \cdot dtanh\tag{4} \\[8pt]
\displaystyle da_{prev} &= { W_{aa}}^T \cdot dtanh\tag{5}
\end{align}
$$

In [59]:
def rnn_unit_backward(da, cache):
  '''
  Implements the backward pass for a single rnn unit based on its cache

  Args:
  da -- dJ/da (da_next from the picture above)
  cache -- tuple (a, a_prev, x, parameters) as produced by rnn_unit();
      only 'Waa' and 'Wax' are read from parameters

  Returns:
  grads -- python dictionary with keys: 'dba', 'dWaa', 'da_prev', 'dWax',
      'dx' (for deep RNNs)
  '''
  a, a_prev, x, parameters = cache
  Waa, Wax = parameters['Waa'], parameters['Wax']

  # a = tanh(z), hence dJ/dz = da * (1 - a^2)
  dz = da * (1 - np.square(a))

  grads = {}

  grads['dba'] = np.sum(dz, axis=-1, keepdims=True) # summed over the last (batch) axis
  grads['dWaa'] = dz @ a_prev.T # it is basically a sum over mini-batch sample gradients
  grads['da_prev'] = Waa.T @ dz # same shape as a_prev

  grads['dWax'] = dz @ x.T
  grads['dx'] = Wax.T @ dz

  return grads

In [66]:
def rnn_backward(da, caches):
  '''
  Implements the backward pass for the entire RNN

  Args:
  da -- dJ/da of shape (n_a, m, T_x) computed elsewhere
  caches -- output from rnn(): (list of per-time-step caches, x)

  Returns:
  grads -- python dictionary with keys 
    'dx' : shape(n_x, m, T_x)
    'da0' : shape(n_a, m), gradient w.r.t. the initial hidden state
    'dWax' : shape(n_a, n_x)
    'dWaa' : shape(n_a, n_a)
    'dba' : shape(n_a, 1)
  '''
  unit_caches, _ = caches

  n_a, m, T_x = da.shape
  # every unit cache is (a, a_prev, x_t, parameters); x_t carries n_x
  n_x = unit_caches[0][2].shape[0]

  # Preallocate dx: prepending with np.append would copy the whole tensor
  # again at every time step.
  dx = np.zeros((n_x, m, T_x))
  dWax = np.zeros((n_a, n_x))
  dWaa = np.zeros((n_a, n_a))
  dba = np.zeros((n_a, 1))

  # gradient flowing back from the following time step (none after the last)
  da_prev = np.zeros((n_a, m))

  for time_step in reversed(range(T_x)):
    # total gradient at this step = external da + what came back from t + 1
    step_grads = rnn_unit_backward(da[:, :, time_step] + da_prev, unit_caches[time_step])
    da_prev = step_grads['da_prev']

    dx[:, :, time_step] = step_grads['dx']
    # the weights are shared across time, so their gradients accumulate
    dWax += step_grads['dWax']
    dWaa += step_grads['dWaa']
    dba += step_grads['dba']

  # after the loop da_prev is the gradient w.r.t. the initial hidden state
  grads = {"dx": dx, "da0": da_prev, "dWax": dWax, "dWaa": dWaa, "dba": dba}

  return grads