In [43]:
import numpy as np
import scipy
from pr3_utils import *
from scipy.sparse import csr_matrix
from scipy.sparse import eye
from scipy.sparse.linalg import inv

# Load the measurements
dataset = '03'#10 or 03
filename = "../data/"+dataset+".npz"
ts,features,linear_velocity,angular_velocity,K,b,imu_T_cam = load_data(filename)

In [44]:
print("ts:", ts.shape)
print("features:", features.shape)
print("linear_velocity:", linear_velocity.shape)
print("angular_velocity:", angular_velocity.shape)
print("K:", K)
print("b:", b)
print("imu_T_cam:", imu_T_cam)


ts: (1, 1010)
features: (4, 5105, 1010)
linear_velocity: (3, 1010)
angular_velocity: (3, 1010)
K: [[552.554261   0.       682.049453]
 [  0.       552.554261 238.769549]
 [  0.         0.         1.      ]]
b: 0.6
imu_T_cam: [[ 0.03717833 -0.09861822  0.9944306   1.5752681 ]
 [ 0.99926755 -0.00535534 -0.03789026  0.00439141]
 [ 0.00906218  0.99511094  0.09834688 -0.65      ]
 [ 0.          0.          0.          1.        ]]


## Optional Downsampling of Data 

In [None]:
np.random.seed(0)
Num_landmarks = 1500 # recomend 3000 for dataset 10 and 1500 for dataset 3 
random_indices = np.random.choice(features.shape[1], size=Num_landmarks, replace=False)
# Downsample the array
features = features[:, random_indices, :]
print("features:", features.shape)

1. [15 pts] IMU localization via EKF prediction: Implement an EKF prediction step based on      
the SE(3) kinematics equations and the linear and angular velocity measurements from the IMU to         
estimate the pose Tt ∈SE(3) of the IMU over time t.     
        

In [None]:

zeta = np.zeros((6,linear_velocity.shape[1]))
zeta[3:6,:] = angular_velocity
zeta[0:3,:] = linear_velocity


In [None]:
# (a) IMU Localization via EKF Prediction 
# mu
T_list = [np.eye(4)]
for i in range(1,zeta.shape[1]):
    T = T_list[-1] @ axangle2pose(zeta[:,i]*(ts[:,i]-ts[:,i-1]))
    T_list.append(T)

T_list = np.array(T_list).transpose(1,2,0)

visualize_trajectory_2d(T_list)


In [None]:
# sigma 
W = np.diag([1e-1,1e-2,1e-2,1e-5,1e-5,1e-4])
sigma = [np.zeros((6,6))]
for i in range(1,zeta.shape[1]):
    s = scipy.linalg.expm(axangle2adtwist(-zeta[:,i])*(ts[:,i]-ts[:,i-1]))
    sigma.append(s @ sigma[-1] @ s.T + W)
sigma = np.array(sigma).transpose(1,2,0)

2. [15 pts] Landmark mapping via EKF update: Assume that the predicted IMU trajectory from part     
(a) is correct and focus on estimating the landmark positions m ∈R3×M of the landmarks observed in      
the images. There are many visual feature meaasurements in the dataset but you do not need to use all       
to obtain good results. You can think of ways to keep the computational complexity manageable. You      
should implement an EKF with the unknown landmark positions m ∈ R3×M as a state and perform     
EKF update steps using the visual observations zt to keep track of the mean and covariance of m.        
We are assuming that the landmarks m are static, and so it is not necessary to implement an EKF     
prediction step for them. Since the sensor does not move sufficiently along the z-axis, the estimation      
of the z coordinate of the landmarks will not be very good. This is expected and you should not worry       
about it. Focus on estimating the landmark x and y coordinates well     

### Create Ks

In [None]:
Ks = np.zeros((4,4))
Ks[:3,:3] = K
Ks[2,:3]= K[0,:3]
Ks[3,:3]= K[1,:3]
Ks[2,3] = -K[0,0]*b
print(Ks)

In [None]:
k_inv = np.linalg.inv(K)

def stereo_triangulation(k_inv, b, z):
    '''
    
    :param K_inv: (3,3) Inverse of camera matrix
    :param b: distance between the cameras
    :param z: (4,1) 2D homogenous image coordinates

    :return: (3,1) 3D homogenous point in the world frame
    
    '''
    assert z.shape == (4,1)
    z1 = np.zeros((3,1))
    z1[0:2,:] = z[:2].reshape(2,1)
    z1[2,0] = 1

    z2 = np.zeros((3,1))
    z2[0:2,:] = z[2:].reshape(2,1)
    z2[2,0] = 1
    if z1[0,0] < z2[0,0]:
        return np.zeros((3,1))
    
    z1 = k_inv@z1
    z2 = k_inv@z2
    p = np.array([b,0,0]).reshape(1,3)

    bo = z1 - z2
    m = (b**2/(p@bo))*z1

    if m[2,0]<0:
        return np.array([0,0,0]).reshape(3,1)
    
    return m

def camera_to_world(T, m):
    '''
    
    :param T: (4,4) Transformation matrix
    :param m: (3,1) 3D homogenous point in the camera frame

    :return: (3,1) 3D homogenous point in the world frame
    
    '''
    assert T.shape == (4,4)
    assert m.shape == (3,1)
    if np.linalg.norm(m) > 400:
        return np.array([0,0,0]).reshape(3,1)
    
    m1 = np.vstack((m,1))
    m2 = T@imu_T_cam@m1
    m3 = m2[:3]
    return m3

def new_xyz(xyz,T,delt,i):
    assert T.shape == (4,4)

    # Get columns where the second position is not [-1,-1,-1,-1]
    feat = features[:, :, i]
    filtered_columns = np.where(feat[2, :] != -1)[0]
    if len(filtered_columns) == 0:
        filtered_columns = np.where(feat[1, :] != -1)[0]
        if len(filtered_columns) == 0:
            filtered_columns = np.where(feat[0, :] != -1)[0]
        if len(filtered_columns) == 0:
            print("No features found",i)

    for j in filtered_columns:
        n = stereo_triangulation(k_inv, b, feat[:,j].reshape(4,1))
        if np.linalg.norm(xyz[3*j:3*j+3]) == 0:
            xyz[3*j:3*j+3] = camera_to_world(T, n.reshape(3,1))
        else:
            # average the two values 
            xyz[3*j:3*j+3] = (xyz[3*j:3*j+3] + camera_to_world(T, n.reshape(3,1)))/2
        
        if j not in delt[i]:
            delt[i].append(j)
            
    return xyz

def plot_estimates(pose,landmarks,old_pose=None):

    fig,ax = plt.subplots()
    n_pose = pose.shape[2]
    ax.plot(pose[0,3,:],pose[1,3,:],'r-',label='Estimated')
    ax.scatter(pose[0,3,0],pose[1,3,0],marker='s',label="start")
    ax.scatter(pose[0,3,-1],pose[1,3,-1],marker='o',label="end")
    if old_pose is not None:
        ax.plot(old_pose[0,3,:],old_pose[1,3,:],'g-',label='Original')
        ax.scatter(old_pose[0,3,0],old_pose[1,3,0],marker='s',label="start")
        ax.scatter(old_pose[0,3,-1],old_pose[1,3,-1],marker='o',label="end")
   
    a = landmarks.reshape(-1,3).T
    a = a[:,np.where(np.linalg.norm(a,axis=0) <4000)[0]]
    ax.scatter(a[0], a[1], s=1)

    plt.show()

    

# Landmark initialization 

In [None]:
xyz = np.zeros((3*features.shape[1],1))
delt = {}
for i in range(features.shape[2]):
    delt[i] = []

for i,T in enumerate(T_list.transpose(2,0,1)):
    xyz = new_xyz(xyz,T,delt,i)

plot_estimates(T_list,xyz)


In [None]:

Ks = csr_matrix(Ks)

def jac_pi(x):
    '''
    :param x: (4,1) vector
    '''
    x = x.reshape(4)
    jacobian = np.array([[1/x[2], 0,-x[0]/(x[2]**2), 0],
                            [0, 1/x[2],-x[1]/(x[2]**2), 0],
                                [0,0,0,0],
                                    [0, 0, -x[3]/(x[2]**2), 1/x[2]]])
    return csr_matrix(jacobian)

def pi(x):
    '''
    :param x: (4,1) vector
    '''
    return x/x[2]

def Map_Ht_plus(H,oTi,Tt,mu_t,t):
    '''
    :param oTi: (4,4) transformation matrix
    :param Tt: (4,4) Next transformation matrix
    :param mu_t: (3*M,1) landmark points in the world frame
    :param t: time index

    :return: (4*Nt,3*M) Ht matrix
    '''
    assert mu_t.shape[0] == 3*features.shape[1]
    assert Tt.shape == (4,4)
    assert oTi.shape == (4,4)

    T= csr_matrix(oTi@inversePose(Tt))
    
    mu_t = mu_t.reshape(-1,3)
    mu_t = mu_t[delt[t]].T

    mu = np.vstack((mu_t, np.ones((1,mu_t.shape[1]))))
    
    P = csr_matrix((3,4))
    P[:3,:3] = eye(3,format='csr')
   
    for j,i in enumerate(delt[t]):
        H[j*4:j*4+4,i*3:i*3+3] = Ks@jac_pi(T@(mu[:,j].reshape(4,1)))@T@P.T
        
    return H

def K_plus(nH,sigma_t):
    '''
    :param V: noise 
    :param nH: next_Ht(...)
    :param sigma_t: (3M,3M) covariance matrix

    :return: (3M,4Nt) Kalman gain
    '''

    if nH.shape[0] == 0:
        print('nH broken shape:',nH.shape)
        return None

    IV = 4*eye(nH.shape[0],format='csr')
    SH = sigma_t@nH.T
    K = SH @ inv(nH @ SH+ IV)

    return K

def ztilda(T,mu,t):
    '''
    :param T: (4,4) Next transformation matrix
    :param mu: (3,feature.shape[1]) landmark points in the world frame
    :param t: time index

    :return: (4*N,1) z_tilda
    '''

    assert T.shape == (4,4)
    assert mu.shape[0] == 3*features.shape[1]

    mu = mu.reshape(-1,3)
    mu = mu[delt[t]].T

    mu = np.vstack((mu, np.ones((1,mu.shape[1]))))
    z_tilda = np.zeros((4*mu.shape[1],1))

    for j in range(mu.shape[1]):
         z_tilda[j*4:j*4+4] = Ks@pi(T@mu[:,j].reshape(4,1))
        
    #print('z_tilda shape:',z_tilda.shape)
    return z_tilda

def Map_mu_plus(oTi,Tt,nK,z_t,mu_t,t):
    '''
    :param Tt: (4,4) Next transformation matrix
    :param oTi: (4,4) transformation matrix
    :param nK: next_K(...)
    :param z_t: (4,3000) 4D pixel coordinates
    :param mu_t: (3*feature.shape[1],1) estimated landmark points in the world frame
    :param t: time index

    :return: (3*M,1) next mu
    '''
    assert Tt.shape == (4,4)
    assert oTi.shape == (4,4)
    assert mu_t.shape[0] == 3*features.shape[1]
    
    if nK is None:
        return mu_t
    
    T= csr_matrix(oTi@inversePose(Tt))

    z_t = z_t.T
    z_t = z_t[delt[t]]
    z_t = z_t.reshape(z_t.shape[0]*z_t.shape[1],1)
    
    mu_t += (nK @ (z_t - ztilda(T,mu_t,t)))
    return mu_t

def sigma_plus(nxK,sigma_t,nxH):
    '''
    :param nxK: next_K(...)
    :param sigma_t: (3M,3M) covariance matrix
    :param nxH: next_Ht(...)

    :return: (3M,3M) next sigma
    '''
    if nxK is None:
        return sigma_t
    s = sigma_t
    nsig =s - nxK@nxH@s
    
    return nsig
  


# EKF Landmark update 

In [None]:
oTi = inversePose(imu_T_cam)

sigma_prior = 2*eye(3*features.shape[1],format='csr')
mu_t = xyz.copy()

for t in range(1,T_list.shape[2]):
    nz_t = features[:,:,t]
    nxT = T_list[:,:,t]
    H = csr_matrix((4*len(delt[t-1]),3*features.shape[1]))

    nxH = Map_Ht_plus(H,oTi,nxT,mu_t,t-1)

    nxK = K_plus(nxH,sigma_prior)

    mu_t = Map_mu_plus(oTi,nxT,nxK,nz_t,mu_t,t-1)

    sigma_prior = sigma_plus(nxK,sigma_prior,nxH)

    if t%100 == 0:
        print(t,len(delt[t]))

  
    
plot_estimates(T_list,mu_t)


3. [20 pts] Visual-inertial SLAM: Combine the IMU prediction step from part (a) with the landmark   
update step from part (b) and implement an update step for the IMU pose Tt ∈SE(3), based on the      
stereo-camera observation model, to obtain a complete visual-inertial SLAM algorithm.       


In [None]:
def hat(x):
    x = x.reshape(3)
    return np.array([[0,-x[2],x[1]],[x[2],0,-x[0]],[-x[1],x[0],0]])

def targetfunct(x):
    assert x.shape == (4,1) or x.shape == (4,)
    x = x.reshape(4)
    A = np.zeros((4,6))
    A[:3,:3] = np.eye(3)
    A[:3,3:] = -hat(x[:3])
    return csr_matrix(A)

def loc_Ht_plus(H,oTi,T_plus,mu_t,t):
    '''
    :param oTi: (4,4) inverse of optical_T_imu
    :param T_plus: (4,4) current pose matrix
    :param mu_t: (3*M,1) landmark points in the world frame
    :param t: time index

    return: (4Nt,6) Ht matrix
    '''
    assert oTi.shape == (4,4)
    assert T_plus.shape == (4,4)
    assert mu_t.shape[0] == 3*features.shape[1]

    mu_t = mu_t.reshape(-1,3)
    mu_t = mu_t[delt[t]].T
    mu = np.vstack((mu_t, np.ones((1,mu_t.shape[1]))))

    # precompute operations 
    inv_T_plus = inversePose(T_plus)
    T = oTi@inv_T_plus

    for j in range(len(delt[t])):
        H[j*4:j*4+4,-6:] = -Ks@jac_pi( T @ (mu[:,j].reshape(4,1)) ) @ oTi @ targetfunct(inv_T_plus @ mu[:,j].reshape(4,1))
                           
    return H

def mu_plus(oTi,T_plus,nK,z_t,mu_t,t):
    '''
    :param oTi: (4,4) inverse of optical_T_imu
    :param T_plus: (4,4) current pose matrix
    :param nK: next_K(...)
    
    '''
    assert oTi.shape == (4,4)
    assert T_plus.shape == (4,4)
    assert mu_t.shape[0] == 3*features.shape[1]
    

    if nK is None:
        return mu_t, T_plus

    z_t = z_t.T
    z_t = z_t[delt[t]]
    z_t = z_t.reshape(z_t.shape[0]*z_t.shape[1],1)

    oTw = csr_matrix(oTi@inversePose(T_plus))
    A = nK@(z_t - ztilda(oTw,mu_t,t))
    T = T_plus@axangle2pose((A[-6:,0].reshape(6))*(ts[:,t+1]-ts[:,t]))
    mu =mu_t+ A[:-6].reshape(-1,1)
    return mu , T


# EKF SLAM

In [None]:
oTi = csr_matrix(inversePose(imu_T_cam))

sigma_prior = 2*eye(3*features.shape[1]+6,format='csr') #3M+6
sigma_prior[-6:,-6:] = csr_matrix(np.diag([1e-4,1e-4,1e-4,1e-5,1e-5,1e-5]))
W = csr_matrix(np.diag([1e-4,1e-4,1e-4,1e-8,1e-8,1e-8]))
    
mu_t = np.zeros((3*features.shape[1],1)) # landmarks 
poses = [np.eye(4)]# Trajectories 


for t in range(1,T_list.shape[2]):
    mu_t = new_xyz(mu_t,poses[-1],delt,t)
    pos = poses[-1] @ axangle2pose(zeta[:,t]*(ts[:,t]-ts[:,t-1]))

    F = csr_matrix(scipy.linalg.expm(axangle2adtwist(-zeta[:,t])*(ts[:,t]-ts[:,t-1])))
    sigma_prior[:,-6:]=sigma_prior[:,-6:]@F.T
    sigma_prior[-6:,:]=F@sigma_prior[-6:,:]
    sigma_prior[-6:,-6:]+= W

    if len(delt[t-1]) == 0:
        poses.append(pos)
        continue

    H = csr_matrix((4*len(delt[t-1]),3*features.shape[1]+6))
    H = Map_Ht_plus(H,oTi,pos,mu_t,t-1) # updates (4Nt,:3M)
    H = loc_Ht_plus(H,oTi,pos,mu_t,t-1) # updates (4Nt,3M:)

    # Compute Kalman gain
    K = K_plus(H,sigma_prior) #shape (3M+6,4Nt)
    mu_t, temp_pose = mu_plus(oTi,pos,K,features[:,:,t],mu_t,t-1)

    # Update pose
    poses.append(temp_pose) # temp pose

    # Update sigma
    sigma_prior = sigma_plus(K,sigma_prior,H)
    if (t%100 == 0 and dataset=='03') or (t%300==0 and dataset=='10'):
        plot_estimates(np.array(poses).transpose(1,2,0),mu_t, T_list)
   
print('Complete Plot')
plot_estimates(np.array(poses).transpose(1,2,0),mu_t, T_list)
#900 at 648min

In [None]:
plot_estimates(np.array(poses).transpose(1,2,0),mu_t)