# Set up MoveIt for Planning Robot Poses

In [3]:
import time

# generic ros libraries
import rclpy
from rclpy.logging import get_logger

# moveit python library
from moveit.core.robot_state import RobotState
from moveit.planning import (
    MoveItPy,
    MultiPipelinePlanRequestParameters,
)

from ament_index_python.packages import get_package_share_directory
from moveit_configs_utils import MoveItConfigsBuilder

def plan_and_execute(
    robot,
    planning_component,
    logger,
    single_plan_parameters=None,
    multi_plan_parameters=None,
    sleep_time=0.0,
):
    """Helper function to plan and execute a motion."""
    # plan to goal
    logger.info("Planning trajectory")
    if multi_plan_parameters is not None:
        plan_result = planning_component.plan(
            multi_plan_parameters=multi_plan_parameters
        )
    elif single_plan_parameters is not None:
        plan_result = planning_component.plan(
            single_plan_parameters=single_plan_parameters
        )
    else:
        plan_result = planning_component.plan()

    # execute the plan
    if plan_result:
        logger.info("Executing plan")
        robot_trajectory = plan_result.trajectory
        robot.execute(robot_trajectory, controllers=[])
    else:
        logger.error("Planning failed")

    time.sleep(sleep_time)


# we need to specify our moveit_py config at the top of each notebook we use. 
# this is since we will start spinning a moveit_py node within this notebook.

moveit_config = (
        MoveItConfigsBuilder(robot_name="lite6", package_name="moveit_resources_lite6_moveit_config")
        .robot_description_semantic(file_path="config/lite6.srdf")
        .trajectory_execution(file_path="config/moveit_controllers.yaml")
        .robot_description(file_path=get_package_share_directory("moveit_resources_lite6_description")
                          + "/urdf/lite6.urdf")
        .moveit_cpp(
            file_path=get_package_share_directory("lite6_motion_planning_demos")
            + "/config/moveit_cpp.yaml"
        )
        .to_moveit_configs()
    ).to_dict()

In [4]:
rclpy.init()
logger = get_logger("moveit_py.pose_goal")
    
# instantiate MoveItPy instance and get planning component
lite6 = MoveItPy(node_name="moveit_py", config_dict=moveit_config)
lite6_arm = lite6.get_planning_component("lite6")
logger.info("MoveItPy instance created")

[INFO] [1699029217.740555583] [moveit_cpp_initializer]: Initialize rclcpp
[INFO] [1699029217.741181345] [moveit_cpp_initializer]: Initialize node parameters
[INFO] [1699029217.741198425] [moveit_cpp_initializer]: Initialize node and executor
[INFO] [1699029217.761422174] [moveit_cpp_initializer]: Spin separate thread
[INFO] [1699029217.766137128] [moveit_3546831652.rdf_loader]: Loaded robot model in 0.00211891 seconds
[INFO] [1699029217.766218999] [moveit_robot_model.robot_model]: Loading robot model 'UF_ROBOT'...
[INFO] [1699029217.766233239] [moveit_robot_model.robot_model]: No root/virtual joint specified in SRDF. Assuming fixed joint
[INFO] [1699029217.803400799] [moveit_kinematics_base.kinematics_base]: Joint weights for group 'lite6': 1 1 1 1 1 1
[INFO] [1699029218.197454652] [moveit_3546831652.planning_scene_monitor]: Publishing maintained planning scene on 'monitored_planning_scene'
[INFO] [1699029218.197607822] [moveit_3546831652.moveit_cpp]: Listening to '/joint_states' for j

True

# Calibration Node

In [12]:
from rclpy.node import Node
from sensor_msgs.msg import JointState
from sensor_msgs.msg import Image


class Calibration(Node):
    """An abstract base class for deploying learnt policies."""

    def __init__(self):
        """Initialise the policy."""
        super().__init__("calibration_node")
        self.logger = self.get_logger()

        # subscribe to joint state topic
        self.subscription = self.create_subscription(
            Image,
            '/zed2i/zed_node/rgb/image_rect_color',
            self.dummy_callback,
            10)
        
        # stores the most recent image observation
        self._last_image = None
        
    def dummy_callback(self, sensor_msg):
        self.image = sensor_msg

node = Calibration()

[WARN] [1699030664.120907505] [rcl.logging_rosout]: Publisher already registered for node name: 'calibration_node'. If this is due to multiple nodes with the same name then all logs for the logger named 'calibration_node' will go out over the existing publisher. As soon as any node with that name is destructed it will unregister the publisher, preventing any further logs for that name from being published on the rosout topic.


In [17]:
rclpy.spin_once(node)

In [19]:
from message_filters import Subscriber




# Collect Calibration Data

In [None]:
# camera image

import time

import numpy as np
import pyzed.sl as sl
import cv2
from cv2 import aruco

from scipy.spatial.transform import Rotation

from geometry_msgs.msg import PoseStamped


# setup camera
zed = sl.Camera()
init_params = sl.InitParameters()
zed.open(init_params)

def base2robot():
    lite6_arm.set_start_state_to_current_state()
    robot_state = lite6_arm.get_start_state()
    t_mat = robot_state.get_frame_transform("link6")
    return t_mat[:3, :-1], t_mat[:3, -1]

def eef_pose():
    lite6_arm.set_start_state_to_current_state()
    robot_state = lite6_arm.get_start_state()
    return robot_state.get_pose("link6")

def capture_image():
    if (zed.grab() == sl.ERROR_CODE.SUCCESS) :
        image = sl.Mat()
        zed.retrieve_image(image, sl.VIEW.LEFT) # Get the left image
        print("Image resolution: ", image.get_width(), " x ", image.get_height())
        return image
    else:
        return None


def plan_and_execute(
    robot,
    planning_component,
    single_plan_parameters=None,
    multi_plan_parameters=None,
    sleep_time=0.0,
):
    """Helper function to plan and execute a motion."""
    # plan to goal
    if multi_plan_parameters is not None:
        plan_result = planning_component.plan(
            multi_plan_parameters=multi_plan_parameters
        )
    elif single_plan_parameters is not None:
        plan_result = planning_component.plan(
            single_plan_parameters=single_plan_parameters
        )
    else:
        plan_result = planning_component.plan()

    # execute the plan
    if plan_result:
        robot_trajectory = plan_result.trajectory
        #input("Press Enter to execute trajectory")
        robot.execute(robot_trajectory, controllers=[])
            
    
def move_2_random_pose():
    # set plan start state to current state
    lite6_arm.set_start_state_to_current_state()
    
    # random pose within workspace
    pose_goal = PoseStamped()
    pose_goal.header.frame_id = "link_base"
    pose_goal.pose.position.x = np.random.uniform(0.3, 0.4)
    pose_goal.pose.position.y = np.random.uniform(-0.1, 0.1)
    pose_goal.pose.position.z = np.random.uniform(0.2, 0.4)
    

    # Create a rotation object from Euler angles specifying axes of rotation
    #rot_x = np.random.uniform()
    #rot_y = np.random.uniform()
    #rot_z = np.random.uniform()
    #rot = Rotation.from_euler('xyz', [, , ], degrees=True)
    # Convert to quaternions and print
    #rot_quat = rot.as_quat()

    pose_goal.pose.orientation.x = 0.55
    pose_goal.pose.orientation.y = 0.0
    pose_goal.pose.orientation.z =0.82
    pose_goal.pose.orientation.w =0.115
    
    
    # get current pose
    lite6_arm.set_goal_state(pose_stamped_msg=pose_goal, pose_link="link6")
    
    plan_and_execute(lite6, lite6_arm, sleep_time=3.0)
    
    
def data_capture():    
    # data structures to store samples
    images = []
    base2robot_transforms = [] #TODO: fix
    
    for _ in range(15):
        time.sleep(5.0)
        img = capture_image()
        base2robot_sample = base2robot()
        
        images.append(img)
        base2robot_transforms.append(base2robot_sample)
        
        move_2_random_pose()
        
    return images, base2robot_transforms
        
        
def detect_aruco(images):
    # set up parameters
    ARUCO_DICT = aruco.Dictionary_get(aruco.DICT_5X5_100)
    CHARUCO_BOARD = aruco.CharucoBoard_create(
        squaresX=14,
        squaresY=9,
        squareLength=0.02,
        markerLength=0.015,
        dictionary=ARUCO_DICT,
    )
    detector_params = cv2.aruco.DetectorParameters_create()
    detector_params.cornerRefinementMethod = cv2.aruco.CORNER_REFINE_SUBPIX
    
    for image in images:
        cv_image = cv2.cvtColor(image.numpy(), cv2.COLOR_BGRA2GRAY)
        img_size = cv_image.shape[:2]
        
        corners, ids, rejected = cv2.aruco.detectMarkers(cv_image, ARUCO_DICT, parameters=detector_params)

        corners, ids, _, _ = cv2.aruco.refineDetectedMarkers(
                    cv_image,
                    CHARUCO_BOARD,
                    corners,
                    ids,
                    rejected,
                    parameters=detector_params,
                    # TODO: add camera intrinsic parameters
            )

        num_corners_found, charuco_corners, charuco_ids = aruco.interpolateCornersCharuco(
            markerCorners=corners, markerIds=ids, image=cv_image, board=CHARUCO_BOARD, # TODO: add camera intrinsic parameters
        )

        readings = []
        readings.append((corners, charuco_corners, charuco_ids, img_size))
    
    return readings
        
def target2cam(readings):
    init_corners_all = []  # Corners discovered in all images processed
    init_ids_all = []  # Aruco ids corresponding to corners discovered
    fixed_image_size = readings[0][3]

    # Proccess Readings #
    init_successes = []
    for i in range(len(readings)):
        corners, charuco_corners, charuco_ids, img_size = readings[i]
        assert img_size == fixed_image_size
        init_corners_all.append(charuco_corners)
        init_ids_all.append(charuco_ids)
        init_successes.append(i)

    # First Pass: Find Outliers #
    threshold = self.num_img_threshold
    if len(init_successes) < threshold:
        return None
    # print('Not enough points round 1')
    # print('Num Points: ', len(init_successes))
    # return None

    calibration_error, cameraMatrix, distCoeffs, rvecs, tvecs, stdIntrinsics, stdExtrinsics, perViewErrors = (
        aruco.calibrateCameraCharucoExtended(
            charucoCorners=init_corners_all,
            charucoIds=init_ids_all,
            board=CHARUCO_BOARD,
            imageSize=fixed_image_size,
            flags=calib_flags,
            **self._intrinsics_dict[self._curr_cam_id],
        )
    )

    # Remove Outliers #
    threshold = self.num_img_threshold if train else 5
    final_corners_all = [
        init_corners_all[i] for i in range(len(perViewErrors)) if perViewErrors[i] <= self.inlier_error_threshold
    ]
    final_ids_all = [
        init_ids_all[i] for i in range(len(perViewErrors)) if perViewErrors[i] <= self.inlier_error_threshold
    ]
    final_successes = [
        init_successes[i] for i in range(len(perViewErrors)) if perViewErrors[i] <= self.inlier_error_threshold
    ]
    if len(final_successes) < threshold:
        return None
    # print('Not enough points round 2')
    # print('Num Points: ', len(final_successes))
    # print('Error Mean: ', perViewErrors.mean())
    # print('Error Std: ', perViewErrors.std())
    # return None

    # Second Pass: Calculate Finalized Extrinsics #
    calibration_error, cameraMatrix, distCoeffs, rvecs, tvecs = aruco.calibrateCameraCharuco(
        charucoCorners=final_corners_all,
        charucoIds=final_ids_all,
        board=CHARUCO_BOARD,
        imageSize=fixed_image_size,
        flags=calib_flags,
        **self._intrinsics_dict[self._curr_cam_id],
    )

    # Return Transformation #
    if calibration_error > self.reprojection_error_threshold:
        return None
    # print('Failed Calibration Threshold')
    # print('Calibration Error: ', calibration_error)
    # return None

    rmats = [R.from_rotvec(rvec.flatten()).as_matrix() for rvec in rvecs]
    tvecs = [tvec.flatten() for tvec in tvecs]

    return rmats, tvecs, final_successes
    
# from PIL import Image    
# data = image.numpy()

# # Convert the NumPy array to a Pillow image
# image = Image.fromarray(data)

# # Display the image
# image.show()

#move_random_pose()

images, base2robot_transforms = data_capture()


print(images)

# import numpy as np
# image = np.copy(cv_image)
# image = aruco.drawDetectedMarkers(image=image, corners=corners)
# cv2.imshow("test", image)
# cv2.waitKey(20)

In [None]:
readings = detect_aruco(images)

def target2cam(readings):
    # set up parameters
    ARUCO_DICT = aruco.Dictionary_get(aruco.DICT_5X5_100)
    CHARUCO_BOARD = aruco.CharucoBoard_create(
        squaresX=14,
        squaresY=9,
        squareLength=0.02,
        markerLength=0.015,
        dictionary=ARUCO_DICT,
    )
    detector_params = cv2.aruco.DetectorParameters_create()
    detector_params.cornerRefinementMethod = cv2.aruco.CORNER_REFINE_SUBPIX
    calib_flags = cv2.CALIB_USE_INTRINSIC_GUESS + cv2.CALIB_FIX_PRINCIPAL_POINT + cv2.CALIB_FIX_FOCAL_LENGTH

    
    init_corners_all = []  # Corners discovered in all images processed
    init_ids_all = []  # Aruco ids corresponding to corners discovered
    fixed_image_size = readings[0][3]

    # Proccess Readings #
    init_successes = []
    for i in range(len(readings)):
        corners, charuco_corners, charuco_ids, img_size = readings[i]
        assert img_size == fixed_image_size
        init_corners_all.append(charuco_corners)
        init_ids_all.append(charuco_ids)
        init_successes.append(i)

    # First Pass: Find Outliers #
    threshold = 1
    if len(init_successes) < threshold:
        return None
    # print('Not enough points round 1')
    # print('Num Points: ', len(init_successes))
    # return None

    calibration_error, cameraMatrix, distCoeffs, rvecs, tvecs, stdIntrinsics, stdExtrinsics, perViewErrors = (
        aruco.calibrateCameraCharucoExtended(
            charucoCorners=init_corners_all,
            charucoIds=init_ids_all,
            board=CHARUCO_BOARD,
            imageSize=fixed_image_size,
            flags=calib_flags,
        )
    )

    # Remove Outliers #
    threshold = self.num_img_threshold
    final_corners_all = [
        init_corners_all[i] for i in range(len(perViewErrors)) if perViewErrors[i] <= self.inlier_error_threshold
    ]
    final_ids_all = [
        init_ids_all[i] for i in range(len(perViewErrors)) if perViewErrors[i] <= self.inlier_error_threshold
    ]
    final_successes = [
        init_successes[i] for i in range(len(perViewErrors)) if perViewErrors[i] <= self.inlier_error_threshold
    ]
    if len(final_successes) < threshold:
        return None
    # print('Not enough points round 2')
    # print('Num Points: ', len(final_successes))
    # print('Error Mean: ', perViewErrors.mean())
    # print('Error Std: ', perViewErrors.std())
    # return None

    # Second Pass: Calculate Finalized Extrinsics #
    calibration_error, cameraMatrix, distCoeffs, rvecs, tvecs = aruco.calibrateCameraCharuco(
        charucoCorners=final_corners_all,
        charucoIds=final_ids_all,
        board=CHARUCO_BOARD,
        imageSize=fixed_image_size,
        flags=calib_flags,
    )

    # Return Transformation #
    if calibration_error > self.reprojection_error_threshold:
        return None
    # print('Failed Calibration Threshold')
    # print('Calibration Error: ', calibration_error)
    # return None

    rmats = [R.from_rotvec(rvec.flatten()).as_matrix() for rvec in rvecs]
    tvecs = [tvec.flatten() for tvec in tvecs]

    return rmats, tvecs, final_successes

target2cam(readings)

In [None]:
images[0].numpy().shape[:2]

In [None]:
readings[0][-1]

In [None]:
# base2robot



print(base2robot())

In [None]:
import cv2
from cv2 import aruco
aruco_dict = aruco.Dictionary_get(aruco.DICT_9X14_25)
# arucoParams = cv2.aruco.DetectorParameters_create()
# (corners, ids, rejected) = cv2.aruco.detectMarkers(image, arucoDict,
# 	parameters=arucoParams)

# Run Calibration Optmization

In [None]:
import cv2