In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import math
import pickle
class Kalman_filter(object):
    def __init__(self,
                 true=np.zeros(0),
                 order=4,
                 time=1,
                ):
        self.true = true
        self.time = time
        self.ax = 0
        self.ay = 0
        self.stateMatrix = np.zeros((order,1))
        self.identityMatrix = np.eye(order)

        self.varianceMatrix = self.P(order)
        self.measurementCovMatrix = self.R(order)
        self.processCovMatrix = self.Q(order)
        self.transitionMatrix = self.F(order, self.time)
        self.controlMatrix = self.B(order, self.time)
        self.controlVector = self.U(self.ax, self.ay)
        self.measurementMatrix = self.H(order)
        self.error_measure = []
        self.error_kalman = []
        
    def F(self, order, time):
        if order == 2:
            return np.array([[1, time],
                             [0, 1]])
        elif order == 4:
            return np.array([[1, 0, time, 0],
                             [0, 1, 0, time],
                             [0, 0, 1, 0],
                             [0, 0, 0, 1],])
    def B(self, order, time):
        t_ = (time**2)/2
        if order == 2:
            return np.array([[t_],
                             [time]])
        elif order == 4:
            return np.array([[t_, 0],
                             [0, t_],
                             [time, 0],
                             [0, time]])
    def U(self, ax, ay):
        return np.array([[ax],
                         [ay]])
    
    def P(self, order):
        return np.eye(order)
    
    def R(self, order):
        if order > 2:
            R = np.eye(int(order/2))
        else:
            R = t
        return R if self.time == 0 else R * self.time 

    def Q(self, order):
        Q = np.eye(order)
        return Q if self.time == 0 else Q * self.time 
    
    def T(self, array):
        return array.transpose()
    
    def H(self, order):
        if order == 2:
            return np.eye(2)
        else:
            return np.array([[1, 0, 0, 0],
                             [0, 1, 0, 0]])
    
    def dot(self, p, *args):
        for arg in args:
            p = np.dot(p, arg)
        return p
    
    def inverse(self, array):
        return np.linalg.inv(array)
    
    def predict(self, order):
        self.stateMatrix = self.dot(self.transitionMatrix, self.stateMatrix)
        if order == 4:
            self.stateMatrix += self.dot(self.controlMatrix, self.controlVector)
        
        self.varianceMatrix = self.dot(self.transitionMatrix, self.varianceMatrix, self.T(self.transitionMatrix))
        self.varianceMatrix += self.processCovMatrix
        
        print(self.stateMatrix, order)
        return self.stateMatrix, self.varianceMatrix
        
    def update(self, true_data, measure_data, error=""):
        x_measure = measure_data[0]
        y_measure = measure_data[1]
        
        x_true = true_data[0]
        y_true = true_data[1]
        
        T = self.inverse(self.dot(self.measurementMatrix, 
                                  self.varianceMatrix,
                                  self.T(self.measurementMatrix)
                                 ) + self.measurementCovMatrix
                        )
        P = self.dot(self.varianceMatrix, 
                     self.T(self.measurementMatrix)
                    ) 
        kalmanGain = self.dot(P, T)
        S = np.array([[x_measure],
                      [y_measure]]) - self.dot(self.measurementMatrix,
                                           self.stateMatrix
                                          )
        self.stateMatrix += self.dot(kalmanGain, S)
        
        self.varianceMatrix = self.dot((
            self.identityMatrix - self.dot(
                kalmanGain, 
                self.measurementMatrix
            )
        ),
            self.varianceMatrix
        )
        
        x_kalman = self.stateMatrix[0,0]
        y_kalman = self.stateMatrix[1,0]
        
        if error == "mse" or error == "mean_square_error":
            measure_error = (x_true - x_measure)**2 + (y_true - y_measure)**2
            kalman_error = (x_true - x_kalman)**2 + (y_true - y_kalman)**2
            self.error_measure.append(measure_error)
            self.error_kalman.append(kalman_error)
        
        return self.stateMatrix, self.varianceMatrix
    

In [1]:
true = pickle.load(open("data1/true1.pickle","rb"))
tracka = pickle.load(open("data1/position1.pickle","rb"))
trackb = pickle.load(open("data1/velocity1.pickle","rb"))
trackc = pickle.load(open("data1/acceleration1.pickle","rb"))

measurement = [tracka, trackb]#, trackc]
orders = [4, 4] # position, velocity, acceleration
time = [0.01, 0.01]

j = 1
for i in range(len(measurement)):
    Z = measurement[i]
    o = orders[i]
    t = time[i]
    kf = Kalman_filter(true=true, 
                       order=o,
                       time=t
                      )
    kf.__init__()
    x_kalmanPath, y_kalmanPath = [], []
    x_variance, y_variance = [], []
    for i in range(len(true)):
        true_data = true[i]
        measure_data = Z[i]
        
        PredictStateMartix, PredictVarianceMatrix = kf.predict(o)
        UpdateStateMartix, UpdateVarianceMatrix = kf.update(
            true_data, 
            measure_data,
            error="mean_square_error"
        )
        
        x_kalmanPath.append(UpdateStateMartix[0,0])
        y_kalmanPath.append(UpdateStateMartix[1,0])
        
        x_variance.append(UpdateVarianceMatrix[0,0])
        y_variance.append(UpdateVarianceMatrix[1,1])
        
    plt.subplot(3,3,j)
    plt.plot(true[:,0],true[:,1], c='b')
    plt.plot(Z[:,0],Z[:,1],c='orange')
    plt.plot(x_kalmanPath, x_kalmanPath, c='g')
    
    plt.subplot(3,3,3+j)
    plt.plot(range(len(kf.error_measure)), kf.error_measure, c='orange')
    plt.plot(range(len(kf.error_kalman)), kf.error_kalman, c='g')

    plt.subplot(3,3,6+j)
    plt.plot(range(len(x_variance)), x_variance, c='orange')
    plt.plot(range(len(y_variance)), y_variance, c='g')
    j += 1
        
plt.show()

In [2]:
for i in range(2):
    kf = Kalman_filter()
    kf.__init__()
    print(kf.stateMatrix)
    kf.stateMatrix = 3
    print(kf.stateMatrix)
    

In [154]:
def F(t):
    return np.array([[1, t],
                     [0, 1]])

def B(t):
    return np.array([[(t**2)/2],
                     [    t   ]])

def U(a):
    return a

def inverse(array):
    return np.linalg.inv(array)

def dot(p, *args):
    for arg in args:
        p = np.dot(p, arg)
    return p

def initCovarianceMatrix(array):
    std_x = array[:, 0].std()
    std_y = array[:, 1].std()
    var_x = std_x**2
    var_y = std_y**2
    covar = std_x * std_y
    return np.array([[var_x, covar],
                     [covar, var_y]])

def initStateMatrix(x, y):
    return np.array([[x],
                     [y],
                     [np.sqrt(x**2 + y**2)/0.01]])

def collectionXYdata(array, x_data=[], y_data=[]):
    h, w = array.shape
    if (h * w) == 4:
        x_data.append(array[0,0])
        y_data.append(array[1,1])
    else:
        x_data.append(array[0,0])
        y_data.append(array[1,0])
    return x_data, y_data

In [105]:
# with open('c_acceleration/tracks3.dat', 'rb') as a:
#     for i in range(6000):
#         print(str(a.readline())[2:-1]
#               .replace("\\n", ",")
#               .replace("  ", ",")
#               .replace("[,","[")
#               .replace(",,",",")
#               .replace("[ ","[")
#               .replace(" -",",-")
#         )

In [106]:
# import pickle

# pickle_out = open("track3.pickle","wb")
# pickle.dump(x, pickle_out)
# pickle_out.close()

# pickle_out = open("track1.pickle","wb")
# pickle.dump(y, pickle_out)
# pickle_out.close()