<a href="https://colab.research.google.com/github/shengpu-tang/CS334-demo/blob/main/RNN_Sine_Wave.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sample Code: Train an RNN to predict the sine wave

Example of sequence prediction using a recurrent neural network (RNN) with LSTM cells.

Reference: https://github.com/pytorch/examples/tree/master/time_sequence_prediction

In [None]:
%config InlineBackend.figure_format = 'svg'

In [None]:
# GPU support: use CUDA when PyTorch reports an available GPU, else the CPU
import torch
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('using device:', device)

In [None]:
import numpy as np
import os
import scipy
import time

import torch
import torch.nn as nn
import torch.optim as optim

from matplotlib import pyplot as plt

In [None]:
# Generate synthetic data: N sine waves of L samples each, where every wave
# starts at its own random integer offset drawn from [-4*T_, 4*T_)
import numpy as np
np.random.seed(2)
T_ = 20
L = 1000
N = 100
# One random start offset per sequence (column vector), broadcast against 0..L-1
t = (np.arange(L)[np.newaxis, :]
     + np.random.randint(-4 * T_, 4 * T_, N)[:, np.newaxis]).astype('int64')
# Trailing axis of size 1 is the feature dimension: x has shape (N, L, 1)
x = np.sin(t / float(T_))[:, :, np.newaxis]

In [None]:
# Show two examples of our data: rows 0 and 3 of x, drawn on the same axes
fig = plt.figure(figsize=(10, 5))
for row, colour, legend_text in (
    (0, 'b', '$\\vec{x}^{(1)}$'),
    (3, 'r', '$\\vec{x}^{(4)}$'),
):
    plt.plot(x[row], colour, label=legend_text)
plt.title('Training examples')
plt.xlabel('t')
plt.ylabel('$x_t$')
plt.legend()
plt.show()

In [None]:
# Note the dimension of x: the three axes are unpacked as N, L and d
dims = x.shape
print(dims)
N, L, d = dims

In [None]:
import torch
import torch.nn as nn

class RNN(nn.Module):
    """Two stacked LSTM cells with a linear read-out for scalar sequences.

    At every time step one scalar per sequence is fed through ``lstm1``,
    then ``lstm2``, and the second layer's hidden state is mapped to one
    scalar by ``linear``.

    Args:
        hidden_size: Width of both LSTM layers. Defaults to 51, the value
            that was previously hard-coded throughout the class.
    """

    def __init__(self, hidden_size=51):
        super().__init__()
        # Stored so that init_hidden builds states matching the layer width.
        self.hidden_size = hidden_size
        self.lstm1 = nn.LSTMCell(1, hidden_size)
        self.lstm2 = nn.LSTMCell(hidden_size, hidden_size)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x, future=0):
        """Run the network over ``x`` and optionally extrapolate beyond it.

        Args:
            x: Tensor of shape ``(batch, steps, 1)``; ``x[:, t, :]`` is the
                input at step ``t``.
            future: Number of extra steps to generate after the input ends.
                Each extra step consumes the network's own previous output.

        Returns:
            Tensor of shape ``(batch, steps + future, 1)`` holding the output
            produced at every step.

        Raises:
            ValueError: If ``x`` contains zero time steps; there would be no
                output to stack and nothing to seed the extrapolation with.
        """
        _, steps, _ = x.shape
        if steps == 0:
            raise ValueError('x must contain at least one time step')

        state = self.init_hidden(x)
        outputs = []

        # Teacher-forced pass: every step is driven by the observed input.
        for t in range(steps):
            output, state = self._step(x[:, t, :], state)
            outputs.append(output)

        # Free-running pass: feed the previous prediction back in as input.
        for _ in range(future):
            output, state = self._step(output, state)
            outputs.append(output)

        return torch.stack(outputs, 1)

    def _step(self, inp, state):
        """Advance both LSTM layers by one step; return (output, new state)."""
        h1_t, c1_t, h2_t, c2_t = state
        h1_t, c1_t = self.lstm1(inp, (h1_t, c1_t))
        h2_t, c2_t = self.lstm2(h1_t, (h2_t, c2_t))
        return self.linear(h2_t), (h1_t, c1_t, h2_t, c2_t)

    def init_hidden(self, x):
        """Return zero-filled ``(h1, c1, h2, c2)`` states for the batch in ``x``.

        Each state is a float32 tensor of shape ``(x.size(0), hidden_size)``
        allocated on the same device as ``x``.
        """
        batch_size = x.size(0)
        return tuple(
            torch.zeros(batch_size, self.hidden_size,
                        dtype=torch.float, device=x.device)
            for _ in range(4)
        )

In [None]:
# Seed both NumPy and PyTorch with 0 so that runs are repeatable
np.random.seed(0)
torch.manual_seed(0)

# Build next-step prediction pairs: the target is the input shifted by one
# sample. The first three sequences are held out for testing; the rest train.
test_x  = torch.from_numpy(x[:3, :-1]).float().to(device)
test_y  = torch.from_numpy(x[:3, 1:]).float().to(device)
train_x = torch.from_numpy(x[3:, :-1]).float().to(device)
train_y = torch.from_numpy(x[3:, 1:]).float().to(device)

# Model and loss
net = RNN().to(device)
criterion = nn.MSELoss()

# LBFGS is used as the optimizer because the whole training set is passed
# through the network at once
optimizer = optim.LBFGS(net.parameters(), lr=0.8)

# Number of extra time steps to extrapolate beyond the end of each test sequence
future = 1000


def closure():
    # Re-evaluates the full-batch training loss; handed to optimizer.step
    optimizer.zero_grad()
    out = net(train_x)
    loss = criterion(out, train_y)
    print('loss:', loss.item())
    loss.backward()
    return loss


def draw(yi, color):
    # Solid line over the observed span, dotted line over the extrapolated span
    n_obs = train_x.size(1)
    plt.plot(np.arange(n_obs), yi[:n_obs], color, linewidth=2.0)
    plt.plot(np.arange(n_obs, n_obs + future), yi[n_obs:], color + ':', linewidth=2.0)


# Training loop: one optimizer step, then evaluate and plot, ten times over
for i in range(10):
    print('STEP: ', i)
    optimizer.step(closure)

    # Prediction only, so gradient tracking is switched off
    with torch.no_grad():
        pred = net(test_x, future=future)
        # Score only the part of the prediction that has ground truth
        loss = criterion(pred[:, :-future], test_y)
        print('test loss:', loss.item())
        y = pred.detach().cpu().numpy()

    # Draw the three held-out sequences together with their extrapolations
    plt.figure(figsize=(10, 5))
    plt.title('Predict future values for time sequences\n(Dashlines are predicted values)')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.xticks()
    plt.yticks()
    for row, colour in ((0, 'r'), (1, 'g'), (2, 'b')):
        draw(y[row], colour)
    plt.show()