In [1]:
import gym
from gym import spaces
import pybullet as p
import pybullet_data
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
import gymnasium as gym
from gymnasium import spaces
import pybullet as p
import pybullet_data
import numpy as np
import time
from stable_baselines3 import PPO, SAC
from stable_baselines3.common.env_checker import check_env


In [2]:

class RoboticDogEnv(gym.Env):
    def __init__(self):
        super(RoboticDogEnv, self).__init__()
        
        self.current_target_positions = np.zeros(12)  # Action dimensions
        self.physicsClient = p.connect(p.GUI)  # Connect to PyBullet
        p.setAdditionalSearchPath(pybullet_data.getDataPath())  # Set the simulation environment
        self.plane_id = p.loadURDF("plane.urdf")  # Load the plane and URDF
        self.urdf_path = "C:/Users/youss/OneDrive - aucegypt.edu/Desktop/bolt/spot.urdf"
        self.robot_id = p.loadURDF(self.urdf_path, [0, 0, 0.15], useFixedBase=False)
        self.num_joints = p.getNumJoints(self.robot_id)
        
        # Ensure the robot is correctly initialized
        if self.robot_id < 0:
            raise ValueError("Failed to initialize the robot.")
        if self.num_joints == 0:
            raise ValueError("No joints found in the robot.")
        
        print(f"Robot ID: {self.robot_id}")
        
        # Set gravity
        p.setGravity(0, 0, -9.8)
        
        # Get joint limits and ensure joints are valid
        self.joint_indices = []
        self.joint_limits = []
        
        for joint_index in range(self.num_joints):
            joint_info = p.getJointInfo(self.robot_id, joint_index)
            joint_name = joint_info[1].decode('utf-8')
            
            # Set the appropriate limits based on joint type
            if "coxa" in joint_name:
                joint_lower_limit, joint_upper_limit = -0.25, 0.25
            elif "femur" in joint_name:
                joint_lower_limit, joint_upper_limit = -0.5, 0.5
            elif "tibia" in joint_name:
                joint_lower_limit, joint_upper_limit = -0.5, 0.5
            else:
                continue
            
            self.joint_limits.append((joint_lower_limit, joint_upper_limit))
            self.joint_indices.append(joint_index)
        
        # Define the action and observation space
        self.action_space = spaces.Box(
            low=np.array([limit[0] for limit in self.joint_limits]),
            high=np.array([limit[1] for limit in self.joint_limits]),
            dtype=np.float32
        )
        
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(len(self.joint_indices) * 3 + 14,),  # Observation space
            dtype=np.float32
        )
        
        # Data for plots
        self.avg_rewards_per_episode = []
        self.min_rewards_per_episode = []
        self.max_rewards_per_episode = []
        
        self.std_rewards_per_episode = []
        self.rewards_per_step = []
        self.actions_per_episode = []
        
        # Reward components
        self.forward_rewards = []   # Energy rewards tracking
        self.stability_rewards = []# Stability rewards tracking
        self.energy_rewards=[]
        
        self.fall_start_time = None  # Initialize the fall start time
        self.leg_indices = self.joint_indices
        self.noise_counter = 0
        
        # Rewards
        self.forward = 0
        self.stable = 0
        self.energy = 0
        self.velocity_pen = 0
        self.fall = 0
        self.smooth = 0
        
        self.previous_joint_velocities = None
        self.sym = 0
        
        self.total_reward = 0
        self.counter = 0
        self.steps = 1
        self.total_steps = 0
        
        # Store the link indices for the feet
        self.front_left_foot = self.get_link_index('foot_FL')
        self.front_right_foot = self.get_link_index('foot_FR')
        self.back_left_foot = self.get_link_index('foot_BL')
        self.back_right_foot = self.get_link_index('foot_BR')
        
    
        
        
    def get_link_index(self, link_name):
        for joint_index in range(self.num_joints):
            joint_info = p.getJointInfo(self.robot_id, joint_index)
            joint_name = joint_info[12].decode('utf-8')
            if link_name in joint_name:
                return joint_info[0]
        raise ValueError(f"Link {link_name} not found")

        
        
    
    def reset(self, **kwargs):
        # Increment episode counter and total steps
        self.counter += 1
        self.total_steps += self.steps

        # Print episode statistics if applicable
        if self.steps != 0:
            episode_rewards = self.rewards_per_step
            if len(episode_rewards) > 0:
                average = np.mean(episode_rewards)
                self.avg_rewards_per_episode.append(average)
                self.min_rewards_per_episode.append(np.min(episode_rewards))
                self.max_rewards_per_episode.append(np.max(episode_rewards))
                self.std_rewards_per_episode.append(np.std(episode_rewards))

        if self.total_reward != 0:
            print(f"Episode {self.counter} finished with cumulative reward: {self.total_reward}")
            print(f"Average reward of: {average}")
            print(f"Number of steps in this episode: {self.steps}")
            print(f"Total steps till now: {self.total_steps}")
            print(f"..........................................")

        # Reset the simulation environment
        p.resetSimulation()
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        self.plane_id = p.loadURDF("plane.urdf")

       
        upside_down_orientation = [0, 0, 0, 1]  # Default upright orientation

        self.robot_id = p.loadURDF(self.urdf_path, [0, 0, 0.15], upside_down_orientation, useFixedBase=False)

        # Set gravity
        p.setGravity(0, 0, -9.8)

        # Wait for the simulation to stabilize
        time.sleep(1)

        # Ensure the robot is correctly initialized
        if self.robot_id < 0:
            print("Error: Robot failed to load.")
            return None, {}

        if p.getNumJoints(self.robot_id) == 0:
            print("Error: No joints found in the robot.")
            return None, {}

        # Reset fall start time
        self.fall_start_time = None

        # Print reward statistics
        print(f"Forward: {self.forward / self.steps}")
        print(f"Velocity penalty: {self.velocity_pen / self.steps}")
        print(f"Stability: {self.stable / self.steps}")
        
        self.stability_rewards.append(self.stable / self.steps)
        print(f"Energy: {self.energy / self.steps}")
        self.forward_rewards.append(self.forward / self.steps)
        self.energy_rewards.append(self.energy / self.steps)
        print(f"Fall: {self.fall / self.steps}")
        print(f"Smooth: {self.smooth / self.steps}")
        print(f"Symmetry: {self.sym / self.steps}")

        # Plot metrics if available
        if len(self.avg_rewards_per_episode) > 1:
            self.plot_average_rewards()
            self.plot_std_rewards()
            self.plot_action_distribution()
            self.plot_stability_rewards()  # Plot energy rewards
            self.plot_forward_rewards()  # Plot stability rewards
            self.plot_energy_rewards()
        # Reset statistics and reward tracking
        self.actions_per_episode = []
        self.forward = 0
        self.stable = 0
        self.energy = 0
        self.velocity_pen = 0
        self.fall = 0
        self.smooth = 0
        self.sym = 0
        self.rewards_per_step = []
       

        self.total_reward = 0
        self.steps = 1

        # Return the initial state
        initial_state = self.get_state()
        return initial_state, {}
    
    
    def step(self, action):
        # Update the target positions based on the action
        self.current_target_positions = action
        self.actions_per_episode.append(action)

        # Apply the action to the robot's joints
        for i, joint_index in enumerate(self.joint_indices):
            p.setJointMotorControl2(self.robot_id, joint_index, p.POSITION_CONTROL, targetPosition=action[i])

        # Step the simulation
        p.stepSimulation()
        time.sleep(1. / 240)  # Ensure the simulation steps are processed

        # Verify that joint positions have updated correctly
        joint_positions = [p.getJointState(self.robot_id, joint_index)[0] for joint_index in self.joint_indices]

        # Get the next state, reward, and check if done
        state = self.get_state()
        reward = self.compute_reward()
        self.rewards_per_step.append(reward)
        terminated = self.is_done(state)
        truncated = False  # This can be set to True based on some logic, e.g., max steps reached

        # Update total reward and step count
        self.total_reward += reward
        self.steps += 1
       # time.sleep(1 / 20)  # Slow down the simulation


        return state, reward, terminated, truncated, {}
    
    def get_state(self):
        # Get joint states: positions and velocities
        joint_states = p.getJointStates(self.robot_id, self.joint_indices)
        joint_positions = [state[0] for state in joint_states]
        joint_velocities = [state[1] for state in joint_states]

        # Get base position, orientation, velocity, and angular velocity
        base_position, base_orientation = p.getBasePositionAndOrientation(self.robot_id)
        base_velocity, base_angular_velocity = p.getBaseVelocity(self.robot_id)

        # Convert orientation from quaternion to Euler angles
        base_orientation_euler = p.getEulerFromQuaternion(base_orientation)

        # Contact information for the robot's feet
        front_left_contact = p.getContactPoints(bodyA=self.robot_id, linkIndexA=self.front_left_foot, bodyB=self.plane_id)
        front_right_contact = p.getContactPoints(bodyA=self.robot_id, linkIndexA=self.front_right_foot, bodyB=self.plane_id)
        back_left_contact = p.getContactPoints(bodyA=self.robot_id, linkIndexA=self.back_left_foot, bodyB=self.plane_id)
        back_right_contact = p.getContactPoints(bodyA=self.robot_id, linkIndexA=self.back_right_foot, bodyB=self.plane_id)

        # Convert contact points to a binary indicator
        foot_contacts = [
            1.0 if len(front_left_contact) > 0 else 0.0,
            1.0 if len(front_right_contact) > 0 else 0.0,
            1.0 if len(back_left_contact) > 0 else 0.0,
            1.0 if len(back_right_contact) > 0 else 0.0
        ]

        # Get the height of the base from its position
        base_height = np.array([base_position[2]])

        # Concatenate all the state information into a single array
        state = np.concatenate([
            joint_positions,
            joint_velocities,
            base_orientation_euler,  # Using Euler angles instead of Quaternion
            base_angular_velocity,
            base_velocity,
            base_height,
            foot_contacts,
            self.current_target_positions  # Include current target positions
        ])

        return state
    
    def compute_reward(self):
        # Get base position, orientation, velocity, and angular velocity
        base_position, base_orientation = p.getBasePositionAndOrientation(self.robot_id)
        base_velocity, base_angular_velocity = p.getBaseVelocity(self.robot_id)

        
        # Calculate the forward vector from the orientation
        forward_vector = p.getMatrixFromQuaternion(base_orientation)[0:3:2]
        forward_vector = np.array(forward_vector)  # Forward direction in the world frame
        forward_velocity = np.dot(forward_vector, base_velocity[:2])
        forward_reward = forward_velocity

            
            
        # Get base orientation in Euler angles
        base_orientation_euler = p.getEulerFromQuaternion(base_orientation)
        pitch, roll = base_orientation_euler[1], base_orientation_euler[0]
        # Set acceptable thresholds for instability
        acceptable_pitch_threshold = 0.03  # 0.03 and 0.04 worked good 
        acceptable_roll_threshold = 0.03 # 0.03 and 0.04 worked good 
        stability_penalty = 0.0
        
        if abs(pitch) < acceptable_pitch_threshold: 
            stability_penalty += -0.05
        else: stability_penalty= (abs(pitch)-acceptable_pitch_threshold)**2
        
        if abs(roll) < acceptable_roll_threshold: 
            stability_penalty += -0.05
        else: stability_penalty= (abs(roll)-acceptable_roll_threshold)**2
           
    
        
        
        # Penalize high velocities
        max_velocity_threshold = 5.0  # Set the maximum desired velocity
        if forward_velocity > max_velocity_threshold:
            velocity_penalty = (forward_velocity - max_velocity_threshold) ** 2
        else:
            velocity_penalty = -1
            
    
        # Apply dynamic energy penalty
        joint_states = p.getJointStates(self.robot_id, self.joint_indices)
        joint_velocities = [state[1] for state in joint_states]
        energy_penalty = 0.0
        joint_velocity_threshold = 10  # Threshold for penalizing high joint velocities
        for velocity in joint_velocities:
            if abs(velocity) > joint_velocity_threshold:
                energy_penalty += (abs(velocity) - joint_velocity_threshold) ** 2
            else:
                energy_penalty += -2

       
                
                
        # Penalize if the robot has fallen (based on height threshold)
        fall_penalty = 0.0
        if base_position[2] < 0.12:
            fall_penalty = 50.0
        else:
            fall_penalty = -1

        # Calculate smoothness and symmetry penalties
        smoothness_penalty = self.compute_smoothness_reward(joint_velocities)
        symmetry_penalty = self.compute_symmetry_reward()

        # Scaling factors
        forward_scale = 10  # Adjust as needed
        stability_penalty_scale = -200  # Adjust as needed
        energy_penalty_scale = 0
        fall_penalty_scale = 0  # Adjust as needed
        smoothness_scale = 0  # Adjust as needed
        symmetry_penalty_scale = 0  # Adjust as needed
        velocity_penalty_scale = 0  # Adjust as needed
        
        stability= np.clip( stability_penalty * stability_penalty_scale, -50, 50)
         
        # Compute the total reward
        reward = (forward_reward * forward_scale) + \
                 (velocity_penalty * velocity_penalty_scale) + \
                 (energy_penalty * energy_penalty_scale) + \
                 (stability) + \
                 (fall_penalty * fall_penalty_scale) + \
                 (smoothness_penalty * smoothness_scale) + \
                 (symmetry_penalty * symmetry_penalty_scale)

        # Update reward tracking
        self.forward += (forward_reward * forward_scale)
        self.velocity_pen += (velocity_penalty * velocity_penalty_scale)
        self.stable += (stability)
        self.energy += (energy_penalty * energy_penalty_scale)
        self.fall += (fall_penalty * fall_penalty_scale)

        return reward

    def compute_symmetry_reward(self):
             # Get the height of each foot joint
        front_left_height = p.getLinkState(self.robot_id, self.front_left_foot)[0][2]
        front_right_height = p.getLinkState(self.robot_id, self.front_right_foot)[0][2]
        back_left_height = p.getLinkState(self.robot_id, self.back_left_foot)[0][2]
        back_right_height = p.getLinkState(self.robot_id, self.back_right_foot)[0][2]

        # Reward for diagonal leg coordination
        symmetry_reward = 0.0
        if abs(front_left_height - back_right_height) < 0.015:  # Adjust threshold as needed
            symmetry_reward += 1.0
        else: symmetry_reward -= 1.0
        if abs(front_right_height - back_left_height) < 0.015:  # Adjust threshold as needed
            symmetry_reward += 1.0
        else: symmetry_reward -= 1.0
        return symmetry_reward

    def compute_smoothness_reward(self, joint_velocities):
        velocity_change_threshold = 2
        smoothness_penalty = 0

        if self.previous_joint_velocities is None:
            smoothness_reward = 0.0
        else:
            velocity_differences = np.abs(np.array(joint_velocities) - np.array(self.previous_joint_velocities))
            velocity_differences = np.clip(velocity_differences - velocity_change_threshold, 0, None)
            smoothness_penalty = np.sum(velocity_differences)

        self.previous_joint_velocities = joint_velocities
        return smoothness_penalty

    def plot_average_rewards(self):
        plt.figure(figsize=(10, 5))
        plt.plot(self.avg_rewards_per_episode, label='Avg Reward')
        plt.fill_between(range(len(self.avg_rewards_per_episode)),
                         self.min_rewards_per_episode, self.max_rewards_per_episode, alpha=0.3)
        plt.xlabel('Episode')
        plt.ylabel('Average Reward')
        plt.title('Average Reward with Min/Max')
        plt.legend()
        plt.show()
        
        
    def plot_forward_rewards(self):
        plt.figure(figsize=(10, 5))
        plt.plot(self.forward_rewards, label='forward Reward')
        plt.xlabel('Step')
        plt.ylabel('forward Reward')
        plt.title('forward Reward per Step')
        plt.legend()
        plt.show()

    def plot_stability_rewards(self):
        plt.figure(figsize=(10, 5))
        plt.plot(self.stability_rewards, label='Stability Reward')
        plt.xlabel('Step')
        plt.ylabel('Stability Reward')
        plt.title('Stability Reward per Step')
        plt.legend()
        plt.show()
        
        
    def plot_energy_rewards(self):
        plt.figure(figsize=(10, 5))
        plt.plot(self.energy_rewards, label='energy Reward')
        plt.xlabel('Step')
        plt.ylabel('energy Reward')
        plt.title('energy Reward per Step')
        plt.legend()
        plt.show()

    def plot_std_rewards(self):
        plt.figure(figsize=(10, 5))
        plt.plot(self.std_rewards_per_episode)
        plt.xlabel('Episode')
        plt.ylabel('Standard Deviation')
        plt.title('Standard Deviation of Total Reward')
        plt.show()

    def plot_action_distribution(self):
        if len(self.actions_per_episode) == 0:
            print("No actions to plot.")
            return

        actions_array = np.array(self.actions_per_episode)

        if actions_array.shape[1] != 12:
            print(f"Expected 12 actions per step, but got {actions_array.shape[1]}.")
            return

        plt.figure(figsize=(15, 10))
        for i in range(12):  # We know there are 12 joints
            plt.subplot(3, 4, i + 1)
            plt.hist(actions_array[:, i], bins=20, alpha=0.7, label=f'Joint {i}',
                     range=self.joint_limits[i])  # Respect the action limits
            plt.xlabel('Action Value')
            plt.ylabel('Frequency')
            plt.title(f'Action Distribution for Joint {i}')
            plt.xlim(self.joint_limits[i])  # Set the x-axis limits to respect the joint limits
        plt.tight_layout()
        plt.show()
        
    def is_done(self, state):
        base_position, base_orientation = p.getBasePositionAndOrientation(self.robot_id)

        # Check if the robot has fallen
        if base_position[2] < 0.12:
            if self.fall_start_time is None:
                self.fall_start_time = time.time()  # Start the timer
                # print("Robot has started falling")
            elif time.time() - self.fall_start_time > 2:  # Wait for 2 seconds before deciding to reset
                # print("Robot has fallen for 2 seconds, resetting...")
                return True
        else:
            self.fall_start_time = None  # Reset the timer if the robot is not fallen

        # Check if the maximum number of steps is reached
        if self.steps == 2500:
            return True

        return False

    def render(self, mode='human'):
        pass

    def close(self):
        p.disconnect()


     

In [3]:
gym.envs.registration.register(id='RoboticDog-v0', entry_point='__main__:RoboticDogEnv')

In [4]:
env = RoboticDogEnv()

Robot ID: 1


In [5]:
# Instantiate the agent
# For more exploration, consider decreasing the learning rate to 1e-4 and increasing entropy to 0.1

#model = PPO('MlpPolicy', env, verbose=1)
# Load the pre-trained model
model_path = "C://Users//youss//OneDrive - aucegypt.edu//Desktop//bolt//bolt_final.zip"
model = PPO.load(model_path, env=env)


Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [None]:
model.learn(total_timesteps=1000000)

In [1]:
print(" good boy bolt")

 good boy bolt


In [None]:
# Save the model with a specific name
model.save("ppo_spot_trial_basic2")

print("Model saved successfully as trial_one.zip.")
