<a href="https://colab.research.google.com/github/parimalaeashu-prog/Multi-Agent-AI-Travel-Agent-System/blob/main/Travel_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!mkdir -p project/agents project/tools project/memory project/core project/tests project/examples


In [2]:
%%writefile project/requirements.txt
fastapi==0.95.2
uvicorn==0.22.0
requests==2.31.0
jsonschema==4.21.0
pytest==7.4.0


Writing project/requirements.txt


In [3]:
%%writefile project/agents/planner.py
from typing import Dict, Any
from project.core.a2a_protocol import make_task_message
from project.core.context_engineering import build_planner_context

class Planner:
    def __init__(self):
        self.version = "planner-v1"

    def plan(self, user_request: str, session_context: Dict[str, Any]) -> Dict[str, Any]:
        context = build_planner_context(session_context)

        task = make_task_message(
            session_id=session_context.get("session_id", "default"),
            from_agent="planner",
            to_agent="worker",
            task_type="generate_itinerary",
            payload={"user_request": user_request, "context": context},
            expected_schema={"type": "object"},
            priority="high"
        )

        plan = {
            "plan_id": "plan-1",
            "tasks": [task],
            "meta": {"version": self.version}
        }
        return plan


Writing project/agents/planner.py


In [4]:
%%writefile project/agents/worker.py
from typing import Dict, Any
from project.tools.tools import Tools
from project.core.a2a_protocol import make_result_message

class Worker:
    def __init__(self):
        self.tools = Tools()
        self.version = "worker-v1"

    def execute(self, task_message: Dict[str, Any]) -> Dict[str, Any]:
        task_info = task_message.get("task", {})
        task_type = task_info.get("task_type")
        payload = task_info.get("payload", {})
        session_id = task_message.get("session_id")

        if task_type == "generate_itinerary":
            user_request = payload.get("user_request", "")
            result_payload = {
                "itinerary": [
                    {"day": 1, "activity": "Arrive and explore"},
                    {"day": 2, "activity": "City tour"}
                ],
                "note": f"Generated for: {user_request}"
            }
            status = "success"
        else:
            result_payload = {"error": "unknown task type"}
            status = "failed"

        return make_result_message(
            session_id=session_id,
            from_agent="worker",
            to_agent="planner",
            task_id=task_info.get("task_id"),
            status=status,
            payload=result_payload
        )


Writing project/agents/worker.py


In [5]:
%%writefile project/agents/evaluator.py
from typing import Dict, Any

class Evaluator:
    def __init__(self):
        self.version = "evaluator-v1"

    def evaluate(self, plan: Dict[str, Any], constraints: Dict[str, Any]) -> Dict[str, Any]:
        return {"accepted": True, "score": 0.9, "notes": "Auto-approved"}


Writing project/agents/evaluator.py


In [6]:
%%writefile project/tools/tools.py
from typing import Dict, Any

class Tools:
    def __init__(self):
        pass

    def call(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        if tool_name == "flight_search":
            return {"flights": []}
        if tool_name == "hotel_search":
            return {"hotels": []}
        return {"error": "unknown tool"}


Writing project/tools/tools.py


In [7]:
%%writefile project/memory/session_memory.py
from typing import Dict, Any
import time

class SessionMemory:
    def __init__(self):
        self.store = {}

    def create_session(self, session_id: str, user_profile: Dict[str, Any] = None):
        self.store[session_id] = {
            "session_id": session_id,
            "user_profile": user_profile or {},
            "conversation_history": [],
            "planner_state": {},
            "tasks": [],
            "created_at": time.time()
        }
        return self.store[session_id]

    def get_session(self, session_id: str):
        return self.store.get(session_id)

    def update_session(self, session_id: str, patch: Dict[str, Any]):
        if session_id in self.store:
            self.store[session_id].update(patch)
        return self.store.get(session_id)


Writing project/memory/session_memory.py


In [8]:
%%writefile project/core/context_engineering.py
from typing import Dict, Any

def build_planner_context(session_context: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "user_profile": session_context.get("user_profile", {}),
        "last_messages": session_context.get("conversation_history", [])[-5:]
    }


Writing project/core/context_engineering.py


In [9]:
%%writefile project/core/observability.py
import json
import time

class Observability:
    def log_event(self, trace_id: str, level: str, payload: dict):
        entry = {
            "trace_id": trace_id,
            "level": level,
            "payload": payload,
            "ts": time.time()
        }
        print(json.dumps(entry))


Writing project/core/observability.py


In [10]:
%%writefile project/core/a2a_protocol.py
import uuid
import time
from typing import Dict, Any

PROTOCOL_VERSION = "1.0"

def _now_iso():
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

def make_task_message(session_id, from_agent, to_agent, task_type, payload, expected_schema, priority="medium"):
    task_id = str(uuid.uuid4())
    return {
        "message_id": str(uuid.uuid4()),
        "session_id": session_id,
        "trace_id": str(uuid.uuid4()),
        "from": from_agent,
        "to": to_agent,
        "timestamp": _now_iso(),
        "type": "TASK",
        "task": {
            "task_id": task_id,
            "task_type": task_type,
            "payload": payload,
            "constraints": {},
            "expected_schema": expected_schema,
            "priority": priority,
            "ttl_seconds": 300
        },
        "result": None,
        "meta": {"protocol_version": PROTOCOL_VERSION}
    }

def make_result_message(session_id, from_agent, to_agent, task_id, status, payload):
    return {
        "message_id": str(uuid.uuid4()),
        "session_id": session_id,
        "trace_id": str(uuid.uuid4()),
        "from": from_agent,
        "to": to_agent,
        "timestamp": _now_iso(),
        "type": "RESULT",
        "task": {"task_id": task_id},
        "result": {"status": status, "payload": payload},
        "meta": {"protocol_version": PROTOCOL_VERSION}
    }


Writing project/core/a2a_protocol.py


In [11]:
%%writefile project/main_agent.py
from project.agents.planner import Planner
from project.agents.worker import Worker
from project.agents.evaluator import Evaluator
from project.memory.session_memory import SessionMemory
from project.core.observability import Observability

class MainAgent:
    def __init__(self):
        self.planner = Planner()
        self.worker = Worker()
        self.evaluator = Evaluator()
        self.memory = SessionMemory()
        self.obs = Observability()

    def handle_message(self, user_input: str, session_id: str = "default_session"):
        session = self.memory.get_session(session_id) or self.memory.create_session(session_id)
        plan = self.planner.plan(user_input, session)
        responses = [self.worker.execute(t) for t in plan["tasks"]]
        feedback = self.evaluator.evaluate(plan, {})

        response = {
            "response": {
                "plan": plan,
                "worker_results": responses,
                "evaluation": feedback
            }
        }

        self.memory.update_session(session_id, {"planner_state": plan, "tasks": responses})
        return response

def run_agent(user_input: str):
    agent = MainAgent()
    result = agent.handle_message(user_input)
    return result["response"]


Writing project/main_agent.py


In [12]:
%%writefile project/app.py
from fastapi import FastAPI
from project.main_agent import MainAgent

app = FastAPI()
agent = MainAgent()

@app.post("/query")
def query(payload: dict):
    return agent.handle_message(payload.get("text", ""), payload.get("session_id", "default_session"))


Writing project/app.py


In [13]:
%%writefile project/run_demo.py
import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from project.main_agent import run_agent

if __name__ == "__main__":
    print(run_agent("Hello! This is a demo."))


Writing project/run_demo.py


In [14]:
from project.main_agent import run_agent
print(run_agent("Hello!"))


{'plan': {'plan_id': 'plan-1', 'tasks': [{'message_id': 'c2416f9b-a54f-4455-a7c0-81d23a530b9b', 'session_id': 'default_session', 'trace_id': '57b74988-ad98-4756-89b2-c33a9d4017d5', 'from': 'planner', 'to': 'worker', 'timestamp': '2025-11-29T06:50:08Z', 'type': 'TASK', 'task': {'task_id': 'ec84b625-acf1-4090-a814-287575e25d5b', 'task_type': 'generate_itinerary', 'payload': {'user_request': 'Hello!', 'context': {'user_profile': {}, 'last_messages': []}}, 'constraints': {}, 'expected_schema': {'type': 'object'}, 'priority': 'high', 'ttl_seconds': 300}, 'result': None, 'meta': {'protocol_version': '1.0'}}], 'meta': {'version': 'planner-v1'}}, 'worker_results': [{'message_id': 'e444b82a-ebd6-4276-a07d-6b88abe2c07d', 'session_id': 'default_session', 'trace_id': '1b33eec5-adca-4a69-a2d4-dbb67b8f48b2', 'from': 'worker', 'to': 'planner', 'timestamp': '2025-11-29T06:50:08Z', 'type': 'RESULT', 'task': {'task_id': 'ec84b625-acf1-4090-a814-287575e25d5b'}, 'result': {'status': 'success', 'payload': 

In [15]:
!zip -r project.zip project


  adding: project/ (stored 0%)
  adding: project/examples/ (stored 0%)
  adding: project/tools/ (stored 0%)
  adding: project/tools/tools.py (deflated 49%)
  adding: project/tools/__pycache__/ (stored 0%)
  adding: project/tools/__pycache__/tools.cpython-312.pyc (deflated 34%)
  adding: project/run_demo.py (deflated 31%)
  adding: project/app.py (deflated 34%)
  adding: project/core/ (stored 0%)
  adding: project/core/a2a_protocol.py (deflated 67%)
  adding: project/core/observability.py (deflated 45%)
  adding: project/core/__pycache__/ (stored 0%)
  adding: project/core/__pycache__/a2a_protocol.cpython-312.pyc (deflated 45%)
  adding: project/core/__pycache__/observability.cpython-312.pyc (deflated 36%)
  adding: project/core/__pycache__/context_engineering.cpython-312.pyc (deflated 26%)
  adding: project/core/context_engineering.py (deflated 40%)
  adding: project/requirements.txt (deflated 9%)
  adding: project/agents/ (stored 0%)
  adding: project/agents/planner.py (deflated 57%)
