In [1]:
import gymnasium as gym
from stable_baselines3 import DDPG, PPO
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
import numpy as np
import base64
from PIL import Image
import io
import os
from openai import OpenAI
from openaikey import OPENAI_API_KEY
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from stable_baselines3.common.logger import configure
from custom_race import * 
import imageio
from utils import record_videos, show_videos

  import distutils.spawn


Registering CustomRace-v0


In [2]:
# Create the OpenAI client used by the prompt helper calls later in this cell.
# The API key is the OPENAI_API_KEY name imported from the `openaikey` module.
client = OpenAI(api_key=OPENAI_API_KEY)

# Placeholder reward, used so the environment can be built and rendered
def compute_reward(self, achieved_goal: np.ndarray, desired_goal: np.ndarray, info: dict = None, p: float = 0.5) -> float:
    """Return a constant reward of 0.0.

    Every argument (``self``, ``achieved_goal``, ``desired_goal``, ``info``
    and ``p``) is accepted and ignored.
    """
    return 0.0

# Build the environment with the placeholder reward so that a frame can be rendered.
env = gym.make("CustomRace-v0", reward_func=compute_reward, render_mode="rgb_array")
# Gymnasium's reset() returns an (observation, info) pair; unpack it so that
# `obs` really is the observation instead of the whole tuple.
obs, info = env.reset()
print(obs)

# Base64 string handed to the vision prompt call below
# (presumably the encoded rendered frame; confirm in env_render_to_base64).
image_base64_str = env_render_to_base64(env)

# PROMPTS FOR THE CLIENT
# `env_description` is passed to prompt_api_vision together with the base64 image
# and asks for three behaviours described in natural language.
# NOTE(review): the last sentence of this prompt reads garbled ("learn drive in
# this environment desired"); it is runtime text, so it is left untouched here.
# NOTE(review): the prompt describes a two-element action, while the info dict
# printed by the training cell shows a single-element action array; confirm.
env_description = """
Here is an image of a top-down view of a racecar on a race track environment. 
The yellow rectangle represents a car that can be controlled via throttle and steering angle.
We would like the yellow car to follow the track and avoid collisions. 
We are given an action in the form [0.0, 0.0] where the first element is the steering angle and the second element is the throttle.
We are also given a dictionary of:
            "lane_centering_reward": 1/(1+self.config["lane_centering_cost"]*lateral**2),
            "action_reward": np.linalg.norm(action),
            "collision_reward": self.vehicle.crashed,
            "on_road_reward": self.vehicle.on_road,
Please describe 3 behaviors in natural language that would incentivize an agent to efficiently learn drive in this environment desired.
"""

# `language_to_code_prompt` is the preamble of the code-generation request. It ends
# with "...the following list:", and the text returned by prompt_api_vision is
# appended to it further down to supply that list.
language_to_code_prompt = """
You are an assistant tasked with turning language subgoals into machine readable code. 
You are tasked with designing a reward function to incentivise a car to drive on a racetrack.
You are given an action in the form [0.0, 0.0] where the first element is the steering angle and the second element is the throttle.
You are also given a dictionary of:
            "lane_centering_reward": 1/(1+self.config["lane_centering_cost"]*lateral**2),
            "action_reward": np.linalg.norm(action),
            "collision_reward": self.vehicle.crashed,
            "on_road_reward": self.vehicle.on_road,
Please use all information to define a reward function that incentivizes the behaviors described in the following list.
Please respond with only the code for the reward function which is defined by reward_func(self, action: np.ndarray, obs: {}) -> float:
The reward function should address each point in the following list:
""" 
# Our actions come in the form: [0.0, 0.0] where the first element is the steering angle and the second element is the throttle.
# Stage 1: send the environment description and the image to the vision call,
# then append its answer (the behaviour list) to the code-generation preamble.
natural_language = language_to_code_prompt + prompt_api_vision(client, env_description, image_base64_str)
print(natural_language)
# Stage 2: request the reward-function code. The combined prompt must be sent
# here: the bare preamble ends with "...each point in the following list:" and
# carries no list on its own.
code = prompt_api_no_vision(client, natural_language)
print(code)

(array([[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]],

       [[0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
        [0.

In [4]:
def reward_func(self, action: np.ndarray, obs: dict) -> float:
    """Score one step from lane centring, action size, collision and road status.

    Args:
        self: Environment-like object. ``self.config`` must provide the keys
            ``"lane_centering_cost"`` and ``"action_cost"``; ``self.vehicle``
            must expose ``crashed`` and ``on_road`` either as mapping keys or
            as attributes.
        action: Control vector; only its Euclidean norm is used.
        obs: Mapping that may contain ``"lateral_deviation"``; a missing key is
            treated as 0.0.
            NOTE(review): confirm the environment actually supplies this key,
            otherwise the lane-centring term is always 1.

    Returns:
        lane-centring term - norm(action) * action_cost
        + collision penalty (-500 when crashed) + on-road term (+100 / -100).

    Raises:
        KeyError: if a required ``self.config`` entry is missing.
    """
    def vehicle_flag(name: str):
        # The vehicle may be a mapping or an object with attributes (the
        # environment itself reads self.vehicle.crashed / self.vehicle.on_road),
        # so fall back to attribute access when subscripting is unsupported.
        try:
            return self.vehicle[name]
        except TypeError:
            return getattr(self.vehicle, name)

    # 1. Lane centring: equals 1 on the centre line and decays with the squared deviation.
    lateral_deviation = obs.get('lateral_deviation', 0.0)
    lane_centering_reward = 1 / (1 + self.config["lane_centering_cost"] * lateral_deviation ** 2)

    # 2. Penalise large action values to discourage erratic control.
    action_magnitude = np.linalg.norm(action)

    # 3. Strong penalty for collisions.
    collision_penalty = -500.0 if vehicle_flag('crashed') else 0.0

    # 4. Bonus for staying on the road, penalty for leaving it.
    on_road_reward = 100.0 if vehicle_flag('on_road') else -100.0

    # Combine all terms into the final reward.
    return (lane_centering_reward
            - action_magnitude * self.config["action_cost"]
            + collision_penalty
            + on_road_reward)

mode = "USE_PPO"
training_steps = 100000

# NOTE(review): the environment is built with `compute_reward`, not with the
# `reward_func` defined just above, although the model is saved under an
# "ai_reward" name; confirm which reward is intended.
env = gym.make("CustomRace-v0", reward_func=compute_reward, render_mode="rgb_array")
# reset() returns an (observation, info) pair; print only the info dict.
obs, info = env.reset()
print(info)


# PPO training run
if mode == "USE_PPO":
    # set up logger
    tmp_path = "wiley/log"  # PATH FOR LOGGING
    new_logger = configure(tmp_path, ["stdout", "csv", "tensorboard"])
    # Gaussian action noise sized to the action space. It is built here but is
    # not passed to the PPO constructor below.
    n_actions = env.action_space.shape[-1]
    action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))
    # create the model and attach the logger
    model = PPO("MlpPolicy", env, verbose=1)
    model.set_logger(new_logger)
    model.learn(total_timesteps=training_steps)
    model.save("wiley/models/PPO_race_ai_reward")

{'speed': 10, 'crashed': False, 'action': array([-0.78005207], dtype=float32), 'rewards': {'lane_centering_reward': 1.0, 'action_reward': 0.78005207, 'collision_reward': False, 'on_road_reward': True}}
Logging to wiley/log
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 522      |
|    ep_rew_mean     | 14.1     |
| time/              |          |
|    fps             | 94       |
|    iterations      | 1        |
|    time_elapsed    | 21       |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 767          |
|    ep_rew_mean          | 14.6         |
| time/                   |              |
|    fps                  | 92           |
|    iterations           | 2            |
|    time_elapsed         | 44           |