In [1]:
#Math stuff
import numpy as np
import math

#3D Object stuff
import pyvista as pv

#Mujoco stuff
#Environment stuff
import gym
import mujoco_py

In [2]:
def check_in_target_sphere(sphere_center_point, EE_cordinates, sphere_radius = 0.05):
    EE_cordinate_mesh = pv.PolyData(EE_cordinates)

    # Generate a sphere around the center point
    sphere = pv.Sphere(radius=sphere_radius)
    sphere = sphere.translate(sphere_center_point)

    intersection_set = EE_cordinate_mesh.select_enclosed_points(sphere)


    # Print the result of the intersection test
    if intersection_set['SelectedPoints'].sum() >= 1:
        print("intersected")
        return 1
    else:
        return 0

In [3]:
class AgentUtils:
    def __init__(self, simulation_model, threeD_Object, initial_action, target_coordinate, 
                 target_radius = 0.08):
        self.sim = simulation_model
        self.last_action = initial_action

        self.threeD_object = threeD_Object

        self.TARGET_COORDINATE = target_coordinate
        self.target_radius = target_radius


    def EE_to_target_distance_euclidean(self):
        x_t,y_t,z_t = self.sim.data.get_body_xpos("EE").copy()
        x,y,z = self.TARGET_COORDINATE 

        distance = np.sqrt((x-x_t)**2 + (y-y_t)**2 + (z-z_t)**2)
        return distance

    def EE_to_target_distance_absolute(self):
        x_t,y_t,z_t = self.sim.data.get_body_xpos("EE").copy()
        x,y,z = self.TARGET_COORDINATE 

        distance = np.abs(x-x_t) + np.abs(y-y_t) + np.abs(z-z_t)
        return distance

    def get_reward(self):
        distance = -self.EE_to_target_distance_euclidean()
        return distance
    
    def get_reward_exp_reward(self, EE_pos, exponent_steepness=1):
        distance = self.arm_robot_reward_function(EE_pos=EE_pos)
        return (np.exp(distance) * exponent_steepness)
    

    def huber_loss_reward(self):
        if (self.EE_to_target_distance_euclidean() < 0.1):
            return 1/2 * self.get_reward()
        else:
            return 0.1*(-self.EE_to_target_distance_absolute() - 1/2*0.1)

    def check_EE_is_in_the_target_area(self):
        EE_coordinates = self.sim.data.get_body_xpos("EE").copy()
        #EE_coordinate_mesh = pv.PolyData(EE_coordinates)

        # Generate a sphere around the center point
        sphere = pv.Sphere(radius=self.target_radius, center=self.TARGET_COORDINATE)
        index = sphere.find_closest_point(EE_coordinates)

        point_coordinates = sphere.points[index]

        #compute the distance
        distance = np.sqrt(np.sum((EE_coordinates - point_coordinates)**2))
        if distance < 0.03:
            print("intersected")
            return 1
        else:
            return 0
        #intersection_set = EE_coordinate_mesh.select_enclosed_points(sphere)

        # Print the result of the intersection test
        # if intersection_set['SelectedPoints'].sum() >= 1:
        #     print("intersected")
        #     return 1
        # else:
        #     return 0
    
    #This modifies the angles to be circular
    def get_joint_angles(self, action):
        

        old_joint_angles = self.last_action

        sum_angles = old_joint_angles + action.copy()
    
        # Wrap the result within the range of -2π to 2π
        sum_angles_wrapped = sum_angles % (2 * math.pi)
        
        # Adjust the result to be within the range of -π to π
        new_joint_angles = []
        for angle in sum_angles_wrapped:
            if angle > math.pi:
                angle -= 2 * math.pi
            elif angle < -math.pi:
                angle += 2 * math.pi
            new_joint_angles.append(angle)
        
        self.last_action = new_joint_angles
        
        return new_joint_angles

    def buttom_platform_touched(self):
        links_pos = self.get_non_stationary_links_and_EE_coordinates()
        
        crashed_arr = [1 if link_pos[2] <= 0.01 else 0 for link_pos in links_pos]
        if(np.sum(crashed_arr) > 0):
            print("Crashed Platform")
            return 1
        else:
            return 0

        
    def get_non_stationary_links_and_EE_coordinates(self):
        
        link_indicies = [i for i in range(9,self.sim.model.nbody)] #From 9 to end is Link2 to EE
        links_position = [self.sim.data.xipos[index] for index in link_indicies]  

        return np.array(links_position)

    def link_distances_in_next_state(self, next_state_actions_batch):
        
        #current_state_distance = np.array([])
        next_state_distance = np.array([])

        for action in next_state_actions_batch:

            self.sim.data.qpos[:] = action

            try:
                self.sim.step()
            except mujoco_py.MujocoException as e:
                if "Got MuJoCo Warning: Nan, Inf or huge value in QACC" in str(e):
                    print("Simulation is unstable during training. Taking corrective action...")
                    # Do something to correct the instability, e.g. reset the simulation or adjust the control parameters
                    continue
                else:
                    # Handle other MujocoExceptions as needed
                    print(f"Unexpected MujocoException: {e}")


            next_state_closest_distances_to_obstacle = self.threeD_object.compute_distance_to_object(
                        self.threeD_object.link_closest_point_to_object() )
            
            #current_state_distance = np.append(current_state_distance,state_distance)
            next_state_distance = np.append(next_state_distance, next_state_closest_distances_to_obstacle)
            
        return next_state_distance

   
    def get_objective_target_coordinate(self):
        return self.TARGET_COORDINATE

In [1]:
class ThreeD_Obstacle():
    def __init__(self, simulation, sphere_or_ellipsoid, radius_or_axis, 
                 my_position, safety_l1, safety_l2, target_pos,
                  translation_rate = [0,0,0]):
        #simulation stuff
        self.sim = simulation
       
        #Target stuff
        self.target_pos = target_pos
        
        #object stuff
        #Given as radius or half a width
        self.threeD_obstable = None
        self.l1_layer_mesh = None
        self.l2_layer_mesh = None
        self.my_center_pos = my_position #aka center
        self.initial_center_pos = my_position
        self.translation_rate = translation_rate
        self.translate_forward = True
        self.object_hit_count = 0 ##To keep track how many times the object was hit by the robot


        if(sphere_or_ellipsoid == "Sphere" or sphere_or_ellipsoid == "sphere" ):
            self.threeD_obstable = pv.Sphere(radius=radius_or_axis, center=self.my_center_pos)
            #self.threeD_obstable = self.threeD_obstable.translate(self.my_center_pos)

            self.l1_layer_mesh = pv.Sphere(radius=safety_l1, center= self.my_center_pos)
            self.l2_layer_mesh = pv.Sphere(radius=safety_l2, center= self.my_center_pos)
            # self.l1_layer_mesh = self.l1_layer_mesh.translate(self.my_center_pos)
            # self.l2_layer_mesh = self.l2_layer_mesh.translate(self.my_center_pos)
        else:
            self.threeD_obstable = pv.ParametricEllipsoid(radius_or_axis[0],
                                                          radius_or_axis[1],radius_or_axis[2])
            self.threeD_obstable = self.threeD_obstable.translate(self.my_center_pos)

            self.l1_layer_mesh = pv.ParametricEllipsoid(safety_l1[0],
                                                          safety_l1[1],safety_l1[2])
            self.l2_layer_mesh = pv.ParametricEllipsoid(safety_l2[0],
                                                          safety_l2[1],safety_l2[2])

            self.l1_layer_mesh = self.l1_layer.translate(self.my_center_pos)
            self.l1_layer_mesh = self.l2_layer.translate(self.my_center_pos)

  
        if not isinstance(self.my_center_pos , np.ndarray):
            self.my_center_pos  = np.array(self.my_center_pos )
        #Ensuring that the translation rates per x,y,z coordinates array is a numpy.array
        if not isinstance(translation_rate, np.ndarray):
            self.translation_rate = np.array(self.translation_rate)

    def get_center_pos(self):
        return self.my_center_pos

    def get_non_stationary_links_and_EE_coordinates(self):
        
        link_indicies = [i for i in range(9,self.sim.model.nbody)] #From 9 to end is Link2 to EE
        links_position = [self.sim.data.xipos[index] for index in link_indicies]  

        return np.array(links_position)
    

    #this function will serve for the object, for the inner layer and outer layer checks
    def intruded(self, l1_12_object):

        link_and_EE_coordinates = self.get_non_stationary_links_and_EE_coordinates()
        meshes = [pv.PolyData(pos) for pos in link_and_EE_coordinates]

        intersection_list = [mesh.select_enclosed_points(l1_12_object)['SelectedPoints'] for mesh in meshes]
        return np.concatenate(intersection_list)
    
    def object_intruded(self):
        intersection_list = self.intruded(self.threeD_obstable)
        return intersection_list

    def l1_intruded(self):
        intersection_list = self.intruded(self.l1_layer_mesh)
        return intersection_list
    
    def l2_intruded(self):
        intersection_list = self.intruded(self.l2_layer_mesh)
        return intersection_list
    
    def find_closes_point(self, l1_12_object):
        link_and_EE_coordinates = self.get_non_stationary_links_and_EE_coordinates()
        # print(link_and_EE_coordinates)
        # print(self.threeD_obstable.find_closest_point(link_and_EE_coordinates[0]))
        closest_points = [l1_12_object.find_closest_point(link_and_EE_pos) for link_and_EE_pos in link_and_EE_coordinates]
        return np.array(closest_points)

    def link_closest_point_to_object(self):
        closest_points = self.find_closes_point(self.threeD_obstable)
        closest_points_pos = self.threeD_obstable.points[closest_points]
        
        return closest_points_pos

    def link_closest_point_to_l1(self):
        closest_points = self.find_closes_point(self.l1_layer_mesh)
        closest_points_pos = self.l1_layer_mesh.points[closest_points]
        
        return closest_points_pos
    
    def link_closest_point_to_l2(self):
        closest_points = self.find_closes_point(self.l2_layer_mesh)
        closest_points_pos = self.l2_layer_mesh.points[closest_points]
        return closest_points_pos
    
    def compute_distance_to(self, closest_positions_on_l1_l2_object):
        link_and_EE_coordinates = self.get_non_stationary_links_and_EE_coordinates()
        
        distances = [np.sqrt(np.sum((link_and_EE_pos - closest_pos)**2)) 
                     for link_and_EE_pos, closest_pos in zip(link_and_EE_coordinates, closest_positions_on_l1_l2_object)]
        


        return np.array(distances)  

    def compute_distance_to_object(self, closest_positions_on_l1_l2_object):
        return self.compute_distance_to(closest_positions_on_l1_l2_object)
    def compute_distance_to_l1(self, closest_positions_on_l1_l2_object):
        distances = self.compute_distance_to(closest_positions_on_l1_l2_object)

        link_and_EE_coordinates = self.get_non_stationary_links_and_EE_coordinates()
        meshes = [pv.PolyData(pos) for pos in link_and_EE_coordinates]
        intersection_list = [mesh.select_enclosed_points(self.l1_layer_mesh)['SelectedPoints'][0] for mesh in meshes]

        distances = [0 if inter == 1 else dist for dist, inter in zip(distances, intersection_list)]
        return distances
    
    def compute_distance_to_l2(self, closest_positions_on_l1_l2_object):
        return self.compute_distance_to(closest_positions_on_l1_l2_object)
    
    def compute_distance_to_l1_when_in_l2_by_closest_points(self, closest_positions_on_l1_l2_object):
        closest_pos_to_l1 = self.link_closest_point_to_l1()
        closest_pos_to_l2 = self.link_closest_point_to_l2()
        distances_to_l1 = self.compute_distance_to_l1(closest_positions_on_l1_l2_object)

        distances_norm = [ np.sqrt(np.sum((dl1-dl2)**2)) for dl1, dl2 in zip(closest_pos_to_l1, closest_pos_to_l2)]
        distances_to_l1 = distances_to_l1/distances_norm
        return distances_to_l1

    def compute_link_distance_to_object_when_in_l1_by_closest_points_for_cost(self):
        l1_intruded_check_array = self.l1_intruded()
        object_intruded = self.object_intruded()

        if(any(object_intruded)):
            return 1.0
        elif(any(l1_intruded_check_array)):

            # closest_pos_to_ob = self.link_closest_point_to_object()
            # closest_pos_to_l1 = self.link_closest_point_to_l1()
            
            # distances_to_ob = self.compute_distance_to_object(closest_pos_to_ob)

            # distances_norm = [ np.sqrt(np.sum((dob-dl1)**2)) for dob, dl1 in zip(closest_pos_to_ob, closest_pos_to_l1)]
            # distances_to_ob = distances_to_ob/distances_norm
            #distances_to_ob = (1 - np.min(distances_to_ob)) * 100 + 10
            return 1.0
        else:
            return 0

    def compute_link_distance_to_object_when_in_l1_by_closest_points_for_repulsion(self):
        l1_intruded_check_array = self.l1_intruded()
        object_intruded = self.object_intruded()

        if(any(object_intruded)):
            return 0 + 1e-4
        elif(any(l1_intruded_check_array)):

            closest_pos_to_ob = self.link_closest_point_to_object()
            closest_pos_to_l1 = self.link_closest_point_to_l1()
            
            distances_to_ob = self.compute_distance_to_object(closest_pos_to_ob)

            distances_norm = [ np.sqrt(np.sum((dob-dl1)**2)) for dob, dl1 in zip(closest_pos_to_ob, closest_pos_to_l1)]
            distances_to_ob = distances_to_ob/distances_norm
            return np.min(distances_to_ob)
        else:
            return 1

    ########### Intrusions by target
    def intruded_by_target(self, l1_12_object):
       
        target_mesh = pv.PolyData(self.target_pos)
        intersection = target_mesh.select_enclosed_points(l1_12_object)['SelectedPoints']
        return np.sum(intersection)

    def goal_intruded_object(self):
        return self.intruded_by_target(self.threeD_obstable)

    def goal_intruded_l1(self):
        return self.intruded_by_target(self.l1_layer_mesh)   
    
    def find_closet_point_to_goal(self, l1_12_object):
        closest_point = l1_12_object.find_closest_point(self.target_pos)
        closest_coordinate = l1_12_object.points[closest_point]
        return np.array(closest_coordinate)
    
    def compute_object_distance_to_goal_norm(self):
        if(self.goal_intruded_object()):
            return 0
        elif(self.goal_intruded_l1()):
            coordinate_to_object = self.find_closet_point_to_goal(self.threeD_obstable)
            coordinate_to_l1 = self.find_closet_point_to_goal(self.l1_layer_mesh)

            distance_goal_to_object = np.sqrt(np.sum((self.target_pos - coordinate_to_object)**2))
            norm = np.sqrt(np.sum((coordinate_to_object - coordinate_to_l1)**2))
            return distance_goal_to_object/norm
        else:
            return 1.0


    def obstacle_proximity_to_target_penalty(self):
        object_intruded = self.goal_intruded_object()
        l1_intruded = self.goal_intruded_l1()

        if(object_intruded == 1):
            return 0.0 + 1e-4
        elif(l1_intruded):
            return self.compute_object_distance_to_goal_norm()
        else:
            return 1


    def safety_penalty_by_object_proximity_to_goal(self):
        l1_intruded_check_array = self.l1_intruded()
        object_intruded_check_array = self.object_intruded()

        if(any(object_intruded_check_array) or any(l1_intruded_check_array)):
            #closest_pos_to_ob = self.link_closest_point_to_object()
            penalty = self.obstacle_proximity_to_target_penalty()
            
            return penalty
        else:
            return 1.0


    def linearly_translate_object_back_and_forward(self):
        #Translation does not go overly bellow 0 in the z-axis
        if self.translate_forward and (self.my_center_pos[0] >= 1.6 or self.my_center_pos[1] >= 1.6 or self.my_center_pos[2] >= 1.6):
            self.translate_forward = False
            self.translation_rate = -self.translation_rate
        elif not self.translate_forward and (self.my_center_pos[0] <= -1.6 or self.my_center_pos[1] <= -1.6 or self.my_center_pos[2] < 0.0):
            self.translate_forward = True
            self.translation_rate = -self.translation_rate

        self.threeD_obstable = self.threeD_obstable.translate(self.translation_rate)
        self.l1_layer_mesh = self.l1_layer_mesh.translate(self.translation_rate)
        self.l2_layer_mesh = self.l2_layer_mesh.translate(self.translation_rate)
        
        self.my_center_pos = self.threeD_obstable.center
    
    def pseudo_random_linear_translation(self):
        #Translation does not go overly bellow 0 in the z-axis

        x = np.random.randint([3],[10])[0]/1000
        
        if self.translate_forward:
            self.translation_rate[0] = x
        else:
            self.translation_rate[0] = -x
            
        if self.translate_forward and (self.my_center_pos[0] >= 1.6 or self.my_center_pos[1] >= 1.6 or self.my_center_pos[2] >= 1.6):
            self.translate_forward = False
            self.translation_rate = -self.translation_rate
        elif not self.translate_forward and (self.my_center_pos[0] <= -1.6 or self.my_center_pos[1] <= -1.6 or self.my_center_pos[2] < 0.0):
            self.translate_forward = True
            self.translation_rate = -self.translation_rate


        
        self.threeD_obstable = self.threeD_obstable.translate(self.translation_rate)
        self.l1_layer_mesh = self.l1_layer_mesh.translate(self.translation_rate)
        self.l2_layer_mesh = self.l2_layer_mesh.translate(self.translation_rate)
        
        self.my_center_pos = self.threeD_obstable.center

    def reset(self):
        self.translate_forward = True
        self.my_center_pos = self.initial_center_pos


    



In [27]:
class Features:
    def __init__(self, simulation_model, threeD_obstacle, agent_util):
        self.sim = simulation_model
        self.link_set = ['base_link','link1','link2','link3','link4','link5','link6','EE']

        self.obstacle = threeD_obstacle

        self.agent_util = agent_util

    def get_EE_coordinates(self):
        EE_position = self.sim.data.get_body_xpos("EE").copy()
        return EE_position
    
    def get_non_stationary_links_and_EE_coordinates(self):
        
        link_indicies = [i for i in range(9,self.sim.model.nbody)] #From 9 to end is Link2 to EE
        links_position = [self.sim.data.xipos[index] for index in link_indicies]  

        return np.array(links_position)
      
    def robot_links_distance_to_obstacle_for_feature(self):  
        closest_links_pos_to_object =  self.obstacle.link_closest_point_to_object()
        links_and_EE_distances_to_obj = self.obstacle.compute_distance_to_object(
            closest_links_pos_to_object)
        
        return links_and_EE_distances_to_obj
    
    def robot_links_intruded_l1_mesh_for_feature(self):  
        closest_links_pos_to_l1 =  self.obstacle.link_closest_point_to_l1()
        links_and_EE_distances_to_l1 = self.obstacle.compute_distance_to_l1(
           closest_links_pos_to_l1)
        
        #intrusion = self.obstacle.l1_intruded()
        
        return links_and_EE_distances_to_l1
    
    def get_obstacle_coordinate_for_feature(self):
        return self.obstacle.get_center_pos()

    #robot_link_set = ['base_link','link1','link2','link3','link4','link5','link6']
    def compute_robot_center_of_mass_for_feature(self):
        links_index = [self.sim.model.body_name2id(link) for link in self.link_set]
        mass_of_links = self.sim.model.body_mass[links_index]

        total_mass = mass_of_links.sum()

        x_mul_mass = (self.sim.data.xipos[links_index][:,0] * mass_of_links).sum()
        y_mul_mass = (self.sim.data.xipos[links_index][:,1] * mass_of_links).sum()
        z_mul_mass = (self.sim.data.xipos[links_index][:,2] * mass_of_links).sum()

        center_of_mass = np.array([x_mul_mass,y_mul_mass,z_mul_mass])/total_mass
        return center_of_mass

    def get_joints_angular_velocities_for_feature(self):
        return self.sim.data.qvel
    
    def get_joints_sine_and_cosine_angle_for_feature(self):
        sine_angles = np.sin(self.sim.data.qpos) #cosine and sin from each joints angle in radians
        cosine_angles = np.cos(self.sim.data.qpos)

        return sine_angles, cosine_angles
    
    def get_joints_randian_angle_for_feature(self):
        return self.sim.data.qpos.copy()
    
    def get_all_features(self):
        #sine_angles, cosine_angles = self.get_joints_sine_and_cosine_angle_for_feature()
        # print("angular vel", self.get_joints_angular_velocities_for_feature())
        # print()
        # print("distances to obstacle", self.robot_links_distance_to_obstacle_for_feature())
        # print()
        # print("EE coordinates", self.get_EE_coordinates())
        # print()
        # print("Joint in radians", self.get_joints_randian_angle_for_feature())
        
        all_features = np.concatenate((#self.get_EE_coordinates(),
                                       self.compute_robot_center_of_mass_for_feature().copy(), 
                                       self.get_joints_angular_velocities_for_feature().copy(),
                                       #sine_angles,
                                       #cosine_angles,
                                       self.get_joints_randian_angle_for_feature().copy(),
                                       self.get_obstacle_coordinate_for_feature().copy(),
                                       self.robot_links_distance_to_obstacle_for_feature().copy(),
                                       self.get_non_stationary_links_and_EE_coordinates().copy().flatten(),
                                       self.robot_links_intruded_l1_mesh_for_feature().copy(),
                                       ))
        return all_features
