[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Spinkk/TeachingTensorflow/blob/main/RNNs/RNNs%20as%20temporal%20weight%20sharing.ipynb)

# Basic RNNs as fully connected network architectures with temporally shared weights

In this notebook, I show you how a basic RNN with a simple recurrent cell (no LSTM or GRU) transforms its input by applying the same matrix multiplications at each time step, using its latest output as one of the inputs. After investigating the computations involved in recurrent neural networks, you will also see how to write your own RNN wrapper and simple RNN cell layers in TensorFlow, which you can use as a guide for the homework.


Remember that tf.keras.layers.Dense layers multiply their input by a weight matrix (the kernel). With use_bias=False and activation=None, this matrix multiplication is all they do, so we can use these layers to define our RNN cell.
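As a quick check of this claim, the following sketch (assuming TensorFlow 2.x with NumPy available) builds a Dense layer without bias and activation and compares its output with a plain matrix product using the layer's `kernel`:

```python
import numpy as np
import tensorflow as tf

# A Dense layer without bias or activation computes only x @ kernel
dense = tf.keras.layers.Dense(3, activation=None, use_bias=False)

x = tf.random.uniform((2, 16))  # a batch of 2 feature vectors with 16 features each
y = dense(x)                    # calling the layer builds its kernel of shape (16, 3)

# The same computation written as an explicit matrix multiplication
manual = tf.matmul(x, dense.kernel)

print(tuple(dense.kernel.shape))               # (16, 3)
print(np.allclose(y.numpy(), manual.numpy()))  # True
```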

A basic RNN cell takes its current hidden state (a vector of dimension n_outputs for each example in the batch) and linearly transforms it with a weight matrix, producing another vector of dimension n_outputs. In addition (quite literally), the cell takes the input at the current time-step t (the example's feature vector for time-step t) and linearly transforms it with a second weight matrix into a vector of the same dimension. Adding the results of the two matrix multiplications and a bias, and applying a tanh activation to the sum, gives the new hidden state of the RNN cell, which is then used at the next time-step t+1 together with the input feature vector at t+1.

More formally written, the computation of a simple RNN cell at one time-step can be described as follows:

$$h^{<t+1>} = \tanh \left( W_{hh} h^{<t>} + W_{xh} x^{<t>} + b \right)$$

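Here $x^{<t>}$ is the feature input at time-step t, $h^{<t>}$ is the hidden state, $b$ is a bias vector of length h (the dimensionality of the hidden state), $W_{hh}$ is an $h \times h$ matrix, and $W_{xh}$ maps the x-dimensional input feature vector of a single time-step to h dimensions. In the column-vector notation of the equation, $W_{xh}$ therefore has shape $h \times x$. Keras layers instead multiply a row vector by a kernel from the right, so the corresponding Keras kernel has the transposed shape $x \times h$.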
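To make the shapes concrete, here is a minimal NumPy sketch of one step of this equation. It uses the row-vector convention of Keras (a batch of shape (batch, x) is multiplied from the right by a kernel of shape (x, h)), so the kernels `W_xh` and `W_hh` below are the transposes of the matrices in the formula. The function name `rnn_step` is illustrative and not part of any library.

```python
import numpy as np

def rnn_step(x_t, h_prev, W_xh, W_hh, b):
    """One simple-RNN step in row-vector form.

    x_t:    (batch, x)  input features at the current time step
    h_prev: (batch, h)  previous hidden state
    W_xh:   (x, h)      input-to-hidden kernel
    W_hh:   (h, h)      hidden-to-hidden (recurrent) kernel
    b:      (h,)        bias, broadcast over the batch
    """
    return np.tanh(x_t @ W_xh + h_prev @ W_hh + b)

rng = np.random.default_rng(0)
batch, n_features, n_hidden = 2, 16, 3

x_t = rng.uniform(size=(batch, n_features))
h_prev = np.zeros((batch, n_hidden))  # initial hidden state of zeros
W_xh = rng.normal(size=(n_features, n_hidden))
W_hh = rng.normal(size=(n_hidden, n_hidden))
b = np.zeros(n_hidden)

h_next = rnn_step(x_t, h_prev, W_xh, W_hh, b)
print(h_next.shape)  # (2, 3)
```

Because tanh squashes every entry into the interval (-1, 1), the hidden state stays bounded no matter how many steps are applied.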


Below we implement this type of RNN on some input, not inside a tf.keras.Model or tf.keras.layers.Layer but as separate layers, since we do not fit a model here. First we show how to use the built-in TensorFlow/Keras RNN cell and RNN wrapper layers to achieve this. By the end of the notebook you will know how to implement these layers yourself from scratch.

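Note: You will see the context "with tf.device('/device:cpu:0'):" quite a lot in this notebook. It makes TensorFlow place the operations inside the block on the specified device, here the CPU. Since this notebook does not need a GPU, we run these small computations on the CPU. Be aware that by default TensorFlow may still allocate GPU memory when it initializes an available GPU, even if all operations are placed on the CPU.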
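If you want to make sure TensorFlow does not initialize (and reserve memory on) the GPU at all, one option is to hide the GPUs before any tensors are created. This is a sketch assuming TensorFlow 2.x; `tf.config.set_visible_devices` has to run before the GPUs are initialized, otherwise it raises a `RuntimeError`.

```python
import tensorflow as tf

# Hide all GPUs from TensorFlow; this must happen before any operation initializes them
try:
    tf.config.set_visible_devices([], "GPU")
except RuntimeError as err:
    # The GPUs were already initialized in this process, so visibility can no longer change
    print(err)

# If the call above succeeded, only CPU devices remain visible
print([d.device_type for d in tf.config.get_visible_devices()])
```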

In [1]:
import tensorflow as tf

In [14]:
with tf.device('/device:cpu:0'):
    
    # A batch of 2 sequences, each with 6 time steps of 16 features. The time dimension comes second, after the batch dimension.
    input_shape = (2, 6, 16)

    # The length of the resulting vector (similar to the units argument in Dense layers)
    n_outputs = 3
    
    input_sequence = tf.random.uniform(shape = input_shape)
    
    simple_RNN_cell = tf.keras.layers.SimpleRNNCell(n_outputs)
    
    # return_sequences=False means we only output the final hidden_state.
    RNN = tf.keras.layers.RNN(simple_RNN_cell, return_sequences=False)
    
    output = RNN(input_sequence)
    
    print(f"The output of the RNN, which is the last hidden state is \n{output}\n\n")
    

The output of the RNN, which is the last hidden state is 
[[ 0.17481816  0.9551942   0.48502997]
 [ 0.3236171   0.6451717  -0.5921114 ]]




# tf.keras.layers.RNN and tf.keras.layers.SimpleRNNCell from scratch

Now we want to look at how we can implement what happens inside the two pre-defined layers that we have used above.

To do so, we create two Dense layers holding the matrices $W_{hh}$ and $W_{xh}$ of the simple RNN cell. For the bias $b$ we create a separate tf.Variable. Notice that we disable the bias in the Dense layers and do not use an activation either, because we want a plain matrix multiplication.

With the weight matrices and the bias defined, we can start to iterate over the same input sequence data that we have used before. The initial hidden state of the RNN cell is set to be a vector of zeros before looping over the input sequence.

At each time-step we use the previous hidden-state of the cell and update it, following the equation shown in the beginning of the notebook. The output of this custom RNN is then just the final hidden-state vector.

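Notice how we use the same two weight matrices and the same bias for all time-steps of the input data. This is why RNNs can be thought of as another variant of weight sharing: the weights are shared across time, much like a convolutional layer shares its kernel across spatial positions.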
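A minimal NumPy sketch of this idea, assuming the same sizes as above (16 input features, 3 hidden units): the loop applies one set of parameters at every time step, so the number of parameters is the same whether a sequence has 6 or 600 steps. The function name `simple_rnn` is hypothetical.

```python
import numpy as np

def simple_rnn(inputs, W_xh, W_hh, b):
    """Run a simple RNN over inputs of shape (batch, time, x) and return the final state."""
    batch, n_steps, _ = inputs.shape
    h = np.zeros((batch, W_hh.shape[0]))  # initial hidden state of zeros
    for t in range(n_steps):
        # The same W_xh, W_hh and b are reused at every time step
        h = np.tanh(inputs[:, t, :] @ W_xh + h @ W_hh + b)
    return h

rng = np.random.default_rng(1)
n_features, n_hidden = 16, 3
W_xh = rng.normal(size=(n_features, n_hidden))
W_hh = rng.normal(size=(n_hidden, n_hidden))
b = np.zeros(n_hidden)

n_params = W_xh.size + W_hh.size + b.size
print(n_params)  # 16*3 + 3*3 + 3 = 60

short = simple_rnn(rng.uniform(size=(2, 6, n_features)), W_xh, W_hh, b)
long_run = simple_rnn(rng.uniform(size=(2, 600, n_features)), W_xh, W_hh, b)
print(short.shape, long_run.shape)  # (2, 3) (2, 3)
```

The 60 parameters match what tf.keras.layers.SimpleRNNCell(3) holds for 16 input features: a (16, 3) kernel, a (3, 3) recurrent kernel and a bias of length 3.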

In [15]:
with tf.device('/device:cpu:0'):
    ### We've seen the output of the randomly initialized simple RNN.
    ### Now we want to show the computations that were involved by doing the same with dense layers 
    
    ### Remember that dense layers without activation are just matrix multiplication of a weight matrix with input
    
    # We create the Dense layers (that will store the matrices)
    dense_layer_hstate = tf.keras.layers.Dense(n_outputs, activation=None, use_bias=False)
    dense_layer_input = tf.keras.layers.Dense(n_outputs, activation=None, use_bias=False)
    bias = tf.Variable(tf.zeros(n_outputs, dtype=tf.float32), name="biases")
    
    # create the initial hidden state: one zero vector per example in the batch
    # (a fixed shape also matters later, because graph-mode loops need a constant state shape)
    state = tf.zeros((input_shape[0], n_outputs), tf.float32)
    
    # iterate over the time-steps
    for t in tf.range(input_shape[1]):
        # on this iteration we use the input at time-step t
        input_t = input_sequence[:, t, :]
        
        # multiply the input at t and the previous state by their own weight matrices,
        # then add both results and the bias
        x_sum = dense_layer_input(input_t) + dense_layer_hstate(state) + bias
        
        # finally we use tanh as an activation function to update the RNN cell state
        state = tf.nn.tanh(x_sum)

    print(f"The output of the custom RNN that we built with dense layers is: \n{state}\n\n")

The output of the custom RNN that we built with dense layers is: 
[[ 0.40653017  0.42645782  0.64013284]
 [ 0.3062448  -0.06140849  0.27174112]]




# Copying the weights of the pre-defined RNN to our custom RNN to verify the implementation

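To make sure that we perform the same computations as the pre-defined tf.keras.layers.SimpleRNNCell and the wrapper layer tf.keras.layers.RNN (which implements the for loop), we take the weights of the pre-defined RNN and assign them to our two Dense layers and the bias. SimpleRNNCell stores its trainable variables in the order kernel (the input weights $W_{xh}$), recurrent_kernel (the hidden-state weights $W_{hh}$) and bias. With these weights our custom RNN produces the same output as the pre-defined RNN, which confirms that our implementation is correct.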

This type of testing can be helpful to verify your custom implementations of the computations involved in the layers.
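The same testing idea can be sketched without Keras. The NumPy example below is a hypothetical illustration, not the notebook's code: it compares the loop with the RNN written out as three stacked fully connected layers that all use the same `W_xh`, `W_hh` and `b`. It compares with `np.allclose` rather than `==`, since two implementations that add the same terms in a different order can differ in the last floating-point bits.

```python
import numpy as np

rng = np.random.default_rng(2)
batch, n_steps, n_features, n_hidden = 2, 3, 16, 3
x = rng.uniform(size=(batch, n_steps, n_features))
W_xh = rng.normal(size=(n_features, n_hidden))
W_hh = rng.normal(size=(n_hidden, n_hidden))
b = rng.normal(size=n_hidden)

# Implementation 1: a loop over the time steps
h = np.zeros((batch, n_hidden))
for t in range(n_steps):
    h = np.tanh(x[:, t] @ W_xh + h @ W_hh + b)

# Implementation 2: the same RNN unrolled into three layers with shared weights
# (the initial state is zero, so the first layer has no recurrent term)
h1 = np.tanh(x[:, 0] @ W_xh + b)
h2 = np.tanh(x[:, 1] @ W_xh + h1 @ W_hh + b)
h3 = np.tanh(x[:, 2] @ W_xh + h2 @ W_hh + b)

print(np.allclose(h, h3))  # True
```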

In [16]:
with tf.device('/device:cpu:0'):
    # Now we want to copy the weights of this RNN to a fully connected model to obtain the same output
    
    RNN_cell_input_weights = RNN.trainable_variables[0]

    RNN_cell_hstate_weights = RNN.trainable_variables[1]
    
    RNN_cell_biases = RNN.trainable_variables[2]
    
    # the shapes already match ((3, 3) and (16, 3)), so we assign directly;
    # a reshape could silently hide a mismatch such as a transposed matrix
    dense_layer_hstate.weights[0].assign(RNN_cell_hstate_weights)
    dense_layer_input.weights[0].assign(RNN_cell_input_weights)
    bias.assign(RNN_cell_biases)
    
    # again we run our custom RNN, this time with the same weights as the tf.keras.layers.RNN version
    
    # same code as above
    state = tf.zeros((input_shape[0], n_outputs), tf.float32)

    for t in tf.range(input_shape[1]):
        input_t = input_sequence[:, t, :]
        
        x_sum = dense_layer_input(input_t) + dense_layer_hstate(state) + bias

        state = tf.nn.tanh(x_sum)
    
    print(f"With the same weights as the pre-defined RNN, our custom RNN's output is\n {state}\n")
    
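    # compare with a small tolerance, because the two implementations may add the terms in a different order
    print(f"The outputs of the pre-defined RNN and our custom RNN are the same: {tf.reduce_all(tf.abs(state - output) < 1e-6)}")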

With the same weights as the pre-defined RNN, our custom RNN's output is
 [[ 0.17481816  0.9551942   0.48502997]
 [ 0.3236171   0.6451717  -0.5921114 ]]

The outputs of the pre-defined RNN and our custom RNN are the same: True


# Return sequences of hidden-states with an RNN

Previously, we set the return_sequences argument to False and obtained a single vector (the final hidden state) as the output of the RNN. To obtain a sequence from the RNN instead, we return the hidden states of all time-steps. This is useful when we want to map one sequence to another, as in language translation or audio processing. To implement this with the pre-defined layers, we reuse the same SimpleRNNCell as before but re-instantiate the RNN wrapper layer with return_sequences=True.
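In terms of the computation, return_sequences only changes what is kept from the loop. In the NumPy sketch below, the function `simple_rnn` and its `return_sequences` flag are hypothetical and only mirror the Keras argument: with `return_sequences=True` every state is collected into an array of shape (batch, time, units), and its last time step equals the single output obtained with `return_sequences=False`.

```python
import numpy as np

def simple_rnn(inputs, W_xh, W_hh, b, return_sequences=False):
    batch, n_steps, _ = inputs.shape
    h = np.zeros((batch, W_hh.shape[0]))
    states = []
    for t in range(n_steps):
        h = np.tanh(inputs[:, t, :] @ W_xh + h @ W_hh + b)
        states.append(h)  # a plain Python list is fine in eager NumPy code
    # Stack along axis 1 to get (batch, time, units), the layout tf.keras.layers.RNN returns
    return np.stack(states, axis=1) if return_sequences else h

rng = np.random.default_rng(3)
inputs = rng.uniform(size=(2, 6, 16))
W_xh, W_hh, b = rng.normal(size=(16, 3)), rng.normal(size=(3, 3)), np.zeros(3)

all_states = simple_rnn(inputs, W_xh, W_hh, b, return_sequences=True)
last_state = simple_rnn(inputs, W_xh, W_hh, b, return_sequences=False)
print(all_states.shape)                                  # (2, 6, 3)
print(np.array_equal(all_states[:, -1, :], last_state))  # True
```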

In [18]:
with tf.device('/device:cpu:0'):
    # simple_RNN_cell and input_sequence remain unchanged
    
    # return_sequences=True means we output the hidden_states of all time-steps.
    RNN = tf.keras.layers.RNN(simple_RNN_cell, return_sequences=True)
    
    RNN_outputs = RNN(input_sequence)
    
    print(f"The output of the RNN, which is the sequence of hidden states of all time-steps, is \n {RNN_outputs}\n\n")

The output of the RNN, which is the sequence of hidden states of all time-steps, is 
 [[[-3.0370799e-01  8.2251513e-01  6.1285055e-01]
  [-5.5229354e-01  8.4337991e-01  4.5760280e-01]
  [-1.0805597e-01  4.6615121e-01  5.1362753e-02]
  [-1.6602981e-01 -3.8335644e-02  4.9897563e-02]
  [ 4.4487515e-01  3.4160706e-01  7.7810884e-01]
  [ 1.7481816e-01  9.5519418e-01  4.8502997e-01]]

 [[ 7.8896761e-02  4.1681889e-01  8.1771278e-01]
  [-3.1999335e-01  4.8503804e-01  2.1094157e-01]
  [ 8.3357078e-04  1.5827732e-01 -1.8002026e-01]
  [ 4.2266491e-01  3.5075590e-01  2.3640439e-01]
  [ 1.3910948e-01  5.6831509e-01  1.7772867e-01]
  [ 3.2361710e-01  6.4517170e-01 -5.9211141e-01]]]




# Return sequences from scratch

To implement this in a way that also works in TensorFlow models that use the @tf.function decorator, we cannot simply create a Python list of the hidden states and append to it: in graph mode, a loop over tf.range is converted into a graph-level while loop, and appending to a Python list inside such a loop does not work. Instead we use a tf.TensorArray object. With dynamic_size=True, a TensorArray can also grow during the loop, which covers the case in which we do not know beforehand how many time-steps we will process and thus how many hidden states we want to output.
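A minimal sketch of this pattern, assuming TensorFlow 2.x: inside a @tf.function, a TensorArray with `dynamic_size=True` collects one value per loop iteration, and `.stack()` turns it back into a tensor. The function `squares` is a toy example, not part of the notebook.

```python
import tensorflow as tf

@tf.function
def squares(n):
    # size=0 with dynamic_size=True: the array grows as we write to it
    values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
    for i in tf.range(n):  # AutoGraph turns this into a graph-level while loop
        # write() returns the updated TensorArray, so it must be reassigned
        values = values.write(i, tf.cast(i * i, tf.float32))
    return values.stack()

print(squares(tf.constant(4)))  # tf.Tensor([0. 1. 4. 9.], shape=(4,), dtype=float32)
```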

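A tf.TensorArray is not a tensor, so we call its .stack() method to combine the stored hidden states into a single tensor. Because each write stores the states of one time-step, with shape (batch, units), .stack() returns a tensor of shape (time-steps, batch, units). We therefore apply tf.transpose with the permutation [1, 0, 2] to obtain the (batch, time-steps, units) layout that tf.keras.layers.RNN returns.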
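NumPy's `np.stack` also stacks along a new leading axis, like `TensorArray.stack()`, so the shapes can be checked without TensorFlow. In this illustrative sketch each "hidden state" is filled with its time index to make the layout visible:

```python
import numpy as np

batch, n_steps, units = 2, 6, 3
# One array of shape (batch, units) per time step, filled with the time index t
states = [np.full((batch, units), t, dtype=np.float32) for t in range(n_steps)]

stacked = np.stack(states)                      # time-major: (time, batch, units)
batch_major = np.transpose(stacked, (1, 0, 2))  # (batch, time, units)

print(stacked.shape)      # (6, 2, 3)
print(batch_major.shape)  # (2, 6, 3)
print(batch_major[0, 4])  # [4. 4. 4.], i.e. example 0 at time step 4
```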

In [19]:
with tf.device('/device:cpu:0'):

    state = tf.zeros((input_sequence.shape[0],n_outputs), tf.float32)

    # initialize the TensorArray that collects the hidden states
    # (after .stack() its shape is: time-steps, batch, h_dim)
    hidden_states = tf.TensorArray(dtype=tf.float32, size=input_sequence.shape[1])

    for t in tf.range(input_shape[1]):
        input_t = input_sequence[:, t, :]

        x_sum = dense_layer_input(input_t) + dense_layer_hstate(state) + bias

        state = tf.nn.tanh(x_sum)

        # write the states to the TensorArray
        hidden_states = hidden_states.write(t, state)

    # transpose the sequence of hidden_states from TensorArray accordingly (batch and time dimensions switched)
    custom_RNN_outputs = tf.transpose(hidden_states.stack(), [1,0,2])

    print(f"The output of the custom RNN that we built with dense layers is: \n{custom_RNN_outputs}\n\n")

The output of the custom RNN that we built with dense layers is: 
[[[-3.0370799e-01  8.2251513e-01  6.1285055e-01]
  [-5.5229354e-01  8.4337991e-01  4.5760280e-01]
  [-1.0805597e-01  4.6615121e-01  5.1362753e-02]
  [-1.6602981e-01 -3.8335644e-02  4.9897563e-02]
  [ 4.4487515e-01  3.4160706e-01  7.7810884e-01]
  [ 1.7481816e-01  9.5519418e-01  4.8502997e-01]]

 [[ 7.8896761e-02  4.1681889e-01  8.1771278e-01]
  [-3.1999335e-01  4.8503804e-01  2.1094157e-01]
  [ 8.3357078e-04  1.5827732e-01 -1.8002026e-01]
  [ 4.2266491e-01  3.5075590e-01  2.3640439e-01]
  [ 1.3910948e-01  5.6831509e-01  1.7772867e-01]
  [ 3.2361710e-01  6.4517170e-01 -5.9211141e-01]]]




Now let us verify that the same loop also works inside a function decorated with @tf.function, i.e. in graph mode.

In [20]:
with tf.device('/device:cpu:0'):
    @tf.function
    def tf_func():

        state = tf.zeros((input_sequence.shape[0],n_outputs), tf.float32)

        # initialize the TensorArray that collects the hidden states
        # (after .stack() its shape is: time-steps, batch, h_dim)
        hidden_states = tf.TensorArray(dtype=tf.float32, size=input_sequence.shape[1])

        for t in tf.range(input_shape[1]):
            input_t = input_sequence[:, t, :]

            x_sum = dense_layer_input(input_t) + dense_layer_hstate(state) + bias

            state = tf.nn.tanh(x_sum)

            # write the states to the TensorArray
            hidden_states = hidden_states.write(t, state)

        # transpose the sequence of hidden_states from TensorArray accordingly (batch and time dimensions switched)
        custom_RNN_outputs = tf.transpose(hidden_states.stack(), [1,0,2])
        
        return custom_RNN_outputs
    custom_RNN_outputs = tf_func()
    print(f"The output of the custom RNN that we built with dense layers is: \n{custom_RNN_outputs}\n\n")

The output of the custom RNN that we built with dense layers is: 
[[[-3.0370799e-01  8.2251513e-01  6.1285055e-01]
  [-5.5229354e-01  8.4337991e-01  4.5760280e-01]
  [-1.0805597e-01  4.6615121e-01  5.1362753e-02]
  [-1.6602981e-01 -3.8335644e-02  4.9897563e-02]
  [ 4.4487515e-01  3.4160706e-01  7.7810884e-01]
  [ 1.7481816e-01  9.5519418e-01  4.8502997e-01]]

 [[ 7.8896761e-02  4.1681889e-01  8.1771278e-01]
  [-3.1999335e-01  4.8503804e-01  2.1094157e-01]
  [ 8.3357078e-04  1.5827732e-01 -1.8002026e-01]
  [ 4.2266491e-01  3.5075590e-01  2.3640439e-01]
  [ 1.3910948e-01  5.6831509e-01  1.7772867e-01]
  [ 3.2361710e-01  6.4517170e-01 -5.9211141e-01]]]




Finally, we again verify that the outputs with return_sequences=True match between our custom implementation of the computations involved and the pre-defined RNN-wrapper and cell. Since we did not re-initialize the weights, the outputs should still be the same.

In [21]:
print(tf.reduce_all(tf.abs(custom_RNN_outputs - RNN_outputs) < 1e-6).numpy())

True


# Implementing custom RNNs with subclassing

Let's start by writing the RNNWrapper layer, which loops over the observations at the different time-steps.

Like in tf.keras.layers.RNN, we implement a return_sequences argument that controls whether to return a sequence of hidden states or just the last hidden state.

(!) Important: This RNNWrapper is written only with the simple RNN cell in mind. It does not (yet) support LSTM cells, because it only keeps track of the hidden state and not of the cell state, which does not exist in the basic RNN. Adjusting this is part of your homework.

In [9]:
class RNNWrapper(tf.keras.layers.Layer):
    def __init__(self, RNN_Cell, return_sequences=False):
        super(RNNWrapper, self).__init__()
        
        self.return_sequences = return_sequences
        
        self.cell = RNN_Cell
    
    def call(self, data, training=False):
        
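        # use the dynamic shape so that an unknown (None) batch size also works in graph mode
        batch_size = tf.shape(data)[0]
        length = tf.shape(data)[1]
        
        # initialize state of the simple rnn cell
        state = tf.zeros((batch_size, self.cell.units), tf.float32)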
        
        # initialize array for hidden states (only relevant if self.return_sequences == True)
        hidden_states = tf.TensorArray(dtype=tf.float32, size=length)

        for t in tf.range(length):
            input_t = data[:,t,:]

            state = self.cell(input_t, state, training)

            if self.return_sequences:
                # write the states to the TensorArray
                hidden_states = hidden_states.write(t, state)
        
        if self.return_sequences:
            # transpose the sequence of hidden_states from TensorArray accordingly 
            #(batch and time dimensions are otherwise switched after .stack())
            outputs = tf.transpose(hidden_states.stack(), [1,0,2])
        
        else:
            # take the last hidden state of the simple rnn cell
            outputs = state
        
        return outputs

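Next we implement our own SimpleRNNCell, which is compatible with our custom RNN wrapper layer because it has only a single hidden state (no cell state, unlike the LSTM).

Here we reuse the computations from above: two Dense layers without bias for the matrices $W_{xh}$ and $W_{hh}$, a separate bias variable, and a tanh activation. The optional kernel_regularizer is passed on to both Dense layers.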

In [25]:
class CustomSimpleRNNCell(tf.keras.layers.Layer):
    def __init__(self, units, kernel_regularizer=None):
        super(CustomSimpleRNNCell, self).__init__()
        
        self.units = units
        
        self.dense_hstate = tf.keras.layers.Dense(units, kernel_regularizer=kernel_regularizer, use_bias=False)
        
        self.dense_input = tf.keras.layers.Dense(units, kernel_regularizer=kernel_regularizer, use_bias=False)
        
        self.bias = tf.Variable(tf.zeros(units), name="RNN_Cell_biases")
        
        self.state_size = units
        
        
    def call(self, input_t, state, training=False):
        
        # multiply the input at t and the previous state by their own weight matrices,
        # then add both results and the bias
        x_sum = self.dense_input(input_t) + self.dense_hstate(state) + self.bias
        
        # finally we use hyperbolic tangent as an activation function to update the RNN cell state
        state = tf.nn.tanh(x_sum)
        
        return state

Next, we define an RNN_Model class, which uses the simple RNN to obtain a single feature vector from the time series and then makes a prediction from this feature vector with two fully connected layers (the last one outputs probabilities for 10 classes).

In [26]:
class RNN_Model(tf.keras.Model):
    def __init__(self, units):
        super(RNN_Model, self).__init__()
        
        self.cell = CustomSimpleRNNCell(units)
        self.RNNWrapper = RNNWrapper(self.cell, return_sequences=False)
        
        self.dense = tf.keras.layers.Dense(128, activation="relu")
        
        self.out = tf.keras.layers.Dense(10, activation="softmax")
        
    def call(self, data, training=False):
        
        x = self.RNNWrapper(data, training)
        x = self.dense(x)
        x = self.out(x)
        
        return x

Lastly, we instantiate and use an RNN model on some random input as before.

In [27]:
with tf.device('/device:cpu:0'):
    
    input_sequence = tf.random.uniform(shape = (1,24,16), dtype=tf.float32)

    hidden_state_size = 256

    rnn = RNN_Model(hidden_state_size)

    prediction = rnn(input_sequence)

    print(f"The randomly initialized RNN model outputs the class probabilities \n{prediction}\n for the random input.")

The randomly initialized RNN model outputs the class probabilities 
[[0.06576791 0.07322028 0.07317584 0.05219256 0.15585336 0.10260176
  0.06084397 0.1635286  0.06335714 0.18945853]]
 for the random input.


In [28]:
rnn.summary()

Model: "rnn__model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 custom_simple_rnn_cell (Cus  multiple                 69888     
 tomSimpleRNNCell)                                               
                                                                 
 rnn_wrapper_1 (RNNWrapper)  multiple                  69888     
                                                                 
 dense_10 (Dense)            multiple                  32896     
                                                                 
 dense_11 (Dense)            multiple                  1290      
                                                                 
Total params: 104,074
Trainable params: 104,074
Non-trainable params: 0
_________________________________________________________________
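The numbers in the summary can be reproduced by hand. The cell's 69,888 parameters appear twice, once for custom_simple_rnn_cell and once for rnn_wrapper_1, because the same cell object is tracked both as an attribute of the model and inside the wrapper; the total counts these shared weights only once. As a sketch of the arithmetic (16 input features and 256 units, as in the cells above):

```python
# Parameter counts for RNN_Model(256) on inputs with 16 features per time step
n_features, units = 16, 256

cell_params = n_features * units + units * units + units  # W_xh, W_hh, b
dense_params = units * 128 + 128                          # Dense(128): kernel + bias
out_params = 128 * 10 + 10                                # Dense(10): kernel + bias

print(cell_params, dense_params, out_params)    # 69888 32896 1290
print(cell_params + dense_params + out_params)  # 104074
```

None of these counts depend on the sequence length of 24, which is the temporal weight sharing from the beginning of the notebook.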
