In [None]:
import pandas as pd
import numpy as np 
import time 
import g2o 
import cv2
import os
import matplotlib.pyplot as plt

# data loader

In [None]:
folder_path='sample_data/Room_back_home-2024-12-29_19-50-26/'
folder_path_img = folder_path+'/Camera'

gyro = pd.read_csv(folder_path+'Gyroscope.csv')
gyro['time_diff_s'] = gyro['time'].diff().fillna(0) / 1e9
gyro['roll'] = (gyro['x']*gyro['time_diff_s']).cumsum()
gyro['pitch'] = (gyro['y']*gyro['time_diff_s']).cumsum()
gyro['yaw'] = (gyro['z']*gyro['time_diff_s']).cumsum()
gyro.drop(['x','y','z'],axis=1,inplace=True)

acc = pd.read_csv(folder_path+'/Accelerometer.csv')

motion = pd.merge(gyro, acc, on=['time','seconds_elapsed'],how='inner')
motion.rename(columns={'x': 'ax','y': 'ay','z': 'az'}, inplace=True)


image_filenames = sorted(os.listdir(folder_path_img))[1:-1]
image_timestamps = [float(filename.split('.')[0]) for filename in image_filenames] 

def synchronize_data(imu_data, image_timestamps):
    synchronized_data = []
    for img_time in image_timestamps:
        # Find closest IMU timestamp
        closest_imu_index = (np.abs(imu_data['time'] - img_time*1e6)).argmin()
        synchronized_data.append({
            'image_time': img_time,
            'imu_roll': imu_data.iloc[closest_imu_index]['roll'],
            'imu_pitch': imu_data.iloc[closest_imu_index]['pitch'],
            'imu_yaw': imu_data.iloc[closest_imu_index]['yaw'],
            'imu_ax': imu_data.iloc[closest_imu_index]['ax'],
            'imu_ay': imu_data.iloc[closest_imu_index]['ay'],
            'imu_az': imu_data.iloc[closest_imu_index]['az'],
            'image_path': os.path.join(folder_path_img, image_filenames[image_timestamps.index(img_time)])
        })
    return pd.DataFrame(synchronized_data)

synchronized_synchronized_df = synchronize_data(motion, image_timestamps)
synchronized_synchronized_df['time_diff_s'] = synchronized_synchronized_df['image_time'].diff().fillna(0) / 1e3

In [None]:
def load_image(path):
    return cv2.imread(path)

def extract(img, nPoints):
    """ 
    extract features of an image
    """
    orb = cv2.ORB_create()
 
    # Detection
    pts = cv2.goodFeaturesToTrack(np.mean(img, axis=-1).astype(np.uint8), nPoints, qualityLevel=0.01, minDistance=10)

 
    # Extract key points
    kps = [cv2.KeyPoint(f[0][0], f[0][1], 20) for f in pts]
    kps, des = orb.compute(img, kps)
 
    return np.array([(kp.pt[0], kp.pt[1]) for kp in kps]), des

def add_ones(x):
    """ 
    helper function to add ones to have right format of normalized coordinates (TODO: understand better)
    """
    # creates homogenious coordinates given the point x
    return np.concatenate([x, np.ones((x.shape[0], 1))], axis=1)

def normalize(Kinv, pts):
    """
    transform image coordinates to normalized coordinates
    """
    # The inverse camera intrinsic matrix 𝐾^(−1) transforms 2D homogeneous points 
    # from pixel coordinates to normalized image coordinates. 
    return np.dot(Kinv, add_ones(pts).T).T[:, 0:2]

def un_normalize(K, normalized_pts):
    """
    Transform normalized coordinates back to image coordinates
    """
    # Add homogeneous coordinate
    homogeneous_pts = add_ones(normalized_pts)
    
    # Apply camera intrinsic matrix K
    image_pts = np.dot(K, homogeneous_pts.T).T
    
    # Remove homogeneous coordinate and return 2D image coordinates
    return image_pts[:, :2]

class Frame(object):
    """ 
    Class that has all the frame data
    """
    def __init__(self, img, K, id, pointsPerImg):
        self.id = id   # unique identifier
        self.K = K     # Intrinsic camera matrix
        self.Kinv = np.linalg.inv(self.K)  # Inverse of the intrinsic camera matrix
 
        pts, self.des = extract(img, pointsPerImg)             # Extract feature points and descriptors from the image
        self.pts = normalize(self.Kinv, pts)     # Normalizes feature points to normalized coordinates

In [None]:
class UniquePointsFullListMatching(object):
    """ 
    this class tracks all unique points in entire frame, it consists of 3 index parallel lists
    descriptor: list of unique descriptors in entire sequence
    frameIds:   list of frame ids (list of lists), index parallel to descriptor (one descriptor multiple frame ids)
    normPoints: list of normalized points per frame (list of lists), also index parallel to descriptor (one descriptor, multiple frames with a point per frame)
    """
    def __init__(self):
        self.descriptor = [] # (descriptor, [frame_ids])
        self.frameIds   = []
        self.normPoints = [] #normalized point per frame id
        self.initial3dEstimates = []
        self.cameraPosition = [] # x,y,z
        self.cameraVelocity = []
        self.cameraAngle = []    # roll, pitch, yaw
        self.optimizer = g2o.SparseOptimizer()
        solver = g2o.BlockSolverSE3(g2o.LinearSolverCSparseSE3())
        solver = g2o.OptimizationAlgorithmLevenberg(solver)
        self.optimizer.set_algorithm(solver)
        self.pose_id = 0
        self.landmark_id = 0
        self.K = np.array([[730, 0, 620], [0, 730, 360], [0, 0, 1]])

    def processFrame(self, row):
        f = load_image(row.image_path)
        self.addImuData2D(row)

        if( not self.descriptor):
            for idx, des in enumerate(f.des):
                self.addNewPoint(des, f.id, f.pts[idx])
        else:
            self.calcAssociations(f)

    def addImuData2D(self, row):
        # assume data is 2d only for now (no motion in y)
        if not self.cameraPosition:
            vx = 0
            vy = 0
        else:
            idx = len(self.cameraPosition)-1
            vx,vy,vz = self.cameraVelocity[idx]
            x,y,z = self.cameraPosition[idx]

        dvx = row.ax * row.time_diff_s
        dvz = row.az * row.time_diff_s

        new_velocity = [vx + dvx, vy, vz + dvz]
        dx = new_velocity[0] * row.time_diff_s
        dz = new_velocity[2] * row.time_diff_s
        new_position = [x + dx, y, z + dz]

        self.cameraVelocity.append(new_velocity)
        self.cameraPosition.append(new_position)
        self.cameraAngle.append([0, row.pitch,0])

        self.add_pose_vertex(new_position)

        # calc relative pose
        if not self.cameraPosition:
            dpitch = row.pitch - self.cameraAngle[idx][0]

            # Create relative pose
            relative_pose = g2o.SE3Quat()
            relative_pose.set_translation(np.array([dx, 0, dz]))
            relative_pose.set_rotation(g2o.Quaternion(np.array([0, dpitch, 0])))
            information = np.identity(6)  # Example information matrix
            self.add_imu_edge(idx, idx + 1, relative_pose, information)

        
    def calcAssociations(self, f):
        # The code performs k-nearest neighbors matching on feature descriptors
        bf = cv2.BFMatcher(cv2.NORM_L2)
        matches = bf.knnMatch(np.array(self.descriptor), f.des, k=2)    

        matched_indices = set()
        for i, (m, n) in enumerate(matches):
            if m.distance < 0.5 * n.distance:
                p1 = self.normPoints[m.queryIdx][-1]
                p2 = f.pts[m.trainIdx]

                if np.linalg.norm((p1-p2)) < 0.1:
                    self.associate(m.queryIdx, f.id, p2)
                    matched_indices.add(m.trainIdx)

        # Process unmatched points
        for i in range(len(f.des)):
            if i not in matched_indices:
                # Add new point
                self.addNewPoint(f.des[i],f.id, f.pts[i]) 
        
    def triangulate(self, frameId1, frameId2, p1, p2):
        R1 = cv2.Rodrigues(np.array(self.cameraAngle[frameId1]))[0]
        t1 = np.array(self.cameraPosition[frameId1])
        R2 = cv2.Rodrigues(np.array(self.cameraAngle[frameId2]))[0]
        t2 = np.array(self.cameraPosition[frameId1])

        # Compute the essential matrix
        E = np.hstack((R2.dot(R1.T), t2[:, None] - R2.dot(R1.T).dot(t1)[:, None]))

        # Triangulate the 3D point
        return cv2.triangulatePoints(E, p1, p2)



    def associate(self, uniquePtIdx, frameId, newPoint):
        self.frameIds[uniquePtIdx].append(frameId)  # add new frameid to associated point
        self.normPoints[uniquePtIdx].append(newPoint)  # add new normalized point to associated point

    def addNewPoint(self, des, frameId, point):
        self.descriptor.append(des)
        self.frameIds.append([frameId])
        self.normPoints.append([point])

    def add_pose_vertex(self, pose):
        v_se3 = g2o.VertexSE3()
        v_se3.set_id(self.pose_id)
        v_se3.set_estimate(pose)
        self.optimizer.add_vertex(v_se3)
        self.pose_id += 1

    def add_landmark_vertex(self, point):
        v_point = g2o.VertexSBAPointXYZ()
        v_point.set_id(self.landmark_id)
        v_point.set_estimate(point)
        v_point.set_marginalized(True)
        self.optimizer.add_vertex(v_point)
        self.landmark_id += 1

    def add_imu_edge(self, pose_id1, pose_id2, relative_pose, information):
        edge = g2o.EdgeSE3()
        edge.set_vertex(0, self.optimizer.vertex(pose_id1))
        edge.set_vertex(1, self.optimizer.vertex(pose_id2))
        edge.set_measurement(relative_pose)
        edge.set_information(information)
        self.optimizer.add_edge(edge)

    def add_visual_edge(self, pose_id, landmark_id, imagePoint, information):
        """
        add an image point to the optimizer, pose_id is the camera pose, landmark_id is the 3D point estimate
        """
        edge = g2o.EdgeProjectXYZ2UV()
        edge.set_vertex(0, self.optimizer.vertex(landmark_id))
        edge.set_vertex(1, self.optimizer.vertex(pose_id))
        edge.set_measurement(imagePoint)
        edge.set_information(information)
        self.optimizer.add_edge(edge)

    def optimize(self, iterations=10):
        self.optimizer.initialize_optimization()
        self.optimizer.optimize(iterations)

In [None]:
slam = UniquePointsFullListMatching()


# Add landmarks using visual features
for feature in visual_features:
    point = triangulate_point(feature)
    slam.add_landmark_vertex(point)

# Add edges for IMU data
for i in range(len(imu_dataset) - 1):
    relative_pose = compute_relative_pose(imu_dataset[i], imu_dataset[i + 1])
    information = np.identity(6)  # Example information matrix
    slam.add_imu_edge(i, i + 1, relative_pose, information)

# Add edges for visual features
for imagePoint in visual_observations:
    pose_id = imagePoint.pose_id
    landmark_id = imagePoint.landmark_id
    measurement = imagePoint.measurement
    information = np.identity(2)  # Example information matrix
    slam.add_visual_edge(pose_id, landmark_id, measurement, information)

# Optimize the graph
slam.optimize()