In [84]:
from typing import Optional
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.logger import configure
from stable_baselines3.common.monitor import Monitor
from sympy.physics.units import velocity


# Define the RL environment
class ColorMatchEnv(gym.Env):
    def __init__(self):
        super(ColorMatchEnv, self).__init__()
        # self.target_color = np.random.rand(3)  # Random target RGB
        # self.current_color = np.random.rand(3)  # Start with random color
        
        self.velocity = 0
        self.difference = 0
    
        # Action space: Increase or decrease R, G, B
        self.action_space = spaces.Discrete(6)  
        self.observation_space = spaces.Box(low=0, high=1, shape=(6,), dtype=np.float32)
    
    
    def step(self, action):
        
        prev_color = self.current_color.copy()
        
        step_size = 0.01
        if action == 0: self.current_color[0] += step_size  # Increase R
        elif action == 1: self.current_color[0] -= step_size  # Decrease R
        elif action == 2: self.current_color[1] += step_size  # Increase G
        elif action == 3: self.current_color[1] -= step_size  # Decrease G
        elif action == 4: self.current_color[2] += step_size  # Increase B
        elif action == 5: self.current_color[2] -= step_size  # Decrease B

        # Clamp values between 0 and 1
        self.current_color = np.clip(self.current_color, 0, 1)
        max_distance = np.linalg.norm(np.array([1, 1, 1]))
        prev_distance = np.linalg.norm(self.target_color - prev_color) / max_distance
        new_distance = np.linalg.norm(self.target_color - self.current_color) / max_distance
 
        if prev_distance < new_distance:
            reward = -0.5
        else:
            reward = pow(1 - new_distance, 2) * 0.5 #max(0.1, 1 / (1 + new_distance)) 
        
        self.difference = prev_distance - new_distance
        
        # Compute reward (negative Euclidean distance)

        self.velocity = self.velocity_reward(self.current_color, self.target_color, self.current_color - prev_color)
        
        # Done if very close to target
        done = np.linalg.norm(self.current_color - self.target_color) < 0.01
        truncated = False
        observation = self._get_obs()
        info = self._get_info()
    
        return observation, reward, done.all(), truncated, info
    
    def direction_reward(self, agent_position, target_position, prev_position):
        distance_to_target = np.abs(target_position - agent_position)
        prev_distance = np.abs(target_position - prev_position)
    
        if distance_to_target < 0.1:
            return 1  # Large reward for reaching the goal
    
        if np.abs(agent_position - prev_position) == 0:
            return 0  # Small penalty for no movement
    
        if distance_to_target > prev_distance:
            return -0.5  # Mild penalty for moving away
        
        #return 1 - np.abs(target_position - agent_position)
        return 1 / (1 + distance_to_target)  # Reward inversely proportional to distance
    
    # def direction_reward(self, agent_position, target_position, prev_position):
    #     if np.abs(agent_position - target_position) < 0.1:
    #         return 10
    #     
    #     if np.abs(agent_position - prev_position) == 0:
    #         #no change
    #         return 0
    #     elif np.abs(target_position - agent_position) > np.abs(target_position - prev_position):
    #         return - np.abs(agent_position - prev_position)
    #     else:
    #         return 1 - np.abs(target_position - agent_position)
        
    def velocity_reward(self, agent_position, target_position, agent_velocity):
        direction_to_target = target_position - agent_position
        direction_to_target /= np.linalg.norm(direction_to_target)  # Normalize direction
        velocity_dot_product = np.dot(agent_velocity, direction_to_target)
        return velocity_dot_product
    
    def smooth_reward(self, agent_position, target_position, agent_velocity, lambda_factor=1):
        # distance = np.linalg.norm(agent_position - target_position)
        distance = 1 - np.linalg.norm(self.target_color - self.current_color)
        distance = pow(distance, 2)
        direction_to_target = target_position - agent_position
        direction_to_target /= np.linalg.norm(direction_to_target)  # Normalize direction
        velocity_dot_product = np.dot(agent_velocity, direction_to_target)
        
        return distance + lambda_factor * velocity_dot_product

    def _get_obs(self):
        return np.concatenate([self.current_color, self.target_color])
    
    def _get_info(self):
        return { "distance" : np.linalg.norm(self.current_color - self.target_color), "velocity" : self.velocity, "difference" : self.difference}
    
    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)
        self.target_color = self.np_random.uniform(0, 1, size=3)  # New target
        self.current_color = self.np_random.uniform(0, 1, size=3)  # Reset color

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

log_dir = "./ppo_logs/"
new_logger = configure(log_dir, ["csv", "tensorboard"])

# Train with PPO
env = ColorMatchEnv()
env = Monitor(env, log_dir)
model = PPO("MlpPolicy", env, verbose=1, clip_range=0.35,  learning_rate=0.00035, ent_coef=0.1, gae_lambda=0.99, batch_size=128*2, vf_coef=0.5)
# model.set_logger(new_logger)
model.learn(total_timesteps=100000)


Using cpu device
Wrapping the env in a DummyVecEnv.
-----------------------------
| time/              |      |
|    fps             | 2164 |
|    iterations      | 1    |
|    time_elapsed    | 0    |
|    total_timesteps | 2048 |
-----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 1912        |
|    iterations           | 2           |
|    time_elapsed         | 2           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.053135432 |
|    clip_fraction        | 0.0876      |
|    clip_range           | 0.35        |
|    entropy_loss         | -1.77       |
|    explained_variance   | 0.00296     |
|    learning_rate        | 0.00035     |
|    loss                 | 8.1         |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0273     |
|    value_loss           | 41.1        |
------------------------

<stable_baselines3.ppo.ppo.PPO at 0x1e638366bd0>

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Load PPO logs
log_file = "./ppo_logs/progress.csv"  # Make sure this path matches your logger output
data = pd.read_csv(log_file)
print(data.columns)

# Plot Episode Rewards
plt.figure(figsize=(12, 5))
plt.plot(data["time/total_timesteps"], data["rollout/ep_rew_mean"], label="Mean Episode Reward", color='b')
plt.xlabel("Timesteps")
plt.ylabel("Mean Reward")
plt.title("PPO Convergence: Episode Rewards Over Time")
plt.legend()
plt.grid()
plt.show()

# Plot Losses (Policy Gradient Loss & Value Loss)
plt.figure(figsize=(12, 5))
plt.plot(data["time/total_timesteps"], data["train/policy_gradient_loss"], label="Policy Gradient Loss", color='r')
plt.plot(data["time/total_timesteps"], data["train/value_loss"], label="Value Loss", color='g')
plt.xlabel("Timesteps")
plt.ylabel("Loss")
plt.title("PPO Losses Over Time")
plt.legend()
plt.grid()
plt.show()

# Plot KL Divergence (to check stability)
plt.figure(figsize=(12, 5))
plt.plot(data["time/total_timesteps"], data["train/approx_kl"], label="Approx KL", color='m')
plt.xlabel("Timesteps")
plt.ylabel("KL Divergence")
plt.title("PPO KL Divergence Over Time")
plt.legend()
plt.grid()
plt.show()


In [85]:

# Test the trained agent
obs, _ = env.reset()
done = False
count = 0
while not done:
    count+=1
    action, _ = model.predict(obs)
    obs, reward, done, _, infos = env.step(action)
    direction = obs[3:] - obs[:3]
    max_distance = np.linalg.norm(np.array([1, 1, 1]))
    distance = np.linalg.norm(direction) / max_distance
    print(f"Distance {round(distance, 2)} Color: {obs[:3]}, Target: {obs[3:]}, {action} Reward: {reward}")
    # print(f"Distance {round(distance, 2)} Reward: {reward}")
    # print(f"Reward: {reward} Velocity: {round(infos['velocity'], 2)} Difference: {round(infos['difference'], 2)}")

print(f"Done in {count} steps")

Distance 0.49 Color: [0.13165493 0.02216735 0.94102723], Target: [0.63434781 0.70429493 0.89713054], 0 Reward: 0.13011505273850696
Distance 0.49 Color: [0.13165493 0.02216735 0.93102723], Target: [0.63434781 0.70429493 0.89713054], 5 Reward: 0.13025014085319278
Distance 0.48 Color: [0.13165493 0.03216735 0.93102723], Target: [0.63434781 0.70429493 0.89713054], 2 Reward: 0.13262495963120474
Distance 0.48 Color: [0.13165493 0.03216735 0.92102723], Target: [0.63434781 0.70429493 0.89713054], 5 Reward: 0.13272729059112073
Distance 0.48 Color: [0.13165493 0.03216735 0.91102723], Target: [0.63434781 0.70429493 0.89713054], 5 Reward: 0.13279425288831317
Distance 0.48 Color: [0.13165493 0.03216735 0.90102723], Target: [0.63434781 0.70429493 0.89713054], 5 Reward: 0.132825791251412
Distance 0.48 Color: [0.14165493 0.03216735 0.90102723], Target: [0.63434781 0.70429493 0.89713054], 0 Reward: 0.13460248616321302
Distance 0.48 Color: [0.14165493 0.03216735 0.89102723], Target: [0.63434781 0.704294

KeyboardInterrupt: 