In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal

In [2]:
class KalmanFilter(object):
    def __init__(self, A = None, C = None, Gamma = None, Sigma = None, P = None, u0 = None, V0 = None,x=None):

        if(A is None or C is None):
            raise ValueError("Set proper system dynamics.")

        self.x = x # T x N
        self.M = A.shape[0] # dimension of hidden states
        self.T = self.x.shape[0] # number of observations
        self.N = self.x.shape[1] # number of dimension of the observations

        self.A = A # A is the transition probability matrix, M x M 
        self.C = C # C is the emission probability matrix, N x M
        
        self.Gamma = np.eye(self.M) if Gamma is None else Gamma # Gamma is the covariance matrix of noise term added to the hidden state transition, M x M
        self.Sigma = np.eye(self.M) if Sigma is None else Sigma # Sigma is the covariance matrix of noise term added to the emission, N x N
        
        self.P = np.zeros((self.T, self.M, self.M))
        self.P[:,:,] = np.eye(self.M) if P is None else P # P is an intermediate variable during inference, N x M x M
        self.u = np.zeros((self.T, self.M)) # T x M x 1
        self.V = np.zeros((self.T, self.M, self.M)) # T x M x M
        self.K = np.zeros((self.T, self.M, self.N)) # T x M x N
        self.c = np.zeros((self.T)) # T x 1

        # for backward passing
        self.u_hat = np.zeros((self.T, self.M)) # T x M x 1
        self.V_hat = np.zeros((self.T, self.M, self.M)) # T x M x M
        self.J = np.zeros((self.T, self.M, self.M)) # T x M x M

        self.u0 = u0 # u0 is the initial estimate of the mean of z1, M x 1
        self.V0 = V0 # V0 is the initial estimate of the variance of z1, M x M
        
        S = np.dot(np.dot(self.C, self.V0), self.C.T) + self.Sigma
        J = np.dot(self.C, self.u0)
        I = np.eye(self.M)

        self.K[0] = np.dot(np.dot(self.V0, self.C.T), np.linalg.inv(S))
        self.u[0] = self.u0 + np.dot(self.K[0], self.x[0] - J)
        self.V[0] = np.dot((I - np.dot(self.K[0], self.C)), self.V0)

        self.c[0] = multivariate_normal.pdf(self.x[0], J, S)
    
    def forward(self,i):
        # during inference, u[n], V[n], c[n] are calculated
        
        self.P[i-1] = np.dot(np.dot(self.A, self.V[i-1]), self.A.T) + self.Gamma
        if i < self.T:
            I = np.eye(self.M)
            S = np.dot(np.dot(self.C, self.P[i-1]), self.C.T) + self.Sigma
            J = np.dot(np.dot(self.C, self.A), self.u[i-1])
            
            self.K[i] = np.dot(np.dot(self.P[i-1], self.C.T), np.linalg.inv(S))
            self.u[i] = np.dot(self.A, self.u[i-1]) + np.dot(self.K[i], self.x[i] - J)
            self.V[i] = np.dot((I - np.dot(self.K[i], self.C)), self.P[i-1])
            # print(f'The covariance matrix is: {S}')
            # print(f'The Sigma is {self.Sigma}, The Gamma is {self.Gamma}')
            # print(f'The A is {self.A}, The C is {self.C}, and P[{i-1}] is {self.P[i-1]}, K[{i}] is {self.K[i]}],V[{i-1}] is {self.V[i-1]}]')

            self.c[i] = multivariate_normal.pdf(self.x[i], J, S)

    def backward(self,i):
        self.J[i] = np.dot(np.dot(self.V[i], self.A.T), np.linalg.inv(self.P[i]))
        self.u_hat[i] = self.u[i] + np.dot(self.J[i], self.u_hat[i+1] - np.dot(self.A, self.u[-1]))
        self.V_hat[i] = self.V[i] + np.dot(np.dot(self.J[i], self.V_hat[i+1] - self.P[i]), self.J[i].T)
    def learning(self,M,N):
        self.u0 = self.u_hat[0]
        self.V0 = np.outer(self.u_hat[0], self.u_hat[0].T) - np.outer(self.u_hat[0], self.u_hat[0].T)

        # E[z[n]] : M x 1
        # E[z[n]z[n-1].T] : M x M
        # E[z[n]z[n].T] : M x M

        sub_1 = np.zeros((M,M))
        sub_2 = np.zeros((M,M))
        sub_3 = np.zeros((M,M))
        sub_4 = np.zeros((M,M))
        sub_5 = np.zeros((N,M))
        sub_6 = np.zeros((M,M))
        sub_7 = np.zeros((N,N))
        sub_8 = np.zeros((M,N))
        for i in range(1,self.T,1):
            sub_1 += np.dot(self.J[i-1], self.V_hat[i]) + np.outer(self.u_hat[i],self.u_hat[i-1].T) # z[n]z[n-1]
            sub_2 += self.V_hat[i-1] + np.outer(self.u_hat[i-1], self.u_hat[i-1].T) # z[n-1]z[n-1]
            sub_3 += self.V_hat[i] + np.outer(self.u_hat[i], self.u_hat[i].T) # z[n]z[n]
            sub_4 += np.dot(self.J[i-1], self.V_hat[i]) + np.outer(self.u_hat[i-1],self.u_hat[i].T) #z[n-1]z[n]
    
        for i in range(self.T):
            sub_5 += np.outer(self.x[i], self.u_hat[i].T) # x[n]*E[z[n]]
            sub_6 += self.V_hat[i] + np.outer(self.u_hat[i], self.u_hat[i].T) # z[n]z[n]
            sub_7 += np.outer(self.x[i], self.x[i].T) # x[n]x[n]
            sub_8 += np.outer(self.u_hat[i], self.x[i].T) #E[z[n]]*x[n]

        self.A = np.dot(sub_1, np.linalg.inv(sub_2))
        # self.Gamma = 1/(self.N-1) * (sub_3 - np.dot(self.A, sub_4) - np.dot(sub_1, self.A) + np.dot(np.dot(self.A, sub_2), self.A.T))
        self.Gamma = 1/(self.N-1) * (sub_3 - np.dot(self.A, sub_4) )
        self.C = np.dot(sub_5, np.linalg.inv(sub_6))
        # self.Sigma = 1/self.N * (sub_7 - np.dot(self.C, sub_8) - np.dot(sub_5, self.C) + np.dot(np.dot(self.C, sub_6), self.C))
        self.Sigma = 1/self.N * (sub_7 - np.dot(self.C, sub_8) )




In [3]:
def generate_examples(A, C, Gamma,Sigma,u0,V0,M,N,T):
 
    z = np.zeros((T,M))
    x = np.zeros((T,N))
    # z[0] = np.random.multivariate_normal(u0,V0)
    z[0] = np.array([23.0,24.0,25.0])
    x[0] = np.random.multivariate_normal(np.dot(C,z[0]),Sigma)
    for t in range(1,T,1):
        z[t] = np.random.multivariate_normal(np.dot(A,z[t-1]),Gamma)
        x[t] = np.random.multivariate_normal(np.dot(C,z[t]),Sigma)
    return z,x


In [None]:
def main():
	
	n_states = 3 # M
	n_obs = 2 # N
	n_time = 100 # T
	p_old = -10000
	tol = 0.0001
	max_iter = 100

	# z: T x M
	# x : T x N
	# A = np.array([[0.9, 0.1],[0.5,0.5]])
	# C = np.array([[1, 0],[0.2, 0.8]])
	# Gamma = np.array([[0.1, 0.1], [0.1, 0.1]])
	# Sigma = np.array([[0.5,0.5],[0.5,0.5]])

	A = np.array([[0.75, 0.433, -0.5],[-0.217, 0.875, 0.433],[0.625, -0.217, 0.75]])
	Gamma = np.array([[1.5, 0.1, 0.0], [0.1, 2.0, 0.3], [0.0, 0.3, 1.0]])
	C = np.array([[1.0,1.0,0.0],[0.0,1.0,1.0]])
	Sigma = np.array([[1.0,0.2], [0.2,2.0]])

	u0 = np.array([1,2])
	V0 = np.array([[0.1,0.3],[0.3,0.1]])

	A_init = np.array([[0.5, 0.5],[0.5,0.5]])
	C_init = np.array([[0.5, 0.5],[0.5, 0.5]])
	Gamma_init = np.array([[0.5, 0.9], [0.9, 4.5]])
	Sigma_init = np.array([[0.5, 0.9], [0.9, 2.5]])
	u0_init = np.array([1,2])
	V0_init = np.array([[0.2,0.5],[0.5,0.4]])

	A_init = np.array([[1.0, 0.5, 0.5],[0.5,1.0, 0.5],[0.5,0.5,1.0]])
	C_init = np.array([[1.0,1.0,1.0], [1.0, 1.0,1.0]])
	Gamma_init = np.array([[1.0, 0.5, 0.5], [0.5,1.0, 0.5],[0.5, 0.5, 1.0]])
	Sigma_init = np.array([[1.0,0.5], [0.5,1.0]])
	u0_init = np.array([10.0,10.0,10.0])
	V0_init = np.array([[1.0, 0.5, 0.5], [0.5,1.0, 0.5],[0.5, 0.5, 1.0]])



	z,x = generate_examples(A,C,Gamma,Sigma,u0,V0,n_states,n_obs,n_time)
	kf = KalmanFilter(A = A_init, C = C_init, Gamma = Gamma_init, Sigma = Sigma_init, u0=u0_init, V0=V0_init,x=x)
	
	for ite in range(max_iter):
		print(f'The current iteration is: {ite}')

		for t in range(1,kf.T+1,1):
			kf.forward(t)

		for t in range(kf.T-2,-1,-1):
			kf.backward(t)
		kf.learning(z.shape[1],x.shape[1])
		p = np.sum(np.log(kf.c))
		print(f'The likelihood is {p}')
		if p>p_old and p - p_old < tol:
			break
		p_old = p
	print(kf.A,kf.C,kf.Gamma,kf.Sigma)


if __name__ == '__main__':
    main()

The current iteration is: 0


ValueError: operands could not be broadcast together with shapes (3,3) (2,2) (3,3) 

In [None]:
def is_symmetric_positive_semidefinite(matrix):
    if not np.allclose(matrix, matrix.T):
        return False  # Not symmetric
    eigenvalues = np.linalg.eigvals(matrix)
    return np.all(eigenvalues >= 0)

B = np.array([[35.62538664, 35.93218881],
 [35.93218881, 36.40284164]])
print(is_symmetric_positive_semidefinite(B)) 