<a href="https://colab.research.google.com/github/pr-lgtm/VOYAGEAI_AGENT/blob/main/VoyageAI_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
#Capstone Project of 5 Days Generative AI course by Kaggle X Google

In [21]:
!pip install requests_cache

# CAPSTONE SUBMISSION NOTE:
# This project implements a multi-agent generative AI travel planning assistant.
# It demonstrates the following Google x Kaggle “AI Agents” concepts from the course:
# 1. Multi-agent architecture (DestinationAgent → FlightAgent + HotelAgent → ItineraryAgent).
# 2. Parallel and sequential agents via ThreadPoolExecutor.
# 3. Custom tool creation via ToolManager (MCP style).
# 4. Sessions and Memory: InMemorySessionService for short-term session state + MemoryBank for simulated long-term memory.
# 5. Context engineering and summarization using (Mock)LLM.
# 6. Observability: structured logging, basic agent evaluation metrics.
# 7. A2A Protocol: agents send messages using A2AMessage class.
# The system is error-resilient, modular, and ready for expansion with real APIs.


import os
import sys
import json
import time
import uuid
import math
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple, Optional

import requests
import requests_cache
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from concurrent.futures import ThreadPoolExecutor, as_completed

# Optional OpenAI import - used if OPENAI_API_KEY is present
try:
    import openai
except Exception:
    openai = None

# SQLAlchemy for persistence
from sqlalchemy import create_engine, Column, Integer, String, Text, JSON
from sqlalchemy.orm import sessionmaker, declarative_base

# Prometheus metrics
from prometheus_client import start_http_server, Counter, Gauge, REGISTRY


# Environment variables (preferred)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")          # set to your OpenAI API key
AMADEUS_API_KEY = os.getenv("AMADEUS_API_KEY")        # or another flights provider
SKYSCANNER_API_KEY = os.getenv("SKYSCANNER_API_KEY")  # optional
GOOGLE_PLACES_KEY = os.getenv("GOOGLE_PLACES_KEY")    # for POI
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///travel_agent.db")  # change to postgres for prod

# Observability / Metrics server port
METRICS_PORT = int(os.getenv("METRICS_PORT", "8000"))

# Caching config
CACHE_NAME = "travel_cache"
CACHE_EXPIRE_SECONDS = 60 * 60  # 1 hour

# Rate limiting / retry tuned parameters
MAX_RETRIES = 3
RETRY_WAIT_SECONDS = 1

# LLM prompt config
LLM_MAX_TOKENS = 800
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")  # change to available model

# Cost optimization weights (tunable)
WEIGHT_COST = 0.6
WEIGHT_COMFORT = 0.4

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [%(component)s] %(message)s"
)
logger = logging.getLogger("travel_agent")

# helper for structured logs
def log(component: str, level: int, message: str):
    logger.log(level, message, extra={"component": component})

# requests cache
requests_cache.install_cache(CACHE_NAME, expire_after=CACHE_EXPIRE_SECONDS)

# Check if metrics are already initialized to prevent re-registration errors in Colab
# (useful when running the cell multiple times during development)
# The REGISTRY.clear() method does not exist in prometheus_client library.
if "METRICS_FLIGHT_SEARCH" not in globals():
    METRICS_FLIGHT_SEARCH = Counter("flight_search_requests_total", "Total flight search requests")
    METRICS_HOTEL_SEARCH = Counter("hotel_search_requests_total", "Total hotel search requests")
    METRICS_POI_SEARCH = Counter("poi_search_requests_total", "Total poi search requests")
    METRICS_ITINERARIES = Counter("itineraries_created_total", "Total itineraries created")
    METRICS_ERRORS = Counter("errors_total", "Total errors occurred")
    METRICS_DB_WRITES = Counter("db_writes_total", "Number of DB writes")
    METRICS_LAST_RUNTIME = Gauge("last_orchestration_runtime_seconds", "last orchestration runtime seconds")

    # start prometheus metrics server (non-blocking)
    start_http_server(METRICS_PORT)
    log("Metrics", logging.INFO, f"Prometheus metrics server started on port {METRICS_PORT}")

Base = declarative_base()

class MemoryEntry(Base):
    __tablename__ = "memory"
    id = Column(Integer, primary_key=True)
    user_id = Column(String(128), index=True, nullable=False)
    key = Column(String(128), nullable=False)
    value = Column(JSON, nullable=False)

engine = create_engine(DATABASE_URL, echo=False, future=True)
SessionLocal = sessionmaker(bind=engine)
Base.metadata.create_all(bind=engine)

def save_memory(user_id: str, key: str, value: Any):

    db = SessionLocal()
    try:
        existing = db.query(MemoryEntry).filter_by(user_id=user_id, key=key).first()
        if existing:
            existing.value = value
        else:
            db.add(MemoryEntry(user_id=user_id, key=key, value=value))
        db.commit()
        METRICS_DB_WRITES.inc()
        log("DB", logging.INFO, f"Saved memory for {user_id}:{key}")
    except Exception as e:
        db.rollback()
        METRICS_ERRORS.inc()
        log("DB", logging.ERROR, f"DB save error: {e}")
    finally:
        db.close()

def load_memory(user_id: str, key: str) -> Optional[Any]:
    db = SessionLocal()
    try:
        entry = db.query(MemoryEntry).filter_by(user_id=user_id, key=key).first()
        return entry.value if entry else None
    finally:
        db.close()


class LLMClient:

    def __init__(self, api_key: Optional[str] = None, model: str = LLM_MODEL):
        self.api_key = api_key
        self.model = model
        if api_key and openai:
            openai.api_key = api_key
        elif api_key and not openai:
            log("LLM", logging.WARNING, "openai library not installed; LLM calls will fail")

    def chat(self, system_prompt: str, user_prompt: str, max_tokens: int = LLM_MAX_TOKENS) -> str:
        """
        Calls OpenAI ChatCompletion endpoint. Uses a simple context compaction strategy:
        - If the user prompt is long, first ask the model to summarize (compaction) then send compacted context.
        """
        if not self.api_key:
            # fallback: return a simple template response so CLI still can run for demonstration
            log("LLM", logging.WARNING, "No LLM API key provided, returning fallback text.")
            return f"[LLM-FALLBACK] {user_prompt[:200]}"

        # Context compaction: if user_prompt > 1200 chars, get a compaction summary first.
        if len(user_prompt) > 1200:
            compaction_prompt = (
                "You are a context compaction assistant. Condense the following user context into a 200-300 "
                "character summary that preserves user preferences, constraints and key facts.\n\n"
                f"{user_prompt}"
            )
            log("LLM", logging.INFO, "Performing context compaction step")
            resp = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a helpful summarizer."},
                    {"role": "user", "content": compaction_prompt}
                ],
                max_tokens=300,
                temperature=0.0
            )
            compacted = resp.choices[0].message.content.strip()
            user_prompt = f"[COMPACTED CONTEXT]\n{compacted}\n\nOriginal prompt summary: {user_prompt[:400]}"
        # Main request
        log("LLM", logging.INFO, "Sending chat completion request")
        resp = openai.ChatCompletion.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            max_tokens=max_tokens,
            temperature=0.7
        )
        text = resp.choices[0].message.content.strip()
        return text

# instantiate LLM client
llm = LLMClient(api_key=OPENAI_API_KEY)


def is_transient_exception(exc):
    return isinstance(exc, (requests.exceptions.RequestException, ValueError))

def retry_decorator():
    return retry(
        retry=retry_if_exception_type(Exception),
        stop=stop_after_attempt(MAX_RETRIES),
        wait=wait_exponential(multiplier=RETRY_WAIT_SECONDS, min=1, max=10),
        reraise=True
    )

@retry_decorator()
def flight_search(origin: str, destination: str, depart_date: str, return_date: str, budget: float) -> Dict[str, Any]:
    METRICS_FLIGHT_SEARCH.inc()
    # Example: call an API like Amadeus / Skyscanner via requests
    log("FlightTool", logging.INFO, f"Searching flights {origin}->{destination} between {depart_date} and {return_date} budget={budget}")
    # Replace the URL and params with real provider endpoints
    # This is a placeholder "best-effort" logic to return plausible options
    # If you have provider creds, implement the HTTP call and parse response.
    # ---- START PLACEHOLDER ----
    fake_options = []
    base_price = 300 + int(abs(hash((origin, destination))) % 600)
    for i in range(3):
        price = base_price + i * 120
        comfort = max(1, 5 - i)  # simple comfort score
        fake_options.append({
            "provider": f"Provider-{i+1}",
            "price": float(price),
            "stops": (0 if i == 0 else 1),
            "duration": 6 + i,
            "comfort": comfort,
            "score": round((budget / max(1, price)) * 50 + comfort * 10, 2)
        })
    # ---- END PLACEHOLDER ----
    return {"origin": origin, "destination": destination, "depart": depart_date, "return": return_date, "options": fake_options}

# Hotel search tool (example with Hotels.com / Booking.com / Amadeus)
@retry_decorator()
def hotel_search(destination: str, checkin: str, nights: int, budget_per_night: float) -> Dict[str, Any]:
    METRICS_HOTEL_SEARCH.inc()
    log("HotelTool", logging.INFO, f"Searching hotels in {destination} nights={nights} budget/night={budget_per_night}")
    # ---- START PLACEHOLDER ----
    hotels = []
    base = 60 + int(abs(hash(destination)) % 200)
    for i in range(4):
        nightly = base + i * 40
        comfort = min(5, 3 + i)
        hotels.append({
            "name": f"{destination} Grand {i+1}",
            "nightly_price": float(nightly),
            "stars": comfort,
            "amenities": ["wifi", "breakfast"],
            "score": round(comfort * 10 + max(0, budget_per_night - nightly) * 0.1, 2)
        })

    return {"destination": destination, "checkin": checkin, "nights": nights, "hotels": hotels}

# POI search using Google Places API or Triposo
@retry_decorator()
def poi_search(country: str, days: int, interests: List[str]) -> Dict[str, Any]:
    METRICS_POI_SEARCH.inc()
    log("POITool", logging.INFO, f"Searching POIs for {country} days={days} interests={interests}")
    # If GOOGLE_PLACES_KEY is set, you can perform actual Places API calls here
    # ---- START PLACEHOLDER ----
    sample = [
        "Historic Center", "National Museum", "Food Market", "Scenic Overlook",
        "Botanical Garden", "Riverside Promenade", "Local Art District", "Cultural Show"
    ]
    pois = []
    for d in range(days):
        pois.append({
            "day": d + 1,
            "morning": sample[(d*2) % len(sample)],
            "afternoon": sample[(d*2+1) % len(sample)],
            "evening": "Food Market / Local Eatery"
        })
    # ---- END PLACEHOLDER ----
    return {"country": country, "pois": pois}

@dataclass
class A2AMessage:
    sender: str
    receiver: str
    payload: Dict[str, Any]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

class BaseAgent:
    def __init__(self, name: str, session_id: str):
        self.name = name
        self.session_id = session_id
        self.inbox: List[A2AMessage] = []

    def send(self, msg: A2AMessage, recipient: 'BaseAgent'):
        log(self.name, logging.INFO, f"Send message to {recipient.name}: {msg.payload}")
        recipient.receive(msg)

    def receive(self, msg: A2AMessage):
        log(self.name, logging.INFO, f"Received message from {msg.sender}")
        self.inbox.append(msg)

    def read_messages(self) -> List[A2AMessage]:
        msgs = list(self.inbox)
        self.inbox.clear()
        return msgs

class DestinationAgent(BaseAgent):
    def __init__(self, name: str, session_id: str):
        super().__init__(name, session_id)

    def run(self, user_inputs: Dict[str, Any]) -> Dict[str, Any]:
        log(self.name, logging.INFO, "Running destination planning")
        country = user_inputs["country_pref"]
        days = user_inputs["duration_days"]
        interests = user_inputs.get("interests", ["sightseeing", "food"])
        poi_result = poi_search(country, days, interests)
        # Use LLM to produce a short summary (context compaction)
        system_prompt = "You are a travel planner assistant. Produce a concise summary of destinations and top activities."
        user_prompt = f"Country: {country}\nDays: {days}\nInterests: {interests}\nPOIs: {json.dumps(poi_result['pois'])}"
        summary = llm.chat(system_prompt, user_prompt)
        return {"country": country, "pois": poi_result["pois"], "summary": summary}

class FlightAgent(BaseAgent):
    def run(self, origin: str, destination: str, depart: str, ret: str, budget: float) -> Dict[str, Any]:
        log(self.name, logging.INFO, "Running flight search")
        flight_result = flight_search(origin, destination, depart, ret, budget)
        return flight_result

class HotelAgent(BaseAgent):
    def run(self, destination: str, checkin: str, nights: int, budget_per_night: float) -> Dict[str, Any]:
        log(self.name, logging.INFO, "Running hotel search")
        hotel_result = hotel_search(destination, checkin, nights, budget_per_night)
        return hotel_result

class ItineraryAgent(BaseAgent):
    def evaluate(self,
                 flight_options: List[Dict[str, Any]],
                 hotel_options: List[Dict[str, Any]],
                 duration_days: int,
                 budget: float) -> Dict[str, Any]:

        best = None
        for f in flight_options:
            for h in hotel_options:
                total_est = f["price"] + h["nightly_price"] * duration_days
                # Normalize cost factor: smaller cost -> higher score contribution
                cost_score = max(0, (budget - total_est) / max(1, budget))  # 0..1
                comfort_score = ((f.get("comfort", f.get("score", 0))/10) + (h.get("stars", h.get("score",0))/5)) / 2
                score = WEIGHT_COST * cost_score + WEIGHT_COMFORT * comfort_score
                candidate = {"flight": f, "hotel": h, "total_estimated": round(total_est,2), "score": round(score,3)}
                if best is None or candidate["score"] > best["score"]:
                    best = candidate
        if not best:
            raise RuntimeError("No viable combos found")
        return best

    def run(self, user_inputs: Dict[str, Any], dest_info: Dict[str, Any], flight_info: Dict[str, Any], hotel_info: Dict[str, Any]) -> Dict[str, Any]:
        log(self.name, logging.INFO, "Assembling itinerary")
        duration = user_inputs["duration_days"]
        budget = user_inputs["budget"]
        flight_options = flight_info.get("options", [])
        hotel_options = hotel_info.get("hotels", [])
        best_combo = self.evaluate(flight_options, hotel_options, duration, budget)
        # Build day-by-day plan from dest_info
        daily_plan = dest_info.get("pois", [])
        itinerary = {
            "user": user_inputs["name"],
            "user_id": user_inputs["user_id"],
            "origin": user_inputs["origin_country"],
            "destination_country": dest_info["country"],
            "flight_choice": best_combo["flight"],
            "hotel_choice": best_combo["hotel"],
            "estimated_budget_total": best_combo["total_estimated"],
            "daily_plan": daily_plan,
            "notes": dest_info.get("summary"),
            "created_at": time.time()
        }
        # Persist memory (last_trip)
        save_memory(user_inputs["user_id"], "last_trip", itinerary)
        METRICS_ITINERARIES.inc()
        return itinerary

# ----------------------------
# Orchestrator
# ----------------------------
class TravelOrchestrator:
    def __init__(self, session_id: str, user_id: str):
        self.session_id = session_id
        self.user_id = user_id
        # instantiate agents
        self.dest_agent = DestinationAgent("DestinationAgent", session_id)
        self.flight_agent = FlightAgent("FlightAgent", session_id)
        self.hotel_agent = HotelAgent("HotelAgent", session_id)
        self.itinerary_agent = ItineraryAgent("ItineraryAgent", session_id)

    def run(self, user_inputs: Dict[str, Any]) -> Dict[str, Any]:
        start_time = time.time()
        # Input validation
        required = ["name", "origin_country", "country_pref", "budget", "duration_days", "user_id"]
        for r in required:
            if r not in user_inputs:
                raise ValueError(f"Missing required input: {r}")
        # Step 1: Destination planning
        dest_info = self.dest_agent.run(user_inputs)
        # A2A: send summary to flight and hotel agents (demonstration)
        msg = A2AMessage(sender=self.dest_agent.name, receiver=self.flight_agent.name, payload={"summary": dest_info["summary"]})
        self.dest_agent.send(msg, self.flight_agent)
        self.dest_agent.send(msg, self.hotel_agent)
        # Step 2: Flight & Hotel search in parallel
        origin = user_inputs["origin_country"]
        destination = dest_info["country"]
        depart = user_inputs.get("depart_date", "2026-01-01")
        ret = user_inputs.get("return_date", f"2026-01-{1+user_inputs['duration_days']}")
        checkin = depart
        nights = user_inputs["duration_days"]
        budget = user_inputs["budget"]
        budget_per_night = max(30.0, (budget * 0.4) / max(1, nights))  # simple split heuristics
        results = {}

        with ThreadPoolExecutor(max_workers=2) as ex:
            futures = {
                ex.submit(self.flight_agent.run, origin, destination, depart, ret, budget): "flight",
                ex.submit(self.hotel_agent.run, destination, checkin, nights, budget_per_night): "hotel"
            }
            for fut in as_completed(futures):
                role = futures[fut]
                try:
                    results[role] = fut.result()
                except Exception as e:
                    METRICS_ERRORS.inc()
                    log("Orchestrator", logging.ERROR, f"{role} search failed: {e}")
                    raise

        # A2A: Flight & Hotel -> Itinerary
        fmsg = A2AMessage(sender=self.flight_agent.name, receiver=self.itinerary_agent.name, payload={"flight_info": results["flight"]})
        hmsg = A2AMessage(sender=self.hotel_agent.name, receiver=self.itinerary_agent.name, payload={"hotel_info": results["hotel"]})
        self.flight_agent.send(fmsg, self.itinerary_agent)
        self.hotel_agent.send(hmsg, self.itinerary_agent)

        # Step 3: Itinerary assembly
        itinerary = self.itinerary_agent.run(user_inputs, dest_info, results["flight"], results["hotel"])

        elapsed = time.time() - start_time
        METRICS_LAST_RUNTIME.set(elapsed)
        log("Orchestrator", logging.INFO, f"Orchestration completed in {elapsed:.2f}s")
        return itinerary

def prompt_user_inputs() -> Dict[str, Any]:
    print("Welcome to the Travel Agent AI (production scaffold). Please enter trip details.")
    name = input("Your name: ").strip() or "Traveler"
    user_id = input("User ID (leave blank to auto-generate): ").strip() or f"user_{uuid.uuid4().hex[:8]}"
    origin = input("Origin country (e.g. India): ").strip()
    country_pref = input("Destination country preference (e.g. Japan): ").strip()
    while True:
        try:
            budget = float(input("Total budget (USD): ").strip())
            break
        except Exception:
            print("Please enter a numeric budget (e.g. 1500).")
    while True:
        try:
            duration_days = int(input("Travel duration (days): ").strip())
            break
        except Exception:
            print("Please enter an integer number of days.")
    interests_raw = input("Interests (comma separated) [food,culture]: ").strip()
    interests = [i.strip() for i in interests_raw.split(",") if i.strip()] or ["food", "culture"]
    depart_date = input("Depart date (YYYY-MM-DD) [2026-01-01]: ").strip() or "2026-01-01"
    return_date = input(f"Return date (YYYY-MM-DD) [auto]: ").strip() or f"2026-01-{1+duration_days}"

    return {
        "name": name,
        "user_id": user_id,
        "origin_country": origin,
        "country_pref": country_pref,
        "budget": budget,
        "duration_days": duration_days,
        "interests": interests,
        "depart_date": depart_date,
        "return_date": return_date
    }

def main():
    try:
        user_inputs = prompt_user_inputs()
        session_id = str(uuid.uuid4())
        orchestrator = TravelOrchestrator(session_id, user_inputs["user_id"])
        itinerary = orchestrator.run(user_inputs)
        # Output to console & save as JSON
        print("\n--- Final Itinerary ---")
        print(json.dumps(itinerary, indent=2, default=str))
        out_file = f"itinerary_{user_inputs['user_id']}_{int(time.time())}.json"
        with open(out_file, "w", encoding="utf-8") as f:
            json.dump(itinerary, f, indent=2, default=str)
        print(f"Itinerary saved to {out_file}")
    except KeyboardInterrupt:
        print("\nCancelled by user.")
    except Exception as e:
        METRICS_ERRORS.inc()
        log("Main", logging.ERROR, f"Fatal error: {e}")
        raise

if __name__ == "__main__":
    main()

Welcome to the Travel Agent AI (production scaffold). Please enter trip details.
Your name: p
User ID (leave blank to auto-generate): 
Origin country (e.g. India): india
Destination country preference (e.g. Japan): spain
Total budget (USD): 90000
Travel duration (days): 10
Interests (comma separated) [food,culture]: veg, dance
Depart date (YYYY-MM-DD) [2026-01-01]: 20-07-2025
Return date (YYYY-MM-DD) [auto]: 30-07-2025





--- Final Itinerary ---
{
  "user": "p",
  "user_id": "user_fb27329d",
  "origin": "india",
  "destination_country": "spain",
  "flight_choice": {
    "provider": "Provider-1",
    "price": 799.0,
    "stops": 0,
    "duration": 6,
    "comfort": 5,
    "score": 5682.04
  },
  "hotel_choice": {
    "name": "spain Grand 3",
    "nightly_price": 193.0,
    "stars": 5,
    "amenities": [
      "wifi",
      "breakfast"
    ],
    "score": 390.7
  },
  "estimated_budget_total": 2729.0,
  "daily_plan": [
    {
      "day": 1,
      "morning": "Historic Center",
      "afternoon": "National Museum",
      "evening": "Food Market / Local Eatery"
    },
    {
      "day": 2,
      "morning": "Food Market",
      "afternoon": "Scenic Overlook",
      "evening": "Food Market / Local Eatery"
    },
    {
      "day": 3,
      "morning": "Botanical Garden",
      "afternoon": "Riverside Promenade",
      "evening": "Food Market / Local Eatery"
    },
    {
      "day": 4,
      "morning": "Local 