In [9]:
from collections import defaultdict
import os
from pathlib import Path
import mujoco_py as mp
import time
import numpy as np
# from psutil import NIC_DUPLEX_UNKNOWN
from simple_pid import PID
from termcolor import colored
import ikpy
from pyquaternion import Quaternion
import matplotlib.pyplot as plt
import copy


  import distutils
  from distutils.sysconfig import customize_compiler


In [46]:
class MJ_Controller(object):
    '''
    class for control of an robotic arm in Mujoco
    '''
    def __init__(self,model=None,simulation=None,viewer=None):
        if model is None:
            self.model =mp.load_model_from_path("/home/jessie/internship/model/braccio_arm.xml")
        else:
            self.model=model
        self.sim=mp.MjSim(self.model) if simulation is None else simulation
        #self.viewer = mp.MjViewer(self.sim) if viewer is None else viewer
        self.create_lists()
        self.groups = defaultdict(list)
        self.groups['All'] = list(range(len(self.sim.data.ctrl)))
        self.create_group("Arm",list(range(5)))
        self.create_group("Gripper",list(range(2)))
        self.actuated_joint_ids = np.array(i[2] for i in self.actuators)
        self.reached_target = False
        self.current_output=np.zeros(len(self.sim.data.ctrl))
        #self.image_counter=0
        # what does this mean?
        #self.ee_chain=ikpy.chain.Chain.from_urdf_file("/home/jessie/internship/model/braccio_arm.xml")
        self.last_movement_steps=0

    def create_group(self,group_name,idx_list):
        '''
        allows the user to create custom objects for controlling groups of joints
        group_name: string defining the desired name of the group
        idx_list: list containing the IDs of the actuators that will belong to this group 
        '''

        try:
            assert len(idx_list)<=len(self.sim.data.ctrl),"too many joints specified"
            assert(
                group_name not in self.groups.keys()
            ),'A group with name {} already exists'.format(group_name)
            assert np.max(idx_list)<=len(self.sim.data.ctrl),"list contains invalid actuator ID(too high)"

        except Exception as e:
            print(e)
            print("could not create a new group")

    def show_model_info(self):
        '''
        display relevant model info for the user, namely bodies, joints, actuators,
        as well as their IDs and ranges
        '''
        print("\nNumber of bodies: {}".format(self.model.nbody))
        for i in range(self.model.nbody):
            print("Body ID:{}, Body Name:{}".format(i,self.model.body_id2name(i)))

        print("\nNumber of joints: {}".format(self.model.njnt))
        for i in range(self.model.njnt):
            print(
                "Joint ID: {}, Joint Name:{}, Limits:{}".format(i,self.model.joint_id2name(i),self.model.jnt_range[i])
            )

        print("\nNumber of Actuators: {}".format(len(self.sim.data.ctrl)))
        for i in range(len(self.sim.data.ctrl)):
            print("Actuator ID:{}, Actuator Name:{}, Controlled Joint:{}, Control Range:{}".format(
                i,
                self.model.actuator_id2name(i),
                self.actuators[i][3],
                self.model.actuator_ctrlrange[i]
                )
            ) 

        # what is this for?
        #print("\nJoints in kinematic chain:{}".format([i.name for i in self.ee_chain.links]))

        #print("\nPID Info:\n")
        # for i in range(len(self.actuators)):
        #     print(
        #         "{}: P: {}, I: {}, D: {}, setpoint: {}, sample_time: {}".format(
        #         self.actuators[i][3],
        #         self.actuators[i][4].tunings[0],
        #         self.actuators[i][4].tunings[1],
        #         self.actuators[i][4].tunings[2],
        #         self.actuators[i][4].setpoint,
        #         self.actuators[i][4].sample_time,
        #         )
        #     )
        
        # print("\n Camera info: \n")
        # for i in range(self.model.ncam):
        #     print(
        #         "Camera ID: {}, Camera Name: {}, Camera FOV (y, degrees): {}, Position: {}, Orientation: {}".format(
        #             i,
        #             self.model.camera_id2name(i),
        #             self.model.cam_fovy[i],
        #             self.model.cam_pos0[i],
        #             self.model.cam_mat0[i],
        #         )
        #     )
        

    def create_lists(self):
        """
        Creates some basic lists and fill them with initial values. This function is called in the class costructor.
        The following lists/dictionaries are created:

        - controller_list: Contains a controller for each of the actuated joints. This is done so that different gains may be
        specified for each controller.

        - current_joint_value_targets: Same as the current setpoints for all controllers, created for convenience.

        - current_output = A list containing the ouput values of all the controllers. This list is only initiated here, its
        values are overwritten at the first simulation step.

        - actuators: 2D list, each entry represents one actuator and contains:
            0 actuator ID
            1 actuator name
            2 joint ID of the joint controlled by this actuator
            3 joint name
            4 controller for controlling the actuator
        """
        self.controller_list =[]

        #values for training
        sample_time=0.0001
        p_scale=3
        i_scale=0.0
        i_gripper=0
        d_scale=0.1
        # to do : output_limits
        self.controller_list.append(
            PID(
                7*p_scale,
                0.0*i_scale,
                1.1*d_scale,
                setpoint=0,
                #output_limits=(-2,2),
                sample_time=sample_time,
            )
        ) # base_joint, Limits:[0.     3.1416]
        self.controller_list.append(
            PID(
                10 * p_scale,
                0.0 * i_scale,
                1.0 * d_scale,
                setpoint=1.57,
                #output_limits=(-2, 2),
                sample_time=sample_time,
            )
        )  # shoulder_joint, Limits:[0.2618 2.8798]
        self.controller_list.append(
            PID(
                5 * p_scale,
                0.0 * i_scale,
                0.5 * d_scale,
                setpoint=1.57,
                #output_limits=(-2, 2),
                sample_time=sample_time,
            )
        )  # elbow_joint, Limits:[0.     3.1416]
        self.controller_list.append(
            PID(
                7 * p_scale,
                0.0 * i_scale,
                0.1 * d_scale,
                setpoint=1.57,
                #output_limits=(-1, 1),
                sample_time=sample_time,
            )
        )  # wrist_pitch_joint, Limits:[0.     3.1416]
        self.controller_list.append(
            PID(
                5 * p_scale,
                0.0 * i_scale,
                0.1 * d_scale,
                setpoint=1.57,
                #output_limits=(-1, 1),
                sample_time=sample_time,
            )
        )  # wrist_roll_joint, Limits:[0.     3.1416]
        self.controller_list.append(
            PID(
                2.5 * p_scale,
                i_gripper,
                0.00 * d_scale,
                setpoint=0.0,
                #output_limits=(-1, 1),
                sample_time=sample_time,
            )
        )  # gripper_joint, Limits:[0.175  1.2741]
        self.controller_list.append(
            PID(
                2.5 * p_scale,
                i_gripper,
                0.00 * d_scale,
                setpoint=0.0,
                #output_limits=(-1, 1),
                sample_time=sample_time,
            )
        )  # sub_gripper_joint, Limits:[1.2741 2.3732]

        self.current_target_joint_values=[
            self.controller_list[i].setpoint for i in range(len(self.sim.data.ctrl))
        ]

        self.current_target_joint_values = np.array(self.current_target_joint_values)
        
        self.current_output = [controller(0) for controller in self.controller_list]
        self.actuators=[]
        for i in range(len(self.sim.data.ctrl)):
            item = [i,self.model.actuator_id2name(i)]
            item.append(self.model.actuator_trnid[i][0])
            item.append(self.model.joint_id2name(self.model.actuator_trnid[i][0]))
            item.append(self.controller_list[i])
            self.actuators.append(item)

    def actuate_joint_group(self,group,motor_values):
        try:
            assert group in self.groups.keys(), "No group with name {} exists!".format(group)
            assert len(motor_values) == len(
                self.groups[group]
            ), "Invalid number of actuator values!"
            for i, v in enumerate(self.groups[group]):
                self.sim.data.ctrl[v] = motor_values[i]

        except Exception as e:
            print(e)
            print("Could not actuate requested joint group.")

    def move_group_to_joint_target(
        self,
        group="All",
        target=None,
        tolerance=0.01,
        max_steps=10000,
        plot=False,
        render=True,
        quiet=False
    ):
        """
        Moves the specified joint group to a joint target.

        Args:
            group: String specifying the group to move.
            target: List of target joint values for the group.
            tolerance: Threshold within which the error of each joint must be before the method finishes.
            max_steps: maximum number of steps to actuate before breaking
            plot: If True, a .png image of the group joint trajectories will be saved to the local directory.
                  This can be used for PID tuning in case of overshoot etc. The name of the file will be "Joint_angles_" + a number.
            marker: If True, a colored visual marker will be added into the scene to visualize the current
                    cartesian target.
        """
        try:
            assert group in self.groups.keys(), "No group with name {} exists!".format(group)
            if target is not None:
                assert len(target) == len(
                    self.groups[group]
                ), "Mismatching target dimensions for group {}!".format(group)
            ids = self.groups[group]
            steps = 1
            result = ""
            if plot:
                self.plot_list = defaultdict(list)
            self.reached_target = False
            deltas = np.zeros(len(self.sim.data.ctrl))

            if target is not None:
                for i, v in enumerate(ids):
                    self.current_target_joint_values[v] = target[i]
                    # print('Target joint value: {}: {}'.format(v, self.current_target_joint_values[v]))

            for j in range(len(self.sim.data.ctrl)):
                # Update the setpoints of the relevant controllers for the group
                self.actuators[j][4].setpoint = self.current_target_joint_values[j]
                # print('Setpoint {}: {}'.format(j, self.actuators[j][4].setpoint))
            while not self.reached_target:
                current_joint_values = self.sim.data.qpos[self.actuated_joint_ids]

                # self.get_image_data(width=200, height=200, show=True)

                # We still want to actuate all motors towards their targets, otherwise the joints of non-controlled
                # groups will start to drift
                for j in range(len(self.sim.data.ctrl)):
                    self.current_output[j] = self.actuators[j][4](current_joint_values[j])
                    self.sim.data.ctrl[j] = self.current_output[j]
                for i in ids:
                    deltas[i] = abs(self.current_target_joint_values[i] - current_joint_values[i])

                if steps % 1000 == 0 and target is not None and not quiet:
                    print(
                        "Moving group {} to joint target! Max. delta: {}, Joint: {}".format(
                            group, max(deltas), self.actuators[np.argmax(deltas)][3]
                        )
                    )

                if plot and steps % 2 == 0:
                    self.fill_plot_list(group, steps)
                
                if max(deltas) < tolerance:
                    if target is not None and not quiet:
                        print(
                            colored(
                                "Joint values for group {} within requested tolerance! ({} steps)".format(
                                    group, steps
                                ),
                                color="green",
                                attrs=["bold"],
                            )
                        )
                    result = "success"
                    self.reached_target = True
                    # break

                if steps > max_steps:
                    if not quiet:
                        print(
                                colored(
                                "Max number of steps reached: {}".format(max_steps),
                                color="red",
                                attrs=["bold"],
                                )
                            )
                        print("Deltas: ", deltas)
                    result = "max. steps reached: {}".format(max_steps)
                    break

                self.sim.step()
                if render:
                    self.viewer.render()
                steps += 1

            self.last_movement_steps = steps

            if plot:
                self.create_joint_angle_plot(group=group, tolerance=tolerance)

            return result

        except Exception as e:
            print(e)
            print("Could not move to requested joint target.")

    
    def set_group_joint_target(self, group, target):

        idx = self.groups[group]
        try:
            assert len(target) == len(
                idx
            ), "Length of the target must match the number of actuated joints in the group."
            self.current_target_joint_values[idx] = target

        except Exception as e:
            print(e)
            print(f"Could not set new group joint target for group {group}")

    def open_gripper(self, half=False, **kwargs):
        """
        Opens the gripper while keeping the arm in a steady position.
        """
        return (
            self.move_group_to_joint_target(
                group="Gripper", target=[0.0,0.0], max_steps=1000, tolerance=0.05, **kwargs
            )
            if half
            else self.move_group_to_joint_target(
                group="Gripper", target=[0.4,0.4], max_steps=1000, tolerance=0.05, **kwargs
            )
        ) # we have two gripper joints what does half mean?

    def close_gripper(self, **kwargs):
        # def close_gripper(self, render=True, max_steps=1000, plot=False, quiet=True):
        """
        Closes the gripper while keeping the arm in a steady position.
        """
        return self.move_group_to_joint_target(
            group="Gripper", target=[-0.4, -0.4], tolerance=0.01, **kwargs
        ) # To do: change the target 

    def grasp(self, **kwargs):
        # def grasp(self, render=True, plot=False):
        """
        Attempts a grasp at the current location and prints some feedback on weather it was successful
        """

        result = self.close_gripper(max_steps=300, **kwargs)

        return result != "success"

    def move_ee(self, ee_position, **kwargs):
        """
        Moves the robot arm so that the gripper center ends up at the requested XYZ-position,
        with a vertical gripper position.

        Args:
            ee_position: List of XYZ-coordinates of the end-effector .
            plot: If True, a .png image of the arm joint trajectories will be saved to the local directory.
                  This can be used for PID tuning in case of overshoot etc. The name of the file will be "Joint_angles_" + a number.
        """
        joint_angles = self.ik(ee_position)
        if joint_angles is not None:
            result = self.move_group_to_joint_target(group="Arm", target=joint_angles, **kwargs)
            # result = self.move_group_to_joint_target(group='Arm', target=joint_angles, tolerance=0.05, plot=plot, max_steps=max_steps, quiet=quiet, render=render)
        else:
            result = "No valid joint angles received, could not move EE to position."
            self.last_movement_steps = 0
        return result

    def ik(self, ee_position):
        """
        Method for solving simple inverse kinematic problems.
        This was developed for top down graspig, therefore the solution will be one where the gripper is
        vertical. This might need adjustment for other gripper models.

        Args:
            ee_position: List of XYZ-coordinates of the end-effector .

        Returns:
            joint_angles: List of joint angles that will achieve the desired ee position.
        """

        try:
            assert (
                len(ee_position) == 3
            ), "Invalid EE target! Please specify XYZ-coordinates in a list of length 3."
            self.current_carthesian_target = ee_position.copy()
            # We want to be able to spedify the ee position in world coordinates, so subtract the position of the
            # base link. This is because the inverse kinematics solver chain starts at the base link.
            ee_position_base = (
                ee_position - self.sim.data.body_xpos[self.model.body_name2id("base_link")]
            )

            # By adding the appr. distance between ee_link and grasp center, we can now specify a world target position
            # for the grasp center instead of the ee_link
            gripper_center_position = ee_position_base + [0, -0.005, 0.16]
            # gripper_center_position = ee_position_base + [0, 0, 0.185]

            # initial_position=[0, *self.sim.data.qpos[self.actuated_joint_ids][self.groups['Arm']], 0]
            # joint_angles = self.ee_chain.inverse_kinematics(gripper_center_position, [0,0,-1], orientation_mode='X', initial_position=initial_position, regularization_parameter=0.05)
            joint_angles = self.ee_chain.inverse_kinematics(
                gripper_center_position, [0, 0, -1], orientation_mode="X"
            )

            prediction = (
                self.ee_chain.forward_kinematics(joint_angles)[:3, 3]
                + self.sim.data.body_xpos[self.model.body_name2id("base_link")]
                - [0, -0.005, 0.16]
            )
            diff = abs(prediction - ee_position)
            error = np.sqrt(diff.dot(diff))
            joint_angles = joint_angles[1:-2]
            if error <= 0.02:
                return joint_angles

            print("Failed to find IK solution.")
            return None
        except Exception as e:
            print(e)
            print("Could not find an inverse kinematics solution.")

    def ik_2(self, pose_target):
        """
        TODO: Implement orientation.
        """
        target_position = pose_target[:3]
        target_position -= self.sim.data.body_xpos[self.model.body_name2id("base_link")]
        orientation = Quaternion(pose_target[3:])
        target_orientation = orientation.rotation_matrix
        target_matrix = orientation.transformation_matrix
        target_matrix[0][-1] = target_position[0]
        target_matrix[1][-1] = target_position[1]
        target_matrix[2][-1] = target_position[2]
        print(target_matrix)
        self.current_carthesian_target = pose_target[:3]
        joint_angles = self.ee_chain.inverse_kinematics_frame(
            target_matrix, initial_position=initial_position, orientation_mode="all"
        )
        joint_angles = joint_angles[1:-1]
        current_finger_values = self.sim.data.qpos[self.actuated_joint_ids][6:]
        target = [*joint_angles, *current_finger_values]

    def stay(self, duration, render=True):
        """
        Holds the current position by actuating the joints towards their current target position.

        Args:
            duration: Time in ms to hold the position.
        """

        # print('Holding position!')
        starting_time = time.time()
        elapsed = 0
        while elapsed < duration:
            self.move_group_to_joint_target(
                max_steps=10, tolerance=0.0000001, plot=False, quiet=True, render=render
            )
            elapsed = (time.time() - starting_time) * 1000
        # print('Moving on...')


    def fill_plot_list(self, group, step):
        """
        Creates a two dimensional list of joint angles for plotting.

        Args:
            group: The group involved in the movement.
            step: The step of the trajectory the values correspond to.
        """

        for i in self.groups[group]:
            self.plot_list[self.actuators[i][3]].append(
                self.sim.data.qpos[self.actuated_joint_ids][i]
            )
        self.plot_list["Steps"].append(step)

    def create_joint_angle_plot(self, group, tolerance):
        """
        Saves the recorded joint values as a .png-file. The values for each joint of the group are
        put in a seperate subplot.

        Args:
            group: The group the stored values belong to.
            tolerance: The tolerance value that the joints were required to be in.
        """

        self.image_counter += 1
        keys = list(self.plot_list.keys())
        number_subplots = len(self.plot_list) - 1
        columns = 3
        rows = (number_subplots // columns) + (number_subplots % columns)

        position = range(1, number_subplots + 1)
        fig = plt.figure(1, figsize=(15, 10))
        plt.subplots_adjust(hspace=0.4, left=0.05, right=0.95, top=0.95, bottom=0.05)

        for i in range(number_subplots):
            axis = fig.add_subplot(rows, columns, position[i])
            axis.plot(self.plot_list["Steps"], self.plot_list[keys[i]])
            axis.set_title(keys[i])
            axis.set_xlabel(keys[-1])
            axis.set_ylabel("Joint angle [rad]")
            axis.xaxis.set_label_coords(0.05, -0.1)
            axis.yaxis.set_label_coords(1.05, 0.5)
            axis.axhline(
                self.current_target_joint_values[self.groups[group][i]], color="g", linestyle="--"
            )
            axis.axhline(
                self.current_target_joint_values[self.groups[group][i]] + tolerance,
                color="r",
                linestyle="--",
            )
            axis.axhline(
                self.current_target_joint_values[self.groups[group][i]] - tolerance,
                color="r",
                linestyle="--",
            )

        filename = "Joint_values_{}.png".format(self.image_counter)
        plt.savefig(filename)
        print(
            colored(
                "Saved trajectory to {}.".format(filename),
                color="yellow",
                on_color="on_grey",
                attrs=["bold"],
            )
        )
        plt.clf()

 





In [47]:
controller = MJ_Controller()
controller.show_model_info()


Number of bodies: 10
Body ID:0, Body Name:world
Body ID:1, Body Name:braccio_base_link
Body ID:2, Body Name:shoulder_link
Body ID:3, Body Name:elbow_link
Body ID:4, Body Name:wrist_pitch_link
Body ID:5, Body Name:wrist_roll_link
Body ID:6, Body Name:right_gripper_link
Body ID:7, Body Name:left_gripper_link
Body ID:8, Body Name:object
Body ID:9, Body Name:pick_box

Number of joints: 8
Joint ID: 0, Joint Name:base_joint, Limits:[0.     3.1416]
Joint ID: 1, Joint Name:shoulder_joint, Limits:[0.2618 2.8798]
Joint ID: 2, Joint Name:elbow_joint, Limits:[0.     3.1416]
Joint ID: 3, Joint Name:wrist_pitch_joint, Limits:[0.     3.1416]
Joint ID: 4, Joint Name:wrist_roll_joint, Limits:[0.     3.1416]
Joint ID: 5, Joint Name:gripper_joint, Limits:[0.175  1.2741]
Joint ID: 6, Joint Name:sub_gripper_joint, Limits:[1.2741 2.3732]
Joint ID: 7, Joint Name:free_joint_0, Limits:[0. 0.]

Number of Actuators: 7
Actuator ID:0, Actuator Name:base_joint_T, Controlled Joint:base_joint, Control Range:[0. 0.]


In [48]:
from ctypes import util
import os 
import time
import math
from gym.envs.mujoco.mujoco_env import MujocoEnv
from gym import utils, spaces
import traceback
#from decorators import *

class GraspEnv(MujocoEnv,utils.EzPickle):
    def __init__(self,
    file="/home/jessie/internship/model/braccio_arm.xml",
    show_obs=True,
    demo=False,
    render=False,
    isDiscrete=False):
        self.initialized = False
        self.isDiscrete = isDiscrete
        self.rotations = {0: 0, 1: 30, 2: 60, 3: 90, 4: -30, 5: -60}
        self.step_called = 0
        self.terminated = 0
        utils.EzPickle.__init__(self)
        MujocoEnv.__init__(self, file,1)
        if render:
            self.render()
        self.controller = MJ_Controller(self.model, self.sim, self.viewer)
        self.initialized = True
        self.grasp_counter = 0
        self.show_observations = show_obs
        self.demo_mode = demo
        self.TABLE_HEIGHT = 0.91
        self.render = render
        
    def __repr__(self):
        return f"GraspEnv(obs height={self.IMAGE_HEIGHT}, obs_width={self.IMAGE_WIDTH}, AS={self.action_space_type})"

    def step(self, action, record_grasps=False, action_info="no info"):
        """
        Lets the agent execute the action.

        Args:
            action: The action to be performed.

        Returns:
            observation: np-array containing the camera image data
            rewards: The reward obtained
            done: Flag indicating weather the episode has finished or not
            info: Extra info
        """

        done = False
        info = {}
        # Parent class will step once during init to set up the observation space, controller is not yet available at that time.
        # Therefore we simply return a dictionary of zeros of the appropriate size.
        if not self.initialized:
            self.current_observation = MJ_Controller().current_joint_values # TO DO
            reward = 0
        else:
            if self.step_called == 1:
                self.current_observation = self.get_observation(show=False)
            observationDim=len(self.getExtendedObservation())
            observation_high=np.array([largeValObservation]*observationDim)
            if (self.isDiscrete):
                self.action_sapce = spaces.Discrete(7)
            else:
                action_dim=3
                self.action_bound=1
                action_high=np.array([self.action_bound])*action_dim
                self.action_space = spaces.Box(-action_high,action_high)
            self.observation_space = spaces.Box(-observation_high,observation_high)

            grasped_something = self.move_and_grasp(
                    coordinates,
                    rotation,
                    render=self.render,
                    record_grasps=record_grasps,
                )

            reward = 1 if grasped_something else 0
            if grasped_something != "demo":
                print(
                        colored(
                            "Reward received during step: {}".format(reward),
                            color="yellow",
                            attrs=["bold"],
                        )
                    )

            self.current_observation = self.get_observation(show=self.show_observations)

        self.step_called += 1

        return self.current_observation, reward, done, info

    def set_action_space(self):
        if (self.isDiscrete):
            self.action_sapce = spaces.Discrete(7)
        else:
            action_dim=3
            self.action_bound=1
            action_high=np.array([self.action_bound])*action_dim
            self.action_space = spaces.Box(-action_high,action_high)

        return self.action_space

    def set_grasp_position(self, position):
        """
        Legacy method, not used in the current setup. May be used to directly set the joint values to a desired position.
        """

        joint_angles = self.controller.ik(position)
        qpos = self.data.qpos
        idx = self.controller.actuated_joint_ids[self.controller.groups["Arm"]]
        for i, index in enumerate(idx):
            qpos[index] = joint_angles[i]

        self.controller.set_group_joint_target(group="Arm", target=joint_angles)

        idx_2 = self.controller.actuated_joint_ids[self.controller.groups["Gripper"]]

        open_gripper_values = [0.2, 0.2, 0.0, -0.1]

        for i, index in enumerate(idx_2):
            qpos[index] = open_gripper_values[i]

        qvel = np.zeros(len(self.data.qvel))
        self.set_state(qpos, qvel)
        self.data.ctrl[:] = 0

    def rotate_wrist_3_joint_to_value(self, degrees):
        self.controller.current_target_joint_values[5] = math.radians(degrees)
        return self.controller.move_group_to_joint_target(
            tolerance=0.05, max_steps=500, render=self.render, quiet=True
        )

    def transform_height(self, height_action, depth_height):
        return np.round(
            self.TABLE_HEIGHT + height_action * (0.1) / self.action_space.nvec[1], decimals=3
        )
        # return np.round(max(self.TABLE_HEIGHT, self.TABLE_HEIGHT + height_action * (depth_height - self.TABLE_HEIGHT)/self.action_space.nvec[1]), decimals=3)

    def move_and_grasp(
        self, coordinates, rotation, render=False, record_grasps=False, markers=False, plot=False
    ):

        # Try to move directly above target
        coordinates_1 = copy.deepcopy(coordinates)
        coordinates_1[2] = 1.1
        result1 = self.controller.move_ee(
            coordinates_1,
            max_steps=1000,
            quiet=True,
            render=render,
            marker=markers,
            tolerance=0.05,
            plot=plot,
        )
        steps1 = self.controller.last_steps
        result_pre = "Failed"
        if result1 == "success":
            result_pre = "Above target"

        # If that's not possible, move to center as pre grasp position
        if result1[:2] == "No":
            result1 = self.controller.move_ee(
                [0.0, -0.6, 1.1],
                max_steps=1000,
                quiet=True,
                render=render,
                marker=markers,
                tolerance=0.05,
                plot=plot,
            )
            steps1 = self.controller.last_steps
            if result1 == "success":
                result_pre = "Center"

        # Check result1 for max steps reached => if so it got stuck in a bad position
        if result1[:3] == "max":
            result_rotate = "Skipped"
            steps_rotate = 0
            result2 = "Skipped"
            coordinates_2 = None
            steps2 = 0
            result_grasp = False

        else:
            # Rotate gripper according to second action dimension
            result_rotate = self.rotate_wrist_3_joint_to_value(self.rotations[rotation])
            steps_rotate = self.controller.last_steps

            self.controller.open_gripper(half=True, render=render, quiet=True, plot=plot)

            # Move to grasping height
            coordinates_2 = copy.deepcopy(coordinates)
            coordinates_2[2] = max(self.TABLE_HEIGHT, coordinates_2[2] - 0.01)
            result2 = self.controller.move_ee(
                coordinates_2,
                max_steps=300,
                quiet=True,
                render=render,
                marker=markers,
                tolerance=0.01,
                plot=plot,
            )
            steps2 = self.controller.last_steps

            # If we can't reach the desired grasping position, don't grasp
            if result2[:3] == "max":
                result2 = "Could not reach target location"
                result_grasp = False

            else:
                self.controller.stay(100, render=render)
                result_grasp = self.controller.grasp(
                    render=render, quiet=True, marker=markers, plot=plot
                )

        self.controller.actuators[0][4].Kp = 10.0

        # Move back above center of table
        result3 = self.controller.move_ee(
            [0.0, -0.6, 1.1],
            max_steps=1000,
            quiet=True,
            render=render,
            plot=plot,
            marker=markers,
            tolerance=0.05,
        )
        steps3 = self.controller.last_steps

        # Move to drop position
        result4 = self.controller.move_ee(
            [0.6, 0.0, 1.15],
            max_steps=1200,
            quiet=True,
            render=render,
            plot=plot,
            marker=markers,
            tolerance=0.01,
        )
        steps4 = self.controller.last_steps

        # self.controller.stay(500)

        result_final = "Skipped"

        if result_grasp:
            if not self.demo_mode:
                # Perform check if object is in gripper
                result_final = self.controller.close_gripper(
                    max_steps=1000, render=render, quiet=True, marker=markers, plot=plot
                )
            else:
                result_final = self.controller.close_gripper(
                    max_steps=100, render=render, quiet=True, marker=markers, plot=plot
                )

        final_str = "Nothing in the gripper"
        if result_final[:3] == "max":
            final_str = "Object in the gripper"

        grasped_something = result_final[:3] == "max" and result_grasp

        if grasped_something and record_grasps:
            capture_rgb, depth = self.controller.get_image_data(
                width=1000, height=1000, camera="side"
            )
            self.grasp_counter += 1
            img_name = "Grasp_{}.png".format(self.grasp_counter)
            cv.imwrite(img_name, cv.cvtColor(capture_rgb, cv.COLOR_BGR2RGB))

        # Open gripper again
        result_open = self.controller.open_gripper(render=render, quiet=True, plot=plot)
        steps_open = self.controller.last_steps

        if grasped_something:
            self.controller.stay(200, render=render)

        # Move back to zero rotation
        result_rotate_back = self.rotate_wrist_3_joint_to_value(0)

        self.controller.actuators[0][4].Kp = 20.0

        # if self.demo_mode:
        #     self.controller.stay(200, render=render)
        #     return 'demo'

        # else:
        print("Results: ")
        print("Move to pre grasp position: ".ljust(40, " "), result_pre, ",", steps1, "steps")
        print("Rotate gripper: ".ljust(40, " "), result_rotate, ",", steps_rotate, "steps")
        print(
            f"Move to grasping position (z={np.round(coordinates_2[2], decimals=4) if isinstance(coordinates_2, np.ndarray) else 0}):".ljust(
                40, " "
            ),
            result2,
            ",",
            steps2,
            "steps",
        )
        print("Grasped anything?: ".ljust(40, " "), result_grasp)
        print("Move to center: ".ljust(40, " "), result3, ",", steps3, "steps")
        print("Move to drop position: ".ljust(40, " "), result4, ",", steps4, "steps")
        print("Final finger check: ".ljust(40, " "), final_str)
        print("Open gripper: ".ljust(40, " "), result_open, ",", steps_open, "steps")

        if result1 == result2 == result3 == result4 == result_open == "success":
            print(colored("Executed all movements successfully.", color="green", attrs=["bold"]))
        else:
            print(
                colored(
                    "Could not execute all movements successfully.", color="red", attrs=["bold"]
                )
            )

        if grasped_something:
            print(colored("Successful grasp!", color="green", attrs=["bold"]))
            return True
        else:
            print(colored("Did not grasp anything.", color="red", attrs=["bold"]))
            return False

    def get_observation(self):
        """
        Uses the controllers get_image_data method to return an top-down image (as a np-array).

        """
        observation =[]
        current_joint_values = self.data.qpos[self.controller.actuated_joint_ids]
        #orientation = Quaternion(current_joint_values[3:])

        observation.extend(list(current_joint_values))
        #observation.extend(list(orientation))

        return observation

        
    def reset_model(self, show_obs=True):
        """
        Method to perform additional reset steps and return an observation.
        Gets called in the parent classes reset method.
        """

        qpos = self.data.qpos
        qvel = self.data.qvel

        qpos[self.controller.actuated_joint_ids] = [0, -1.57, 1.57, -1.57, -1.57, 0.0, 0.3]


        joint_name = f"free_joint_{i}"
        q_adr = self.model.get_joint_qpos_addr(joint_name)
        start, end = q_adr
        qpos[start] = np.random.uniform(low=-0.25, high=0.25)
        qpos[start + 1] = np.random.uniform(low=-0.77, high=-0.43)
        # qpos[start+2] = 1.0
        qpos[start + 2] = np.random.uniform(low=1.0, high=1.5)
        qpos[start + 3 : end] = Quaternion.random().unit.elements

       
        self.set_state(qpos, qvel)

        self.controller.set_group_joint_target(
            group="All", target=qpos[self.controller.actuated_joint_ids]
        )

        # Turn this on for training, so the objects drop down before the observation
        self.controller.stay(1000, render=self.render)
        if self.demo_mode:
            self.controller.stay(5000, render=self.render)
        # return an observation image
        return self.get_observation(show=self.show_observations)

    def close(self):
        MujocoEnv.close(self)

  

In [None]:
import gym

env = gym.make("gym_grasper:Grasper-v0", show_obs=False, render=True)

N_EPISODES = 100
N_STEPS = 100

env.print_info()

for episode in range(1, N_EPISODES + 1):
    obs = env.reset()
    for step in range(N_STEPS):
        print("#################################################################")
        print(
            colored("EPISODE {} STEP {}".format(episode, step + 1), color="white", attrs=["bold"])
        )
        print("#################################################################")
        action = env.action_space.sample()
        # action = [100,100] # multidiscrete
        # action = 20000 #discrete
        observation, reward, done, _ = env.step(action, record_grasps=True)
        # observation, reward, done, _ = env.step(action, record_grasps=True, render=True)

env.close()

print("Finished.")