In [1]:
# 实现一个卡尔曼滤波器，设定如下：
# 1、运动是一维空间下的，因此表示状态的速度和位置都只有一个值
# 2、状态是二维的，即状态x = [速度，位置]
# 3、状态的不确定性矩阵P即为"协方差矩阵"（状态向量即为均值向量），是一个2✖2的matrix（这里设定为位置和速度互不相关）。
#    3-1、第一行代表位置的不确定性
#    3-2、第二行代表速度的不确定性

from math import *

In [13]:
class matrix:
    
    # implements basic operations of a matrix class
    
    def __init__(self, value):
        self.value = value
        self.dimx = len(value)
        self.dimy = len(value[0])
        if value == [[]]:
            self.dimx = 0
    
    def zero(self, dimx, dimy):
        # check if valid dimensions
        if dimx < 1 or dimy < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dimx
            self.dimy = dimy
            self.value = [[0 for row in range(dimy)] for col in range(dimx)]
    
    def identity(self, dim):
        # check if valid dimension
        if dim < 1:
            raise ValueError("Invalid size of matrix")
        else:
            self.dimx = dim
            self.dimy = dim
            self.value = [[0 for row in range(dim)] for col in range(dim)]
            for i in range(dim):
                self.value[i][i] = 1
    
    def show(self):
        for i in range(self.dimx):
            print(self.value[i])
        print(' ')
    
    def __add__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to add")
        else:
            # add if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] + other.value[i][j]
            return res
    
    def __sub__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise ValueError("Matrices must be of equal dimensions to subtract")
        else:
            # subtract if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] - other.value[i][j]
            return res
    
    def __mul__(self, other):
        # check if correct dimensions
        if self.dimy != other.dimx:
            raise ValueError("Matrices must be m*n and n*p to multiply")
        else:
            # multiply if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, other.dimy)
            for i in range(self.dimx):
                for j in range(other.dimy):
                    for k in range(self.dimy):
                        res.value[i][j] += self.value[i][k] * other.value[k][j]
            return res
    
    def transpose(self):
        # compute transpose
        res = matrix([[]])
        res.zero(self.dimy, self.dimx)
        for i in range(self.dimx):
            for j in range(self.dimy):
                res.value[j][i] = self.value[i][j]
        return res
    
    # Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions
    
    def Cholesky(self, ztol=1.0e-5):
        # Computes the upper triangular Cholesky factorization of
        # a positive definite matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)
        
        for i in range(self.dimx):
            S = sum([(res.value[k][i])**2 for k in range(i)])
            d = self.value[i][i] - S
            if abs(d) < ztol:
                res.value[i][i] = 0.0
            else:
                if d < 0.0:
                    raise ValueError("Matrix not positive-definite")
                res.value[i][i] = sqrt(d)
            for j in range(i+1, self.dimx):
                S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
                if abs(S) < ztol:
                    S = 0.0
                try:
                   res.value[i][j] = (self.value[i][j] - S)/res.value[i][i]
                except:
                   raise ValueError("Zero diagonal")
        return res
    
    def CholeskyInverse(self):
        # Computes inverse of matrix given its Cholesky upper Triangular
        # decomposition of matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)
        
        # Backward step for inverse.
        for j in reversed(range(self.dimx)):
            tjj = self.value[j][j]
            S = sum([self.value[j][k]*res.value[j][k] for k in range(j+1, self.dimx)])
            res.value[j][j] = 1.0/tjj**2 - S/tjj
            for i in reversed(range(j)):
                res.value[j][i] = res.value[i][j] = -sum([self.value[i][k]*res.value[k][j] for k in range(i+1, self.dimx)])/self.value[i][i]
        return res
    
    def inverse(self):
        aux = self.Cholesky()
        res = aux.CholeskyInverse()
        return res
    
    def __repr__(self):
        return repr(self.value)

In [20]:
# Test of using class matrix
my_mat = matrix([[1, 2]])
F = matrix([[3], [1]])
b = my_mat * F
print(type(b))
print(b)

print(F)
print(F.transpose())

<class '__main__.matrix'>
[[5]]
[[3], [1]]
[[3, 1]]


In [27]:
############################################
### Implement the filter function below!
############################################

def kalman_filter(x, P):
    for n in range(len(measurements)):
        
        # measurement update
        # 当前x是上一时刻t-1的predict的t时刻的状态，因此先利用观测值来更新它，同时也更新不确定性矩阵P
        measure = matrix([[measurements[n]]])
        y = measure - H * x                  # 计算t时刻测量值 和 t-1时刻预测的t时刻状态（经测量矩阵变换映射到观测值中）的差距
        s = H * P * H.transpose() + R
        K = P * H.transpose() * s.inverse()  # 计算卡尔曼增益
        x = x + K * y                        # update x
        P = (I - K * H) * P                  # update P

        # prediction
        # 利用测量值更新了当前t时刻的状态之后，再利用运动来预测下一时刻t+1的状态，同时预测下一时刻t+1的不确定性矩阵P
        x = F * x + u                        # update x
        P = F * P * F.transpose()            # update P
        
    return x,P

In [29]:
############################################
### use the code below to test your filter!
############################################

measurements = [1, 2, 3]

x = matrix([[0.], [0.]])               # initial state (location and velocity) 状态包括位置和速度
P = matrix([[1000., 0.], [0., 1000.]]) # initial uncertainty 不确定性矩阵是一个协方差矩阵，位置和速度的互不相关
u = matrix([[0.], [0.]])               # external motion 可忽略，因为外部因素造成的运动为0
F = matrix([[1., 1.], [0, 1.]])        # next state function 状态变换矩阵，F*x得到下一时刻的prediction预测状态.(位置=位置+速度，速度=速度) 
H = matrix([[1., 0.]])                 # measurement function 观测矩阵，只观测得到位置，H*x是将当前状态映射到观测值中
R = matrix([[1.]])                     # measurement uncertainty 测量不确定性矩阵，因为是一维的，所以是1*1
I = matrix([[1., 0.], [0., 1.]])       # identity matrix 单位矩阵

x, P = kalman_filter(x, P)
print(x)
print(P)
# output should be:
# x: [[3.9996664447958645], [0.9999998335552873]]
# P: [[2.3318904241194827, 0.9991676099921091], [0.9991676099921067, 0.49950058263974184]]

[[3.9996664447958645], [0.9999998335552873]]
[[2.3318904241194827, 0.9991676099921091], [0.9991676099921067, 0.49950058263974184]]
