## main

In [None]:
from collections import defaultdict,OrderedDict
def stitching_new(
                    #input 
                    source, 
                    target, 
                    labels= None,
                    mytitle = None,
    
                    #add_outlier_removal
                    add_outlier_removal = False,
    
                    #downsampling 
                    downsampling_voxel_size = 0.1,
                    limit_size_point_cloud = 30000,
                  
                    #registration
                    voxel_size = 0.2,
                    mmax_iteration = 10**7,
                    mmax_validation = 0.999,
                  
                    # quality control of the stitching
                    myoverlapping_factor = 0.5,
                    maxnumattempts = 10,
                  
                    # visualization and pringtng parameters
                    trans_init = None, # keep the possibility of starting directly the ICP
                    print_statements = False,
                    save_statements = True,
                    visualization_on = False,
                    final_vis_on = True,
                    params = myparams,  
                    configuration_file = myconfiguration_file
                 ):
    
    #labels
    if labels == None:
        labels = ["source","target"]
    # print title 
    if mytitle == None:
        mytitle = f"stitch_{labels[0]}_{labels[1]}"
    
    print (mytitle)
    
    ## INITIAL VISUALIZATION 
    # Use different colors on the two point clouds
    source.paint_uniform_color([1, 0.706, 0])     #source is yellow
    target.paint_uniform_color([0, 0.651, 0.929]) #target is blue
    # visualize
    if  visualization_on == True: 
        custom_draw_geometry(source+target,
                         mytitle = "initial_"+mytitle,
                         params = params,  
                         configuration_file = configuration_file, 
                         rotate = True)

        
    # count number of point in point clouds
    points_source,points_target = get_num_points([source,target],print_statement = False)
    
    
    #ADDITIONAL OUTLIER REMOVAL
    #Outlier removal for two point clouds separately
    if add_outlier_removal == True:
        if print_statements == True:
            print ("\noutlier removal")
        source, outlier_index_source = source.remove_radius_outlier(
                                                      nb_points=16,
                                                      radius=0.45)

        target, outlier_index_target = target.remove_radius_outlier(
                                                      nb_points=16,
                                                      radius=0.5)
    
    
    
    
    
    ## DOWNSAMPLING BIG CLOUDS
    # if pointclouds are bigger than limit_size_point_cloud then 
    if print_statements == True:
        print ("\nDOWNSAMPLING BIG CLOUDS")
    processed_list, downsampling = downsample_big_clouds(
                                                          [source,target],                     
                                                          labels= labels,
                                                          downsampling_voxel_size = downsampling_voxel_size,
                                                          limit_size_point_cloud = limit_size_point_cloud,
                                                          print_statements = print_statements,
                                                          visualization_on = visualization_on
                                                         )
    processed_source, processed_target = processed_list 
    
    # visualize if there has been any downsampling 
    if  visualization_on == True and downsampling == True: 
        custom_draw_geometry(processed_source+processed_target,
                             mytitle = "downsampled_"+mytitle,
                             params = params,  
                             configuration_file = configuration_file, 
                             rotate = True,
                             onewindow = True)
    


    # DATA PREPARATION
    if print_statements == True:
        print ("\nDATA PREPARATION")
    new_source, new_target, source_down, target_down, source_fpfh, target_fpfh = prepare_dataset_new(processed_source,
                                                                                                     processed_target,
                                                                                                     voxel_size, 
                                                                                                     mytitle =mytitle,
                                                                                                     print_statements = print_statements
                                                                                                      )

    ## QUALITY STATEMENT 
    # initialization 
    overlapping_points = np.zeros(1) 
    numattempts = 0
    
    # quality condition
    points_source,points_target = get_num_points([source,target],print_statement = False)
    valuetoreach = myoverlapping_factor*(points_target)
    if print_statements == True:
        print ("value to reach: ",valuetoreach)
        print (f"source has {points_source} points")
        print (f"target has {points_target} points")
    quality_needs_improvement = (len(np.asarray(overlapping_points)) < valuetoreach)
    
    #create dict of all results
    all_results = defaultdict (list)
    
    ## while the quality condition or the max number of attempts is not satisfied
    # either we get a stitch of the quality that we want in terms of overlapping points
    # either we try a number of times and take the best in terms of overlapping points
    while (numattempts <= maxnumattempts-1) and (quality_needs_improvement):
        numattempts += 1
        
        if print_statements == True:
            print ("-"*100)
            print (f"ATTEMPT {numattempts}")
        
        this_attempt_list_results = list()
    
        #GLOBAL REGISTRATION: execute ransac
        result_ransac = execute_global_registration(source_down, target_down,
                                                    source_fpfh, target_fpfh,
                                                    voxel_size= voxel_size,
                                                    max_iteration = mmax_iteration,
                                                    max_validation = mmax_validation,
                                                    print_statements = print_statements
                                                   )
        #print result
        if print_statements == True:
            print (result_ransac)
            print("Transformation is:")
            print(result_ransac.transformation)
        #visualize
        if visualization_on == True: 
            draw_registration_result(source, target, 
                                     result_ransac.transformation,
                                     title = "global registration-%s"%mytitle,
                                    )
        # append the just obtained one in this_attempt_list_results
        this_attempt_list_results.append(result_ransac)

                
    

        #LOCAL REGISTRATION: execute icp
        result_icp = refine_registration(source, target, 
                                         source_fpfh, target_fpfh,
                                         voxel_size= voxel_size,
                                         mytranformation =this_attempt_list_results[-1].transformation,
                                         print_statements = print_statements
                                        )
        
        points_source,points_target = get_num_points([source,target],print_statement = print_statements)
        overlapping_points = result_icp.correspondence_set
        quality_needs_improvement = (len(np.asarray(overlapping_points)) < valuetoreach)
        
        # print result
        if print_statements == True:
            print(result_icp)
            print("Transformation is:")
            print(result_icp.transformation)
            print (f"Quality of the stitch needs improvment: {quality_needs_improvement}")
            
       #append the just obtained one in this_attempt_list_results
        this_attempt_list_results.append(result_icp)

        
        #add the two new registration results to the dictionary of attempts
        all_results[numattempts] = this_attempt_list_results
            
    
    if quality_needs_improvement == False:
        if print_statements == True:
            print ("\nRESULT: Built stitch of expected quality")
        best_transformation = all_results[numattempts][1].transformation
        
        
    else: 
        ## sort results stored in the dictionary by the value of overlapping points        
        index_best_attempt,best_registration_icp,best_transformation,list_all_results_ordered= extract_best_result_and_transformation(all_results)
        
        
    if final_vis_on == True: 
        draw_registration_result(source, target, 
                                 best_transformation,
                                 title = "last fit registration-%s"%mytitle
                                )
    
    #build the final newpointcloud and eventually save it
    newpointcloud = save_registration_result(source, target, best_transformation, 
                                             mytitle, 
                                             save_result = save_statements,
                                             visualize_result = False)
        
    
    return new_source, new_target, newpointcloud ,all_results

## data preparation

In [None]:
def preprocess_point_cloud(pcd_down, voxel_size,pprint_statements = False):
    
    #pcd_down = pcd.voxel_down_sample(voxel_size)
    radius_normal = voxel_size * 2
    radius_feature = voxel_size * 5
    
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=230))
    
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=230))
    
    if pprint_statements== True: 
        print("\n Downsample with a voxel size %.3f." % voxel_size)
        print("Estimate normal with search radius %.3f." % radius_normal)
        print("Compute FPFH feature with search radius %.3f." % radius_feature)

    return pcd_down, pcd_fpfh

def prepare_dataset(source,target,voxel_size = None,mytitle = "", print_statements = False):
    """
    source and target are already downsampled 
    """

    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)

    
    #outlier removal
    if print_statements== True:
        print ("removing outliers")
    processed_source, outlier_index = source.remove_radius_outlier(
                                              nb_points=25,
                                              radius=0.5)

    processed_target, outlier_index = target.remove_radius_outlier(
                                              nb_points=25,
                                              radius=0.5)

    return source, target, source_down, target_down, source_fpfh, target_fpfh

## Global and icp

In [None]:
def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size,
                                max_iteration,
                                max_validation,
                                print_statements = False,
                               ):
    distance_threshold = voxel_size *1.5
    
    if print_statements== True: 
        print("\nGLOBAL REGISTRATION: RANSAC registration on downsampled point clouds.")
        #print("   Since the downsampling voxel size is %.3f," % voxel_size)
        #print("   we use a liberal dista¢nce threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.99),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold)
        ], o3d.pipelines.registration.RANSACConvergenceCriteria(max_iteration,max_validation))
    

    return result


#icp
def refine_registration(source, target, 
                        source_fpfh, target_fpfh, 
                        voxel_size,
                        mytranformation = None,
                        print_statements = False
                       ):
    
    distance_threshold = voxel_size *2
    
    if print_statements== True: 
        print("\nPOINT-TO-PLANE ICP registration is applied on original point")
        print("distance threshold %.3f." % distance_threshold)
        
    #if type(mytranformation) != "numpy.ndarray":
        #mytranformation = np.identity(4)
        
    
    radius_normal = voxel_size * 5
    
    source.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=230))
    target.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=230))
    
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, mytranformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane())
    return result


## Helper functions

In [None]:
def get_num_points(list_pointclouds, print_statement = False):
    num_points = []
    n_pc = list(range(len(list_pointclouds)))
    for i in n_pc:
        points = len(np.asarray(list_pointclouds[i].points))
        num_points.append(points)
    #print("")
    if print_statement == True:
        #print("")
        print("number of points in clouds")
        print (*zip(n_pc,num_points), sep = "\n")
    return num_points

In [None]:
def save_registration_result(source, target, transformation,
                             
                             #visualization parameters
                             title = "", mytuples = None, 
                             params = None, #camera parameters,json file (P)
                             fov_step  = None, 
                             configuration_file = None, #object properties ,json file (O)
                             rotate = False,
                             
                             save_result = True,
                             visualize_result = False):
    
    dt_string = mytimestamp()
    filename = dt_string+'-stitch_'+title+'.pcd'
    
    # apply the chosen transformation to source and target
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    
    # combine them and create the newpoint cloud
    newpointcloud = source_temp + target_temp
    #newpointcloud.paint_uniform_color([0,0.5,0.1])
    
    #save
    if save_result == True: 
        o3d.io.write_point_cloud(filename, newpointcloud)
    
    #visualize
    if visualize_result == True:
        
        custom_draw_geometry(newpointcloud,
                     mytitle = title,
                     params = params,  # parameter for camera point view, json file via pressing P
                     configuration_file = configuration_file, # configuration file for properties, json file via pressing o
                     rotate = rotate
                            )
        
        
        
    return newpointcloud

In [None]:
def extract_best_result_and_transformation(all_results):
    
    all_results_ordered= OrderedDict(sorted(all_results.items (),key=lambda t: len(np.asarray(t[1][1].correspondence_set))))
    # make it into a list
    list_all_results_ordered= list ((k,v) for k,v in all_results_ordered.items())
    # take the last one (with the highest correspondence set)
    best_result = list_all_results_ordered[-1]
    # get the registration out
    index_best_attempt = best_result[0]
    best_registration_icp = best_result[1][1]
    best_transformation = best_registration_icp.transformation 
    return index_best_attempt,best_registration_icp,best_transformation,list_all_results_ordered