# Main script for the Robust KalmanNet project

Import of important modules:

In [None]:
# Select the SS model you would like to evaluate with Robust KalmanNet:
# 0 - Synthetic NL model
# 1 - Discrete Time Lorentz Attractor

SELECTED_MODEL = 1

In [None]:
import torch
from torch.nn.functional import mse_loss
import Simulations.config as config
import matplotlib.pyplot as plt
import math
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np
import pandas as pd

from datetime import datetime

assert SELECTED_MODEL <= 1, f"The selected model {SELECTED_MODEL} does not exist!"
if SELECTED_MODEL == 0:
    from Simulations.Synthetic_NL_model.parameters import Q_structure, R_structure,m,n,m1x_0,m2x_0, \
        f,h
elif SELECTED_MODEL == 1:
    from Simulations.Lorenz_Atractor.parameters import m1x_0, m2x_0, m, n,\
    f_gen, f, f_nobatch, h, h_nobatch, hRotate_nobatch, hRotate, H_Rotate, H_Rotate_inv, Q_structure, R_structure

from Simulations.Extended_sysmdl import SystemModel
from Simulations.utils import DataGen
from RobustKalmanPY.robust_kalman import RobustKalman

# KalmanNet
from Pipelines.Pipeline_EKF import Pipeline_EKF
from KNet.KalmanNet_nn import KalmanNetNN

# tqdm for jupyter notebook
from tqdm.notebook import tqdm

# Just so that some warnings are ignored
import warnings
warnings.filterwarnings('ignore')

## Settings

In [None]:
args = config.general_settings()

# Dataset parameters
# N is the number of sequences to be generated
args.N_E = 1 #length of training dataset
args.N_CV = 1 #length of validation dataset 
args.N_T = 1 #length of test dataset 
args.T = 100 #input sequence length
args.T_test = 100 #input test sequence length

# Training parameters
args.use_cuda = False # use GPU or not (True = use GPU)
args.n_steps =  200 #number of training steps, i.e. number of epochs (default: 1000)
args.n_batch = 30 #input batch size for training (default: 20)
args.lr = 1e-3 #learning rate (default: 1e-3)
args.wd = 1e-3 #weight decay (default: 1e-4)
device = torch.device('cpu')

path_results = 'KNet/'
if SELECTED_MODEL == 0:
    DatafolderName = 'Simulations/Synthetic_NL_model/data' + '/'
else:
    DatafolderName = 'Simulations/Lorenz_Atractor/data' + '/'

# Noise q and r
r2 = torch.tensor([0.1]) # [100, 10, 1, 0.1, 0.01]
vdB = -20 # ratio v=q2/r2
v = 10**(vdB/10) #vdb = 10*log(q2/r2)
q2 = torch.mul(v,r2) #see paper pag. 8 
print("1/r2 [dB]: ", 10 * torch.log10(1/r2[0]))
print("1/q2 [dB]: ", 10 * torch.log10(1/q2[0]))

Q = q2[0] * Q_structure  #defining the transition matrix noise
R = r2[0] * R_structure  #defining the output matrix noise

# Filenames to load the data
if SELECTED_MODEL == 0:
    traj_resultName = ['traj_synNL_T100.pt']
    dataFileName = ['data_synNL_T100.pt']
else:
    traj_resultName = ['traj_lorDT_rq1030_T100.pt']
    dataFileName = ['data_lor_v20_rq1030_T100.pt'] 

## Generation and loading of data

NOTE: For the Jacobian computation both analytical (computed with MATLAB symbolic toolbox) and numerical have been tested.

In [None]:
# Model Parameters
if SELECTED_MODEL == 0:
    # Synthetic NL model
    sys_model = SystemModel(f, Q, h, R, args.T, args.T_test, m, n)
    sys_model.InitSequence(m1x_0, m2x_0)# x0 and P0
    sys_model.alpha = 0.9
    sys_model.beta = 1.1
    sys_model.phi = 0.1*math.pi
    sys_model.delta = 0.01
    sys_model.a = 1
    sys_model.b = 1
    sys_model.c = 0
else:
    # Discrete Time Lorentz Attractor
    sys_model = SystemModel(f_nobatch, Q, h_nobatch, R, args.T, args.T_test, m, n)
    sys_model.InitSequence(m1x_0, m2x_0)

    # Model with partial info
    sys_model_partial = SystemModel(f_nobatch, Q, h_nobatch, R, args.T, args.T_test, m, n)
    sys_model_partial.InitSequence(m1x_0, m2x_0)

print("Start Data Gen")
DataGen(args, sys_model, DatafolderName + dataFileName[0])
print("Data Load")
print(dataFileName[0])
[train_input,train_target, _, _, _, _,_,_,_] =  torch.load(DatafolderName + dataFileName[0], map_location=device)
print("trainset input size:",train_input.size())
print("trainset target size:",train_target.size())

# Check of dimensions of inputs and outputs - Data Preprocessing
train_input = torch.squeeze(train_input) 
train_target = torch.squeeze(train_target)
print("trainset input size:",train_input.size())
print("trainset target size:",train_target.size())

Plot of the test data

In [None]:
target = torch.Tensor.numpy(torch.transpose(train_target,0, 1))
inputs = torch.Tensor.numpy(torch.transpose(train_input,0, 1))

if SELECTED_MODEL == 0:
    fig = make_subplots(rows=2, cols=1, subplot_titles=("X[n]","Y[n]"))
    
    fig.append_trace(go.Scatter(x = np.linspace(1, target.shape[0], target.shape[0]+1),
        y=target[:,0], name = 'Target X_1'
    ), row=1, col=1)
    fig.append_trace(go.Scatter(x = np.linspace(1, target.shape[0], target.shape[0]+1),
        y=target[:,1],name = 'Target X_2'
    ), row=1, col=1)
    
    fig.append_trace(go.Scatter(x = np.linspace(1, inputs.shape[0], inputs.shape[0]+1),
        y=inputs[:,0],name = 'Y_1'
    ), row=2, col=1)
    fig.append_trace(go.Scatter(x = np.linspace(1, inputs.shape[0], inputs.shape[0]+1),
        y=inputs[:,1],name = 'Y_2'
    ), row=2, col=1)
    
    fig.show()
else:
    fig = make_subplots(rows=3, cols=1, subplot_titles=("X[n]","Y[n]"))
    
    fig.append_trace(go.Scatter(x = np.linspace(1, target.shape[0], target.shape[0]+1),
        y=target[:,0], name = 'Target X_1'
    ), row=1, col=1)
    fig.append_trace(go.Scatter(x = np.linspace(1, target.shape[0], target.shape[0]+1),
        y=target[:,1],name = 'Target X_2'
    ), row=1, col=1)
    fig.append_trace(go.Scatter(x = np.linspace(1, target.shape[0], target.shape[0]+1),
        y=target[:,2],name = 'Target X_3'
    ), row=1, col=1)
    
    fig.append_trace(go.Scatter(x = np.linspace(1, inputs.shape[0], inputs.shape[0]+1),
        y=inputs[:,0],name = 'Y_1'
    ), row=2, col=1)
    fig.append_trace(go.Scatter(x = np.linspace(1, inputs.shape[0], inputs.shape[0]+1),
        y=inputs[:,1],name = 'Y_2'
    ), row=2, col=1)
    fig.append_trace(go.Scatter(x = np.linspace(1, inputs.shape[0], inputs.shape[0]+1),
        y=inputs[:,2],name = 'Y_2'
    ), row=2, col=1)

    fig.append_trace(go.Scatter(x = target[:, 0],
        y=target[:, 2],name = 'X-Z plot of Lorentz Attractor'
    ), row=3, col=1)
    
    fig.show()

Generate multiple test sequences

In [None]:
num_of_test_sets = 100
test_inputs = []
test_targets = []

print(rf"## GENERATING TEST SEQUENCE OF {num_of_test_sets} SEQUENCES ##")
for i in tqdm(range(num_of_test_sets)):
    
    DataGen(args, sys_model, DatafolderName + dataFileName[0])
    [_, _, _, _, test_input, test_target, _, _, _] = torch.load(DatafolderName + dataFileName[0], map_location=device)

    test_inputs.append(test_input)
    test_targets.append(test_target)

Create the dataframe for testing results

In [None]:
# Initialize dataframe for the test data
if SELECTED_MODEL == 0:
    # Synthetic NL Model
    test_data_synt = pd.DataFrame(columns=['MSE REKF','Computational Time REKF [s]','MSE Robust KNet','Computational Time Robust KNet [s]','MSE KalmanNet','Computational Time KalmanNet [s]'])
else:
    # Discrete Time Lorentz Attractor
    test_data_synt = pd.DataFrame(columns=['MSE REKF','Computational Time REKF [s]', 'MSE REKF-PARTIAL','Computational Time REKF-PARTIAL [s]','MSE Robust KNet','Computational Time Robust KNet [s]','MSE Robust KNet-PARTIAL','Computational Time Robust KNet-PARTIAL [s]','MSE KalmanNet','Computational Time KalmanNet [s]','MSE KalmanNet-PARTIAL','Computational Time KalmanNet-PARTIAL [s]'])


## Test of REKF

In [None]:
print(rf"EVALUATING MSE AND COMPUTATIONAL TIME ON THE TEST SEQUENCES")

sys_model.m1x_0 = torch.zeros(m, 1)

# Test on different noise matrices Q and R
sys_model.Q = Q
sys_model.R = R
#sys_model_partial.Q = Q
#sys_model_partial.R = R

# Constant tolerance value
c = 1e-3

i = 0
for test_input, test_target in tqdm(zip(test_inputs, test_targets), total=len(test_inputs)):
    test_input = torch.squeeze(test_input) 
    test_target = torch.squeeze(test_target)

    if SELECTED_MODEL == 0:
        REKF = RobustKalman(sys_model, test_input, c, True, False)
    
        [Xrekf,_,comp_time_rekf] = REKF.fnREKF()
    
        mse_rekf = mse_loss(Xrekf[:,:Xrekf.size()[1]-1], test_target)
    
        # Save MSE and Computational time values on dataframe
        test_data_synt.loc[i,'MSE REKF':'Computational Time REKF [s]'] = [mse_rekf.item(), comp_time_rekf]
        i+=1
    else:
        # Full information 
        REKF = RobustKalman(sys_model, test_input, c, False, False, sl_model = SELECTED_MODEL)
        # Partial information
        REKF_partial = RobustKalman(sys_model_partial, test_input, c, False, False, sl_model = SELECTED_MODEL)
        
        [Xrekf,_,comp_time_rekf] = REKF.fnREKF()
        [Xrekf_partual,_,comp_time_rekf_partial] = REKF_partial.fnREKF()
    
        mse_rekf = mse_loss(Xrekf[:,:Xrekf.size()[1]-1], test_target)
        mse_rekf_partial = mse_loss(Xrekf_partual[:,:Xrekf_partual.size()[1]-1], test_target)
    
        # Save MSE and Computational time values on dataframe
        test_data_synt.loc[i,'MSE REKF':'Computational Time REKF [s]'] = [mse_rekf.item(), comp_time_rekf]
        test_data_synt.loc[i,'MSE REKF-PARTIAL':'Computational Time REKF-PARTIAL [s]'] = [mse_rekf_partial.item(), comp_time_rekf_partial]
        i+=1
        
print("\n######   Test REKF   ######",f"\nAverage MSE: {test_data_synt.mean()['MSE REKF']:.5f}",f"\nAverage Computational Time [s]: {test_data_synt.mean()['Computational Time REKF [s]']:.5f}")

In [None]:
if SELECTED_MODEL == 0:
    
    REKFfig = make_subplots(rows=3, cols=1, subplot_titles=("X[n] - Only one sequence out of N","MSE[N]", "COMP_T[s][N]"))
    
    REKFfig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0], name = 'Target X_1'), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1], name = 'target X_2'), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 0], name = 'estimated X_1'), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 1], name = 'estimated X_2'), row=1, col=1)
    
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['MSE REKF'], name='MSE'), row=2, col=1)
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[0], test_data_synt.mean()[0], len(test_data_synt)), name = 'MSE MEAN'), row=2, col=1)
    
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['Computational Time REKF [s]'], name='MSE'), row=3, col=1)
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[1], test_data_synt.mean()[1], len(test_data_synt)), name = 'MSE MEAN'), row=3, col=1)
    
    REKFfig.show()
    
else:
    
    REKFfig = make_subplots(rows=1, cols=1, subplot_titles=("X[n] - Only one sequence out of N","MSE[N]", "COMP_T[s][N]"))
    x_values = np.arange(len(test_data_synt))
    
    REKFfig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0].shape[0], torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0].shape[0]+1),
        y=torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0], name = 'target X_1'
    ), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1].shape[0], torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1].shape[0]+1),
        y=torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1], name = 'target X_2'
    ), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 2].shape[0], torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 2].shape[0]+1),
        y=torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 2], name = 'target X_3'
    ), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 0].shape[0], torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 0].shape[0]+1),
        y=torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 0], name = 'estimated X_1'
    ), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 1].shape[0], torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 1].shape[0]+1),
        y=torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 1], name = 'estimated X_2'
    ), row=1, col=1)
    REKFfig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 2].shape[0], torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 1].shape[0]+1),
        y=torch.squeeze(torch.transpose(Xrekf,0, 1)).numpy()[0:-1, 2], name = 'estimated X_3'
    ), row=1, col=1)
    
    """
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['MSE REKF'], name='MSE'), row=2, col=1)
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[0], test_data_synt.mean()[0], len(test_data_synt)), name = 'MSE MEAN'), row=2, col=1)

    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['Computational Time REKF [s]'], name='COMP-T'), row=3, col=1)
    REKFfig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[1], test_data_synt.mean()[1], len(test_data_synt)), name = 'COMP-T MEAN'), row=3, col=1)
    """

    REKFfig.show()

    Lorentz = make_subplots(rows=1, cols=1, subplot_titles=("A sample plot of the test data", "A sample plot of the filtered data"))
    Lorentz.append_trace(go.Scatter(x = test_target[0,:], y= test_target[2, :],name="Target"), row=1, col=1)
    Lorentz.append_trace(go.Scatter(x = Xrekf[0,:], y= Xrekf[2, :], name="Estimated"), row=1, col=1)
    
    Lorentz.show()

## Robust-KalmanNet

Training phase

In [None]:
import torch.optim as optim
import torch.nn as nn
from tqdm.notebook import tqdm  # Use tqdm notebook version for Jupyter

sys_model.m1x_0 = torch.zeros(m, 1)

# Select the number of hidden-layers and neurons in the DNN
layers = [20, 20, 20, 20, 20]

# Initialize the noise covariance matrices Q and R for the filter
Q_2 = torch.eye(sys_model.m)
R_2 = torch.eye(sys_model.n)

# Create the Robust KalmanNet instance
RT_KalmanNet = RobustKalman(sys_model, train_input, c, False, True, input_feat_mode=3, hidden_layers = layers, sl_model = SELECTED_MODEL, set_noise_matrices = False, Q_mat = Q_2, R_mat = R_2)

# Hyperparameters
epochs = args.n_steps  # Number of epochs

train_inputs = []
train_targets = []
cv_inputs = []
cv_targets = []

# Generate training and cross-validation datasets
print(f"GENERATING TRAINING AND CROSS_VALIDATION DATA")
for epoch in tqdm(range(epochs)):
    
    DataGen(args, sys_model, DatafolderName + dataFileName[0])
    [train_input, train_target, cv_input, cv_target, _, _, _, _, _] = torch.load(DatafolderName + dataFileName[0], map_location=device)

    train_inputs.append(train_input)
    train_targets.append(train_target)
    cv_inputs.append(cv_input)
    cv_targets.append(cv_target)

In [None]:
# Set optimizer and loss function type
optimizer = optim.Adam(RT_KalmanNet.nn.parameters(), lr=args.lr, weight_decay=args.wd)
criterion = nn.MSELoss(reduction='mean')  # Minimizing square error w.r.t. state estimate

opt_MSE = float('inf')
opt_model_folder = "RobustKalmanPY/"

# Training procedure
for epoch in tqdm(range(epochs), desc="Training Progress", unit="epoch"):

    # Normalize data
    train_input = torch.squeeze(train_inputs[epoch])
    train_target = torch.squeeze(train_targets[epoch])

    # Zeroing gradients
    optimizer.zero_grad()

    # Set input
    RT_KalmanNet.y = train_input

    # Forward pass
    [Xrekf,_,_,_] = RT_KalmanNet.fnREKF(train=True)
    Xrekf = Xrekf[:,:-1]

    # Compute loss
    loss = criterion(Xrekf, train_target)

    # Backward pass
    with torch.autograd.set_detect_anomaly(True):
        loss.backward(retain_graph=True)

    # Optimization step
    optimizer.step()

    # Cross-Validation procedure
    cv_input = torch.squeeze(cv_inputs[epoch])
    cv_target = torch.squeeze(cv_targets[epoch])

    RT_KalmanNet.y = cv_input
    [Xrekf,_,_,_] = RT_KalmanNet.fnREKF()
    Xrekf = Xrekf[:,:-1]

    cv_loss = criterion(Xrekf, cv_target)

    if (cv_loss < opt_MSE):
        opt_MSE = cv_loss
        torch.save(RT_KalmanNet.nn, 'RobustKalmanPY/opt_RT_KNet.pt')

    # Update progress bar description with loss
    if (epoch + 1) % 1 == 0:
        tqdm.write(f"Epoch {epoch+1}/{epochs}, MSE Training: {loss.item():.4f}, MSE CV: {cv_loss.item():.4f}")

print("\nTraining finished")
print(f"Cross-Validation MSE Optimal Model: {opt_MSE.item():.4f}\n")

Test of Robust-KalmanNet - Computational time and RMSE

In [None]:
# Load the optimal model from cross validation procedure
RT_KalmanNet.nn = torch.load('RobustKalmanPY/opt_RT_KNet.pt', weights_only=False)

# Test over each time series
i = 0
print(rf"## EVALUATING MSE AND COMPUTATIONAL TIME ON THE TEST SEQUENCES ##")
for test_input, test_target in tqdm(zip(test_inputs, test_targets), total=len(test_inputs)):

    # Preprocess data - removing unecessary dimensions
    test_input = torch.squeeze(test_input)
    test_target = torch.squeeze(test_target)

    # Compute the RT-KalmaNet prediction
    RT_KalmanNet.y = test_input
    [Xrekf,c_t,_,comp_time_RT_KNet] = RT_KalmanNet.fnREKF()
    Xrekf = Xrekf[:,:-1].detach()

    test_loss = criterion(Xrekf[:,1:], test_target[:,1:])

    # Save MSE and Computational time values on dataframe
    test_data_synt.loc[i,'MSE Robust KNet':'Computational Time Robust KNet [s]'] = [test_loss.item(), comp_time_RT_KNet]
    i+=1

print("#####  Test RT-KalmanNet  #####",f"\nMSE: {test_data_synt.mean()['MSE Robust KNet']:.4f}",f"\nComputational Time: {test_data_synt.mean()['Computational Time Robust KNet [s]']:.4f}")

In [None]:
RobustKNetFig = make_subplots(rows=3, cols=1, subplot_titles=("X[n] - Only one sequence out of N","MSE[N]", "COMP_T[s][N]", "Tollerance C_t"))

if SELECTED_MODEL == 0:
    RobustKNetFig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0], name = 'target X_1'), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1], name = 'target X_2'), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 0], name = 'estimated X_1'), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.arange(len(test_data_synt)), y = torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 1], name = 'estimated X_2'), row=1, col=1)

    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['MSE Robust KNet'], name='MSE'), row=2, col=1)
    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[2], test_data_synt.mean()[2], len(test_data_synt)), name = 'MSE MEAN'), row=2, col=1)
    
    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['Computational Time Robust KNet [s]'], name='Comp. Time'), row=3, col=1)
    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[3], test_data_synt.mean()[3], len(test_data_synt)), name = 'Comp. Time MEAN'), row=3, col=1)
    
    RobustKNetFig.show()
else:
    RobustKNetFig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0].shape[0], torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0].shape[0]+1),
        y=torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 0], name = 'target X_1'
    ), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1].shape[0], torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1].shape[0]+1),
        y=torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1], name = 'target X_2'
    ), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1].shape[0], torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 1].shape[0]+1),
        y=torch.squeeze(torch.transpose(test_target,0, 1)).numpy()[0:-1, 2], name = 'target X_3'
    ), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 0].shape[0], torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 0].shape[0]+1),
        y=torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 0], name = 'estimated X_1'
    ), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 1].shape[0], torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 1].shape[0]+1),
        y=torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 1], name = 'estimated X_2'
    ), row=1, col=1)
    RobustKNetFig.append_trace(go.Scatter(x = np.linspace(1, torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 2].shape[0], torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 2].shape[0]+1),
        y=torch.squeeze(torch.transpose(Xrekf.detach(), 0, 1)).numpy()[0:-1, 2], name = 'estimated X_3'
    ), row=1, col=1)

    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['MSE Robust KNet'], name='MSE'), row=2, col=1)
    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[4], test_data_synt.mean()[4], len(test_data_synt)), name = 'MSE MEAN'), row=2, col=1)

    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['Computational Time Robust KNet [s]'], name='Comp. Time'), row=3, col=1)
    RobustKNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()[5], test_data_synt.mean()[5], len(test_data_synt)), name = 'Comp. Time MEAN'), row=3, col=1)
    
    RobustKNetFig.show()

    Lorentz = make_subplots(rows=1, cols=1, subplot_titles=("A sample plot of the test data", "A sample plot of the filtered data"))
    Lorentz.append_trace(go.Scatter(x = test_target[0,:], y= test_target[2, :]), row=1, col=1)
    Lorentz.append_trace(go.Scatter(x = Xrekf[0,:], y= Xrekf[2, :]), row=1, col=1)
    
    Lorentz.show()

In [None]:
# Plot tollerance c_t
c_tFig = go.Figure()
c_tFig.add_trace(go.Scatter(x=np.arange(len(c_t)), y=c_t, name="c_t"))
c_tFig.update_layout(
    title=dict(text='Tolerance c_t', font=dict(size=20), x=0.5, xanchor='center'),
    xaxis=dict(
        title='Time (s)',
        showgrid=True,       # Enable gridlines on x-axis
        gridcolor='lightgray',
        zeroline=False
    ),
    yaxis=dict(
        showgrid=True,       # Enable gridlines on y-axis
        gridcolor='lightgray',
        zeroline=False
    ),
    template='simple_white'
)

c_tFig.show()

## KalmanNet Test

In [None]:
args.N_E = 100 #length of training dataset
args.N_CV = 100 #length of validation dataset
args.N_T = 100 #length of test dataset

today = datetime.today()
now = datetime.now()
strToday = today.strftime("%m.%d.%y")
strNow = now.strftime("%H:%M:%S")
strTime = strToday + "_" + strNow

if SELECTED_MODEL == 0:
    DataGen(args, sys_model, DatafolderName + dataFileName[0])
    [train_input,train_target, cv_input, cv_target, test_input, test_target,_,_,_] =  torch.load(DatafolderName + dataFileName[0], map_location=device)

    ## Build Neural Network
    KalmanNet_model = KalmanNetNN()
    KalmanNet_model.NNBuild(sys_model, args)

    ## Train Neural Network
    print("#####   Training KalmanNet   #####\n")
    KalmanNet_Pipeline = Pipeline_EKF(strTime, "KNet", "KalmanNet")
    KalmanNet_Pipeline.setssModel(sys_model)
    KalmanNet_Pipeline.setModel(KalmanNet_model)
    print("Number of trainable parameters for KNet:",sum(p.numel() for p in KalmanNet_model.parameters() if p.requires_grad))
    KalmanNet_Pipeline.setTrainingParams(args)

    # Training process
    [MSE_cv_linear_epoch, MSE_cv_dB_epoch, MSE_train_linear_epoch, MSE_train_dB_epoch] = KalmanNet_Pipeline.NNTrain(sys_model, cv_input, cv_target, train_input, train_target, path_results)

    # Test evaluation
    for i in range(test_input.size(0)):
        [MSE_test_linear_arr, MSE_test_linear_avg, MSE_test_dB_avg, knet_out, comp_time_KNet] = KalmanNet_Pipeline.NNTest(sys_model, test_input[i].unsqueeze(0), test_target[i].unsqueeze(0), path_results)

        # Allocate the MSE and computational time on the dataframe
        test_data_synt.loc[i,'MSE KalmanNet':'Computational Time KalmanNet [s]'] = [MSE_test_linear_avg.item(), comp_time_KNet]

else:
    # Folder names and trajectories for Lorenz attractor
    DatafolderName = 'Simulations/Lorenz_Atractor/data' + '/'
    traj_resultName = ['traj_lorDT_rq1030_T100.pt']
    dataFileName = ['data_lor_v20_rq1030_T100.pt']

    # Generate and load data
    sys_model = SystemModel(f, Q, hRotate, R, args.T, args.T_test, m, n)
    sys_model.InitSequence(m1x_0, m2x_0)# x0 and P0

    DataGen(args, sys_model, DatafolderName + dataFileName[0])
    [train_input_long,train_target_long, cv_input, cv_target, test_input, test_target,_,_,_] =  torch.load(DatafolderName + dataFileName[0], map_location=device)
    train_target = train_target_long[:,:,0:args.T]
    train_input = train_input_long[:,:,0:args.T]

    ## Build Neural Network
    KNet_model = KalmanNetNN()
    KNet_model.NNBuild(sys_model, args)

    # Train Neural Network
    print("#####   Training KalmanNet   #####\n")
    KNet_Pipeline = Pipeline_EKF(strTime, "KNet", "KNet") #create the object and set some path to save data
    KNet_Pipeline.setssModel(sys_model)
    KNet_Pipeline.setModel(KNet_model)
    print("Number of trainable parameters for KNet:",sum(p.numel() for p in KNet_model.parameters() if p.requires_grad))
    KNet_Pipeline.setTrainingParams(args)

    # Training process
    [MSE_cv_linear_epoch, MSE_cv_dB_epoch, MSE_train_linear_epoch, MSE_train_dB_epoch] = KNet_Pipeline.NNTrain(sys_model, cv_input, cv_target, train_input, train_target, path_results)

    # Test evaluation
    for i in range(test_input.size(0)):
        [MSE_test_linear_arr, MSE_test_linear_avg, MSE_test_dB_avg,knet_out,comp_time_KNet] = KNet_Pipeline.NNTest(sys_model, test_input[i].unsqueeze(0), test_target[i].unsqueeze(0), path_results)

        # Allocate the MSE and computational time on the dataframe
        test_data_synt.loc[i,'MSE KalmanNet':'Computational Time KalmanNet [s]'] = [MSE_test_linear_avg.item(), comp_time_KNet]

print("\n#####  Test KalmanNet  #####", f"\nMSE: {test_data_synt.mean()['MSE KalmanNet']:.4f}",f"\nComputational Time: {test_data_synt.mean()['Computational Time KalmanNet [s]']:.4f}")

In [None]:
KNetFig = make_subplots(rows=3, cols=1, subplot_titles=("X[n] - Only one sequence out of N", "MSE[N]", "COMP_T[s][N]"))

KNetFig.append_trace(go.Scatter(x=np.arange(test_target.detach().size(2)),y=test_target.detach().numpy()[0,0,0:-1], name = 'Target X_1'), row=1, col=1)
KNetFig.append_trace(go.Scatter(x=np.arange(test_target.detach().size(2)),y=test_target.detach().numpy()[0,1,0:-1], name = 'Target X_2'), row=1, col=1)
if SELECTED_MODEL == 1:
    KNetFig.append_trace(go.Scatter(x=np.arange(knet_out.detach().size(2)),y=test_target.detach().numpy()[0,2,0:-1], name = 'Target X_3'), row=1, col=1)
KNetFig.append_trace(go.Scatter(x=np.arange(knet_out.detach().size(2)),y=knet_out.detach().numpy()[0,0,0:-1], name = 'Estimated X_1'), row=1, col=1)
KNetFig.append_trace(go.Scatter(x=np.arange(knet_out.detach().size(2)),y=knet_out.detach().numpy()[0,1,0:-1], name = 'Estimated X_2'), row=1, col=1)
if SELECTED_MODEL == 1:
    KNetFig.append_trace(go.Scatter(x=np.arange(knet_out.detach().size(2)),y=knet_out.detach().numpy()[0,2,0:-1], name = 'Estimated X_3'), row=1, col=1)

KNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['MSE KalmanNet'], name='MSE'), row=2, col=1)
KNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()['MSE KalmanNet'], test_data_synt.mean()['MSE KalmanNet'], len(test_data_synt)), name = 'MSE MEAN'), row=2, col=1)

KNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=test_data_synt['Computational Time KalmanNet [s]'], name='Comp. Time'), row=3, col=1)
KNetFig.append_trace(go.Scatter(x=np.arange(len(test_data_synt)), y=np.linspace(test_data_synt.mean()['Computational Time KalmanNet [s]'], test_data_synt.mean()['Computational Time KalmanNet [s]'], len(test_data_synt)), name = 'Comp. Time MEAN'), row=3, col=1)

KNetFig.show()

Lorentz = make_subplots(rows=1, cols=1, subplot_titles=("A sample plot of the test data", "A sample plot of the filtered data"))
Lorentz.append_trace(go.Scatter(x = test_target.detach().numpy()[0,0,0:-1], y= test_target.detach().numpy()[0,2,0:-1]), row=1, col=1)
Lorentz.append_trace(go.Scatter(x = knet_out.detach().numpy()[0,0,0:-1], y= knet_out.detach().numpy()[0,2,0:-1]), row=1, col=1)

Lorentz.show()

### Statistics of MSE and Computational Time for the Synthetic Non-Linear Model

In [None]:
stats = pd.DataFrame({
    'MEAN': test_data_synt.mean(),
    'STD': test_data_synt.std()
})

print(stats.applymap(lambda x: f"{x:.6f}"))