# Assignment 1: Numpy RNN
Implement a RNN and run BPTT

In [1]:
from typing import Dict, Tuple
import numpy as np

In [2]:
class RNN(object):
    """Numpy implementation of sequence-to-one recurrent neural network for regression tasks."""
    
    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        """Initialization 

        Parameters
        ----------
        input_size : int
            Number of input features per time step
        hidden_size : int
            Number of hidden units in the RNN
        output_size : int
            Number of output units.
        """
        super(RNN, self).__init__()
        self.input_size = input_size   # D in literature
        self.hidden_size = hidden_size # I in literature
        self.output_size = output_size # K in literature

        # create and initialize weights of the network
        # as 90% of the usages in the scriptum are W.T, R.T, V.T
        init = lambda shape: np.random.uniform(-0.2, 0.2, shape)
        self.W = init((hidden_size, input_size)) # I X D
        self.R = init((hidden_size, hidden_size)) # I x I
        self.bs = np.zeros((hidden_size))
        self.V = init((output_size, hidden_size)) # K x I
        self.by = np.zeros((output_size))

        # place holder to store intermediates for backprop
        self.a = None
        self.y_hat = None
        self.grads = None
        self.x = None

        
    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass through the RNN.

        Parameters
        ----------
        x : np.ndarray
            Input sequence(s) of shape [sequence length, number of features]

        Returns
        -------
        NumPy array containing the network prediction for the input sample.
        """
        self.x = x
        # as we have no activation function (f(t) is linear)
        # a(t) = f(s(t)) = s(t) = W^T . x(t) + R^T . a(t-1) + bs
        # = tanh( W^T . x(t) + R^T . a(t-1) + bs )
        self.a = np.zeros((self.input_size, self.hidden_size)) # to make accessing t = -1 possible
        for t in range(len(x)):
            self.a[t] = np.tanh(self.W @ x[t] + self.R @ self.a[t-1] + self.bs)
        self.y_hat = self.V @ self.a[t] + self.by
        return self.y_hat  # sequence-to-1 model, so we only return the last 

    
    def forward_fast(self, x: np.ndarray) -> np.ndarray:
        """ optimized method without saving to self.a """
        a = np.tanh(self.W @ x[0] + self.bs)
        for t in range(1, len(x)):
            a = np.tanh(self.W @ x[t] + self.R @ a + self.bs)
        return self.V @ a + self.by

    
    def backward(self, d_loss: np.ndarray) -> Dict:
        """Calculate the backward pass through the RNN.
        
        Parameters
        ----------
        d_loss : np.ndarray
            The gradient of the loss w.r.t the network output in the shape [output_size,]

        Returns
        -------
        Dictionary containing the gradients for each network weight as key-value pair.
        """
        # create view, so that we don't have to reshape every time we call it
        a = self.a.reshape(self.a.shape[0], 1, self.a.shape[1])
        x = self.x.reshape(self.x.shape[0], 1, self.x.shape[1])
        
        # needs to be calculated only once
        d_V = d_loss @ a[-1]
        d_by = d_loss
        
        # init with 0 and sum it up
        d_W = np.zeros_like(self.W)
        d_R = np.zeros_like(self.R)
        d_bs = np.zeros_like(self.bs)
        
        # instead of using * diag, we use elementwise multiplication
        delta = d_loss.T @ self.V * (1 - a[-1] ** 2)
        
        for t in reversed(range(self.input_size)):
            d_bs += delta.reshape(self.bs.shape)
            d_W += delta.T @ x[t]
            if t > 0:
                d_R += delta.T @ a[t-1]
                # a[t] = tanh(..) -> derivation = 1-tanh² -> reuse already calculated tanh
                # calculate delta for the next step at t-1
                delta = delta @ self.R * (1 - a[t-1] ** 2)
        
        self.grads = {'W': d_W, 'R': d_R, 'V': d_V, 'bs': d_bs, 'by': d_by}
        return self.grads
    
        
    def update(self, lr: float):
        # update weights, aggregation is already done in backward
        w = self.get_weights()
        for name in w.keys():
            w[name] -= lr * self.grads[name]
        
        # reset internal class attributes
        self.grads = {}
        self.y_hat, self.a = None, None
        
    def get_weights(self) -> Dict:
        return {'W': self.W, 'R': self.R, 'V': self.V, 'bs': self.bs, 'by': self.by}
    
    def set_weights(self, weights: Dict):
        if not all(name in weights.keys() for name in ['W', 'R', 'V']):
            raise ValueError("Missing one of 'W', 'R', 'V' keys in the weight dictionary")
        for name, w in weights.items():
            self.__dir__["name"] = w

<h2 style="color:rgb(0,120,170)">Numerical gradient check</h2>

To validate your implementation, especially the backward pass, use the two-sided gradient approximation given by the equation below.

In [3]:
def get_numerical_gradient(model: RNN, x: np.ndarray, eps: float=1e-7) -> Dict:
    """Implementation of the two-sided numerical gradient approximation
    
    Parameters
    ----------
    model : RNN
        The RNN model object
    x : np.ndarray
        Input sequence(s) of shape [sequence length, number of features]
    eps : float
        The epsilon used for numerical gradient approximation
        
    Returns
    -------
    A dictionary containing the numerical gradients for each weight of the RNN. Make sure
    to name the dictionary keys like the names of the RNN gradients dictionary (e.g. 
    'd_R' for the weight 'R')
    """
    g = {}
    # iterate all weight-matrices w and all positions i, and calculate the num. grad.
    for name, w in model.get_weights().items():
        # initialize weight gradients with zero
        wg = np.zeros_like(w)
        # this makes a backup copy of original weights
        for i, orig in np.ndenumerate(w): # can be 1d or 2d
            # caculate for +eps
            w[i] += eps
            plus = model.forward_fast(x)

            # calculate for -eps
            w[i] = orig - eps
            minus = model.forward_fast(x)

            w[i] = orig # reset
            # set weight gradient for this weight and this index
            wg[i] = np.sum(plus - minus) / (2*eps)
        # add calculated weights into return-weights
        g[name] = wg
    return g


def get_analytical_gradient(model: RNN, x: np.ndarray) -> Dict:
    """Helper function to get the analytical gradient.
        
    Parameters
    ----------
    model : RNN
        The RNN model object
    x : np.ndarray
        Input sequence(s) of shape [sequence length, number of features]
        
    Returns
    -------
    A dictionary containing the analytical gradients for each weight of the RNN.
    """
    loss = model.forward(x)
    return model.backward(np.ones((model.output_size, 1)))

            
def gradient_check(model: RNN, x: np.ndarray, treshold: float = 1e-7):
    """Perform gradient checking.
    
    You don't have to do anything in this function.
    
    Parameters
    ----------
    model : RNN
        The RNN model object
    x : np.ndarray
        Input sequence(s) of shape [sequence length, number of features]
    eps : float
        The epsilon used for numerical gradient approximation    
    """
    numerical_grads = get_numerical_gradient(model, x)
    analytical_grads = get_analytical_gradient(model, x)
    
    for key, num_grad in numerical_grads.items():
        difference = np.linalg.norm(num_grad - analytical_grads[key])
        # assert num_grad.shape == analytical_grads[key].shape
        
        if difference < treshold:
            print(f"Gradient check for {key} passed (difference {difference:.3e})")
        else:
            print(f"Gradient check for {key} failed (difference {difference:.3e})")

<h2 style="color:rgb(0,120,170)">Compare the time for gradient computation</h2>
Finally, use the code below to investigate the benefit of being able to calculate the exact analytical gradient.

In [4]:
print("Gradient check with a single output neuron:")
model = RNN(input_size=5, hidden_size=10, output_size=1)
x = np.random.rand(5, 5)
gradient_check(model, x)

print("\nGradient check with multiple output neurons:")
model = RNN(input_size=5, hidden_size=10, output_size=5)
x = np.random.rand(5, 5)
gradient_check(model, x)

Gradient check with a single output neuron:
Gradient check for W passed (difference 4.371e-10)
Gradient check for R passed (difference 4.748e-10)
Gradient check for V passed (difference 2.930e-11)
Gradient check for bs passed (difference 1.721e-10)
Gradient check for by passed (difference 5.939e-12)

Gradient check with multiple output neurons:
Gradient check for W passed (difference 5.764e-10)
Gradient check for R passed (difference 7.208e-10)
Gradient check for V passed (difference 1.329e-10)
Gradient check for bs passed (difference 2.681e-10)
Gradient check for by passed (difference 1.123e-10)


In [5]:
analytical_time = %timeit -o get_analytical_gradient(model, x)

126 µs ± 3.91 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [6]:
numerical_time = %timeit -o get_numerical_gradient(model, x)

12.1 ms ± 328 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [7]:
if analytical_time.average < numerical_time.average:
    fraction = numerical_time.average / analytical_time.average
    print(f"The analytical gradient computation was {fraction:.0f} times faster")
else:
    fraction = analytical_time.average / numerical_time.average
    print(f"The numerical gradient computation was {fraction:.0f} times faster")

The analytical gradient computation was 96 times faster
