<a href="https://colab.research.google.com/github/syedanida/AI-Agents-using-Langraph/blob/main/LangGraph_AI_Agents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### 1. The Expert Pattern

In [18]:
!pip install langchain_openai



In [19]:
!pip install langgraph
!pip install langgraph --upgrade



In [20]:
!pip install --force-reinstall "langgraph[all]"

Collecting langgraph[all]
  Using cached langgraph-0.3.29-py3-none-any.whl.metadata (7.7 kB)
[0mCollecting langchain-core<0.4,>=0.1 (from langgraph[all])
  Using cached langchain_core-0.3.51-py3-none-any.whl.metadata (5.9 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.0.10 (from langgraph[all])
  Using cached langgraph_checkpoint-2.0.24-py3-none-any.whl.metadata (4.6 kB)
Collecting langgraph-prebuilt<0.2,>=0.1.1 (from langgraph[all])
  Using cached langgraph_prebuilt-0.1.8-py3-none-any.whl.metadata (5.0 kB)
Collecting langgraph-sdk<0.2.0,>=0.1.42 (from langgraph[all])
  Using cached langgraph_sdk-0.1.61-py3-none-any.whl.metadata (1.8 kB)
Collecting xxhash<4.0.0,>=3.5.0 (from langgraph[all])
  Using cached xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting langsmith<0.4,>=0.1.125 (from langchain-core<0.4,>=0.1->langgraph[all])
  Using cached langsmith-0.3.30-py3-none-any.whl.metadata (15 kB)
Collecting tenacity!=8.4.0,<10.0.0,>=8.1.0 (fro

In [21]:
!pip show langgraph

Name: langgraph
Version: 0.3.29
Summary: Building stateful, multi-actor applications with LLMs
Home-page: 
Author: 
Author-email: 
License: MIT
Location: /usr/local/lib/python3.11/dist-packages
Requires: langchain-core, langgraph-checkpoint, langgraph-prebuilt, langgraph-sdk, xxhash
Required-by: 


In [6]:
!pip install langgraph-checkpoint
from langgraph.checkpoint.memory import MemorySaver # Import MemorySaver from correct location



In [12]:
import os
from typing import List, Dict, Any, TypedDict, Annotated, Sequence, Tuple
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.tools import BaseTool, tool
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.utils.function_calling import convert_to_openai_function
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
import langsmith

# Import the checkpointing class - using the in-memory checkpoint
# from langgraph.checkpoint import MemorySaver

# Set environment variables
os.environ["OPENAI_API_KEY"] = "ABC"
os.environ["LANGCHAIN_API_KEY"] = "XYZ"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "expert-pattern-demo"

# Create a state for our graph
class GraphState(TypedDict):
    messages: Annotated[Sequence[Any], "The messages in the conversation so far"]
    next_expert: str
    research_results: Dict[str, Any]
    final_answer: str

# The different experts
PROGRAMMING_EXPERT_SYSTEM = """You are a programming expert specialized in Python coding.
When approached with coding problems, you provide detailed solutions with explanations.
Be specific, technical, and thorough in your answers.
"""

MATH_EXPERT_SYSTEM = """You are a mathematics expert specialized in calculus, statistics, and linear algebra.
When approached with math problems, you provide step-by-step solutions with clear explanations.
Use mathematical notation when appropriate.
"""

PHYSICS_EXPERT_SYSTEM = """You are a physics expert specialized in mechanics and electromagnetism.
When approached with physics problems, you provide explanations based on fundamental physical laws.
Use physics equations when appropriate and explain the physical intuition.
"""

COORDINATOR_SYSTEM = """You are a coordinator that decides which expert should handle the incoming question.
Available experts: 'programming', 'math', 'physics', or 'final'.
Choose 'final' only when all needed experts have provided their input and a final answer can be generated.
"""

def build_expert(system_message: str) -> ChatOpenAI:
    """Build an expert with a specific system message."""
    return ChatOpenAI(
        model="gpt-4", # Changed to gpt-4
        temperature=0,
    )

# Create the experts
programming_expert = build_expert(PROGRAMMING_EXPERT_SYSTEM)
math_expert = build_expert(MATH_EXPERT_SYSTEM)
physics_expert = build_expert(PHYSICS_EXPERT_SYSTEM)

# Create the coordinator
coordinator_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content=COORDINATOR_SYSTEM),
    MessagesPlaceholder(variable_name="messages"),
    SystemMessage(content="Based on the question and previous responses, which expert should handle this next? Reply with just 'programming', 'math', 'physics', or 'final'.")
])

coordinator_chain = coordinator_prompt | ChatOpenAI(temperature=0) | (lambda x: x.content.strip().lower())

# Define the nodes for our graph
def route_to_expert(state: GraphState) -> Dict[str, Any]:
    """Route the query to the appropriate expert."""
    expert = state["next_expert"]
    if expert == "final":
        return {"next": "final_answer_generator"}
    elif expert == "programming":
        return {"next": "programming_expert"}
    elif expert == "math":
        return {"next": "math_expert"}
    elif expert == "physics":
        return {"next": "physics_expert"}
    else:
        raise ValueError(f"Unknown expert: {expert}")

def call_programming_expert(state: GraphState) -> Dict[str, Any]:
    """Call the programming expert."""
    response = programming_expert.invoke(state["messages"])
    return {
        "messages": state["messages"] + [response],
        "next": "coordinator"
    }

def call_math_expert(state: GraphState) -> Dict[str, Any]:
    """Call the math expert."""
    # Embed the system message in the prompt
    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=MATH_EXPERT_SYSTEM),
        MessagesPlaceholder(variable_name="messages"),
    ])
    chain = prompt | math_expert
    response = chain.invoke({"messages": state["messages"]})
    return {
        "messages": state["messages"] + [response],
        "next": "coordinator"
    }

def call_physics_expert(state: GraphState) -> Dict[str, Any]:
    """Call the physics expert."""
    response = physics_expert.invoke(state["messages"])
    return {
        "messages": state["messages"] + [response],
        "next": "coordinator"
    }

def call_coordinator(state: GraphState) -> Dict[str, Any]:
    """Call the coordinator to decide which expert to route to next."""
    next_expert = coordinator_chain.invoke({"messages": state["messages"]})
    return {"next_expert": next_expert}

def generate_final_answer(state: GraphState) -> Dict[str, Any]:
    """Generate the final answer based on all expert inputs."""
    final_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="You are an assistant that synthesizes expert opinions to provide a comprehensive final answer."),
        MessagesPlaceholder(variable_name="messages"),
        SystemMessage(content="Based on all the expert input above, provide a comprehensive final answer.")
    ])

    final_chain = final_prompt | ChatOpenAI(temperature=0)
    response = final_chain.invoke({"messages": state["messages"]})

    return {
        "messages": state["messages"] + [response],
        "final_answer": response.content,
        "next": END
    }

# Create the graph
def build_expert_graph():
    graph = StateGraph(GraphState)

    # Add nodes
    graph.add_node("coordinator", call_coordinator)
    graph.add_node("router", route_to_expert)
    graph.add_node("programming_expert", call_programming_expert)
    graph.add_node("math_expert", call_math_expert)
    graph.add_node("physics_expert", call_physics_expert)
    graph.add_node("final_answer_generator", generate_final_answer)

    # Add edges
    graph.add_edge("coordinator", "router")
    graph.add_conditional_edges(
        "router",
        lambda x: x["next"],
        {
            "programming_expert": "programming_expert",
            "math_expert": "math_expert",
            "physics_expert": "physics_expert",
            "final_answer_generator": "final_answer_generator"
        }
    )
    graph.add_edge("programming_expert", "coordinator")
    graph.add_edge("math_expert", "coordinator")
    graph.add_edge("physics_expert", "coordinator")
    graph.add_edge("final_answer_generator", END)

    # Set entry point
    graph.set_entry_point("coordinator")

    # 🟢 Compile WITHOUT checkpointing
    return graph.compile()

# Create a runnable graph
expert_graph = build_expert_graph()

# Example usage
if __name__ == "__main__":
    # Initialize state
    initial_state = {
        "messages": [HumanMessage(content="I need to calculate the volume of a sphere with radius r=5cm and then write a Python function to calculate this for any radius.")],
        "next_expert": "coordinator",
        "research_results": {},
        "final_answer": ""
    }

    # Run the graph with tracing
    result = expert_graph.invoke(initial_state)

    # Print final answer
    print("\nFinal Answer:")
    print(result["final_answer"])
    print("\nFull conversation:")
    for msg in result["messages"]:
        if isinstance(msg, HumanMessage):
            print(f"Human: {msg.content}")
        elif isinstance(msg, AIMessage):
            print(f"AI: {msg.content[:100]}...")  # Print first 100 chars


Final Answer:
To calculate the volume of a sphere with a radius of 5 cm, you can use the formula V = 4/3 * π * r³, where r is the radius. Substituting r = 5 cm into the formula gives V ≈ 523.6 cm³.

To calculate the volume of a sphere for any given radius, you can use the Python function below:

```python
import math

def volume_of_sphere(radius):
    return (4/3) * math.pi * (radius**3)
```

This function takes the radius of the sphere as an argument and returns the volume of the sphere. The `math.pi` constant provides the value of π, and the `**` operator is used for exponentiation in Python.

Full conversation:
Human: I need to calculate the volume of a sphere with radius r=5cm and then write a Python function to calculate this for any radius.
AI: The volume V of a sphere is given by the formula:

V = 4/3 * π * r³

where r is the radius of the sp...
AI: To calculate the volume of a sphere with a radius of 5 cm, you can use the formula V = 4/3 * π * r³,...


### 2. The Supervisor Pattern

In [29]:
import os
from typing import List, Dict, Any, TypedDict, Annotated, Sequence, Tuple, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, FunctionMessage
from langchain_core.tools import BaseTool, tool
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.utils.function_calling import convert_to_openai_function
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
import langsmith
#from langgraph.checkpoint import MemoryCheckpoint
import json

# Set environment variables
os.environ["OPENAI_API_KEY"] = "ABC"
os.environ["LANGCHAIN_API_KEY"] = "XYZ"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "supervisor-pattern-demo"

# Create a state for our graph
class GraphState(TypedDict):
    messages: Annotated[Sequence[Any], "The messages in the conversation so far"]
    task_queue: Annotated[List[Dict], "The queue of tasks to be executed"]
    completed_tasks: Annotated[List[Dict], "The tasks that have been completed"]
    current_task: Annotated[Optional[Dict], "The current task being executed"]
    final_report: Annotated[str, "The final report summarizing all task results"]

# Create the supervisor system message
SUPERVISOR_SYSTEM = """You are a project supervisor responsible for breaking down complex projects into tasks,
assigning them to workers, and synthesizing the results.

Given a project request, you will:
1. Break it down into smaller, manageable tasks
2. Keep track of task progress
3. Synthesize the completed task results into a final report
"""

# Create worker system message
WORKER_SYSTEM = """You are a competent worker responsible for completing assigned tasks.
Complete the task to the best of your ability, providing detailed outputs.
"""

# Create our tools
@tool
def create_tasks(project_description: str) -> List[Dict]:
    """
    Break down a project into individual tasks.
    Args:
        project_description: Description of the project to be broken down
    Returns:
        List of task dictionaries with 'id', 'description', and 'status'
    """
    # This function simulates task breakdown - in production, the LLM would do this
    return [
        {"id": "1", "description": f"Research phase for {project_description}", "status": "pending"},
        {"id": "2", "description": f"Analysis phase for {project_description}", "status": "pending"},
        {"id": "3", "description": f"Implementation phase for {project_description}", "status": "pending"},
        {"id": "4", "description": f"Testing phase for {project_description}", "status": "pending"},
        {"id": "5", "description": f"Documentation phase for {project_description}", "status": "pending"}
    ]

@tool
def assign_next_task() -> Dict:
    """
    Assign the next task from the queue.
    Returns:
        The next task to be executed
    """
    # This is just a placeholder - the actual implementation would happen in the state transition
    return {"id": "next_task_id", "description": "Next task description", "status": "in_progress"}

@tool
def complete_task(task_id: str, result: str) -> Dict:
    """
    Mark a task as completed and store its result.
    Args:
        task_id: The ID of the task to mark as completed
        result: The result of the completed task
    Returns:
        The updated task with status 'completed'
    """
    # This is just a placeholder - the actual implementation would happen in the state transition
    return {"id": task_id, "description": "Task description", "status": "completed", "result": result}

@tool
def generate_final_report(completed_tasks: List[Dict]) -> str:
    """
    Generate a final report based on all completed tasks.
    Args:
        completed_tasks: List of all completed tasks with their results
    Returns:
        A comprehensive final report
    """
    if not completed_tasks:
        return "No tasks were completed. No final report can be generated."

    report_lines = ["Final Report:\n"]
    for task in completed_tasks:
        line = f"Task {task['id']} - {task['description']}\nResult: {task.get('result', 'No result provided')}\n"
        report_lines.append(line)

    return "\n".join(report_lines)


# Build the supervisor and worker agents
supervisor_llm = ChatOpenAI(model="gpt-4o", temperature=0)
worker_llm = ChatOpenAI(model="gpt-4o", temperature=0.2)

# Define node functions
def initialize_project(state: GraphState) -> Dict:
    """Initialize the project by creating tasks."""
    # Extract project description from initial message
    initial_message = state["messages"][0]
    project_description = initial_message.content

    # Create tasks using the LLM
    supervisor_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=SUPERVISOR_SYSTEM),
        HumanMessage(content=f"I need to break down this project into individual tasks: {project_description}")
    ])

    supervisor_with_tools = supervisor_llm.bind(
        functions=[convert_to_openai_function(create_tasks)]
    )

    # Pass a dictionary with input values to the prompt template
    response = supervisor_with_tools.invoke(supervisor_prompt.format_prompt(input=project_description).to_messages())

    # Parse the response to get the tasks
    if hasattr(response, "function_call"):
        function_name = response.function_call["name"]
        function_args = json.loads(response.function_call["arguments"])

        if function_name == "create_tasks":
            tasks = create_tasks(function_args["project_description"])
        else:
            tasks = []
    else:
        # Fallback if no function call was made
        tasks = create_tasks(project_description)

    # Update state
    system_message = f"Project initialized with {len(tasks)} tasks."
    return {
        "messages": state["messages"] + [AIMessage(content=system_message)],
        "task_queue": tasks,
        "completed_tasks": [],
        "current_task": None
    }

def decide_next_action(state: GraphState) -> Dict:
    if state["task_queue"]:
        return {"next": "assign_task"}
    else:
        return {"next": "generate_report"}

def assign_task(state: GraphState) -> Dict:
    """Assign the next task from the queue."""
    # Get the next task
    next_task = state["task_queue"][0]
    remaining_tasks = state["task_queue"][1:]

    next_task["status"] = "in_progress"

    # Update state
    system_message = f"Assigning task {next_task['id']}: {next_task['description']}"
    return {
        "messages": state["messages"] + [AIMessage(content=system_message)],
        "task_queue": remaining_tasks,
        "current_task": next_task,
        "next": "execute_task"
    }

def execute_task(state: GraphState) -> Dict:
    task = state["current_task"]
    result = f"Completed {task['description']}"
    task["result"] = result
    task["status"] = "completed"
    return {
        "messages": state["messages"] + [AIMessage(content=f"Task {task['id']} completed: {result}")],
        "completed_tasks": state["completed_tasks"] + [task],
        "current_task": None
    }

def task_feedback(state: GraphState) -> Dict:
    """Provide feedback on the completed task."""
    latest_completed_task = state["completed_tasks"][-1]

    # Create supervisor feedback prompt
    supervisor_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=SUPERVISOR_SYSTEM),
        MessagesPlaceholder(variable_name="messages"),
        HumanMessage(content=f"Provide feedback on the completed task: {latest_completed_task['description']}\n\nResult: {latest_completed_task['result']}")
    ])

    # Generate feedback
    response = supervisor_llm.invoke(state["messages"])  # Pass the list of messages directly

    # Update state
    return {
        "messages": state["messages"] + [response],
        "next": "decision_point"
    }

def generate_report(state: GraphState) -> Dict:
    """Generate a final report based on all completed tasks."""
    # Manually summarize the completed tasks
    completed_summaries = [
        f"Task {task['id']} - {task['description']}: {task.get('result', 'No result')}"
        for task in state["completed_tasks"]
    ]
    report = "\n".join(completed_summaries)

    return {
        "messages": state["messages"] + [AIMessage(content="Final Report:\n" + report)],
        "final_report": report
    }

# Build the graph
def build_supervisor_graph():
    graph = StateGraph(GraphState)

    # Add nodes
    graph.add_node("initialize_project", initialize_project)
    graph.add_node("decision_point", decide_next_action)
    graph.add_node("assign_task", assign_task)
    graph.add_node("execute_task", execute_task)
    graph.add_node("task_feedback", task_feedback)
    graph.add_node("generate_report", generate_report)

    # Add edges
    graph.add_edge("initialize_project", "decision_point")
    graph.add_conditional_edges(
        "decision_point",
        lambda x: x["next"],
        {
            "assign_task": "assign_task",
            "generate_report": "generate_report"
        }
    )
    graph.add_edge("assign_task", "execute_task")
    graph.add_edge("execute_task", "task_feedback")
    graph.add_edge("task_feedback", "decision_point")
    graph.add_edge("generate_report", END)

    # Set entry point
    graph.set_entry_point("initialize_project")

    # Compile without any checkpointer
    return graph.compile()

# Create a runnable graph
supervisor_graph = build_supervisor_graph()

# Example usage
if __name__ == "__main__":
    # Initialize state
    initial_state = {
        "messages": [HumanMessage(content="Build a simple e-commerce website with product listings and a shopping cart")],
        "task_queue": [],
        "completed_tasks": [],
        "current_task": None,
        "final_report": ""
    }

    # Run the graph with tracing
    result = supervisor_graph.invoke(initial_state)

    # Print final report
    # Changed final_state to result
    print("\n".join([
       msg.content for msg in result["messages"] if isinstance(msg, AIMessage)
]))


Project initialized with 5 tasks.
Assigning task 1: Research phase for Build a simple e-commerce website with product listings and a shopping cart
Task 1 completed: Completed Research phase for Build a simple e-commerce website with product listings and a shopping cart
To build a simple e-commerce website with product listings and a shopping cart, you can follow these steps. This guide will help you create a basic e-commerce site using HTML, CSS, and JavaScript for the front end, and a simple backend using Node.js and Express. For simplicity, we'll use a JSON file to store product data and a session to manage the shopping cart.

### Step 1: Set Up Your Project

1. **Create a Project Directory:**
   - Create a new directory for your project, e.g., `simple-ecommerce`.

2. **Initialize Node.js:**
   - Open a terminal in your project directory and run:
     ```bash
     npm init -y
     ```
   - This will create a `package.json` file.

3. **Install Dependencies:**
   - Install Express for 

### 3. The Reflexion Pattern

In [41]:
import os
from typing import List, Dict, Any, TypedDict, Annotated, Sequence, Tuple, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, FunctionMessage
from langchain_core.tools import BaseTool, tool
from langgraph.graph import StateGraph, END
import json

# Set environment variables
os.environ["OPENAI_API_KEY"] = "ABC"
os.environ["LANGCHAIN_API_KEY"] = "XYZ"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "reflexion-pattern-demo"

# Create a state for our graph
class GraphState(TypedDict):
    messages: Annotated[Sequence[Any], "The messages in the conversation so far"]
    reflections: Annotated[List[str], "The agent's reflections on its previous responses"]
    response_draft: Annotated[str, "The current draft response being considered"]
    final_response: Annotated[str, "The final response after reflection"]
    reflection_count: Annotated[int, "How many times the agent has reflected"]
    max_reflections: Annotated[int, "Maximum number of reflections to perform"]

# Create system messages
AGENT_SYSTEM = """You are a thoughtful assistant that generates careful and helpful responses.
You will be given a question to answer. You will:
1. Generate an initial response draft
2. Reflect on that draft to identify potential issues or improvements
3. Revise your response based on your reflection
4. Repeat this process until you have a high-quality response or reach the maximum number of reflections
"""

REFLECTION_SYSTEM = """You are a reflective critic that helps identify weaknesses in responses.
Look critically at the response and identify any issues such as:
- Factual errors or inconsistencies
- Incomplete or partial responses
- Logical fallacies or weak reasoning
- Unclear or ambiguous language
- Missing important considerations or perspectives
- Potential harmful or misleading content

Be specific and constructive in your criticism.
"""

REVISION_SYSTEM = """You are a thoughtful reviser that improves responses based on reflection.
Take the original response and the reflection feedback, and create an improved response that:
- Addresses all issues identified in the reflection
- Maintains the helpful aspects of the original response
- Is clear, accurate, comprehensive, and balanced
"""

# Define LLM instances
response_llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
reflection_llm = ChatOpenAI(model="gpt-4o", temperature=0)
revision_llm = ChatOpenAI(model="gpt-4o", temperature=0.2)

# Define node functions
def generate_initial_response(state: GraphState) -> Dict:
    """Generate an initial response to the user's query."""
    # Extract query from messages
    query = state["messages"][0].content

    # Create response prompt
    response_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=AGENT_SYSTEM),
        HumanMessage(content=f"Generate an initial response to this query: {query}")
    ])

    # Generate response
    response = response_llm.invoke(response_prompt.format_prompt(input_variables={'query': query}).to_messages())

    # Update state
    return {
        "response_draft": response.content,
        "reflections": [],
        "reflection_count": 0
    }

def reflect_on_response(state: GraphState) -> Dict:
    """Reflect on the current response draft."""
    # Get the current query and response draft
    query = state["messages"][0].content
    current_draft = state["response_draft"]

    # Create reflection prompt
    reflection_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=REFLECTION_SYSTEM),
        HumanMessage(content=f"Query: {query}\n\nResponse draft:\n{current_draft}\n\nReflect on this response draft and identify any issues or areas for improvement:")
    ])

    # Generate reflection
    # Format the prompt and convert it to a list of BaseMessages
    reflection = reflection_llm.invoke(reflection_prompt.format_prompt(input_variables={'query': query, 'current_draft': current_draft}).to_messages())

    # Update state
    return {
        "reflections": state["reflections"] + [reflection.content],
        "reflection_count": state["reflection_count"] + 1
    }

def revise_response(state: GraphState) -> Dict:
    """Revise the response based on the reflection."""
    # Get the current query, response draft, and latest reflection
    query = state["messages"][0].content
    current_draft = state["response_draft"]
    latest_reflection = state["reflections"][-1]

    # Create revision prompt
    revision_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=REVISION_SYSTEM),
        HumanMessage(content=f"Query: {query}\n\nOriginal response draft:\n{current_draft}\n\nReflection:\n{latest_reflection}\n\nPlease revise the response to address the issues identified in the reflection:")
    ])

    # Generate revised response
    # revision = revision_llm.invoke(revision_prompt) # This is line 116
    revision = revision_llm.invoke(revision_prompt.format_prompt(input_variables={'query': query, 'current_draft': current_draft, 'latest_reflection': latest_reflection}).to_messages()) # Format the prompt and convert to BaseMessages

    # Update state
    return {
        "response_draft": revision.content
    }

def decide_next_step(state: GraphState) -> Dict:
    """Decide whether to reflect more or finalize the response."""
    if state["reflection_count"] < state["max_reflections"]:
        # Check if the latest reflection suggests more improvements are needed
        latest_reflection = state["reflections"][-1]

        # Create decision prompt
        decision_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content="Determine if the response needs further improvement based on the reflection."),
            HumanMessage(content=f"Reflection: {latest_reflection}\n\nBased on this reflection, does the response need further improvement? Answer with 'yes' or 'no'.")
        ])

        # Generate the formatted prompt and invoke the model
        formatted_prompt = decision_prompt.format_prompt()  # Correct usage here
        decision = response_llm.invoke(formatted_prompt.to_messages()).content.strip().lower()

        if "yes" in decision:
            return {"next": "reflect"}
        else:
            return {"next": "finalize"}
    else:
        return {"next": "finalize"}

def finalize_response(state: GraphState) -> Dict:
    """Finalize the response after reflection iterations."""
    # Get the current query, final draft, and all reflections
    query = state["messages"][0].content
    final_draft = state["response_draft"]
    all_reflections = "\n\n".join([f"Reflection {i+1}: {reflection}" for i, reflection in enumerate(state["reflections"])])

    # Create final response prompt
    final_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="Create a final polished response that incorporates all reflection feedback."),
        HumanMessage(content=f"Query: {query}\n\nCurrent draft:\n{final_draft}\n\nAll reflections:\n{all_reflections}\n\nPlease provide the final polished response:")
    ])

    # Generate final response
    final_response = response_llm.invoke(final_prompt.format_prompt(input_variables={'query': query, 'final_draft': final_draft, 'all_reflections': all_reflections}).to_messages()) # Format the prompt and convert to BaseMessages

    # Update state with final response and add to messages
    return {
        "messages": state["messages"] + [AIMessage(content=final_response.content)],
        "final_response": final_response.content,
        "next": END
    }

# Build the graph
def build_reflexion_graph():
    graph = StateGraph(GraphState)

    # Add nodes
    graph.add_node("generate_initial_response", generate_initial_response)
    graph.add_node("reflect", reflect_on_response)
    graph.add_node("revise", revise_response)
    graph.add_node("decide", decide_next_step)
    graph.add_node("finalize", finalize_response)

    # Add edges
    graph.add_edge("generate_initial_response", "reflect")
    graph.add_edge("reflect", "revise")
    graph.add_edge("revise", "decide")
    graph.add_conditional_edges(
        "decide",
        lambda x: x["next"],
        {
            "reflect": "reflect",
            "finalize": "finalize"
        }
    )
    graph.add_edge("finalize", END)

    # Set entry point
    graph.set_entry_point("generate_initial_response")

    return graph.compile()

# Create a runnable graph
reflexion_graph = build_reflexion_graph()

# Example usage
if __name__ == "__main__":
    # Initialize state
    initial_state = {
        "messages": [HumanMessage(content="What are the ethical implications of artificial general intelligence?")],
        "reflections": [],
        "response_draft": "",
        "final_response": "",
        "reflection_count": 0,
        "max_reflections": 3
    }

    # Run the graph with tracing
    result = reflexion_graph.invoke(initial_state)

    # Print final response
    print("\nFinal Response after Reflection:")
    print(result["final_response"])

    # Print reflection process
    print("\nReflection Process:")
    for i, reflection in enumerate(result["reflections"]):
        print(f"\nReflection {i+1}:")
        print(reflection)


Final Response after Reflection:
The ethical implications of artificial general intelligence (AGI) are profound and multifaceted, encompassing considerations related to autonomy, control, safety, societal impact, and more. As AGI remains a speculative concept, it is crucial to explore these implications with both caution and optimism, while clearly distinguishing between current AI capabilities and the hypothetical nature of AGI.

1. **Autonomy and Control**: A major ethical concern is determining the level of autonomy AGI should possess. AGI systems could potentially make independent decisions, so ensuring their alignment with human values is crucial. This raises questions about maintaining human oversight and preventing AGI from acting against human interests. Developing ethical guidelines and control mechanisms, such as continuous "alignment checks"—which involve ensuring that AGI systems' goals and actions remain consistent with human values—is essential.

2. **Safety and Risk**: 

### 4. The Chain-of-Thought Pattern

In [46]:
import os
from typing import List, Dict, Any, TypedDict, Annotated, Sequence, Tuple, Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, FunctionMessage
from langgraph.graph import StateGraph, END
import json
import re

# Set environment variables
os.environ["OPENAI_API_KEY"] = "ABC"
os.environ["LANGCHAIN_API_KEY"] = "XYZ"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "cot-pattern-demo"

# Create a state for our graph
class GraphState(TypedDict):
    messages: Annotated[Sequence[Any], "The messages in the conversation so far"]
    problem: Annotated[str, "The problem to be solved"]
    reasoning_steps: Annotated[List[str], "The chain of reasoning steps"]
    current_step: Annotated[int, "The current reasoning step number"]
    final_answer: Annotated[str, "The final answer to the problem"]

# Create system messages
PROBLEM_ANALYZER_SYSTEM = """You are an expert problem analyzer that breaks down complex problems into simpler steps.
When given a problem, analyze it and create a step-by-step plan to solve it.
Focus on identifying the key components and the logical sequence of steps needed to reach the solution.
"""

REASONING_SYSTEM = """You are an expert problem solver that uses careful reasoning to solve problems.
You will be given a specific step in a reasoning chain to complete.
Focus only on this specific step. Do not attempt to solve the entire problem.
Show your work clearly and explain your reasoning for this step.
"""

ANSWER_SYSTEM = """You are a solution summarizer that creates clear and concise final answers.
Based on the complete chain of reasoning steps provided, synthesize a final answer to the original problem.
Be precise and ensure your answer directly addresses the original question.
"""

# Define LLM instances
analyzer_llm = ChatOpenAI(model="gpt-4o", temperature=0)
reasoning_llm = ChatOpenAI(model="gpt-4o", temperature=0)
answer_llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Define node functions
def analyze_problem(state: GraphState) -> Dict:
    """Analyze the problem and create a plan of reasoning steps."""
    # Extract problem from messages
    problem = state["messages"][0].content

    # Create analyzer prompt
    analyzer_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=PROBLEM_ANALYZER_SYSTEM),
        HumanMessage(content=f"Analyze this problem and break it down into a step-by-step reasoning plan. Each step should build logically on the previous steps:\n\n{problem}")
    ])

    # Generate analysis
    analysis = analyzer_llm.invoke(analyzer_prompt.format_prompt(input=problem).to_messages()) # Format the prompt and convert to BaseMessages

    # Extract reasoning steps from the analysis
    steps_text = analysis.content
    steps = re.findall(r'(?:Step \d+:|^\d+\.)(.*?)(?=(?:Step \d+:|^\d+\.)|$)', steps_text, re.DOTALL | re.MULTILINE)

    if not steps:  # Fallback if regex doesn't match
        steps = steps_text.split('\n\n')

    # Clean up the steps
    steps = [step.strip() for step in steps if step.strip()]

    # Update state
    return {
        "problem": problem,
        "reasoning_steps": steps,
        "current_step": 0
    }

def execute_reasoning_step(state: GraphState) -> Dict:
    """Execute the current reasoning step."""
    # Get the problem and current step information
    problem = state["problem"]
    steps = state["reasoning_steps"]
    current_step_idx = state["current_step"]

    # Check if we have steps to execute
    if current_step_idx >= len(steps):
        return {"next": "generate_answer"}

    current_step = steps[current_step_idx]
    previous_steps = steps[:current_step_idx]
    previous_steps_text = "\n".join([f"Step {i+1}: {step}" for i, step in enumerate(previous_steps)])

    # Create reasoning prompt
    reasoning_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=REASONING_SYSTEM),
        HumanMessage(content=f"""
Problem: {problem}

Previous reasoning steps:
{previous_steps_text}

Current step to execute (Step {current_step_idx + 1}): {current_step}

Please complete this specific reasoning step. Show your work and explain your reasoning clearly.
""")
    ])

    # Generate reasoning for this step
    reasoning = reasoning_llm.invoke(reasoning_prompt.format_prompt().to_messages()) # Format the prompt and convert it to BaseMessages

    # Update the step with the detailed reasoning
    updated_steps = state["reasoning_steps"].copy()
    updated_steps[current_step_idx] = f"{current_step}\n\nReasoning: {reasoning.content}"

    # Update state
    return {
        "reasoning_steps": updated_steps,
        "current_step": current_step_idx + 1
    }

def decide_next_action(state: GraphState) -> Dict:
    """Decide whether to continue reasoning or generate final answer."""
    current_step = state["current_step"]
    total_steps = len(state["reasoning_steps"])

    if current_step < total_steps:
        return {"next": "reasoning"}
    else:
        return {"next": "generate_answer"}

def generate_answer(state: GraphState) -> Dict:
    """Generate the final answer based on the complete reasoning chain."""
    # Get the problem and reasoning steps
    problem = state["problem"]
    steps = state["reasoning_steps"]
    steps_text = "\n\n".join([f"Step {i+1}: {step}" for i, step in enumerate(steps)])

    # Create answer prompt
    answer_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=ANSWER_SYSTEM),
        HumanMessage(content=f"""
Original problem: {problem}

Complete chain of reasoning:
{steps_text}

Based on this chain of reasoning, what is the final answer to the original problem?
Provide a clear and concise summary of the solution.
""")
    ])

    # Generate final answer
    answer = answer_llm.invoke(answer_prompt.format_prompt(input_variables={'problem': problem, 'steps_text': steps_text}).to_messages())

    # Update state with final answer and add to messages
    response_message = f"""
Here's my solution to the problem:

{answer.content}

My reasoning process:
{steps_text}
"""

    return {
        "messages": state["messages"] + [AIMessage(content=response_message)],
        "final_answer": answer.content,
        "next": END
    }

# Build the graph
def build_cot_graph():
    graph = StateGraph(GraphState)

    # Add nodes
    graph.add_node("analyze_problem", analyze_problem)
    graph.add_node("reasoning", execute_reasoning_step)
    graph.add_node("decision", decide_next_action)
    graph.add_node("generate_answer", generate_answer)

    # Add edges
    graph.add_edge("analyze_problem", "reasoning")
    graph.add_edge("reasoning", "decision")
    graph.add_conditional_edges(
        "decision",
        lambda x: x["next"],
        {
            "reasoning": "reasoning",
            "generate_answer": "generate_answer"
        }
    )
    graph.add_edge("generate_answer", END)

    # Set entry point
    graph.set_entry_point("analyze_problem")

    return graph.compile()  # Removed the checkpointer (MemoryCheckpoint)

# Create a runnable graph
cot_graph = build_cot_graph()

# Example usage
if __name__ == "__main__":
    # Initialize state with a complex math problem
    initial_state = {
        "messages": [HumanMessage(content="A boat travels upstream for 2 hours at 10 km/h and then returns downstream covering the same distance in 1 hour. What is the speed of the current?")],
        "problem": "",
        "reasoning_steps": [],
        "current_step": 0,
        "final_answer": ""
    }

    # Run the graph with tracing
    result = cot_graph.invoke(initial_state)

    # Print final answer
    print("\nFinal Answer:")
    print(result["final_answer"])



Final Answer:
The speed of the current is approximately 3.33 km/h.


### The RAG (Retrieval-Augmented Generation) Pattern

In [48]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.0


In [52]:
!pip install langchain-community
!pip install chromadb

Collecting chromadb
  Downloading chromadb-1.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting build>=1.0.3 (from chromadb)
  Downloading build-1.2.2.post1-py3-none-any.whl.metadata (6.5 kB)
Collecting chroma-hnswlib==0.7.6 (from chromadb)
  Downloading chroma_hnswlib-0.7.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (252 bytes)
Collecting fastapi==0.115.9 (from chromadb)
  Downloading fastapi-0.115.9-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn>=0.18.3 (from uvicorn[standard]>=0.18.3->chromadb)
  Downloading uvicorn-0.34.1-py3-none-any.whl.metadata (6.5 kB)
Collecting posthog>=2.4.0 (from chromadb)
  Downloading posthog-3.24.1-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.21.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentele

In [53]:
# RAG Pattern Implementation with LangGraph
import os
from typing import List, Dict, Any, Tuple
from dotenv import load_dotenv
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langgraph.graph import StateGraph, END

# Load environment variables
load_dotenv()

# Define the state structure for our RAG flow
class RAGState(dict):
    """State for the RAG application."""
    query: str
    retrieval_results: List[Document] = None
    context: str = None
    answer: str = None

# Step 1: Set up the document ingestion process
def ingest_documents(docs_path: str, collection_name: str) -> Chroma:
    """Process documents and load them into a vector database."""
    # Simple document loading for demo purposes
    documents = []
    for filename in os.listdir(docs_path):
        if filename.endswith('.txt'):
            with open(os.path.join(docs_path, filename), 'r') as f:
                text = f.read()
                documents.append(Document(page_content=text, metadata={"source": filename}))

    # Split documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    splits = text_splitter.split_documents(documents)

    # Create vector store
    embeddings = OpenAIEmbeddings()
    vectorstore = Chroma.from_documents(
        documents=splits,
        embedding=embeddings,
        collection_name=collection_name,
        persist_directory="./chroma_db"
    )

    return vectorstore

# Step 2: Define the retrieval component
def retrieve(state: RAGState) -> RAGState:
    """Retrieve relevant documents based on the query."""
    vectorstore = Chroma(
        persist_directory="./chroma_db",
        embedding_function=OpenAIEmbeddings()
    )
    retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
    state["retrieval_results"] = retriever.invoke(state["query"])
    return state

# Step 3: Format the retrieved documents into context
def format_context(state: RAGState) -> RAGState:
    """Format retrieved documents into a single context string."""
    if state["retrieval_results"]:
        context_texts = [doc.page_content for doc in state["retrieval_results"]]
        sources = [doc.metadata.get("source", "Unknown") for doc in state["retrieval_results"]]

        formatted_context = "\n\n".join([
            f"Source: {sources[i]}\nContent: {context_texts[i]}"
            for i in range(len(context_texts))
        ])

        state["context"] = formatted_context
    else:
        state["context"] = "No relevant information found."

    return state

# Step 4: Generate an answer using the LLM
def generate_answer(state: RAGState) -> RAGState:
    """Generate an answer using the retrieved context."""
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    prompt = ChatPromptTemplate.from_template("""
    You are a helpful AI assistant. Answer the user's question based on the provided context.
    If the context doesn't contain relevant information, say so and try to provide general information.

    Context:
    {context}

    Question: {query}

    Answer:
    """)

    # Create the chain
    chain = (
        prompt
        | llm
        | StrOutputParser()
    )

    # Generate the answer
    state["answer"] = chain.invoke({
        "context": state["context"],
        "query": state["query"]
    })

    return state

# Step 5: Build the LangGraph
def build_rag_graph() -> StateGraph:
    """Build the RAG workflow graph."""
    workflow = StateGraph(RAGState)

    # Add nodes
    workflow.add_node("retrieve", retrieve)
    workflow.add_node("format_context", format_context)
    workflow.add_node("generate_answer", generate_answer)

    # Add edges
    workflow.add_edge("retrieve", "format_context")
    workflow.add_edge("format_context", "generate_answer")
    workflow.add_edge("generate_answer", END)

    # Set entry point
    workflow.set_entry_point("retrieve")

    return workflow.compile()

# Example usage
def main():
    # Set up your document collection (do this once)
    # ingest_documents("./documents", "my_knowledge_base")

    # Create the graph
    rag_graph = build_rag_graph()

    # Run the graph with a query
    result = rag_graph.invoke({"query": "What is the capital of France?"})

    print(f"Query: {result['query']}")
    print(f"Answer: {result['answer']}")

if __name__ == "__main__":
    main()

Query: What is the capital of France?
Answer: The context does not provide relevant information, but I can help with general knowledge. The capital of France is Paris.
