## PPO - Proximal Policy Optimization 
State (Observations): Your LaserScan data (scan_0 to scan_359)

Actions: Continuous cmd_vel values (linear_x and angular_z)

Reward:
  -  Positive: For reaching goals or following a clear path
  -  Negative: For collisions, inefficiencies, or leaving boundaries

Done Signal: Indicates when an episode ends (e.g., collision or reaching the goal)

In [22]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

In [48]:
class LaserScanEnv(gym.Env):
    def __init__(self):
        super(LaserScanEnv, self).__init__()
        
        # Observation space
        self.observation_space = spaces.Box(low=0, high=10, shape=(360,), dtype=np.float32)
        
        # Action space
        self.action_space = spaces.Box(low=np.array([-1.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32)
        
        # Initial state
        self.state = np.random.uniform(0, 10, size=(360,))
        self.done = False

    def reset(self, seed=None, options=None):
        # Ensure reproducibility by setting the random seed
        if seed is not None:
            np.random.seed(seed)
        
        self.state = np.random.uniform(0, 10, size=(360,)).astype(np.float32)  # Cast to float32
        self.done = False
        info = {}  # Additional info can be added here if needed
        return self.state, info  # Return a tuple (obs, info)

    def step(self, action):
        # Apply action and update state
        self.state = np.random.uniform(0, 10, size=(360,)).astype(np.float32)  # Ensure dtype is float32
        reward = self._calculate_reward(action)
        
        # Split the "done" condition into `terminated` and `truncated`
        terminated = self._check_done()  # E.g., collision or reaching goal
        truncated = False  # For now, we won't handle time limits, so this is always False
        
        info = {}  # Additional information (if any)
        return self.state, reward, terminated, truncated, info


    def _calculate_reward(self, action):
        # reward function
        if self.done:
            return -100 if self.done else 100
        return 1.0 - np.abs(action[0])  # Penalty 
    
    def _check_done(self):
        return np.random.choice([True, False], p=[0.1, 0.9]) 

In [49]:
# Environment
env = LaserScanEnv()

In [51]:
# check_env(env, warn=True)

In [52]:
# Train PPO model
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 9.87     |
|    ep_rew_mean     | 3.68     |
| time/              |          |
|    fps             | 3267     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 10.5       |
|    ep_rew_mean          | 4.02       |
| time/                   |            |
|    fps                  | 2211       |
|    iterations           | 2          |
|    time_elapsed         | 1          |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.01470397 |
|    clip_fraction        | 0.0827     |
|    clip_range           | 0.2        |
|    entropy_loss         | -2.8 

<stable_baselines3.ppo.ppo.PPO at 0x7ad998d56f80>

In [53]:
# Save the trained model
model.save("ppo_robot_navigation")

In [60]:
from stable_baselines3.common.env_util import DummyVecEnv
env = DummyVecEnv([lambda: LaserScanEnv()])

# Load the trained model
model = PPO.load("ppo_robot_navigation")

# Test the model
obs = env.reset()
rewards = []
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)  # Predict action
    obs, reward, done, info = env.step(action)  # Step through the environment
    rewards.append(reward)  # Collect rewards
    if done.any():  # VecEnv returns `done` as an array
        obs = env.reset()

# Print total rewards and metrics
total_reward = sum(rewards)
print(f"Total Reward: {total_reward}")


Total Reward: [778.79944]


In [62]:
# Track rewards for each episode
episode_rewards = []
current_reward = 0

obs = env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    current_reward += reward
    if done.any():
        episode_rewards.append(current_reward)
        current_reward = 0  # Reset episode reward
        obs = env.reset()

# Calculate metrics
average_reward = sum(episode_rewards) / len(episode_rewards)
print(f"Average Reward per Episode: {average_reward}")


Average Reward per Episode: [8.033576]


In [64]:
success_count = 0
total_episodes = 0

obs = env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    if done.any():
        total_episodes += 1
        # Assume success if reward > threshold (e.g., 50 for reaching a goal)
        if reward > 50:
            success_count += 1
        obs = env.reset()

success_rate = success_count / total_episodes
print(f"Success Rate: {success_rate * 100:.2f}%")


Success Rate: 0.00%


In [67]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import rclpy
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist

class RosLaserScanEnv(gym.Env):
    def __init__(self):
        super(RosLaserScanEnv, self).__init__()

        # Initialize ROS node
        rclpy.init_node("rl_environment", anonymous=True)

        # Publishers and Subscribers
        self.pub_cmd_vel = rclpy.Publisher("/cmd_vel", Twist, queue_size=10)
        self.sub_scan = rclpy.Subscriber("/scan", LaserScan, self.scan_callback)

        # Observation space: LiDAR data (360 values)
        self.observation_space = spaces.Box(low=0, high=10, shape=(360,), dtype=np.float32)

        # Action space: cmd_vel (linear_x, angular_z)
        self.action_space = spaces.Box(low=np.array([-1.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32)

        # Initialize state and done flag
        self.state = np.zeros(360, dtype=np.float32)
        self.done = False

    def scan_callback(self, msg):
        # Update state with LaserScan data
        self.state = np.array(msg.ranges, dtype=np.float32)

    def reset(self, seed=None, options=None):
        # Reset the environment
        self.done = False
        rclpy.sleep(1)  # Allow sensors to stabilize
        return self.state, {}

    def step(self, action):
        # Publish action to /cmd_vel
        cmd_vel = Twist()
        cmd_vel.linear.x = action[0]
        cmd_vel.angular.z = action[1]
        self.pub_cmd_vel.publish(cmd_vel)

        # Calculate reward
        reward = self._calculate_reward(action)

        # Check if episode is done
        self.done = self._check_done()

        return self.state, reward, self.done, False, {}

    def _calculate_reward(self, action):
        # Reward for smooth movement and avoiding collisions
        return 1.0 - abs(action[0]) if not self.done else -100.0

    def _check_done(self):
        # Example condition: stop episode if too close to an obstacle
        return np.min(self.state) < 0.2


In [68]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import DummyVecEnv
from stable_baselines3.common.monitor import Monitor

# Wrap the environment for RL training
env = DummyVecEnv([lambda: Monitor(RosLaserScanEnv())])

# Train PPO
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100000)

# Save the trained model
model.save("ppo_robot_navigation")
#s


AttributeError: module 'rclpy' has no attribute 'init_node'