In [None]:
import numpy as np
import xarray as xr
from kalman_reconstruction import pipeline
from kalman_reconstruction import example_models
from kalman_reconstruction.custom_plot import (
    plot_state_with_probability,
    set_custom_rcParams,
)
import matplotlib.pyplot as plt
from kalman_reconstruction.kalman import (
    Kalman_filter,
    Kalman_smoother,
)

# Apply the plotting defaults shipped in kalman_reconstruction.custom_plot
# (presumably matplotlib rcParams, judging by the name — confirm in custom_plot).
set_custom_rcParams()

In [None]:
from importlib import reload

# Re-execute the pipeline module so the names used below come from a fresh import.
# The trailing semicolon suppresses the module repr, which would otherwise
# store the absolute local path of the package in the saved cell output.
reload(pipeline);

<module 'kalman_reconstruction.pipeline' from 'C:\\Users\\Niebaum\\Documents\\Repositories\\kalman-reconstruction-partially-observered-systems\\kalman_reconstruction\\pipeline.py'>

Multiple random variables can now be added easily, and the Kalman_SEM operates on all of them.

In [None]:
# Example dataset produced by the Lorenz_63_xarray model; the random latent
# variables are attached to it below.
data = example_models.Lorenz_63_xarray(dt=0.01, time_length=5, time_steps=None)
seed = 345831200837
variance = 5

# Every latent variable gets its own generator, seeded with consecutive offsets
# from the base seed (z1 -> seed, z2 -> seed + 1, ...), so each variable is
# reproducible independently of the others.
for seed_offset, var_name in enumerate(["z1", "z2", "z3", "z4"]):
    pipeline.add_random_variable(
        data,
        var_name=var_name,
        random_generator=np.random.default_rng(seed=seed + seed_offset),
        variance=variance,
    )

In [None]:
# Run the Kalman SEM with x2 and x3 as observations and z1 and z2 as the
# random latent variables.
test = pipeline.xarray_Kalman_SEM(
    ds=data,
    observation_variables=["x2", "x3"],
    random_variables=["z1", "z2"],
    nb_iter_SEM=10,
    variance_obs_comp=0.0001,
)

# Draw each state component on a shared axis, together with the matching
# diagonal entry of the covariance (same label on both state dimensions).
fig, ax = plt.subplots(1, 1)
for state_label in test.state_name:
    component = test.states.sel(state_name=state_label)
    component_covariance = test.covariance.sel(
        state_name=state_label, state_name_copy=state_label
    )
    plot_state_with_probability(
        ax=ax,
        x_value=test.time,
        state=component,
        prob=component_covariance,
        line_kwargs={"label": state_label.values},
    )

ax.legend()
ax.set_xlim((0, 2))
ax.set_xlabel("time")
ax.set_ylabel("Values")
ax.set_title("Using multiple random latent variables");

100%|██████████| 10/10 [00:03<00:00,  2.76it/s]


## Test on Filter and Smoother results

There seems to be a severe issue with the filter and smoother: both give bad results at the start, i.e. at the first point index.

In [None]:
# Settings for the Kalman SEM run whose result feeds the filter/smoother checks.
sem_settings = {
    "ds": data,
    "observation_variables": ["x2", "x3"],
    "random_variables": ["z1", "z2"],
    "nb_iter_SEM": 10,
    "variance_obs_comp": 0.0001,
}
test = pipeline.xarray_Kalman_SEM(**sem_settings)

# One line per state component, each paired with its own covariance entry.
fig, ax = plt.subplots(1, 1)
for name in test.state_name:
    plot_state_with_probability(
        ax=ax,
        x_value=test.time,
        state=test.states.sel(state_name=name),
        prob=test.covariance.sel(state_name=name, state_name_copy=name),
        line_kwargs=dict(label=name.values),
    )

ax.legend()
# Limits, axis labels and title applied in a single call.
ax.set(
    xlim=(0, 2),
    xlabel="time",
    ylabel="Values",
    title="Using multiple random latent variables",
);

100%|██████████| 10/10 [00:03<00:00,  2.65it/s]


In [None]:
# Index along "time" used for the initial covariance and as estimation_idx.
idx = 100

H, R = pipeline._input_matrices_H_R_from_n_p(n=4, p=2)

# The filter and the smoother are called with exactly the same arguments,
# so they are collected once and passed to both.
shared_kwargs = dict(
    ds=data,
    state_variables=["x2", "x3", "z1", "z2"],
    observation_variables=["x2", "x3"],
    initial_covariance_matrix=test["covariance"].isel(time=idx).values,
    M=test.M.values,
    Q=test.Q.values,
    H=H,
    R=R,
    dim="time",
    estimation_idx=idx,
)

test_filter = pipeline.xarray_Kalman_filter(**shared_kwargs)
test_smoother = pipeline.xarray_Kalman_smoother(**shared_kwargs)

### Filter assertion

In [None]:
# Reference run with the plain-array Kalman_filter, using the same model
# matrices as the xarray wrapper call.
# Observed series x2 and x3 stacked and transposed (presumably one row per
# time step — confirm against Kalman_filter).
observations = np.array([data.x2.values, data.x3.values]).T
# Initial state: the four state components taken at position idx along "time".
initial_state = np.array(
    [data[name].isel(time=idx).values for name in ("x2", "x3", "z1", "z2")]
).T

x_f, P_f, x_a, P_a, loglik, K_a = Kalman_filter(
    y=observations,
    x0=initial_state,
    P0=test["covariance"].isel(time=idx).values,
    M=test.M.values,
    Q=test.Q.values,
    H=H,
    R=R,
);

In [None]:
# Compare the xarray filter wrapper output with the arrays returned by the
# direct Kalman_filter call.
tolerance = 10 ** (-6)

# Forecast state: on a mismatch, report it and show where the first ten time
# steps exceed the tolerance instead of stopping the cell.
try:
    np.testing.assert_array_almost_equal(test_filter.state_forecast, x_f, decimal=6)
except Exception as err:
    print("\n===========\nState Forecast")
    print(err)
    forecast_error = np.abs(test_filter.state_forecast - x_f)
    print(forecast_error < tolerance)
    plt.pcolormesh(
        forecast_error.isel(time=slice(0, 10)),
        cmap="Reds",
        vmin=0,
        vmax=tolerance,
    )

# Forecast covariance: same reporting, without the plot.
try:
    np.testing.assert_array_almost_equal(
        test_filter.covariance_forecast, P_f, decimal=6
    )
except Exception as err:
    print("\n===========\nState Covariance")
    print(err)
    print(np.abs(test_filter.covariance_forecast - P_f) < tolerance)

# Assimilation results and log-likelihood must match; a failure here raises.
for wrapped, reference in (
    (test_filter.state_assimilation, x_a),
    (test_filter.covariance_assimilation, P_a),
    (test_filter.log_likelihod, loglik),
):
    np.testing.assert_array_almost_equal(wrapped, reference, decimal=6)

### Smoother assertion

In [None]:
# Reference run with the plain-array Kalman_smoother, using the same inputs
# as the xarray smoother wrapper call.
initial_state_names = ["x2", "x3", "z1", "z2"]
smoother_inputs = {
    # Observed series x2 and x3, stacked and transposed.
    "y": np.array([data.x2.values, data.x3.values]).T,
    # State components taken at position idx along "time".
    "x0": np.array(
        [data[name].isel(time=idx).values for name in initial_state_names]
    ).T,
    "P0": test["covariance"].isel(time=idx).values,
    "M": test.M.values,
    "Q": test.Q.values,
    "H": H,
    "R": R,
}

x_f, P_f, x_a, P_a, x_s, P_s, loglik, P_s_lag = Kalman_smoother(**smoother_inputs);

In [None]:
# Compare the xarray smoother wrapper output with the arrays returned by the
# direct Kalman_smoother call.
# The forecast checks use test_smoother (not test_filter), so the smoother
# wrapper's own forecast output is what gets verified in this section.
# NOTE(review): assumes test_smoother exposes state_forecast and
# covariance_forecast like test_filter does — confirm against the pipeline.
try:
    np.testing.assert_array_almost_equal(test_smoother.state_forecast, x_f, decimal=6)
except Exception as E:
    # On a mismatch, report it and show which entries are within tolerance.
    print("\n===========\nState Forecast")
    print(E)
    print(np.abs(test_smoother.state_forecast - x_f) < 10 ** (-6))
try:
    np.testing.assert_array_almost_equal(
        test_smoother.covariance_forecast, P_f, decimal=6
    )
except Exception as E:
    print("\n===========\nState Covariance")
    print(E)
    print(np.abs(test_smoother.covariance_forecast - P_f) < 10 ** (-6))

# Smoothed results must match; a failure here raises.
np.testing.assert_array_almost_equal(test_smoother.state_smooth, x_s, decimal=6)
np.testing.assert_array_almost_equal(test_smoother.covariance_smooth, P_s, decimal=6)

# Assimilation results and log-likelihood must match as well.
np.testing.assert_array_almost_equal(test_smoother.state_assimilation, x_a, decimal=6)
np.testing.assert_array_almost_equal(
    test_smoother.covariance_assimilation, P_a, decimal=6
)
np.testing.assert_array_almost_equal(test_smoother.log_likelihod, loglik, decimal=6)