# Kalman Filtering VS Smoothing

In [55]:
"""
    File name: ssm_kalman.py
    Description: a re-implementation of the Kalman filter for http://www.gatsby.ucl.ac.uk/teaching/courses/ml1
    Author: Roman Pogodin / Maneesh Sahani (matlab version)
    Date created: October 2018
    Python version: 3.6
"""

import numpy as np

# Function provided in ssm_kalman.py
def run_ssm_kalman(X, y_init, Q_init, A, Q, C, R, mode='smooth'):
    """
    Calculates kalman-smoother estimates of SSM state posterior.

    The forward (filtering) recursion is always run over every column of X.
    Unless ``mode`` is 'filt' or 'forw', the backward (smoothing) recursion is
    run afterwards as well.

    :param X:       data, [d, t_max] numpy array
    :param y_init:  initial latent state, [k,] numpy array
    :param Q_init:  initial variance, [k, k] numpy array
    :param A:       latent dynamics matrix, [k, k] numpy array
    :param Q:       innovations covariance matrix, [k, k] numpy array
    :param C:       output loading matrix, [d, k] numpy array
    :param R:       output noise matrix, [d, d] numpy array
    :param mode:    'forw' or 'filt' for forward filtering only; any other value
                    (default 'smooth') also runs the backward pass
    :return:
    y_hat:      posterior mean estimates, [k, t_max] numpy array
    V_hat:      posterior variances on y_t, [t_max, k, k] numpy array
    V_joint:    posterior covariances between y_{t+1}, y_t, [t_max, k, k] numpy array.
                Entry t pairs steps t+1 and t, so the last entry (index t_max - 1)
                has no partner and is left at zero. None when only filtering.
    likelihood: conditional log-likelihoods log(p(x_t|x_{1:t-1})), [t_max,] numpy array
    :raises AssertionError: if an argument does not have the shape listed above
    """
    d, k = C.shape
    t_max = X.shape[1]

    # dimension checks
    assert X.shape == (d, t_max), "Shape of X must be (%d, %d), %s provided" % (d, t_max, X.shape)
    assert y_init.shape == (k,), "Shape of y_init must be (%d,), %s provided" % (k, y_init.shape)
    assert Q_init.shape == (k, k), "Shape of Q_init must be (%d, %d), %s provided" % (k, k, Q_init.shape)
    assert A.shape == (k, k), "Shape of A must be (%d, %d), %s provided" % (k, k, A.shape)
    assert Q.shape == (k, k), "Shape of Q must be (%d, %d), %s provided" % (k, k, Q.shape)
    assert C.shape == (d, k), "Shape of C must be (%d, %d), %s provided" % (d, k, C.shape)
    assert R.shape == (d, d), "Shape of R must be (%d, %d), %s provided" % (d, d, R.shape)

    y_filt = np.zeros((k, t_max))  # filtering estimate: \hat(y)_t^t
    V_filt = np.zeros((t_max, k, k))  # filtering variance: \hat(V)_t^t
    y_hat = np.zeros((k, t_max))  # smoothing estimate: \hat(y)_t^T
    V_hat = np.zeros((t_max, k, k))  # smoothing variance: \hat(V)_t^T
    K = np.zeros((t_max, k, d))  # Kalman gain
    J = np.zeros((t_max, k, k))  # smoothing gain
    likelihood = np.zeros(t_max)  # conditional log-likelihood: p(x_t|x_{1:t-1})

    I_k = np.eye(k)

    # forward pass

    V_pred = Q_init
    y_pred = y_init

    for t in range(t_max):
        # innovation (prediction error) and its covariance
        x_pred_err = X[:, t] - C.dot(y_pred)
        V_x_pred = C.dot(V_pred.dot(C.T)) + R
        V_x_pred_inv = np.linalg.inv(V_x_pred)
        # Gaussian log-density of the innovation
        likelihood[t] = -0.5 * (np.linalg.slogdet(2 * np.pi * (V_x_pred))[1] +
                                x_pred_err.T.dot(V_x_pred_inv).dot(x_pred_err))

        K[t] = V_pred.dot(C.T).dot(V_x_pred_inv)

        y_filt[:, t] = y_pred + K[t].dot(x_pred_err)
        V_filt[t] = V_pred - K[t].dot(C).dot(V_pred)

        # symmetrise the variance to avoid numerical drift
        V_filt[t] = (V_filt[t] + V_filt[t].T) / 2.0

        # one-step-ahead prediction for the next time step
        y_pred = A.dot(y_filt[:, t])
        V_pred = A.dot(V_filt[t]).dot(A.T) + Q

    # backward pass

    if mode == 'filt' or mode == 'forw':
        # skip if filtering/forward pass only
        y_hat = y_filt
        V_hat = V_filt
        V_joint = None
    else:
        V_joint = np.zeros_like(V_filt)

        # the smoothed and filtered estimates coincide at the final step
        if t_max > 0:
            y_hat[:, -1] = y_filt[:, -1]
            V_hat[-1] = V_filt[-1]

        for t in range(t_max - 2, -1, -1):
            # predicted variance of y_{t+1} given data up to t (used twice below)
            V_pred_next = A.dot(V_filt[t]).dot(A.T) + Q
            J[t] = V_filt[t].dot(A.T).dot(np.linalg.inv(V_pred_next))
            y_hat[:, t] = y_filt[:, t] + J[t].dot((y_hat[:, t + 1] - A.dot(y_filt[:, t])))
            V_hat[t] = V_filt[t] + J[t].dot(V_hat[t + 1] - V_pred_next).dot(J[t].T)

        # a lag-one covariance only exists when there are at least two steps;
        # without this guard a single-step sequence raises IndexError
        if t_max > 1:
            V_joint[-2] = (I_k - K[-1].dot(C)).dot(A).dot(V_filt[-2])

        for t in range(t_max - 3, -1, -1):
            V_joint[t] = V_filt[t + 1].dot(J[t].T) + J[t + 1].dot(V_joint[t + 1] - A.dot(V_filt[t + 1])).dot(J[t].T)

    return y_hat, V_hat, V_joint, likelihood


## Initialise parameters

In [56]:
# Training observations: transpose on load so columns index time
# (file is expected to hold 1000 rows of 5 values, giving shape (5, 1000)).
data = np.loadtxt('/datasets/t1cw-data/ssm_spins.txt').T
X = data

# Initial state mean and covariance
y_init = np.zeros(4)        # [0, 0, 0, 0]
Q_init = np.identity(4)     # 4x4 identity

# System matrices
# Latent dynamics (4x4): two independent 2-D rotations, both shrunk by 0.99
theta_1 = 2*np.pi/180
theta_2 = 2*np.pi/90
A = np.zeros((4, 4))
A[:2, :2] = [[np.cos(theta_1), -np.sin(theta_1)],
             [np.sin(theta_1),  np.cos(theta_1)]]
A[2:, 2:] = [[np.cos(theta_2), -np.sin(theta_2)],
             [np.sin(theta_2),  np.cos(theta_2)]]
A = 0.99 * A

# Numerically,
# A = [[ 0.98939692 -0.03455050  0.          0.        ]
#      [ 0.03455050  0.98939692  0.          0.        ]
#      [ 0.          0.          0.98758841 -0.06905891]
#      [ 0.          0.          0.06905891  0.98758841]]

# Process noise covariance (4x4): I - A A^T
Q = np.identity(4) - A.dot(A.T)

# Numerically:
# Q = [[ 1.99000000e-02 -4.00127539e-19  0.00000000e+00  0.00000000e+00]
#      [-4.00127539e-19  1.99000000e-02  0.00000000e+00  0.00000000e+00]
#      [ 0.00000000e+00  0.00000000e+00  1.99000000e-02  1.68953551e-18]
#      [ 0.00000000e+00  0.00000000e+00  1.68953551e-18  1.99000000e-02]]

# Observation matrix (5x4)
C = np.array([
    [1.0, 0.0, 1.0, 0.0],
    [0.0, 1.0, 0.0, 1.0],
    [1.0, 0.0, 0.0, 1.0],
    [0.0, 0.0, 1.0, 1.0],
    [0.5, 0.5, 0.5, 0.5],
])

# Numerically:
# C = [[1. , 0. , 1. , 0. ],
#      [0. , 1. , 0. , 1. ],
#      [1. , 0. , 0. , 1. ],
#      [0. , 0. , 1. , 1. ],
#      [0.5, 0.5, 0.5, 0.5]]

# Observation noise covariance as identity
R = np.identity(5)

# Keep the generating parameters together under their usual names
true_params = {'A': A, 'Q': Q, 'C': C, 'R': R}

## Log Determinants

In [57]:
# Log-determinant of covariance at each time step
def logdet(A):
    """Return log(det(A)) computed from the Cholesky factor of A.

    det(A) is the squared product of the factor's diagonal, so the
    log-determinant is twice the sum of the logs of that diagonal.
    ``np.linalg.cholesky`` raises ``LinAlgError`` if the factorisation fails.
    """
    factor = np.linalg.cholesky(A)
    log_diag = np.log(np.diag(factor))
    return 2 * log_diag.sum()

# Smoothed posterior (forward + backward pass) and the log-determinant of
# each smoothed covariance V[t]
y, V, Vj, log = run_ssm_kalman(X, y_init, Q_init, A, Q, C, R, mode='smooth')

logdet_smooth = list(map(logdet, V))

# Filtered posterior (forward pass only) and the log-determinant of each
# filtered covariance Vfilt[t]
Yfilt, Vfilt, _, Lfilt = run_ssm_kalman(X, y_init, Q_init, A, Q, C, R, mode='filt')

logdet_filt = list(map(logdet, Vfilt))

## Plot results

In [58]:
import matplotlib.pyplot as plt

# ----- Smoothed data -----

# Posterior means: one trace per latent dimension (rows of y)
plt.figure(figsize=(10, 6))
for i, mean_trace in enumerate(y):
    plt.plot(mean_trace, label=f"Latent state y[{i}]")
plt.gca().set(title="Smoothed Latent State Means", xlabel="Time", ylabel="Latent state")
plt.legend()
plt.show()

# Posterior variances: the diagonal entry V[t, i, i] over time, per latent dimension
plt.figure(figsize=(10, 6))
for i, var_trace in enumerate(np.diagonal(V, axis1=1, axis2=2).T):
    plt.plot(var_trace, label=f"Variance y[{i}]")
plt.gca().set(
    title="Smoothed Variance of Latent States (Diagonal of Posterior Variance)",
    xlabel="Time",
    ylabel="Variance",
)
plt.legend()
plt.show()

# Entry [0, 1] of the lag-one covariance at every time step
plt.plot(Vj[:, 0, 1], label="Cov(y[0], y[1]) (t, t+1)")
plt.gca().set(
    title="Smoothed Lag-1 Cross-Covariances between y[0] and y[1]",
    xlabel="Time",
    ylabel="Covariance",
)
plt.legend()
plt.show()

# Per-step conditional log-likelihood returned by the smoother call
plt.plot(log, label="log p(x_t | x_{1:t-1})")
plt.gca().set(
    title="Smoothed Kalman Filter Log-Likelihood",
    xlabel="Time",
    ylabel="Log-Likelihood",
)
plt.legend()
plt.show()

# Log-determinant of the smoothed covariance at every time step
plt.figure()
plt.plot(logdet_smooth)
plt.gca().set(
    title="Log-Determinant (Smoothing Covariance)",
    xlabel="Time",
    ylabel="log(det(V_t))",
)
plt.show()

# ----- Filtered data -----

# Filtered means: one trace per latent dimension (rows of Yfilt)
plt.figure(figsize=(10, 6))
for i, mean_trace in enumerate(Yfilt):
    plt.plot(mean_trace, label=f"y{i}")
plt.gca().set(title="Unsmoothed Latent States", xlabel="Time", ylabel="Latent State Value")
plt.legend()
plt.show()

# Filtered variances: the diagonal entry Vfilt[t, i, i] over time
plt.figure(figsize=(10, 6))
for i, var_trace in enumerate(np.diagonal(Vfilt, axis1=1, axis2=2).T):
    plt.plot(var_trace, label=f"Variance y[{i}]")
plt.gca().set(
    title="Unsmoothed Variance of Latent States (Diagonal of Posterior Variance)",
    xlabel="Time",
    ylabel="Variance",
)
plt.legend()
plt.show()

# Per-step conditional log-likelihood returned by the filter call
plt.plot(Lfilt, label="log p(x_t | x_{1:t-1})")
plt.gca().set(
    title="Kalman Filter Log-Likelihood (Unsmoothed)",
    xlabel="Time",
    ylabel="Log-Likelihood",
)
plt.legend()
plt.show()

# Log-determinant of the filtered covariance at every time step
plt.figure()
plt.plot(logdet_filt)
plt.gca().set(
    title="Log-Determinant (Filtering Covariance)",
    xlabel="Time",
    ylabel="log(det(Vfilt_t))",
)
plt.show()

# Learn parameters using EM

In [52]:
def regularize_cov(V_hat, eps=1e-6):
    """Return a symmetrised copy of V_hat whose smallest eigenvalue is at least eps.

    The matrix is first averaged with its transpose. If the smallest
    eigenvalue of that symmetric matrix is below ``eps``, the shortfall is
    added along the diagonal; otherwise the symmetric matrix is returned as is.
    The input array itself is not modified.
    """
    symmetric = (V_hat + V_hat.T) / 2
    smallest = np.linalg.eigvalsh(symmetric).min()
    if smallest < eps:
        # shift the whole spectrum up so the smallest eigenvalue lands on eps
        symmetric += (eps - smallest) * np.eye(len(symmetric))
    return symmetric

def lgssm(X, y_init, Q_init, A, Q, C, R, n_iters=50, reg_eps=1e-6, verbose=True):
    """Fit the parameters A, Q, C, R of a linear-Gaussian state-space model by EM.

    Each iteration runs the Kalman smoother (E-step) with the current
    parameters, accumulates the expected sufficient statistics and re-estimates
    C, R, A and Q in closed form (M-step). ``y_init`` and ``Q_init`` are passed
    to the smoother unchanged on every iteration and are not re-estimated.

    :param X:       data, [d, T] numpy array
    :param y_init:  initial latent state passed to the smoother
    :param Q_init:  initial latent variance passed to the smoother
    :param A:       starting latent dynamics matrix
    :param Q:       starting innovations covariance
    :param C:       starting output loading matrix
    :param R:       starting output noise covariance
    :param n_iters: number of EM iterations to run
    :param reg_eps: eigenvalue floor handed to ``regularize_cov`` for Q and R
    :param verbose: accepted for interface compatibility; not used here
    :return: ``(A, Q, C, R, logs, A_chg, Q_chg, C_chg, R_chg)``.
             ``logs[i]`` is the summed conditional log-likelihood from the
             smoother call of iteration i, i.e. under the parameters in force
             before that iteration's M-step. The ``*_chg`` lists hold
             ``np.linalg.norm`` of each parameter's change per iteration.

    With a single time step the transition statistics are empty, so the
    A/Q update is not defined.
    """
    T = X.shape[1]
    logs = []
    A_chg, C_chg, Q_chg, R_chg = [], [], [], []

    # sum_t x_t x_t^T depends only on the data, so compute it once
    Exx = X.dot(X.T)

    for _ in range(n_iters):
        # === E-Step ===
        y, V, Vj, log = run_ssm_kalman(X, y_init, Q_init, A, Q, C, R, mode='smooth')

        # Total log-likelihood under the current (pre-update) parameters
        logs.append(np.sum(log))

        # === Sufficient statistics for the observation model ===
        Exy = X.dot(y.T)                      # sum_t x_t E[y_t]^T
        Eyy = V.sum(axis=0) + y.dot(y.T)      # sum_t E[y_t y_t^T]

        # === M-Step: Update C and R ===
        C_new = np.dot(Exy, np.linalg.inv(Eyy))
        R_new = (Exx - np.dot(Exy, C_new.T)) / T
        R_new = regularize_cov(R_new, reg_eps)

        # === Sufficient statistics for the transition model ===
        # "lag" is step t and "lead" is step t+1, for t = 0 .. T-2
        Ey_lag = y[:, :-1]
        Ey_lead = y[:, 1:]

        # The smoother stores cov(y_{t+1}, y_t) at index t and leaves its last
        # slot at zero, so the T-1 valid cross-covariances are Vj[:-1].
        Eyylead = V[1:].sum(axis=0) + Ey_lead.dot(Ey_lead.T)       # sum E[y_{t+1} y_{t+1}^T]
        Eyylag = Vj[:-1].sum(axis=0) + Ey_lead.dot(Ey_lag.T)       # sum E[y_{t+1} y_t^T]
        Eyylag_self = V[:-1].sum(axis=0) + Ey_lag.dot(Ey_lag.T)    # sum E[y_t y_t^T]

        # === M-Step: Update A and Q ===
        A_new = np.dot(Eyylag, np.linalg.inv(Eyylag_self))
        Q_new = (Eyylead - np.dot(Eyylag, A_new.T)) / (T - 1)
        Q_new = regularize_cov(Q_new, reg_eps)

        # Track parameter changes
        A_chg.append(np.linalg.norm(A_new - A))
        C_chg.append(np.linalg.norm(C_new - C))
        Q_chg.append(np.linalg.norm(Q_new - Q))
        R_chg.append(np.linalg.norm(R_new - R))

        A, Q, C, R = A_new, Q_new, C_new, R_new

    return A, Q, C, R, logs, A_chg, Q_chg, C_chg, R_chg


## Training data likelihood: true parameters VS random initialisations

In [60]:
# Run 50 EM iterations starting from both the true/generating parameters and
# several random initialisations

n_runs = 11  # 1 generating param, 10 random
n_iters = 50
logtrace = []      # Per-run list of log-likelihoods, one value per EM iteration
labels = []        # Run names, in the same order as logtrace
em_runs = []       # List of dicts for each run
param_sets = {}    # Dictionary of parameter tuples for likelihood/test eval

# Log-likelihood for true parameters
_, _, _, log_true = run_ssm_kalman(X, y_init, Q_init, true_params['A'], true_params['Q'], true_params['C'], true_params['R'], mode='filt')

# Add to param_sets
param_sets['true'] = (true_params['A'], true_params['Q'], true_params['C'], true_params['R'])

# Model dimensions come from the loading matrix instead of being hard-coded
d, k = true_params['C'].shape

for i in range(n_runs):
    if i == 0:
        # Read the generating parameters from true_params: the globals
        # A, Q, C, R are rebound by other cells, so they are not reliable here.
        A0 = true_params['A'].copy()
        Q0 = true_params['Q'].copy()
        C0 = true_params['C'].copy()
        R0 = true_params['R'].copy()
        label = "EM_true"
    else:
        # Seed with the run index so every random start is reproducible
        rng = np.random.default_rng(i)
        A0 = rng.standard_normal((k, k))
        Q0 = np.eye(k)
        C0 = rng.standard_normal((d, k))
        R0 = np.eye(d)
        label = f"EM_rand_{i}"

    A_final, Q_final, C_final, R_final, log, A_chg, Q_chg, C_chg, R_chg = lgssm(X, y_init, Q_init, A0, Q0, C0, R0, n_iters=n_iters, verbose=False)

    em_params = {
        'A': A_final,
        'Q': Q_final,
        'C': C_final,
        'R': R_final,
        'logs': log,
        'A_chg': A_chg,
        'Q_chg': Q_chg,
        'C_chg': C_chg,
        'R_chg': R_chg
    }
    em_runs.append(em_params)
    param_sets[label] = (A_final, Q_final, C_final, R_final)
    logtrace.append(log)
    labels.append(label)

# Show how the likelihood increases over the EM iterations (one curve per run)
plt.figure(figsize=(12,7))
for likes, label in zip(logtrace, labels):
    plt.plot(likes, label=label)
plt.xlabel("EM Iteration")
plt.ylabel("Log-Likelihood")
plt.title("Likelihood vs EM Iteration for Each Run")
plt.legend()
plt.show()

## SSID function

In [39]:
def ssid(X, k=4):
    """Estimate state-space parameters from data with an SVD projection.

    The loading matrix is taken as the first ``k`` left singular vectors of X
    (X is not centred first). Latent trajectories are obtained by projecting X
    onto those vectors, the dynamics matrix by least-squares regression of each
    latent step on the previous one, and the two noise covariances as sample
    covariances of the corresponding residuals.

    :param X: observed data, shape (d, T)
    :param k: latent state dimension
    :return: ``(A_SSID, Q_SSID, C_SSID, R_SSID)``: dynamics, innovations
             covariance, loading matrix, and output noise covariance.
             Q and R are always returned as 2-D arrays.
    """
    # Estimate C: leading k left singular vectors (orthonormal columns)
    U, _, _ = np.linalg.svd(X, full_matrices=False)
    C_SSID = U[:, :k]

    # Estimate latent Y by projecting onto C
    Y_est = np.dot(C_SSID.T, X)

    # Estimate A using linear regression between Y_t and Y_{t-1}.
    # lstsq solves Y_tm1.T @ B = Y_t.T, so B is A transposed.
    Y_t = Y_est[:, 1:]
    Y_tm1 = Y_est[:, :-1]
    A_SSID = np.linalg.lstsq(Y_tm1.T, Y_t.T, rcond=None)[0].T

    # Estimate Q, R as sample covariances of the residuals. np.cov squeezes a
    # single-variable result to a 0-d array, so restore the matrix shape to
    # keep k == 1 (or d == 1) usable downstream.
    Q_SSID = np.atleast_2d(np.cov(Y_t - np.dot(A_SSID, Y_tm1)))
    R_SSID = np.atleast_2d(np.cov(X - np.dot(C_SSID, Y_est)))

    return A_SSID, Q_SSID, C_SSID, R_SSID

# Parameters estimated directly from the data by ssid()
A_SSID, Q_SSID, C_SSID, R_SSID = ssid(X, 4)
param_sets.update({"SSID": (A_SSID, Q_SSID, C_SSID, R_SSID)})

# EM started from the SSID estimate; only the final parameters and the
# log-likelihood trace are kept by name
A_EM_SSID, Q_EM_SSID, C_EM_SSID, R_EM_SSID, log, *_ = lgssm(
    X,
    y_init,
    Q_init,
    A=A_SSID,
    Q=Q_SSID,
    C=C_SSID,
    R=R_SSID,
    n_iters=n_iters,
    verbose=False,
)

# Register the refined parameters alongside the other sets
param_sets.update({"SSID+EM": (A_EM_SSID, Q_EM_SSID, C_EM_SSID, R_EM_SSID)})


# Test data likelihood: true parameters VS random initialisations

In [61]:
# Evaluate the likelihood of the test data under all of the
# parameter sets found above

# Load the test data (transposed so that columns index time).
# It is kept under its own name: rebinding X here would make any re-run of
# the training cells silently use the test set.
testdata = np.loadtxt('/datasets/t1cw-data/ssm_spins_test.txt').T  # shape: (5, N)

# Test set likelihood, one total per entry of param_sets, in param_sets order
test_results = []
for name, (A_p, Q_p, C_p, R_p) in param_sets.items():
    # Run the filter (not smoother!), get conditional likelihoods.
    # Loop names are distinct from A, Q, C, R so the generating parameters
    # held in those globals are not overwritten.
    _, _, _, log_test = run_ssm_kalman(testdata, y_init, Q_init, A_p, Q_p, C_p, R_p, mode='filt')
    test_results.append(np.sum(log_test))

## Test VS Training results

In [64]:
# Plot test vs training log-likelihoods

# Collapse the per-step log-likelihoods of the true parameters to one total
# (np.sum leaves an already-summed scalar unchanged, so re-running is safe)
log_true = np.sum(log_true)

fig, axs = plt.subplots(1, 2, figsize=(16,7))  # 1 row, 2 columns

# Training data log-likelihood curves (EM trajectories)
for likes, label in zip(logtrace, labels):
    axs[0].plot(likes, label=label)
axs[0].axhline(log_true, color='black', linestyle='--', label='True')
axs[0].set_title('Training Data Log-Likelihood')
axs[0].set_xlabel('EM Iteration')
axs[0].set_ylabel('Log-Likelihood')
axs[0].legend()

# Test data log-likelihoods (final values only).
# test_results was filled by iterating param_sets, so its entries line up with
# the keys of param_sets -- not with `labels`, which holds only the EM runs.
# Pairing by key keeps every point next to the name it was computed for.
test_labels = list(param_sets)[:len(test_results)]
for i, (label, result) in enumerate(zip(test_labels, test_results)):
    axs[1].scatter(i, result, color='green', s=60)
    axs[1].text(i+0.1, result, f'{label}\n{result:.1f}', fontsize=9, color='green', va='bottom')
axs[1].set_title('Test Data Log-Likelihoods')
axs[1].set_xlabel('Run')
axs[1].set_ylabel('Log-Likelihood')
axs[1].set_xticks(range(len(test_labels)))
axs[1].set_xticklabels(test_labels, rotation=45)

plt.tight_layout()
plt.show()


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=cb182644-878e-48cb-992b-68a78a5afe3d' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>