In [2]:
import numpy as np

A vanilla (feed-forward) neural network does not handle sequential data well. To address this, we have the recurrent neural network (RNN), a special type of neural network that is good at modelling sequential data. Examples of sequential data include text, audio, and time series.

The picture below offers a great visualization of how an RNN works under the hood.

<img src="https://camo.githubusercontent.com/9ecb85b81652e0442a635261463ea3ddae39512bf3d928b0a1f5b8df86ee4ca0/68747470733a2f2f6769746875622e636f6d2f446565704c6561726e696e674454552f30323435362d646565702d6c6561726e696e672d776974682d5079546f7263682f626c6f622f6d61737465722f7374617469635f66696c65732f726e6e2d756e666f6c642e706e673f7261773d31" width="800">


An RNN is effectively a loop: the RNN unit processes one element of the input sequence at a time and then passes its result (the hidden state) on to the next step. As results are propagated along the loop, later steps retain a memory of the earlier elements of the sequence.

In [56]:
# there are different parameter initialization strategies that can speed up training (e.g. Xavier); in this example we simply use good ol' random initialization
def init_params(hidden_size, vocab_size):
    """Create the randomly initialised RNN parameters.

    Returns the tuple ``(U, V, W, bh, by)``:
      U  -> (hidden_size, vocab_size)  weights applied on the input
      V  -> (hidden_size, hidden_size) weights applied on the previous hidden state
      W  -> (vocab_size, hidden_size)  weights applied on the hidden state to get the output
      bh -> (hidden_size, 1)           hidden state bias (zeros)
      by -> (vocab_size, 1)            output bias (zeros)
    """
    # weights are drawn from a standard normal and scaled down by 0.1;
    # the draw order (U, V, W) is kept so a seeded run gives the same values
    weight_shapes = (
        (hidden_size, vocab_size),
        (hidden_size, hidden_size),
        (vocab_size, hidden_size),
    )
    U, V, W = (np.random.randn(*shape) * 0.1 for shape in weight_shapes)

    # biases start at zero
    bh, by = (np.zeros((rows, 1)) for rows in (hidden_size, vocab_size))
    return U, V, W, bh, by


In [1]:
# activation functions
def sigmoid(x, derivative=False):
    """Logistic sigmoid: squish the value between [0,1].

    With ``derivative=True`` the derivative ``s(x) * (1 - s(x))`` evaluated
    at the same ``x`` is returned instead of the activation itself.
    """
    activation = 1 / (1 + np.exp(-x))
    if derivative:
        return activation * (1 - activation)
    return activation
    
    
def tanh(x, derivative=False): # squish the value between [-1,1]
    """Hyperbolic tangent activation.

    x          -> the *pre-activation* value(s) (scalar or array)
    derivative -> if True, return d tanh(x)/dx = 1 - tanh(x)**2 evaluated at x

    Note: the derivative branch expects the raw input x, not an already
    activated value. If you only have h = tanh(x), use 1 - h**2 directly.
    """
    # np.tanh is numerically stable; the explicit (e^x - e^-x)/(e^x + e^-x)
    # formula overflows to inf/inf = nan once |x| gets large (~710 for float64)
    activated = np.tanh(x)
    if not derivative:
        return activated
    return 1 - activated**2
    
    
def softmax(x): # convert numbers into probabilities
    """Softmax over *all* elements of x (no axis), so the result sums to 1.

    x -> scalar scores as an array (e.g. a (vocab_size, 1) column vector)
    returns an array of the same shape holding probabilities
    """
    x = np.asarray(x)
    if x.size == 0:
        # nothing to normalise; keep the empty result instead of failing on max()
        return np.exp(x)
    # subtracting the max does not change the result mathematically but keeps
    # np.exp from overflowing to inf (which would give inf/inf = nan)
    exps = np.exp(x - np.max(x))
    return exps / np.sum(exps)

        
    

In [37]:
# forward propagation
# the input data, i.e. the sequential data, is in the form of an array of vectors
# each vector is passed to the RNN one by one
# each iteration of the loop produces two results: the output (prediction) & the hidden state

def forward_propagation(x, params, hidden_size):
    """Run the RNN over the whole input sequence.

    x           -> iterable of input vectors, one per time-step
                   (assumed to be (vocab_size, 1) column vectors so that
                   np.dot(U, x_t) is (hidden_size, 1) -- verify against caller)
    params      -> tuple (U, V, W, bh, by) as returned by init_params
    hidden_size -> size of the hidden state; used to build the initial state

    returns (outputs, hidden_states): two lists with one entry per time-step,
    the softmax prediction and the hidden state at that step.
    """
    U, V, W, bh, by = params
    outputs, hidden_states = [], []

    # the initial hidden state is a zero column vector of shape (hidden_size, 1)
    # (np.zeros_like((hidden_size, 1)) would instead build a shape-(2,) array
    # out of the tuple itself, which is not a valid hidden state)
    h_prev = np.zeros((hidden_size, 1))

    for x_t in x:
        # update current hidden state with new inputs
        h_curr = tanh(np.dot(U, x_t) + np.dot(V, h_prev) + bh)

        # make prediction based on the current hidden state, which combines current input and previous hidden states
        y_hat = softmax(np.dot(W, h_curr) + by)

        # save the results
        hidden_states.append(h_curr)
        outputs.append(y_hat)

        # update h_prev for next iteration
        h_prev = h_curr

    return outputs, hidden_states


def cross_entropy_loss(y_hat, y):
    """Cross-entropy between a predicted distribution and its target.

    y_hat -> predicted probabilities (e.g. the softmax output)
    y     -> target distribution of the same shape (e.g. a one-hot vector)

    returns the scalar -sum(y * log(y_hat)); for a one-hot y this is
    -log(probability assigned to the correct class), which is the loss whose
    gradient wrt the softmax logits is (y_hat - y).
    """
    epsilon = 1e-12 # avoid log(0)
    y_hat = np.asarray(y_hat)
    y = np.asarray(y)
    # epsilon has to go *inside* the log, and the target has to weight the
    # log-probabilities -- otherwise the target is effectively ignored
    return -np.sum(y * np.log(y_hat + epsilon))


def backward_propagation(inputs, targets, outputs, hidden_states, params, learning_rate=0.01):
    """Back-propagation through time followed by one parameter update.

    inputs        -> input vectors per time-step (assumed (vocab_size, 1) each)
    targets       -> target vectors per time-step (assumed one-hot, (vocab_size, 1) each)
    outputs       -> softmax predictions per time-step from forward_propagation
    hidden_states -> hidden states per time-step from forward_propagation
    params        -> tuple (U, V, W, bh, by)
    learning_rate -> step size passed on to update_params

    returns (updated_params, loss), where loss is the sum of the per-time-step
    cross-entropy losses (for logging).
    """
    # unpack parameters
    U, V, W, bh, by = params

    # initialize gradients as zeros
    dU, dV, dW = np.zeros_like(U), np.zeros_like(V), np.zeros_like(W)
    dbh, dby = np.zeros_like(bh), np.zeros_like(by)

    # initialize loss - the total loss will be the sum of loss of each timestep of the input sequence
    loss = 0

    # initialize hidden state derivatives
    # need to keep track of hidden state derivatives of each timestep since the dh of a given timestep is the sum of current dh & the dh of the next timestep
    # shaped like bh, i.e. (hidden_size, 1), so an empty sequence does not crash on hidden_states[0]
    dh_next = np.zeros_like(bh)

    # compute the gradients from back to start
    for t in reversed(range(len(outputs))):
        # t -> time-step

        # compute the loss and add to total loss
        loss += cross_entropy_loss(y_hat=outputs[t], y=targets[t])

        # derivative of loss wrt the softmax input (logits): y_hat - y
        # derivation proof: https://cs231n.github.io/neural-networks-case-study/#grad
        # dy[np.argmax(targets[t])] -= 1 -- from Andrej Karpathy's example (https://gist.github.com/karpathy/d4dee566867f8291f086)
        # tho less efficient, the subtraction below has the same output for one-hot targets and is more readable
        # dy shape -> (vocab_size, 1)
        dy = outputs[t] - targets[t]

        # derivative of loss wrt W and by, accumulated over all time-steps
        # dW shape -> (vocab_size, hidden_size) -> (vocab_size, 1) * (hidden_size,1).T
        dW += np.dot(dy, hidden_states[t].T)
        dby += dy

        # derivative of loss wrt hidden state: contribution of this step's output + what flows back from step t+1
        # dh shape -> (hidden_size, 1)
        dh = np.dot(W.T, dy) + dh_next

        # back-propagate through the tanh non-linearity
        # hidden_states[t] is already the *activated* value h = tanh(h_raw), so d tanh/d h_raw = 1 - h**2
        # element-wise multiplication instead of np.dot to maintain the shape of (hidden_size, 1)
        dh_raw = dh * (1 - hidden_states[t] ** 2)

        # derivative of loss wrt U
        # input shape -> (vocab_size, 1)
        # dU = dh_raw * input.T -> (hidden_size ,1) * (vocab_size, 1).T -> (hidden_size, vocab_size)
        dU += np.dot(dh_raw, inputs[t].T)

        # derivative of loss wrt V
        # V multiplies the *previous* hidden state in the forward pass; at t == 0 that state is the all-zeros initial state
        # dV = dh_raw * h_prev.T -> (hidden_size ,1) * (hidden_size, 1).T -> (hidden_size, hidden_size)
        h_prev = hidden_states[t - 1] if t > 0 else np.zeros_like(hidden_states[t])
        dV += np.dot(dh_raw, h_prev.T)

        # derivative of loss wrt bh
        dbh += dh_raw

        # gradient handed back to the previous time-step
        dh_next = np.dot(V.T, dh_raw)

    # put gradients into a tuple (same order as params)
    gradients = (dU, dV, dW, dbh, dby)

    # update parameters
    updated_params = update_params(gradients, params, learning_rate)

    # return updated parameters and loss (for logging)
    return updated_params, loss


# function to update parameters according to the gradients
def update_params(gradients, params, learning_rate):
    """Apply one gradient-descent step to every parameter.

    Each parameter is updated in place (param -= learning_rate * gradient),
    pairing gradients and params by position; the same `params` object that
    was passed in is returned.
    """
    for param, gradient in zip(params, gradients):
        step = gradient * learning_rate
        param -= step
    return params


In [None]:
import tqdm # to print progress bar

def train(x, y, epochs, hidden_size, vocab_size, learning_rate=0.01):
    """Train the RNN on one sequence with plain gradient descent.

    x             -> input sequence (vectors per time-step), fed to forward_propagation
    y             -> target sequence (vectors per time-step), fed to backward_propagation
    epochs        -> number of full forward/backward passes over the sequence
    hidden_size   -> size of the hidden state
    vocab_size    -> size of the input/output vectors
    learning_rate -> gradient-descent step size (default 0.01)

    returns the trained parameters (U, V, W, bh, by).
    """
    # init random params
    params = init_params(hidden_size, vocab_size)

    # per-epoch loss, kept for inspection while training (not part of the return value)
    loss_history = []

    # the notebook does `import tqdm`, so the progress-bar class is tqdm.tqdm
    # (calling the module itself raises TypeError)
    for e in tqdm.tqdm(range(epochs)):
        # forward propagation
        outputs, hidden_states = forward_propagation(x, params, hidden_size)

        # back propagation and update parameters
        # argument order must match the signature: inputs, targets, outputs, hidden_states, params
        params, loss = backward_propagation(
            x, y, outputs, hidden_states, params, learning_rate=learning_rate)

        # save loss
        loss_history.append(loss)

        # print training results
        if e % 50 == 0:
            print(f"Epoch {e}/{epochs} loss: {loss}")

    # return trained parameters for using the model
    return params
        

In [53]:
def predict

array([[2, 4, 6],
       [4, 6, 8]])