##### Author contributions
Please fill out for each of the following parts who contributed to what:
- Conceived ideas: 
- Performed math exercises: 
- Performed programming exercises:
- Contributed to the overall final assignment: 

# Chapter 7
## Recurrent neural networks


    Hand-in bug-free (try "Kernel" > "Restart & Run All") and including all (textual as well as figural) output via Brightspace before the deadline (see Brightspace).

Learning goals:
1. Get familiar with recurrent hidden units
1. Implement a simple RNN (Elman network) in PyTorch
1. Implement an LSTM-based neural network in PyTorch

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### Exercise 1  (1 point)

Consider a recurrent neural network with one input unit $x$, one sigmoid recurrent hidden unit $h$, and one linear output unit $y$. The values of $x$ are given for 3 time points in `x_t`. As this is a very small RNN, $W^i$, $W^h$ and $W^o$ are given as the scalar values `w_i`, `w_h` and `w_o` respectively. The hidden unit has an added bias `h_bias`. The hidden unit state is initialized with `0.0`. The only 'value-manipulating' activation function in this network is the sigmoid activation $\sigma(\cdot)$ on the hidden unit. 

1. Write down the forward pass of this network for a specific time point $t$. 
1. What is the value of the hidden state $h$ after processing the last input `x_t[2]`? 
1. What is the output `y` of the network after processing the last input `x_t[2]`? 

$$
\large{
\begin{eqnarray*}
h_t &=& \sigma( W^i x_t + W^h h_{t-1}) + bias \\
y_t &=& W^o h(t) + bias\\
\end{eqnarray*}
}
$$

For 1.2 and 1.3, you can either compute the solution by hand (show clearly how you arrived there, 3 decimal points) or write code to find the answer. 

In [3]:
# inputs over times 0, 1, 2:
x_t = [9.0, 4.0, -2.0]

# weights and bias terms: 
w_i = 0.5
w_h = -1.0
w_o = -0.7
h_bias = -1.0
y_bias = 0.0

### Solution 1

In [4]:
import math
def sig(x):
    return 1/(1 + math.exp(-x))

In [5]:
h_0 = sig(w_i * x_t[0]) + h_bias
h_1 = sig((w_i * x_t[1]) + (w_h * h_0)) + h_bias
h_2 = sig((w_i * x_t[2]) + (w_h * h_1)) + h_bias

y_2 = w_o * h_2 + y_bias
print(h_2)
print(y_2)

-0.7072252802268861
0.4950576961588203


### Code introduction

We will apply two recurrent neural networks to learn a dynamic variant of the *adding problem*. First, run the next cell and inspect the output. 

There is a stream of inputs to the network, two at each time step. The first input unit will receive a series of decimal numbers in the interval $[-1,1]$. The second input unit will receive the numbers $0$, $-1$, or $1$. The target is the sum of the preceding two decimal numbers that came together with the number $1$ (called the marker, `x` in the generated output), and it should be produced whenever a marker has been seen. In the beginning until two of these markers have been seen, the output will stay 0. 


Below you will find two functions: 
1. `create_addition_data`: Generates sequential training data sets `X` and `T` for the dynamic *adding problem*, returns numpy array.
1. `MyDataset`: a custom PyTorch dataset that makes sure dimensions are as PyTorch likes them, and can return individual samples the way PyTorch wants them.

Note, the data are represented in a dictionary called `data`. To access the training, validation, and testing data, you can call `data["train"]`, `data["valid]`, and `data["test"]` respectively.

In [6]:
def create_addition_data(n_samples=3000):
    # This is a dynamic variant of the adding problem. 
    
    # Random numbers in [-1.0,1.0]): 
    X1 = np.random.uniform(low=-1.0, high=1.0, size=(n_samples,) )   
    
    # Random markers [-1.0, 0.0, 1.0] (1.0 marks the numbers that should be added):
    X2 = np.random.choice([-1.0, 0.0, 1.0], size=(n_samples,), p=[0.25, 0.25, 0.5])
    
    # Combine
    X = np.vstack((X1, X2)).T.astype("float32")

    # Create targets
    T = np.zeros((n_samples, 1)).astype("float32")

    # Get indices of 1.0
    markers = np.nonzero(X2 == 1.0)[0]
    
    # Generate data
    mem = X1[markers[0]]
    for mi, marker in enumerate(markers[1:]):
        T[marker] = mem + X1[marker]
        mem = X1[marker]
                
    return X, T

In [7]:
# Long as the markers x are sparse
X, T = create_addition_data(n_samples=100)
# Print some data
print("Data for the adding problem (x marks 1.0):")
for t in range(X.shape[0]):
    print("Time: {:03d} \t x: ({:+.3f} , {}) \t t: {:+.3f} ".format(
        t, X[t,0], 'x' if X[t,1] == 1.0 else ' ', T[t,0]))

Data for the adding problem (x marks 1.0):
Time: 000 	 x: (+0.908 ,  ) 	 t: +0.000 
Time: 001 	 x: (-0.753 , x) 	 t: +0.000 
Time: 002 	 x: (+0.973 ,  ) 	 t: +0.000 
Time: 003 	 x: (-0.811 , x) 	 t: -1.565 
Time: 004 	 x: (-0.863 ,  ) 	 t: +0.000 
Time: 005 	 x: (-0.261 , x) 	 t: -1.072 
Time: 006 	 x: (+0.571 , x) 	 t: +0.310 
Time: 007 	 x: (-0.708 , x) 	 t: -0.137 
Time: 008 	 x: (-0.696 ,  ) 	 t: +0.000 
Time: 009 	 x: (-0.628 ,  ) 	 t: +0.000 
Time: 010 	 x: (+0.694 ,  ) 	 t: +0.000 
Time: 011 	 x: (-0.117 ,  ) 	 t: +0.000 
Time: 012 	 x: (+0.028 ,  ) 	 t: +0.000 
Time: 013 	 x: (-0.125 , x) 	 t: -0.833 
Time: 014 	 x: (+0.107 , x) 	 t: -0.018 
Time: 015 	 x: (-0.777 ,  ) 	 t: +0.000 
Time: 016 	 x: (+0.985 , x) 	 t: +1.092 
Time: 017 	 x: (+0.812 ,  ) 	 t: +0.000 
Time: 018 	 x: (+0.878 , x) 	 t: +1.863 
Time: 019 	 x: (+0.421 , x) 	 t: +1.299 
Time: 020 	 x: (+0.777 , x) 	 t: +1.198 
Time: 021 	 x: (-0.841 , x) 	 t: -0.065 
Time: 022 	 x: (-0.694 , x) 	 t: -1.535 
Time: 023 	 x:

In [8]:
# Make PyTorch dataset
class MyDataset(torch.utils.data.Dataset):
    
    def __init__(self, X, T):
        self.X = torch.from_numpy(X).type(torch.FloatTensor) # [n_examples, n_samples, n_features]
        self.T = torch.from_numpy(T).type(torch.FloatTensor) # [n_examples, n_samples]
        
    def __getitem__(self, index):
        return self.X[index, :, :], self.T[index]
    
    def __len__(self):
        return self.X.size()[0]

In [9]:
n_examples = 9
n_samples = 3000
data = {}

# Define training data
X = np.zeros((n_examples, n_samples, 2))
T = np.zeros((n_examples, n_samples, 1))
for i_example in range(n_examples):
    X[i_example, :, :], T[i_example, :] = create_addition_data(n_samples)
data["train"] = torch.utils.data.DataLoader(MyDataset(X, T), batch_size=3)

# Define validation data
X = np.zeros((n_examples, n_samples, 2))
T = np.zeros((n_examples, n_samples, 1))
for i_example in range(n_examples):
    X[i_example, :, :], T[i_example, :] = create_addition_data(n_samples)
data["valid"] = torch.utils.data.DataLoader(MyDataset(X, T), batch_size=3)

# Define test data
X = np.zeros((n_examples, n_samples, 2))
T = np.zeros((n_examples, n_samples, 1))
for i_example in range(n_examples):
    X[i_example, :, :], T[i_example, :] = create_addition_data(n_samples)
data["test"] = torch.utils.data.DataLoader(MyDataset(X, T), batch_size=1)

### Exercise 2: training a network  (0.5 points)

We neede a function to train a `model`. This function `train_model(model, data, optimizer, criterion, n_epochs)` should do the following: 

1. Loop `n_epochs` times over the dataset, and loop over minibatches
1. Train the model on the training data and save the loss per epoch
1. Validate the model on the validation data and save the loss per epoch
1. The function should return the trained model and the losses

Note: this function is quite similar again as the function your wrote wor the MLP and CNN. The only difference is that we do not need to compute an accuracy, as we are performing regression here.

### Solution 2

In [10]:
for x,(i) in data["train"]:
    print(x[1][4])
    print(x[1][5])
    print(i[1][5])
    break
    #print(i)

tensor([-0.3464,  0.0000])
tensor([0.1376, 1.0000])
tensor([0.8733])


In [11]:
for x, i in data["test"]:
    print(x[0][249])
    print(x[0][250])
    print(x[0][251])
    print(i[0][251])
    break

tensor([0.6455, 1.0000])
tensor([0.2604, 0.0000])
tensor([0.9978, 1.0000])
tensor([1.6433])


In [12]:
def train_model(model,data,optimizer,criterion,n_epochs):
    train_loss = np.zeros(n_epochs)
    valid_loss = np.zeros(n_epochs)
    
    loss_train = []
    loss_valid = []
    for epoch in range(n_epochs):
        
        for index,(t,i) in enumerate(data["train"]):
            t = t.to(device)
            i = i.to(device)

            optimizer.zero_grad()
            output = model.forward(t)
            loss   = criterion(output,i)
            loss.backward()
            optimizer.step()
            
            loss_train.append(loss.item())
        
        train_loss[epoch] = np.mean(loss_train)
        with torch.no_grad():
            for t,i in data["valid"]:
                t = t.to(device)
                i = i.to(device)
                
                outputs = model(t)
                loss    = criterion(outputs,i)
                
                loss_valid.append(loss.item())
                
            valid_loss[epoch] = np.mean(loss_valid)
        if epoch % 10 == 0:
            print("epoch: [{},{}], train loss: {:.4f}, valid loss {:.4f}"
              .format(epoch+1,n_epochs,train_loss[epoch],valid_loss[epoch]))
    return model,train_loss,valid_loss

### Exercise 3: Testing a network  (1.5 points)

We neede a function to test a trained `model`. This function `test_model(model, data)` should do the following: 

1. Let `model` predict outputs on the testing data. For this, iterate through test data `data["test]` and pass each sample through `model`. 
1. Save the model output as well as the target output
1. The function should return the predicted and target outputs

### Solution 3

In [13]:
def test_model(model,data):
    with torch.no_grad():
        output = []
        target = []
        for (t,i) in data["test"]:
            t = t.to(device)
            i = i.to(device)
        
            predict = model(t)
            output.append(predict[0][:].detach())
            target.append(i[0][:])
    return output, target
        

### Exercise 4: Simple RNN  (3 points)

We first implement a simple recurrent architecture (a simple [Elman network](http://mnemstudio.org/neural-networks-elman.htm)). 

1. First implement the linear layers `l1` and `l2`. They should lead from `n_input` input units over `n_hidden` hidden units to `n_out` output units.
1. Add a recurrent linear weight layer `hr`. These are weights that self-connect to the hidden units. The input will be the values of the `n_hidden` hidden units, and they should project back to the `n_hidden` hidden units. 
1. A forward pass will update the hidden state with the inputs and the recurrent layer weights, and produce the output from the hidden unit. Specifically you should do the following: 
    2. If we are at the first time point, the hidden state should be set to the input passed through `l1` and `tanh` activations.
    2. If the hidden state has information from previous time points: a) Pass the input through `l1`. b) Pass the hidden state through the recurrent weight layer `hr`. c) The sum of a) and b) should be passed through the `tanh` activation. d) The result should be the new hidden state (used for the next time point).
    2. Finally pass the hidden state through layer `l2`. This produces the output `y` for that time point.
1. The forward pass will receive data `x` with shape [batch_size, time_points, features]. So within the forward pass, you will have to loop over time points, performing the steps as descibed above. The output of the forward pass is then output `y` with shape [batch_size, time_points, 1].

Note: this exercise could also be done with nn.RNN(). However, we want you to understand what a RNN is doing, so we want you to use nn.Linear instead.

### Solution 4

$h_t = \text{tanh}(W_{ih} x_t + b_{ih} + W_{hh} h_{(t-1)} + b_{hh})$

In [14]:
## Code here ##
class RecNN(nn.Module):
    def __init__(self, n_input,n_hidden,n_out):
        super(RecNN, self).__init__()
        self.l1 = nn.Linear(n_input,n_hidden)
        self.l2 = nn.Linear(n_hidden,n_out)
        self.hr = nn.Linear(n_hidden,n_hidden)
        self.hidd = self.hidden = torch.tensor([0. for i in range(n_hidden)])
        self.tanh = nn.Tanh()
        
        self.use_gpu= True
        
    def forward(self,x):
        self.hidden = self.tanh(self.l1(x))
        for i in range(x.size()[1]):
            out = self.l1(x)
            hs = self.hr(self.hidden)
            self.hidden = self.tanh(out + hs)
            y  = self.l2(hs)
        return y
        


### Exercise 5: Setup and run (1 point)

Try your simple `RNN` with the dynamic addition task. 

1. Define the model. `RNN` should have **2 hidden units**.
1. Define the loss as the Mean Squared Error loss, and use an Adam optimizer.
1. Train your model for several epochs on the data with `train_model`.
1. Plot the train and validation losses. 
1. Test the trained model with `test_model` 
1. Plot at least one target time series together with the predicted time series

Based on the losses and predictions, what would your conclusion be? Did the simple RNN learn the task? 

In [15]:
model = RecNN(2,2,1)
loss = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)

### Solution 5

In [None]:
model,train_loss,valid_loss = train_model(model,data,optimizer,loss,50)

epoch: [1,50], train loss: 0.3521, valid loss 0.3506


In [None]:
print(train_loss.shape)
print(valid_loss.shape)

In [None]:
epoch_plot = np.arange(50)

plt.plot(epoch_plot,train_loss,valid_loss)
plt.title("Train and Validation loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend(["Train loss","Validation loss"])


plt.show()

In [None]:
predicted, target = test_model(model,data)

In [None]:
for i in predicted:
    print(i.size())

In [None]:
plt.scatter(predicted[1],target[1],s=1)
plt.scatter(target[1],target[1],s=1)
plt.title("Results from the test")
plt.show()

In [None]:
import pandas as pd
plt.plot(target[0],color='orange')
plt.plot(predicted[0],color="blue")

plt.show()

In [None]:
plt.scatter(predicted[1],target[1])
plt.title("Results from the test")
plt.show()

### Exercise 6: LSTM RNN (2 points)

Long-Short Term Memory (LSTM) units have more [powerful functionality](http://colah.github.io/posts/2015-08-Understanding-LSTMs/), such as selective forgetting, and they are able to keep track of long-term dependencies. This might be useful for the adding task. 

Implement the `LSTM` model:

1. `lstm` should be an `LSTM` layer leading from the `n_input` inputs to the `n_hidden` hidden units.
1. `fc` should be a fully-connected (linear) layer leading from the hidden units (output of `lstm`) to the `n_out` output units. 
1. The network does not make use of further activation functions. 

### Solution 6

In [None]:
class LSTM(nn.Module):
    def __init__(n_input,n_hidden,n_out):
        super(LSTM,self).__init__()
        self.lstm = nn.LSTM(n_input,n_hidden)
        self.fc   = nn.Linear(n_hidden,n_out)
    
        self.use_gpu = True
        
    def forward(self,x):
        h0 = torch.zeros(1, x.shape[0], 2)
        c0 = torch.zeros(1, x.shape[2], 2)
        out= x.view(len(x[0]), x.shape[0], -1)
        
        
        

### Exercise 7: Setup and run (1 point)

Try your `LSTM` model with the dynamic addition task. 

1. Define the model. `LSTM` should have **2 hidden units**.
1. Define the loss as the Mean Squared Error loss, and use an Adam optimizer.
1. Train your model for several epochs on the data with `train_model`.
1. Plot the train and validation losses. 
1. Test the trained model with `test_model` 
1. Plot at least one target time series together with the predicted time series

Did the LSTM network capture the task better? Did any of the two capture the task perfectly? Or are the two networks on par? 

### Solution 7

In [None]:
## Code here ##

In [None]:
time = list(range(0, 3000))
plt.scatter(time, predicted[4],s=5)
plt.scatter(time, target[4],s=5)
plt.title("Target- and predicted time series")
plt.xlabel("Time")
plt.ylabel("")
plt.legend(['predicted', 'target'])
plt.show()

In [None]:
def test_model(model, data):
    output = []
    target = []
    for x,t in data["test"]:
        y = model(x)
        output.append(y)
        target.append(t)
    
    return output, target