In [None]:
# This notebook demonstrates how to use the RKN to find the results of the Eusipco 2025 RKN publication.

In [None]:
from Algo.KalmanFilter import KalmanFilter
from Algo.RecursiveKalmanNet import GRUNetwork, RKN, Trainer
from Algo.DynamicalSystems import DynamicalSystems
from Algo.LossFunctions import GaussianLikelihoodLoss, MSELoss

from Tools.Plots import Plot_Std_RKN, Plot_Comparison_RKN, Plot_Loss
from Tools.GenerateMeasurementAndTarget import generate_measurements_and_target
from Tools.GenerateStdValues import generate_std_values

import torch
import os
import matplotlib.pyplot as plt
from adabelief_pytorch import AdaBelief

In [None]:
# Batch sizes and sequence length
N_E, N_CV, N_T = 1000, 100, 1000
len_sequence = 150

# Bimodal noise parameters
sigma_1 = 1.25
p = 0.6
r = 1
sigma_2 = ( (1/(1-p)) * (r-p*(sigma_1**2)) )**0.5
print("sigma_2 :", sigma_2)

In [None]:
train_std_values = generate_std_values(batch_size = N_E,
                                       len_sequence = len_sequence,
                                       obs_noise_distribution=['bimodal', [sigma_1, sigma_2, p]])

valid_std_values = generate_std_values(batch_size=N_CV,
                                       len_sequence=len_sequence,
                                       obs_noise_distribution=['bimodal', [sigma_1, sigma_2, p]])

test_std_values = generate_std_values(batch_size=N_T,
                                      len_sequence=len_sequence,
                                      obs_noise_distribution=['bimodal', [sigma_1, sigma_2, p]])

In [None]:
# Retrieve time-varying Kalman Filter parameters
dynamical_system_train = DynamicalSystems.constant_spd_linear_model(dt = 1,
                                                                    propag_std = 1e-2,
                                                                    obs_std_values = train_std_values)

dynamical_system_valid = DynamicalSystems.constant_spd_linear_model(dt = 1,
                                                                    propag_std = 1e-2,
                                                                    obs_std_values = valid_std_values)

dynamical_system_test = DynamicalSystems.constant_spd_linear_model(dt = 1,
                                                                   propag_std = 1e-2,
                                                                   obs_std_values = test_std_values)


In [None]:
# Initial conditions of the dynamical system
data_path = '.data/'
dataset_name = 'dataset_bimodal_observation_noise'

x0 = torch.tensor([[0, 1]], dtype=torch.float32).T
P0 = torch.tensor([[1, 0   ],
                   [0, 0.01]], dtype=torch.float32)

In [None]:
train_file = data_path + dataset_name + '/train.pt'
valid_file = data_path + dataset_name + '/valid.pt'
test_file = data_path + dataset_name + '/test.pt'

try:
    if os.path.exists(train_file) and os.path.exists(valid_file) and os.path.exists(test_file):
        print("Loading existing datasets...")
        train_measure, train_target, _, train_R_values = torch.load(train_file)
        valid_measure, valid_target, _, valid_R_values = torch.load(valid_file)
        test_measure, test_target, _, test_R_values = torch.load(test_file)
    else:
        print("Generating new datasets...")
        path_train = generate_measurements_and_target(N_E, len_sequence, dynamical_system_train, x0, P0, train_file)
        path_valid = generate_measurements_and_target(N_CV, len_sequence, dynamical_system_valid, x0, P0, valid_file)
        path_test = generate_measurements_and_target(N_T, len_sequence, dynamical_system_test, x0, P0, test_file)

        train_measure, train_target, _, train_R_values = torch.load(path_train)
        valid_measure, valid_target, _, valid_R_values = torch.load(path_valid)
        test_measure, test_target, _, test_R_values = torch.load(path_test)
        
except Exception as e:
    print("An error occurred:", e)

In [None]:
# Configurate the R matrix with the data loaded
dynamical_system_test = DynamicalSystems.constant_spd_linear_model(dt = 1,
                                                                   propag_std = 1e-2,
                                                                   obs_std_values = test_R_values[:, 0, 0, :]**0.5)

In [None]:
# oKF means Optimal Kalman Filter, which is the Kalman Filter with the exact R matrix.
# Thus, the Kalman Filter tuns in the optimal way, this is used as reference for the RKN.

oKF = KalmanFilter(dynamical_system_test, x0, P0)
oKF_x, oKF_gain, oKF_cov = oKF.process_batch(test_measure)

In [None]:
# Defining the soKF, sub-optimal Kalman Filter, using a R matrix that is constant over time with a mean value r (computed above).
# soKF is used as the reference that the RKN should outperform.

test_std_values_so = generate_std_values(batch_size=N_T,
                                         len_sequence=len_sequence,
                                         obs_noise_distribution=['normal', r])

dynamical_system_test_so = DynamicalSystems.constant_spd_linear_model(dt = 1,
                                                                      propag_std = 1e-2,
                                                                      obs_std_values = test_std_values_so)

soKF = KalmanFilter(dynamical_system_test_so, x0, P0)
soKF_x, soKF_gain, soKF_cov = soKF.process_batch(test_measure)

In [None]:
F, _, H, _ = dynamical_system_test

# Initialize the RKN, if the model_name specified exists in the .models/ file, it loads the model. If not, it initialies a new one with default parameters.
# Check the load_or_init_rnns method in the RecursiveKalmanNet.py file for more information.

model_name = 'RKN_bimodal'
seed = 61
torch.manual_seed(seed)
weight_factor = 0.1

# Initialize RKN
rkn = RKN([F, H],
          x0, P0,
          model_name=model_name,
          weight_factor=weight_factor)


In [None]:
# Initialize Trainer
learning_rate = 5e-4
use_cuda = torch.cuda.is_available()

# Initialize the Trainer 
rkn.model_name = 'RKN_bimodal'
trainer = Trainer(rkn, learning_rate, use_cuda, seed, training_loss=GaussianLikelihoodLoss())

# Use a slightly modified optimizer from Adam (default optimizer used in the Trainer), converge faster
weight_decay = 1e-4
trainer.optimizer = AdaBelief(trainer.model.parameters(), lr=trainer.learning_rate, weight_decay=weight_decay, weight_decouple=False, eps=1e-16, rectify=False, print_change_log=False)

# Additional parameters
n_epochs = 20
batch_size = 64

# Set a lr scheduler to deacrease the learning rate through the epochs
trainer.set_decreasing_learning_rate(decreasing_learning_rate=True, n_epochs=n_epochs, initial_lr=learning_rate, final_lr=5e-5, step_size=1)

# Train the model
trainer.train(train_measure, train_target, valid_measure, valid_target, n_epochs, batch_size)

In [None]:
RKN_x, RKN_gain, RKN_cov = rkn.process_batch(test_measure)

In [None]:
test_size = test_target.size()[0]
loss_fn = torch.nn.MSELoss(reduction='mean')

list_results = {
    'o-KF' : oKF_x[:,:,:],
    'so-KF' : soKF_x[:,:,:],
    'RKN' : RKN_x[:,:,:]
}

for key, predictions in list_results.items():
    print("MSE results in linear scale and dB scale for model ", key)
    MSE_test_linear_avg = torch.mean(torch.square(test_target[:,:,:] - predictions[:,:,:].detach()))
    MSE_test_dB_avg = 10 * torch.log10(MSE_test_linear_avg)

    print("MSE : ", MSE_test_dB_avg, " [dB]")
    print("MSE : ", MSE_test_linear_avg , " [linear scale] \n")


In [None]:
Plot_Loss(model_name, aspect_ratio=[5,4])

In [None]:
plot_path = '.results/plot_saves/'
plot_name = 'std_graph_bimodal'

dict = {'o-KF': [oKF_x, oKF_cov, 0],
        'so-KF': [soKF_x, soKF_cov, 1],
        'RKN': [RKN_x, RKN_cov, 2],}

Plot_Std_RKN(T=len_sequence,
                state=0,
                path=plot_path + plot_name, 
                test_target=test_target, 
                data_dict = dict,
                y_lim=None,
                marker_every=15,
                graph_size=1.5,
                time_lim=[0,150],
                x_label='Time (s)',
                y_label='Position Standard Deviation (m)',
                err_emp_std = 'Emp. std')

In [None]:
plot_name = 'gain_graph_bimodal'

idx = 5
dict = {'o-KF': oKF_gain[idx,:1,:,:],
        'so-KF': soKF_gain[idx,:1,:,:],
        'RKN': RKN_gain[idx,:1,:,:],
        }

Plot_Comparison_RKN(T=len_sequence,
                    path=plot_path + plot_name,
                    data_dict=dict,
                    y_lim = [[0, 0.7]],
                    show=True,
                    marker_every=10,
                    graph_size=1,
                    time_lim=[0,60],
                    x_label='Time (s)',
                    y_label='Position gain',
                    aspect_ratio=[6,4],
                    fontsize=12,
                    title='Temporal evolution of position estimated gain',
                    linewidth=1.5
                     )