In [3]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt

In [12]:
def load_data_camera(K_csv, C_csv):
    """Loads data from the specified path"""
    K = pd.read_csv(K_csv, header=None).to_numpy()
    # print(pd.read_csv(K_csv))
    C = pd.read_csv(C_csv, header=None).to_numpy()
    return K, C

def load_measurement(meas_csv):
    """Loads data from the specified path"""
    meas = pd.read_csv(meas_csv,header=None).to_numpy()
    return meas

def load_transformations(R_csv, T_csv):
    """Loads data from the specified path"""
    R = pd.read_csv(R_csv,header=None).to_numpy()
    T = pd.read_csv(T_csv,header=None).to_numpy()
    return R, T

In [109]:
# Camera 1
K1, C1 = load_data_camera('data_csv/Kf_1.csv', 'data_csv/C_1.csv')
z1 = load_measurement('data_csv/z_1.csv')

# Camera 2
K2, C2 = load_data_camera('data_csv/Kf_2.csv', 'data_csv/C_2.csv')
z2 = load_measurement('data_csv/z_2.csv')

# load the transformations
R, T = load_transformations('data_csv/R.csv', 'data_csv/T.csv')

In [110]:
def EKF(P, cov, H, Q, R, z, pred_z):
    pred_P = P.reshape(-1,1)
    pred_cov = cov + Q

    V = z.reshape(-1,1) - pred_z.reshape(-1,1)
    S =  H @ pred_cov @ H.T + R

    K = pred_cov @ H.T @ np.linalg.inv(S)
    # print(K.shape, V.shape)
    P = pred_P - K @ V
    cov = (np.eye(3) - K @ H) @ pred_cov

    return P, cov


# Sequential EKF
Sequentially predict and update on multiple measurements

In [117]:
P1 = np.array([1, 1, 1])
cov1 = np.eye(3) * 0.1

Q1 = np.eye(3) * 1000
Q2 = np.eye(3) * 0

R1 = np.eye(2) * 1000
R2 = np.eye(2) * 1000

for z_1, z_2 in zip(z1, z2):
    # print(P1.shape, cov1.shape)
    P1= P1.reshape(-1,)
    pred_z = (1/P1[2]) * K1@P1[:2].reshape(-1,1) + C1
    # measurement jacobian every time step
    H1 = K1 @ np.array([[1/P1[2], 0 , -P1[0]/(P1[2]**2)],[0, 1/P1[2], -P1[1]/(P1[2]**2)]])
    P1, cov1 = EKF(P1, cov1, H1, Q1, R1, z_1, pred_z)

    trans_P = (R.T@ P1 - R.T@T).reshape(-1,)
    # print(trans_P.shape)
    pred_z = (1/trans_P[2]) * K2 @ trans_P[:2].reshape(-1,1) + C2
    # measurement jacobian every time step
    # print(trans_P)

    
    H2 = K2 @ np.array([[1/trans_P[2], 0 , -trans_P[0]/(trans_P[2]**2)],[0, 1/trans_P[2], -trans_P[1]/(trans_P[2]**2)]]) @ R.T
    P1, cov1 = EKF(P1, cov1, H2, Q2, R2, z_2, pred_z)

print(P1, cov1)

[[ 6.16865431e+02]
 [ 1.55900564e+03]
 [-1.49481155e-02]] [[ 1.86521591e+03  4.71398352e+03  9.20001777e-03]
 [ 4.71398352e+03  1.19136467e+04  2.32517965e-02]
 [ 9.20017402e-03  2.32517733e-02 -2.12316884e-03]]


# Batch EKF
Stack multiple measurements to get better predictions

In [119]:
P = P1 #np.vstack((P1.reshape(3,1), P2.reshape(3,1)))
cov= cov1
H = np.vstack((H1, H2))
R_noise = np.eye(4) * 1000 # Block matrix : [[R1, 0], [0, R2]]
Q_noise = Q1 + Q2

for z_1, z_2 in zip(z1, z2):
    # print (P.shape, cov.shape, H.shape, R_noise.shape, Q_noise.shape)
    P = P.reshape(-1,)
    # prediction from the first camera
    pred_z1 = (1/P[2]) * K1@P[:2].reshape(-1,1) + C1
    # Its measurement jacobian
    H1 = K1 @ np.array([[1/P[2], 0 , -P[0]/(P[2]**2)],[0, 1/P[2], -P[1]/(P[2]**2)]])

    # prediction from the second camera
    # print(R.shape, T.shape, P.shape)
    trans_P = R.T@ P - R.T@T
    trans_P = trans_P.reshape(-1,)
    pred_z2 = (1/trans_P[2]) * K2 @ trans_P[:2].reshape(-1,1) + C2
    # Its measurement jacobian
    H2 = K2 @ np.array([[1/trans_P[2], 0 , -trans_P[0]/(trans_P[2]**2)],[0, 1/trans_P[2], -trans_P[1]/(trans_P[2]**2)]]) @ R.T

    H = np.vstack((H1, H2))
    pred_z = np.vstack((pred_z1, pred_z2))

    z = np.vstack((z_1.reshape(-1,1), z_2.reshape(-1,1)))
    P, cov = EKF(P, cov, H, Q_noise, R_noise, z, pred_z)

    

print(P)
print(cov)

[[1.73255760e+03]
 [2.52256323e+03]
 [1.76695336e-06]]
[[ 8.06482740e+03  1.24535181e+04 -5.81692363e-03]
 [ 1.24535181e+04  2.06416590e+04 -1.05828272e-04]
 [-1.01522083e-03 -2.08127039e-03 -6.73125223e+03]]
