In [1]:
import sys
import os
import time
import threading
import utilities
import numpy as np
np.set_printoptions(suppress=True)

import collections 
if sys.version_info.major == 3 and sys.version_info.minor >= 10:
    from collections.abc import MutableMapping
else:
    from collections import MutableMapping

from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient
from kortex_api.autogen.client_stubs.ControlConfigClientRpc import ControlConfigClient

from kortex_api.autogen.messages import Base_pb2, BaseCyclic_pb2, Common_pb2

# Maximum allowed waiting time during actions (in seconds)
TIMEOUT_DURATION = 20

In [3]:
class TCPArguments:
    def __init__(self):
        self.ip = "192.168.1.10"
        self.username = "admin"
        self.password = "admin"


def check_for_end_or_abort(e):
    """Return a closure checking for END or ABORT notifications

    Arguments:
    e -- event to signal when the action is completed
        (will be set when an END or ABORT occurs)
    """
    def check(notification, e = e):
        print("EVENT : " + \
              Base_pb2.ActionEvent.Name(notification.action_event))
        if notification.action_event == Base_pb2.ACTION_END \
        or notification.action_event == Base_pb2.ACTION_ABORT:
            e.set()
    return check


def move_to_home_position(base):
    TIMEOUT_DURATION = 10  # in seconds
    # Make sure the arm is in Single Level Servoing mode (high-level mode)
    base_servo_mode = Base_pb2.ServoingModeInformation()
    base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
    base.SetServoingMode(base_servo_mode)
    
    # Move arm to ready position
    print("Moving the arm to home position")
    action_type = Base_pb2.RequestedActionType()
    action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
    action_list = base.ReadAllActions(action_type)
    action_handle = None
    for action in action_list.action_list:
        if action.name == "Home":
            action_handle = action.handle

    if action_handle == None:
        print("Can't reach safe position. Exiting")
        return False

    e = threading.Event()
    notification_handle = base.OnNotificationActionTopic(
        check_for_end_or_abort(e),
        Base_pb2.NotificationOptions()
    )

    base.ExecuteActionFromReference(action_handle)
    finished = e.wait(TIMEOUT_DURATION)
    base.Unsubscribe(notification_handle)

    if finished:
        print("home position reached")
    else:
        print("Timeout on action notification wait")
    return finished


def move_tool_pose_relative(base, base_cyclic, pose_kinova):
    # move the end effector to a relative position of the current pose
    # pose_kinova is [x,y,z,theta_x,theta_y,theta_z] in meters and degrees
    print("Starting tool_pose relative movement ...")
    action = Base_pb2.Action()
    action.name = "Tool Pose Relative Movement"
    action.application_data = ""
    TIMEOUT_DURATION = 10  # in seconds

    feedback = base_cyclic.RefreshFeedback()

    cartesian_pose = action.reach_pose.target_pose
    cartesian_pose.x = feedback.base.tool_pose_x + pose_kinova[0]    # (meters)
    cartesian_pose.y = feedback.base.tool_pose_y + pose_kinova[1]    # (meters)
    cartesian_pose.z = feedback.base.tool_pose_z + pose_kinova[2]    # (meters)
    cartesian_pose.theta_x = feedback.base.tool_pose_theta_x + pose_kinova[3] # (degrees)
    cartesian_pose.theta_y = feedback.base.tool_pose_theta_y + pose_kinova[4] # (degrees)
    cartesian_pose.theta_z = feedback.base.tool_pose_theta_z + pose_kinova[5] # (degrees)

    e = threading.Event()
    notification_handle = base.OnNotificationActionTopic(
        check_for_end_or_abort(e),
        Base_pb2.NotificationOptions()
    )

    print("Executing action")
    base.ExecuteAction(action)

    print("Waiting for movement to finish ...")
    finished = e.wait(TIMEOUT_DURATION)
    base.Unsubscribe(notification_handle)

    if finished:
        print("tool_pose relative movement completed")
    else:
        print("Timeout on action notification wait")
    return finished


def move_end_effector(base, desired_pose):
    ### function for Kinova Gen3 7 dof on Kortex API
    ### Fundation for vel control code, can correct both pos and ang error at the same time
    ### Author: Chuizheng Kong
    ### Created on: 05/09/2024
    
    max_vel = 0.5-0.1  # m/s  0.5 is max
    #max_vel = 0.1
    max_w = 70.0  # ~ 50 deg/s
    kp_pos = 2.5
    kp_ang = 4.0

    dcc_range = max_vel / (kp_pos * 2)  # dcc_range should be smaller than max_vel/kp_pos
    ang_dcc_range = max_w / (kp_ang * 6)
    
    # Make sure the arm is in Single Level Servoing mode (high-level mode)
    base_servo_mode = Base_pb2.ServoingModeInformation()
    base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
    base.SetServoingMode(base_servo_mode)

    # init_pose = base.GetMeasuredCartesianPose()
    # get the desired rotation matrix
    R_d = getRotMtx(desired_pose)
    
    command = Base_pb2.TwistCommand()
    # note that twist is naturally in tool frame, but this conversion made things a bit easy
    command.reference_frame = Base_pb2.CARTESIAN_REFERENCE_FRAME_BASE
    command.duration = 0

    twist = command.twist
    twist.linear_x = 0.0  # adjust linear velocity
    twist.linear_y = 0.0
    twist.linear_z = 0.0
    twist.angular_x = 0.0
    twist.angular_y = 0.0
    twist.angular_z = 0.0  # adjust angular velocity

    max_exe_time = 10  # sec
    eps_pos = 0.001  # convergence criterion
    eps_ang = 0.01
    t_start = time.time()
    while True:
        current_pose = base.GetMeasuredCartesianPose()
        R = getRotMtx(current_pose)
        
        # reducing the pos error (in global frame!!)
        pos_diff = np.array([desired_pose.x-current_pose.x,
                             desired_pose.y-current_pose.y,
                             desired_pose.z-current_pose.z])
        
        pos_diff_norm = np.linalg.norm(pos_diff)
        v_temp = max_vel * pos_diff/pos_diff_norm

        # reducing ang error using ER = RRd^T
        ER = R @ R_d.T
        # frobenius norm of matrix squre root of ER or eR2
        k,theta = R2rot(ER)
        k=np.array(k)
        eR2=-np.sin(theta/2)*k * 180 / np.pi  # not the best name but works fine
        
        eR2_norm = np.linalg.norm(eR2)+1e-5
        w_temp = max_w * eR2/eR2_norm

        # print(pos_diff_norm, eR2_norm)

        reached = pos_diff_norm < eps_pos and eR2_norm < eps_ang
        
        if reached or time.time()>t_start+max_exe_time:
            print("Robot Stop")
            if reached:
                print("Goal Pose reached")
            base.Stop()
            break
            
        # go in max vel when outside dcc_range
        if pos_diff_norm < dcc_range:
            v = kp_pos * pos_diff
        else:
            v = v_temp

        if eR2_norm < ang_dcc_range:
            kR = kp_ang*np.eye(3)
            w = kR @ eR2 
        else:
            w = w_temp
            
        twist.linear_x = v[0]
        twist.linear_y = v[1] 
        twist.linear_z = v[2]
        twist.angular_x = w[0]
        twist.angular_y = w[1]
        twist.angular_z = w[2]
        base.SendTwistCommand(command)
        #time.sleep(0.025)  # 40 Hz high-level control loop freq (ok might not need)
    return True

!!!!!!! ROBOT MOVING!!!!!!TO HOME!!!!!

In [4]:
class TCPArguments:
    def __init__(self):
        self.ip = "192.168.1.10"
        self.username = "admin"
        self.password = "admin"

args = TCPArguments()
# Create connection to the device and get the router
with utilities.DeviceConnection.createTcpConnection(args) as router:

    # Create required services
    base = BaseClient(router)
    base_cyclic = BaseCyclicClient(router)

    current_pose = base.GetMeasuredCartesianPose()
    current_angles = base.GetMeasuredJointAngles()
    print("End-effector poses (pos, ori)\n",current_pose)
    # print("Joint angles\n", current_angles)
    
    # Example core
    success = True

    success &= move_to_home_position(base)

    # print the joint angles and end effector poses
    #print(base.ComputeForwardKinematics(current_angles))

    feedback = base_cyclic.RefreshFeedback()
    cartesian_pose = current_pose
    cartesian_pose.x = feedback.base.tool_pose_x    # (meters)
    cartesian_pose.y = feedback.base.tool_pose_y    # (meters)
    cartesian_pose.z = feedback.base.tool_pose_z    # (meters)
    cartesian_pose.theta_x = feedback.base.tool_pose_theta_x  # (degrees)
    cartesian_pose.theta_y = feedback.base.tool_pose_theta_y  # (degrees)
    cartesian_pose.theta_z = feedback.base.tool_pose_theta_z  # (degrees)
    
    print("Cyclic Feedback Pose: \n", cartesian_pose)

    #max_vel = base.GetTwistHardLimitation().linear
    #print("Twist Limit:\n", max_vel)
    

Logging as admin on device 192.168.1.10
End-effector poses (pos, ori)
 x: 0.48667851090431213
y: -0.03494982421398163
z: 0.43705251812934875
theta_x: 89.80750274658203
theta_y: -1.181410789489746
theta_z: 89.72472381591797

Moving the arm to home position
EVENT : ACTION_START
EVENT : ACTION_END
home position reached
Cyclic Feedback Pose: 
 x: 0.4581160843372345
y: 0.0005676016444340348
z: 0.43717265129089355
theta_x: 89.79495239257812
theta_y: -1.1714142560958862
theta_z: 89.72274017333984



!!!!!!! ROBOT MOVING!!!!!!TO +Y!!!!!

In [14]:
with utilities.DeviceConnection.createTcpConnection(args) as router:

    # Create required services
    base = BaseClient(router)
    base_cyclic = BaseCyclicClient(router)
    
    pose = base.GetMeasuredCartesianPose()
    pose.y = pose.y - 0.3  # meters
    pose.theta_x = pose.theta_x - 30.0 # degrees
    
    print("target\n", pose)
    success = True
    success &= move_end_effector(base, pose)

    # print the joint angles and end effector poses
    current_pose = base.GetMeasuredCartesianPose()
    current_angles = base.GetMeasuredJointAngles()
    print("End-effector poses (pos, ori)\n",current_pose)

Logging as admin on device 192.168.1.10
target
 x: 0.45845118165016174
y: -0.2997697078215424
z: 0.43717965483665466
theta_x: 59.792396545410156
theta_y: -1.1725366115570068
theta_z: 89.724365234375

Robot Stop
Goal Pose reached
End-effector poses (pos, ori)
 x: 0.45846468210220337
y: -0.2996788024902344
z: 0.43722471594810486
theta_x: 59.78170394897461
theta_y: -1.1764131784439087
theta_z: 89.73845672607422



Move relatively

In [7]:
with utilities.DeviceConnection.createTcpConnection(args) as router:

    # Create required services
    base = BaseClient(router)
    base_cyclic = BaseCyclicClient(router)
    pose = np.array([-0.01, 0.1,0.01,0,0,0])
    move_tool_pose_relative(base, base_cyclic, pose)

Logging as admin on device 192.168.1.10
Starting tool_pose relative movement ...
Executing action
Waiting for movement to finish ...
EVENT : ACTION_START
EVENT : ACTION_END
tool_pose relative movement completed


In [5]:
tf

geometry_msgs.msg.TransformStamped(header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=0, nanosec=0), frame_id='end_effector'), child_frame_id='endoscope', transform=geometry_msgs.msg.Transform(translation=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0), rotation=geometry_msgs.msg.Quaternion(x=0.0, y=0.0, z=0.0, w=1.0)))