<a href="https://colab.research.google.com/github/poojitha01-5/Crisis-Info-Navigator/blob/main/CrisisInfoNavigator_Gemini_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Cell 0: Install Google Generative AI client
!pip install -q google-generativeai

In [2]:
# Cell 1: Configure Google API key (Gemini)
import os
import google.generativeai as genai

# TODO: Replace with your real key or set it in Colab's environment
GOOGLE_API_KEY = "AIzaSyDR8muvGK_-5GHxJd-6Jc8jwIOJwyU58dQ"

if GOOGLE_API_KEY and GOOGLE_API_KEY != "PUT_YOUR_API_KEY_HERE":
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    genai.configure(api_key=GOOGLE_API_KEY)
else:
    print("WARNING: GOOGLE_API_KEY is not set. The LLM-based guidance tool will raise an error if called.")

In [3]:
# Cell 2: Create project folders
!mkdir -p project/agents project/tools project/memory project/core

In [4]:
# Cell 3: project/__init__.py
%%writefile project/__init__.py
"""
CrisisInfo Navigator project package.
"""


Writing project/__init__.py


In [5]:
# Cell 4: project/agents/__init__.py
%%writefile project/agents/__init__.py
from project.agents.planner import Planner
from project.agents.worker import Worker
from project.agents.evaluator import Evaluator

__all__ = ["Planner", "Worker", "Evaluator"]

Writing project/agents/__init__.py


In [6]:
# Cell 5: project/tools/__init__.py
%%writefile project/tools/__init__.py
"""
Tools package for CrisisInfo Navigator.
"""


Writing project/tools/__init__.py


In [7]:
# Cell 6: project/memory/__init__.py
%%writefile project/memory/__init__.py
"""
Memory package for CrisisInfo Navigator.
"""


Writing project/memory/__init__.py


In [8]:
# Cell 7: project/core/__init__.py
%%writefile project/core/__init__.py
"""
Core utilities for CrisisInfo Navigator.
"""


Writing project/core/__init__.py


In [9]:
# Cell 8: project/core/observability.py
%%writefile project/core/observability.py
import logging
from typing import Any, Dict

_logger = logging.getLogger("crisisinfo_navigator")

if not _logger.handlers:
    _handler = logging.StreamHandler()
    _formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    )
    _handler.setFormatter(_formatter)
    _logger.addHandler(_handler)
    _logger.setLevel(logging.INFO)


def get_logger() -> logging.Logger:
    return _logger


def log_event(event_type: str, data: Dict[str, Any]) -> None:
    """
    Log a structured event for simple observability.
    """
    _logger.info("%s | %s", event_type, data)

Writing project/core/observability.py


In [10]:
# Cell 9: project/core/a2a_protocol.py
%%writefile project/core/a2a_protocol.py
from dataclasses import dataclass, field
from typing import Any, Dict
import uuid


@dataclass
class AgentMessage:
    """
    Simple message envelope for agent-to-agent communication.
    """
    message_id: str
    trace_id: str
    sender: str
    receiver: str
    type: str
    payload: Dict[str, Any]
    meta: Dict[str, Any] = field(default_factory=dict)


def generate_trace_id() -> str:
    """
    Generate a new trace ID for a user interaction.
    """
    return str(uuid.uuid4())


def generate_message_id() -> str:
    """
    Generate a new message ID.
    """
    return str(uuid.uuid4())


def create_message(
    sender: str,
    receiver: str,
    message_type: str,
    payload: Dict[str, Any],
    trace_id: str,
    meta: Dict[str, Any] | None = None,
) -> AgentMessage:
    """
    Convenience factory for AgentMessage.
    """
    if meta is None:
        meta = {}
    return AgentMessage(
        message_id=generate_message_id(),
        trace_id=trace_id,
        sender=sender,
        receiver=receiver,
        type=message_type,
        payload=payload,
        meta=meta,
    )

Writing project/core/a2a_protocol.py


In [11]:
# Cell 10: project/memory/session_memory.py
%%writefile project/memory/session_memory.py
from typing import Any, Dict


class SessionMemory:
    """
    Extremely simple in-memory session + user-profile storage.

    In a real deployment this would be backed by Redis, a database, or another
    external store. For the demo, it is process-local.
    """

    _sessions: Dict[str, Dict[str, Any]] = {}

    def __init__(self, session_id: str = "default") -> None:
        self.session_id = session_id
        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "user_profile": {},
                "state": {},
            }

    # ---- User profile -----------------------------------------------------
    def get_user_profile(self) -> Dict[str, Any]:
        return self._sessions[self.session_id]["user_profile"]

    def update_user_profile(self, **kwargs: Any) -> None:
        self._sessions[self.session_id]["user_profile"].update(kwargs)

    # ---- Session state -----------------------------------------------------
    def get_state(self) -> Dict[str, Any]:
        return self._sessions[self.session_id]["state"]

    def update_state(self, **kwargs: Any) -> None:
        self._sessions[self.session_id]["state"].update(kwargs)

Writing project/memory/session_memory.py


In [12]:
# Cell 11: project/core/context_engineering.py
%%writefile project/core/context_engineering.py
from typing import Any, Dict
from project.memory.session_memory import SessionMemory


def build_planner_context(user_input: str, memory: SessionMemory) -> Dict[str, Any]:
    """
    Build a context structure for the Planner agent.
    """
    user_profile = memory.get_user_profile()
    state = memory.get_state()

    return {
        "user_input": user_input,
        "user_profile": user_profile,
        "state": state,
        "system_policy": {
            "non_medical": True,
            "safety_first": True,
            "disaster_phases": ["preparedness", "during", "recovery"],
        },
    }


def build_worker_context(plan_payload: Dict[str, Any], memory: SessionMemory) -> Dict[str, Any]:
    """
    Build a context structure for the Worker agent.
    """
    user_profile = memory.get_user_profile()
    state = memory.get_state()
    return {
        "plan": plan_payload,
        "user_profile": user_profile,
        "state": state,
    }


def build_evaluator_context(
    work_payload: Dict[str, Any], memory: SessionMemory
) -> Dict[str, Any]:
    """
    Build a context structure for the Evaluator agent.
    """
    user_profile = memory.get_user_profile()
    state = memory.get_state()
    return {
        "draft": work_payload,
        "user_profile": user_profile,
        "state": state,
        "checks": {
            "non_medical": True,
            "clarity": True,
            "source_alignment": True,
        },
    }

Writing project/core/context_engineering.py


In [13]:
# Cell 12: project/tools/tools.py
%%writefile project/tools/tools.py
from typing import Any, Dict, List
import os
import textwrap

import google.generativeai as genai


def _ensure_configured() -> None:
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise RuntimeError(
            "GOOGLE_API_KEY is not set. Please set it before calling disaster guidance."
        )
    genai.configure(api_key=api_key)


def hotline_lookup(region: str = "generic") -> Dict[str, str]:
    """
    Stubbed hotline lookup. This stays simple and generic.
    """
    if region.lower() in ("india", "in"):
        return {
            "general_emergency": "112",
            "disaster_management": "108",  # example
        }
    return {
        "general_emergency": "911/112 (depending on your country)",
        "disaster_management": "Check your local government's disaster helpline.",
    }


def calculator(expression: str) -> float:
    """
    Very small calculator tool using a restricted eval environment.
    """
    try:
        value = eval(expression, {"__builtins__": {}}, {})
        return float(value)
    except Exception:
        return float("nan")


def generic_search(query: str) -> List[Dict[str, str]]:
    """
    Stubbed generic search tool.
    """
    return [
        {
            "title": "Generic safety guidance",
            "snippet": f"High-level safety information for query: {query}",
            "link": "https://example.com/search",
        }
    ]


def generate_disaster_guidance_llm(
    disaster_type: str,
    phase: str,
    region: str,
    raw_input: str,
) -> str:
    """
    Use Google Gemini to generate non-medical safety guidance dynamically
    for any kind of disaster, based on the user input and detected phase.
    """
    _ensure_configured()

    system_instructions = textwrap.dedent(
        """
        You are a calm, factual, non-medical disaster safety assistant.

        Your job:
        - Give clear, short, actionable safety steps.
        - Prioritise immediate safety over everything else.
        - Never give medical diagnosis or detailed treatment instructions.
        - Do not invent hotline numbers or specific local services.
        - Encourage users to follow local authority and emergency service guidance.

        Output format:
        - A numbered list of 6–10 steps.
        - Each step should be one sentence.
        """
    ).strip()

    user_context = textwrap.dedent(
        f"""
        Disaster type (may be generic): {disaster_type}
        Phase: {phase}
        Region or country (if known): {region}
        User message: {raw_input}
        """
    ).strip()

    model = genai.GenerativeModel("gemini-2.5-flash")
    response = model.generate_content(
        [
            system_instructions,
            "",
            "Now generate safety guidance:\n",
            user_context,
        ]
    )
    text = response.text or ""
    return text.strip()

Writing project/tools/tools.py


In [14]:
# Cell 13: project/agents/planner.py
%%writefile project/agents/planner.py
from typing import Any, Dict
from project.core.a2a_protocol import AgentMessage, create_message
from project.core.context_engineering import build_planner_context
from project.memory.session_memory import SessionMemory


class Planner:
    """
    Planner agent:
    - Classifies disaster type using simple keyword heuristics.
    - Classifies phase (preparedness / during / recovery).
    - Creates a simple plan for downstream agents.
    """

    def __init__(self, name: str = "planner") -> None:
        self.name = name

    @staticmethod
    def _detect_disaster_type(text: str) -> str:
        text_lower = text.lower()
        if "earthquake" in text_lower:
            return "earthquake"
        if "flood" in text_lower or "flooding" in text_lower:
            return "flood"
        if "cyclone" in text_lower or "hurricane" in text_lower or "typhoon" in text_lower:
            return "cyclone"
        if "fire" in text_lower or "wildfire" in text_lower:
            return "fire"
        if "heatwave" in text_lower or "heat wave" in text_lower or "extreme heat" in text_lower:
            return "heatwave"
        if "landslide" in text_lower or "mudslide" in text_lower:
            return "landslide"
        # Fallback for any other or unknown hazard
        return "generic"

    @staticmethod
    def _detect_phase(text: str) -> str:
        text_lower = text.lower()
        if any(word in text_lower for word in ["now", "right now", "happening", "currently", "during"]):
            return "during"
        if any(word in text_lower for word in ["after", "aftermath", "recovery", "post"]):
            return "recovery"
        if any(word in text_lower for word in ["prepare", "before", "ready", "readiness", "might happen"]):
            return "preparedness"
        # Default to preparedness if not clear
        return "preparedness"

    def plan(
        self,
        user_input: str,
        session_memory: SessionMemory,
        trace_id: str,
    ) -> AgentMessage:
        """
        Create a high-level plan for the Worker.
        """
        context = build_planner_context(user_input, session_memory)
        disaster_type = self._detect_disaster_type(user_input)
        phase = self._detect_phase(user_input)

        user_profile = context.get("user_profile", {})
        region = user_profile.get("region", "generic")

        plan_payload: Dict[str, Any] = {
            "disaster_type": disaster_type,
            "phase": phase,
            "region": region,
            "objectives": [
                "Provide concise, step-by-step non-medical safety guidance.",
                "Mention emergency hotlines if available.",
            ],
            "tool_calls": [
                "generate_disaster_guidance_llm",
                "hotline_lookup",
            ],
            "raw_user_input": user_input,
        }

        state = session_memory.get_state()
        state["last_disaster_type"] = disaster_type
        state["last_phase"] = phase
        session_memory.update_state(**state)

        return create_message(
            sender="planner",
            receiver="worker",
            message_type="PLAN_RESPONSE",
            payload=plan_payload,
            trace_id=trace_id,
        )

Writing project/agents/planner.py


In [15]:
# Cell 14: project/agents/worker.py
%%writefile project/agents/worker.py
from typing import Any, Dict
from project.core.a2a_protocol import AgentMessage, create_message
from project.core.context_engineering import build_worker_context
from project.memory.session_memory import SessionMemory
from project.tools import tools as tool_module


class Worker:
    """
    Worker agent:
    - Executes the plan produced by the Planner.
    - Calls Google Gemini via tools.generate_disaster_guidance_llm
      to get dynamic, non-hardcoded safety guidance.
    - Also adds hotline information where possible.
    """

    def __init__(self, name: str = "worker") -> None:
        self.name = name

    @staticmethod
    def _format_hotlines(hotlines: Dict[str, str]) -> str:
        if not hotlines:
            return ""
        lines = ["", "Emergency contacts (verify for your exact location):"]
        for key, value in hotlines.items():
            label = key.replace("_", " ").title()
            lines.append(f"- {label}: {value}")
        return "\n".join(lines)

    def execute(
        self,
        plan_message: AgentMessage,
        session_memory: SessionMemory,
    ) -> AgentMessage:
        """
        Execute the Planner's plan and generate a draft response.
        """
        context = build_worker_context(plan_message.payload, session_memory)
        plan = context["plan"]

        disaster_type = plan.get("disaster_type", "generic")
        phase = plan.get("phase", "preparedness")
        region = plan.get("region", "generic")
        raw_user_input = plan.get("raw_user_input", "")

        # Call Gemini to dynamically generate guidance text
        guidance_text = tool_module.generate_disaster_guidance_llm(
            disaster_type=disaster_type,
            phase=phase,
            region=region,
            raw_input=raw_user_input,
        )

        hotlines = tool_module.hotline_lookup(region)
        hotline_text = self._format_hotlines(hotlines)

        header = f"Here is some basic non-medical guidance for a {disaster_type} ({phase} phase):"
        draft_response = f"{header}\n\n{guidance_text}{hotline_text}"

        draft_payload: Dict[str, Any] = {
            "draft_response": draft_response,
            "disaster_type": disaster_type,
            "phase": phase,
            "region": region,
            "raw_user_input": raw_user_input,
        }

        return create_message(
            sender="worker",
            receiver="evaluator",
            message_type="WORK_RESPONSE",
            payload=draft_payload,
            trace_id=plan_message.trace_id,
        )

Writing project/agents/worker.py


In [16]:
# Cell 15: project/agents/evaluator.py
%%writefile project/agents/evaluator.py
from typing import Any, Dict
from project.core.a2a_protocol import AgentMessage, create_message
from project.core.context_engineering import build_evaluator_context
from project.memory.session_memory import SessionMemory


class Evaluator:
    """
    Evaluator agent:
    - Performs simple safety and clarity checks on the draft response.
    - Applies a light sanitization and always approves in this demo.
    """

    def __init__(self, name: str = "evaluator") -> None:
        self.name = name

    @staticmethod
    def _sanitize_text(text: str) -> str:
        """
        Very simple filter that avoids obviously medical language.
        This is a stub and not a replacement for real safety filters.
        """
        forbidden_phrases = [
            "diagnose",
            "prescription",
            "medication",
            "dose",
            "treat this condition",
        ]
        sanitized = text
        for phrase in forbidden_phrases:
            sanitized = sanitized.replace(phrase, "[redacted]")
        return sanitized

    def evaluate(
        self,
        work_message: AgentMessage,
        session_memory: SessionMemory,
    ) -> AgentMessage:
        """
        Evaluate the draft response and either approve or request changes.
        """
        context = build_evaluator_context(work_message.payload, session_memory)
        draft_payload = context["draft"]
        draft_response: str = draft_payload.get("draft_response", "")

        safe_text = self._sanitize_text(draft_response).strip()
        if not safe_text:
            safe_text = (
                "I was not able to generate guidance. "
                "Please contact your local emergency services or authorities."
            )

        eval_payload: Dict[str, Any] = {
            "status": "approved",
            "final_response": safe_text,
            "disaster_type": draft_payload.get("disaster_type"),
            "phase": draft_payload.get("phase"),
            "region": draft_payload.get("region"),
        }

        return create_message(
            sender="evaluator",
            receiver="main_agent",
            message_type="EVAL_RESPONSE",
            payload=eval_payload,
            trace_id=work_message.trace_id,
        )

Writing project/agents/evaluator.py


In [17]:
# Cell 16: project/main_agent.py
%%writefile project/main_agent.py
from typing import Any, Dict
from project.agents.planner import Planner
from project.agents.worker import Worker
from project.agents.evaluator import Evaluator
from project.core.a2a_protocol import generate_trace_id
from project.core.observability import log_event
from project.memory.session_memory import SessionMemory


class MainAgent:
    """
    Orchestrates the Planner -> Worker -> Evaluator multi-agent flow.
    """

    def __init__(self, session_id: str = "default") -> None:
        self.session_memory = SessionMemory(session_id=session_id)
        self.planner = Planner()
        self.worker = Worker()
        self.evaluator = Evaluator()

    def handle_message(self, user_input: str) -> Dict[str, Any]:
        """
        Handle a single user message through the multi-agent pipeline.
        """
        trace_id = generate_trace_id()
        log_event("user_message", {"trace_id": trace_id, "user_input": user_input})

        plan_msg = self.planner.plan(user_input, self.session_memory, trace_id)
        log_event("plan_created", {"trace_id": trace_id, "plan": plan_msg.payload})

        work_msg = self.worker.execute(plan_msg, self.session_memory)
        log_event("work_completed", {"trace_id": trace_id, "work": work_msg.payload})

        eval_msg = self.evaluator.evaluate(work_msg, self.session_memory)
        log_event("evaluation_completed", {"trace_id": trace_id, "evaluation": eval_msg.payload})

        final_text = eval_msg.payload.get("final_response", "No response generated.")

        return {
            "response": final_text,
            "trace_id": trace_id,
            "disaster_type": eval_msg.payload.get("disaster_type"),
            "phase": eval_msg.payload.get("phase"),
            "region": eval_msg.payload.get("region"),
        }


def run_agent(user_input: str):
    agent = MainAgent()
    result = agent.handle_message(user_input)
    return result["response"]

Writing project/main_agent.py


In [18]:
# Cell 17: project/app.py
%%writefile project/app.py
from project.main_agent import run_agent


def chat() -> None:
    """
    Very simple CLI loop for manual testing.
    """
    print("CrisisInfo Navigator demo. Type 'exit' to quit.")
    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nExiting.")
            break

        if user_input.lower() in {"exit", "quit"}:
            print("Goodbye.")
            break

        response = run_agent(user_input)
        print(f"Agent: {response}")


if __name__ == "__main__":
    chat()

Writing project/app.py


In [19]:
# Cell 18: project/run_demo.py
%%writefile project/run_demo.py
import sys
import os

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from project.main_agent import run_agent


if __name__ == "__main__":
    print(run_agent("Hello! This is a demo."))

Writing project/run_demo.py


In [20]:
# Cell 19: project/requirements.txt
%%writefile project/requirements.txt
google-generativeai>=0.8.0

Writing project/requirements.txt


In [23]:
# Cell 20: Test the main agent (requires GOOGLE_API_KEY to be set)
from project.main_agent import run_agent

print(run_agent("It’s extremely hot and humid today, and people are fainting outside. What should I do to stay safe during this heatwave?"))

2025-12-03 12:44:22,823 - INFO - crisisinfo_navigator - user_message | {'trace_id': 'de3bab85-8efe-444f-a6a7-89c595398a3a', 'user_input': 'It’s extremely hot and humid today, and people are fainting outside. What should I do to stay safe during this heatwave?'}
INFO:crisisinfo_navigator:user_message | {'trace_id': 'de3bab85-8efe-444f-a6a7-89c595398a3a', 'user_input': 'It’s extremely hot and humid today, and people are fainting outside. What should I do to stay safe during this heatwave?'}
2025-12-03 12:44:22,827 - INFO - crisisinfo_navigator - plan_created | {'trace_id': 'de3bab85-8efe-444f-a6a7-89c595398a3a', 'plan': {'disaster_type': 'heatwave', 'phase': 'during', 'region': 'generic', 'objectives': ['Provide concise, step-by-step non-medical safety guidance.', 'Mention emergency hotlines if available.'], 'tool_calls': ['generate_disaster_guidance_llm', 'hotline_lookup'], 'raw_user_input': 'It’s extremely hot and humid today, and people are fainting outside. What should I do to stay s

Here is some basic non-medical guidance for a heatwave (during phase):

Here is safety guidance for a heatwave:

1.  Drink plenty of water consistently throughout the day, without waiting to feel thirsty.
2.  Stay in air-conditioned or the coolest available indoor spaces as much as possible.
3.  Wear loose-fitting, lightweight, and light-colored clothing to help keep cool.
4.  Limit strenuous outdoor activities, especially during the peak heat hours.
5.  Check regularly on vulnerable individuals, such as the elderly, young children, and those who are unwell.
6.  Cool down by taking cool showers or baths, or by applying cool, wet cloths.
7.  Be aware of signs of heat-related illness in yourself and those around you.
8.  If you or someone else experiences severe symptoms of heat-related illness, seek professional help immediately.
9.  Always follow specific safety instructions and alerts issued by your local authorities.
Emergency contacts (verify for your exact location):
- General Emer

In [22]:
# Cell 21: Zip the project directory
!zip -r project.zip project

  adding: project/ (stored 0%)
  adding: project/main_agent.py (deflated 67%)
  adding: project/agents/ (stored 0%)
  adding: project/agents/worker.py (deflated 62%)
  adding: project/agents/evaluator.py (deflated 61%)
  adding: project/agents/__init__.py (deflated 45%)
  adding: project/agents/__pycache__/ (stored 0%)
  adding: project/agents/__pycache__/worker.cpython-312.pyc (deflated 39%)
  adding: project/agents/__pycache__/__init__.cpython-312.pyc (deflated 28%)
  adding: project/agents/__pycache__/planner.cpython-312.pyc (deflated 45%)
  adding: project/agents/__pycache__/evaluator.cpython-312.pyc (deflated 40%)
  adding: project/agents/planner.py (deflated 67%)
  adding: project/tools/ (stored 0%)
  adding: project/tools/__init__.py (stored 0%)
  adding: project/tools/__pycache__/ (stored 0%)
  adding: project/tools/__pycache__/tools.cpython-312.pyc (deflated 42%)
  adding: project/tools/__pycache__/__init__.cpython-312.pyc (deflated 14%)
  adding: project/tools/tools.py (defla