<a href="https://colab.research.google.com/github/reachosen/MortalityGraph/blob/main/MortalityWorkerSupervisorGraph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Explanation of the Code
1. Setup
Dependencies: Installs langgraph and langsmith via pip. Run this in a Colab cell first.
LangSmith: Optional setup for observability. Uncomment and add your API key from LangSmith if you want tracing (get it from their site). If skipped, the code still runs but without detailed logs.
2. Simulated APIs
fetch_mortality_data: Hardcoded to return 5.2% for Facility A, 3.8% for B.
fetch_context_data: Hardcoded staffing (A: 1:6, B: 1:4) and acuity (A: 4.5, B: 4.0).
Added time.sleep(0.2) to mimic API latency.
3. Worker Agents
Mortality Fetcher: Calls the fake API for mortality rates.
Context Fetcher: Gets staffing and acuity data.
Analyzer: Compares data and generates a text explanation.
Visualizer: Simulates a chart as text (in practice, you’d use matplotlib).
4. Supervisor Agent
supervisor: A simple routing function checking the state to decide the next node. It mimics the supervisor’s role in orchestrating the workflow.
5. LangGraph Workflow
Nodes: Each worker agent is a node.
Edges: Conditional edges use the supervisor to route between nodes in sequence: fetch mortality → fetch context → analyze → visualize → end.
State: A dictionary tracks data across nodes (e.g., mortality_A, analysis).
6. Execution
app.invoke(initial_state) runs the graph, simulating the full process.
Output shows the analysis and a text-based “chart.”
Observability in Action
If LangSmith is enabled:
Each node’s execution (e.g., API calls, analysis) is logged with inputs, outputs, and timing.
Check https://smith.langchain.com/ after running to see traces (e.g., “mortality_fetcher took 200ms”).
Without LangSmith, you’ll still see print statements (Fetching mortality data...) for basic visibility.
Sample Output

In [1]:
!pip install langgraph langsmith

Collecting langgraph
  Downloading langgraph-0.2.74-py3-none-any.whl.metadata (17 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.0.10 (from langgraph)
  Downloading langgraph_checkpoint-2.0.16-py3-none-any.whl.metadata (4.6 kB)
Collecting langgraph-sdk<0.2.0,>=0.1.42 (from langgraph)
  Downloading langgraph_sdk-0.1.53-py3-none-any.whl.metadata (1.8 kB)
Downloading langgraph-0.2.74-py3-none-any.whl (151 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m151.4/151.4 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading langgraph_checkpoint-2.0.16-py3-none-any.whl (38 kB)
Downloading langgraph_sdk-0.1.53-py3-none-any.whl (45 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.4/45.4 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: langgraph-sdk, langgraph-checkpoint, langgraph
Successfully installed langgraph-0.2.74 langgraph-checkpoint-2.0.16 langgraph-sdk-0.1.53


In [2]:
# Import libraries
from langgraph.graph import StateGraph, END
from typing import Dict, Any, List
import time
from langsmith import Client
import os

In [None]:
# Optional: Set up LangSmith for observability (skip if no API key)
# Get your API key from https://smith.langchain.com/settings and uncomment below
# os.environ["LANGSMITH_API_KEY"] = "your-langsmith-api-key"
# os.environ["LANGCHAIN_TRACING_V2"] = "true"
# os.environ["LANGCHAIN_PROJECT"] = "Mortality_LangGraph_Case"
# client = Client()

In [3]:
# Simulated API functions with hardcoded returns
def fetch_mortality_data(facility: str) -> Dict[str, float]:
    """Simulate Vertica API call for mortality rates."""
    time.sleep(0.2)  # Simulate network delay
    if facility == "A":
        return {"facility": "A", "mortality_rate": 5.2}
    elif facility == "B":
        return {"facility": "B", "mortality_rate": 3.8}
    return {"error": "Unknown facility"}

def fetch_context_data(facility: str) -> Dict[str, Any]:
    """Simulate DB API call for staffing and acuity."""
    time.sleep(0.2)  # Simulate network delay
    if facility == "A":
        return {"facility": "A", "nurse_to_patient_ratio": "1:6", "acuity_score": 4.5}
    elif facility == "B":
        return {"facility": "B", "nurse_to_patient_ratio": "1:4", "acuity_score": 4.0}
    return {"error": "Unknown facility"}

In [4]:
# Worker Agent Functions
def mortality_fetcher(state: Dict[str, Any]) -> Dict[str, Any]:
    """Worker Agent 1: Fetch mortality data for both facilities."""
    print("Fetching mortality data...")
    state["mortality_A"] = fetch_mortality_data("A")
    state["mortality_B"] = fetch_mortality_data("B")
    return state

def context_fetcher(state: Dict[str, Any]) -> Dict[str, Any]:
    """Worker Agent 2: Fetch contextual data."""
    print("Fetching context data...")
    state["context_A"] = fetch_context_data("A")
    state["context_B"] = fetch_context_data("B")
    return state

def analyzer(state: Dict[str, Any]) -> Dict[str, Any]:
    """Worker Agent 3: Analyze data and find differences."""
    print("Analyzing data...")
    mort_A = state["mortality_A"]["mortality_rate"]
    mort_B = state["mortality_B"]["mortality_rate"]
    ratio_A = state["context_A"]["nurse_to_patient_ratio"]
    ratio_B = state["context_B"]["nurse_to_patient_ratio"]
    acuity_A = state["context_A"]["acuity_score"]
    acuity_B = state["context_B"]["acuity_score"]

    diff = mort_A - mort_B
    analysis = (
        f"Mortality rate is {diff:.1f}% higher at Facility A. "
        f"Facility A has {ratio_A} nurse-to-patient ratio vs {ratio_B} at B, "
        f"and acuity of {acuity_A} vs {acuity_B}."
    )
    state["analysis"] = analysis
    return state

def visualizer(state: Dict[str, Any]) -> Dict[str, Any]:
    """Worker Agent 4: Simulate visualization generation."""
    print("Generating visualization...")
    # Simulate chart creation (in reality, use matplotlib or similar)
    chart = (
        "Bar chart: Mortality Rates\n"
        f"Facility A: {state['mortality_A']['mortality_rate']}%\n"
        f"Facility B: {state['mortality_B']['mortality_rate']}%\n"
        "Overlays: Staffing, Acuity"
    )
    state["visualization"] = chart
    return state

In [5]:
# Supervisor Agent Logic (Simplified Routing)
def supervisor(state: Dict[str, Any]) -> str:
    """Decide next step based on state."""
    if "mortality_A" not in state:
        return "mortality_fetcher"
    if "context_A" not in state:
        return "context_fetcher"
    if "analysis" not in state:
        return "analyzer"
    if "visualization" not in state:
        return "visualizer"
    return END

In [6]:
# Define the LangGraph Workflow
workflow = StateGraph(Dict[str, Any])

# Add nodes (worker agents)
workflow.add_node("mortality_fetcher", mortality_fetcher)
workflow.add_node("context_fetcher", context_fetcher)
workflow.add_node("analyzer", analyzer)
workflow.add_node("visualizer", visualizer)

# Add edges (supervised routing)
workflow.set_entry_point("mortality_fetcher")
workflow.add_conditional_edges(
    "mortality_fetcher", supervisor, {"context_fetcher": "context_fetcher"}
)
workflow.add_conditional_edges(
    "context_fetcher", supervisor, {"analyzer": "analyzer"}
)
workflow.add_conditional_edges(
    "analyzer", supervisor, {"visualizer": "visualizer"}
)
workflow.add_conditional_edges(
    "visualizer", supervisor, {END: END}
)

<langgraph.graph.state.StateGraph at 0x7a26968e0950>

In [7]:
# Compile the graph
app = workflow.compile()

# Run the workflow
initial_state = {}
result = app.invoke(initial_state)

# Display results
print("\nFinal Result:")
print("Analysis:", result["analysis"])
print("Visualization:\n", result["visualization"])

Fetching mortality data...
Fetching context data...
Analyzing data...
Generating visualization...

Final Result:
Analysis: Mortality rate is 1.4% higher at Facility A. Facility A has 1:6 nurse-to-patient ratio vs 1:4 at B, and acuity of 4.5 vs 4.0.
Visualization:
 Bar chart: Mortality Rates
Facility A: 5.2%
Facility B: 3.8%
Overlays: Staffing, Acuity
