## Import libraries

In [None]:
import sys
import threading
from pathlib import Path
sys.path.append(str(Path().resolve().parent))
from simworld.config import Config
from simworld.communicator.communicator import Communicator
from simworld.communicator.unrealcv import UnrealCV
from simworld.traffic.controller.traffic_controller import TrafficController
from simworld.traffic.base.traffic_signal import TrafficSignalState
from simworld.llm.a2a_llm import A2ALLM
from simworld.map.map import Map
from simworld.agent.humanoid import Humanoid
from simworld.utils.vector import Vector
from simworld.local_planner.local_planner import LocalPlanner

## Init

In [None]:
config = Config()
communicator = Communicator(UnrealCV())

## World Generation

In [None]:
communicator.generate_world('E:\Projects\SimWorld\output_test\progen_world.json', config['citygen.ue_asset_path'], run_time=True)

## Traffic Simulation

In [None]:
traffic_controller = TrafficController(config, 15, 15, "E:/Projects/SimWorld/output_test/roads.json", 1, 0.1)
traffic_controller.init_communicator(communicator)

In [5]:
map = Map(config, traffic_signals=traffic_controller.traffic_signals)
humanoid = Humanoid(position=Vector(1700, 1700), direction=Vector(1, 0), map=map, communicator=communicator, config=config)

In [6]:
communicator.spawn_agent(humanoid, config['user.model_path'])

In [None]:
traffic_controller.spawn_objects_in_unreal_engine()
communicator.spawn_ue_manager(config['simworld.ue_manager_path'])

In [8]:
def update_states():
        vehicle_ids = [vehicle.id for vehicle in traffic_controller.vehicles]
        pedestrian_ids = [pedestrian.id for pedestrian in traffic_controller.pedestrians]
        traffic_signal_ids = [signal.id for signal in traffic_controller.traffic_signals]
        humanoid_ids = [humanoid.id]
        result = traffic_controller.communicator.get_position_and_direction(vehicle_ids, pedestrian_ids, traffic_signal_ids, humanoid_ids)
        for (type, object_id), values in result.items():
            if type == 'vehicle':
                position, direction = values
                traffic_controller.vehicles[object_id].position = position
                traffic_controller.vehicles[object_id].direction = direction
            elif type == 'pedestrian':
                position, direction = values
                traffic_controller.pedestrians[object_id].position = position
                traffic_controller.pedestrians[object_id].direction = direction
            elif type == 'traffic_signal':
                is_vehicle_green, is_pedestrian_walk, left_time = values
                for signal in traffic_controller.traffic_signals:
                    if signal.id == object_id:
                        if is_vehicle_green:
                            signal.set_state((TrafficSignalState.VEHICLE_GREEN, TrafficSignalState.PEDESTRIAN_RED))
                        elif is_pedestrian_walk:
                            signal.set_state((TrafficSignalState.VEHICLE_RED, TrafficSignalState.PEDESTRIAN_GREEN))
                        else:
                            signal.set_state((TrafficSignalState.VEHICLE_RED, TrafficSignalState.PEDESTRIAN_RED))
                        signal.set_left_time(left_time)
                        break
            elif type == 'humanoid':
                pos, dir = values
                humanoid.position = pos
                humanoid.direction = dir

In [9]:
exit_event = threading.Event()

In [None]:
t = threading.Thread(target=traffic_controller.simulation, args=(update_states, exit_event))
t.start()

## Action Planner

In [None]:
import os
os.environ['OPENAI_API_KEY'] = '<your_openai_api_key>'

In [None]:
llm = A2ALLM(model_name='gpt-4o', provider='openai')
map.initialize_map_from_file(roads_file='E:/Projects/SimWorld/output_test/roads.json')  # use default map

In [None]:
action_planner = LocalPlanner(agent=humanoid, model=llm, rule_based=False)

In [14]:
communicator.spawn_object('GEN_BP_Box_1_C', '/Game/InteractableAsset/Box/BP_Interactable_Box.BP_Interactable_Box_C', (1700, -1700, 20), (0, 0, 0))

## Agent

In [None]:
class Agent:
    def __init__(self, action_planner, goal):
        self.action_planner = action_planner
        self.goal = goal

    def action(self, obs):
        action = self.action_planner.parse(self.goal)
        return action

In [None]:
class Environment:
    def __init__(self, communicator, traffic_controller, humanoid, action_planner):
        self.communicator = communicator
        self.traffic_controller = traffic_controller
        self.humanoid = humanoid
        self.action_planner = action_planner

    def reset(self):
        """Clear the UE scene and (re)spawn the humanoid and target."""
        # Clear spawned objects
        self.communicator.clear_env()

        # Blueprint path for the humanoid agent to spawn in the UE level
        agent_bp = "/Game/TrafficSystem/Pedestrian/Base_User_Agent.Base_User_Agent_C"

        # Initial spawn position and facing direction for the humanoid (2D)
        spawn_location = Vector(0, 0)
        spawn_forward = Vector(0, 1)
        self.agent = Humanoid(spawn_location, spawn_forward)

        # Spawn the humanoid agent in the Unreal world
        # NOTE: name is ignored for humanoid type, but required by the API.
        self.communicator.spawn_agent(self.agent, name=None, model_path=agent_bp, type="humanoid")

        # Cache the generated UE actor name
        self.agent_name = self.communicator.get_humanoid_name(self.agent.id)

        # Define a target position the agent is encouraged to move toward (example value)
        self.target = Vector(1000, 0)

        # Return initial observation (optional, but RL-style)
        observation = self.communicator.get_camera_observation(self.agent.camera_id, "lit")
        return observation

    def step(self, action):
        """Move the humanoid forward a bit and compute reward."""
        # Parse the action text and map it to the action space
        self.action_planner.execute(action)

        # Get current location from UE (x, y, z) and convert to 2D Vector
        loc_3d = self.communicator.unrealcv.get_location(self.agent_name)
        # loc_3d is a numpy array; explicitly use x, y to build our 2D Vector
        location = Vector(loc_3d[0], loc_3d[1])

        # Camera observation for RL
        observation = self.communicator.get_camera_observation(self.agent.camera_id, "lit")

        # Reward: negative Euclidean distance in 2D plane
        reward = -location.distance(self.target)

        return observation, reward

## Gym-like Interface

In [None]:
agent = Agent(action_planner, goal='Go to (1700, -1700) and pick up GEN_BP_Box_1_C.')
env = Environment(communicator, traffic_controller, humanoid, action_planner)
obs = env.reset()

# Roll out a short trajectory
for _ in range(100):
    action = agent.action(obs)
    obs, reward = env.step(action)
    # Plug this into your RL loop / logging as needed

In [None]:
plan = 'Go to (1700, -1700) and pick up GEN_BP_Box_1_C.'
action = action_planner.parse(plan)

In [None]:
### Test executor
action_planner.execute(action)

In [None]:
exit_event.set()
t.join()

In [3]:
communicator.disconnect()