---

### PART 1: BUILDING THE TRANSPORT QUERY AGENT
----


#### Singapore Transport Query Agent

This is an AI assistant that answers your questions about Singapore's public transport system. Think of it as your personal transport guide that can tell you when the next bus is coming, whether the MRT is busy, or what's happening with traffic right now. 

> Go to [readme.md](readme.md) for more information.




### Installation

First, we'll install the libraries the agent needs to run.


In [1]:
%pip install langgraph langchain-openai python-dotenv pytz pydantic requests --quiet

print("Installation complete")


Note: you may need to restart the kernel to use updated packages.
Installation complete


### Imports

Now we'll load all the Python libraries we need. These handle things like making API calls, working with JSON data and structured responses using pydantic, managing time zones, and running our workflow using langgraph.


In [2]:

import json, os, requests, re, time
from datetime import datetime
from typing import TypedDict, List, Dict, Any, Optional
from dotenv import load_dotenv
import pytz
from pydantic import BaseModel
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

print("Imports complete")

Imports complete


#### Setting Up API Keys

The agent needs two API keys to work: one from Singapore's LTA DataMall (for transport data) and one from OpenAI (for the LLM). I load these from a `.env` file so they never appear in the notebook's code.


In [3]:
load_dotenv()
LTA_API_KEY = os.getenv('data_mall_sec_key')
OPENAI_API_KEY = os.getenv('openai_api_key')
if not OPENAI_API_KEY: raise ValueError("openai_api_key not set in the environment or .env file")
if not LTA_API_KEY: raise ValueError("data_mall_sec_key not set in the environment or .env file")

BASE_URL = "https://datamall2.mytransport.sg/ltaodataservice"
HEADERS = {"AccountKey": LTA_API_KEY, "accept": "application/json"}
BUS_STOPS_CACHE = None

print("Environment keys loaded")


Environment keys loaded


### Helper Functions

These are utility functions that the agent uses throughout its workflow. They handle repetitive tasks so the main logic stays clean.

**What each helper does:**
- **get_singapore_time_context()**: Figure out the current time in Singapore, check if it's peak hour, and note whether it's a weekday or weekend
- **calculate_minutes_until()**: Convert an API timestamp (like "2024-02-14T14:30:00+08:00") into "how many minutes until arrival?" for buses
- **format_api_time()**: Clean up messy timestamps from APIs and turn them into readable times (like "2:30 PM")
- **extract_entities()**: Find key information in the user's question (bus stop codes, service numbers, train line names) without needing to ask
- **fetch_all_paginated_data()**: Handle the LTA APIs' pagination—they return 500 records at a time, so this loops through all pages to get the complete list

These helpers make the rest of the code simpler and more readable.
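
The pagination loop can be tried without touching the network. The sketch below is not the notebook's helper: `fetch_page` is a hypothetical stand-in for a single HTTP request, the data is made up, and the 500-record page size is taken from the description above.

```python
from typing import Callable, Dict, List

PAGE_SIZE = 500  # page size described above; treated as an assumption here


def fetch_all(fetch_page: Callable[[int], List[Dict]], max_records: int = 5000) -> List[Dict]:
    """Collect pages until a short or empty page signals the end."""
    records: List[Dict] = []
    skip = 0
    while skip < max_records:
        page = fetch_page(skip)      # hypothetical: returns the records starting at offset `skip`
        records.extend(page)
        if len(page) < PAGE_SIZE:    # short (or empty) page: nothing left to fetch
            break
        skip += PAGE_SIZE
    return records


# Fake data source with 1,234 records, standing in for a real endpoint.
DATA = [{"id": i} for i in range(1234)]
result = fetch_all(lambda skip: DATA[skip:skip + PAGE_SIZE])
print(len(result))  # 1234
```

Passing the page fetcher in as a function keeps the loop testable offline; the real helper calls `requests.get` directly instead.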


In [4]:
# Helpers
def get_singapore_time_context():
    sg_tz = pytz.timezone('Asia/Singapore'); now = datetime.now(sg_tz); h, w = now.hour, now.weekday()
    is_peak = (7 <= h < 10 or 17 <= h < 20) and w < 5; is_weekend = w >= 5 or (w == 4 and h >= 17)
    return {"timestamp": now.strftime("%Y-%m-%d %H:%M:%S"), "time_display": now.strftime("%I:%M %p"),
            "day_of_week": now.strftime("%A"), "hour": h, "is_peak_hour": is_peak, "is_weekend": is_weekend,
            "day_type": "weekend" if is_weekend else "weekday"}

def calculate_minutes_until(eta_str):
    try:
        sg_tz = pytz.timezone('Asia/Singapore')
        eta = datetime.fromisoformat(eta_str.replace('+08:00', ''))
        delta_secs = (sg_tz.localize(eta) - datetime.now(sg_tz)).total_seconds()
        if delta_secs <= 0:
            return 0
        return int((delta_secs + 59) // 60)
    except (ValueError, TypeError, AttributeError):
        return 0

def format_api_time(value):
    if not value:
        return "Unknown"
    try:
        text = str(value).strip()
        if 'T' in text:  # ISO datetime from APIs
            dt = datetime.fromisoformat(text.split('+')[0])
            return dt.strftime("%I:%M %p")
        return text
    except (ValueError, TypeError):
        return str(value)

def extract_entities(query):
    q_lower = query.lower()
    bus_code_match = re.search(r'\b\d{5}\b', q_lower)
    service_match = re.search(r'\bbus\s+(\d+\w?)\b|\bservice\s+(\d+\w?)\b|\b(\d+\w?)\s+bus\b', q_lower)
    # Leading \b stops short codes such as "te" from matching the tail of words like "route"
    line_match = re.search(r'\b(ewl|nsl|nel|ccl|dtl|cgl|te|de|cr|lr)\b', q_lower)
    road_match = re.findall(r'\b(pie|cte|aye|ecp|tpe|sle|kr|kkh|bbr|bkc|mcr|ner|kpe|pte|map|pbe)\b', q_lower)
    station_keywords = ['station', 'mrt', 'lrt', 'interchange']
    has_station = any(kw in q_lower for kw in station_keywords)
    time_match = re.search(r'(\d{1,2}):?(\d{2})?\s?(am|pm)', q_lower)
    return {
        "bus_stop_code": bus_code_match.group(0) if bus_code_match else None,
        "service_number": (service_match.group(1) or service_match.group(2) or service_match.group(3)).upper() if service_match else None,
        "train_line": line_match.group(0).upper() if line_match else None,
        "roads": [r.upper() for r in road_match] if road_match else [],
        "has_station_keyword": has_station,
        "mentioned_time": time_match.group(0) if time_match else None,
    }

def fetch_all_paginated_data(endpoint, max_records=5000):
    all_data, skip = [], 0
    while skip < max_records:
        try:
            r = requests.get(f"{BASE_URL}/{endpoint}", headers=HEADERS, params={"$skip": skip, "$top": 500}, timeout=10)
            if r.status_code != 200:
                print(f"⚠️ Pagination error ({endpoint}, skip={skip}): {r.status_code}")
                break
            vals = r.json().get('value', [])
            if not vals: break
            all_data.extend(vals); skip += 500
            if len(vals) < 500: break
        except Exception as e:
            print(f"⚠️ Pagination error ({endpoint}, skip={skip}): {e}")
            break
    return all_data

print("Helpers ready")


Helpers ready


### Caching & Station Mapping

I added two smart features to make the agent faster and more reliable:

**1. Bus Stop Cache**
Bus stop information (each stop's code, road name, and description) rarely changes. So instead of asking the API on every query, I fetch the full list of bus stops once (the code allows up to 6,000 records) and keep it in memory.
- First-time setup: Load all bus stops from the API and store them in a list
- Every query after that: Look stops up in memory (no API calls, no network wait)
- Result: Faster responses and far fewer API calls to the LTA
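
As a rough illustration of why in-memory lookups are cheap, the sketch below indexes a list of stops by code so that each lookup is a single dictionary access. It is an alternative to scanning the list, not the notebook's own code; the sample records and the `lookup_stop` helper are made up, and only the field names follow those used elsewhere in this notebook.

```python
from typing import Dict, List, Optional

# Made-up sample records; field names follow those used elsewhere in this notebook.
SAMPLE_STOPS: List[Dict[str, str]] = [
    {"BusStopCode": "00001", "RoadName": "Example Rd", "Description": "Opp Example Stn"},
    {"BusStopCode": "00002", "RoadName": "Sample Ave", "Description": "Sample Int"},
]

# Build the index once; every later lookup is a dictionary access.
STOPS_BY_CODE: Dict[str, Dict[str, str]] = {s["BusStopCode"]: s for s in SAMPLE_STOPS}


def lookup_stop(code: str) -> Optional[Dict[str, str]]:
    """Return the cached stop record for a 5-digit code, or None if unknown."""
    return STOPS_BY_CODE.get(code)


print(lookup_stop("00002")["Description"])  # Sample Int
print(lookup_stop("99999"))                 # None
```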



In [5]:
# Fetch all bus stops once at startup and keep them in memory for later lookups
ALL_BUS_STOPS = fetch_all_paginated_data("BusStops", 6000)

# Helper function to search for stops by name
def search_bus_stops_by_name(query, all_stops):
    """Find bus stops by user's description (e.g., 'Orchard Road', 'Tampines Mall')"""
    q_lower = query.lower()
    matches_scored = []
    
    # Normalize common abbreviations in the query
    abbrevs = {'station':'stn','street':'st','road':'rd','interchange':'int','opposite':'opp'}
    q_norm = q_lower
    for full, abbr in abbrevs.items():
        q_norm = q_norm.replace(full, abbr)
    
    # Extract meaningful keywords
    kws = [w for w in q_norm.split() if len(w) > 2 and w not in ['from','to','at','the','bus','mrt']]
    
    # Score each stop for how well it matches
    for stop in all_stops:
        desc = stop.get('Description', '').lower()
        road = stop.get('RoadName', '').lower()
        combined = f"{road} {desc}"
        
        if q_norm in desc:
            score = 100  # Exact match in description
        elif q_norm in combined:
            score = 90   # Match in road + description
        elif kws and all(k in combined for k in kws):
            score = 75   # All keywords found
        else:
            score = 0
        
        if score > 0:
            matches_scored.append((stop, score))
    
    # Return top 5 matches, sorted by score
    matches_scored.sort(key=lambda x: x[1], reverse=True)
    matches = [s for s, _ in matches_scored[:5]]
    return matches


**2. Station to Line Mapping**
Users say things like "Bugis Station" or "Dhoby Ghaut", not "DTL" or "NSL". So I created a simple dictionary that maps human-readable station names to their train lines.
- Lookup a station name (e.g., "Bugis") in the map
- Instantly get the train lines that serve it (e.g., ["EWL", "DTL"])
- Result: The agent knows exactly which train lines to query, no follow-up questions needed
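
A minimal sketch of the lookup, assuming the station name may appear anywhere in the user's text and may span several words. The two-entry map and the `lines_for` helper are illustrative only; the notebook's full map is defined in the next cell.

```python
from typing import Dict, List

# Tiny illustrative subset of a station-to-line map.
STATIONS: Dict[str, List[str]] = {
    "bugis": ["EWL", "DTL"],
    "dhoby ghaut": ["NSL", "NEL", "CCL"],
}


def lines_for(text: str) -> List[str]:
    """Return the lines for the longest station name found in `text`."""
    lowered = text.lower()
    hits = [name for name in STATIONS if name in lowered]
    if not hits:
        return []
    return STATIONS[max(hits, key=len)]


print(lines_for("How crowded is Dhoby Ghaut station?"))  # ['NSL', 'NEL', 'CCL']
print(lines_for("Bugis"))                                 # ['EWL', 'DTL']
print(lines_for("somewhere else"))                        # []
```

Matching on the whole name rather than the first word matters for multi-word stations such as "dhoby ghaut".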


In [6]:
# Station→Line mapping (selected MRT stations)
STATION_LINE_MAP = {
    'changi airport': ['CGL'], 'bugis': ['EWL','DTL'], 'dhoby ghaut': ['NSL','NEL','CCL'], 'raffles place': ['EWL','NSL'],
    'city hall': ['EWL','NSL'], 'outram park': ['EWL','NEL'], 'tiong bahru': ['EWL'], 'clementi': ['EWL'],
    'jurong east': ['EWL','NSL'], 'yew tee': ['NSL'], 'kranji': ['NSL'], 'woodlands': ['NSL'], 'yishun': ['NSL'],
    'serangoon': ['NEL','CCL'], 'tanah merah': ['EWL','CGL'], 'pasir ris': ['EWL'], 'tampines': ['EWL','DTL'],
    'bedok': ['EWL'], 'kembangan': ['EWL'], 'eunos': ['EWL'], 'paya lebar': ['EWL','CCL'], 'macpherson': ['CCL','DTL'],
    'potong pasir': ['NEL'], 'boon keng': ['NEL'], 'kallang': ['EWL'], 'aljunied': ['EWL'],
    'tai seng': ['CCL'], 'bartley': ['CCL'], 'caldecott': ['CCL'], 'botanic gardens': ['CCL','DTL'],
    'bishan': ['NSL','CCL'], 'buona vista': ['EWL','CCL'], 'farrer road': ['CCL'], 'holland village': ['CCL'],
    'orchard': ['NSL'], 'newton': ['NSL','DTL'], 'lavender': ['EWL'], 'expo': ['DTL'],
    'simei': ['EWL'], 'bayfront': ['CCL','DTL'], 'marina bay': ['NSL','CCL'], 'harbourfront': ['NEL','CCL'],
    'khatib': ['NSL'], 'canberra': ['NSL'],
    'bukit batok': ['NSL'], 'bukit gombak': ['NSL'], 'choa chu kang': ['NSL'], 'ang mo kio': ['NSL'],
    'admiralty': ['NSL'], 'sembawang': ['NSL'], 'tanjong pagar': ['EWL'],
}



#### Structured Pydantic Schema

I use two key data structures to organize everything:

1. IntentOutput (a Pydantic model)
- This tells the agent which APIs to call based on your question
- For example: "You asked about buses, so call the Bus Arrival API"

2. AgentState (a TypedDict)
- This is the agent's memory. It keeps track of everything:
  - Your question
  - What the agent understood from your question
  - Current time and whether it's peak hour
  - All the data it fetched from APIs (buses, trains, traffic, etc.)
  - Any errors that happened
  - The final answer to give you

This design keeps everything organized and makes it easy to debug if something goes wrong.


In [7]:
# Models
class IntentOutput(BaseModel):
    apis_needed: List[str]
    needs_stop_search: bool = False
    search_term: Optional[str] = None

class AgentState(TypedDict):
    query: str
    time_context: Dict[str, Any]
    intent: Dict[str, Any]
    bus_arrival_data: Optional[Dict]
    bus_stops_data: Optional[List]
    bus_routes_data: Optional[List]
    train_alerts_data: Optional[Dict]
    crowd_density_data: Optional[List]
    traffic_incidents_data: Optional[List]
    traffic_speed_data: Optional[List]
    llm_formatted_data: Optional[Dict]
    api_errors: List[str]
    final_response: str



#### LLM Setup: Choosing Stable & Consistent Responses

For this agent, I need the AI model to be predictable and reliable, not creative. Here's why:

**Model Choice: gpt-5-mini**
- Fast and cost-effective for a routing/decision task
- Powerful enough to understand transport questions in context
- Good at structured outputs (returning specific fields instead of rambling)

**Configuration Choices**
- **Low temperature (0.4)**: Reduces randomness in sampling, so the model leans toward its most likely answer. A high temperature would make routing less predictable ("sometimes use this API, sometimes that"). A low temperature makes behavior more consistent, although not strictly deterministic.
- **Structured output (IntentOutput schema)**: Instead of letting the LLM write free text like "You should probably call the bus API", I force it to return a structured object with specific fields like `apis_needed: ['bus_arrival', 'bus_stops']`. This is clearer for the code to work with.
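
To make the temperature point concrete, here is a small sketch of the standard softmax-with-temperature formula applied to made-up scores. It illustrates the general idea only; the scores are invented and nothing here reflects how any particular hosted model is implemented.

```python
import math
from typing import List


def softmax_with_temperature(logits: List[float], temperature: float) -> List[float]:
    """Convert scores to probabilities; lower temperature sharpens the distribution."""
    scaled = [x / temperature for x in logits]
    peak = max(scaled)                        # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]


toy_logits = [2.0, 1.0, 0.0]                  # made-up scores for three candidate tokens
low = softmax_with_temperature(toy_logits, 0.4)
high = softmax_with_temperature(toy_logits, 1.0)
print(round(low[0], 3), round(high[0], 3))
```

With these toy scores the top candidate receives roughly 92% of the probability at 0.4 and roughly 67% at 1.0, which is why the lower setting gives more consistent choices without removing randomness entirely.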



In [8]:
llm = ChatOpenAI(model="gpt-5-mini", api_key=OPENAI_API_KEY, temperature=0.4)
structured_llm = llm.with_structured_output(IntentOutput)


#### Cleaning Messy API Responses

Raw data from APIs is messy. It has far more information than we need, and different APIs return it in different structures. Before the LLM writes the final answer, we need to clean this up.

**The Problem with Raw API Data**
- **Bus Arrival API**: Returns 20+ fields per bus (like boarding type, vehicle type, etc.), but we only care about: which bus number, when it arrives, and how crowded it is
- **Train Service API**: Has nested status codes and arrays of messages that need parsing
- **Traffic APIs**: Speed bands and incident reports come from separate endpoints, so they have to be fetched separately and combined into one summary

**What the Formatting Layer Does**
Each formatter function (like `format_bus_arrival()`, `format_train_alerts()`, etc.) takes the raw API response and extracts ONLY the useful fields, then presents them in a clean, consistent summary format.

**Why This Matters**
- Keeps the code cleaner: The LLM doesn't have to parse messy data
- Makes answers better: With clean data, the LLM can focus on writing naturally instead of hunting for important fields
- Makes debugging easier: If something's wrong, it's clear whether it's an API issue or a formatting issue
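
The idea can be sketched on a single record. The example below is illustrative rather than the notebook's formatter: the record is made up, `summarize_service` is a hypothetical helper, and only the field names and load codes mirror those read by the formatter in the next cell.

```python
from typing import Any, Dict

# Made-up record shaped like one entry of a bus-arrival response.
# "Operator" and "Type" stand in for the extra fields that get discarded.
RAW_SERVICE: Dict[str, Any] = {
    "ServiceNo": "14",
    "Operator": "XYZ",
    "NextBus": {
        "EstimatedArrival": "2024-02-14T14:30:00+08:00",
        "Load": "SEA",
        "Feature": "WAB",
        "Type": "DD",
    },
}

LOAD_LABELS = {"SEA": "Seats Available", "SDA": "Standing Available", "LSD": "Limited Standing"}


def summarize_service(service: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only service number, arrival timestamp, crowding, and accessibility."""
    next_bus = service.get("NextBus") or {}
    return {
        "service_no": service.get("ServiceNo"),
        "arrival": next_bus.get("EstimatedArrival") or None,
        "crowd": LOAD_LABELS.get(next_bus.get("Load"), "Unknown"),
        "wheelchair": next_bus.get("Feature") == "WAB",
    }


print(summarize_service(RAW_SERVICE))
```

Using `.get()` throughout means a record with missing fields produces "Unknown" or `None` instead of raising a `KeyError`.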


In [9]:
# Data Formatters
def format_bus_arrival(raw, stops):
    if not raw or "Services" not in raw: return {"stop": None, "services": []}
    load = {"SEA":"Seats Available","SDA":"Standing Available","LSD":"Limited Standing"}
    services = []
    for s in raw.get("Services", [])[:5]:
        nxt = s.get("NextBus") or {}
        eta = nxt.get("EstimatedArrival")
        if not eta:  # no arrival estimate for this service, so skip it instead of reporting 0 minutes
            continue
        services.append({"service_no": s.get("ServiceNo"), "next_mins": calculate_minutes_until(eta),
                         "time": format_api_time(eta), "crowd": load.get(nxt.get("Load"), "Unknown"),
                         "wheelchair": nxt.get("Feature") == "WAB"})
    stop_code = raw.get("BusStopCode"); stop_name = "Unknown"
    if stops and (st := next((s for s in stops if s.get("BusStopCode")==stop_code), None)):
        stop_name = st.get('Description', 'Unknown')
    return {"stop": {"code": stop_code, "name": stop_name}, "services": services}

def format_bus_routes(raw):
    if not raw: return []
    unique = {}  # service_direction as key
    for r in raw:
        key = f"{r.get('ServiceNo')}_{r.get('Direction',1)}"
        if key not in unique: unique[key] = {"service_no": r["ServiceNo"], "direction": r["Direction"],
            "first_bus_weekday": format_api_time(r.get("WD_FirstBus")), "last_bus_weekday": format_api_time(r.get("WD_LastBus")),
            "first_bus_weekend": format_api_time(r.get("SUN_FirstBus")), "last_bus_weekend": format_api_time(r.get("SUN_LastBus"))}
    return list(unique.values())[:10]

def format_bus_arrival_with_fallback(bus_arrival, bus_routes, stops, service_number=None):
    """Try live arrival data first; fallback to scheduled times from bus_routes."""
    # Try live arrival data first
    if bus_arrival and bus_arrival.get("Services"):
        result = format_bus_arrival(bus_arrival, stops)
        if result.get("services"):  # Only return if we got actual service data
            result["source"] = "live"
            return result
    # Fallback to scheduled times from bus_routes
    if bus_routes:
        formatted_routes = format_bus_routes(bus_routes)
        if formatted_routes:
            # Filter by service number if specified
            if service_number:
                formatted_routes = [r for r in formatted_routes if r["service_no"] == service_number]
            if formatted_routes:
                return {
                    "stop": {"code": None, "name": "Unknown"},
                    "services": formatted_routes,
                    "source": "scheduled"
                }
    # Return empty if nothing available
    return {"stop": {"code": None, "name": "Unknown"}, "services": [], "source": "unavailable"}

def format_train_alerts(raw):
    if not raw: return {"status": "Unknown", "lines": [], "messages": []}
    val = raw.get("value", {}); status = val.get("Status", 1)
    return {"status": "Normal" if status == 1 else "Disrupted",
            "lines": [s.get("Line") for s in val.get("AffectedSegments", [])],
            "messages": [m.get("Content") for m in val.get("Message", [])[:3]]}

def format_traffic(incidents, speeds, road_filter=None):
    """Format traffic data, optionally filtered by specific roads."""
    incident_list = incidents or []
    # Filter by roads if specified
    if road_filter and isinstance(road_filter, list) and road_filter:
        road_filter_upper = [r.upper() for r in road_filter]
        incident_list = [i for i in incident_list if any(road in i.get("Message","").upper() for road in road_filter_upper)]
    major = [i.get("Message","")[:100] for i in incident_list[:5] if "accident" in i.get("Message","").lower() or "breakdown" in i.get("Message","").lower()]
    speed_slice = (speeds or [])[:50]
    avg = sum((int(s.get("MinimumSpeed",0))+int(s.get("MaximumSpeed",0)))//2 for s in speed_slice)//max(len(speed_slice),1)
    cond = "Smooth" if avg>=50 else "Moderate" if avg>=30 else "Slow" if avg>=15 else "Heavy"
    return {"total_incidents": len(incident_list), "major": major, "condition": cond, "avg_speed": avg}

print("Formatters ready")


Formatters ready


#### Prompt Templates: The LLM's Instructions

I use three carefully-written prompt templates that tell the LLM what to do at different stages:

**1. Routing Prompt** ("Which APIs do I need?")
- Input: The user's question + extracted entities (bus stops, service numbers) + current time context
- Output: A structured decision like `apis_needed: ['bus_arrival', 'bus_stops']`
- Contains rules like: "If they ask 'when is the next bus', you need bus_arrival + bus_stops"

**2. Response Prompt** ("How should I answer?")
- Input: Clean formatted data from all the APIs
- Output: A natural 2-3 sentence answer for the user
- Tells the LLM: "Use AM/PM times, mention if it's peak hour, be helpful"

**3. Proofread Prompt** ("Does this read well?")
- Input: The drafted answer
- Output: The same answer, but with better grammar/clarity
- Constraint: "Don't change any numbers or facts, only fix spelling and grammar"

**Example: API Choice Logic**
Different questions trigger different API combinations:
- "When is the next 14 bus?" → bus_arrival + bus_stops (know which stop, fetch arrival times)
- "What buses go to Orchard?" → bus_stops + bus_routes (search by name, show which services)
- "Is the MRT running?" → train_alerts + crowd_density (service status + how busy)
- "What's the traffic?" → traffic_speed + traffic_incidents (road conditions + specific problems)
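
Filling a template is plain string formatting. The sketch below uses a shortened, hypothetical template and made-up entity values to show how the query, time context, and extracted entities end up in one prompt string; the notebook's real templates are in the next cell.

```python
import json

# Shortened, hypothetical template for illustration only.
TEMPLATE = 'Query: "{query}"\nContext: {time_display}, Peak: {is_peak_hour}\nEntities: {entities_json}'

entities = {"bus_stop_code": "00001", "service_number": "14"}  # made-up values
prompt = TEMPLATE.format(
    query="When is the next 14 bus at 00001?",
    time_display="08:15 AM",
    is_peak_hour=True,
    entities_json=json.dumps(entities),
)
print(prompt)
```

Serializing the entities with `json.dumps` gives the model an unambiguous view of what was extracted, including which fields are empty.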


In [10]:
# Prompt templates 
ROUTING_PROMPT_TEMPLATE = """Analyze this Singapore transport query and decide which APIs to call.

Query: "{query}"
Context: {time_display} on {day_of_week}, Peak: {is_peak_hour}, {day_type}
Entities: {entities_json}

APIs: bus_arrival, bus_stops, bus_routes, train_alerts, crowd_density, traffic_speed, traffic_incidents

RULES:
1. Bus arrival queries -> bus_arrival + bus_stops
2. Stop NAME (not code) -> needs_stop_search=true, extract ONLY location name as search_term
3. "Which services" or "operating hours" bus queries -> include bus_routes
4. Train queries -> train_alerts + crowd_density
5. Traffic -> either traffic_speed OR traffic_incidents (both auto-fetched together)
6. Multi-modal -> combine all relevant"""

RESPONSE_PROMPT_TEMPLATE = """Singapore transport assistant. Answer concisely (2-3 sentences).

Query: "{query}"
Context: {time_display}, {day_type}, Peak: {is_peak_hour}
Data: {formatted_data}
Errors: {errors}

Rules: State times clearly and always use AM/PM format (for example, 2:45 PM). Never output 24-hour format codes like 0619. For bus arrivals: if source='live' mention 'next bus arriving'; if source='scheduled' say 'buses operate' or 'last bus at'; if source='unavailable' suggest alternatives. Do not use phrases like 'just left' or 'already left'. Add context (crowding/peak), be helpful, suggest alternatives if data missing. Use plain sentences only (no bullet points, no markdown). Keep grammar clean and avoid awkward phrasing."""

PROOFREAD_PROMPT_TEMPLATE = """Proofread the response below.
Fix spelling, grammar, and fluency only.
Do not change facts, numbers, times, or recommendations.
Keep it plain text and at most 3 sentences.

Response:
{draft}"""



The agent runs five workflow nodes in sequence to answer your question. Each node has one specific job:

**Step 1: Add Context**
- Figures out what time it is in Singapore right now
- Checks if it's peak hour (rush time) or off-peak
- Notes whether it's a weekday or weekend

**Step 2: Parse Your Question**
- Understands what you're really asking for
- Extracts key details (like the bus stop name or service number)
- Decides which APIs need to be called

**Step 3: Fetch Live Data**
- Calls the APIs to get real-time transport information
- Grabs everything that might help answer your question

**Step 4: Clean & Organize**
- Takes the raw API data (which can be messy) and organizes it nicely
- Turns it into simple summaries (like "Bus 14 arriving in 3 minutes")

**Step 5: Generate Answer & Proofread**
- Writes a natural-sounding answer based on the organized data
- Checks it for spelling, grammar, and clarity
- Sends you the final polished answer
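
The five steps can be pictured as functions that each return a partial update, which is merged into a shared state before the next step runs. The sketch below shows that pattern in plain Python with stub nodes; every function body here is a placeholder, and it does not use LangGraph or call any API.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Node = Callable[[State], State]


def add_context(state: State) -> State:
    return {"time_context": {"is_peak_hour": True}}   # placeholder context


def parse_query(state: State) -> State:
    wants_bus = "bus" in state["query"].lower()
    return {"intent": {"apis_needed": ["bus_arrival"] if wants_bus else []}}


def fetch_data(state: State) -> State:
    # Placeholder instead of a real API call.
    return {"raw": {api: "stub payload" for api in state["intent"]["apis_needed"]}}


def format_data(state: State) -> State:
    return {"formatted": sorted(state["raw"])}


def generate_answer(state: State) -> State:
    return {"final_response": f"Used: {', '.join(state['formatted']) or 'nothing'}"}


def run_pipeline(query: str, nodes: List[Node]) -> State:
    """Run nodes in order, merging each node's partial update into the state."""
    state: State = {"query": query}
    for node in nodes:
        state.update(node(state))
    return state


PIPELINE = [add_context, parse_query, fetch_data, format_data, generate_answer]
print(run_pipeline("When is the next bus?", PIPELINE)["final_response"])  # Used: bus_arrival
```

Because each node only reads and writes named keys, any step can be inspected or replaced without touching the others.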


In [11]:
# Workflow Nodes
def add_context_node(state):
    return {"time_context": get_singapore_time_context()}


def parse_query_node(state):
    query = state["query"]
    time_ctx = state["time_context"]
    entities = extract_entities(query)

    prompt = ROUTING_PROMPT_TEMPLATE.format(
        query=query,
        time_display=time_ctx['time_display'],
        day_of_week=time_ctx['day_of_week'],
        is_peak_hour=time_ctx['is_peak_hour'],
        day_type=time_ctx['day_type'],
        entities_json=json.dumps(entities),
    )

    try:
        intent = structured_llm.invoke([HumanMessage(content=prompt)])
        result = {
            "apis_needed": intent.apis_needed,
            "needs_stop_search": intent.needs_stop_search,
            "search_term": intent.search_term,
            "entities": entities,
        }
    except Exception as e:
        result = {
            "apis_needed": ["bus_arrival", "bus_stops", "traffic_speed"],
            "needs_stop_search": False,
            "search_term": None,
            "entities": entities,
        }

    return {"intent": result}


def call_apis_node(state):
    intent = state["intent"]
    entities = intent["entities"]
    apis = intent.get("apis_needed", [])
    errors = []

    res = {f"{api}_data": None for api in [
        "bus_arrival", "bus_stops", "bus_routes", "train_alerts", "crowd_density", "traffic_incidents", "traffic_speed"
    ]}

    def request_json(path, label, params=None):
        try:
            r = requests.get(f"{BASE_URL}/{path}", headers=HEADERS, params=params or {}, timeout=10)
            if r.status_code != 200:
                errors.append(f"{label}: {r.status_code}")
                return None
            return r.json()
        except Exception as e:
            errors.append(f"{label}: {e}")
            return None

    if "bus_stops" in apis or intent.get("needs_stop_search"):
        try:
            stops = ALL_BUS_STOPS
            if intent.get("needs_stop_search") and intent.get("search_term"):
                res["bus_stops_data"] = search_bus_stops_by_name(intent["search_term"], stops)
            elif entities.get("bus_stop_code"):
                res["bus_stops_data"] = [s for s in stops if s["BusStopCode"] == entities["bus_stop_code"]][:1]
            else:
                res["bus_stops_data"] = stops[:10]
        except Exception as e:
            errors.append(f"Bus Stops: {e}")

    if "bus_arrival" in apis:
        code = entities.get("bus_stop_code")
        if not code and res.get("bus_stops_data"):
            code = res["bus_stops_data"][0].get("BusStopCode")
        if code:
            params = {"BusStopCode": code}
            if entities.get("service_number"):
                params["ServiceNo"] = entities["service_number"]
            raw = request_json("v3/BusArrival", "Bus Arrival", params)
            if raw is not None:
                res["bus_arrival_data"] = raw

    if "bus_routes" in apis:
        try:
            routes = fetch_all_paginated_data("BusRoutes", 50000)
            if entities.get("service_number"):
                res["bus_routes_data"] = [r for r in routes if r["ServiceNo"] == entities["service_number"]][:50]
            elif entities.get("bus_stop_code"):
                res["bus_routes_data"] = [r for r in routes if r["BusStopCode"] == entities["bus_stop_code"]][:50]
            elif res.get("bus_stops_data"):
                stop_codes = {s.get("BusStopCode") for s in res["bus_stops_data"][:3] if s.get("BusStopCode")}
                res["bus_routes_data"] = [r for r in routes if r.get("BusStopCode") in stop_codes][:100]
            else:
                res["bus_routes_data"] = routes[:50]
        except Exception as e:
            errors.append(f"Bus Routes: {e}")

    if "train_alerts" in apis:
        res["train_alerts_data"] = request_json("TrainServiceAlerts", "Train Alerts")

    if "crowd_density" in apis:
        line = entities.get("train_line")
        # If no line was extracted, look for a known station name anywhere in the search term
        if not line and intent.get("search_term"):
            term = intent["search_term"].lower()
            station_name = max((n for n in STATION_LINE_MAP if n in term), key=len, default=None)
            line = STATION_LINE_MAP[station_name][0] if station_name else None
        line = line or "EWL"
        raw = request_json("PCDRealTime", "Crowd Density", {"TrainLine": line})
        if raw is not None:
            res["crowd_density_data"] = raw.get("value", [])

    if "traffic_speed" in apis or "traffic_incidents" in apis:
        speed_raw = request_json("v4/TrafficSpeedBands", "Traffic Speed")
        if speed_raw is not None:
            res["traffic_speed_data"] = speed_raw.get("value", [])[:50]

        incidents_raw = request_json("TrafficIncidents", "Traffic Incidents")
        if incidents_raw is not None:
            res["traffic_incidents_data"] = incidents_raw.get("value", [])

    res["api_errors"] = errors
    return res


def format_data_node(state):
    fmt = {}

    # Use fallback logic: try live arrival, fallback to scheduled routes
    if state.get("bus_arrival_data") or state.get("bus_routes_data"):
        service_number = state.get("intent", {}).get("entities", {}).get("service_number")
        fmt["bus_arrival"] = format_bus_arrival_with_fallback(
            state.get("bus_arrival_data"),
            state.get("bus_routes_data"),
            state.get("bus_stops_data", []),
            service_number
        )

    if state.get("train_alerts_data"):
        fmt["train_alerts"] = format_train_alerts(state["train_alerts_data"])

    if state.get("traffic_incidents_data") or state.get("traffic_speed_data"):
        roads = state.get("intent", {}).get("entities", {}).get("roads", [])
        fmt["traffic"] = format_traffic(state.get("traffic_incidents_data"), state.get("traffic_speed_data"), roads)

    if state.get("crowd_density_data"):
        crowd = {"l": "Low", "m": "Moderate", "h": "High", "NA": "Unknown"}
        fmt["crowd"] = [
            {"station": s["Station"], "level": crowd.get(s["CrowdLevel"], "Unknown")}
            for s in state["crowd_density_data"][:10]
        ]

    return {"llm_formatted_data": fmt}


def generate_response_node(state):
    prompt = RESPONSE_PROMPT_TEMPLATE.format(
        query=state['query'],
        time_display=state['time_context']['time_display'],
        day_type=state['time_context']['day_type'],
        is_peak_hour=state['time_context']['is_peak_hour'],
        formatted_data=json.dumps(state.get('llm_formatted_data', {}), indent=2),
        errors=state.get('api_errors', []),
    )
    try:
        draft = llm.invoke([HumanMessage(content=prompt)]).content.strip()
    except Exception as e:
        return {"final_response": f"Error: {e}"}

    polish_prompt = PROOFREAD_PROMPT_TEMPLATE.format(draft=draft)
    try:
        polished = llm.invoke([HumanMessage(content=polish_prompt)]).content.strip()
        final = re.sub(r"\s+", " ", polished).strip()
    except Exception:
        final = re.sub(r"\s+", " ", draft).strip()

    return {"final_response": final}


print("Nodes ready")


Nodes ready


In [12]:
# Build Workflow
workflow = StateGraph(AgentState)
workflow.add_node("add_context", add_context_node)
workflow.add_node("parse_query", parse_query_node)
workflow.add_node("call_apis", call_apis_node)
workflow.add_node("format_data", format_data_node)
workflow.add_node("generate_response", generate_response_node)
workflow.set_entry_point("add_context")
workflow.add_edge("add_context", "parse_query")
workflow.add_edge("parse_query", "call_apis")
workflow.add_edge("call_apis", "format_data")
workflow.add_edge("format_data", "generate_response")
workflow.add_edge("generate_response", END)
app = workflow.compile()


### Running the Agent: How to Query It

There are two functions to run the agent:

**1. query_agent(question)**
The simple way to ask a question: pass it as a string, for example `query_agent("Is the MRT running?")`.

What it does:
1. Creates an initial state with your question
2. Runs all 5 workflow nodes in sequence (add context → parse → fetch data → format → generate)
3. Returns the final answer (in `result['final_response']`)
4. If anything fails, it catches the error and returns a safe error message
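
Step 1 builds the initial state by giving every key a neutral default. The sketch below spells that logic out on a shortened, hypothetical `MiniState`; the notebook derives the same defaults from `AgentState` in a single dictionary comprehension.

```python
from typing import Any, Dict, List, Optional, TypedDict


class MiniState(TypedDict):
    """Hypothetical, shortened version of the notebook's state."""
    query: str
    intent: Dict[str, Any]
    bus_arrival_data: Optional[Dict]
    api_errors: List[str]
    final_response: str


def blank_state(question: str) -> Dict[str, Any]:
    """Give every key a neutral default, then set the question."""
    state: Dict[str, Any] = {}
    for key in MiniState.__annotations__:
        if key.endswith("_data"):
            state[key] = None      # nothing fetched yet
        elif key == "api_errors":
            state[key] = []
        elif key == "final_response":
            state[key] = ""
        else:
            state[key] = {}
    state["query"] = question
    return state


print(blank_state("Is the MRT running?"))
```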

**2. query_agent_debug(question)**
The detailed way: it prints the agent's intermediate steps, which is useful for understanding why it gave a certain answer.

Shows:
- What entities were extracted from your question (bus stop code, service number, etc.)
- Which APIs were called and in what order
- Any API errors that happened
- The final answer in a clearly formatted layout


In [13]:
# Query Function
def query_agent(question, verbose=True):
    if verbose:
        print(f"\nQuery: {question}")

    initial = {
        k: None if k.endswith('_data') or k == 'llm_formatted_data' else ([] if k == 'api_errors' else '' if k == 'final_response' else {})
        for k in AgentState.__annotations__.keys()
    }
    initial["query"] = question

    try:
        result = app.invoke(initial)
        if verbose:
            print(f"Answer: {result['final_response']}\n")
        return result
    except Exception as e:
        if verbose:
            print(f"Answer: Error: {e}\n")
        return {"query": question, "final_response": f"Error: {e}", "api_errors": [str(e)]}


# Debug Query Function (for testing - shows reasoning)
def query_agent_debug(question):
    print(f"\n{'='*60}")
    print(f"Query: {question}")
    print(f"{'='*60}")
    
    initial = {
        k: None if k.endswith('_data') or k == 'llm_formatted_data' else ([] if k == 'api_errors' else '' if k == 'final_response' else {})
        for k in AgentState.__annotations__.keys()
    }
    initial["query"] = question
    
    try:
        result = app.invoke(initial)
        
        # Show extracted entities
        entities = result.get('intent', {}).get('entities', {})
        print(f"\n Entities Extracted:")
        if entities:
            for key, val in entities.items():
                if val:
                    print(f"   - {key}: {val}")
        else:
            print("   - None")
        
        # Show APIs called
        apis = result.get('intent', {}).get('apis_needed', [])
        print(f"\n APIs Called: {', '.join(apis) if apis else 'None'}")
        
        # Show search behavior
        if result.get('intent', {}).get('needs_stop_search'):
            print(f" Bus Stop Search: '{result['intent']['search_term']}'")
        
        # DEBUG: Show bus stops found
        bus_stops = result.get('bus_stops_data', [])
        if bus_stops:
            print(f"\n Bus Stops Found: {len(bus_stops)} stop(s)")
            for stop in bus_stops[:3]:
                print(f"   - {stop.get('BusStopCode')}: {stop.get('Description', 'Unknown')}")
        
        # DEBUG: Show bus arrival data
        bus_arr = result.get('bus_arrival_data')
        if bus_arr:
            services = bus_arr.get('Services', [])
            print(f"\n Bus Arrival Data: {len(services)} service(s) at stop {bus_arr.get('BusStopCode')}")
            for svc in services[:3]:
                print(f"   - Service {svc.get('ServiceNo')}: {svc.get('NextBus', {}).get('EstimatedArrival', 'N/A')}")
        
        # DEBUG: Show bus routes data
        bus_routes = result.get('bus_routes_data', [])
        if bus_routes:
            print(f"\n Bus Routes Data: {len(bus_routes)} route record(s)")
            # Skip records without a ServiceNo so sorted() never compares None with str
            unique_services = {r.get('ServiceNo') for r in bus_routes if r.get('ServiceNo')}
            print(f"   - Unique services: {', '.join(sorted(unique_services)[:5])}{'...' if len(unique_services) > 5 else ''}")
            if len(unique_services) <= 5:
                for svc_no in sorted(unique_services):
                    routes = [r for r in bus_routes if r.get('ServiceNo') == svc_no]
                    if routes:
                        r = routes[0]
                        print(f"     Service {svc_no}: WD {r.get('WD_FirstBus')}-{r.get('WD_LastBus')}, SUN {r.get('SUN_FirstBus')}-{r.get('SUN_LastBus')}")
        
        # DEBUG: Show formatted data
        fmt = result.get('llm_formatted_data', {})
        if fmt and fmt.get('bus_arrival'):
            ba = fmt['bus_arrival']
            print(f"\n Formatted Bus Arrival: source={ba.get('source')}, {len(ba.get('services', []))} service(s)")
        
        # Show any errors
        if result.get('api_errors'):
            print(f"\n API Errors: {result['api_errors']}")
        
        # Show final answer
        print(f"\n Answer:\n{result['final_response']}")
        print(f"{'='*60}\n")
        
        return result
    except Exception as e:
        print(f"\n Error: {e}")
        print(f"{'='*60}\n")
        return {"query": question, "final_response": f"Error: {e}", "api_errors": [str(e)]}
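
Both functions above build the same blank state with one nested conditional expression. A minimal sketch of how that logic could live in a single helper follows. `DemoState` and `build_initial_state` are hypothetical names introduced only for illustration, and `DemoState` is a cut-down stand-in for `AgentState` so the snippet runs on its own.

```python
from typing import Any, Dict, List, Optional, TypedDict


class DemoState(TypedDict, total=False):
    """Hypothetical, cut-down stand-in for AgentState (illustration only)."""
    query: str
    time_context: Dict[str, Any]
    intent: Dict[str, Any]
    bus_arrival_data: Optional[Dict[str, Any]]
    bus_stops_data: Optional[List[Dict[str, Any]]]
    llm_formatted_data: Optional[Dict[str, Any]]
    api_errors: List[str]
    final_response: str


def build_initial_state(state_type: type, question: str) -> Dict[str, Any]:
    """Return a blank state dict using the same defaults as the cell above."""
    state: Dict[str, Any] = {}
    for key in state_type.__annotations__:
        if key.endswith("_data"):
            state[key] = None   # API payloads start out empty
        elif key == "api_errors":
            state[key] = []     # errors accumulate in a list
        elif key == "final_response":
            state[key] = ""     # answer text is filled in at the end
        else:
            state[key] = {}     # e.g. intent, time_context
    state["query"] = question
    return state


initial = build_initial_state(DemoState, "When is the next bus at stop 01012?")
print(initial["bus_arrival_data"], initial["api_errors"], initial["intent"])
```

With a helper like this, each query function could call `build_initial_state(AgentState, question)` instead of repeating the expression. Note that `llm_formatted_data` already ends in `_data`, so the separate check for it in the original expression does not change the result.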


---

### PART 2: TESTING WITH MULTI-USER REQUESTS

Now that we've built the agent, let's test it with 10 diverse real-world queries from different users and scenarios.

---

#### Debug Mode Example

Before running all 10 test queries, let's run ONE example with debug output. This shows you exactly what the agent is thinking:
- Which details it extracted from your question (bus stops, service numbers, etc.)
- Which APIs it decided to call (and why)
- Any errors or issues that came up
- The final polished answer

This gives you visibility into the agent's reasoning before we test it on many questions.


In [22]:
# Example: Debug mode shows entities, APIs called, and reasoning
query_agent_debug("When is the next bus arriving at stop 01012?")


# Note on the output: this was run at about 3:12 AM Singapore time. During off-peak hours like this, the API data is sparse or unavailable.



Query: When is the next bus arriving at stop 01012?

 Entities Extracted:
   - bus_stop_code: 01012

 APIs Called: bus_arrival, bus_stops

 Bus Stops Found: 1 stop(s)
   - 01012: Hotel Grand Pacific

 Bus Arrival Data: 0 service(s) at stop 01012

 Formatted Bus Arrival: source=unavailable, 0 service(s)

 Answer:
I cannot retrieve live or scheduled arrivals for stop 01012 right now (data unavailable). It is 3:12 AM on a weekend and off-peak, so crowding is likely low but overnight services may be limited; please check LTA MyTransport.SG or TransitLink, the bus operator's app, the bus-stop display, or consider a taxi or ride-hail if you need to leave promptly.



{'query': 'When is the next bus arriving at stop 01012?',
 'time_context': {'timestamp': '2026-02-15 03:12:19',
  'time_display': '03:12 AM',
  'day_of_week': 'Sunday',
  'hour': 3,
  'is_peak_hour': False,
  'is_weekend': True,
  'day_type': 'weekend'},
 'intent': {'apis_needed': ['bus_arrival', 'bus_stops'],
  'needs_stop_search': False,
  'search_term': None,
  'entities': {'bus_stop_code': '01012',
   'service_number': None,
   'train_line': None,
   'roads': [],
   'has_station_keyword': False,
   'mentioned_time': None}},
 'bus_arrival_data': {'odata.metadata': 'https://datamall2.mytransport.sg/ltaodataservice/v3/BusArrival',
  'BusStopCode': '01012',
  'Services': []},
 'bus_stops_data': [{'BusStopCode': '01012',
   'RoadName': 'Victoria St',
   'Description': 'Hotel Grand Pacific',
   'Latitude': 1.29684825487647,
   'Longitude': 103.85253591654006}],
 'bus_routes_data': None,
 'train_alerts_data': None,
 'crowd_density_data': None,
 'traffic_incidents_data': None,
 'traffic_sp
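
In the run above `Services` is empty, so there is no arrival time to show. When data is available, the debug function prints each service's raw `EstimatedArrival` value. The sketch below converts such a value into minutes from now. It assumes the value is an ISO 8601 string with a UTC offset (for example `2026-02-15T03:20:19+08:00`) and an empty string when no estimate exists; that format is an assumption to verify against the DataMall documentation. `minutes_until` is a hypothetical helper, not part of the agent.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def minutes_until(estimated_arrival: str, now: Optional[datetime] = None) -> Optional[int]:
    """Convert an ISO 8601 arrival timestamp into whole minutes from `now`.

    Returns None when the value is empty, unparseable, or has no UTC offset.
    `now` must be timezone-aware; it defaults to the current UTC time.
    """
    if not estimated_arrival:
        return None
    try:
        arrival = datetime.fromisoformat(estimated_arrival)
    except ValueError:
        return None
    if arrival.tzinfo is None:
        return None  # cannot compare safely without an offset
    now = now or datetime.now(timezone.utc)
    delta_seconds = (arrival - now).total_seconds()
    # Buses that are due or already past are reported as 0 minutes
    return max(0, int(delta_seconds // 60))


# Fixed reference time so the example is reproducible (Singapore is UTC+8)
sgt = timezone(timedelta(hours=8))
now = datetime(2026, 2, 15, 3, 12, 19, tzinfo=sgt)
print(minutes_until("2026-02-15T03:20:19+08:00", now))  # 8
print(minutes_until("", now))                           # None
```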

### Running 10 Multi-User Test Queries

Now I'm testing the agent with **10 different real-world questions** from various users. The questions range from simple single-API queries to complex multi-modal scenarios, demonstrating variety in both difficulty and use cases.

#### Test Coverage

**Simple Queries (Questions 1-5):**
- Basic bus arrival and train status checks
- Simple traffic and crowd queries
- Operating hours lookups
- Single-API calls with straightforward answers

**Moderate to Complex Queries (Questions 6-10):**
- Multi-modal route comparisons (bus vs MRT)
- Weather-aware routing and traffic checks
- Late-night service planning
- Accessibility-focused queries (wheelchair access)
- MRT disruption handling with alternative routes
- Comprehensive area status reports combining multiple data sources

**What this demonstrates:**
- Entity extraction (station names, service numbers, locations)
- API routing logic (choosing the right data sources)
- Context awareness (peak/off-peak, weekday/weekend, weather, accessibility)
- Multi-source data integration (combining buses, trains, traffic, crowds)
- Natural language generation (clear, helpful answers)


In [15]:
test_queries = [
    # SIMPLE QUESTIONS (1-5)
    "Is the East-West Line running normally right now?",
    "What's the traffic like on the PIE right now?",
    "How crowded is Orchard MRT station right now?",
    "When is the next bus 15 at Bugis Station?",
    "What time does bus service 100 stop operating at night?",
    
    # MODERATE TO COMPLEX QUESTIONS (6-10)
    "Should I take the bus or MRT from Bedok to City Hall at 8:30 AM on a weekday?",
    "It's raining now. How's the traffic on CTE and AYE, and are there any accidents?",
    "I'm at Changi Airport at 11:45 PM on a Saturday. What transport options do I have to get to Jurong East?",
    "I need a wheelchair-accessible bus from Orchard Road to City Hall. Which buses are available and when's the next one?",
    "Give me a transport update for Marina Bay right now: MRT crowding, bus arrivals, and traffic conditions."
]
print(f"{len(test_queries)} test queries ready")


10 test queries ready


In [16]:
# Run test queries and print results. 
results = []
print(f"Running {len(test_queries)} tests...\n")

for i, q in enumerate(test_queries, 1):
    result = query_agent(q, verbose=True)
    results.append({"query": q, "answer": result["final_response"]})
    if i < len(test_queries):
        time.sleep(1) # Added a delay between queries to avoid overwhelming the LTA DataMall API



Running 10 tests...


Query: Is the East-West Line running normally right now?
Answer: Yes. The East‑West Line is running normally at 2:36 AM, and there are no active alerts affecting the line; crowding at EW stations is low during this off-peak weekend period.


Query: What's the traffic like on the PIE right now?
Answer: As of 2:36 AM this weekend, traffic on the PIE is smooth, with an average speed of about 58 km/h and two incidents reported, none classified as major. Peak conditions are not in effect, so travel should be quick; if you want to avoid the incident areas, consider diverting to the ECP or AYE.


Query: How crowded is Orchard MRT station right now?
Answer: At 2:36 AM on a weekend (off-peak), the feed does not include crowd data for Orchard MRT (NS22). Crowds are typically low at this hour, but for real-time confirmation, check the LTA train crowd status map or consider taking a taxi/ride-hailing service if you prefer fewer people.


Query: When is the next bus 15 at Bugi
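
Because `query_agent` returns a `final_response` beginning with `Error:` when an exception is caught, the collected `results` list can be checked after the run. The sketch below separates answered queries from failed ones. `split_results` and the sample data are hypothetical and only mirror the `{"query": ..., "answer": ...}` shape built in the loop above.

```python
from typing import Dict, List, Tuple

Result = Dict[str, str]


def split_results(results: List[Result]) -> Tuple[List[Result], List[Result]]:
    """Split results into (answered, failed).

    An item counts as failed when its answer is empty or starts with "Error:".
    """
    answered: List[Result] = []
    failed: List[Result] = []
    for item in results:
        answer = item.get("answer") or ""
        if not answer or answer.startswith("Error:"):
            failed.append(item)
        else:
            answered.append(item)
    return answered, failed


# Hypothetical sample shaped like the `results` list from the test loop
sample = [
    {"query": "Q1", "answer": "Traffic is smooth."},
    {"query": "Q2", "answer": "Error: connection timed out"},
    {"query": "Q3", "answer": ""},
]
answered, failed = split_results(sample)
print(f"{len(answered)} answered, {len(failed)} failed")  # 1 answered, 2 failed
```

This only detects failures the agent reported itself; it says nothing about whether a fluent answer is factually correct.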

### Conclusion

This agent demonstrates an **agentic workflow** for answering real-world transport queries using LangGraph and 7 integrated LTA APIs. It handles entity extraction, intelligent API routing, and data fallbacks. The design prioritizes **reliability and clarity** over perfection: it combines multiple data sources, caches data to reduce repeated API calls, and is designed to say so when data is missing rather than invent an answer.