In [7]:
!pip install langgraph langchain-google-genai langchain-community langchain-core python-dotenv

import os
from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
import operator
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
import functools

import getpass

# Reuse a key that is already present in the environment (e.g. when the cell
# is re-run or the key was exported beforehand); only prompt when it is missing.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") or getpass.getpass("Enter your Google API Key: ")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m881.0/881.0 kB[0m [31m29.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.2/33.2 MB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m41.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m305.5/305.5 kB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m37.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.2/45.2 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m236.0/236.0 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
class AgentState(TypedDict):
    """State shared between all agents in the graph"""
    # Conversation history. The operator.add metadata is the merge function
    # LangGraph applies to this key: whatever a node returns here is
    # concatenated onto the existing list rather than replacing it.
    messages: Annotated[list, operator.add]
    # Routing key read by the supervisor's conditional edge; the edge map
    # accepts "researcher", "analyst", "writer" or "FINISH".
    next: str
    # Name of the node that produced the latest update ("start" in the
    # initial state built by run_research_team).
    current_agent: str
    # Topic under investigation; interpolated into every agent prompt.
    research_topic: str
    # Accumulated results; the researcher stores under "research" and the
    # analyst under "analysis".
    findings: dict
    # Text produced by the writer agent; empty string until the writer runs.
    final_report: str

class AgentResponse(TypedDict):
    """Standard response format for all agents"""
    # NOTE(review): not referenced by any code visible in this file; the
    # agents return AgentState-shaped dicts instead. Confirm whether it is
    # still needed.
    content: str
    next_agent: str
    findings: dict

def create_llm(temperature: float = 0.1, model: str = "gemini-1.5-flash") -> ChatGoogleGenerativeAI:
    """Build a Gemini chat model using the API key stored in the environment.

    Args:
        temperature: Sampling temperature forwarded to the model.
        model: Gemini model identifier.

    Returns:
        A configured ``ChatGoogleGenerativeAI`` instance.

    Raises:
        KeyError: If ``GOOGLE_API_KEY`` is not present in ``os.environ``.
    """
    llm_settings = {
        "model": model,
        "temperature": temperature,
        "google_api_key": os.environ["GOOGLE_API_KEY"],
    }
    return ChatGoogleGenerativeAI(**llm_settings)

In [None]:
def create_research_agent(llm: ChatGoogleGenerativeAI) -> callable:
    """Creates a research specialist agent for initial data gathering.

    Args:
        llm: Chat model that the research prompt is piped into.

    Returns:
        A graph node function. It takes the current ``AgentState`` and returns
        a state update whose ``messages`` entry contains only the message
        produced by this step.
    """

    research_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Research Specialist AI. Your role is to:
        1. Analyze the research topic thoroughly
        2. Identify key areas that need investigation
        3. Provide initial research findings and insights
        4. Suggest specific angles for deeper analysis

        Focus on providing comprehensive, accurate information and clear research directions.
        Always structure your response with clear sections and bullet points.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Research Topic: {research_topic}")
    ])

    research_chain = research_prompt | llm

    def research_agent(state: AgentState) -> AgentState:
        """Run the research chain and record its findings under "research".

        Any exception is converted into an error message so the workflow can
        continue with the next agent instead of aborting.
        """
        try:
            response = research_chain.invoke({
                "messages": state["messages"],
                "research_topic": state["research_topic"]
            })

            findings = {
                "research_overview": response.content,
                # NOTE: placeholder values; they are not derived from the response.
                "key_areas": ["area1", "area2", "area3"],
                "initial_insights": response.content[:500] + "..."
            }

            return {
                # ``AgentState.messages`` is merged with ``operator.add``, so only
                # the new message is returned. Returning the full history here
                # would append it to itself and duplicate every earlier message.
                "messages": [AIMessage(content=response.content)],
                "next": "analyst",
                "current_agent": "researcher",
                "research_topic": state["research_topic"],
                "findings": {**state.get("findings", {}), "research": findings},
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"Research agent error: {str(e)}"
            return {
                # Same reducer rule as above: emit only the new message.
                "messages": [AIMessage(content=error_msg)],
                "next": "analyst",
                "current_agent": "researcher",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return research_agent

In [None]:
def _extract_recommendations(text: str) -> str:
    """Return the text after the last "recommendations:" marker, ignoring case."""
    import re

    parts = re.split(r"recommendations:", text, flags=re.IGNORECASE)
    if len(parts) == 1:
        # Marker absent: keep the fallback string used by the original code.
        return "No specific recommendations found"
    return parts[-1]


def create_analyst_agent(llm: ChatGoogleGenerativeAI) -> callable:
    """Creates a data analyst agent for deep analysis.

    Args:
        llm: Chat model that the analyst prompt is piped into.

    Returns:
        A graph node function. It takes the current ``AgentState`` and returns
        a state update whose ``messages`` entry contains only the message
        produced by this step.
    """

    analyst_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Data Analyst AI. Your role is to:
        1. Analyze data and information provided by the research team
        2. Identify patterns, trends, and correlations
        3. Provide statistical insights and data-driven conclusions
        4. Suggest actionable recommendations based on analysis

        Focus on quantitative analysis, data interpretation, and evidence-based insights.
        Use clear metrics and concrete examples in your analysis.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Analyze the research findings for: {research_topic}")
    ])

    analyst_chain = analyst_prompt | llm

    def analyst_agent(state: AgentState) -> AgentState:
        """Run the analyst chain and record its findings under "analysis".

        Any exception is converted into an error message so the workflow can
        continue with the next agent instead of aborting.
        """
        try:
            response = analyst_chain.invoke({
                "messages": state["messages"],
                "research_topic": state["research_topic"]
            })

            analysis_findings = {
                "analysis_summary": response.content,
                # NOTE: placeholder values; they are not derived from the response.
                "key_metrics": ["metric1", "metric2", "metric3"],
                # The original tested the lower-cased text but split the raw
                # text, so "Recommendations:" returned the whole response.
                "recommendations": _extract_recommendations(response.content)
            }

            return {
                # ``AgentState.messages`` is merged with ``operator.add``, so only
                # the new message is returned. Returning the full history here
                # would append it to itself and duplicate every earlier message.
                "messages": [AIMessage(content=response.content)],
                "next": "writer",
                "current_agent": "analyst",
                "research_topic": state["research_topic"],
                "findings": {**state.get("findings", {}), "analysis": analysis_findings},
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"Analyst agent error: {str(e)}"
            return {
                # Same reducer rule as above: emit only the new message.
                "messages": [AIMessage(content=error_msg)],
                "next": "writer",
                "current_agent": "analyst",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return analyst_agent

In [None]:
def create_writer_agent(llm: ChatGoogleGenerativeAI) -> callable:
    """Creates a report writer agent for final documentation.

    Args:
        llm: Chat model that the writer prompt is piped into.

    Returns:
        A graph node function. It takes the current ``AgentState`` and returns
        a state update whose ``messages`` entry contains only the message
        produced by this step and whose ``final_report`` holds the report text.
    """

    writer_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a Report Writer AI. Your role is to:
        1. Synthesize all research and analysis into a comprehensive report
        2. Create clear, professional documentation
        3. Ensure proper structure with executive summary, findings, and conclusions
        4. Make complex information accessible to various audiences

        Focus on clarity, completeness, and professional presentation.
        Include specific examples and actionable insights.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Create a comprehensive report for: {research_topic}")
    ])

    writer_chain = writer_prompt | llm

    def writer_agent(state: AgentState) -> AgentState:
        """Run the writer chain and store its output as the final report.

        On failure the error text is stored in ``final_report`` so the caller
        can see why no report was produced.
        """
        try:
            response = writer_chain.invoke({
                "messages": state["messages"],
                "research_topic": state["research_topic"]
            })

            return {
                # ``AgentState.messages`` is merged with ``operator.add``, so only
                # the new message is returned. Returning the full history here
                # would append it to itself and duplicate every earlier message.
                "messages": [AIMessage(content=response.content)],
                "next": "supervisor",
                "current_agent": "writer",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": response.content
            }

        except Exception as e:
            error_msg = f"Writer agent error: {str(e)}"
            return {
                # Same reducer rule as above: emit only the new message.
                "messages": [AIMessage(content=error_msg)],
                "next": "supervisor",
                "current_agent": "writer",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": f"Error generating report: {str(e)}"
            }

    return writer_agent

In [None]:
def create_supervisor_agent(llm: ChatGoogleGenerativeAI, members: List[str]) -> callable:
    """Creates a supervisor agent to coordinate the team.

    Args:
        llm: Chat model that the supervisor prompt is piped into.
        members: Names of the worker nodes the supervisor may route to.

    Returns:
        A graph node function. It takes the current ``AgentState`` and returns
        a state update whose ``next`` entry is either a member name or
        ``"FINISH"``.
    """

    options = ["FINISH"] + members
    # Lower-cased lookup so an exact reply such as "Analyst" maps back to the
    # member name exactly as it was registered.
    known_members = {member.lower(): member for member in members}

    supervisor_prompt = ChatPromptTemplate.from_messages([
        ("system", f"""You are a Supervisor AI managing a research team. Your team members are:
        {', '.join(members)}

        Your responsibilities:
        1. Coordinate the workflow between team members
        2. Ensure each agent completes their specialized tasks
        3. Determine when the research is complete
        4. Maintain quality standards throughout the process

        Given the conversation, determine the next step:
        - If research is needed: route to "researcher"
        - If analysis is needed: route to "analyst"
        - If report writing is needed: route to "writer"
        - If work is complete: route to "FINISH"

        Available options: {options}

        Respond with just the name of the next agent or "FINISH".
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Current status: {current_agent} just completed their task for topic: {research_topic}")
    ])

    supervisor_chain = supervisor_prompt | llm

    def supervisor_agent(state: AgentState) -> AgentState:
        """Ask the model for the next step and translate its reply into a route.

        An exact reply (a member name or "finish") wins; otherwise the reply is
        matched by keyword, and as a last resort the fixed
        researcher -> analyst -> writer -> FINISH order is followed. Any
        exception ends the workflow by routing to "FINISH".
        """
        try:
            response = supervisor_chain.invoke({
                "messages": state["messages"],
                "current_agent": state.get("current_agent", "none"),
                "research_topic": state["research_topic"]
            })

            next_agent = response.content.strip().lower()
            # Drop quotes/punctuation the model may wrap around a bare name.
            exact_reply = next_agent.strip("\"'`.!: \t\n")

            if exact_reply == "finish":
                next_step = "FINISH"
            elif exact_reply in known_members:
                # Honors every entry of ``members``, not only the three names
                # covered by the keyword heuristics below.
                next_step = known_members[exact_reply]
            elif "finish" in next_agent or "complete" in next_agent:
                next_step = "FINISH"
            elif "research" in next_agent:
                next_step = "researcher"
            elif "analy" in next_agent:
                next_step = "analyst"
            elif "writ" in next_agent:
                next_step = "writer"
            else:
                # Unrecognized reply: fall back to the fixed pipeline order.
                current = state.get("current_agent", "")
                if current == "researcher":
                    next_step = "analyst"
                elif current == "analyst":
                    next_step = "writer"
                elif current == "writer":
                    next_step = "FINISH"
                else:
                    next_step = "researcher"

            return {
                # ``AgentState.messages`` is merged with ``operator.add``, so only
                # the new message is returned. Returning the full history here
                # would append it to itself and duplicate every earlier message.
                "messages": [AIMessage(content=f"Supervisor decision: Next agent is {next_step}")],
                "next": next_step,
                "current_agent": "supervisor",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"Supervisor error: {str(e)}"
            return {
                # Same reducer rule as above: emit only the new message.
                "messages": [AIMessage(content=error_msg)],
                "next": "FINISH",
                "current_agent": "supervisor",
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return supervisor_agent

In [None]:
def create_research_team_graph() -> StateGraph:
    """Assemble the supervisor-driven workflow graph (not yet compiled).

    The graph has three worker nodes plus a supervisor. Every worker hands
    control back to the supervisor, and the supervisor's ``next`` value selects
    the following worker or ends the run.
    """
    llm = create_llm()
    members = ["researcher", "analyst", "writer"]

    # Insertion order matters: nodes are registered in this order below.
    node_functions = {
        "researcher": create_research_agent(llm),
        "analyst": create_analyst_agent(llm),
        "writer": create_writer_agent(llm),
    }
    node_functions["supervisor"] = create_supervisor_agent(llm, members)

    workflow = StateGraph(AgentState)
    for node_name, node_fn in node_functions.items():
        workflow.add_node(node_name, node_fn)

    # Each worker always reports back to the supervisor.
    for member in members:
        workflow.add_edge(member, "supervisor")

    # The supervisor's "next" value picks the following node; "FINISH" ends the run.
    routes = {member: member for member in members}
    routes["FINISH"] = END
    workflow.add_conditional_edges("supervisor", lambda x: x["next"], routes)

    workflow.set_entry_point("supervisor")
    return workflow

def compile_research_team():
    """Build the research team graph and compile it with an in-memory checkpointer."""
    graph = create_research_team_graph()
    return graph.compile(checkpointer=MemorySaver())

def run_research_team(topic: str, thread_id: str = "research_session_1", max_steps: int = 12):
    """Run the complete research team workflow.

    Args:
        topic: Research topic handed to every agent.
        thread_id: Checkpointer thread identifier for this run.
        max_steps: Upper bound on streamed steps before the run is cut short.
            The default of 12 matches the previously hard-coded limit.

    Returns:
        The update emitted by the last streamed node, or ``None`` if an
        exception occurred or no step was produced.
    """

    app = compile_research_team()

    initial_state = {
        "messages": [HumanMessage(content=f"Research the topic: {topic}")],
        "research_topic": topic,
        "next": "researcher",
        "current_agent": "start",
        "findings": {},
        "final_report": ""
    }

    config = {"configurable": {"thread_id": thread_id}}

    print(f"🔍 Starting research on: {topic}")
    print("=" * 50)

    try:
        final_state = None
        for step, state in enumerate(app.stream(initial_state, config=config), start=1):
            # Each streamed item is keyed by the node that produced it; the
            # first entry is the one reported.
            node_name, current_state = next(iter(state.items()))
            print(f"\n📍 Step {step}: {node_name}")

            if current_state["messages"]:
                last_message = current_state["messages"][-1]
                if isinstance(last_message, AIMessage):
                    print(f"💬 {last_message.content[:200]}...")

            final_state = current_state

            # Safety net against a supervisor that never routes to FINISH.
            if step >= max_steps:
                print("⚠️  Maximum steps reached. Stopping execution.")
                break

        return final_state

    except Exception as e:
        print(f"❌ Error during execution: {str(e)}")
        return None

In [None]:
if __name__ == "__main__":
    # Run one end-to-end example and print a summary of what came back.
    result = run_research_team("Artificial Intelligence in Healthcare")

    # A falsy result means the run failed; nothing is printed in that case.
    if result:
        print("\n" + "=" * 50, "📊 FINAL RESULTS", "=" * 50, sep="\n")
        print(f"🏁 Final Agent: {result['current_agent']}")
        print(f"📋 Findings: {len(result['findings'])} sections")
        print(f"📄 Report Length: {len(result['final_report'])} characters")

        # The full report is shown only when one was produced.
        if result['final_report']:
            print("\n📄 FINAL REPORT:", "-" * 30, result['final_report'], sep="\n")

def interactive_research_session():
    """Run an interactive research session.

    Repeatedly prompts for a topic, runs the research team on it and offers to
    print the full report. Entering ``quit``, ``exit`` or ``q`` ends the loop.
    Each topic gets its own thread id (``interactive_session_<n>``).
    """
    # No graph is compiled here: run_research_team compiles its own for every
    # call, so a graph built in this function would never be used.

    print("🎯 Interactive Research Team Session")
    print("Enter 'quit' to exit\n")

    session_count = 0

    while True:
        topic = input("🔍 Enter research topic: ").strip()

        if topic.lower() in ['quit', 'exit', 'q']:
            print("👋 Goodbye!")
            break

        if not topic:
            print("❌ Please enter a valid topic.")
            continue

        session_count += 1
        thread_id = f"interactive_session_{session_count}"

        result = run_research_team(topic, thread_id)

        if result and result['final_report']:
            print(f"\n✅ Research completed for: {topic}")
            print(f"📄 Report preview: {result['final_report'][:300]}...")

            # Strip first so an answer such as " y" is still recognized.
            show_full = input("\n📖 Show full report? (y/n): ").strip().lower()
            if show_full.startswith('y'):
                print("\n" + "=" * 60)
                print("📄 COMPLETE RESEARCH REPORT")
                print("=" * 60)
                print(result['final_report'])

        print("\n" + "-" * 50)


def create_custom_agent(role: str, instructions: str, llm: ChatGoogleGenerativeAI) -> callable:
    """Create a custom agent with specific role and instructions.

    Args:
        role: Human-readable role name. It appears in the system prompt, and
            its lower-cased, underscore-joined form is reported as
            ``current_agent``.
        instructions: Free-form instructions placed in the system prompt.
        llm: Chat model that the custom prompt is piped into.

    Returns:
        A graph node function. It takes the current ``AgentState`` and returns
        a state update whose ``messages`` entry contains only the message
        produced by this step.
    """

    # The system prompt is itself a template: escape braces in caller-supplied
    # text so they are rendered literally instead of being parsed as variables.
    safe_role = role.replace("{", "{{").replace("}", "}}")
    safe_instructions = instructions.replace("{", "{{").replace("}", "}}")
    agent_name = role.lower().replace(" ", "_")

    custom_prompt = ChatPromptTemplate.from_messages([
        ("system", f"""You are a {safe_role} AI.

        Your specific instructions:
        {safe_instructions}

        Always provide detailed, professional responses relevant to your role.
        """),
        MessagesPlaceholder(variable_name="messages"),
        ("human", "Task: {task}")
    ])

    custom_chain = custom_prompt | llm

    def custom_agent(state: AgentState) -> AgentState:
        """Run the custom chain with the research topic as its task.

        Any exception is converted into an error message so control still
        returns to the supervisor.
        """
        try:
            response = custom_chain.invoke({
                "messages": state["messages"],
                "task": state["research_topic"]
            })

            return {
                # ``AgentState.messages`` is merged with ``operator.add``, so only
                # the new message is returned. Returning the full history here
                # would append it to itself and duplicate every earlier message.
                "messages": [AIMessage(content=response.content)],
                "next": "supervisor",
                "current_agent": agent_name,
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

        except Exception as e:
            error_msg = f"{role} agent error: {str(e)}"
            return {
                # Same reducer rule as above: emit only the new message.
                "messages": [AIMessage(content=error_msg)],
                "next": "supervisor",
                "current_agent": agent_name,
                "research_topic": state["research_topic"],
                "findings": state.get("findings", {}),
                "final_report": state.get("final_report", "")
            }

    return custom_agent

In [12]:
def visualize_graph():
    """Visualize the research team graph structure.

    Prints the node names and edges of the compiled graph, followed by the
    Mermaid diagram source for it. Failures are reported on stdout rather
    than raised.
    """

    try:
        app = compile_research_team()

        graph_repr = app.get_graph()

        print("🗺️  Research Team Graph Structure")
        print("=" * 40)
        print(f"Nodes: {list(graph_repr.nodes.keys())}")
        print(f"Edges: {[(edge.source, edge.target) for edge in graph_repr.edges]}")

        try:
            # The diagram source was previously computed and thrown away;
            # print it so it can be pasted into any Mermaid renderer.
            print(graph_repr.draw_mermaid())
        except Exception as e:
            # Report the real cause instead of assuming a missing package.
            print(f"📊 Could not generate Mermaid diagram: {str(e)}")

    except Exception as e:
        print(f"❌ Error visualizing graph: {str(e)}")


import time
from datetime import datetime

def monitor_research_performance(topic: str):
    """Monitor and report performance metrics for one research run.

    Args:
        topic: Research topic passed to ``run_research_team``.

    Returns:
        A dict with ``duration`` (seconds), ``total_messages``,
        ``findings_sections``, ``report_length`` and ``success``. The counts
        are taken from the state returned by ``run_research_team`` and are 0
        when that call returns ``None``.
    """

    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()
    print(f"⏱️  Starting performance monitoring for: {topic}")

    # Wall-clock seconds are still used here, only to make the thread id unique.
    result = run_research_team(topic, f"perf_test_{int(time.time())}")

    duration = time.perf_counter() - start_time

    metrics = {
        "duration": duration,
        "total_messages": len(result["messages"]) if result else 0,
        "findings_sections": len(result["findings"]) if result else 0,
        "report_length": len(result["final_report"]) if result and result["final_report"] else 0,
        "success": result is not None
    }

    print("\n📊 PERFORMANCE METRICS")
    print("=" * 30)
    print(f"⏱️  Duration: {duration:.2f} seconds")
    print(f"💬 Total Messages: {metrics['total_messages']}")
    print(f"📋 Findings Sections: {metrics['findings_sections']}")
    print(f"📄 Report Length: {metrics['report_length']} chars")
    print(f"✅ Success: {metrics['success']}")

    return metrics


def quick_start_demo():
    """Run the research team on three sample topics and print a short result for each.

    A failure in one demo is reported and does not stop the remaining demos.
    """
    print("🚀 LangGraph Research Team - Quick Start Demo")
    print("=" * 50)

    demo_topics = (
        "Climate Change Impact on Agriculture",
        "Quantum Computing Applications",
        "Digital Privacy in the Modern Age",
    )

    for demo_number, demo_topic in enumerate(demo_topics, start=1):
        print(f"\n🔍 Demo {demo_number}: {demo_topic}")
        print("-" * 40)

        try:
            outcome = run_research_team(demo_topic, f"demo_{demo_number}")

            # A run counts as successful only when it produced a non-empty report.
            if not (outcome and outcome['final_report']):
                print("❌ Research failed")
            else:
                print("✅ Research completed successfully!")
                print(f"📊 Report preview: {outcome['final_report'][:150]}...")

        except Exception as e:
            print(f"❌ Error in demo {demo_number}: {str(e)}")

        print("\n" + "=" * 30)

    print("🎉 Demo completed!")

# Runs the three-topic demo as soon as this cell executes. The captured output
# below shows 429 quota errors from the free-tier per-minute request limit
# while the demos run back to back.
quick_start_demo()

Enter your Google API Key: ··········
🔍 Starting research on: Artificial Intelligence in Healthcare

📍 Step 1: supervisor
💬 Supervisor decision: Next agent is researcher...

📍 Step 2: researcher
💬 ## Research Topic: Artificial Intelligence in Healthcare

This research explores the multifaceted applications of Artificial Intelligence (AI) within the healthcare sector, examining its benefits, cha...

📍 Step 3: supervisor
💬 Supervisor decision: Next agent is analyst...

📍 Step 4: analyst
💬 Analyzing Research Findings on Artificial Intelligence in Healthcare requires a nuanced approach, considering both the promising advancements and the significant challenges.  The following analysis ca...

📍 Step 5: supervisor
💬 Supervisor decision: Next agent is writer...

📍 Step 6: writer
💬 ## Artificial Intelligence in Healthcare: A Comprehensive Report

**Executive Summary:**

Artificial intelligence (AI) is rapidly transforming the healthcare landscape, offering the potential to revol...

📍 Step 7: 

  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 14
}
].



📍 Step 2: researcher
💬 ## Research Topic: Quantum Computing Applications

This research explores the potential applications of quantum computing across various fields.  While still in its nascent stages, quantum computing h...


  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 12
}
].



📍 Step 3: supervisor
💬 Supervisor error: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [viol...
❌ Research failed


🔍 Demo 3: Digital Privacy in the Modern Age
----------------------------------------
🔍 Starting research on: Digital Privacy in the Modern Age

📍 Step 1: supervisor
💬 Supervisor decision: Next agent is researcher...

📍 Step 2: researcher
💬 ## Research Topic: Digital Privacy in the Modern Age

This research topic is vast and multifaceted.  To effectively analyze it, we need to break it down into key areas.

**I. Key Areas Requiring Inves...

📍 Step 3: supervisor
💬 Supervisor decision: Next agent is analyst...

📍 Step 4: analyst
💬 Analyzing research findings on digital privacy in the modern age requires synthesizing information from various sources, including legal scholarship, technological analyses, sociological studies, and ...

📍 Step 5: su