In [None]:
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.optim as optim

In this demo, we will show how vanilla Recurrent Neural Networks (RNNs) can be used to solve time-series extrapolation problems -- That is, problems that require you to predict the "next value" given a series of values.

# Generate Data

Here, we will try to use a RNN to predict a sine wave. We create a dataset consisting of a random length of a sine wave with Gaussian noise.

In [None]:
#Standard Deviation of noise we will add to our generated curves
SIGMA = 0.02
#Number of curves we will generate
N = 20

#dataset will hold values of all generated curves
dataset = []

for _ in range(N):
    #Domain of our curve will be start to end
    #Note start is also randomized now
    start = np.random.uniform(0, 10)
    end = np.random.uniform(40, 50)
    #We will generate a point for all x values divisible by 0.1
    t = np.arange(start, end, 0.1)
    #Generate values as sin(t) + noise
    x = np.sin(t) + np.random.normal(loc=0.0, scale=SIGMA, size=t.shape)
    #Add generated curve to dataset
    dataset.append(x)

#Plot example curve
plt.figure(figsize=(16, 9))
plt.scatter(np.arange(dataset[0].shape[0]), dataset[0], s=20)
plt.show()

In [None]:
#1/C_factor of the data will be used by context LSTM.  With C_factor = 2 the data
#is split half and half.  Further comments will just use 2 instead of C_factor.
C_factor = 2
#Each 'data point' in our dataset will be ([y_0,y_1,...,y_(n-1)],[y_n/2,y_(n/2+1),...,y_n])
#Our model will predict future values so the output will be of shifted time index.
dataset = [
    (torch.from_numpy(input_seq[:-1]).double(), \
     torch.from_numpy(input_seq[int((np.size(input_seq)-1)/C_factor)+1:]).double()) \
        for input_seq in dataset
]

# Model Definition

In [None]:
#Definition of our Model
#With an LSTM we will use the first half of the input data to determine
#the state of the curve we are working with.
#The second half of the data will be used by a RNN to make predictions 
#considering both the data and the state computed by the LSTM.
#Motivation: LSTM computes phase, RNN showed earlier that given phase
#information it can appropriately model a sine wave.
class LSTM_RNN(nn.Module):
    #Initialize LSTM_RNN module
    def __init__(self, hidden_size):
        
        super().__init__()
        
        #Hidden size for LSTM and RNN determined by argument
        (self.lstm_hidden_size, self.rnn_hidden_size) = hidden_size
        
        #Initial state for LSTM
        self.init_hidden = torch.nn.Parameter(torch.zeros(1,1,self.lstm_hidden_size))
        self.init_cell = torch.nn.Parameter(torch.zeros(1,1,self.lstm_hidden_size))
        
        #Initialize LSTM
        self.lstm = nn.LSTM(input_size=1, hidden_size=self.lstm_hidden_size)
        
        #Transformation matrix to be used to transform final LSTM cell state to
        #initial RNN hidden state
        self.state_trans = torch.nn.Parameter(torch.randn(self.lstm_hidden_size,self.rnn_hidden_size))
        
        #Initialize RNN
        self.rnn = nn.RNN(input_size=1,hidden_size=self.rnn_hidden_size)
        
        #linear mapping used to map outputs of RNN to predictions
        self.linear = nn.Linear(in_features=self.rnn_hidden_size, out_features=1)
        
    #Forward pass through model.  Returns predictions for second half of data and final hidden state.
    def forward(self, inputs, teacher_force_prob=0.5):
        #Number of points in passed in input
        seq_len = inputs.shape[0]
        #List to contain outputs of the model
        outputs = []
        #Length of input to be used as context, (1/2 of the data)
        c_length = int(seq_len/C_factor)
        context = inputs.narrow(0,0,c_length)
        #Final cell state is used by passing the context of the input through LSTM
        _, (_, final_cell) = self.lstm(context.reshape(c_length, 1, 1), (self.init_hidden, self.init_cell))
        #Initial hidden state of RNN computed by transforming final cell state of LSTM
        h = torch.matmul(final_cell, self.state_trans)
        #For every data point in second half of input
        for i, inp in enumerate(inputs.narrow(0,c_length, seq_len - c_length)):
            #If it's the first point or with probability teacher_force_prob
            #we use the actual previous value given from the data
            #Otherwise we use what the model outputted last as input
            if i == 0 or np.random.uniform() < teacher_force_prob:
                inp = inp
            else:
                inp = output
            #We reshape input as the RNN module expects (num_time_steps,batch_size,input_size)
            #dimensionality of it's input.  We are manually going through one time step at a time,
            #learning from one example at a time (batch size = 1), and our input size is 1.
            #RNN module outputs final output and hidden state.  Since we are just working with one
            #time step this is simply the next output and next hidden state
            output, h = self.rnn(inp.reshape(1, 1, 1), h)
            #Output will be of size (time_steps=1,batch_size=1,output_size=1) so we squeeze to get
            #a single value.  Then we apply our linear transformation
            output = self.linear(torch.squeeze(output))
            #Append output to our list of outputs
            outputs.append(output)
        #Save final hidden state to return
        final_h = h
        
        outputs = torch.squeeze(torch.cat(outputs))
        
        return outputs, final_h
    
    #Given a set of inputs runs through model collecting final hidden state
    #Using hidden state we extrapolate exclusively feeding the output of the 
    #model back in to create a self sustaining model of the future
    def extrapolate(self, inputs, num_futures):
        #We don't need to keep track of gradients here (not used for learning)
        with torch.no_grad():
            #Get final hidden state of model after passing through given inputs
            #As well as outputs over the course of the forward pass.
            outputs, final_h = self.forward(inputs)
            
            #Initialize lists to contain future extrapolated points
            future_points = []
            #Initialize hidden state as final hidden state found earlier
            h = final_h
            #Last predicted point initialized to last point predicted by model
            pred_pt = outputs[-1]
            #For every point you want to extrapolate
            for _ in range(num_futures):
                #Output and next hidden state gotten through forward pass
                pred, h = self.rnn(pred_pt.reshape(1, 1, 1), h)
                #Apply linear transformation on output to get predicted point
                pred_pt = self.linear(pred)
                #Append predicted point to list of extrapolated points
                future_points.append(pred_pt.item())
            #Return an ndarray containing extrapolated points
            return np.array(future_points)
                


# Training

In [None]:
#Number of Epochs
EPOCHS = 20
#Learning Rate
LR = 0.04
#Momentum Constant
BETA = 0.2
#Dimensionality of RNN Hidden State
HIDDEN_SIZE = (16, 64)

#Initialize network
net = LSTM_RNN(HIDDEN_SIZE).double()

#We will use Mean Squared Loss
criterion = nn.MSELoss()
#Optimize using Stochastic Gradient Descent
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=BETA)

for i in range(EPOCHS):
    #Value to display reset each epoch
    total_loss = 0
    #Online learning - Update weights after each example
    for inputs, target in dataset:
        #We are only interested in the predictions made by the model
        #So only save the outputs given the inputs
        output, _ = net(inputs)
        #'Squeeze' the outputs to reduce to a single dimensional list of predictions
        output = output.reshape(-1)
        #Loss is the mean squared error with the expected targets
        loss = criterion(output, target)
        
        #Zero the gradients before computing new gradients
        net.zero_grad()
        #Compute gradients in a backward pass
        loss.backward()
        #Update the parameters to minimize loss
        optimizer.step()
        
        #Increment the total loss for this Epoch
        total_loss += loss
    #Divide total loss by the length of the dataset to get 'average' total loss  
    total_loss /= len(dataset)
    #Display average loss for Epoch
    print(f"Epoch {i+1}: {total_loss.item()}")
    
    

# Extrapolation Demo

In [None]:
#Number of points we want to extrapolate
num_future = 200

#We will use the first example function in our dataset.  Note the second 0 is to retrieve
#the points [y_0,y_1,...,y_(n-1)]] as we saved earlier. (dataset[i][1]) would get
#[y_1,y_2,...,y_n])
ground = dataset[np.random.randint(0,N)][0]
#Number of points we are given
num_ground = ground.shape[0]
#Our extrapolated points
prediction = net.extrapolate(ground, num_future)

#Plot the extrapolated points along with the given input.
plt.figure(figsize=(16, 9))
plt.scatter(np.arange(0, num_ground), ground, s=20)
plt.scatter(np.arange(num_ground, num_ground+num_future), prediction, s=10)
plt.show()