In [1]:
import os, torch, time, shutil, json,glob,sys,copy, argparse
import numpy as np
from easydict import EasyDict as edict
from torch.utils.data import Dataset
from torch import optim, nn
import open3d as o3d
cwd = os.getcwd()
sys.path.append(cwd)
from datasets.indoor import IndoorDataset
from datasets.dataloader import get_dataloader
from models.architectures import KPFCNN
from lib.utils import load_obj, setup_seed,natural_key, load_config
from lib.benchmark_utils import ransac_pose_estimation, to_o3d_pcd, get_blue, get_yellow, to_tensor
from lib.trainer import Trainer
from lib.loss import MetricLoss
from numpy.linalg import inv
import shutil
setup_seed(0)
import matplotlib.pyplot as plt

plt.subplots_adjust(left=0,
                    bottom=0,
                    right=1.0,
                    top=1.0,
                    wspace=0.5,
                    hspace=0.2)

<Figure size 640x480 with 0 Axes>

# 1. PREDATOR Module 

In [2]:
class ThreeDMatchDemo(Dataset):
    """
    Load subsampled coordinates, relative rotation and translation
    Output(torch.Tensor):
        src_pcd:        [N,3]
        tgt_pcd:        [M,3]
        rot:            [3,3]
        trans:          [3,1]
    """
    def __init__(self,config, src, tgt):
        super(ThreeDMatchDemo,self).__init__()
        self.config = config
        self.src = src
        self.tgt = tgt

    def __len__(self):
        return 1

    def __getitem__(self,item): 
        # get pointcloud
        #print (type(torch.load(self.src_path)))
        #src_pcd = torch.load(self.src_path).astype(np.float32)
        #tgt_pcd = torch.load(self.tgt_path).astype(np.float32)   
        src_pcd = self.src.astype(np.float32)
        tgt_pcd = self.tgt.astype(np.float32)    
        
        
        #src_pcd = o3d.io.read_point_cloud(self.src_path)
        #tgt_pcd = o3d.io.read_point_cloud(self.tgt_path)
        #src_pcd = src_pcd.voxel_down_sample(0.025)
        #tgt_pcd = tgt_pcd.voxel_down_sample(0.025)
        #src_pcd = np.array(src_pcd.points).astype(np.float32)
        #tgt_pcd = np.array(tgt_pcd.points).astype(np.float32)


        src_feats=np.ones_like(src_pcd[:,:1]).astype(np.float32)
        tgt_feats=np.ones_like(tgt_pcd[:,:1]).astype(np.float32)

        # fake the ground truth information
        rot = np.eye(3).astype(np.float32)
        trans = np.ones((3,1)).astype(np.float32)
        correspondences = torch.ones(1,2).long()

        return src_pcd,tgt_pcd,src_feats,tgt_feats,rot,trans, correspondences, src_pcd, tgt_pcd, torch.ones(1)


def predict(config, demo_loader):
    config.model.eval()
    c_loader_iter = demo_loader.__iter__()
    with torch.no_grad():
        inputs = c_loader_iter.next()
        ##################################
        # load inputs to device.
        for k, v in inputs.items():  
            if type(v) == list:
                inputs[k] = [item.to(config.device) for item in v]
            else:
                inputs[k] = v.to(config.device)

        ###############################################
        # forward pass
        feats, scores_overlap, scores_saliency = config.model(inputs)  #[N1, C1], [N2, C2]
        pcd = inputs['points'][0]
        len_src = inputs['stack_lengths'][0][0]
        c_rot, c_trans = inputs['rot'], inputs['trans']
        correspondence = inputs['correspondences']
        
        src_pcd, tgt_pcd = pcd[:len_src], pcd[len_src:]
        src_raw = copy.deepcopy(src_pcd)
        tgt_raw = copy.deepcopy(tgt_pcd)
        src_feats, tgt_feats = feats[:len_src].detach().cpu(), feats[len_src:].detach().cpu()
        src_overlap, src_saliency = scores_overlap[:len_src].detach().cpu(), scores_saliency[:len_src].detach().cpu()
        tgt_overlap, tgt_saliency = scores_overlap[len_src:].detach().cpu(), scores_saliency[len_src:].detach().cpu()

        ########################################
        # do probabilistic sampling guided by the score
        src_scores = src_overlap * src_saliency
        tgt_scores = tgt_overlap * tgt_saliency

        if(src_pcd.size(0) > config.n_points):
            idx = np.arange(src_pcd.size(0))
            probs = (src_scores / src_scores.sum()).numpy().flatten()
            idx = np.random.choice(idx, size= config.n_points, replace=False, p=probs)
            src_pcd, src_feats = src_pcd[idx], src_feats[idx]
        if(tgt_pcd.size(0) > config.n_points):
            idx = np.arange(tgt_pcd.size(0))
            probs = (tgt_scores / tgt_scores.sum()).numpy().flatten()
            idx = np.random.choice(idx, size= config.n_points, replace=False, p=probs)
            tgt_pcd, tgt_feats = tgt_pcd[idx], tgt_feats[idx]

        ########################################
        # run ransac and draw registration
        tsfm = ransac_pose_estimation(src_pcd, tgt_pcd, src_feats, tgt_feats, mutual=False)
        return tsfm
        #draw_registration_result(src_raw, tgt_raw, src_overlap, tgt_overlap, src_saliency, tgt_saliency, tsfm)

In [3]:
# load configs
config = load_config(r'configs/test/indoor.yaml')
config = edict(config)
if config.gpu_mode:
    config.device = torch.device('cuda')
else:
    config.device = torch.device('cpu')

# model initialization
config.architecture = [
    'simple',
    'resnetb',
]
for i in range(config.num_layers-1):
    config.architecture.append('resnetb_strided')
    config.architecture.append('resnetb')
    config.architecture.append('resnetb')
for i in range(config.num_layers-2):
    config.architecture.append('nearest_upsample')
    config.architecture.append('unary')
config.architecture.append('nearest_upsample')
config.architecture.append('last_unary')
config.model = KPFCNN(config).to(config.device)

# create dataset and dataloader
info_train = load_obj(config.train_info)
train_set = IndoorDataset(info_train,config,data_augmentation=True)
_, neighborhood_limits = get_dataloader(dataset=train_set,
                                    batch_size=config.batch_size,
                                    shuffle=True,
                                    num_workers=config.num_workers,
                                    )
# load pretrained weights
assert config.pretrain != None
state = torch.load(config.pretrain)
config.model.load_state_dict(state['state_dict'])

def PredatorRegistration(src, tgt):
    #demo_set = ThreeDMatchDemo(config, config.src_pcd, config.tgt_pcd)
    start = time.time()
    demo_set = ThreeDMatchDemo(config, src, tgt)
    demo_loader, _ = get_dataloader(dataset=demo_set,
                                        batch_size=config.batch_size,
                                        shuffle=False,
                                        num_workers=3,
                                        neighborhood_limits=neighborhood_limits)

    # do pose estimation
    return predict(config, demo_loader), time.time() - start

def ICP_registration(src, tgt, threshold = 0.1, trans_init = np.identity(4)):
    if isinstance(src, np.ndarray):
        source = o3d.geometry.PointCloud()
        source.points = o3d.utility.Vector3dVector(src)
    if isinstance(tgt, np.ndarray):
        target = o3d.geometry.PointCloud()
        target.points = o3d.utility.Vector3dVector(tgt)
    
    start_time = time.time()
    
    reg_p2p = o3d.registration.registration_icp(
        source, target, threshold, trans_init,
        o3d.registration.TransformationEstimationPointToPoint(),
        o3d.registration.ICPConvergenceCriteria(max_iteration=1000))
    #print (reg_p2p)
    return reg_p2p.transformation, (time.time() - start_time)

len(dataset):3720
[0] Calib Neighbors 00000000: timings 0.20s
[1] Calib Neighbors 00000001: timings 0.40s
[2] Calib Neighbors 00000002: timings 0.57s
[3] Calib Neighbors 00000003: timings 0.77s
[4] Calib Neighbors 00000004: timings 0.97s




# 2. Functions

In [5]:
to_reg_m = np.array ([
    [5.0,0.0,0.0,0.0],
    [0.0,5.0,0.0,0.0],
    [0.0,0.0,5.0,0.0],
    [0.0,0.0,0.0,1.0]
])

def filter_point_cloud(points, voxel_size=0.015):
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    size_before = points.shape[0]
    
    #np.savetxt('temp1.txt',points)
    
    filter_pcd = pcd.voxel_down_sample(voxel_size)
#     filter_pcd = pcd.uniform_down_sample(every_k_points=3)
#     filter_pcd, ind = filter_pcd.remove_statistical_outlier(nb_neighbors=10, std_ratio=1.0)
    #filter_pcd, ind = filter_pcd.remove_radius_outlier(nb_points=16, radius=0.03)
    
    filtered_points = np.asarray(filter_pcd.points)
    #np.savetxt('temp2.txt',filtered_points)
    
    #after_size = filtered_points.shape[0]
    #print (f"{size_before}->{after_size} (-{(size_before-after_size) * 100/size_before:.4f}%)")
    return filtered_points

def transform_point_cloud(T, pc):
    # PC:N*3 T:4*4
    if pc.shape[1] == 3:
        new_pc = np.c_[ pc, np.ones(pc.shape[0]) ]
        transformed_pc = T.dot(new_pc.T)
    else:
        print ("TODO!")
    return transformed_pc[0:3,].T

def mpprint(tsfm):
    for a in tsfm:
        print (*a, end='\n')

def normalize(v):
    norm = np.linalg.norm(v)
    if norm == 0: 
       return v
    return v / norm

def get_Base_Cam_Calibration(tsfm, end2Raw = to_reg_m):
    new_cs = np.dot(inv(end2Raw) , inv(tsfm))
    #new_cs = inv(tsfm)

    # Transformation from Camera to Base
    # It should be applied to the @model data directly, and the result is shown with @scanned raw data
    #mpprint (new_cs)

    # same as the translation in @new_cs
    model_original_p = np.array([0,0,0,1]).reshape(4,1)
    #print (f'translate value is over here:\n', np.dot(new_cs, model_original_p))
    translate_T = np.dot(new_cs, model_original_p)
    
    R_Base_Cam = np.array([
        normalize(new_cs[0:3, 0:3][0]),
        normalize(new_cs[0:3, 0:3][1]),
        normalize(new_cs[0:3, 0:3][2])
    ])
    #print (R_Base_Cam,'\n', normalize((inv(new_cs[0:3, 0:3]).T)[0]))
    #print (R_Base_Cam, '\n', modelInScanedCS.as_matrix())
    R_Base_Cam_4by4 = np.identity(4)
    R_Base_Cam_4by4[0:3, 0:3] = R_Base_Cam

    Translation_Base_Cam = np.identity(4)
    Translation_Base_Cam[0:4,3:4] = translate_T
    #print (Translation_Base_Cam)

    # T_Base_Cam_Calibration = np.dot(R_Base_Cam_4by4,Translation_Base_Cam)  
    T_Base_Cam_Calibration = np.identity(4)
    T_Base_Cam_Calibration[0:4,3:4] = translate_T
    T_Base_Cam_Calibration[0:3, 0:3] = R_Base_Cam
    
    return T_Base_Cam_Calibration

# calibration_T_TCP_Base = calDHParameters)_
def get_Cam_TCP_Calibration(T_Base_Cam_Calibration, calibration_T_TCP_Base):
    T_Base_TCP_Calibration = inv(calibration_T_TCP_Base)
    T_Cam_Base_Calibration = inv(T_Base_Cam_Calibration)
    T_Cam_TCP_Calibration = np.dot(T_Base_TCP_Calibration, T_Cam_Base_Calibration)
    #print (T_Cam_TCP_Calibration, T_Cam_TCP_Calibration.shape)

    #Cam_TCP_Calibration_R = T_Cam_TCP_Calibration[0:3, 0:3]
    #print (1000*T_Cam_TCP_Calibration[0:3, 3:4].reshape(1,3), 
    #       list(map(np.rad2deg, RotationMatrixToRXYZ(Cam_TCP_Calibration_R))))
    return T_Cam_TCP_Calibration

def RotationMatrixToRXYZ(matrix):
    beta = np.arctan2(-matrix[2,0], np.sqrt(matrix[0,0]*matrix[0,0] + matrix[1,0]*matrix[1,0]))
    alpha = np.arctan2(matrix[1,0]/np.cos(beta), matrix[0,0]/np.cos(beta))
    r = np.arctan2(matrix[2,1]/np.cos(beta), matrix[2,2]/np.cos(beta))
    return r, beta, alpha

def reject_outliers(data, m = 1.5):
    d = np.abs(data - np.median(data))
    mdev = np.median(d)
    s = d/mdev if mdev else 0.
    return data[s<m]

In [6]:
ROBOT_TYPE = {
    'UR3e': False,
    'UR5e': True,
    'UR5': False,
}

if ROBOT_TYPE['UR3e']:
    tgt_data = np.loadtxt(r'./demo_test/UR3e_base_m_x5.txt').astype(np.float32)[:,0:3]
elif ROBOT_TYPE['UR5e']:
    tgt_data = np.loadtxt(r'./demo_test/UR5e_base_m_x5.txt').astype(np.float32)[:,0:3]
elif ROBOT_TYPE['UR5']:
    tgt_data = np.loadtxt(r'./demo_test/UR5_base_m_x5.txt').astype(np.float32)[:,0:3]
    
#tgt_data = transform_point_cloud(to_reg_m, tgt_data)
#np.savetxt('./assets/UR5e_base.txt',tgt_data,fmt='%1.6f')
tgt_data = filter_point_cloud(tgt_data, voxel_size=0.02)
print (tgt_data.shape)

# n = 1/0.9595599145956509 # 25.4.2023 ur5
n = 1/0.96630655934641 # 26.4.2023 ur5e
scale = np.array([
    [n,0,0,0 ],
    [0,n,0,0 ],
    [0,0,n,0 ],
    [0,0,0,1 ]
])

(15781, 3)


In [8]:
from tqdm.notebook import tqdm
for joints_num in tqdm(range(1, 6 +1)):
    #joints_num = 11
    #src_filename_list = glob.glob('../OpenPCDet/OpenPCDet2/OpenPCDetME/tools/repeatability_object_points2/*.txt')
    if ROBOT_TYPE['UR3e']:
        src_filename_list = glob.glob(r'./demo_test/ur3e_roi_data/test2.txt')
    elif ROBOT_TYPE['UR5e']:
        src_filename_list = glob.glob(f'./demo_test/ur5e_roi_data/joints_{joints_num}/roi*.txt')
    elif ROBOT_TYPE['UR5']:
        src_filename_list = glob.glob(f'./demo_test/ur5_roi_data/joints_{joints_num}/roi*.txt')
        #src_filename_list = glob.glob('./demo_test/makeCompletePoints/*.txt')

    src_filename_list.sort()
    print (f'[{joints_num}]src_filename_list length:{len(src_filename_list)}')
    print (src_filename_list[0], '\n', 'and more ...')
    
    tsfm_T_list, tsfm_R_list = [], []
    for index, src_file in tqdm(enumerate(src_filename_list), total=len(src_filename_list)):
        src_data = np.loadtxt(src_file).astype(np.float32)[:,0:3]
        src_data = transform_point_cloud(np.dot(to_reg_m, scale), src_data)
#         src_data = transform_point_cloud(to_reg_m, src_data)
        #src_data = transform_point_cloud(to_reg_m, src_data)
        
        points_filter = filter_point_cloud(src_data, voxel_size=0.02)
        src_data = points_filter

        #print (src_data.shape)

        #np.savetxt('temp1_src.txt', src_data)
        #np.savetxt('temp2_tgt.txt', tgt_data)

        tsfm, consuming_time = PredatorRegistration(src_data, tgt_data)
        #mpprint(tsfm)
        tsfm, consuming_time = ICP_registration(src_data, tgt_data, trans_init=tsfm)
        if ROBOT_TYPE['UR3e']:
            save_filename = f'./demo_test/ur3e_tsfm/joints_{joints_num}/tsfm_' + '{:06n}'.format(index) + '.txt'
        elif ROBOT_TYPE['UR5e']:
            save_filename = f'./demo_test/ur5e_tsfm/joints_{joints_num}/tsfm_' + '{:06n}'.format(index) + '.txt'
        elif ROBOT_TYPE['UR5']:
            save_filename = f'./demo_test/ur5_tsfm/joints_{joints_num}/tsfm_' + '{:06n}'.format(index) + '.txt'
        
#         if index == 2:
#             temp = transform_point_cloud(tsfm, src_data)
#             np.savetxt('temp.txt', temp)
#             adfasdf
#         mpprint(tsfm)
#         dfadfadf
        #tsfm_T = (tsfm[0:3, 3:4].reshape(1,3)*1000)[0].tolist()
        #print (list(map(np.rad2deg, RotationMatrixToRXYZ(Cam_TCP_Calibration_R))))
        #tsfm_R = list(map(np.rad2deg, RotationMatrixToRXYZ(tsfm[0:3, 0:3])))
        #tsfm_T_list.append([tsfm_T[0], tsfm_T[1], tsfm_T[2]])
        #tsfm_R_list.append([tsfm_R[0], tsfm_R[1], tsfm_R[2]])
        
        np.savetxt(save_filename,tsfm,fmt='%1.6f')
        #tsfm_list.append(tsfm)
        #break
        #if index == 3:
#         break
    
#     tsfm_T_list = np.asarray(tsfm_T_list)
#     tsfm_R_list = np.asarray(tsfm_R_list)
    
    #if joints_num == 2:
   #     break
#     break

HBox(children=(IntProgress(value=0, max=6), HTML(value='')))

[1]src_filename_list length:20
./demo_test/ur5e_roi_data/joints_1/roi_000000.txt 
 and more ...


HBox(children=(IntProgress(value=0, max=20), HTML(value='')))


[2]src_filename_list length:20
./demo_test/ur5e_roi_data/joints_2/roi_000000.txt 
 and more ...


HBox(children=(IntProgress(value=0, max=20), HTML(value='')))


[3]src_filename_list length:20
./demo_test/ur5e_roi_data/joints_3/roi_000000.txt 
 and more ...


HBox(children=(IntProgress(value=0, max=20), HTML(value='')))


[4]src_filename_list length:20
./demo_test/ur5e_roi_data/joints_4/roi_000000.txt 
 and more ...


HBox(children=(IntProgress(value=0, max=20), HTML(value='')))


[5]src_filename_list length:20
./demo_test/ur5e_roi_data/joints_5/roi_000000.txt 
 and more ...


HBox(children=(IntProgress(value=0, max=20), HTML(value='')))


[6]src_filename_list length:20
./demo_test/ur5e_roi_data/joints_6/roi_000000.txt 
 and more ...


HBox(children=(IntProgress(value=0, max=20), HTML(value='')))



