http://nbviewer.jupyter.org/github/sharedmind/testy_01/blob/master/1_LKF.ipynb

In [6]:
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook
import numpy as np
from sklearn.metrics import mean_squared_error#metryki do porownan
output_notebook()

In [3]:
def kalmN(N, druk = False):
    
    """
    Predykcja
        X - przewidywana wartosc stanu
        P - kowariancja stanu
        A n x n - macierz przejscia stanu
        B n x 1 - macierz sterowania
    Korekcja
        H n x 1 - macierz wizazaca stany z wielkosciami mierzonymi
        R - macierz kowariancji szumu pomirowego
        Q - macierz kowariancji procesu
        Y - pomiar w kroku k
        V - measurement residual w kroku k
        S - kowariancja predykcji pomiaru w kroku k
        K - wzmocnienie
    
    
    """
    
    
    
    dt = 0.1#krok czasowy
    
    #model
    X = np.array([[0.0],[0.0],[0.1],[0.1]])    
    P = np.diag((0.1, 0.1, 0.1, 0.1))
    A = np.array([[1, 0 , dt, 0],\
                 [0, 1, 0, dt],\
                 [0, 0, 1, 0],\
                 [0, 0, 0, 1]])
    Q = np.eye((X.shape)[0])# X is Nx1
    B = np.eye((X.shape)[0])# X is Nx1
    U = np.zeros(((X.shape)[0],1))# U is Nx1
       
    H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]])
    Y = np.array([[X[0,0] + np.abs(np.random.randn())],\
                  [X[1,0] + np.abs(np.random.randn())]])
    
    R = np.eye((Y.shape)[0])# X is Nx1   
    
    if druk == True:
        print("X", X.shape)
        print("P", P.shape)
        print("A", A.shape)
        print("Q", Q.shape)
        print("B", B.shape)
        print("U", U.shape)
        print("Y", Y.shape)
        print("H", H.shape)
        print("R", R.shape)    
    kf_out = []
    y_out = []
    for i in np.arange(0,N):
        #pomiar
        Y = np.array([[X[0,0] + np.abs(0.1*np.random.randn())], \
                  [X[1,0] + np.abs(0.1*np.random.randn())]])
    
        #predykcja
        X = np.dot(A,X) + np.dot(B,U)# X[k] = A*X[k-1] + B * u[k-1] 
        P = np.dot(A, np.dot(P,A.T)) + Q# P[k] = A*P[k-1]A' + Q
        #korekcja
        #HX = np.dot(H,X)
        V = Y - np.dot(H,X)# measurement residual w kroku k
        S = np.dot(H, np.dot(P,H.T)) + R# predykcja w kroku k kowariancji szumu pomiarowego
        K = np.dot(P, np.dot(H.T, np.linalg.inv(S)))
        X = X + np.dot(K, V)
        P = P - np.dot(K, np.dot(S, K.T))  
        kf_out.append(X[0][0])#wyjscie KF
        y_out.append(Y[0][0])#pomiar
    
    return kf_out, y_out

kf_out, y_out = kalmN(30)
t = np.linspace(0,10,30)
p1 = figure(plot_width = 650, plot_height=350)
p1.line(t,kf_out, legend="kf_out", color="red", line_width=2)
p1.line(t,y_out, legend="y_out", color="blue", line_width=2)
show(p1)

In [12]:
def kalm1d(z, N):   
    
    """
    Kalman 1D algorytm
    z - pomiar
    //Predykcja
    xhat_k = A * xhat_k_1;
    p_k    = A * p_k_1 * A' + Q;

    // Korekcja
    K_k = p_k  / (p_k  + R);
    xhat_k = xhat_k + K_k * (z_k - xhat_k);
    p_k = (1 - K_k) * p_k + Q;
    
    """
    A = 1 # mozna pominac
    Q = 1e-8 #szum procesowy
    R = 1e-5# szum pomiarowy

    xhat_k=np.zeros(N)      # a posteri estimate of x
    p_k=np.zeros(N)         # a posteri error estimate
    #xhat_k_1=np.zeros(N) # a priori estimate of x
    #p_k_1=np.zeros(N)    # a priori error estimate
    K_k=np.zeros(N)         # gain or blending factor
    
    
    # wartosci poczatkowe
    xhat_k[0] = 452150.0
    p_k[0] = 1.0
    for k in range(1,N):
        # predykcja
        xhat_k[k] = xhat_k[k-1]
        p_k[k] = p_k[k-1]+Q
    
        # korekcja
        K_k[k] = p_k[k]/( p_k[k]+R )
        xhat_k[k] = xhat_k[k]+K_k[k]*(z[k]-xhat_k[k])
        p_k[k] = (1-K_k[k])*p_k[k]
    mse = mean_squared_error(z[15:], xhat_k[15:])   
    return mse, xhat_k


def avg_smooth(y, box_pts):
    box = np.ones(box_pts)/box_pts
    y_smooth = np.convolve(y, box, mode='same')
    return y_smooth


def czytaj_dane(N):
    y = []
    x = np.linspace(1,N,N)
    f =  open('raw_adc.txt')
    y = np.zeros(N)
    ind = 0
    for i in x:
        read_data = f.readline()    
        if not read_data:
            break
        y[ind] = int(read_data)
        ind = ind +1
    f.close
    return (x,y)

N = 1500
x_in, y_in = czytaj_dane(N)
mse, y_out_kal = kalm1d(y_in, N)#kalman
avg_win = 31 #okno usredniania
y_out_avg = avg_smooth(y_in, avg_win)#usrednianie   

p1 = figure(plot_width = 650, plot_height=350)
p1.line(x_in, y_in, legend="noised raw", color="red", line_width=2)
p1.line(x_in, y_out_kal, legend="Kalman", color="blue", line_width=2)
p1.line(x_in[avg_win:-avg_win], y_out_avg[avg_win:-avg_win], legend="AVG", color="green", line_width=2)
show(p1)
print("Kalman std = ", np.std(y_out_kal))
print("AVG std = ", np.std(y_out_avg))


Kalman std =  38.4083080077
AVG std =  18604.7857411
