In [2]:
import asyncio, uuid, time
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field

def now_ms():
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    epoch_seconds = time.time()
    # int() truncates the fractional millisecond part.
    return int(epoch_seconds * 1000)

def _fresh_uuid():
    """Return a new random UUID (version 4) rendered as a string."""
    return str(uuid.uuid4())


class Msg(BaseModel):
    """Envelope for a single message passed between agents.

    ``type``, ``sender`` and ``recipient`` are required; every other field
    has a default that is computed freshly for each instance.
    """

    # Unique id of this message; a new UUID string unless supplied.
    msg_id: str = Field(default_factory=_fresh_uuid)
    # Creation timestamp in milliseconds, taken from now_ms() unless supplied.
    ts_ms: int = Field(default_factory=now_ms)
    # Message kind, e.g. "WEATHER_REQUEST".
    type: str
    # Agent id of the producer.
    sender: str
    # Agent id whose queue should receive the message.
    recipient: str
    # Free-form message body; an empty dict unless supplied.
    payload: Dict[str, Any] = Field(default_factory=dict)
    # Correlation id; a new UUID string unless the caller passes one in.
    corr: str = Field(default_factory=_fresh_uuid)

class InMemoryBus:
    """In-process message bus that keeps one ``asyncio.Queue`` per agent id."""

    def __init__(self):
        # Mailboxes keyed by agent id; entries are created lazily by queue_for().
        self.queues: Dict[str, asyncio.Queue] = {}

    def queue_for(self, agent_id: str) -> asyncio.Queue:
        """Return the queue registered for ``agent_id``, creating it on first use."""
        try:
            return self.queues[agent_id]
        except KeyError:
            mailbox = asyncio.Queue()
            self.queues[agent_id] = mailbox
            return mailbox

    async def send(self, msg: Msg):
        """Put ``msg`` on its recipient's queue, then print a one-line trace."""
        mailbox = self.queue_for(msg.recipient)
        await mailbox.put(msg)
        print(f"[BUS] -> {msg.recipient:12s} | {msg.type:18s} | from={msg.sender:12s} | corr={msg.corr[:8]}")

class BaseAgent:
    """Shared plumbing for agents: an id, a bus handle and a receive loop."""

    def __init__(self, agent_id: str, bus: InMemoryBus):
        self.bus = bus
        self.agent_id = agent_id

    async def handle(self, msg: Msg) -> Optional[Msg]:
        """Process one message and optionally return a reply.

        The base implementation ignores the message and returns ``None``;
        subclasses override this.
        """
        return None

    async def run(self):
        """Loop forever: take a message from this agent's queue, handle it, send any reply."""
        inbox = self.bus.queue_for(self.agent_id)
        print(f"[{self.agent_id}] RUN LOOP STARTED. Waiting...")
        while True:
            # Suspends here until something is put on this agent's queue.
            incoming = await inbox.get()
            print(f"[{self.agent_id}] got {incoming.type:18s} from={incoming.sender:12s} corr={incoming.corr[:8]}")
            outgoing = await self.handle(incoming)
            if not outgoing:
                # Nothing to send back for this message.
                continue
            await self.bus.send(outgoing)


In [3]:
class WeatherAgent(BaseAgent):
    """Replies to WEATHER_REQUEST messages with a fixed toy forecast."""

    async def handle(self, msg: Msg) -> Optional[Msg]:
        """Return a WEATHER_RESPONSE for a WEATHER_REQUEST; ignore anything else."""
        if msg.type != "WEATHER_REQUEST":
            return None

        # Canned values: only the city comes from the request payload.
        toy_forecast = dict(
            city=msg.payload["city"],
            day="Saturday",
            temp_c=17,
            rain_chance=10,
            note="pleasant",
        )
        return Msg(
            type="WEATHER_RESPONSE",
            sender=self.agent_id,
            recipient=msg.sender,
            corr=msg.corr,
            payload={"forecast": toy_forecast},
        )

class PlacesAgent(BaseAgent):
    """Replies to PLACES_REQUEST messages with one of two hard-coded itineraries."""

    # Rows are (name, cost_est, time); they are expanded into fresh dicts per request.
    _CHEAPER_ROWS = (
        ("India Gate + Rajpath walk", 0, "7:00–9:00"),
        ("National Museum", 20, "10:00–12:00"),
        ("Chandni Chowk street food", 250, "13:00–15:00"),
        ("Lodhi Garden", 0, "16:00–17:30"),
    )
    _DEFAULT_ROWS = (
        ("Qutub Minar", 40, "9:00–11:00"),
        ("Khan Market cafe lunch", 600, "12:00–13:30"),
        ("Humayun's Tomb", 40, "14:30–16:00"),
        ("Dilli Haat", 100, "17:00–19:00"),
    )

    @staticmethod
    def _as_places(rows):
        """Build a new list of place dicts from (name, cost_est, time) rows."""
        return [{"name": name, "cost_est": cost, "time": slot} for name, cost, slot in rows]

    async def handle(self, msg: Msg) -> Optional[Msg]:
        """Return a PLACES_RESPONSE for a PLACES_REQUEST; ignore anything else.

        The "cheaper" preference selects the low-cost list; any other
        preference value (default "balanced") selects the standard list.
        """
        if msg.type != "PLACES_REQUEST":
            return None

        request = msg.payload
        city = request["city"]
        preference = request.get("preference", "balanced")
        weather_note = request.get("weather_note", "")

        rows = self._CHEAPER_ROWS if preference == "cheaper" else self._DEFAULT_ROWS
        places = self._as_places(rows)

        # city, weather_note and preference are echoed back unchanged.
        return Msg(
            type="PLACES_RESPONSE",
            sender=self.agent_id,
            recipient=msg.sender,
            corr=msg.corr,
            payload={"city": city, "weather_note": weather_note, "places": places, "preference": preference},
        )

class BudgetAgent(BaseAgent):
    """Replies to BUDGET_CHECK messages with the summed cost and a pass/fail flag."""

    async def handle(self, msg: Msg) -> Optional[Msg]:
        """Return a BUDGET_RESULT for a BUDGET_CHECK; ignore anything else."""
        if msg.type != "BUDGET_CHECK":
            return None

        budget = msg.payload["budget_inr"]
        places = msg.payload["places"]

        # Only the per-place "cost_est" values are counted towards the total.
        activity_total = 0
        for place in places:
            activity_total += place["cost_est"]

        within_budget = activity_total <= budget
        result = {"budget_inr": budget, "activity_total": activity_total, "ok": within_budget}
        return Msg(
            type="BUDGET_RESULT",
            sender=self.agent_id,
            recipient=msg.sender,
            corr=msg.corr,
            payload=result,
        )

class WriterAgent(BaseAgent):
    """Replies to WRITE_ITINERARY messages with a plain-text itinerary."""

    async def handle(self, msg: Msg) -> Optional[Msg]:
        """Return a FINAL_RESPONSE whose payload "text" is the formatted itinerary.

        Messages of any other type are ignored (returns ``None``).
        """
        if msg.type != "WRITE_ITINERARY":
            return None

        request = msg.payload
        city = request["city"]
        forecast = request["forecast"]
        places = request["places"]
        budget = request["budget_inr"]
        activity_total = request["activity_total"]

        # Header: title, weather summary, budget summary, then a blank separator line.
        header = [
            f"1-Day Itinerary: {city}",
            f"Weather: {forecast['day']} • {forecast['temp_c']}°C • rain chance {forecast['rain_chance']}% ({forecast['note']})",
            f"Budget: ₹{budget} (activity est: ₹{activity_total})",
            "",
        ]
        # One numbered line per place, counting from 1.
        stops = [
            f"{position}. {place['time']} — {place['name']} (₹{place['cost_est']})"
            for position, place in enumerate(places, 1)
        ]
        text = "\n".join(header + stops)

        return Msg(
            type="FINAL_RESPONSE",
            sender=self.agent_id,
            recipient=msg.sender,
            corr=msg.corr,
            payload={"text": text},
        )


In [4]:
class Orchestrator(BaseAgent):
    """Drives one trip-planning conversation per correlation id.

    Message flow implemented by ``handle``::

        USER_REQUEST     -> WEATHER_REQUEST  (to "weather")
        WEATHER_RESPONSE -> PLACES_REQUEST   (to "places", preference "balanced")
        PLACES_RESPONSE  -> BUDGET_CHECK     (to "budget")
        BUDGET_RESULT    -> PLACES_REQUEST   (preference "cheaper") if over budget
                            and attempts remain, otherwise
                            WRITE_ITINERARY  (to "writer")
        FINAL_RESPONSE   -> CLIENT_FINAL     (to "client")

    If the retry is still over budget, the itinerary is written anyway.
    """

    # Total PLACES_REQUEST rounds allowed per conversation (first try + retry).
    MAX_PLACES_ATTEMPTS = 2

    # Message types that need the state of an already started conversation.
    _STATEFUL_TYPES = ("WEATHER_RESPONSE", "PLACES_RESPONSE", "BUDGET_RESULT")

    def __init__(self, agent_id: str, bus: InMemoryBus):
        super().__init__(agent_id, bus)
        # Scratch data for conversations in flight, keyed by correlation id.
        # Entries are created on USER_REQUEST and removed on FINAL_RESPONSE.
        self.state: Dict[str, Dict[str, Any]] = {}

    def _places_request(self, corr: str, preference: str) -> Msg:
        """Build a PLACES_REQUEST for the conversation ``corr`` with the given preference."""
        ctx = self.state[corr]
        return Msg(type="PLACES_REQUEST", sender=self.agent_id, recipient="places", corr=corr,
                   payload={
                       "city": ctx["city"],
                       "weather_note": ctx["forecast"]["note"],
                       "preference": preference,
                   })

    async def handle(self, msg: Msg) -> Optional[Msg]:
        """Advance the conversation identified by ``msg.corr`` by one step.

        Returns the next message to send, or ``None`` when the message type is
        not part of the flow or when a response arrives for a correlation id
        that has no conversation in progress.
        """
        corr = msg.corr

        if msg.type == "USER_REQUEST":
            city = msg.payload["city"]
            budget = msg.payload["budget_inr"]
            # State is only allocated once the request payload has been read.
            self.state.setdefault(corr, {}).update({"city": city, "budget": budget, "attempt": 0})

            # Step 1: ask weather
            return Msg(type="WEATHER_REQUEST", sender=self.agent_id, recipient="weather", corr=corr,
                       payload={"city": city})

        if msg.type == "FINAL_RESPONSE":
            # The conversation is over: release its state so self.state does not
            # grow by one entry per request.
            self.state.pop(corr, None)
            # Orchestrator forwards to client (so client always receives from orch)
            return Msg(type="CLIENT_FINAL", sender=self.agent_id, recipient="client", corr=corr,
                       payload={"text": msg.payload["text"]})

        if msg.type not in self._STATEFUL_TYPES:
            # Not part of the flow; ignore without touching self.state.
            return None

        ctx = self.state.get(corr)
        if ctx is None:
            # No USER_REQUEST was seen for this id (or it already finished).
            # Drop the message instead of raising KeyError inside the run loop.
            print(f"[{self.agent_id}] dropping {msg.type}: no active conversation for corr={corr[:8]}")
            return None

        if msg.type == "WEATHER_RESPONSE":
            ctx["forecast"] = msg.payload["forecast"]

            # Step 2: ask places (first attempt: balanced)
            ctx["attempt"] += 1
            return self._places_request(corr, "balanced")

        if msg.type == "PLACES_RESPONSE":
            ctx["places_list"] = msg.payload["places"]

            # Step 3: check budget
            return Msg(type="BUDGET_CHECK", sender=self.agent_id, recipient="budget", corr=corr,
                       payload={"budget_inr": ctx["budget"], "places": ctx["places_list"]})

        # Only BUDGET_RESULT can reach this point.
        ok = msg.payload["ok"]
        ctx["activity_total"] = msg.payload["activity_total"]

        if not ok and ctx["attempt"] < self.MAX_PLACES_ATTEMPTS:
            # Retry: ask for cheaper places
            ctx["attempt"] += 1
            return self._places_request(corr, "cheaper")

        # Step 4: ask writer to format final response
        return Msg(type="WRITE_ITINERARY", sender=self.agent_id, recipient="writer", corr=corr,
                   payload={
                       "city": ctx["city"],
                       "forecast": ctx["forecast"],
                       "places": ctx["places_list"],
                       "budget_inr": ctx["budget"],
                       "activity_total": ctx["activity_total"],
                   })


In [5]:
async def demo_trip_chain():
    """Run one end-to-end trip-planning request through all agents and print the result.

    Starts every agent's ``run()`` loop as a task, sends a USER_REQUEST to the
    orchestrator on behalf of "client", waits for the message that arrives on
    the "client" queue and prints its ``payload["text"]``.

    Raises:
        Exception: whatever an agent loop raised, if an agent task ends before
            the client receives a reply (instead of waiting forever).
        RuntimeError: if an agent loop returns without an exception before the
            client receives a reply.
    """
    bus = InMemoryBus()

    agents = [
        Orchestrator("orch", bus),
        WeatherAgent("weather", bus),
        PlacesAgent("places", bus),
        BudgetAgent("budget", bus),
        WriterAgent("writer", bus),
    ]

    running = [asyncio.create_task(a.run()) for a in agents]
    # The client's wait is a task too, so it can be raced against the agent loops.
    client_reply = asyncio.create_task(bus.queue_for("client").get())

    try:
        corr = str(uuid.uuid4())
        await bus.send(Msg(
            type="USER_REQUEST",
            sender="client",
            recipient="orch",
            corr=corr,
            payload={"city": "Delhi", "budget_inr": 2000}
        ))

        # Client waits for final response. Agent loops never finish on their
        # own, so an agent task completing here means its loop died; surface
        # that instead of blocking forever on the client queue.
        done, _ = await asyncio.wait({client_reply, *running}, return_when=asyncio.FIRST_COMPLETED)
        if client_reply not in done:
            for task in done:
                task.result()  # re-raises the exception that ended the agent loop
            raise RuntimeError("an agent loop exited before the client received a reply")

        final = client_reply.result()
        print("\n[CLIENT] got response:\n")
        print(final.payload["text"])
    finally:
        # cancel() only requests cancellation; awaiting the tasks lets them
        # actually finish before this coroutine returns, on success or failure.
        all_tasks = [client_reply, *running]
        for t in all_tasks:
            t.cancel()
        await asyncio.gather(*all_tasks, return_exceptions=True)



In [6]:
# Run the demo end to end; this cell relies on the notebook accepting `await` at top level.
await demo_trip_chain()

[BUS] -> orch         | USER_REQUEST       | from=client       | corr=cabac885
[orch] RUN LOOP STARTED. Waiting...
[orch] got USER_REQUEST       from=client       corr=cabac885
[BUS] -> weather      | WEATHER_REQUEST    | from=orch         | corr=cabac885
[weather] RUN LOOP STARTED. Waiting...
[weather] got WEATHER_REQUEST    from=orch         corr=cabac885
[BUS] -> orch         | WEATHER_RESPONSE   | from=weather      | corr=cabac885
[places] RUN LOOP STARTED. Waiting...
[budget] RUN LOOP STARTED. Waiting...
[writer] RUN LOOP STARTED. Waiting...
[orch] got WEATHER_RESPONSE   from=weather      corr=cabac885
[BUS] -> places       | PLACES_REQUEST     | from=orch         | corr=cabac885
[places] got PLACES_REQUEST     from=orch         corr=cabac885
[BUS] -> orch         | PLACES_RESPONSE    | from=places       | corr=cabac885
[orch] got PLACES_RESPONSE    from=places       corr=cabac885
[BUS] -> budget       | BUDGET_CHECK       | from=orch         | corr=cabac885
[budget] got BUDGET_CH