In [1]:
#!/usr/bin/env python
# coding: utf-8

import random
import numpy as np
import torch
from tqdm import tqdm
import matplotlib.pyplot as plt
import scipy

class Model(torch.nn.Module):
    '''
    Input (y, A, H, P)
    y, observation vector
    P, Covariance

    x(t+1) = A x(t) + ξ(t)
    y(t) = H x(t) + ω(t)
    x ∈ R^n, state
    y ∈ R^m, observation
    {ξ}_t∈Z, uncorrelated zero-mean process
    {ω}_t∈Z, measurement noise vectors
    E[ξ(t)ξ(t)^T] = Q
    E[ω(t)ω(t)^T] = R
    Q & R, unknown
    P, error covariance matrix
    ̂x(0) = m_0, P(t_0) = P_0

    Prediction problem
    ̂y_P(T) = H ̂x_P(T)
    ̂x(t+1) = A ̂x(t) + L(t) ( y(t) - H x̂(t) )

    Kalman gain
    L(t) := A P(t) H^T ( H P(t) H^T + R )^(-1)
    P(t) = E[(x(t) - ̂x(t))(x(t) - ̂x(t))^T]
    P(t+1) = ( A - L(t) H ) P(t) A^T + Q
    P converges when (A,H) is observable and the (A, Q^(1/2)) is controllable
    L_∞ = A P_∞ H^T ( H P_∞ H^T + R )^(-1)
    '''
    def __init__(self, A, H, gpu=True):
        super().__init__()
        if gpu:
            if torch.cuda.is_available():
                gpu = 'cuda'
            elif torch.backends.mps.is_available():
                gpu = 'mps'
            else:
                gpu = 'cpu'
        else:
            gpu = 'cpu'
        self.device = torch.device(gpu)
        print('Using device:', self.device)
        n = A.shape[0]
        m = H.shape[0]
        weights = torch.distributions.Uniform(0,0.5).sample((n,m))
        self.A = torch.Tensor(A).to(self.device)
        self.H = torch.Tensor(H).to(self.device)
        self.weights = torch.nn.Parameter(weights)

    def forward(self, y, N):
        A = self.A
        H = self.H
        L = self.weights
        y = torch.Tensor(y).to(self.device)
        n = A.shape[0]
        m = y.shape[0]
        xhat = torch.zeros((n,N+1), device=self.device)
        yhat = torch.zeros((m,N), device=self.device)
        for k in range(N):
            xhat[:,k+1] = A @ xhat[:,k] + L @ (y[:,k] - H @ xhat[:,k])
            yhat[:,k] = H @ xhat[:,k]
        return yhat

def sgd(model, y, N, steps=1000, lr=1e-3, opt_params='adam', gpu=True):
    '''
    y, observation vector
    N, time steps
    steps, optimising num of loops
    lr, learning rate
    '''
    #losses = []
    #gains = []
    losses = np.zeros(steps)
    gains = np.zeros((model.H.shape[1],model.H.shape[0],steps))

    if opt_params == 'adam':
        opt = torch.optim.Adam(model.parameters(), lr=lr)
    elif opt_params == 'sgd':
        opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    else:
        assert True, "Not supported optimizer"

    if gpu:
        if torch.cuda.is_available():
            gpu = 'cuda'
        elif torch.backends.mps.is_available():
            gpu = 'mps'
        else:
            gpu = 'cpu'
    else:
        gpu = 'cpu'
    device = torch.device(gpu)
    y = torch.Tensor(y).to(device)
    model = model.to(device)

    pbar = tqdm(total=steps, unit='epochs')
    for i in range(steps):
        preds = model(y,N)
        loss = torch.functional.F.mse_loss(preds, y)
        loss.backward()
        opt.step()
        opt.zero_grad()
        #losses.append(float(loss))
        #gains.append(model.weights.detach())
        losses[i] = loss
        gains[:,:,i] = model.weights.cpu().detach()
        pbar.update(1)
    pbar.close()
    preds = model(y,N).cpu().detach().numpy()
    #losses = np.array(losses)
    #gains = np.array(gains)
    return losses, preds, gains

def simple_mass_spring_damp(T, Q, R, P, Ts=0.1):
    '''
    m x''(t) + c x'(t) + k x(t) = 0
    m, mass
    c, damping coefficient
    k, spring constant
    F, force
    Ts, time step
    N, simulation length

    E[ξ(t)ξ(t)^T] = Q
    E[ω(t)ω(t)^T] = R

    x(t+1) = A x(t) + ξ(t)
    y(t) = H x(t) + ω(t)
    {ξ}_t∈Z, uncorrelated zero-mean process
    {ω}_t∈Z, measurement noise vectors
    x(t) ∈ R^n, state of the system
    y(t) ∈ R^m
    '''
    # (A,H) are known
    H = np.array([[1.0, 0.0]])
    C = 4
    K = 2
    M = 20
    F = 1
    N = int(T/Ts)
    A = np.array([[1, Ts], [Ts*(-K/M), 1+Ts*(-C/M)]])
    B = np.array([0, Ts*(1/M)])
    u = F

    n = A.shape[0]
    m = H.shape[0]
    x = np.zeros((n,N+1))
    y = np.zeros((m,N))
    m_0 = np.random.normal(0, P, size=n)
    #ξ = np.random.multivariate_normal(np.zeros(n), Q, N+1).T * np.sqrt(Ts)
    #ω = np.random.multivariate_normal(np.zeros(m), R, N).T * 1/np.sqrt(Ts)
    ξ = np.random.multivariate_normal(np.zeros(n), Q, N+1).T
    ω = np.random.multivariate_normal(np.zeros(m), R, N).T
    x[:,0] = m_0
    for k in range(N):
        x[:,k+1] = A @ x[:,k] + B * u + ξ[:,k]
        y[:,k] = H @ x[:,k] + ω[:,k]
    return x[:,:-1], y, A, H

In [None]:
n, m = 2, 1
Q = np.eye(n) * 0.1
R = np.eye(m) * 0.1
P_0 = 0.05
Ts = 0.1
T = 100
steps = 1000
N = int(T/Ts)

print('Horizon:', T)
x, y, A, H = simple_mass_spring_damp(T, Q, R, P_0)
t = np.arange(0, T, Ts)
model = Model(A, H, gpu=True)
losses, preds, weights = sgd(model, y, N, steps, lr=8e-3, gpu=True)
print("Kalman Gain L: \n", weights[:,:,-1])

# APA^T - APH^T (HPH^T + R)^(-1) HPA^T + Q - P = 0
P = scipy.linalg.solve_discrete_are(A.T,H.T,Q,R)
L = A @ P @ H.T @ np.linalg.inv(H @ P @ H.T + R)
print("P: \n", P)
print("L: \n", L)

x, y, A, H = simple_mass_spring_damp(T, Q, R, P_0)
preds = model(y,N).cpu().detach().numpy()

Horizon: 100
Using device: cuda


 20%|██        | 205/1000 [02:16<07:44,  1.71epochs/s]

In [None]:
for i in range(y.shape[0]):
    plt.figure()
    plt.plot(t, x[i])
    plt.plot(t, preds[i], color='r')
    plt.title('Simulation of Mass-Spring-Damper System')
    plt.xlabel('t [s]')
    plt.ylabel('x(t)')
    plt.grid()
    plt.legend(["x", "filtered x"])
    plt.show()

    plt.figure()
    plt.plot(t, y[i])
    plt.plot(t, preds[i], color='r')
    plt.title('Simulation of Mass-Spring-Damper System')
    plt.xlabel('t [s]')
    plt.ylabel('y(t)')
    plt.grid()
    plt.legend(["y", "filtered y"])
    plt.show()

    plt.figure()
    plt.plot(losses)
    plt.title('Loss Function')
    plt.xlabel('n [steps]')
    plt.grid()
    plt.show()

    print(weights)
    print(weights.shape)
    for j in range(n):
        plt.figure()
        plt.plot(weights[j][i])
        plt.axhline(L[j], linestyle='--')
        plt.title('Kalman Gain')
        plt.xlabel('n [steps]')
        plt.legend(["Gain", "Optimal Gain"])
        plt.grid()
        plt.show()

        plt.figure()
        plt.semilogy((L[j] - weights[j][i])**2)
        plt.title('MSE Kalman Gain')
        plt.xlabel('n [steps]')
        plt.grid()
        plt.show()