# LangGraph Essentials - Lab
## Building A Disaster Response/Notification Agent System

In this lab activity, you will implement a multi-agent disaster response notification system that checks national and local news for disaster related information. The system will also have a

### Environment Setup

In [1]:
%%capture --no-stderr
%pip install langchain langchain-openai langchain-google-genai langgraph gradio

In [2]:
import os
from getpass import getpass

keys = [
    "OPENAI_API_KEY",
    "GOOGLE_API_KEY",
    "GNEWS_API_KEY",
    "WEATHER_API_KEY",
    "IP_ADDRESS"
]

for key in keys:
  value = getpass(f"{key} (press enter to skip if not inputting key):")
  if value == "" or value is None:
    continue
  os.environ[key] = value


OPENAI_API_KEY (press enter to skip if not inputting key):··········
GOOGLE_API_KEY (press enter to skip if not inputting key):··········
GNEWS_API_KEY (press enter to skip if not inputting key):··········
WEATHER_API_KEY (press enter to skip if not inputting key):··········
IP_ADDRESS (press enter to skip if not inputting key):··········


### Agent Tools

Here we are defining the tools that our agents will have access to:

- **Disaster RSS Tool**: This tool aggregates disaster-related news feeds and creates a summary of the contents.
- **Geolocation Tool**: This tool captures the geolocation of the user based on their IP address.
- **Local News Tool**: This tool gets local news for the user based on location.
- **Local Weather Tool**: This tool gets the local weather forecast for a user based on their location.

In [12]:
from typing import List, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
import requests
import ipaddress
from datetime import datetime
from langchain_core.tools import tool
from getpass import getpass  # For secure key input
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt.chat_agent_executor import create_react_agent
from langgraph.types import Command
from langgraph.errors import GraphRecursionError
from langchain_core.runnables.config import RunnableConfig

# --- Initialize a general purpose summarizer ---
summarizer_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant that summarizes text, focusing on disaster-related events with dates, locations, and severity."),
    ("human", "{text}")
])
summarizer_llm = init_chat_model(model_provider="google_genai", model="gemini-2.5-flash")
summarizer = summarizer_prompt | summarizer_llm

# --- Helper: Safe GET ---
def fetch_and_parse_url(url: str, params: Optional[Dict[str, Any]] = None, timeout: int = 10) -> Any:
    """A helper function to run GET requests, handling errors gracefully."""
    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        try:
            return response.json()
        except ValueError:
            return response.text
    except requests.RequestException as e:
        return f"error fetching {url}: {str(e)}"

# --- Tools ---

@tool
def get_public_ip() -> str:
    """Fetches the current public IP address."""
    try:
        response = requests.get("https://api.ipify.org?format=json", timeout=10)
        return response.json().get("ip", "")
    except Exception:
        raise ValueError("Unable to fetch public IP")

@tool
def geolocation(ip: Optional[str] = None) -> Dict[str, Any]:
    """Gathers location data based on a given IPv4 address. Suggests manual input if fetch fails."""
    if not ip:
        ip = os.getenv("IP_ADDRESS")
        if not ip:
            try:
                ip = get_public_ip()
            except ValueError:
                return {"error": "Unable to fetch IP", "suggestion": "Please provide a city name"}
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return {"error": f"Invalid IP format: {ip}", "suggestion": "Please provide a city name"}
    url = f"http://ip-api.com/json/{ip}"
    response = fetch_and_parse_url(url)
    if isinstance(response, dict) and response.get("status") == "fail":
        return {"error": response.get("message"), "suggestion": "Please provide a city name"}
    return response if isinstance(response, dict) else {"error": "Invalid response", "suggestion": "Please provide a city name"}

@tool
def disaster_rss_summary() -> str:
    """Aggregates disaster-related news feeds and summarizes contents for the last 7 days."""
    current_date = datetime.now().strftime("%Y-%m-%d")  # e.g., 2025-09-10
    feeds = [
        f"https://news.google.com/rss/search?q=natural+disasters+when:{current_date}-7d",
        "https://www.fema.gov/feeds/disasters-major.rss",
        "https://www.fema.gov/feeds/disasters-fire.rss"
    ]
    feed_responses = [fetch_and_parse_url(feed) for feed in feeds]
    text_input = "\n".join(str(response) for response in feed_responses)
    summary = summarizer.invoke({"text": text_input})
    return summary.content if hasattr(summary, "content") else str(summary)

@tool
def local_news(query: str, max_results: int = 10) -> Dict[str, Any]:
    """Retrieves the latest news based on the given query."""
    api_key = os.getenv("GNEWS_API_KEY")
    if not api_key:
        return {"error": "Missing GNEWS_API_KEY", "suggestion": "Please set GNEWS_API_KEY"}
    params = {"q": query, "max": max_results, "apikey": api_key}
    url = "https://gnews.io/api/v4/search"
    response = fetch_and_parse_url(url, params=params)
    return response if isinstance(response, dict) else {"error": "Invalid news response"}

@tool
def local_weather(lat: float, lon: float) -> Dict[str, Any]:
    """Retrieves the local weather based on the given location."""
    api_key = os.getenv("WEATHER_API_KEY")
    if not api_key:
        return {"error": "Missing WEATHER_API_KEY", "suggestion": "Please set WEATHER_API_KEY"}
    params = {"lat": lat, "lon": lon, "appid": api_key, "units": "metric"}
    url = "https://api.openweathermap.org/data/2.5/weather"
    response = fetch_and_parse_url(url, params=params)
    return response if isinstance(response, dict) else {"error": "Invalid weather response"}

### Define the Agent State

In [13]:
from typing import TypedDict, List, Dict, Any

class DisasterState(TypedDict, total=False):
    input: str                        # the current user prompt
    messages: List[Dict[str, Any]]    # conversation turns accumulated by the agent

    # optional convenience fields that tools/agent may fill
    ip: str                           # detected public IP
    location: str                     # "City, Region" derived from geolocation
    geo: Dict[str, Any]               # full geolocation payload (ip-api result)
    news: Dict[str, Any]              # raw GNews response (if fetched)
    weather: Dict[str, Any]           # raw weather response (if fetched)


### Define the Agent Roles and Chains

In [14]:
# Define the system prompt and appropriate chains for each agent (e.g. StructuredOutput, etc.)

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# tools
TOOLS = [
    get_public_ip,      # new: auto-detect IP if env var not set
    geolocation,        # accepts ip or auto-fetches via get_public_ip
    local_news,         # GNews (needs GNEWS_API_KEY)
    local_weather,      # OpenWeather (needs WEATHER_API_KEY)
    disaster_rss_summary,  # optional: global disaster feeds summary
]

# how the agent should behave
AGENT_SYS = """
You are a disaster advisory assistant. Use ReAct: think, use tools, then answer.
1) Determine user's location with get_public_ip then geolocation. If that fails, ask the user to provide a city.
2) Always check disaster_rss_summary for national/global disasters. Then pull relevant local_news and local_weather if keys are available.
3) Produce a concise 24–72h advisory with 3–6 specific actions, incorporating RSS data even if local tools fail.
Be factual, avoid hype, cite sources/links when available, and explain briefly why actions are recommended.
"""

# state modifier prompt (pre/appends system instructions to the agent loop)
agent_prompt = ChatPromptTemplate.from_messages([
    ("system", AGENT_SYS),
    ("human", "{input}")
])

# single ReAct agent that will pick tools as needed
react_agent = create_react_agent(
    model=llm,
    tools=TOOLS,
    prompt=AGENT_SYS,
)

### Define the Agent Nodes

In [15]:
# Agent Nodes
from typing import Dict, Any, Optional
from langchain_core.runnables.config import RunnableConfig

DisasterState = Dict[str, Any]

DEFAULT_PROMPT = (
    "Please determine my location using the tools, then give a concise 24–72h "
    "disaster advisory with specific actions."
)

def agent_node(state: DisasterState, config: Optional[RunnableConfig] = None) -> DisasterState:
    """
    Delegates to the prebuilt ReAct agent.
    Expects state to contain an 'input' string; provides a safe default if missing.
    """
    if not state.get("input"):
        state["input"] = DEFAULT_PROMPT
    # react_agent comes from the previous cell (create_react_agent)
    return react_agent.invoke(state, config)


### Create and Compile the Graph

In [16]:
builder = StateGraph(DisasterState)

builder.add_node("agent", agent_node)

# entry point → agent → END
builder.set_entry_point("agent")
builder.add_edge("agent", END)

# compile with checkpointer
graph = builder.compile(checkpointer=InMemorySaver())

### Set up the UI with Gradio

In [17]:
import gradio as gr
from uuid import uuid4

SESSION_NS = "react-ui"
thread_state = gr.State(value="")  # Persisted across turns

def disaster_response_agent(message: str, history: List[Dict[str, Any]], thread_id: str):
    # Allocate a stable thread ID for the checkpointer if none provided
    if not thread_id:
        thread_id = f"ui-{uuid4()}"

    user_input = message.strip() or (
        "please determine my location using the tools, then give a concise 24–72h "
        "disaster advisory with 3–6 specific actions."
    )

    # Use messages with the ReAct executor
    cfg = {"configurable": {"thread_id": thread_id, "checkpoint_ns": SESSION_NS}}
    final_state = graph.invoke({"messages": [{"role": "user", "content": user_input}]}, cfg)

    # Pull last assistant reply from the conversation
    msgs = final_state.get("messages", [])
    out = "no response"
    if msgs:
        for m in reversed(msgs):
            if hasattr(m, 'role'):
                role = m.role.lower()
            else:
                role = getattr(m, 'type', '').lower()
            if role in ("assistant", "ai") and hasattr(m, 'content') and m.content:
                out = m.content
                break
        else:
            out = msgs[-1].content if hasattr(msgs[-1], 'content') else str(msgs[-1])

    # Return the response and the current thread_id (Gradio will update state)
    return out, thread_id  # Tuple return: (response, updated_thread_id)

# Update ChatInterface with explicit inputs and outputs
gr.ChatInterface(
    fn=disaster_response_agent,
    title="Disaster Agent",
    type="messages",
    additional_inputs=[thread_state],  # Pass thread_id as input
    additional_outputs=[thread_state],  # Update thread_state with the returned thread_id
).launch(debug=True)

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://5f3a03729f64e78749.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://5f3a03729f64e78749.gradio.live


