# Generating data using Kalman filter and predicting with an LSTM.

A Kalman filter is a model for which the following holds: \
$ 2X_{k+1} = AX_k + ϵ_{1,k+1} $ \
Where: 
- $ X_k $ is the unobserved state vector
- $ Y_k $ is the observation
- A is the state dynamic model
- B is the observation model. 
- The variable $ ϵ_1 $ is called the process noise and $ ϵ_2 $ is called observations noise, where $ ∀ \epsilon_{1,i} ∼ N(0, σ_{12}) $ and $ ϵ_{2,i} ∼ N(0, σ_{22}) $.


In [1]:
import numpy as np
import pandas as pd
np.random.seed(123)

A = 0.1*np.array([[np.sqrt(99), -1], [1, np.sqrt(99)]])
B = 0.5*np.array([[np.sqrt(2), -2], [np.sqrt(2), np.sqrt(2)]])

sigma1 = .01
sigma2 = .2

X_0 = np.array([[1,0]]).T
Y_0 = np.dot(B, X_0) + np.random.normal(loc=0, scale=sigma2, size=2).reshape(2,1)

In [2]:
process_length = 2000

X = [X_0]
Y = [Y_0]

for _ in range(process_length):
    X_k = X[-1]
    X_k_next = np.dot(A, X_k) + np.random.normal(loc=0, scale=sigma1, size=2).reshape(2,1)
    Y_k_next = np.dot(B, X_k_next) + np.random.normal(loc=0, scale=sigma2, size=2).reshape(2,1)
    X.append(X_k_next)    
    Y.append(Y_k_next)

def sliding_windows(data, seq_length):
    x = []
    y = []

    for i in range(len(data)-seq_length-1):
        _x = data[i:(i+seq_length)]
        _y = data[i+seq_length]
        x.append(_x)
        y.append(_y)

    return np.array(x),np.array(y)

In [3]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data as data_utils
torch.manual_seed(123)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters
sequence_length = 4
input_size = 2
hidden_size = 4
num_layers = 1
num_classes = 2
batch_size = 1
num_epochs = 15
learning_rate = 0.001

Y_seq, Y_target = sliding_windows(Y, sequence_length)
Y_seq = Variable(torch.Tensor(Y_seq.reshape(-1, sequence_length, 1, 2)))
Y_target = Variable(torch.Tensor(Y_target.reshape(-1, 1, 2)))
train_samples = 1000 - sequence_length

train_tensor = data_utils.TensorDataset(Y_seq[:train_samples], Y_target[:train_samples]) 
train_loader = data_utils.DataLoader(dataset = train_tensor, batch_size = batch_size)

test_tensor = data_utils.TensorDataset(Y_seq[(train_samples+1):], Y_target[(train_samples+1):]) 
test_loader = data_utils.DataLoader(dataset = test_tensor, batch_size = batch_size)

# Recurrent neural network (many-to-one)
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.last_hidden = None
    
    def forward(self, x):
        # Set initial hidden and cell states 
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # Forward propagate LSTM
        out, hidden = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)
        self.last_hidden = hidden
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
                
        return out

In [4]:
def fit_lstm(model, train_loader, test_loader):
    # Loss and optimizer
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    total_step = len(train_loader)
    best_model = {
        'epoch': -1,
        'model_state_dict': None,
        'accuracy': 0,
        'loss': 1,
    }
    # Train the model
    for epoch in range(num_epochs):
        for i, (Y_seq_i, Y_target_i) in enumerate(train_loader):
            model.train()

            Y_seq_i = Y_seq_i.reshape(-1, sequence_length, input_size).to(device)
            Y_target_i = Y_target_i.reshape(-1, num_classes).to(device)
            # Forward pass
            outputs = model(Y_seq_i)
            loss = criterion(outputs, Y_target_i)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        test_criterion = torch.nn.MSELoss()
        test_loss = 0
        for i, (Y_seq_i, Y_target_i) in enumerate(test_loader):
            Y_seq_i = Y_seq_i.reshape(-1, sequence_length, input_size).to(device)
            Y_target_i = Y_target_i.reshape(-1, num_classes).to(device)
            test_outputs = model(Y_seq_i)
            test_loss = test_criterion(test_outputs, Y_target_i)

        test_accuracy = 1 - test_loss.item()
        if best_model['accuracy'] < test_accuracy:
            best_model = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'accuracy': test_accuracy,
                'loss': loss.item(),
            }
        print("Epoch: %d, loss: %1.5f, test accuracy: %1.5f" % (epoch, loss.item(), test_accuracy))
    print("="*20)
    print(f"Best model using the validation set:\n"
          f" epoch:         [{best_model['epoch']}]\n"
          f" loss:          [{best_model['loss']}]\n"
          f" test accuracy: [{best_model['accuracy']}]\n")
    print("="*20)

    model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
    model.load_state_dict(best_model['model_state_dict'])
    
    return model

In [5]:
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
model = fit_lstm(model, train_loader, test_loader)

Epoch: 0, loss: 0.08312, test accuracy: 0.87508
Epoch: 1, loss: 0.01671, test accuracy: 0.92062
Epoch: 2, loss: 0.01890, test accuracy: 0.92466
Epoch: 3, loss: 0.02164, test accuracy: 0.92762
Epoch: 4, loss: 0.02334, test accuracy: 0.92968
Epoch: 5, loss: 0.02432, test accuracy: 0.93104
Epoch: 6, loss: 0.02489, test accuracy: 0.93196
Epoch: 7, loss: 0.02524, test accuracy: 0.93261
Epoch: 8, loss: 0.02545, test accuracy: 0.93309
Epoch: 9, loss: 0.02558, test accuracy: 0.93347
Epoch: 10, loss: 0.02567, test accuracy: 0.93378
Epoch: 11, loss: 0.02574, test accuracy: 0.93403
Epoch: 12, loss: 0.02578, test accuracy: 0.93424
Epoch: 13, loss: 0.02581, test accuracy: 0.93442
Epoch: 14, loss: 0.02584, test accuracy: 0.93458
Best model using the validation set:
 epoch:         [14]
 loss:          [0.02583935856819153]
 test accuracy: [0.9345771595835686]



## Predict the next observation (Y_1001)

In [6]:
model.eval()
Y_seq_1001, Y_target_1001 = iter(test_loader).next()

Y_seq_1001 = Y_seq_1001.reshape(-1, sequence_length, input_size).to(device)
Y_target_1001 = Y_target_1001.reshape(-1, num_classes).to(device)
    
Y1001_hat = model(Y_seq_1001)

In [7]:
print(f"predicted Y_1001: \n\n{Y1001_hat.data.numpy().reshape(2,1)}\n"
      f"actual Y_1001: \n\n{Y_target_1001.data.numpy().reshape(2,1)}")

predicted Y_1001: 

[[1.0176398 ]
 [0.46983743]]
actual Y_1001: 

[[1.0863123 ]
 [0.37219563]]


## Infer the value of the hidden X_1001
- Fitting another lstm for X|Y
- Training using 900 X,Y pairs
- Testing using next 100 pairs

In [8]:
Y_seq, _ = sliding_windows(Y, sequence_length)
_ , X_target = sliding_windows(X, sequence_length)

Y_seq = Variable(torch.Tensor(Y_seq.reshape(-1, sequence_length, 1, 2)))
X_target = Variable(torch.Tensor(X_target.reshape(-1, 1, 2)))
train_samples = 900 - sequence_length

train_tensor = data_utils.TensorDataset(Y_seq[:train_samples], X_target[:train_samples]) 
train_loader = data_utils.DataLoader(dataset = train_tensor, batch_size = batch_size)

test_tensor = data_utils.TensorDataset(Y_seq[(train_samples+1):1000], X_target[(train_samples+1):1000]) 
test_loader = data_utils.DataLoader(dataset = test_tensor, batch_size = batch_size)

In [9]:
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
model = fit_lstm(model, train_loader, test_loader)

Epoch: 0, loss: 0.06843, test accuracy: 0.96136
Epoch: 1, loss: 0.03089, test accuracy: 0.99167
Epoch: 2, loss: 0.02117, test accuracy: 0.99235
Epoch: 3, loss: 0.01710, test accuracy: 0.99399
Epoch: 4, loss: 0.01475, test accuracy: 0.99551
Epoch: 5, loss: 0.01327, test accuracy: 0.99652
Epoch: 6, loss: 0.01227, test accuracy: 0.99711
Epoch: 7, loss: 0.01158, test accuracy: 0.99746
Epoch: 8, loss: 0.01108, test accuracy: 0.99768
Epoch: 9, loss: 0.01072, test accuracy: 0.99784
Epoch: 10, loss: 0.01046, test accuracy: 0.99797
Epoch: 11, loss: 0.01027, test accuracy: 0.99808
Epoch: 12, loss: 0.01015, test accuracy: 0.99819
Epoch: 13, loss: 0.01006, test accuracy: 0.99829
Epoch: 14, loss: 0.01001, test accuracy: 0.99840
Best model using the validation set:
 epoch:         [14]
 loss:          [0.010008081793785095]
 test accuracy: [0.9983952012844384]



In [10]:
model.eval()

Y_seq_1001 = Y_seq[1001].reshape(-1, sequence_length, input_size).to(device)
X_target_1001 = X_target[1001].reshape(-1, num_classes).to(device)
    
X1001_hat = model(Y_seq_1001)

In [11]:
print(f"predicted X_1001: \n\n{X1001_hat.data.numpy().reshape(2,1)}\n"
      f"actual X_1001: \n\n{X_target_1001.data.numpy().reshape(2,1)}")

predicted X_1001: 

[[ 1.0967779 ]
 [-0.04586291]]
actual X_1001: 

[[ 1.1499354 ]
 [-0.07246078]]
