# Kalman Filter zur Zustandsschaetzung

_Philipp Rapp_

**Mechatronik 2022/2023**



In [None]:
# Pakete
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal

%matplotlib inline

## Erzeugung der Trajektorie des wahren Zustands

Da wir in der Simulation arbeiten, koennen wir die Trajektorie $x(t)$ des wahren (und eigentlich unbekannten) Zustands erzeugen.
Damit koennen wir die Funktion des Kalman Filters ueberpruefen.

Wir diskretisieren den Verlauf des Zustands mit der Schrittweite $\Delta T = 0.01$ (das entspricht 100 Hz) und speichern das Ergebnis in einem Dictionary.

Als Szenario nehmen wir an, dass das Flugzeug durch einen Punkt modelliert wird und eine Acht abfliegt.

Die (wahre) Position des Punktziels ist gegeben durch die Position $(p_x, p_y)$ mit
\begin{align}
    p_x &= \cos(\omega t) \\
    p_y &= \sin(2 \omega t)
\end{align}
mit $t \in [0,T]$.

Die Geschwindigkeit ergibt sich damit zu
\begin{align}
    v_x &= -\omega \sin(\omega t) \\
    v_y &= 2 \omega \cos(2 \omega t)
\end{align}

### Allgemeine Parameter

In [None]:
# Gesamtdauer
T = 10

true_state = {}
true_state['step_size'] = 0.01
# 0 bis 10 Sekunden
true_state['t'] = np.arange(0, 10.0, true_state['step_size'])

### Definition des Szenarios

Wir betrachten eine "Acht" aus der Vogelperspektive.

In [None]:
# Frequenz
omega = 2*np.pi/T

# Position
true_state['pos_x'] = np.cos(omega*true_state['t'])
true_state['pos_y'] = np.sin(2.0*omega*true_state['t'])
# Geschwindigkeit
true_state['vel_x'] = -omega*np.sin(omega*true_state['t'])
true_state['vel_y'] = 2*omega*np.cos(2*omega*true_state['t'])
true_state['x'] = np.column_stack((true_state['pos_x'], true_state['pos_y'], true_state['vel_x'], true_state['vel_y']))

In [None]:
plt.plot(true_state['pos_x'], true_state['pos_y'])
plt.grid(True)
plt.xlabel('pos_x')
plt.ylabel('pos_y')
plt.show()

## Hinzufuegen von Messrauschen

Als naechstes fuegen wir Messrauschen hinzu. Wir nehmen an, dass das Messrauschen mittelwertfrei, normalverteilt und weiss ist. Weisses Rauschen bedeutet, dass es zeitlich unkorreliert ist.



In [None]:
R = 0.02**2 * np.eye(2)
# R = 0.002**2 * np.eye(2)
print(f'Kovarianzmatrix R des Messrauschens:\n{R}')

measurement_noise = { 'mean': np.array([0.0, 0.0]), 'cov': R }
measurement_noise_distribution = multivariate_normal(mean=measurement_noise['mean'], cov=measurement_noise['cov'])

## Erzeugung des Messsignals

Das gemessene Signal $y(t)$ besteht aus der Position, die zusaetzlich durch Rauschen ueberlagert ist.

Wir nehmen die gleiche zeitliche Diskretisierung an wie fuer den wahren Zustand.

In [None]:
N = len(true_state['t'])

measurement_signal = {}
measurement_signal['t'] = true_state['t']
measurement_signal['y'] = true_state['x'][:,:2] + measurement_noise_distribution.rvs(N)

In [None]:
plt.plot(true_state['x'][:,0], true_state['x'][:,1], '--', color=[0.5,0.5,0.5], label='Wahrer Zustand (Position)')
plt.scatter(measurement_signal['y'][:,0], measurement_signal['y'][:,1], s=1.2, label='Messwerte')
plt.grid(True)
plt.legend()
plt.xlabel('pos_x')
plt.ylabel('pos_y')
plt.show()

## Auslegung des Kalman Filters

### Streckenmodell
Im Kontext der Zustandsschaetzung eines bewegten Ziels wird
das Streckenmodell oft auch _Bewegungsmodell_ genannt,
da es die Bewegung des Ziels modelliert.

Wir gehen von einem Punktziel aus und formulieren den Schwerpunktsatz
\begin{align}
    m \ddot{p}_x &= F_x \\
    m \ddot{p}_y &= F_y
\end{align}

Da wir die Kraefte nicht kennen, setzen wir sie zu Null.
Den Fehler, den wir dadurch erhalten, modellieren wir durch das sogenannte _Prozessrauschen_
(mit Kovarianzmatrix $Q$).
Dieses wird dazu genutzt, Unsicherheiten im Streckenmodell zu modellieren.

Unter Annahme einer konstanten Masse $m$ erhalten wir
\begin{align}
    \dot{v}_x &= 0 \\
    \dot{v}_y &= 0
\end{align}

Dieses Modell ist in der Literatur unter dem Namen _Constant Velocity_ Modell bekannt.

### Messmodell

Wir nehmen an dass wir die Position des Flugszeugs messen können (nicht aber seine Geschwindigkeit).

Auf die Messung wirkt weißes Gaußsches Rauschen mit Kovarianzmatrix $R$.

### Zustandsraumdarstellung
Wir koennen das Streckenmodell in Form der linearen ZRD darstellen.

Der Zustandsvektor ist
\begin{equation}
    x = ( p_x, p_y, v_x, v_y)
\end{equation}

Die ZRD ist damit
\begin{align}
    \dot{x} &= Ax \\
    y &= Cx
\end{align}
mit
\begin{equation}
    A = \left( \begin{array}{cccc} 0 & 0 & 1 & 0 \\
                                   0 & 0 & 0 & 1 \\
                                   0 & 0 & 0 & 0 \\
                                   0 & 0 & 0 & 0
               \end{array} \right)
\end{equation}
und
\begin{equation}
    C = \left( \begin{array}{cccc} 1 & 0 & 0 & 0 \\
                                   0 & 1 & 0 & 0
               \end{array} \right)
\end{equation}

### Zeitliche Diskretisierung


\begin{align}
    x_{k+1} &= F x_k \\
    y_k &= C x_k
\end{align}
mit
\begin{equation}
    F = E + \Delta t A
\end{equation}

In [None]:
kalman_filter = {}
kalman_filter['step_size'] = true_state['step_size']
kalman_filter['R'] = measurement_noise['cov']
# kalman_filter['Q'] = 0.05**2 * np.eye(4)
kalman_filter['Q'] = np.diag([0.005**2, 0.005**2, 0.1**2, 0.1**2])
print(kalman_filter['Q'])
kalman_filter['A'] = np.zeros((4,4),dtype=float)
kalman_filter['A'][0,2] = 1.0
kalman_filter['A'][1,3] = 1.0
kalman_filter['F'] = np.eye(4) + kalman_filter['step_size']*kalman_filter['A']
kalman_filter['C'] = np.array([[1.0,0,0,0], [0,1.0,0,0]])

## Definition des Kalman Filters

Das Kalman Filter kann als stochastischer Beobachter verstanden werden.

Im Gegensatz zum Luenberger Beobachter wird das Messrauschen R explizit berücksichtigt.
Darüber hinaus werden Unsicherheiten in der Modellierung der Strecke als Prozessrauschen dargestellt und ebenfalls berücksichtigt.

Der Zustand selbst wird ebenfalls als normalverteiltes Signal verstanden, wobei die (vom Luenberger Beobachter) bekannte Zustandsschätzung $\hat{x}(t)$ den Mittelwert darstellt, und die Kovarianzmatrix $P$ die Kovarianz des Schätzfehlers $x(t) - \hat{x}(t)$ darstellt. Damit stellt $P$ die Unsicherheit der Schätzung dar.

Das Kalman Filter arbeitet auf zeitdiskreten Signalen.

Die Implementierung wird in zwei Schritte unterteilt:
* Prädiktion vom Zeitpunkt $k$ zum nächsten Zeitpunkt $k+1$ mit Hilfe des Streckemodells
* Korrektur der prädizierten Zustandsschätzung mit Hilfe des Messmodells


### Prädiktion

\begin{align}
    \hat{x}_{k+1}^{-} &+ F \hat{x}_{k}^{+} + B u_k \\
    P_{k+1}^{-} &= F P_{k}^{+} F^T + Q
\end{align}


### Korrektur

\begin{align}
    \hat{x}_{k+1}^{+} &+ \hat{x}_{k+1}^{-} + K \cdot \left( y_{k+1} - \hat{y}_{k+1} \right) \\
    P_{k+1}^{+} &= \left( E - K C \right) P_{k+1}^{-}
\end{align}

mit
\begin{align}
    \hat{y}_{k+1} &= C \hat{x}_{k+1}^{-} \\
    S_{k+1} &= C P_{k+1}^{-} C^T + R \\
    K &= P_{k+1}^{-} C^T S_{k+1}^{-1}
\end{align}


In [None]:
# Praediktion
def prediction(x_dach, P, kf_params):
    F, Q = kf_params['F'], kf_params['Q']
    x_dach_priori = np.dot(F, x_dach)
    P_priori = np.linalg.multi_dot((F, P, F.T)) + kf_params['Q']
    return x_dach_priori, P_priori
    
    
# Korrektur
def correction(y, x_dach_priori, P_priori, kf_params):
    C, R = kf_params['C'], kf_params['R']
    y_expected = np.dot(C, x_dach_priori)
    innovation = y - y_expected
    
    S = np.linalg.multi_dot((C, P_priori, C.T)) + R
    S_inv = np.linalg.inv(S)
    
    # Berechnung der Kalman Verstaerkung
    K = np.linalg.multi_dot((P_priori, C.T, S_inv))
    
    x_dach_posteriori = x_dach_priori + np.dot(K, innovation)
    P_posteriori = np.dot(np.eye(4) - np.dot(K, C), P_priori)
    
    return x_dach_posteriori, P_posteriori

In [None]:
# Durchfuehrung der Zustandsschaetzung
estimated_state = {}
estimated_state['t'] = true_state['t']
estimated_state['x'] = np.zeros(true_state['x'].shape)

# Initialer Schaetzwert
x_dach = np.array([0.0, 0.0, 0.0, 0.0])
P = np.eye(4)

print(f'x_dach_0=\n{x_dach}')

In [None]:
for idx in range(N):
    # Messung abgreifen -- wir kommen nur an die Messung, nicht an den wahren Zustand.
    y = measurement_signal['y'][idx]
    x_dach_priori, P_priori = prediction(x_dach, P, kalman_filter)
    x_dach_posteriori, P_posteriori = correction(y, x_dach_priori, P_priori, kalman_filter)
    x_dach, P = x_dach_posteriori, P_posteriori
    estimated_state['x'][idx] = x_dach

In [None]:
# Ergebnisse anzeigen

plt.plot(estimated_state['x'][:,0], estimated_state['x'][:,1], '-', color=[0.5,0.5,0.5], label='Geschaetzter Zustand (Position)')
plt.scatter(measurement_signal['y'][:,0], measurement_signal['y'][:,1], s=1.2, label='Messwerte')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Vergleich der Geschwindigkeiten

fig = plt.figure(figsize=(20,20))
ax = fig.add_subplot(2,1,1)
ax.plot(estimated_state['t'], estimated_state['x'][:,2], '-', color=[0.5,0.5,0.5], label='vx geschaetzt', linewidth=2)
ax.plot(true_state['t'], true_state['x'][:,2], '--', color=[0.0,0.5,1.0], label='vx wahr', linewidth=2)
ax.grid(True)
ax.legend()

ax = fig.add_subplot(2,1,2)
ax.plot(estimated_state['t'], estimated_state['x'][:,3], '-', color=[0.5,0.5,0.5], label='vy geschaetzt', linewidth=2)
ax.plot(true_state['t'], true_state['x'][:,3], '--', color=[0.0,0.5,1.0], label='vy wahr', linewidth=2)
ax.grid(True)
plt.legend()

plt.show()

In [None]:
# Vergleich der Positionen

fig = plt.figure(figsize=(20,20))
ax = fig.add_subplot(2,1,1)
ax.plot(estimated_state['t'], estimated_state['x'][:,0], '-', color=[0.5,0.5,0.5], label='px geschaetzt', linewidth=2)
ax.plot(true_state['t'], true_state['x'][:,0], '--', color=[0.0,0.5,1.0], label='px wahr', linewidth=2)
ax.grid(True)
ax.legend()

ax = fig.add_subplot(2,1,2)
ax.plot(estimated_state['t'], estimated_state['x'][:,1], '-', color=[0.5,0.5,0.5], label='py geschaetzt', linewidth=2)
ax.plot(true_state['t'], true_state['x'][:,1], '--', color=[0.0,0.5,1.0], label='py wahr', linewidth=2)
ax.grid(True)
plt.legend()

plt.show()

In [None]:
# Vergleich mit Geschwindigkeitsschaetzung durch Differenzenbildung.
# (Neue Position minus alte Position).

vx_via_diff = np.diff(measurement_signal['y'][:,0]) / true_state['step_size']
vx_via_diff = np.concatenate((np.zeros((1,)), vx_via_diff))

vy_via_diff = np.diff(measurement_signal['y'][:,1]) / true_state['step_size']
vy_via_diff = np.concatenate((np.zeros((1,)), vy_via_diff))

fig = plt.figure(figsize=(20,20))
ax = fig.add_subplot(2,1,1)
ax.plot(estimated_state['t'], estimated_state['x'][:,2], '-', color=[0.5,0.5,0.5], label='vx geschaetzt', linewidth=3)
ax.plot(true_state['t'], true_state['x'][:,2], '--', color=[0.0,0.5,1.0], label='vx wahr', linewidth=3)
ax.plot(estimated_state['t'], vx_via_diff, '-', color=[1.0,0.5,0.0], label='vx einfach', linewidth=1)
ax.grid(True)
ax.legend()

ax = fig.add_subplot(2,1,2)
ax.plot(estimated_state['t'], estimated_state['x'][:,3], '-', color=[0.5,0.5,0.5], label='vy geschaetzt', linewidth=2)
ax.plot(true_state['t'], true_state['x'][:,3], '--', color=[0.0,0.5,1.0], label='vy wahr', linewidth=2)
ax.plot(estimated_state['t'], vy_via_diff, '-', color=[1.0,0.5,0.0], label='vy einfach', linewidth=1)
ax.grid(True)
plt.legend()

plt.show()