In [130]:
from glob import glob
import cv2, skimage, os
from skimage import io
import numpy as np
from scipy.spatial import distance
import scipy
import matplotlib.pyplot as plt
from tqdm import tqdm


class OdometryClass:
    def __init__(self, frame_path):
        self.frame_path = frame_path
        self.frames = sorted(glob(os.path.join(self.frame_path, 'images', '*')))
        with open(os.path.join(frame_path, 'calib.txt'), 'r') as f:
            lines = f.readlines()
            self.focal_length = float(lines[0].strip().split()[-1])
            lines[1] = lines[1].strip().split()
            self.pp = (float(lines[1][1]), float(lines[1][2]))
            
        with open(os.path.join(self.frame_path, 'gt_sequence.txt'), 'r') as f:
            self.pose = [line.strip().split() for line in f.readlines()]
        
        ###### Initialze #####
        self.img = {}
        initialize = np.array(self.pose[0]).reshape(3,4).astype('float64') 
        self.current_R = initialize[:3,:3]
        self.current_t = initialize[:,3].reshape(3,1)
        
    def imread(self, fname):
        """
        read image into np array from file
        """
        return cv2.imread(fname, 0)

    def imshow(self, img):
        """
        show image
        """
        skimage.io.imshow(img)

    def get_gt(self, frame_id):
        pose = self.pose[frame_id]
        x, y, z = float(pose[3]), float(pose[7]), float(pose[11])
        return np.array([[x], [y], [z]])

    def get_scale(self, frame_id):
        '''Provides scale estimation for mutliplying
        translation vectors
        
        Returns:
        float -- Scalar value allowing for scale estimation
        '''
        prev_coords = self.get_gt(frame_id - 1)
        curr_coords = self.get_gt(frame_id)
        return np.linalg.norm(curr_coords - prev_coords)

    def get_sift_data(self, img, num_matches):
        sift = cv2.SIFT_create(num_matches)
        kp, des = sift.detectAndCompute(img, None)
        kp = np.array([k.pt for k in kp])
        return kp, des
    
    def get_orb_data(self, img):
        orb = cv2.ORB_create()
        kp, des = orb.detectAndCompute(img, None)
        kp = np.array([k.pt for k in kp])
        return kp, des

    def plot_inlier_matches(self, ax, img1, img2, inliers):
        res = np.hstack([img1, img2])
        ax.set_aspect('equal')
        ax.imshow(res, cmap='gray')   
        ax.plot(inliers[:,2], inliers[:,3], '+r')
        ax.plot(inliers[:,0] + img1.shape[1], inliers[:,1], '+r')
        ax.plot([inliers[:,2], inliers[:,0] + img1.shape[1]],
                [inliers[:,3], inliers[:,1]], 'r', linewidth=0.4)
        ax.axis('off')
    def get_best_matches(self, img1, img2, num_matches):
        #kp1, des1 = self.get_sift_data(img1, num_matches)
        #kp2, des2 = self.get_sift_data(img2, num_matches)
        kp1, des1 = self.get_orb_data(img1)
        kp2, des2 = self.get_orb_data(img2)
        dist = scipy.spatial.distance.cdist(des1, des2, 'sqeuclidean')
        idx = np.argpartition(dist, num_matches, axis = None) #getting the first 100 lowest dist, flatten dist in the process
        kp1_pos = idx[:num_matches]//kp2.shape[0] #retrieving kp1 from the flatten list
        kp2_pos = idx[:num_matches]%kp2.shape[0] #retrieving kp2 from the flatten list
        return np.hstack([kp2[kp2_pos],kp1[kp1_pos]])
    
    def bf_matcher(self, img1, img2, num_matches):
        orb = cv2.ORB_create()
        kp1, des1 = orb.detectAndCompute(img1, None)
        kp2, des2 = orb.detectAndCompute(img2, None)
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck = True)
        matches = bf.match(des1,des2)
        matches = sorted(matches, key = lambda x:x.distance)
        matches = matches[:num_matches]
        list_kp1 = [kp1[mat.queryIdx].pt for mat in matches] 
        list_kp2 = [kp2[mat.trainIdx].pt for mat in matches]
        return np.hstack([list_kp1, list_kp2])
    
    def update(self, last_image, current_image, scale):
        self.img["current"] = current_image
        best_matches = self.get_best_matches(last_image, current_image, 2000)
        #best_matches = self.bf_matcher(last_image, current_image, 2000)
        q1 = best_matches[:, :2]
        q2 = best_matches[:,2:]
        E, mask = cv2.findEssentialMat(q1, q2,
                                      focal = self.focal_length, pp = self.pp,
                                      method = cv2.RANSAC, prob = 0.999, threshold = 1.0)
        _, R,t, mask = cv2.recoverPose(E, q1, q2,
                                      focal = self.focal_length, pp = self.pp)

        a = (scale * self.current_R.dot(t)).reshape(3,1)
        if (scale > 0.1):
            self.current_t += a
            self.current_R = R.dot(self.current_R)
        self.img["last"] = self.img["current"]
        return self.current_R, self.current_t
    
    def run(self):
        ############### LOAD DATA #######################
        num_images = len(self.pose)
        predictions = np.zeros((num_images, 3))
        spose = self.pose[0]
        predictions[0] = np.vstack([np.array([float(spose[3]), float(spose[7]), float(spose[11])])])
        for i in tqdm(range(num_images)):
            if i == 0:
                continue
            else:
                last_img = self.imread(self.frames[i-1])
                current_img = self.imread(self.frames[i])
                R,t = self.update(last_img, current_img, self.get_scale(i))
                pred = np.hstack([R,t]).reshape(-1,1)
                predictions[i] = np.hstack([pred[3], pred[7], pred[11]])
        np.save('predictions.npy', predictions)
        print("prediction", predictions)
        ############## ODOMETRY ########################
        """
        Uses the video frame to predict the path taken by the camera
        
        The reurned path should be a numpy array of shape nx3
        n is the number of frames in the video  
        """
        pred_coords = np.zeros((1,3))
        for pose in self.pose[:num_images]:
            pred_coords = np.vstack([pred_coords, np.array([float(pose[3]), float(pose[7]), float(pose[11])])])
        gt = pred_coords[1:, :]
        #print(gt)
        
        ############## EVALUATION #######################
        with open('video_train/gt_sequence.txt', 'r') as f:
            data = [line.strip().split() for line in f.readlines()]
        path = np.load('./predictions.npy')
        # Evaluation based on average mean squared error between the predicted
        # coordinates and the ground truth coordinates
        mse_scores = []
        for pred_coord, pose in zip(path, data):
            gt_coord = np.array([float(pose[3]), float(pose[7]), float(pose[11])])
            mse_scores.append(np.linalg.norm(pred_coord - gt_coord))

        # You can expect to get less than 40 MSE
        mse_scores = np.mean(mse_scores)
        print('avg_mse scores', mse_scores)
        #################################################
        return predictions


if __name__=="__main__":
    frame_path = 'video_train'
    odemotryc = OdometryClass(frame_path)
    path = odemotryc.run()
    print(path,path.shape)

100%|██████████| 801/801 [05:39<00:00,  2.36it/s]

prediction [[-5.55111500e-17 -3.33066900e-16  2.22044600e-16]
 [ 8.42128448e-02  3.28181658e-02  9.57672840e-01]
 [ 7.53596018e-02  1.40758245e-02  1.92077186e+00]
 ...
 [ 3.12281343e+02  2.51655885e+01  2.72333384e+02]
 [ 3.12281343e+02  2.51655885e+01  2.72333384e+02]
 [ 3.12281343e+02  2.51655885e+01  2.72333384e+02]]
avg_mse scores 73.1011901704272
[[-5.55111500e-17 -3.33066900e-16  2.22044600e-16]
 [ 8.42128448e-02  3.28181658e-02  9.57672840e-01]
 [ 7.53596018e-02  1.40758245e-02  1.92077186e+00]
 ...
 [ 3.12281343e+02  2.51655885e+01  2.72333384e+02]
 [ 3.12281343e+02  2.51655885e+01  2.72333384e+02]
 [ 3.12281343e+02  2.51655885e+01  2.72333384e+02]] (801, 3)



