### Train a 2-layer LSTM network to find patterns in time-series data.

In [None]:
%matplotlib inline


import numpy as np
from matplotlib import pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
# from torch.autograd import Variable

In [None]:
class Sequence(nn.Module):
    """Two stacked LSTM cells with a linear read-out for 1-D sequences.

    Each time step of the input is fed through ``lstm1`` -> ``lstm2`` ->
    ``linear`` to produce one scalar output per sequence. After the observed
    steps are consumed the network can keep running on its own outputs to
    extrapolate further steps.

    Args:
        hidden_size: Requested width of both LSTM layers. The effective
            width stored in ``self.hidden_size`` is ``hidden_size + 1``.
    """

    def __init__(self, hidden_size=50):
        super(Sequence, self).__init__()

        # NOTE(review): the layers are one unit wider than requested
        # (50 -> 51). This looks unintentional, but it is preserved so the
        # model shape (and any saved weights) stays the same -- confirm.
        self.hidden_size = hidden_size + 1

        # Network architecture: scalar in -> LSTM -> LSTM -> scalar out.
        self.lstm1 = nn.LSTMCell(input_size=1,
                                 hidden_size=self.hidden_size)
        self.lstm2 = nn.LSTMCell(input_size=self.hidden_size,
                                 hidden_size=self.hidden_size)
        self.linear = nn.Linear(in_features=self.hidden_size,
                                out_features=1)

    def _step(self, input_t, state1, state2):
        """Advance both LSTM layers by one time step and read out a value."""
        state1 = self.lstm1(input_t, state1)
        # The hidden state of layer 1 is the input of layer 2.
        state2 = self.lstm2(state1[0], state2)
        return self.linear(state2[0]), state1, state2

    def forward(self, inputs: torch.Tensor, future: int = 0):
        """Run the network over ``inputs`` and optionally extrapolate.

        Args:
            inputs: 2-D tensor whose dim 0 indexes sequences and whose dim 1
                indexes time steps (one scalar per step).
            future: Number of extra steps to generate by feeding each output
                back in as the next input.

        Returns:
            Tensor of shape ``(inputs.size(0), inputs.size(1) + future)``
            holding one output per processed step.

        Raises:
            ValueError: If ``inputs`` has no time steps.
        """
        batch_size, n_steps = inputs.size(0), inputs.size(1)
        if n_steps == 0:
            raise ValueError('`inputs` must contain at least one time step.')

        # `new_zeros` creates the (hidden, cell) states with the same dtype
        # and device as `inputs`, so the module also works after `.double()`
        # or `.to(device)` instead of being tied to float32 on the CPU.
        state1 = (inputs.new_zeros(batch_size, self.hidden_size),
                  inputs.new_zeros(batch_size, self.hidden_size))
        state2 = (inputs.new_zeros(batch_size, self.hidden_size),
                  inputs.new_zeros(batch_size, self.hidden_size))

        outputs = []
        output = None

        # Every column of `inputs` is one time step of shape (batch, 1).
        for input_t in inputs.split(1, dim=1):
            output, state1, state2 = self._step(input_t, state1, state2)
            outputs.append(output)

        # Extrapolate: the last output becomes the next input.
        for _ in range(future):
            output, state1, state2 = self._step(output, state1, state2)
            outputs.append(output)

        # (batch, 1) per step -> (batch, steps, 1) -> (batch, steps).
        return torch.stack(outputs, dim=1).squeeze(2)

In [None]:
# Seed the PyTorch and NumPy random number generators.
torch.manual_seed(0)
np.random.seed(0)

# Path of the serialized dataset file, then read it back.
# NOTE(review): `torch.load` unpickles the file -- only load trusted data.
data_dir = '../datasets/time-series/sine-waves.pt'
data = torch.load(data_dir)

# Report the dimensions of what was loaded.
print('Data: {}'.format(data.shape))

In [None]:
# Testing samples & skip frequency.
n_test, skip = 3, 1

# `torch.from_numpy` keeps the NumPy dtype, while `nn.Module` parameters are
# created in torch's default dtype. Casting here keeps data and model
# compatible; it is a no-op when the dtypes already match.
# NOTE(review): presumably `data` is a float64 NumPy array -- confirm.
model_dtype = torch.get_default_dtype()

# The first `n_test` rows are held out for testing; the rest are for training.
# Inputs drop the last `skip` steps and targets drop the first `skip` steps,
# so each target is its input shifted `skip` steps ahead.

# Training set.
X_train = torch.from_numpy(data[n_test:, :-skip]).to(model_dtype)
y_train = torch.from_numpy(data[n_test:, skip:]).to(model_dtype)

# Testing set.
X_test = torch.from_numpy(data[:n_test, :-skip]).to(model_dtype)
y_test = torch.from_numpy(data[:n_test, skip:]).to(model_dtype)

In [None]:
# Data shapes (the first line reports the training pair, the second the
# testing pair).
print('X_train: {}\ty_train: {}'.format(X_train.size(), y_train.size()))
print('X_test: {}\ty_test: {}'.format(X_test.size(), y_test.size()))

In [None]:
# Hyperparameters: requested LSTM width and optimizer learning rate.
hidden_size, lr = 50, 1e-1

# Instantiate the network with the requested width.
model = Sequence(hidden_size=hidden_size)

In [None]:
# Show the module's string representation.
print('Model structure:\n{}'.format(str(model)))

In [None]:
# Optimizer: L-BFGS over every model parameter, chosen because the whole
# training set can be loaded and evaluated at once.
optimizer = optim.LBFGS(model.parameters(), lr=lr)
# Objective: mean squared error between prediction and target.
criterion = nn.MSELoss()

In [None]:
def visualize(inputs, target, t, future):
    """Plot up to three sequences and save the figure as a PDF.

    The first ``inputs.size(1)`` values of every sequence are drawn as a
    solid line; up to ``future`` values after that (when present) are drawn
    as a dashed continuation in the same colour.

    Args:
        inputs: 2-D tensor with sequences along dim 0 and time steps along
            dim 1; only its number of time steps is used.
        target: Sequences to plot, one per row (torch tensor or NumPy array).
            Only the first three rows are drawn.
        t: Integer embedded in the output file name ``predict-<t>.pdf``.
        future: Maximum number of values beyond the observed steps to draw.
    """
    plt.figure(figsize=(30, 10))

    # Time steps live on dim 1; dim 0 is the number of sequences.
    n_observed = inputs.size(1)

    def to_array(y_t):
        # Accept tensors as well as anything NumPy can already handle.
        if torch.is_tensor(y_t):
            return y_t.detach().cpu().numpy()
        return np.asarray(y_t)

    def draw(y_t, **kwargs):
        values = to_array(y_t)

        observed = values[:n_observed]
        plt.plot(np.arange(len(observed)), observed,
                 linestyle='solid', linewidth=2., **kwargs)

        extrapolated = values[n_observed:n_observed + future]
        if len(extrapolated):
            # Only the solid line carries the label so the legend has a
            # single entry per sequence.
            dashed_kwargs = {k: v for k, v in kwargs.items() if k != 'label'}
            plt.plot(np.arange(n_observed, n_observed + len(extrapolated)),
                     extrapolated,
                     linestyle='dashed', linewidth=2., **dashed_kwargs)

    # Plot values; `zip` stops early when fewer than three rows are given.
    styles = (('r', '1st index'), ('g', '2nd index'), ('b', '3rd index'))
    for row, (color, label) in zip(target, styles):
        draw(row, color=color, label=label)

    plt.legend()
    plt.title('Predict future values for time sequences', fontsize=30)

    plt.xlabel('X-axis', fontsize=20)
    plt.ylabel('Y-axis', fontsize=20)

    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)

    # Save to the working directory and release the figure.
    plt.savefig('predict-{:04d}.pdf'.format(t))
    plt.close()

In [None]:
# Number of optimizer steps and number of time steps to extrapolate when
# evaluating (both are constant across iterations).
steps = 15
future = 1000

for t in range(steps):
    print('\nStep: {}'.format(t))

    def closure():
        """Re-evaluate the training loss; L-BFGS may call this repeatedly."""
        # Clear optimizer gradient buffer.
        optimizer.zero_grad()

        # Make a prediction.
        pred = model(X_train)

        # Compute loss given prediction & ground truth.
        loss = criterion(pred, y_train)
        print('Loss: {:.3f}'.format(loss.item()))

        # Compute gradient for loss w.r.t. trainable variables.
        loss.backward()

        # Return the loss (for optimizer to minimize).
        return loss

    # Optimizer update step.
    optimizer.step(closure)

    # Predict: No need to compute/track gradients
    with torch.no_grad():
        pred = model(X_test, future=future)
        # Score only the steps that have a ground-truth value; the trailing
        # `future` steps are pure extrapolation.
        loss = criterion(pred[:, :y_test.size(1)], y_test)
        print('Loss: {:.3f}'.format(loss.item()))
        # `.detach()` to stop tensor from tracking history.
        y = pred.detach().numpy()

    # Saves `.pdf` file on disk. The prediction (not the ground truth) is
    # plotted so the figure shows what the model produced.
    visualize(inputs=X_test, target=pred, t=t, future=future)