In [29]:
import sys
sys.path.append('/Users/z5288866/.venvs/pnd_env/lib/python3.9/site-packages')

In [31]:
# racing_env.py
import numpy as np
import gym
from gym import spaces
import matplotlib.pyplot as plt

class DiscreteRacingEnv(gym.Env):
    def __init__(self, path_file='path.npy', track_radius=0.2, n_sensors=5):
        super().__init__()
        self.centerline = np.load(path_file)
        self.centerline = self.centerline - np.mean(self.centerline, axis=0)
        self.centerline = self.centerline / np.max(np.abs(self.centerline))
        self.track_radius = track_radius
        self.center = np.mean(self.centerline, axis=0)

        # Sensor configuration
        self.sensor_angles = np.linspace(-np.pi/4, np.pi/4, n_sensors)
        self.observation_space = spaces.Box(low=0, high=3.0, shape=(n_sensors,), dtype=np.float32)

        # Discrete action space: 5 steering x 3 throttle = 15 total actions
        self.steering_bins = [-0.5, -0.25, 0.0, 0.25, 0.5]
        self.throttle_bins = [-0.5, 0.0, 0.5]
        self.action_space = spaces.Discrete(len(self.steering_bins) * len(self.throttle_bins))

        self.reset()

    def decode_action(self, action):
        steer_idx = action % len(self.steering_bins)
        throttle_idx = action // len(self.steering_bins)
        return self.steering_bins[steer_idx], self.throttle_bins[throttle_idx]

    def reset(self):
        self.car_pos = np.copy(self.centerline[0])
        self.car_vel = 0.1
        self.car_angle = np.pi / 2  # face upward
        self.done = False
        self.passed_checkpoints = np.zeros(len(self.centerline), dtype=bool)
        return self.get_observation()

    def is_on_track(self, point):
        return np.any(np.linalg.norm(self.centerline - point, axis=1) < self.track_radius)

    def get_observation(self):
        distances = []
        for offset in self.sensor_angles:
            direction = np.array([
                np.cos(self.car_angle + offset),
                np.sin(self.car_angle + offset)
            ])
            for d in np.linspace(0, 3.0, 30):
                probe = self.car_pos + d * direction
                if not self.is_on_track(probe):
                    distances.append(d)
                    break
            else:
                distances.append(3.0)
        return np.array(distances, dtype=np.float32)

    def step(self, action):
        steer, throttle = self.decode_action(action)
        self.car_angle += steer * 0.1
        self.car_vel = np.clip(self.car_vel + throttle * 0.01, 0.05, 0.2)
        forward = np.array([np.cos(self.car_angle), np.sin(self.car_angle)])
        self.car_pos += self.car_vel * forward

        if not self.is_on_track(self.car_pos):
            self.done = True
            return self.get_observation(), -10.0, self.done, {}

        # Checkpoints and direction reward
        for i, point in enumerate(self.centerline):
            if not self.passed_checkpoints[i] and np.linalg.norm(self.car_pos - point) < self.track_radius / 2:
                self.passed_checkpoints[i] = True

        r = self.car_pos - self.center
        cross_z = r[0] * forward[1] - r[1] * forward[0]
        rotation_reward = np.sign(cross_z) * 1.0

        progress = np.sum(self.passed_checkpoints) / len(self.centerline)
        reward = self.car_vel + 2.0 * progress + rotation_reward
        return self.get_observation(), reward, self.done, {}

    def render(self, mode='human'):
        fig, ax = plt.subplots()
        ax.plot(self.centerline[:, 0], self.centerline[:, 1], 'k--')
        for p in self.centerline:
            ax.add_patch(plt.Circle(p, self.track_radius, color='lightgray', alpha=0.3))
        ax.plot(self.car_pos[0], self.car_pos[1], 'ro')
        ax.plot(self.center[0], self.center[1], 'gx')
        plt.axis('equal')
        plt.close(fig)
        return fig

# Training config parameters (can be imported into training script)
TRAINING_CONFIG = {
    "total_timesteps": 200_000,
    "learning_rate": 3e-4,
    "n_steps": 2048,
    "batch_size": 64,
    "n_epochs": 10,
    "gamma": 0.99
}


In [33]:
import gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from racing_env import DiscreteRacingEnv, TRAINING_CONFIG

# Register custom env with Gym
class RacingWrapper(gym.Env):
    def __init__(self):
        self.env = DiscreteRacingEnv()
        self.observation_space = self.env.observation_space
        self.action_space = self.env.action_space

    def reset(self):
        return self.env.reset()

    def step(self, action):
        return self.env.step(action)

    def render(self, mode="human"):
        return self.env.render(mode)

env = RacingWrapper()
check_env(env, warn=True)

model = PPO(
    "MlpPolicy",
    env,
    verbose=1,
    learning_rate=TRAINING_CONFIG["learning_rate"],
    n_steps=TRAINING_CONFIG["n_steps"],
    batch_size=TRAINING_CONFIG["batch_size"],
    n_epochs=TRAINING_CONFIG["n_epochs"],
    gamma=TRAINING_CONFIG["gamma"],
    tensorboard_log="./ppo_racing_tensorboard/"
)

model.learn(total_timesteps=TRAINING_CONFIG["total_timesteps"])
model.save("ppo_racing_agent")


2025-05-31 13:33:34.525976: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


AssertionError: Your environment must inherit from the gymnasium.Env class cf. https://gymnasium.farama.org/api/env/

In [None]:
import imageio
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import PPO
from racing_env import DiscreteRacingEnv

# Load environment and trained model
env = DiscreteRacingEnv()
model = PPO.load("ppo_racing_agent")

# Prepare video writer
writer = imageio.get_writer("racing_demo.mp4", fps=30, codec='libx264')

obs = env.reset()
done = False
while not done:
    action, _ = model.predict(obs)
    obs, reward, done, _ = env.step(action)

    fig = env.render()
    fig.canvas.draw()
    image = np.frombuffer(fig.canvas.tostring_rgb(), dtype='uint8')
    image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    writer.append_data(image)
    plt.close(fig)

writer.close()
print("✅ Saved racing_demo.mp4")
