# Kalman Filtering

В цьому завданні, пропонується імплементувати лінійний фільтр Калмана і застосувати його для поєднання показників сенсорів.

Використання підходів на базі Баєсівської фільтрації є стандартом в задачах такого типу. Наприклад:

- GNSS (GPS). Поєднання 5-12 сигналів супутників для уточнення позиції приймача. Див [laika](https://github.com/commaai/laika), а саме [ноутбук](https://github.com/commaai/laika/blob/master/examples/Compute_station_pos.ipynb)
- Поєднання сенсорів одометрії. Наприклад [GPS та IMU](https://www.youtube.com/watch?v=hN8dL55rP5I) для уточнення положення приймача та локалізації.
- Багато інших. Просто гугліть "Sensor fusion" :)



In [None]:
import numpy as np
from pathlib import Path
from matplotlib import pyplot as plt
randn = np.random.randn

In [None]:
t_max = 12
t_15Hz = np.linspace(start=0, stop=t_max, num=t_max * 15)
t_2Hz = np.linspace(start=0, stop=t_max, num=t_max * 2)

def f1(t):
    return np.sin(t) / 2 + t / 2 * np.cos(t)

def my_signal(t, noise_std=0.0):
    signal_vals_1 = f1(t)
    signal_vals_2 = f1(6.5)
    signal_vals =  signal_vals_1 * (t < 6.5) + (t >= 6.5) * signal_vals_2 
    return signal_vals + randn(*t.shape)*noise_std

In [None]:
f1(6.5)

# Сигнал
Спочатку розглянемо оригінальний сигнал.
$$ f(t) = \begin{cases} 
0.5 \sin(t) + 0.5 t \cos(t) & t < 6.5 \\
3.281... & t \ge 6.5
\end{cases}
$$
Та його зашумлені версії, які й спостерігають наші два сенсори
$$ \xi_1(t) = \mathcal{N} (f(t), \sigma_1) = f(t) + \mathcal{N} (0, 1) \sigma_1
$$

$$ \xi_2(t) = \mathcal{N} (f(t), \sigma_2) = f(t) + \mathcal{N} (0, 1) \sigma_2
$$

$$ \sigma_1 = 1.5, \sigma_2 = 0.1
$$

In [None]:
plt.figure(dpi=300)
plt.plot(t_15Hz, my_signal(t_15Hz), label="$f(t)$")
plt.plot(t_15Hz, my_signal(t_15Hz, noise_std=2.0), label="$\\xi_1(t)$")
plt.scatter(t_2Hz, my_signal(t_2Hz, noise_std=0.1), label="$\\xi_2(t)$")
plt.xlabel("time, $t$")
plt.legend();

Як бачите,

- перший сенсор $\xi_1(t)$ є доволі **шумним**, проте він видає свої показники з частотою **15Hz** (15 показань на секунду)
- другий сенсор $\xi_2(t)$ є майже **чистим**, проте він видає свої показники з частотою **всього 2Hz** (2 показання на секунду)

В ідеалі, ми б хотіли використати інформацію з обох сенсорів для відновлення сигналу. Але спочатку...

# Фільтр Калмана для зменшення шуму сигналу

Нагадаю, що рівняння системи, з якою працює лінійний КФ формулюється наступним чином:

$$ x_k = \textbf{F} x_{k-1} + \mathcal{N} (0, \textbf{Q}) $$
$$ z_k = \textbf{H} x_{k} + \mathcal{N} (0, \textbf{R}) $$

Звичайно, при наявності двох сенсорів, у нас буде два типи спостережень із різними дисперсіями. Але спочатку розглянемо лише один $ R = 1.5^2 $. Не будемо навіть розглядати динаміку. Скажемо, що $\textbf{F} = \textbf{I}$. Більше того, оскільки стан включає в себе лише один скаляр, то всі розмірності в рівняннях будуть одиничними:

$$ x_k = x_{k-1} + \mathcal{N} (0, Q) $$
$$ z_k = x_{k} + \mathcal{N} (0, 1.5^2) $$

In [None]:
# Початкова матриця коваріації визначається звичайним підбором. Припустимо такі значення
P = np.array([[1.0]])
# Для коваріації похибки моделі також визначаємо значення підбором.
Q = np.array([[1.0]])

H = np.array([[1.0]])
R1 = np.array([[1.5**2]])
R2 = np.array([[0.1**2]])  # буде потрібно далі

# Зверніть увагу, F - це функція від dt. Оскільки динаміка відсутня, ця функція - константа. 
# Цікаво спробувати додати динаміку і подивитись чи зміняться результати!
get_F = lambda dt: np.array([[1.0]])

In [None]:
# ---------------------------------------
# Виконайте вказівки в lib/my_kalman.py
# ---------------------------------------
from lib.my_kalman import update, predict

In [None]:
# Конструюємо сигнал на сенсорі (1).
z_15Hz = my_signal(t_15Hz, noise_std=1.5)
t = t_15Hz[0]
x = np.array([z_15Hz[0]])

# Використовуємо фільтр Калмана (predict та update).
filtered_signal = np.empty(t_15Hz.shape[0])
filtered_signal_sigma = np.empty(t_15Hz.shape[0])

filtered_signal[0] = x[0]
filtered_signal_sigma[0] = P[0][0]

for i in range(1, t_15Hz.shape[0]):
    dt = t_15Hz[i] - t
    t = t_15Hz[i]
    z = z_15Hz[i]
    # КФ
    x_next, P_next = predict(x, P, get_F(dt), Q=Q)
    x, P = update(x_next, P_next, z, R1, H)
    
    filtered_signal[i] = x[0]
    filtered_signal_sigma[i] = P[0][0]

In [None]:
plt.figure(dpi=300)
plt.title("Фільтрація першого сенсора")
plt.plot(t_15Hz, my_signal(t_15Hz), label="$f(t)$")
plt.plot(t_15Hz, z_15Hz, label="$\\xi_1(t)$")
plt.plot(t_15Hz, filtered_signal, label="після фільтрації")
plt.legend();

Але також було б цікаво подивитись на трохи іншу візуалізацію.

Нагадаю, що $\xi_1(t)$ має дисперсію і є імовірнісним сигналом. Вище, ми розглядали лише його реалізацію. То ж давайте порівняємо параметри випадкових процесів.

In [None]:
plt.figure(dpi=300)
plt.title("Фільтрація першого сенсора - процеси")
f_t = my_signal(t_15Hz)
sigma_t = 1.5
plt.plot(t_15Hz, f_t, label="$f(t)$")
plt.fill_between(t_15Hz, f_t - sigma_t, f_t + sigma_t, alpha=0.5)
plt.plot(t_15Hz, filtered_signal, label="після фільтрації")
plt.fill_between(t_15Hz, filtered_signal - filtered_signal_sigma, filtered_signal + filtered_signal_sigma, alpha=0.5)
# plt.fill_between(t_10Hz, f_t - filtered_signal_sigma, f_t + filtered_signal_sigma, alpha=0.5)
plt.legend();

## Візуалізація анімації процесу

In [None]:
from scipy.stats import norm 
x = np.linspace(start=-8, stop=8, num=100)
f_t = my_signal(t_15Hz)
sigma_t = 1.5

fig, ax = plt.subplots(dpi=200)
ax.set_xlim(-8, 8)
ax.set_ylim(0, 1.0)
filtered_signal_plot, = ax.plot([],[], label="KF - $\mathcal{N}(x_t, P_t)$")
xi1_plot, = ax.plot([],[], label="$\\xi_1$ - $\mathcal{N}(f(t), 1.5^2)$")
xi1_z_plot, = ax.plot([],[], label="$\\xi_1$ - спостереження")
ax.legend(loc="upper left")
ax.set_xlabel("значення сигналу")
ax.set_ylabel("'Імовірність'")

def animate(i):
    loc = filtered_signal[i]
    scale = filtered_signal_sigma[i]
    filtered_signal_plot.set_data(x, norm.pdf(x, loc=loc, scale=scale))
    xi1_plot.set_data(x, norm.pdf(x, loc=f_t[i], scale=sigma_t))
    xi1_z_plot.set_data([z_15Hz[i], z_15Hz[i]], [0.0, 1.0])

import matplotlib.animation
ani = matplotlib.animation.FuncAnimation(fig, animate, frames=len(t_15Hz))

from IPython.display import HTML
HTML(ani.to_jshtml())

# Але чим це відрізняється від інших методів згладжування?

Упевнений, що багато з вас знайомі із методами ковзних середніх та іншими методами згладжування. Може постати питання, чим вони відрізняються від КФ?

Окрім того, що згладжування це лише одна підзадача, яку вирішує КФ, існує глибокий зв'язок між методами відновлення стану (до яких відноситься КФ) і іншими методами часових рядів.



Якщо взяти Експоненційне ковзне середнє, то [зв'язок](https://stats.stackexchange.com/a/16474) полягає в тому що КФ реалізує достатньо схожу схему згладжування, лиш параметр змінюється відповідно до апріорних знань (R, Q, x0, P0, etc.) та даних часового ряду.

Ось приклад:

In [None]:
from lib.ewma import ewma

In [None]:
plt.figure(dpi=300)
plt.title("Фільтрація першого сенсора - KF vs EWMA")
ewma_signal = ewma_vectorized(z_15Hz, alpha=0.991)
plt.plot(t_15Hz, my_signal(t_15Hz), label="$f(t)$")
plt.plot(t_15Hz, ewma_signal, label="Експоненційне ковзне середнє")
plt.plot(t_15Hz, filtered_signal, label="filtered")
plt.legend();

# Два сенсори!

In [None]:
def merge(arr1, arr2):
    ''' Поєднуємо потоки спостережень. '''
    arr_merged = []
    from_merged = []
    idx_merged = []
    i1 = 0
    i2 = 0
    while i1 < len(arr1) or i2 < len(arr2):
        if i1 < len(arr1) and (i2 >= len(arr2) or arr1[i1] < arr2[i2]):
            arr_merged.append(arr1[i1])
            from_merged.append(0)
            idx_merged.append(i1)
            i1 += 1
        else:
            arr_merged.append(arr2[i2])
            from_merged.append(1)
            idx_merged.append(i2)
            i2 += 1
    return np.array(arr_merged), np.array(from_merged), np.array(idx_merged)

Все те саме, але у нас тепер два види спостережень із різними рівнями шуму спостережень (measurement noise, R).
Обʼєднуємо ці спостереження банально оновлюючи КФ обома спостереженнями, але із різними параметрами R.

In [None]:
# Конструюємо сигнал на сенсорі (1).
z1_15Hz = z_15Hz
z2_2Hz = my_signal(t_2Hz, noise_std=0.1)
t = t_15Hz[0]
x = np.array([z2_2Hz[0]])

t_merged, from_merged, idx_merged = merge(t_15Hz, t_2Hz)

# Використовуємо фільтр Калмана (predict та update).
fused_signal = np.empty(t_merged.shape[0])
fused_signal[0] = x[0]

for i in range(0, t_merged.shape[0]):
    dt = t_merged[i] - t
    t = t_merged[i]
    z = [z1_15Hz, z2_2Hz][from_merged[i]][idx_merged[i]]
    R = [R1, R2][from_merged[i]]

    # КФ
    x_next, P_next = predict(x, P, get_F(dt), Q=Q)
    x, P = update(x_next, P_next, z, R, H)
    
    fused_signal[i] = x[0]

In [None]:
plt.figure(dpi=300)
plt.title("Фільтрація двох сенсорів")
plt.plot(t_15Hz, my_signal(t_15Hz), label="$f(t)$")
plt.plot(t_15Hz, z1_15Hz, label="$\\xi_1(t)$")
plt.scatter(t_2Hz, z2_2Hz, label="$\\xi_2(t)$")
# plt.plot(t_15Hz, filtered_signal, label="після фільтрації")
plt.plot(t_merged, fused_signal, label="після фільтрації (2 сенсора)")
plt.legend();

In [None]:
plt.figure(dpi=300)
plt.title("Фільтрація двох сенсорів (порівняння)")
plt.plot(t_15Hz, my_signal(t_15Hz), label="$f(t)$")
plt.plot(t_15Hz, filtered_signal, label="після фільтрації")
plt.plot(t_merged, fused_signal, label="після фільтрації (2 сенсора)")
plt.legend();

Як бачите, об'єднана версія майже завжди лежить між першою версією та оригінальним сигналом, тобто є завжди більш точною.

Звичайно, коли маємо одновимірний сигнал, перевага не настільки величезна, плюс ми знаємо параметри шумів. В реальності обʼєднання сенсорів - трюк дуже важливий, який часто є необхідним для мінімально допустимої якості обробки сенсорів!