# Agent: Control (Phase 4)

This notebook subscribes to weather and people positions, then publishes person movement commands.

In [None]:
# Cell purpose: import helpers and initialize static shops + MQTT config.
from __future__ import annotations

import json
import time
from dataclasses import dataclass

from simulated_city.config import load_config
from simulated_city.mqtt import MqttConnector, MqttPublisher
from simulated_city.routing import (
    Shop,
    build_move_command,
    select_nearest_shop,
    should_enter_random_walk_mode,
    should_enter_shelter_mode,
)

# Load project configuration; this notebook cannot run without simulation settings.
config = load_config()
simulation_cfg = config.simulation
if simulation_cfg is None:
    raise ValueError("config.yaml must include a 'simulation' section for this notebook")

# Map bounds, read once from the simulation config.
_map_bounds = simulation_cfg.map
MIN_X, MAX_X = _map_bounds.min_x, _map_bounds.max_x
MIN_Y, MAX_Y = _map_bounds.min_y, _map_bounds.max_y

# Static shops, each placed at the same fraction of the map extent on both axes.
shops = [
    Shop(
        shop_id=shop_id,
        name=shop_name,
        x=MIN_X + (MAX_X - MIN_X) * fraction,
        y=MIN_Y + (MAX_Y - MIN_Y) * fraction,
    )
    for shop_id, shop_name, fraction in (
        ("shop_1", "Central Coffee", 0.25),
        ("shop_2", "Riverside Beans", 0.75),
    )
]

# MQTT topics this agent listens on ("+" is the single-level wildcard for the person id).
WEATHER_STATE_TOPIC = "city/weather/state"
PEOPLE_POSITION_WILDCARD = "city/people/+/position"

print(
    f"Control agent broker: {config.mqtt.host}:{config.mqtt.port}",
    f"Weather subscription: {WEATHER_STATE_TOPIC}",
    f"Position subscription: {PEOPLE_POSITION_WILDCARD}",
    sep="\n",
)

In [None]:
# Cell purpose: define a control agent class that subscribes and publishes routing commands.
@dataclass(slots=True)
class PersonObservation:
    """Most recently received position of a single person."""

    # Identifier of the person; also used as the key when observations are stored.
    person_id: str
    # Horizontal coordinate taken from the position message.
    x: float
    # Vertical coordinate taken from the position message.
    y: float


class ControlAgent:
    """Publish per-person movement commands based on weather and positions.

    The agent subscribes to the weather-state topic and to every person's
    position topic, caches the latest value of each, and on every tick of
    ``run`` publishes a command for each observed person whose movement mode
    has to change (shelter in the nearest shop, or resume random walking).
    """

    def __init__(self, *, shops: list[Shop]):
        """Initialise agent state and wire up the MQTT connector and publisher.

        Args:
            shops: Candidate shops passed to ``select_nearest_shop`` when a
                person has to take shelter.
        """
        self.shops = shops
        # Until a valid weather message arrives the agent assumes "sunny".
        self.latest_weather = "sunny"
        # Latest observation per person id, filled in by ``_on_message``.
        self.person_observations: dict[str, PersonObservation] = {}
        # Last command published per person id: {"mode": ..., "shop_id": ...}.
        self.last_commands: dict[str, dict[str, object]] = {}
        self.tick = 0

        self.connector = MqttConnector(config.mqtt, client_id_suffix="control")
        self.publisher = MqttPublisher(self.connector)
        self.connector.client.on_message = self._on_message

    def connect(self) -> None:
        """Connect to the broker and subscribe to weather and position topics.

        Raises:
            TimeoutError: If the connection is not established within 10 seconds.
        """
        self.connector.connect()
        if not self.connector.wait_for_connection(timeout=10.0):
            raise TimeoutError("Control agent could not connect to MQTT within timeout")
        # Weather is subscribed at QoS 1, positions at QoS 0.
        self.connector.client.subscribe(WEATHER_STATE_TOPIC, qos=1)
        self.connector.client.subscribe(PEOPLE_POSITION_WILDCARD, qos=0)

    def disconnect(self) -> None:
        """Disconnect the underlying MQTT connector."""
        self.connector.disconnect()

    def _on_message(self, client, userdata, msg) -> None:
        """Cache the weather state or a person's position from an MQTT message.

        Messages that cannot be used are ignored rather than raised from the
        callback: payloads that are not valid UTF-8 JSON, JSON values that are
        not objects, weather states other than "sunny"/"rain", and positions
        whose coordinates cannot be converted to ``float``.

        Args:
            client: MQTT client that received the message (unused).
            userdata: User data attached to the client (unused).
            msg: Received message; its ``topic`` and ``payload`` are read.
        """
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            return
        # Valid JSON may still be a list, string, number or null; only objects
        # support the key lookups below.
        if not isinstance(payload, dict):
            return

        topic = msg.topic
        if topic == WEATHER_STATE_TOPIC:
            weather_state = str(payload.get("weather_state", "")).strip().lower()
            if weather_state in {"sunny", "rain"}:
                self.latest_weather = weather_state
            return

        if topic.startswith("city/people/") and topic.endswith("/position"):
            # Prefer the id in the payload; fall back to the topic's third segment.
            person_id = str(payload.get("person_id") or topic.split("/")[2])
            try:
                # Missing coordinates default to 0.0.
                x = float(payload.get("x", 0.0))
                y = float(payload.get("y", 0.0))
            except (TypeError, ValueError):
                # Non-numeric coordinates: keep the previous observation, if any.
                return
            self.person_observations[person_id] = PersonObservation(person_id=person_id, x=x, y=y)

    def _publish_command_if_needed(self, observation: PersonObservation) -> None:
        """Publish a movement command for one person if their mode must change.

        The decision is delegated to ``should_enter_shelter_mode`` and
        ``should_enter_random_walk_mode``, which receive the latest weather and
        the mode of the last command sent to this person.

        Args:
            observation: Latest known position of the person.
        """
        person_id = observation.person_id
        command_topic = f"city/people/{person_id}/command"
        previous = self.last_commands.get(person_id)

        # With no earlier command the person is treated as currently random-walking.
        mode_for_shelter_check = str(previous.get("mode", "random_walk")) if previous else "random_walk"
        if should_enter_shelter_mode(self.latest_weather, mode_for_shelter_check):
            nearest_shop = select_nearest_shop(person_x=observation.x, person_y=observation.y, shops=self.shops)
            # Skip the publish when the same shop was already commanded.
            already_sent = (
                previous is not None
                and previous.get("mode") == "move_to_shop"
                and previous.get("shop_id") == nearest_shop.shop_id
            )
            if not already_sent:
                payload = build_move_command(
                    person_id=person_id,
                    mode="move_to_shop",
                    tick=self.tick,
                    target_shop=nearest_shop,
                )
                self.publisher.publish_json(command_topic, json.dumps(payload), qos=1, retain=False)
                self.last_commands[person_id] = {"mode": "move_to_shop", "shop_id": nearest_shop.shop_id}
            return

        # With no earlier command the person is treated as currently heading to a shop.
        mode_for_walk_check = str(previous.get("mode", "move_to_shop")) if previous else "move_to_shop"
        if should_enter_random_walk_mode(self.latest_weather, mode_for_walk_check):
            payload = build_move_command(
                person_id=person_id,
                mode="random_walk",
                tick=self.tick,
            )
            self.publisher.publish_json(command_topic, json.dumps(payload), qos=1, retain=False)
            self.last_commands[person_id] = {"mode": "random_walk", "shop_id": None}

    def run(self, *, total_ticks: int, tick_sleep_s: float) -> None:
        """Run the control loop for a fixed number of ticks.

        Each tick increments ``self.tick``, evaluates every observed person,
        prints a status line when at least one person has been observed, and
        then sleeps.

        Args:
            total_ticks: Number of loop iterations to perform.
            tick_sleep_s: Seconds to sleep at the end of each tick.
        """
        for _ in range(total_ticks):
            self.tick += 1
            # Iterate over a snapshot: ``_on_message`` may add observations
            # while this loop is running.
            for observation in list(self.person_observations.values()):
                self._publish_command_if_needed(observation)
            if self.person_observations:
                print(
                    f"tick={self.tick} weather={self.latest_weather} observed_people={len(self.person_observations)} commands={len(self.last_commands)}"
                )
            time.sleep(tick_sleep_s)

In [None]:
# Cell purpose: start control loop and publish movement commands based on weather + observed positions.
# Loop timing comes from the movement section of the simulation config.
CONTROL_TICK_SECONDS = simulation_cfg.movement.tick_s  # sleep between ticks, passed to run() as tick_sleep_s
CONTROL_TOTAL_TICKS = simulation_cfg.movement.total_ticks  # number of ticks before the loop ends

agent = ControlAgent(shops=shops)
# NOTE(review): connect() runs outside the try block, so if it raises the
# finally clause below does not run and disconnect() is never called — confirm
# whether the connector needs cleanup after a failed connect.
agent.connect()
print("Control agent connected. Waiting for people positions and weather updates...")

try:
    agent.run(total_ticks=CONTROL_TOTAL_TICKS, tick_sleep_s=CONTROL_TICK_SECONDS)
finally:
    # Always release the MQTT connection, including when the loop is interrupted or fails.
    agent.disconnect()
    print("Control agent disconnected from MQTT.")