In [1]:
import os
import requests
import time
import boto3
import json
import random
import pandas as pd
from botocore.exceptions import ClientError
import numpy as np

# Environment
import gymnasium as gym
import highway_env
from tqdm import trange

# Register the highway-env environments with Gymnasium so that the
# gym.make("highway-v0", ...) call further down can resolve the id.
gym.register_envs(highway_env)

Set up Claude

In [2]:
# Define Boto3 client for Bedrock.
# AWS credentials must be available to boto3 for requests to succeed; the run
# recorded at the end of this notebook failed with NoCredentialsError.
client = boto3.client("bedrock-runtime", region_name="us-east-1")
# Bedrock model identifier passed as modelId when claude_action invokes the model.
model_id = "anthropic.claude-3-5-sonnet-20240620-v1:0"

# Claude API settings
# NOTE(review): claude_action sends its request through the Bedrock `client`
# above; no code visible in this notebook posts to CLAUDE_API_URL. These look
# like leftovers from a direct-API version -- confirm before removing.
CLAUDE_API_KEY = os.getenv('CLAUDE_API_KEY')
CLAUDE_API_URL = 'https://api.anthropic.com/v1/complete'


def claude_action(prompt1, assist1, prompt2, model='claude-v1', max_tokens_to_sample=50, temperature=0.7):
    """
    Sends prompts to Claude (through the Bedrock runtime client) and retrieves
    the recommended action.

    The three prompt parts are joined with blank lines and sent as a single
    user message.

    Args:
        prompt1: Task description / list of allowed actions.
        assist1: Text placed between the two prompts.
        prompt2: Description of the current driving scenario.
        model: Unused; kept so existing callers keep working. The model is
            selected by the module-level ``model_id``.
        max_tokens_to_sample: Unused; the request always sends ``max_tokens=1024``.
        temperature: Unused; the request always sends ``temperature=0``.

    Returns:
        str: The upper-cased text that follows the last ``'Final decision:'``
        marker in the reply, or ``'IDLE'`` when the marker is missing, the
        reply has no text, or the service reports a ``ClientError``.

    Notes:
        Only ``ClientError`` (errors reported by the service) is turned into
        an ``'IDLE'`` fallback. Other failures, such as missing AWS
        credentials, propagate so that a misconfigured run fails loudly
        instead of silently idling on every step.
    """
    full_prompt = f"{prompt1}\n\n{assist1}\n\n{prompt2}"

    native_request = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "temperature": 0,
        "messages": [
            {
                "role": "user",
                "content": [{"type": "text", "text": full_prompt}],
            }
        ],
    }

    try:
        response = client.invoke_model(modelId=model_id, body=json.dumps(native_request))
        model_response = json.loads(response["body"].read())
    except ClientError as e:
        print(f"Error communicating with Claude.ai: {e}")
        time.sleep(1)  # brief pause before the caller asks again
        return 'IDLE'

    # Be tolerant of replies that carry no text part at all.
    content = model_response.get("content") or []
    response_text = content[0].get("text", "") if content else ""

    if 'Final decision:' in response_text:
        return response_text.split('Final decision:')[-1].strip().upper()

    print(f"Unexpected response format: {response_text}")
    return 'IDLE'

Map LLM actions to labels

In [3]:
def map_llm_action_to_label(llm_act):
    """
    Translate an action name returned by the LLM into its integer label.

    The lookup is case-insensitive. Any name outside the five known actions
    yields 1, the label of IDLE.
    """
    # Position in this tuple is the numerical label of the action.
    ordered_actions = ('LANE_LEFT', 'IDLE', 'LANE_RIGHT', 'FASTER', 'SLOWER')
    name = llm_act.upper()
    if name in ordered_actions:
        return ordered_actions.index(name)
    return 1  # Default to IDLE if unrecognized

Set up the environment

In [4]:
class MyHighwayEnvLLM(gym.Env):
    """
    Custom Gym environment for highway driving with LLM prompts.

    Wraps a ``highway-v0`` environment, converts its kinematic observation
    into a textual scenario description for an LLM (``prompt_design``) and
    remembers the last action taken so it can be quoted in the next prompt.
    """

    # Names of the five discrete actions, keyed by the integer passed to step().
    ACTION_LABELS = {0: 'LANE_LEFT', 1: 'IDLE', 2: 'LANE_RIGHT', 3: 'FASTER', 4: 'SLOWER'}
    # Road geometry assumed when turning a lateral position into a lane number.
    LANE_COUNT = 4
    LANE_WIDTH = 4.0
    # Action quoted as "last action" before any step has been taken.
    INITIAL_ACTION = 'FASTER'

    def __init__(self, vehicleCount):
        """
        Build the wrapped ``highway-v0`` environment.

        Args:
            vehicleCount: Number of vehicles (ego included) reported in each
                observation; also the first dimension of the observation space.
        """
        super().__init__()
        self.vehicleCount = vehicleCount
        self.prev_action = self.INITIAL_ACTION

        self.config = {
            "observation": {
                "type": "Kinematics",
                "features": ["presence", "x", "y", "vx", "vy"],
                "absolute": True,
                "normalize": False,
                "vehicles_count": vehicleCount,
                "see_behind": True,
            },
            "action": {
                "type": "DiscreteMetaAction",
                "target_speeds": np.linspace(0, 32, 9),
            },
            # Stated explicitly so the simulator and the prompt agree on the lane count.
            "lanes_count": self.LANE_COUNT,
            "duration": 40,
            "vehicles_density": 2,
            "show_trajectories": True,
            "render_agent": True,
        }
        self.env = gym.make("highway-v0", render_mode='rgb_array', config=self.config)
        self.render_mode = self.env.render_mode
        self.action_space = self.env.action_space
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(vehicleCount, 5), dtype=np.float32)

    def find_smallest_positive(self, arr):
        """
        Return ``(value, index)`` of the smallest strictly positive entry of ``arr``.

        When ``arr`` has no positive entry the result is ``(inf, -1)``; callers
        must check for index ``-1`` before using it to index anything.
        """
        smallest_positive = float('inf')
        index = -1
        for i, value in enumerate(arr):
            if 0 < value < smallest_positive:
                smallest_positive = value
                index = i
        return smallest_positive, index

    def prompt_design(self, obs_):
        """
        Build the three prompt parts describing the current scenario.

        Args:
            obs_: Observation array with one row per vehicle and the columns
                ``presence, x, y, vx, vy`` (the features configured in
                ``__init__``). Row 0 is treated as the ego vehicle.

        Returns:
            tuple[str, str, str]: ``(prompt1, assist1, prompt2)`` -- the task
            description, the text placed between the prompts, and the
            scenario description ending with the answer-format instructions.
        """
        prompt1 = (
            "You are a smart driving assistant. You, the 'ego' car, are now driving on a highway. "
            "You need to recommend ONLY ONE best action among the following set of actions based on the current scenario: "
            "1. IDLE -- maintain the current speed in the current lane "
            "2. FASTER -- accelerate the ego vehicle "
            "3. SLOWER -- decelerate the ego vehicle "
            "4. LANE_LEFT -- change to the adjacent left lane "
            "5. LANE_RIGHT -- change to the adjacent right lane"
        )
        assist1 = (
            "Understood. Please provide the current scenario or conditions, such as traffic density, speed of surrounding vehicles, "
            "your current speed, and any other relevant information, so I can recommend the best action."
        )

        # Extract and organize vehicle state information from observations
        obs_ = np.asarray(obs_)
        presence, x, y, vx = obs_[:, 0], obs_[:, 1], obs_[:, 2], obs_[:, 3]
        ego_x, ego_vx = x[0], vx[0]

        # Round to the nearest lane centre instead of flooring: flooring maps a
        # slightly negative y (e.g. -0.01) to lane 0 and flips lane halfway
        # through a lane change. Clip so positions off the road stay in range.
        lanes = np.clip(np.rint(y / self.LANE_WIDTH).astype(int) + 1, 1, self.LANE_COUNT)
        ego_lane = int(lanes[0])

        # Other vehicles: longitudinal offset from ego, speed, lane, and whether
        # the row describes a real vehicle (padding rows have presence 0).
        veh_x = x[1:] - ego_x
        veh_vx = vx[1:]
        veh_lanes = lanes[1:]
        veh_present = presence[1:] > 0

        if ego_lane == 1:
            ego_left_lane, ego_right_lane = 'Left lane: Not available\n', f'Right lane: Lane-{ego_lane + 1}\n'
        elif ego_lane == self.LANE_COUNT:
            ego_left_lane, ego_right_lane = f'Left lane: Lane-{ego_lane - 1}\n', 'Right lane: Not available\n'
        else:
            ego_left_lane, ego_right_lane = f'Left lane: Lane-{ego_lane - 1}\n', f'Right lane: Lane-{ego_lane + 1}\n'

        prompt2 = (
            f"Ego vehicle:\n\tCurrent lane: Lane-{ego_lane}\n\t{ego_left_lane}\t{ego_right_lane}\tCurrent speed: {ego_vx:.1f} m/s \n\n"
            "Lane info:\n"
        )
        for lane in range(1, self.LANE_COUNT + 1):
            # Only real vehicles that are strictly ahead of ego in this lane.
            ahead = np.where(veh_present & (veh_lanes == lane) & (veh_x > 0))[0]
            num_v = len(ahead)
            if num_v > 0:
                _, nearest = self.find_smallest_positive(veh_x[ahead])
                closest = ahead[nearest]
                prompt2 += (
                    f"\tLane-{lane}: There are {num_v} vehicle(s) in this lane ahead of ego vehicle, "
                    f"closest being {veh_x[closest]:.1f} m ahead traveling at {veh_vx[closest]:.1f} m/s.\n"
                )
            else:
                prompt2 += f"\tLane-{lane} No other vehicle ahead of ego vehicle.\n"

        prompt2 += (
            "\nAttention points:\n"
            "\t1. SLOWER has least priority and should be used only when no other action is safe.\n"
            "\t2. DO NOT change lanes frequently.\n"
            "\t3. Safety is priority, but do not forget efficiency.\n"
            "\t4. Your suggested action has to be one from one of the above five listed actions - IDLE, SLOWER, FASTER, LANE_LEFT, LANE_RIGHT. \n"
            f"Your last action was {self.prev_action}. Please recommend action for the current scenario only in this format and DONT propound anything else other than 'Final decision: <final decision>'.\n"
        )

        return prompt1, assist1, prompt2

    def step(self, action):
        """
        Steps the environment with the given action.

        Records the action's name in ``prev_action`` and squashes the wrapped
        environment's reward through a logistic function.

        Returns:
            tuple: ``(obs, reward, done, truncated, info)``.
        """
        obs, dqn_reward, done, truncated, info = self.env.step(action)

        # int() so NumPy integers and 0-d arrays look up the same way as plain ints.
        self.prev_action = self.ACTION_LABELS.get(int(action), 'IDLE')
        reward = 1 / (1 + np.exp(-dqn_reward))

        return obs, reward, done, truncated, info

    def reset(self, **kwargs):
        """
        Resets the environment.

        Also restores ``prev_action`` so the first prompt of a new episode does
        not quote the last action of the previous one.

        Returns:
            Whatever the wrapped environment's ``reset`` returns
            (an ``(obs, info)`` pair under the Gymnasium API).
        """
        self.prev_action = self.INITIAL_ACTION
        return self.env.reset(**kwargs)

    def render(self):
        """Render through the wrapped environment."""
        return self.env.render()

    def close(self):
        """Release the wrapped environment's resources."""
        self.env.close()

Set up video recording

In [5]:
import base64
from pathlib import Path

from gymnasium.wrappers import RecordVideo
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display



def record_videos(env, video_folder="videos"):
    """
    Wrap ``env`` in a ``RecordVideo`` recorder that records every episode.

    The recorder is also handed to the unwrapped environment through
    ``set_record_video_wrapper`` so intermediate frames can be captured.

    Returns:
        The ``RecordVideo`` wrapper around ``env``.
    """
    def _every_episode(episode_index):
        # Trigger that never skips an episode.
        return True

    recorder = RecordVideo(env, video_folder=video_folder, episode_trigger=_every_episode)

    # Capture intermediate frames
    base_env = env.unwrapped
    base_env.set_record_video_wrapper(recorder)
    return recorder


def show_videos(path="videos"):
    """
    Display every ``*.mp4`` file found directly inside ``path`` inline.

    Each file is embedded as a base64 data URI in its own ``<video>`` tag;
    the tags are joined with ``<br>`` and shown as a single HTML output.
    """
    video_tag = """<video alt="{}" autoplay
                      loop controls style="height: 400px;">
                      <source src="data:video/mp4;base64,{}" type="video/mp4" />
                 </video>"""
    fragments = [
        video_tag.format(clip, base64.b64encode(clip.read_bytes()).decode("ascii"))
        for clip in Path(path).glob("*.mp4")
    ]
    page = "<br>".join(fragments)
    ipythondisplay.display(ipythondisplay.HTML(data=page))

Directly prompt Claude for a response

In [6]:
def claude_query(env,obs):
    """
    Ask Claude for an action for the current observation.

    Args:
        env: Object providing ``prompt_design(obs)``, which returns the three
            prompt parts (the last action is already embedded in them).
        obs: Current observation, passed through to ``env.prompt_design``.

    Returns:
        int: Numerical action label produced by ``map_llm_action_to_label``.
    """
    # Generate prompt for LLM
    prompt1, assist1, prompt2 = env.prompt_design(obs)

    # Ask Claude for a response. Only the three prompt parts are passed: the
    # fourth positional parameter of claude_action is `model`, not the
    # previous action.
    reply = claude_action(prompt1, assist1, prompt2)

    # Keep only the first word of the reply and drop surrounding punctuation,
    # so that "LANE_LEFT.", "<LANE_LEFT>" and "LANE_LEFT\nbecause ..." are all
    # read as LANE_LEFT.
    words = reply.split()
    llm_act = words[0].strip('.,;:!*`\'"<>') if words else 'IDLE'

    # Integer action label
    return map_llm_action_to_label(llm_act)

video test

In [7]:
from gymnasium.wrappers import RecordVideo
from stable_baselines3.common.vec_env import DummyVecEnv

##make env
env_llm = MyHighwayEnvLLM(vehicleCount =10)


##video folder path
video_folder = "videos"
model_name = "test_claude"
video_path = f"{video_folder}/{model_name}"

##prediction logs: one text file per episode, one action label per line
predictions_dir = "predictions"
os.makedirs(predictions_dir, exist_ok=True)

##number of test episodes
n_test_episodes = 4

# Action names by label. Stepping goes through the RecordVideo wrapper around
# env_llm.env (so frames are recorded), which bypasses env_llm.step(); the
# "last action" quoted in the next prompt therefore has to be updated here.
action_names = {0: 'LANE_LEFT', 1: 'IDLE', 2: 'LANE_RIGHT', 3: 'FASTER', 4: 'SLOWER'}

##wrap video
env = RecordVideo(env_llm.env, video_folder=video_path, episode_trigger=lambda ep: True)

try:
    for episode in trange(n_test_episodes, desc='Test episodes'):
        (obs, info), done, truncated = env.reset(), False, False
        # Same initial value the wrapper's constructor uses, so an episode
        # never starts by quoting the previous episode's last action.
        env_llm.prev_action = 'FASTER'
        episode_predictions = []  # Store predictions for the current episode

        while not (done or truncated):
            action = claude_query(env_llm, obs)  # Ask Claude for the next action label
            episode_predictions.append(action)  # Save the predicted action

            # Step in the environment
            obs, reward, done, truncated, info = env.step(int(action))
            env_llm.prev_action = action_names.get(int(action), 'IDLE')

        # Save predictions for this episode to a file
        prediction_file = os.path.join(predictions_dir, f"testing_claude_{episode + 1}_predictions.txt")
        with open(prediction_file, 'w') as f:
            for pred_action in episode_predictions:
                f.write(f"{pred_action}\n")  # Write each action to the file
finally:
    # Close the recorder even when an episode fails part-way.
    env.close()

# Recordings are written to video_path, not to the default "videos" folder.
show_videos(video_path)

2024-10-09 18:13:49.918334: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-10-09 18:13:49.920413: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-10-09 18:13:49.955473: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  logger.warn(
Test episodes:   0%|          | 0/4 [00:00<?, ?it/s]


NoCredentialsError: Unable to locate credentials