In [1]:
import cv2
import numpy as np
import os
from scipy.optimize import least_squares
from tomlkit import boolean
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:
class ImageProcessor():
    def __init__(self, data_folder:str, scaling_factor:float):
        # Load camera intrinsic parameters
        with open(data_folder + '\\calib.txt') as f:
            self.camera_matrix = np.array(list((map(lambda x:list(map(lambda x:float(x), x.strip().split(' '))), f.read().split('\n')))))
            self.image_paths = []
        
        # Load the images
        for img_file in sorted(os.listdir(data_folder)):
            if img_file.endswith('.jpg') or img_file.endswith('.png'):
                self.image_paths.append(os.path.join(data_folder, img_file))
        
        self.current_path = os.getcwd()
        self.scale_factor = scaling_factor
        self.apply_scaling()

    def apply_scaling(self) -> None:
        '''
        Apply scaling to the camera intrinsic parameters based on the scale factor
        '''
        self.camera_matrix[0, 0] /= self.scale_factor
        self.camera_matrix[1, 1] /= self.scale_factor
        self.camera_matrix[0, 2] /= self.scale_factor
        self.camera_matrix[1, 2] /= self.scale_factor
    
    def downscale_image(self, image):
        for _ in range(1, int(self.scale_factor / 2) + 1):
            image = cv2.pyrDown(image)
        return image


In [None]:
class StructureFromMotion():
    def __init__(self, data_folder:str, scaling_factor:float = 2.0) -> None:
        self.image_processor = ImageProcessor(data_folder, scaling_factor)

    def triangulate_points(self, points_2d_1, points_2d_2, proj_matrix_1, proj_matrix_2) -> tuple:
        point_cloud = cv2.triangulatePoints(points_2d_1, points_2d_2, proj_matrix_1.T, proj_matrix_2.T)
        return proj_matrix_1.T, proj_matrix_2.T, (point_cloud / point_cloud[3])

    def find_pose(self, object_points, image_points, intrinsic_matrix, dist_coeffs, rotation_vector, initial) -> tuple:
        if initial == 1:
            object_points = object_points[:, 0, :]
            image_points = image_points.T
            rotation_vector = rotation_vector.T

        _, rot_vector_est, tran_vector, inliers = cv2.solvePnPRansac(object_points, image_points, intrinsic_matrix, dist_coeffs, cv2.SOLVEPNP_ITERATIVE)
        rotation_matrix, _ = cv2.Rodrigues(rot_vector_est)

        if inliers is not None:
            image_points = image_points[inliers[:, 0]]
            object_points = object_points[inliers[:, 0]]
            rotation_vector = rotation_vector[inliers[:, 0]]

        return rotation_matrix, tran_vector, image_points, object_points, rotation_vector

    def compute_reprojection_error(self, object_points, image_points, transformation_matrix, intrinsic_matrix, homogenity) -> tuple:
        rotation_matrix = transformation_matrix[:3, :3]
        translation_vector = transformation_matrix[:3, 3]
        rotation_vector, _ = cv2.Rodrigues(rotation_matrix)

        if homogenity == 1:
            object_points = cv2.convertPointsFromHomogeneous(object_points.T)

        projected_image_points, _ = cv2.projectPoints(object_points, rotation_vector, translation_vector, intrinsic_matrix, None)
        projected_image_points = np.float32(projected_image_points[:, 0, :])
        total_error = cv2.norm(projected_image_points, np.float32(image_points.T) if homogenity == 1 else np.float32(image_points), cv2.NORM_L2)

        return total_error / len(projected_image_points), object_points

    def compute_optimal_reprojection_error(self, object_points) -> np.array:
        transformation_matrix = object_points[0:12].reshape((3, 4))
        intrinsic_matrix = object_points[12:21].reshape((3, 3))
        remaining_len = int(len(object_points[21:]) * 0.4)
        p = object_points[21:21 + remaining_len].reshape((2, int(remaining_len / 2))).T
        object_points = object_points[21 + remaining_len:].reshape((int(len(object_points[21 + remaining_len:]) / 3), 3))
        rotation_matrix = transformation_matrix[:3, :3]
        translation_vector = transformation_matrix[:3, 3]
        rotation_vector, _ = cv2.Rodrigues(rotation_matrix)
        projected_image_points, _ = cv2.projectPoints(object_points, rotation_vector, translation_vector, intrinsic_matrix, None)
        projected_image_points = projected_image_points[:, 0, :] 
        errors = [(p[idx] - projected_image_points[idx]) ** 2 for idx in range(len(p))]
        return np.array(errors).ravel() / len(p)





In [None]:
class BundleAdjustmentUtility():
    def bundle_adjust(self, object_points, optim_values, transformation_matrix_new, intrinsic_matrix, reprojection_error) -> tuple:
        opt_variables = np.hstack((transformation_matrix_new.ravel(), intrinsic_matrix.ravel()))
        opt_variables = np.hstack((opt_variables, optim_values.ravel()))
        opt_variables = np.hstack((opt_variables, object_points.ravel()))

        corrected_values = least_squares(self.compute_optimal_reprojection_error, opt_variables, gtol=reprojection_error).x
        intrinsic_matrix = corrected_values[12:21].reshape((3, 3))
        remaining_len = int(len(corrected_values[21:]) * 0.4)
        return (
            corrected_values[21 + remaining_len:].reshape((int(len(corrected_values[21 + remaining_len:]) / 3), 3)),
            corrected_values[21:21 + remaining_len].reshape((2, int(remaining_len / 2))).T,
            corrected_values[0:12].reshape((3, 4))
        )

    def export_to_ply(self, path, point_cloud, colors) -> None:
        out_points = point_cloud.reshape(-1, 3) * 200
        out_colors = colors.reshape(-1, 3)
        verts = np.hstack([out_points, out_colors])

        mean = np.mean(verts[:, :3], axis=0)
        scaled_verts = verts[:, :3] - mean
        dist = np.sqrt(scaled_verts[:, 0] ** 2 + scaled_verts[:, 1] ** 2 + scaled_verts[:, 2] ** 2)
        idx = np.where(dist < np.mean(dist) + 300)
        verts = verts[idx]
        
        ply_header = '''ply
            format ascii 1.0
            element vertex %(vert_num)d
            property float x
            property float y
            property float z
            property uchar blue
            property uchar green
            property uchar red
            end_header
            '''
        with open(path + '\\point_clouds\\' + self.image_processor.image_paths[0].split('\\')[-2] + '.ply', 'w') as f:
            f.write(ply_header % dict(vert_num=len(verts)))
            np.savetxt(f, verts, '%f %f %f %d %d %d')

    def find_common_points(self, points_1, points_2, points_3) -> tuple:
        common_points_1 = []
        common_points_2 = []

        for i in range(points_1.shape[0]):
            matching_indices = np.where(np.all(points_2 == points_1[i, :], axis=1))
            if matching_indices[0].size != 0:
                common_points_1.append(i)
                common_points_2.append(matching_indices[0][0])

        mask_array_1 = np.ma.array(points_2, mask=False)
        mask_array_1.mask[common_points_2] = True
        mask_array_1 = mask_array_1.compressed().reshape(-1, 2)

        mask_array_2 = np.ma.array(points_3, mask=False)
        mask_array_2.mask[common_points_2] = True
        mask_array_2 = mask_array_2.compressed().reshape(-1, 2)

        return np.array(common_points_1), np.array(common_points_2), mask_array_1, mask_array_2

In [None]:

class StructureFromMotionPipeline():
    def find_features(self, image_0, image_1) -> tuple:
        sift = cv2.xfeatures2d.SIFT_create()
        key_points_0, desc_0 = sift.detectAndCompute(cv2.cvtColor(image_0, cv2.COLOR_BGR2GRAY), None)
        key_points_1, desc_1 = sift.detectAndCompute(cv2.cvtColor(image_1, cv2.COLOR_BGR2GRAY), None)

        bf = cv2.BFMatcher()
        matches = bf.knnMatch(desc_0, desc_1, k=2)
        feature = []
        for m, n in matches:
            if m.distance < 0.70 * n.distance:
                feature.append(m)

        return np.float32([key_points_0[m.queryIdx].pt for m in feature]), np.float32([key_points_1[m.trainIdx].pt for m in feature])

    def process(self, enable_bundle_adjustment:boolean=False):
        cv2.namedWindow('image', cv2.WINDOW_NORMAL)
        pose_array = self.image_processor.camera_matrix.ravel()
        transform_matrix_0 = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0]])
        transform_matrix_1 = np.empty((3, 4))
    
        pose_0 = np.matmul(self.image_processor.camera_matrix, transform_matrix_0)
        pose_1 = np.empty((3, 4)) 
        total_points = np.zeros((1, 3))
        total_colors = np.zeros((1, 3))

        image_0 = self.image_processor.downscale_image(cv2.imread(self.image_processor.image_paths[0]))
        image_1 = self.image_processor.downscale_image(cv2.imread(self.image_processor.image_paths[1]))

        feature_0, feature_1 = self.find_features(image_0, image_1)

        essential_matrix, em_mask = cv2.findEssentialMat(feature_0, feature_1, self.image_processor.camera_matrix, method=cv2.RANSAC, prob=0.999, threshold=0.4, mask=None)
        feature_0 = feature_0[em_mask.ravel() == 1]
        feature_1 = feature_1[em_mask.ravel() == 1]

        _, rot_matrix, tran_matrix, em_mask = cv2.recoverPose(essential_matrix, feature_0, feature_1, self.image_processor.camera_matrix)
        feature_0 = feature_0[em_mask.ravel() > 0]
        feature_1 = feature_1[em_mask.ravel() > 0]
        transform_matrix_1[:3, :3] = np.matmul(rot_matrix, transform_matrix_0[:3, :3])
        transform_matrix_1[:3, 3] = transform_matrix_0[:3, 3] + np.matmul(transform_matrix_0[:3, :3], tran_matrix.ravel())

        pose_1 = np.matmul(self.image_processor.camera_matrix, transform_matrix_1)

        feature_0, feature_1, points_3d = self.structure_from_motion.triangulate_points(pose_0, pose_1, feature_0, feature_1)
        error, points_3d = self.structure_from_motion.compute_reprojection_error(points_3d, feature_1, transform_matrix_1, self.image_processor.camera_matrix, homogenity = 1)
        print("REPROJECTION ERROR: ", error)
        _, _, feature_1, points_3d, _ = self.structure_from_motion.find_pose(points_3d, feature_1, self.image_processor.camera_matrix, np.zeros((5, 1), dtype=np.float32), feature_0, initial=1)

        total_images = len(self.image_processor.image_paths) - 2 
        pose_array = np.hstack((np.hstack((pose_array, pose_0.ravel())), pose_1.ravel()))

        threshold = 0.5
        for i in tqdm(range(total_images)):
            image_2 = self.image_processor.downscale_image(cv2.imread(self.image_processor.image_paths[i + 2]))
            features_cur, features_2 = self.find_features(image_1, image_2)

            if i != 0:
                feature_0, feature_1, points_3d = self.structure_from_motion.triangulate_points(pose_0, pose_1, feature_0, feature_1)
                feature_1 = feature_1.T
                points_3d = cv2.convertPointsFromHomogeneous(points_3d.T)
                points_3d = points_3d[:, 0, :]
            
            cm_points_0, cm_points_1, cm_mask_0, cm_mask_1 = self.structure_from_motion.find_common_points(feature_1, features_cur, features_2)
            cm_points_2 = features_2[cm_points_1]
            cm_points_cur = features_cur[cm_points_1]

            rot_matrix, tran_matrix, cm_points_2, points_3d, cm_points_cur = self.structure_from_motion.find_pose(points_3d[cm_points_0], cm_points_2, self.image_processor.camera_matrix, np.zeros((5, 1), dtype=np.float32), cm_points_cur, initial = 0)
            transform_matrix_1 = np.hstack((rot_matrix, tran_matrix))
            pose_2 = np.matmul(self.image_processor.camera_matrix, transform_matrix_1)

            error, points_3d = self.structure_from_motion.compute_reprojection_error(points_3d, cm_points_2, transform_matrix_1, self.image_processor.camera_matrix, homogenity = 0)
        
            cm_mask_0, cm_mask_1, points_3d = self.structure_from_motion.triangulate_points(pose_1, pose_2, cm_mask_0, cm_mask_1)
            error, points_3d = self.structure_from_motion.compute_reprojection_error(points_3d, cm_mask_1, transform_matrix_1, self.image_processor.camera_matrix, homogenity = 1)
            print("Reprojection Error: ", error)
            pose_array = np.hstack((pose_array, pose_2.ravel()))

            if enable_bundle_adjustment:
                points_3d, cm_mask_1, transform_matrix_1 = self.structure_from_motion.bundle_adjustment(points_3d, cm_mask_1, transform_matrix_1, self.image_processor.camera_matrix, threshold)
                pose_2 = np.matmul(self.image_processor.camera_matrix, transform_matrix_1)
                error, points_3d = self.structure_from_motion.compute_reprojection_error(points_3d, cm_mask_1, transform_matrix_1, self.image_processor.camera_matrix, homogenity = 0)
                print("Bundle Adjusted error: ",error)
                total_points = np.vstack((total_points, points_3d))
                points_left = np.array(cm_mask_1, dtype=np.int32)
                color_vector = np.array([image_2[l[1], l[0]] for l in points_left])
                total_colors = np.vstack((total_colors, color_vector))
            else:
                total_points = np.vstack((total_points, points_3d[:, 0, :]))
                points_left = np.array(cm_mask_1, dtype=np.int32)
                color_vector = np.array([image_2[l[1], l[0]] for l in points_left.T])
                total_colors = np.vstack((total_colors, color_vector)) 

            transform_matrix_0 = np.copy(transform_matrix_1)
            pose_0 = np.copy(pose_1)
            plt.scatter(i, error)
            plt.pause(0.05)

            image_0 = np.copy(image_1)
            image_1 = np.copy(image_2)
            feature_0 = np.copy(features_cur)
            feature_1 = np.copy(features_2)
            pose_1 = np.copy(pose_2)
            cv2.imshow(self.image_processor.image_paths[0].split('\\')[-2], image_2)
            if cv2.waitKey(1) & 0xff == ord('q'):
                break

        cv2.destroyAllWindows()

        print("Saving .ply file")
        print(total_points.shape, total_colors.shape)
        self.structure_from_motion.export_to_ply(self.image_processor.path, total_points, total_colors)

In [None]:
if __name__ == '__main__':
    sfm_pipeline = StructureFromMotionPipeline()
    sfm_pipeline.process()
