In [1]:
from typing import List

from fastapi import FastAPI
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.prompts import PromptTemplate
from langchain.prompts.chat import ChatPromptTemplate
from langchain.schema import BaseOutputParser
from langserve import add_routes

In [None]:
llm = OpenAI(openai_api_key="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")
chat_model = ChatOpenAI(openai_api_key="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")

In [None]:
class EgoVehicle:
    def __init__(self, speed, lane, road, orientation, position, waypoints, roads_info):
        self.speed = speed  # Current speed of the ego vehicle
        self.lane = lane  # Current lane of the ego vehicle
        self.road = road  # Current road of the ego vehicle
        self.orientation = orientation  # Orientation of the ego vehicle
        self.position = position  # Current position of the ego vehicle
        self.waypoints = waypoints  # Waypoints for navigation
        self.roads_info = roads_info  # Information about the roads at the intersection

    def update_speed(self, new_speed):
        """Updates the speed of the ego vehicle."""
        self.speed = new_speed

    def update_position(self, new_position):
        """Updates the position of the ego vehicle."""
        self.position = new_position

    def update_orientation(self, new_orientation):
        """Updates the orientation of the ego vehicle."""
        self.orientation = new_orientation
    
    def update_lane(self, new_lane):
        """Updates the current lane of the ego vehicle."""
        self.lane = new_lane

    def update_road(self, new_road):
        """Updates the current road of the ego vehicle."""
        self.road = new_road

    def update_waypoints(self, new_waypoints):
        """Updates the waypoints for the ego vehicle."""
        self.waypoints = new_waypoints

    def update_roads_info(self, new_roads_info):
        """Updates the information about the roads at the intersection."""
        self.roads_info = new_roads_info

In [None]:
# Road Data Structure
class Road:
    def __init__(self, road_id, lanes, direction, start_position, start_rotation):
        self.road_id = road_id  # Identifier for the road
        self.lanes = lanes  # List of Lane objects on this road
        self.direction = direction  # Direction of the road
        self.start_position = start_position  # Starting position of the road
        self.start_rotation = start_rotation  # Starting rotation angle of the road

    def add_lane(self, lane):
        """Adds a new lane to the road."""
        self.lanes.append(lane)

    def get_lane_info(self, lane_id):
        """Returns information about a specific lane."""
        return next((lane for lane in self.lanes if lane.lane_id == lane_id), None)

In [None]:
# Lane Data Structure
class Lane:
    def __init__(self, lane_id, direction, start_position, lane_type):
        self.lane_id = lane_id  # Identifier for the lane
        self.direction = direction  # Direction of the lane (relative to the ego vehicle)
        self.start_position = start_position  # Starting position of the lane (relative to the ego vehicle)
        self.lane_type = lane_type  # Type of the lane (e.g., 'left turn only', 'straight ahead only')

    # Method to update lane direction
    def update_direction(self, new_direction):
        """Updates the direction of the lane."""
        self.direction = new_direction

    # Method to update lane start position
    def update_start_position(self, new_start_position):
        """Updates the start position of the lane."""
        self.start_position = new_start_position

    # Method to update lane type
    def update_lane_type(self, new_lane_type):
        """Updates the type of the lane."""
        self.lane_type = new_lane_type

In [None]:
# Traffic Participant Data Structure
class TrafficParticipant:
    def __init__(self, participant_id, position, speed, orientation, type, lane, intention, relative_velocity):
        self.participant_id = participant_id  # Identifier for the participant
        self.position = position  # Current position of the participant
        self.speed = speed  # Current speed of the participant
        self.orientation = orientation  # Orientation of the participant
        self.type = type  # Type (e.g., vehicle, pedestrian)
        self.lane = lane  # Lane in which the participant is located
        self.intention = intention  # Intended action or direction of the participant
        self.relative_velocity = relative_velocity  # Velocity relative to the ego vehicle

    def update_position(self, new_position):
        """Updates the position of the participant."""
        self.position = new_position

    def update_speed(self, new_speed):
        """Updates the speed of the participant."""
        self.speed = new_speed

    def update_orientation(self, new_orientation):
        """Updates the orientation of the participant."""
        self.orientation = new_orientation

    def update_lane(self, new_lane):
        """Updates the lane of the participant."""
        self.lane = new_lane

    def update_intention(self, new_intention):
        """Updates the intention of the participant."""
        self.intention = new_intention

    def update_relative_velocity(self, new_relative_velocity):
        """Updates the relative velocity of the participant."""
        self.relative_velocity = new_relative_velocity

In [None]:
class SimulationEnvironment:
    def __init__(self, ego_vehicle, roads, traffic_participants):
        self.ego_vehicle = ego_vehicle  # Instance of EgoVehicle
        self.roads = roads  # Dictionary or list of Road instances
        self.traffic_participants = traffic_participants  # Dictionary or list of TrafficParticipant instances
        self.decision_tools = {}  # Tools for decision-making (e.g., 'Record Judgments', 'Get Possible Situations')
        self.decision_outcomes = {}  # Store outcomes of decisions

    def update_environment(self):
        """Updates the state of the entire environment."""
        # Update the ego vehicle state
        self.ego_vehicle.update_speed(...)  # Example, update with new data
        self.ego_vehicle.update_position(...)

        # Update the state of each traffic participant
        for participant in self.traffic_participants.values():
            participant.update_position(...)
            participant.update_speed(...)

        # Additional logic for environment update
        self.apply_decision_making_tools()

    def detect_collisions(self):
        """Detects potential collisions in the environment."""
        collision_risks = []

        # Check for potential collision with each traffic participant
        for participant in self.traffic_participants.values():
            if self.is_collision_risk(self.ego_vehicle, participant):
                collision_risks.append(participant.participant_id)

        return collision_risks

    def is_collision_risk(self, ego_vehicle, TrafficParticipant):
        """Determines if there is a collision risk between the ego vehicle and a participant."""
        # Implement collision risk detection logic based on positions, velocities, orientations, etc.
        # Example:
        # Check if the paths of the ego vehicle and the participant intersect
        # Consider their current speeds and directions to predict future positions
        # Return True if a collision is likely, False otherwise

        # Placeholder logic
        return False

    def determine_situations(self):
        """Determines the current situation based on the environment state."""
        # Analyze the current state of the ego vehicle and traffic participants
        # Determine the most appropriate situation category based on the analysis
        # Possible situations might include 'CROSS NORMAL', 'CROSS WAIT', 'CROSS FAST', etc.

        # Example logic:
        # If there is a potential collision risk, consider 'CROSS WAIT'
        # If the path is clear and it's safe to proceed, consider 'CROSS NORMAL'
        # If there is a need to expedite crossing due to traffic conditions, consider 'CROSS FAST'

        # Placeholder logic
        current_situation = "CROSS NORMAL"  # Default situation
        collision_risks = self.detect_collisions()

        if collision_risks:
            current_situation = "CROSS WAIT"
        # Further logic to refine the situation determination

        return current_situation

    def provide_action_guidance(self):
        """Provides action guidance for the ego vehicle based on the current situation."""
        current_situation = self.determine_situations()
        action_guidance = ""

        # Based on the current situation, determine the best course of action
        if current_situation == "CROSS NORMAL":
            # Actions for normal crossing
            action_guidance = "SLIGHTLY LEFT MAINTAIN"  # Example action
        elif current_situation == "CROSS WAIT":
            # Actions when waiting is required
            action_guidance = "IDLE MAINTAIN"  # Example action
        elif current_situation == "CROSS FAST":
            # Actions for fast crossing
            action_guidance = "SLIGHTLY LEFT FASTER"  # Example action

        # Placeholder logic
        return action_guidance

    # Additional methods for managing the environment

In [None]:
# Initialize the simulation environment

ego_vehicle = EgoVehicle(
    speed=8.0,  # Example initial speed
    lane="lane_id",  # Initial lane id
    road="road_id",  # Initial road id
    orientation=0,  # Initial orientation
    position=(0, 0),  # Initial position (x, y)
    waypoints=[],  # Initial waypoints
    roads_info={}  # Road information
)

# Example initialization of roads and lanes
roads = {
    "road_id": Road(
        road_id="road_id",
        lanes=["lane_id1", "lane_id2"],
        direction="NORTH",
        start_position=(0, 0)
    )
    # Add more roads as needed
}

lanes = {
    "lane_id": Lane(
        lane_id="lane_id",
        allowed_directions=["NORTH"],
        start_position=(0, 0),
        lane_type="NORMAL"
    )
    # Add more lanes as needed
}

# Example initialization of traffic participants
traffic_participants = {
    "participant_id": TrafficParticipant(
        participant_id="participant_id",
        position=(10, 10),
        speed=5.0,
        orientation=45,
        type="vehicle",
        lane="lane_id",
        intention="straight",
        relative_velocity=0
    )
    # Add more traffic participants as needed
}

simulation_env = SimulationEnvironment(
    ego_vehicle=ego_vehicle,
    roads=roads,
    traffic_participants=traffic_participants
)