This notebook aims to test different registration methods using the Ground Truth points that were manually detected on EM and LM images.

#### Loading the Ground truth files in XML

In [None]:
import os
import numpy as np
import bigfish.stack as stack
import bigfish.plot as plot
from utils import xml_to_dataframe, dataframe_to_xml


Load the locations for fiducial particles in EM and LM. The locations were manually detected using ICY software and were saved in XML format. These point locations are considered to be the ground truth. For further processing we need to load the XML format and convert it into Pandas dataframe.

In [None]:
target_xml = 'Ground_truth_fiducials_EM.xml'
#target_xml = 'Ground_truth_fiducials_EM_only_clusters.xml'
target_df = xml_to_dataframe(target_xml)
print(target_df)

source_xml = 'Ground_truth_fiducials_LM.xml'

#source_xml = 'Ground_truth_fiducials_LM_only_clusters.xml'
source_df_small = xml_to_dataframe(source_xml)   # small means it is smaller resolution than target, we need to scale it up
print(source_df_small)

EM and LM images have most probably different sizes. LM image is usually much smaller. The points that were detected in LM image coordinate system needs to be rescaled to EM image coordinate system, otherwise it might be very hard with the registration.

In [None]:
def find_the_scale(EM_shape, LM_shape):
    scale_x = EM_shape[0]/LM_shape[0]
    scale_y = EM_shape[1]/LM_shape[1]
    return scale_x, scale_y

In [None]:
# load the EM and LM images

input_path = "D:/DATA/AI4Life_Pr26/20240805_Trial_data_fiducial_particles/240723_JB294_CLEM-AI4life_sample1/pos1"

EM_image_path = os.path.join(input_path, "240726_JB295_HEK293_CLEM_LAMP1-488_Particles-555_grid4_pos1_bin4_EM.tif")
LM_image_path  = os.path.join(input_path, "240726_JB295_HEK293_CLEM_LAMP1-488_Particles-555_grid4_pos1_LM.tif")

EMimage = stack.read_image(EM_image_path)
LMimage_small = stack.read_image(LM_image_path)


# Find what is the scaling rate between the 2 images
scale_y, scale_x = find_the_scale(EMimage.shape, LMimage_small.shape)

print("Scale x: ",scale_x)
print("Scale y: ",scale_y)


# Resize the LM point positions
source_df = xml_to_dataframe(source_xml)
source_df['pos_x'] = source_df_small['pos_x']*scale_x
source_df['pos_y'] = source_df_small['pos_y']*scale_y


# Resize the LM image to fit the position of the resized points
LMimage = stack.resize_image(LMimage_small, EMimage.shape, method='bilinear')
#EMimage_small = stack.resize_image(EMimage, LMimage_small.shape, method='bilinear')

# save the resized EM image and LM image
#output_path = "E:/DATA/AI4Life_Pr26/20240805_Trial_data_fiducial_particles/240723_JB294_CLEM-AI4life_sample1/pos1"
#output_path = os.path.join(output_path, "240726_JB295_HEK293_CLEM_LAMP1-488_Particles-555_grid4_pos1_bin4_EM_resized.tif")
#stack.save_image(EMimage_small, output_path)
#output_path = os.path.join(output_path, "240726_JB295_HEK293_CLEM_LAMP1-488_Particles-555_grid4_pos1_LM_resized.tif")
#stack.save_image(LMimage, output_path)

#scale_x = 22.966165413533833
#scale_y = 24.873456790123456
print(LMimage.shape)

Converting the Panda dataframe into numpy array (2D points (X,Y) and 3D points (X,Y,Z)). The 3D points are needed if we plan to convert them to point clouds later

In [None]:
# Converting dataframe into numpy array of coordinate pairs (X,Y) 
target = target_df[['pos_x', 'pos_y']].to_numpy()
source = source_df[['pos_x', 'pos_y']].to_numpy()
source_small = source_df_small[['pos_x', 'pos_y']].to_numpy()

# Adding the Z-dimension to the 2D points
target3D = np.hstack((target, np.zeros((len(target), 1))))
source3D = np.hstack((source, np.zeros((len(source), 1))))
source3D_small = np.hstack((source_small, np.zeros((len(source_small), 1))))

Plotting the point coordinates on top of the image to double check the correct placement

In [None]:
#plot.plot_detection(LMimage_small[:,:,1], source_small, contrast=True)
plot.plot_detection(LMimage[:,:,1], (source[:, [1, 0]]), shape="circle", radius = 3*scale_y, color = "red", linewidth = 1, fill=False, contrast=True) 
plot.plot_detection(EMimage, (target[:, [1, 0]]), radius = 3*scale_y, contrast=False)

Converting points in numpy array (X,Y,Z) into points clouds and saving them as .ply files

In [None]:
# I want to write target to a .txt file called "target_all_2D.txt"

#np.savetxt("target_all_2D.txt", target, fmt='%f', delimiter=' ')
#np.savetxt("source_all_2D.txt", source, fmt='%f', delimiter=' ')

#np.savetxt("target_2D.txt", target, fmt='%f', delimiter=' ')
#np.savetxt("source_2D.txt", source, fmt='%f', delimiter=' ')

In [None]:
import open3d as o3d

# Convert NumPy array to Open3D PointCloud
target_pcd = o3d.geometry.PointCloud()
target_pcd.points = o3d.utility.Vector3dVector(target3D)
source_pcd = o3d.geometry.PointCloud()
source_pcd.points = o3d.utility.Vector3dVector(source3D)

# Visualize the point cloud
#o3d.visualization.draw_geometries([target_pcd])
#o3d.visualization.draw_geometries([source_pcd])

# Assuming 'point_cloud' is your Open3D PointCloud object
#o3d.io.write_point_cloud("target_all.ply", target_pcd)
#o3d.io.write_point_cloud("source_all.ply", source_pcd)

## Testing different registration methods provided by Probreg package

Define functions for visualisation of the results and plotting transformations

In [None]:
import pandas as pd
import numpy as np

# Visualize results
def visualize_result_nparray(source, target, result, title):
    source_cloud = o3d.geometry.PointCloud()
    source_cloud.points = o3d.utility.Vector3dVector(source)
    source_cloud.paint_uniform_color([1, 0, 0])  # Red

    target_cloud = o3d.geometry.PointCloud()
    target_cloud.points = o3d.utility.Vector3dVector(target)
    target_cloud.paint_uniform_color([0, 1, 0])  # Green

    result_cloud = o3d.geometry.PointCloud()
    result_cloud.points = o3d.utility.Vector3dVector(result)
    result_cloud.paint_uniform_color([0, 0, 1])  # Blue

    o3d.visualization.draw_geometries([source_cloud, target_cloud, result_cloud], window_name=title)

def visualize_result_pcd(source, target, result, title):
    source.paint_uniform_color([1, 0, 0])  # Red
    target.paint_uniform_color([0, 1, 0])  # Green
    result.paint_uniform_color([0, 0, 1])  # Blue
    o3d.visualization.draw_geometries([source, target, result], window_name=title)

# Print transformations
def print_transformations(transformation_paramters, title):
    print(title)
    print(transformation_paramters.rot)  # Rotation matrix
    print(transformation_paramters.t)    # Translation vector
    print(transformation_paramters.scale)  # Scale factor

    # Evaluate alignment using chamfer distance
def chamfer_distance(A, B, title):
    distances = np.min(np.sum((A[:, np.newaxis, :] - B[np.newaxis, :, :]) ** 2, axis=2), axis=1)
    print(f"Chamfer distance = {np.mean(distances)} ({title}) ")

    return np.mean(distances)

def convert_to_pcd(nparray):
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(nparray)
    return pcd

def save_correspondences_in2df(source, target, correspondences):
    # create empty dataframe
    source_df = pd.DataFrame()
    target_df = pd.DataFrame()

    # add columns to the dataframe ['id', 'name', 'source_pos_x', 'source_pos_y', target_pos_x, target_pos_y] and the length of the dataframe is the number of correspondences
    source_df['id'] = range(1, len(correspondences) + 1)
    target_df['id'] = range(1, len(correspondences) + 1)
    source_df['name'] = "'Point2D'"
    target_df['name'] = "'Point2D'"
    source_df['pos_x'] = source[correspondences[:, 0], 0]
    source_df['pos_y'] = source[correspondences[:, 0], 1]
    target_df['pos_x'] = target[correspondences[:, 1], 0]
    target_df['pos_y'] = target[correspondences[:, 1], 1]
    
    return source_df, target_df


def save_correspondences_in1df(source, target, correspondences):
    # create empty dataframe
    df = pd.DataFrame()

    # add columns to the dataframe ['id', 'name', 'source_pos_x', 'source_pos_y', target_pos_x, target_pos_y] and the length of the dataframe is the number of correspondences
    df['id'] = range(1, len(correspondences) + 1)
    df['name'] = "'Point2D'"
    df['source_pos_x'] = source[correspondences[:, 0], 0]
    df['target_pos_x'] = target[correspondences[:, 1], 0]
    df['source_pos_y'] = source[correspondences[:, 0], 1]
    df['target_pos_y'] = target[correspondences[:, 1], 1]
    df['source_ind'] = correspondences[:, 0]
    df['target_ind'] = correspondences[:, 1]
    df['distance'] = np.sqrt(np.sum((source[correspondences[:, 0]] - target[correspondences[:, 1]]) ** 2, axis=1))

    
    return df

def clean_correspondences(correspondences):
    # Get unique target indices and their first occurrences
    unique_target_indices, unique_indices = np.unique(correspondences[:, 1], return_index=True)

    # Use the unique_indices to select the rows from the original array
    cleaned_correspondences = correspondences[unique_indices]

    return cleaned_correspondences

Loading source and target point clouds

In [None]:
import open3d as o3d

# Load the point clouds and set the scale
source = o3d.io.read_point_cloud("source.ply") # ('source.ply') ('source_all.ply')
target = o3d.io.read_point_cloud("target.ply") # ('target.ply') ('target_all.ply')
scale = 1000

# Convert to numpy arrays and subscale the points
source_points = np.asarray(source.points)/scale         # Subscale the points, so the physical distance between them is not too large
target_points = np.asarray(target.points)/scale          # Subscale the points, so the physical distance between them is not too large


### Test different registration methods:

In [None]:
import numpy as np
from probreg import callbacks
from probreg import cpd, gmmtree, filterreg, bcpd, l2dist_regs


# 1. Coherent Point Drift (CPD) - rigid - WORKS
tf_param_rigid, _, _ = cpd.registration_cpd(source_points, target_points, tf_type_name='rigid', maxiter=1000, tol=1e-5)
result_rigid = tf_param_rigid.transform(source_points)
visualize_result_nparray(source_points, target_points, result_rigid, "Rigid CPD")


# 2. Coherent Point Drift (CPD) - nonrigid - WORKS
tf_param_nonrigid, _, _ = cpd.registration_cpd(source_points, target_points, tf_type_name='nonrigid', maxiter=1000, tol=1e-5)
result_nonrigid = tf_param_nonrigid.transform(source_points)
visualize_result_nparray(source_points, target_points, result_nonrigid, "Non-rigid CPD")
chamfer_distance(target_points, result_nonrigid, "Chamfer distance 1st - Nonrigid CPD")


# 3. Coherent Point Drift (CPD) - affine - ERROR -  LinAlgError: Singular matrix
#tf_param_affine, _, _ = cpd.registration_cpd(source_points, target_points, tf_type_name='affine', maxiter=1000, tol=1e-5)
#result_affine = tf_param_affine.transform(source_points)
#visualize_result_nparray(source_points, target_points, result_affine, "Affine CPD")


# 4. FilterReg - WORKS
tf_param_filterreg, _, _ = filterreg.registration_filterreg(source_points, target_points, objective_type="pt2pt")
result_filterreg = tf_param_filterreg.transform(source_points)
visualize_result_nparray(source_points, target_points, result_filterreg, "FilterReg")


# 5. GMMReg (Gaussian Mixture Model Registration) ERROR - TypeError: cannot unpack non-iterable RigidTransformation object
#tf_param_gmmreg, _ = l2dist_regs.registration_gmmreg(source_points, target_points)
#result_gmmreg = tf_param_gmmreg.transform(source_points)
#visualize_result_nparray(source_points, target_points, result_gmmreg, "GMMReg")


# 6. SVR (Support Vector Registration) ERROR - InvalidParameterError: The 'gamma' parameter of OneClassSVM must be a str among {'scale', 'auto'} or a float in the range [0.0, inf). Got np.float64(inf) instead.
#tf_param_svr = l2dist_regs.registration_svr(source_points, target_points)
#result_svr = tf_param_svr.transform(source_points)
#visualize_result_nparray(source_points, target_points, result_svr, "SVR")


# 7. Bayesian Coherent Point Drift (BCPD) (experimental) - Error: cannot unpack non-iterable CombinedTransformation object
#tf_param_bcpd, _ = bcpd.registration_bcpd(source_points, target_points)
#result_bcpd = tf_param_bcpd.transform(source_points)
#visualize_result_nparray(source_points, target_points, result_bcpd, "BCPD")


# 8. GMMTree (Hierarchical Stochastic Model) - WORKS, but NO TRANSFORMATION
#tf_param_gmmtree, _ = gmmtree.registration_gmmtree(source_points, target_points)
#result_gmmtree = tf_param_gmmtree.transform(source_points)
#visualize_result_nparray(source_points, target_points, result_gmmtree, "GMMTree")


Print transformations:

In [None]:
# 1
print_transformations(tf_param_rigid, "Rigid CPD Transformation:")

# 2
print("Non-rigid CPD Transformation:")
print(tf_param_nonrigid.g)  # Displacement field
print(tf_param_nonrigid.w)  # Weight matrix

# 3
#print("Affine CPD Transformation:")
#print(tf_param_affine.b)  # Affine matrix
#print(tf_param_affine.t)  # Translation vector

#4
print_transformations(tf_param_filterreg, "FilterReg Transformation:")

#5-8
#print_transformations(tf_param_gmmreg, "GMMReg Transformation:")
#print_transformations(tf_param_svr, ""SVR Transformation:"")
#print_transformations(result_bcpd, "BCPD Transformation:")
#print_transformations(tf_param_gmmtree, "GMMTree Transformation:")

### Test multilevel registration methods - first rigid and then non-rigid registration

In [None]:
import numpy as np
import open3d as o3d
from probreg import cpd, gmmtree, filterreg, bcpd, l2dist_regs

# Load the point clouds and set the scale
source = o3d.io.read_point_cloud("source.ply") # ('source.ply') ('source_all.ply')
target = o3d.io.read_point_cloud("target.ply") # ('target.ply') ('target_all.ply')
source_all = o3d.io.read_point_cloud("source_all.ply") # ('source.ply') ('source_all.ply')
target_all = o3d.io.read_point_cloud("target_all.ply") # ('target.ply') ('target_all.ply')
scale = 1000

# Convert to numpy arrays and subscale the points
source_points = np.asarray(source.points)/scale         # Subscale the points, so the physical distance between them is not too large
target_points = np.asarray(target.points)/scale          # Subscale the points, so the physical distance between them is not too large
source_points_all = np.asarray(source_all.points)/scale         # Subscale the points, so the physical distance between them is not too large
target_points_all = np.asarray(target_all.points)/scale          # Subscale the points, so the physical distance between them is not too large



# 1st - registration by Coherent Point Drift (CPD) - rigid ---------------------------------------------
tf_param_rigid, _, _ = cpd.registration_cpd(source_points, target_points, tf_type_name='rigid', maxiter=1000, tol=1e-5)
source_points_res2 = tf_param_rigid.transform(source_points)
source_points_all_res2 = tf_param_rigid.transform(source_points_all)

visualize_result_nparray(source_points, target_points, source_points_res2, "Rigid CPD")
print_transformations(tf_param_rigid, "Rigid CPD Transformation:")
chamfer_distance(target_points, source_points_res2, "Chamfer distance 1st - Rigid CPD")
chamfer_distance(target_points_all, source_points_all_res2, " all Chamfer distance 2nd - Nonrigid CPD")


# 2nd - registration by Coherent Point Drift (CPD) - nonrigid ---------------------------------------------
tf_param_nonrigid, _, _ = cpd.registration_cpd(source_points_res2, target_points, tf_type_name='nonrigid', maxiter=1000, tol=1e-5)
source_points_res3 = tf_param_nonrigid.transform(source_points_res2)

tf_param_nonrigid_all, _, _ = cpd.registration_cpd(source_points_all_res2, target_points_all, tf_type_name='nonrigid', maxiter=1000, tol=1e-5)
source_points_all_res3 = tf_param_nonrigid_all.transform(source_points_all_res2)

visualize_result_nparray(source_points_res2, target_points, source_points_res3, "Nonrigid CPD")

print("Non-rigid CPD Transformation:")
print(tf_param_nonrigid.g)  # Displacement field
print(tf_param_nonrigid.w)  # Weight matrix

chamfer_distance(target_points, source_points_res3, "Chamfer distance 2nd - Nonrigid CPD")
chamfer_distance(target_points_all, source_points_all_res3, "all Chamfer distance 2nd - Nonrigid CPD")



# 3rd - finding the correspondence between the points ---------------------------------------------
threshold = 0.02
trans_init = np.asarray([[ 1, 0, 0, 0],  # In trying to have identity matrix as initial transformation so I can instead of source point use result of non-rigid reg points and to the ICP on those
                         [ 0, 1, 0, 0],
                         [ 0, 0, 1, 0],
                         [ 0, 0, 0, 1]])

sour = convert_to_pcd(source_points_res3)
targ = convert_to_pcd(target_points)
sour_all = convert_to_pcd(source_points_all_res3)
targ_all = convert_to_pcd(target_points_all)

evaluation = o3d.pipelines.registration.evaluate_registration(sour, targ, threshold, trans_init)
evaluation_all = o3d.pipelines.registration.evaluate_registration(sour_all, targ_all, threshold, trans_init)
print("Evaluation: ", evaluation)
print("Correspondence set: ")
print(np.asarray(evaluation.correspondence_set))

print("Apply point-to-point ICP")
reg_p2p = o3d.pipelines.registration.registration_icp(
    sour, targ, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint())
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)

#evaluation = o3d.pipelines.registration.evaluate_registration(sour, targ, threshold, reg_p2p.transformation)
#evaluation_all = o3d.pipelines.registration.evaluate_registration(sour_all, targ_all, threshold, reg_p2p.transformation)
#print("Evaluation: ", evaluation)
print("Correspondence set: ")
print(np.asarray(evaluation_all.correspondence_set))


correspondences = clean_correspondences(np.asarray(evaluation.correspondence_set))
correspondences_all = clean_correspondences(np.asarray(evaluation_all.correspondence_set))
print("Correspondences: ", correspondences_all)

dff2s,dff2t = save_correspondences_in2df(source_points, target_points, correspondences)

# This correspondence seems to be working in a way that corresponding point may repeat in the list. 
# same point can be corresponding to multiple points - fixed by clean_correspondences

#orig_source_df = save_correspondences_in1df(np.asarray(orig_source.points), np.asarray(orig_target.points), correspondences)
orig_source_df, orig_target_df = save_correspondences_in2df((source_points)/[scale_y,scale_x,1]*scale, 
                                                            target_points*scale, correspondences)

orig_source_all_df, orig_target_all_df = save_correspondences_in2df((source_points_all)/[scale_y,scale_x,1]*scale, 
                                                            target_points_all*scale, correspondences_all)

df = save_correspondences_in1df(source_points_all, target_points_all, correspondences_all)

In [None]:
# measure the euclidean distance between the points in the orig_source_all_df and orig_target_all_df dataframe

def euclidean_distance(df1, df2):
    dist = np.sqrt((df1['pos_x'] - df2['pos_x'])**2 + (df1['pos_y'] - df2['pos_y'])**2)
    return dist

def euclidean_distance(df):
    dist = np.sqrt((df['source_pos_x'] - df['target_pos_x'])**2 + (df['source_pos_y'] - df['target_pos_y'])**2)
    return dist




Finding correspondences using Point2Point algorithm - This one can be applied to full set of points

Create panda dataframe with the corresponding points having the same "id":

In [None]:
dataframe_to_xml(orig_source_df, 'original_source_points_0502.xml')
dataframe_to_xml(orig_target_df, 'original_target_points_0502.xml')
dataframe_to_xml(orig_source_all_df, 'original_source_points_all_0502.xml')
dataframe_to_xml(orig_target_all_df, 'original_target_points_all_0502.xml')




### To warp an image according to a displacement field calculated from point clouds.
Expand the displacement field to cover the convex hull and then the entire image.

To start understanding the displacement field I produce the field myself from vector between the corresponding pairs and use it.

In [None]:
import numpy as np
import flow_vis
import matplotlib.pyplot as plt
from scipy.interpolate import griddata


def expand_displacement_field(points, displacements, image_shape):
    # Create a grid covering the entire image
    y, x = np.mgrid[0:image_shape[0], 0:image_shape[1]]

    # Interpolate
    expanded_field = griddata(points, displacements, (y, x), method='linear', fill_value=0)
 
 #   # Interpolate x and y displacements separately
 #   dx = griddata(points, displacements[:, 0] * weights, (y, x), method='linear', fill_value=0)
 #   dy = griddata(points, displacements[:, 1] * weights, (y, x), method='linear', fill_value=0)
 #   
 #   # Stack the two displacement fields
 #   expanded_field = np.stack([dx, dy], axis=-1)
    return expanded_field

def calculate_displacement_vectors(sourse_database, target_database):
    displacements_x = np.asarray(target_database['pos_x']) - np.asarray(sourse_database['pos_x']*scale_x)
    displacements_y = np.asarray(target_database['pos_y']) - np.asarray(sourse_database['pos_y']*scale_y)
    displacements = np.column_stack((displacements_x, displacements_y))
    return displacements

def visualize_extended_field(expanded_field, point_cloud):
    flow_color = flow_vis.flow_to_color(expanded_field, convert_to_bgr=False)
    plt.imshow(flow_color)
    plt.scatter(point_cloud[:, 1], point_cloud[:, 0], c='r', s=5)
    plt.show()


sourse_database = orig_source_df #orig_source_all_df
target_database = orig_target_df #orig_target_all_df
points = np.column_stack((sourse_database['pos_x']*scale_x, sourse_database['pos_y']*scale_y))
displacements = calculate_displacement_vectors(sourse_database, target_database)

expanded_field = expand_displacement_field(points, displacements, EMimage.shape)

visualize_extended_field(expanded_field, points)

In [None]:
from scipy.interpolate import interpn

def warp_image(image, displacement_field):
    height, width = image.shape[:2]
    y_coords, x_coords = np.meshgrid(np.arange(height), np.arange(width), indexing='ij')
    
    src_coords = np.stack([
        y_coords + displacement_field[:,:,1], 
        x_coords + displacement_field[:,:,0]
    ], axis=-1)
    
    warped_image = interpn(
        (np.arange(height), np.arange(width)),
        image,
        src_coords,
        method='linear',
        bounds_error=False,
        fill_value=0
    )
    
    return warped_image.astype(image.dtype)

warped_image = warp_image(LMimage, expanded_field)
plt.imshow(warped_image)
plt.show()

In [None]:
pointsT = np.column_stack((target_database['pos_x'], target_database['pos_y']))
plot.plot_detection(warped_image[:,:,1], pointsT, shape="circle", radius = 3*scale_y, color = "red", linewidth = 1, fill=False, contrast=True) 


In [None]:
warped_image[:,:,0].shape

In [None]:
height, width = EMimage.shape[:2]
y_coords, x_coords = np.meshgrid(np.arange(height), np.arange(width), indexing='ij')
src_coords = np.stack([
    y_coords - expanded_field[:,:,1],
    x_coords - expanded_field[:,:,0]
    ], axis=-1)
#print(src_coords)

src_coords[8000:8010,3000]

In [None]:
# This code is not necessary
# trying to use ICP as correspondence estimation, but as an input I need a pre-registered point clouds and I use rigid transformation matrix and not non-rigid one
# https://www.open3d.org/docs/latest/tutorial/pipelines/icp_registration.html
import copy

def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp])
    
#source = o3d.io.read_point_cloud("source.ply")
#target = o3d.io.read_point_cloud("target.ply")
#source_points, target_points, result_rigid,
target = o3d.geometry.PointCloud()
target.points = o3d.utility.Vector3dVector(target_points)
source = o3d.geometry.PointCloud()
source.points = o3d.utility.Vector3dVector(source_points_res3 )#(result_nonrigid)

threshold = 0.02

trans_init = np.asarray([[ 1, 0, 0, 0],  # In trying to have identity matrix as initial transformation so I can instead of source point use result of non-rigid reg points and to the ICP on those
                         [ 0, 1, 0, 0],
                         [ 0, 0, 1, 0],
                         [ 0, 0, 0, 1]])


draw_registration_result(source, target, trans_init)
print("Initial alignment")
evaluation = o3d.pipelines.registration.evaluate_registration(
    source, target, threshold, trans_init)

print("Evaluation: ", evaluation)
print(evaluation.fitness)
print(evaluation.inlier_rmse)
print(evaluation.correspondence_set)
print(np.asarray(evaluation.correspondence_set))

print(source.points[np.asarray(evaluation.correspondence_set[0][0])])
print(target.points[np.asarray(evaluation.correspondence_set[0][1])])

print("Apply point-to-point ICP")
reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint())
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)
draw_registration_result(source, target, reg_p2p.transformation)

In [None]:


# Load point cloud data
orig_source = o3d.io.read_point_cloud("source.ply") #("source_all.ply")
orig_target = o3d.io.read_point_cloud("target.ply") #("target_all.ply")
orig_source_all = o3d.io.read_point_cloud("source_all.ply")
orig_target_all = o3d.io.read_point_cloud("target_all.ply")

correspondences = clean_correspondences(np.asarray(evaluation.correspondence_set))
correspondences_all = clean_correspondences(np.asarray(evaluation_all.correspondence_set))

# Upscale back the coordinates
source_points = np.asarray(source.points)*scale
target_points = np.asarray(target.points)*scale

result_df = save_correspondences_in1df(source_points, target_points, correspondences)
#result_source_df,result_target_df = save_correspondences_in2df(source_points, target_points, correspondences)
print(result_df)
#print (result_target_df)

result_all_df = save_correspondences_in1df(source_points_all, target_points_all, correspondences_all)
##############################


#orig_source_df = save_correspondences_in1df(np.asarray(orig_source.points), np.asarray(orig_target.points), correspondences)
orig_source_df, orig_target_df = save_correspondences_in2df(np.asarray(orig_source.points)/[scale_y,scale_x,1], 
                                                            np.asarray(orig_target.points), correspondences)

orig_source_all_df, orig_target_all_df = save_correspondences_in2df(np.asarray(orig_source_all.points)/[scale_y,scale_x,1], 
                                                            np.asarray(orig_target_all.points), correspondences_all)

print(orig_source_df)
print(orig_target_df)
print(orig_source_all_df)
print(orig_target_all_df)

In [None]:
# warp the source image called 'LMimage' to the target image using the estimated transformation
matrix = np.array([[1, -0.5, 100], [0.1, 0.9, 50], [0.0015, 0.0015, 1]])
tform = transform.ProjectiveTransform(matrix=matrix)
tf_img = transform.warp(img, tform.inverse)
fig, ax = plt.subplots()
ax.imshow(tf_img)
ax.set_title('Projective transformation')

plt.show()


In [None]:
# after registration of source to target using cpd algorithm I want to detect the corresponding points in the source and target point clouds
# and then calculate the distance between the corresponding points in the source and target point clouds.

# I want to write the corresponding points in the source and target point clouds to a .txt file called "corresponding_points.txt"
# and the distances between the corresponding points in the source and target point clouds to a .txt file called "distances.txt"

# Convert Open3D point clouds to NumPy arrays
source_points = np.asarray(source.points)
target_points = np.asarray(target.points)
result_points = np.asarray(result.points)

# Find the corresponding points in the source and target point clouds
correspondence = cpd.registration_cpd.find_correspondence(source_points, target_points, tf_param.rot, tf_param.t)

# Write the corresponding points to a .txt file
np.savetxt("corresponding_points.txt", correspondence, fmt='%f', delimiter=' ')

# Calculate the distances between the corresponding points in the source and target point clouds
distances = np.linalg.norm(source_points[correspondence[:, 0]] - target_points[correspondence[:, 1]], axis=1)

# Write the distances to a .txt file
np.savetxt("distances.txt", distances, fmt='%f', delimiter=' ')

# Print the average distance
print("Average distance:", np.mean(distances))

# I want to write the transformation matrix to a .txt file called "transformation_matrix.txt"
# and the transformed source point cloud to a .ply file called "result.ply"

# Write the transformation matrix to a .txt file
np.savetxt("transformation_matrix.txt", tf_param.rot, fmt='%f', delimiter=' ')


In [None]:
# after registration of source to target using cpd algorithm I want to detect the points in the source that are not in the target
# I want to write the coordinates of these points to a .txt file called "source_not_in_target.txt"

# Convert Open3D PointCloud to NumPy array
source_points = np.asarray(source.points)
target_points = np.asarray(target.points)
result_points = np.asarray(result.points)

# Find the points in the source that are not in the target
source_not_in_target = np.setdiff1d(source_points, target_points, assume_unique=False)
print(source_not_in_target)

# Write the coordinates of these points to a .txt file
np.savetxt("source_not_in_target.txt", source_not_in_target, fmt='%f', delimiter=' ')


In [None]:
import numpy as np
import transforms3d as t3d
from probreg import filterreg
from probreg import callbacks
import utils


cbs = [callbacks.Open3dVisualizerCallback(source, target)]
tf_param, _, _ = filterreg.registration_filterreg(source, target,
                                                  objective_type='pt2pt',
                                                  sigma2=None,
                                                  update_sigma2=True,
                                                  callbacks=cbs)

result = copy.deepcopy(source)
result.points = tf_param.transform(result.points)

print("result: ", np.rad2deg(t3d.euler.mat2euler(tf_param.rot)),
      tf_param.scale, tf_param.t)
print("result: ", tf_param.rot,
      tf_param.scale, tf_param.t)
# draw result
source.paint_uniform_color([1, 0, 0])
target.paint_uniform_color([0, 1, 0])
result.paint_uniform_color([0, 0, 1])
o3.visualization.draw_geometries([source, target, result])

TODO:

- I was checking different registration methods
Need to do: clean detection of fiducial particles in EM to get only the large clusters 3+ particles
- take only the strongest points from LM image. 
- alternatively we can subsample the points first and use only a subset - hard subsample the same way both EM and LM detections
- method for detecting registration pair and assign the same ID number to the both particles


In [None]:
from probreg import cpd
from probreg import callbacks
import matplotlib.pyplot as plt
import utils

def prepare_source_and_target_nonrigid_2d(source_filename,
                                          target_filename):
    source = np.loadtxt(source_filename)
    target = np.loadtxt(target_filename)
    return source, target

print(source)
print(target)

source, target = prepare_source_and_target_nonrigid_2d('source_2D.txt',
                                                             'target_2D.txt')
cbs = [callbacks.Plot2DCallback(source, target)]
tf_param, _, _ = cpd.registration_cpd(source, target, 'affine',
                                      callbacks=cbs)
plt.show()

In [None]:
from probreg import cpd
from probreg import callbacks
import matplotlib.pyplot as plt
import utils

source, target = prepare_source_and_target_nonrigid_2d('source_2D.txt',
                                                             'target_2D.txt')
cbs = [callbacks.Plot2DCallback(source, target)]
tf_param, _, _ = cpd.registration_cpd(source, target, 'nonrigid',
                                      callbacks=cbs)
plt.show()