In [1]:
from typing import Tuple

import numpy as np
from scipy.spatial.transform import Rotation as R
import pandas as pd

import mujoco
from mujoco import viewer

from dm_control import composer, mjcf
from dm_robotics.moma.models.arenas import empty
from dm_robotics.moma import robot

from rearrangement_benchmark.env_components.props import Rectangle

from ikpy.chain import Chain

from hydra import compose, initialize
from hydra.utils import instantiate


# load different robot configurations
initialize(version_base=None, config_path="./config", job_name="default_config")
POSITION_CONFIG = compose(config_name="controller_tuning", overrides=["robots=default"])
VELOCITY_CONFIG = compose(config_name="controller_tuning", overrides=["robots=velocity"])
MOTOR_CONFIG = compose(config_name="controller_tuning", overrides=["robots=motor"])
IKPY_URDF_PATH = "./models/arms/robot.urdf"

# For now assign default cfg
#cfg = POSITION_CONFIG
cfg = MOTOR_CONFIG
kinematic_chain = Chain.from_urdf_file(IKPY_URDF_PATH, base_elements=["panda_link0"]) 

# save default configs 
ready_config = np.array([0, -0.785, 0, -2.356, 0, 1.571, 0.785])
grasp_pose_config = np.array([-3.95380744e-04,  2.37985323e-01,  3.52180384e-04, -2.55912981e+00,
 -2.42755642e-04,  2.79711454e+00,  7.85573570e-01])



# Building Controllers on Torque Actuator

Example of running a controller that interacts with the motor actuator model.

## Implement Franka Gripper

In [None]:
from dm_robotics.moma.models.end_effectors.robot_hands import robot_hand

class PandaHand(robot_hand.RobotHand):
    

## Load Scene

In [2]:
def build_arena(name: str) -> composer.Arena:
    """Build a MuJoCo arena."""
    arena = empty.Arena(name=name)
    arena.mjcf_model.option.timestep = 0.01
    arena.mjcf_model.option.gravity = (0.0, 0.0, -9.8)
    arena.mjcf_model.option.timestep = 0.0001
    arena.mjcf_model.option.noslip_iterations = 3
    arena.mjcf_model.size.nconmax = 1000
    arena.mjcf_model.size.njmax = 2000
    arena.mjcf_model.visual.__getattr__("global").offheight = 640
    arena.mjcf_model.visual.__getattr__("global").offwidth = 640
    arena.mjcf_model.visual.map.znear = 0.0005
    return arena


def add_robot_and_gripper(arena: composer.Arena, arm, gripper) -> Tuple[composer.Entity, composer.Entity]:
    """Add a robot and gripper to the arena."""
    # attach the gripper to the robot
    robot.standard_compose(arm=arm, gripper=gripper)

    # define robot base site
    robot_base_site = arena.mjcf_model.worldbody.add(
        "site",
        name="robot_base",
        pos=(0.0, 0.0, 0.0),
    )

    # add the robot and gripper to the arena
    arena.attach(arm, robot_base_site)

    return arm, gripper

# build the base arena
arena = build_arena("base_scene")

# add robot arm and gripper to the arena
arm = instantiate(cfg.robots.arm)
gripper = instantiate(cfg.robots.gripper)
arm, gripper = add_robot_and_gripper(arena, arm, gripper)

# add a block 
rectangle = Rectangle(name="cube",
                      x_len=0.02,
                      y_len=0.02,
                      z_len=0.02,
                      rgba=(1.0, 0.0, 0.0, 1.0),
                      mass=0.1)
frame = arena.add_free_entity(rectangle)
rectangle.set_freejoint(frame.freejoint)


physics = mjcf.Physics.from_mjcf_model(arena.mjcf_model)

# set the default arm joint positions to ready
physics.data.qpos[:7] = ready_config
rectangle.set_pose(physics, position=np.array([0.45,0.0,0.02]), quaternion=np.array([0.0, 0.0, 0.0, 1.0]))


# add a cube to the scene


# launch passive viewer
passive_view = viewer.launch_passive(physics.model._model, physics.data._data)

#viewer.launch(physics.model._model, physics.data._data)

Adding actuator for joint: joint1
Adding actuator for joint: joint2
Adding actuator for joint: joint3
Adding actuator for joint: joint4
Adding actuator for joint: joint5
Adding actuator for joint: joint6
Adding actuator for joint: joint7
Adding sensor for joint: pos_joint1
Adding sensor for joint: pos_joint2
Adding sensor for joint: pos_joint3
Adding sensor for joint: pos_joint4
Adding sensor for joint: pos_joint5
Adding sensor for joint: pos_joint6
Adding sensor for joint: pos_joint7


## Test MuJoCo Menagerie Default Actuator

In [None]:
physics.reset()
physics.data.qpos[:7] = np.array([0, -0.785, 0, -2.356, 0, 1.571, 0.785])
rectangle.set_pose(physics, position=np.array([0.45,0.0,0.02]), quaternion=np.array([0.0, 0.0, 0.0, 1.0]))

# come up with targets
target_eef_pose = kinematic_chain.forward_kinematics(
                joints = np.array([0.0, -2.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785, 0.0]),
                full_kinematics = False,
                )[:3, 3]
target_orientation = R.from_euler('xyz', [0, 180, 0], degrees=True).as_matrix()

# convert to joint target
joint_target = kinematic_chain.inverse_kinematics(
            target_position = target_eef_pose,
            target_orientation = target_orientation,
            orientation_mode = "all",
            initial_position = np.array([0.0, 0.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785, 0.0]),
            )[1:-1]

duration = 5.0

joint_pos_data = pd.DataFrame(columns=["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"])
joint_vel_data = pd.DataFrame(columns=["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"])

while physics.data.time < duration:
    current_joint_position = physics.data.qpos[:7]
    current_joint_velocity = physics.data.qvel[:7]
    joint_pos_data = pd.concat([joint_pos_data, pd.DataFrame([current_joint_position], columns=joint_pos_data.columns)], ignore_index=True) 
    joint_vel_data = pd.concat([joint_vel_data, pd.DataFrame([current_joint_velocity], columns=joint_vel_data.columns)], ignore_index=True)

    # apply torque and update physics
    target = np.concatenate([joint_target, np.zeros(1)])
    physics.set_control(target)
    physics.step()
    passive_view.sync()


# plot control curves
# plot joint angles and velocities
import plotly.graph_objects as go
from plotly.subplots import make_subplots

fig_pos = make_subplots(rows=3, cols=3, vertical_spacing=0.02)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_1"], name="joint_1"), row=1, col=1)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_2"], name="joint_2"), row=1, col=2)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_3"], name="joint_3"), row=1, col=3)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_4"], name="joint_4"), row=2, col=1)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_5"], name="joint_5"), row=2, col=2)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_6"], name="joint_6"), row=2, col=3)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_7"], name="joint_7"), row=3, col=1)
fig_pos.update_layout(
        height=600, 
        width=800, 
        title_text="Joint Angles",
        yaxis_title="Joint Angle (rad)",
        )

# add constant reference line
fig_pos.add_hline(y=joint_target[0], line_dash="dash", row=1, col=1)
fig_pos.add_hline(y=joint_target[1], line_dash="dash", row=1, col=2)
fig_pos.add_hline(y=joint_target[2], line_dash="dash", row=1, col=3)
fig_pos.add_hline(y=joint_target[3], line_dash="dash", row=2, col=1)
fig_pos.add_hline(y=joint_target[4], line_dash="dash", row=2, col=2)
fig_pos.add_hline(y=joint_target[5], line_dash="dash", row=2, col=3)
fig_pos.add_hline(y=joint_target[6], line_dash="dash", row=3, col=1)


fig_vel = make_subplots(rows=3, cols=3, vertical_spacing=0.02)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_1"], name="joint_1"), row=1, col=1)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_2"], name="joint_2"), row=1, col=2)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_3"], name="joint_3"), row=1, col=3)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_4"], name="joint_4"), row=2, col=1)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_5"], name="joint_5"), row=2, col=2)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_6"], name="joint_6"), row=2, col=3)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_7"], name="joint_7"), row=3, col=1)
fig_vel.update_layout(
        height=600, 
        width=800, 
        title_text="Joint Velocities",
        yaxis_title="Joint Velocity (rad/s)",
        )

fig_pos.show()
fig_vel.show()

## Implement and Test Arm Controller

In [8]:
physics.reset()
physics.data.qpos[:7] = np.array([0, -0.785, 0, -2.356, 0, 1.571, 0.785])
rectangle.set_pose(physics, position=np.array([0.45,0.0,0.02]), quaternion=np.array([0.0, 0.0, 0.0, 1.0]))

# come up with targets
target_eef_pose = kinematic_chain.forward_kinematics(
                joints = np.array([0.0, -2.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785, 0.0]),
                full_kinematics = False,
                )[:3, 3]
target_orientation = R.from_euler('xyz', [0, 180, 0], degrees=True).as_matrix()

# convert to joint target
joint_target = kinematic_chain.inverse_kinematics(
            target_position = target_eef_pose,
            target_orientation = target_orientation,
            orientation_mode = "all",
            initial_position = np.array([0.0, 0.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785, 0.0]),
            )[1:-1]

# PD control with inverse dynamics
kp = 1000
kd = 900
duration = 5.0

joint_pos_data = pd.DataFrame(columns=["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"])
joint_vel_data = pd.DataFrame(columns=["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"])

while physics.data.time < duration:
    current_joint_position = physics.data.qpos[:7]
    current_joint_velocity = physics.data.qvel[:7]
    joint_pos_data = pd.concat([joint_pos_data, pd.DataFrame([current_joint_position], columns=joint_pos_data.columns)], ignore_index=True) 
    joint_vel_data = pd.concat([joint_vel_data, pd.DataFrame([current_joint_velocity], columns=joint_vel_data.columns)], ignore_index=True)
    
    # calculate acceleration
    target_acc = kp * (joint_target - current_joint_position) - kd * current_joint_velocity
    prev = current_joint_velocity.copy()

    # calculate torque
    physics.data.qacc[:7] = target_acc
    mujoco.mj_inverse(physics.model._model, physics.data._data)
    sol = physics.data.qfrc_inverse[:7].copy()
    physics.data.qacc[:7] = prev

    # apply torque and update physics
    sol = np.concatenate([sol, [255.0]])
    physics.set_control(sol)
    physics.step()
    passive_view.sync()


# plot control curves
# plot joint angles and velocities
import plotly.graph_objects as go
from plotly.subplots import make_subplots

fig_pos = make_subplots(rows=3, cols=3, vertical_spacing=0.02)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_1"], name="joint_1"), row=1, col=1)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_2"], name="joint_2"), row=1, col=2)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_3"], name="joint_3"), row=1, col=3)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_4"], name="joint_4"), row=2, col=1)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_5"], name="joint_5"), row=2, col=2)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_6"], name="joint_6"), row=2, col=3)
fig_pos.add_trace(go.Scatter(y=joint_pos_data["joint_7"], name="joint_7"), row=3, col=1)
fig_pos.update_layout(
        height=600, 
        width=800, 
        title_text="Joint Angles",
        yaxis_title="Joint Angle (rad)",
        )

# add constant reference line
fig_pos.add_hline(y=joint_target[0], line_dash="dash", row=1, col=1)
fig_pos.add_hline(y=joint_target[1], line_dash="dash", row=1, col=2)
fig_pos.add_hline(y=joint_target[2], line_dash="dash", row=1, col=3)
fig_pos.add_hline(y=joint_target[3], line_dash="dash", row=2, col=1)
fig_pos.add_hline(y=joint_target[4], line_dash="dash", row=2, col=2)
fig_pos.add_hline(y=joint_target[5], line_dash="dash", row=2, col=3)
fig_pos.add_hline(y=joint_target[6], line_dash="dash", row=3, col=1)


fig_vel = make_subplots(rows=3, cols=3, vertical_spacing=0.02)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_1"], name="joint_1"), row=1, col=1)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_2"], name="joint_2"), row=1, col=2)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_3"], name="joint_3"), row=1, col=3)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_4"], name="joint_4"), row=2, col=1)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_5"], name="joint_5"), row=2, col=2)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_6"], name="joint_6"), row=2, col=3)
fig_vel.add_trace(go.Scatter(y=joint_vel_data["joint_7"], name="joint_7"), row=3, col=1)
fig_vel.update_layout(
        height=600, 
        width=800, 
        title_text="Joint Velocities",
        yaxis_title="Joint Velocity (rad/s)",
        )

fig_pos.show()
fig_vel.show()

  joint_pos_data = pd.concat([joint_pos_data, pd.DataFrame([current_joint_position], columns=joint_pos_data.columns)], ignore_index=True)
  joint_vel_data = pd.concat([joint_vel_data, pd.DataFrame([current_joint_velocity], columns=joint_vel_data.columns)], ignore_index=True)


KeyboardInterrupt: 

Advice provided to switch to mujoco implemented PD controller using actuator models as this will be more accurate, attempt this implementation below.

## Test Grasping with Default Controller

In [4]:
physics.reset()
physics.data.qpos[:7] = grasp_pose_config
rectangle.set_pose(physics, position=np.array([0.45,0.0,0.02]), quaternion=np.array([0.0, 0.0, 0.0, 1.0]))

joint_target = grasp_pose_config

# PD control with inverse dynamics
kp = 20000
kd = 1000
duration = 5.0
grasp = False

while physics.data.time < duration:
    command = np.concatenate([joint_target, [255.0]])
    physics.set_control(command)
    physics.step()
    passive_view.sync()

KeyboardInterrupt: 

## Test Grasping with Custom Controller

In [17]:
physics.reset()
physics.data.qpos[:7] = grasp_pose_config
rectangle.set_pose(physics, position=np.array([0.45,0.0,0.02]), quaternion=np.array([0.0, 0.0, 0.0, 1.0]))

joint_target = grasp_pose_config

# PD control with inverse dynamics
kp = 1000
kd = 900
duration = 5.0
grasp = False

start_time = physics.data.time
while physics.data.time - start_time < duration:
    current_joint_position = physics.data.qpos[:7]
    current_joint_velocity = physics.data.qvel[:7]

    # calculate acceleration
    target_acc = kp * (joint_target - current_joint_position) + kd * (np.zeros(7) - current_joint_velocity)
    prev = current_joint_velocity.copy()

    # calculate torque
    physics.data.qacc[:7] = target_acc
    mujoco.mj_inverse(physics.model._model, physics.data._data)
    sol = physics.data.qfrc_inverse[:7].copy()
    physics.data.qacc[:7] = prev

    # apply torque and update physics
    sol = np.concatenate([sol, [255.0]])
    physics.set_control(sol)
    physics.step()
    passive_view.sync()

KeyboardInterrupt: 

# Automate Tuning of Actuator Params

## PD Controller

In [None]:
from cmaes import CMA
from dm_robotics.transformations import transformations as tr
from dm_robotics.transformations.transformations import mat_to_quat, quat_to_mat, quat_to_euler

def check_target_reached(target_pose, target_quat, joint_vals, position_threshold=0.01, orientation_threshold=0.5):
    # calculate end effector pose
    joint_vals = np.concatenate([np.zeros(1), joint_vals, np.zeros(1)]) # add zeros for dummy joints
    ee_pose = kinematic_chain.forward_kinematics(
        joints = joint_vals,
        full_kinematics = False,
        )
    ee_pos = ee_pose[:3, 3]
    ee_quat = mat_to_quat(ee_pose[:3, :3])

    # calculate distance to target
    linear_dist = np.linalg.norm(ee_pos - target_pose)
    angular_dist = tr.quat_dist(target_quat/np.linalg.norm(target_quat), ee_quat/np.linalg.norm(ee_quat))
    
    # check if target reached
    if (linear_dist > position_threshold) or (angular_dist > orientation_threshold):
        return False
    else:
        return True

def controller_cost(kp, kd):
    # reset simulator
    physics.reset()
    physics.data.qpos[:7] = ready_config
    rectangle.set_pose(physics, position=np.array([0.45,0.0,0.02]), quaternion=np.array([0.0, 0.0, 0.0, 1.0]))
    
    # come up with targets
    target_pose = kinematic_chain.forward_kinematics(
                    joints = np.array([0.0, -2.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785, 0.0]),
                    full_kinematics = False,
                    )[:3, 3]
    target_orientation = R.from_euler('xyz', [0, 180, 0], degrees=True).as_matrix()
    target_quat = mat_to_quat(target_orientation)
    
    
    # convert to joint target
    joint_target = kinematic_chain.inverse_kinematics(
                target_position = target_pose,
                target_orientation = target_orientation,
                orientation_mode = "all",
                initial_position = np.array([0.0, 0.0, -0.785, 0.0, -2.356, 0.0, 1.571, 0.785, 0.0]),
                )[1:-1]
    
    max_duration = 3.0
    total_time = None
    joint_velocity_penalty = 0.0
    start_time = physics.data.time
    while physics.data.time - start_time < max_duration:
        current_joint_position = physics.data.qpos[:7]
        current_joint_velocity = physics.data.qvel[:7]

        # penalise joint velocity violation
        if np.any(current_joint_velocity > 2.5):
            joint_velocity_penalty += 10.0

        if check_target_reached(target_pose, target_quat, current_joint_position):
            total_time = physics.data.time - start_time
            break
        
        # calculate acceleration
        target_acc = kp * (joint_target - current_joint_position) + kd * (np.zeros(7) - current_joint_velocity)
        prev = current_joint_velocity.copy()
    
        # calculate torque
        physics.data.qacc[:7] = target_acc
        mujoco.mj_inverse(physics.model._model, physics.data._data)
        sol = physics.data.qfrc_inverse[:7].copy()
        physics.data.qacc[:7] = prev
    
        # apply torque and update physics
        sol = np.concatenate([sol, [255.0]])
        physics.set_control(sol)
        physics.step()

    # penalise not converging to solution
    if total_time is None:
        total_time = max_duration * 10
        

    return total_time + joint_velocity_penalty
    
    

optimizer = CMA(mean=np.ones(2)* 1000, sigma=350)

for generation in range(100):
    solutions = []
    for _ in range(optimizer.population_size):
        x = optimizer.ask()
        value = controller_cost(x[0], x[1])
        solutions.append((x, value))
        print(f"#{generation} {value} (x1={x[0]}, x2 = {x[1]})")
    optimizer.tell(solutions)

#0 30.0 (x1=1170.9191646625718, x2 = 1606.9121341358355)
#0 2.2749000000003763 (x1=1049.528712286365, x2 = 582.8469238047114)
#0 1.9051999999998066 (x1=1630.266873861925, x2 = 756.53983914521)
#0 2.9678000000018385 (x1=1031.9777376505908, x2 = 748.1734698946452)
#0 30.0 (x1=951.6940393430083, x2 = 1247.0856691319098)
#0 30.0 (x1=590.6249018994026, x2 = 1175.32409243337)
#1 30.0 (x1=1266.0217332426357, x2 = 1034.6869287566963)
#1 1.573799999999843 (x1=1574.6134235845473, x2 = 602.6648583064577)
#1 2.0774999999999597 (x1=1721.786971741451, x2 = 871.721435640786)
#1 3571.3073 (x1=2184.2622670439755, x2 = 691.6317656590475)
#1 11880.8422 (x1=1671.6155787794892, x2 = 337.89928742345717)
#1 15090.3993 (x1=2164.3859064183366, x2 = 195.16373088268938)
#2 2.2034000000002254 (x1=1782.608744433994, x2 = 957.4827839949135)
#2 30.0 (x1=1206.1546981794195, x2 = 969.3805622596468)
#2 1.5327999999998476 (x1=1369.0685982197404, x2 = 510.52781957488844)
#2 2.1015000000000104 (x1=1439.0413025471557, x2 =

## Position Actuator

## Velocity Actuator