## Robot Pick-and-Place Environment

1. **`PickAndPlaceEnv` Class:** This is the main class that defines the robot pick and place environment.

2. **Action and Observation Spaces:** Defines the structure of actions and observations for the environment.

3. **PyBullet Initialization:** Sets up the PyBullet physics simulation environment.

4. **Load Environment Components:** Loads the necessary objects (plane, robot, and cube) into the simulation.

5. **Robot Configuration:** Specifies the robot's end-effector and gripper joint indices.

6. **Reset Method:** Resets the environment to its initial state.

7. **Observation Method:** Collects and returns the current state of the environment.

8. **Step Method:** Executes an action in the environment and returns the new state, reward, and other information.

9. **Reward Computation:** Calculates the reward based on the robot's performance.

10. **Rendering Method:** Configures the PyBullet visualizer when GUI mode is enabled.

11. **Cleanup Method:** Disconnects from the PyBullet server when the environment is closed.

12. **Environment Checker:** Uses the Stable Baselines 3 library to verify that the environment is correctly implemented.

In [1]:
!pip install pybullet --quiet

In [2]:
!pip install stable-baselines3[extra] --quiet

In [14]:
!pip install gymnasium --quiet

  and should_run_async(code)


In [15]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pybullet as p
import pybullet_data
import time
from stable_baselines3.common.env_checker import check_env

from google.colab import drive
drive.mount('/content/drive')

  and should_run_async(code)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [16]:
import os

def check_file_exists(file_path):
    if os.path.exists(file_path):
        print(f"File exists: {file_path}")
    else:
        print(f"File does not exist: {file_path}")

# List of URDF files to check
urdf_files = [
    "/content/drive/My Drive/MSCS/AdvML/urdf_files/plane.urdf",
    "/content/drive/My Drive/MSCS/AdvML/urdf_files/franka_panda/panda.urdf",
    "/content/drive/My Drive/MSCS/AdvML/urdf_files/cube_small.urdf"
]

# Check each file
for file in urdf_files:
    check_file_exists(file)

File exists: /content/drive/My Drive/MSCS/AdvML/urdf_files/plane.urdf
File exists: /content/drive/My Drive/MSCS/AdvML/urdf_files/franka_panda/panda.urdf
File exists: /content/drive/My Drive/MSCS/AdvML/urdf_files/cube_small.urdf


### Defining the PickAndPlaceEnv Class

In [21]:
class PickAndPlaceEnv(gym.Env):
    metadata = {'render.modes' : ['human']}

    def __init__(self, GUI=False):
        super(PickAndPlaceEnv, self).__init__()

        # Define action space: [delta_x, delta_y, delta_z, gripper_action]
        self.action_space = spaces.Box(low=-1, high=1, shape=(4,), dtype=np.float32)

        # Define observation space: [ee_x, ee_y, ee_z, obj_x, obj_y, obj_z, gripper_state]
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(7,), dtype=np.float32)

        # Connect to the PyBullet physics server
        if not GUI:
            self.physics_client = p.connect(p.DIRECT)  # Use DIRECT mode for headless rendering
        else:
            self.physics_client = p.connect(p.GUI)     # GUI mode for visualization

        self.GUI = GUI

        # Set up PyBullet environment
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.81)

        # Load plane, robot and object
        self.plane_id = p.loadURDF("plane.urdf")
        self.robot_id = p.loadURDF("franka_panda/panda.urdf", [0, 0, 0], p.getQuaternionFromEuler([0,0,0]))
        self.object_id = p.loadURDF("cube_small.urdf", [0.5, 0, 0.05])

        # Identify end-effector and gripper joints
        self.ee_link_index = 11                 # Index of the end-effector link in the robot's URDF
        self.gripper_joint_indices = [9, 10]    # Gripper joint indices for Franka Panda

    def reset(self, seed=None, **kwargs):
        # Reset simulation and reload objects
        p.resetSimulation()
        p.setGravity(0, 0, -9.81)
        self.plane_id = p.loadURDF("plane.urdf")
        self.robot_id = p.loadURDF("franka_panda/panda.urdf", [0, 0, 0], p.getQuaternionFromEuler([0,0,0]))
        self.object_id = p.loadURDF("cube_small.urdf", [0.5, 0, 0.05])

        # Reset robot joints to initial position
        num_joints = p.getNumJoints(self.robot_id)
        for joint in range(num_joints):
            p.resetJointState(self.robot_id, joint, 0)

        # Return initial observation
        return self.get_obs(), {}

    def get_obs(self):
        # Get end-effector position
        ee_state = p.getLinkState(self.robot_id, self.ee_link_index)
        ee_pos = ee_state[4]

        # Get object position
        obj_pos, _ = p.getBasePositionAndOrientation(self.object_id)

        # Get gripper state (open or closed)
        gripper_states = [p.getJointState(self.robot_id, idx)[0] for idx in self.gripper_joint_indices]
        gripper_closed = 1.0 if sum(gripper_states) > 0 else 0.0

        # Combine all observations
        observation = np.array(list(ee_pos) + list(obj_pos) + [gripper_closed], dtype=np.float32)

        return observation

    def step(self, action):
        truncated = False
        try:
            # Scale and apply actions
            delta_pos = action[:3] * 0.05           # Scale movement
            gripper_action = (action[3] + 1) / 2    # Scale gripper action from [-1,1] to [0,1]

            # Get current end-effector position
            ee_state = p.getLinkState(self.robot_id, self.ee_link_index)
            current_pos = np.array(ee_state[4])
            target_pos = current_pos + delta_pos

            # Calculate inverse kinematics
            joint_angles = p.calculateInverseKinematics(self.robot_id, self.ee_link_index, target_pos)

            # Apply joint angles to robot
            for i, angle in enumerate(joint_angles):
                p.setJointMotorControl2(bodyIndex=self.robot_id, jointIndex=i,
                                        controlMode=p.POSITION_CONTROL,
                                        targetPosition=angle, force=500)

            # Step simulation
            p.stepSimulation()

            # Get new observation
            obs = self.get_obs()

            # Compute reward and check if done
            reward, done = self._compute_reward(obs)

        except:
            # Handle exceptions (e.g., inverse kinematics failure)
            truncated = True
            obs = self.get_obs()
            reward = -100
            done = False

        return obs, reward, done, truncated, {}

    def _compute_reward(self):
        # Define target position
        target_pos = np.array([0.6, 0, 0.05])   # Example target position

        # Get current object position
        obj_pos, _ = p.getBasePositionAndOrientation(self.object_id)

        # Calculate distance to target
        distance = np.linalg.norm(np.array(obj_pos[:3] - target_pos))

        # Compute reward and check for task completion
        reward = -distance          # Negative distance as reward
        done = False
        success_threshold = 0.02    # Distance threshold for success

        if distance < success_threshold:
            reward += 100           # Bonus reward for successful placement
            done = True

        return reward, done

    def render(self, mode='human'):
        if self.GUI:
            p.configureDebugVisualizer(p.COV_ENABLE_GUI, 1)
            p.configureDebugVisualizer(p.COV_ENABLE_RGB_BUFFER_PREVIEW, 1)
            p.configureDebugVisualizer(p.COV_ENABLE_DEPTH_BUFFER_PREVIEW, 1)
            p.configureDebugVisualizer(p.COV_ENABLE_SEGMENTATION_MARK_PREVIEW, 1)
        else:
            pass

    def close(self):
        p.disconnect()

In [22]:
env = PickAndPlaceEnv()
check_env(env)