In [None]:
#success rate exp
#generate trajectory without using gazebo

import sys
import os

from util.pc_data_utils import CameraInfo
import numpy as np
from scipy.spatial.transform import Rotation as R

from PIL import Image
import open3d as o3d
import os
import copy
_PROXY_VARS = [
    "http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY",
    "ALL_PROXY", "all_proxy", "no_proxy", "NO_PROXY"
]

for v in _PROXY_VARS:
    os.environ.pop(v, None)     


In [None]:
#load the pc data

down_sample_size = 0.05
from util.pc_data_utils import vis_points,merge_clouds,create_point_cloud_from_depth_image,down_sample,mask_to_pc,to_o3d_pcd

from util.pc_data_utils import remove_ground

from util.pc_seg import objects_seg
import torch

vis_flag = False
#notebook
current_path = os.getcwd()
intri_path = os.path.join(current_path,'../param/intrinsics.npy')
intrinsic = np.load(intri_path)
print(intrinsic)
factor_depth =  1000
camera = CameraInfo(1280.0, 720.0, intrinsic[0][0], intrinsic[1][1], intrinsic[0][2], intrinsic[1][2], factor_depth)

#load the img
color_img_list = []
depth_img_list = []
K_list = []
camera_pose_list = []

img_num = 5
data_dir = os.path.join(current_path,'../example/table_chair/')
for i in range(1,img_num+1):
    color = np.array(Image.open(os.path.join(data_dir, f'robot1_rgb{i}.png'))) 
    depth = np.array(Image.open(os.path.join(data_dir, f'robot1_depth{i}.png')))
    K_inv = np.load(os.path.join(data_dir, f'robot1_K_inv{i}.npy'))
    color_img_list.append(color)
    depth_img_list.append(depth)
    K_list.append(np.linalg.inv(K_inv))
    camera_pose_list.append(K_inv)


In [None]:
#segmentation for the pc
obj_name_list = ['chair']
obj_img_dict = {}
obj_img_dict[obj_name_list[0]] = [0]

now_seg = objects_seg(color_img_list,depth_img_list,camera,obj_name_list)

#get the mask of the object
now_seg.obj_img_dict = obj_img_dict

#get the scene point clouds
scene_pc = []
scene_color = []
for i in range(0,img_num):
    now_pc = create_point_cloud_from_depth_image(depth_img_list[i], camera,organized=False)
    scene_color.append(color_img_list[i].reshape(-1,3))
    scene_pc.append(now_pc)
#merge pc
scene_pc = merge_clouds(scene_pc, K_list)
scene_color = np.concatenate(scene_color, axis=0)


#deal with the obj pointclouds
obj_pc_dict = {}
pc_dict = {}

for target_obj in obj_name_list:
    clouds, masks = now_seg.get_obj_cloud(target_obj)

    #merge the clouds
    new_clouds = []
    obj_color = np.array([]).reshape(-1,3)
    for index, (cloud, mask) in enumerate(zip(clouds, masks)):
        img_index = now_seg.obj_img_dict[target_obj][index]
        pc,color = mask_to_pc(mask, cloud, color_img_list[img_index])
        new_clouds.append(pc)
        

        obj_color = np.concatenate((obj_color, color), axis=0)
        #save pc
        np.save(os.path.join(data_dir, f'{target_obj}_pc_{img_index}.npy'), pc)
        np.save(os.path.join(data_dir, f'{target_obj}_color_{img_index}.npy'), color)
    
    pc_dict[target_obj] = []

    obj_K_list = [K_list[i] for i in now_seg.obj_img_dict[target_obj]]

    #create_world_pc
    for now_cloud, now_K in zip(new_clouds, obj_K_list):
        k_inv = np.linalg.inv(now_K)
        now_cloud =k_inv[0:3,0:3] @ now_cloud.T 
        now_cloud = now_cloud.T + k_inv[0:3,3]
        pc_dict[target_obj].append(now_cloud)

    obj_pc = merge_clouds(new_clouds, obj_K_list)
    #remove ground
    scene_pc, scene_color, obj_pc, obj_color = remove_ground(scene_pc, scene_color, obj_pc, obj_color)
    #down sample
    obj_pc, obj_color = down_sample(obj_pc, obj_color, down_sample_size)
    
    #obj_pcd
    obj_pcd = to_o3d_pcd(obj_pc, obj_color)
    obj_pc_dict[target_obj] = obj_pcd
    
#down sample
scene_pc, scene_color = down_sample(scene_pc, scene_color, down_sample_size)
scene_pcd = to_o3d_pcd(scene_pc, scene_color)

In [None]:
from util.pc_data_utils import show_pc
o3d.visualization.draw_geometries([obj_pc_dict["chair"]])

In [None]:
#down sample point
from util.pc_data_utils import remove_obj_from_scene,write_pc

for now_obj in obj_name_list:
    obj_pcd = obj_pc_dict[now_obj]
    scene_pcd = remove_obj_from_scene(scene_pcd, obj_pcd)


#now you get two pointclouds, scene_pc and obj_pc, scene_color and obj_color
#save the pointclouds

save_path = os.path.join(data_dir, 'scene.ply')
print(save_path)
o3d.io.write_point_cloud(save_path, scene_pcd)

for now_obj in obj_name_list:
    save_path = os.path.join(data_dir, f'{now_obj}.ply')
    o3d.io.write_point_cloud(save_path, obj_pc_dict[now_obj])
del now_seg
torch.cuda.empty_cache()


In [None]:
#decide robot number
from util.prompt_interface import prompt_interface
import json

# obj_list = ["chair"]
now_obj_name = "chair" #object need to manipulate
max_robot_num = 2

now_instruction = "Move the chair near the table"
api_type = "qwen"
#qwen-vl-plus,qwen-vl-max-2025-04-08
vlm_model = "qwen-vl-max-2025-04-08" #we provide an example using QWEN to achieve this task, because you can try QWEN for free
api_file_path = os.path.join(current_path, f'../prompts/api_{api_type}.json')
with open(api_file_path, "r") as f:
    config = json.load(f)

api_key = config["api_key"]
api_base = config["api_base"]

prompt_inter = prompt_interface(api_key,api_base)

prompt_path = os.path.join(current_path, '../prompts/choose_robot_num_prompts.txt')
with open(prompt_path, "r") as f:
    prompt = f.read()

applied_robot_num = prompt_inter.choose_robot_num(color_img_list[obj_img_dict[now_obj_name][0]], prompt, now_instruction, max_robot_num=max_robot_num, model=vlm_model)

robot_list = [f"robot{i+1}" for i in range(applied_robot_num)]

if applied_robot_num != 2:
    print("The output of VLM is not 2, it will lead to some error in the following process.")
    print("Your can directly change applied_robot_num to 2, and generate robot_list again.")
    sys.exit(1)

In [None]:
#If failed, uncomment and run this cell
applied_robot_num = 2

robot_list = [f"robot{i+1}" for i in range(applied_robot_num)]

In [None]:
from util.draw_axis import draw_axis_on_img

img_with_axis = draw_axis_on_img(copy.deepcopy(color_img_list[obj_img_dict[now_obj_name][0]]), K_list[obj_img_dict[now_obj_name][0]], intrinsic)

prompt_path = os.path.join(current_path, '../prompts/generate_constraint_prompts.txt')
with open(prompt_path, "r") as f:
    prompt = f.read()

response, obtained_constraint = prompt_inter.obtain_constraint(img_with_axis, prompt, now_instruction, max_robot_num=max_robot_num, model=vlm_model)

if "keep_z_axis" in obtained_constraint:
    keep_z_axis_flag = True
else:
    keep_z_axis_flag = False
print(keep_z_axis_flag)

In [None]:

keep_z_axis_flag = True

In [None]:
from util.funcs import process_pcd
import numpy as np

#motion planning

obj_center = np.mean(obj_pc, axis=0)


obj_pose = np.eye(4)
scene,obj_list,T_world_2_obj_list = process_pcd(scene_pcd,[obj_pc_dict[now_obj_name]],[obj_pose])

now_obj_pcd = obj_list[0]
T_world_2_obj = T_world_2_obj_list[0]

now_obj_points = np.array(now_obj_pcd.points)

In [None]:
from motion_planning.motion_planning import motion_planner_agv,xyzrpy_to_T
from grasping.create_object_grasp_pose import large_obj_grasping



#SETP 1: generate a feasible pose for the grasp pose
print("grasp pose generation")

this_obj_img_list = [color_img_list[index] for index in obj_img_dict[now_obj_name]]
this_obj_pc_list = pc_dict[now_obj_name]
this_obj_camera_pose_list = [camera_pose_list[index] for index in obj_img_dict[now_obj_name]]
gripper_model_path = os.path.join(current_path, 'grasping')

large_obj_GP_generator = large_obj_grasping(this_obj_img_list,this_obj_pc_list,this_obj_camera_pose_list,
                                            n_robot=applied_robot_num,VLM_model=vlm_model,gripper_CAD_path=gripper_model_path)


img_index = 0 #max value len(this_obj_pc_list)


In [None]:
from util.funcs import generate_ground_mesh
ground_z = np.min(scene_pc[:,2])
collision_points_obj = large_obj_GP_generator.merged_pc
ground_mesh = generate_ground_mesh(collision_points_obj, ground_z)

collision_points = np.vstack((collision_points_obj, ground_mesh))
print(collision_points_obj.shape,ground_mesh.shape,collision_points.shape)

In [None]:
grasp_poses,feasible_flag = large_obj_GP_generator.genera_feasible_grasp_poses(img_index,collision_points=collision_points,vis_grasp_flag=False)

In [None]:
print("choose grasp poses")
feasible_grasp_poses = [grasp_poses[i] for i in range(len(grasp_poses)) if feasible_flag[i]]
choose_index_list = large_obj_GP_generator.choose_grasp_pose(img_index,feasible_grasp_poses,now_instruction) #using VLM to choose
print(choose_index_list)

In [None]:
choose_grasp_poses = [feasible_grasp_poses[i] for i in choose_index_list]
grasp_pose = large_obj_GP_generator.collaborative_ik_solver(choose_grasp_poses,collision_points=collision_points,max_potential_q=40)

grasp_pose = np.array(grasp_pose)

In [None]:
#STEP 2: motion planning from robot current pose to grasp pose
agv_pose_list = np.array([[ 1.00407003, -2.00377211,  0.00944374,  0,  0 ,0,   0,   0, 0 ], [ 0.00714296 , 1.00136292 ,-0.0084044 ,  0,  0, 0, 0 ,0, 0]])
planner = motion_planner_agv(agv_pose_list)
planner.assign_scene_obj(scene,now_obj_pcd)
#update the agv pose
for i in range(len(agv_pose_list)):
    planner.agv_list[i].move_agv(agv_pose_list[i])
# planner.dis_tolerance = 0.01
success, robot_pose1 = planner.motion_planning_RRT(grasp_pose,T_world_2_obj,total_points_num=500) #motion planning will influence the robot_pose

test_T = xyzrpy_to_T([T_world_2_obj[0,3],T_world_2_obj[1,3],T_world_2_obj[2,3],0,0,0])
planner.add_grasp(test_T) 


In [None]:
if  success:
    print(f"success in grasping process with {len(robot_pose1)} steps")
    obj_pose_list = [T_world_2_obj for i in range(len(robot_pose1))]
    planner.animation(robot_pose1,dt=0.005,scene_pcd=scene,object_list=[now_obj_pcd],obj_pose_list=obj_pose_list, stop_time=1)
else:
    print("failed in grasping process")
    planner.animation(robot_pose1,scene_pcd=scene,object_list=[now_obj_pcd], stop_time=5)

In [None]:
#STEP 3: planning for the object

start = np.array([T_world_2_obj[0,3],T_world_2_obj[1,3],T_world_2_obj[2,3],0,0,0]) #x,y,z,r,p,y
goal = np.array([T_world_2_obj[0,3]+1,T_world_2_obj[1,3]-3,T_world_2_obj[2,3],0,0,-1.57]) #move to origin

if keep_z_axis_flag:
    simplifed_start = np.array([start[0], start[1], start[2], start[5]]) #keep z axis
    simplifed_goal = np.array([goal[0], goal[1], goal[2], goal[5]])
    success, path = planner.obj_planning_keepz(simplifed_start,simplifed_goal,n_point=500) #we only consider the situation of keep z axis, which is most common in the real world
    new_path = []
    for now_point in path:
        new_point = np.array([now_point[0], now_point[1], now_point[2], start[3], start[4], now_point[3]])
        new_path.append(new_point)
    path = new_path
else:
    success, path = planner.obj_planning(start,goal,n_point=500)
# path = [goal]
obj_path_T = [xyzrpy_to_T(p,xyz_first=True) for p in path] #in world frame
if success:
    print(f"find object path with {len(path)} points")

#STEP 4: motion planning for each robot
#set the robots' pose to grasp pose
for i in range(len(robot_list)):
    now_grasp_pose = grasp_pose[i]
    planner.agv_list[i].FK(now_grasp_pose)
success, robot_pose2 = planner.motion_planning(obj_path_T)


In [None]:
if  success:
    print("success in planning for each robot")
    planner.animation(robot_pose2,dt = 0.005,scene_pcd=scene,object_list=[now_obj_pcd],obj_pose_list=obj_path_T,stop_time=2)
else:
    print("------------failed in planning for each robot------------")
    planner.animation(robot_pose2,scene_pcd=scene,object_list=[now_obj_pcd],obj_pose_list=obj_path_T,stop_time=2)