<a href="https://colab.research.google.com/github/dipanjanS/mastering-intelligent-agents-langgraph-workshop-dhs2025/blob/main/Module-4-Building-Advanced-Agentic-AI-Systems/M4LC1_Parallelized_Plan_Execution_in_Report_Planner_Agents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Parallelized Plan Execution in Planning Agents with Map-Reduce using Send in LangGraph


Map-reduce is a pattern for efficient task decomposition and parallel processing.

It has two phases:

(1) `Map` - Break a task into smaller sub-tasks, processing each sub-task in parallel.

(2) `Reduce` - Aggregate the results across all of the completed, parallelized sub-tasks.
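
The two phases can be sketched in plain Python, independent of LangGraph. This is a minimal illustration using only the standard library; the `documents` list and the helper names `count_words` and `total` are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def count_words(text: str) -> int:
    """Map step: process one sub-task independently of the others."""
    return len(text.split())


def total(a: int, b: int) -> int:
    """Reduce step: combine two partial results into one."""
    return a + b


# Illustrative input: each string is one sub-task.
documents = [
    "map reduce splits work",
    "each piece runs in parallel",
    "results are combined at the end",
]

# Map: run count_words on every document concurrently.
with ThreadPoolExecutor() as pool:
    partial_counts = list(pool.map(count_words, documents))

# Reduce: fold the partial results into a single value.
word_count = reduce(total, partial_counts, 0)
print(partial_counts, word_count)
```

`pool.map` returns results in input order, so the reduce step sees the partial counts in the same order as `documents`.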

We will design a system that will do two things:

(1) `Map` - Create a set of questions about a topic, then answer them in parallel (using `Send(...)`).

(2) `Reduce` - Compile a comprehensive report on the topic from the question-answer pairs.

![](https://i.imgur.com/SN7KifO.png)


In LangGraph, the map-reduce pattern relies on `Send`, which lets a graph fan out into a number of parallel branches that is only known at runtime.

Map-Reduce in LangGraph:
- Task Decomposition: Breaks a large task into smaller, manageable sub-tasks (for example, planning or decomposing a complex question).
- Parallel Processing: Executes sub-tasks concurrently, so total latency can approach that of the slowest sub-task rather than the sum of all of them.
- Result Aggregation: Combines the outcomes of all sub-tasks into a comprehensive response.
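
To see why concurrency helps for I/O-bound sub-tasks such as LLM calls, here is a small timing sketch that uses only the standard library. `fake_llm_call` is a hypothetical stand-in that simply sleeps; it is not a LangGraph or OpenAI function, and the exact timings will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_llm_call(question: str) -> str:
    """Hypothetical stand-in for a network-bound LLM request."""
    time.sleep(0.1)  # simulate waiting on the network
    return f"answer to {question}"


questions = [f"q{i}" for i in range(4)]

# Sequential: each call waits for the previous one to finish.
start = time.perf_counter()
sequential = [fake_llm_call(q) for q in questions]
sequential_s = time.perf_counter() - start

# Parallel: all calls wait at the same time, one thread per question.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(questions)) as pool:
    parallel = list(pool.map(fake_llm_call, questions))
parallel_s = time.perf_counter() - start

print(f"sequential: {sequential_s:.2f}s, parallel: {parallel_s:.2f}s")
```

Both runs produce the same answers; only the wall-clock time differs, because the waits overlap in the parallel run.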





## Install OpenAI, LangGraph and LangChain dependencies

In [None]:
!pip install langchain==0.3.27 langchain-community==0.3.27 langchain-openai==0.3.30 langgraph==0.6.5 --quiet

## Enter OpenAI API Key & Set Up Environment Variables

In [None]:
import os
import getpass

# OpenAI API key (used by the chat model)
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key (https://platform.openai.com/account/api-keys):\n")


## Define Agent State Schema

In [None]:
from typing_extensions import TypedDict
from pydantic import BaseModel
import operator
from typing import Annotated

# Structured-output schemas for the LLM responses
class Questions(BaseModel):
    questions: list[str]

class Answer(BaseModel):
    question: str
    answer: str

class Report(BaseModel):
    report: str

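# Shared graph state
class OverallState(TypedDict):
    topic: str
    questions: list
    # operator.add acts as the reducer: each parallel branch returns a list,
    # and the lists are concatenated instead of overwriting one another
    answers: Annotated[list, operator.add]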
    report: str
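
The `Annotated[list, operator.add]` annotation on `answers` is what makes the reduce step possible: updates from parallel branches are concatenated rather than overwritten. The sketch below mimics that merging by hand in plain Python. It does not call LangGraph, and `DemoState` and `branch_updates` are illustrative names, not part of the notebook.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    # Same annotation style as OverallState["answers"]
    answers: Annotated[list, operator.add]


# Read the reducer back out of the annotation metadata.
hints = get_type_hints(DemoState, include_extras=True)
reducer = hints["answers"].__metadata__[0]

# Each parallel branch returns its own single-item list (hand-written sample data).
branch_updates = [
    [{"question": "Q1", "answer": "A1"}],
    [{"question": "Q2", "answer": "A2"}],
]

# Apply the reducer once per branch update, starting from an empty list.
current: list = []
for update in branch_updates:
    current = reducer(current, update)

print(current)
```

This is also why each node update wraps its result in a list: `operator.add` on two lists concatenates them.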

## Define Agent Node Functions

Role of `Send`:
- Dynamic Task Distribution: `Send` dispatches a different input state to each of several instances of the same node, which then run in parallel.
- Flexible Workflow Management: Very useful when the number of parallel branches is not known in advance, unlike a router agent with a fixed set of static edges. A simple example is an LLM that generates a variable number of questions or plan steps, each of which is then processed in its own parallel branch.
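
The dynamic dispatch idea can be emulated in a few lines of plain Python. This is only a conceptual sketch, not how LangGraph implements `Send`: `Packet`, `NODES`, `answer_stub`, `fan_out` and `run_packets` are all hypothetical names introduced for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, NamedTuple


class Packet(NamedTuple):
    """Hypothetical stand-in for Send: a target node name plus its private input."""
    node: str
    arg: dict[str, Any]


def answer_stub(payload: dict[str, Any]) -> dict[str, Any]:
    """Stub worker node; a real node would call an LLM here."""
    return {"answers": [f"answer to: {payload['question']}"]}


# Registry mapping node names to callables (illustrative only).
NODES: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = {"answer": answer_stub}


def fan_out(state: dict[str, Any]) -> list[Packet]:
    """Emit one packet per question, so the branch count is decided at runtime."""
    return [Packet("answer", {"question": q}) for q in state["questions"]]


def run_packets(packets: list[Packet]) -> list[str]:
    """Run every packet concurrently, then concatenate the list updates."""
    with ThreadPoolExecutor() as pool:
        updates = list(pool.map(lambda p: NODES[p.node](p.arg), packets))
    merged: list[str] = []
    for update in updates:
        merged += update["answers"]
    return merged


state = {"questions": ["What is a bird?", "How do birds fly?", "What do birds eat?"]}
answers = run_packets(fan_out(state))
print(len(answers))
```

Note that each worker receives only its own payload (`{"question": ...}`), not the whole shared state, which mirrors how the second argument of `Send` is used in this notebook.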

In [None]:
from langgraph.types import Send
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Node to generate questions
def generate_questions(state: OverallState):
    # Some models (e.g. gpt-4o) tend to return 5 questions every time, so feel free to tweak the prompt below,
    # try other LLMs, or randomly select a subset of the returned questions.
    # The goal is simply to see how Send works with a variable number of questions.
    questions_prompt = """Generate a list of concise sub-questions related to this overall topic: {topic}
                          which would help build a good report.
                          Follow these rules for question generation:
                            - Do not create very long questions.
                            - Number of questions should always be 3 for broad generic topics (Birds, Animals, Artificial Intelligence)
                              and 5 for more complex specific topics (Outlook for ..., Impact of ...)
                       """
    prompt = questions_prompt.format(topic=state["topic"])
    response = llm.with_structured_output(Questions).invoke(prompt)
    return {"questions": response.questions}

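# Node to generate an answer to one question
# Note: state here is the per-branch payload passed via Send ({"question": ...}), not OverallState
def generate_answer(state: dict):
    answer_prompt = """Answer the following question: {question}"""
    prompt = answer_prompt.format(question=state["question"])
    response = llm.with_structured_output(Answer).invoke(prompt)
    # Wrap the result in a list so the operator.add reducer on "answers" can concatenate branch outputs
    return {"answers": [{"question": state["question"], "answer": response.answer}]}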

# Conditional-edge function (not a node): returns one Send per question,
# so generate_answer runs once per question, in parallel
def parallelize_answer_generation(state: OverallState):
    return [Send("generate_answer", {"question": q}) for q in state["questions"]]

# Node to compile the report
def compile_report(state: OverallState):
    q_and_a = "\n\n".join(
        [f"Q: {qa['question']}\nA: {qa['answer']}" for qa in state["answers"]]
    )
    report_prompt = """Below is a set of questions and answers about the topic:
                       {topic}.
                       Generate a detailed report on the topic from them.
                       {q_and_a}"""
    prompt = report_prompt.format(topic=state["topic"], q_and_a=q_and_a)
    response = llm.with_structured_output(Report).invoke(prompt)
    return {"report": response.report}
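
To inspect what the report prompt will contain without calling an LLM, the Q&A formatting step from `compile_report` can be pulled out into a small pure function. `format_q_and_a` is a hypothetical helper name, and the sample answers below are hand-written for the illustration.

```python
def format_q_and_a(answers: list[dict[str, str]]) -> str:
    """Join question/answer dicts into the text block used in the report prompt."""
    return "\n\n".join(f"Q: {qa['question']}\nA: {qa['answer']}" for qa in answers)


# Hand-written sample in the same shape as the "answers" state key.
sample_answers = [
    {"question": "What is a bird?", "answer": "A feathered vertebrate."},
    {"question": "Can all birds fly?", "answer": "No, penguins cannot."},
]

text = format_q_and_a(sample_answers)
print(text)
```

Keeping this step as a pure function makes it easy to check the reduce input separately from the model call.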

## Create Agent

In [None]:
from langgraph.graph import StateGraph, START, END

# Build the graph
graph = StateGraph(OverallState)
graph.add_node("generate_questions", generate_questions)
graph.add_node("generate_answer", generate_answer)
graph.add_node("compile_report", compile_report)

graph.add_edge(START, "generate_questions")
# add_conditional_edges is not used for conditional branching here;
# the routing function returns Send objects that dynamically create N parallel runs of generate_answer,
# where N = the number of questions (steps in the plan)
graph.add_conditional_edges("generate_questions",
                            parallelize_answer_generation,
                            ["generate_answer"])
# compile_report runs once, after all parallel generate_answer branches have finished
graph.add_edge("generate_answer", "compile_report")
graph.add_edge("compile_report", END)

# Compile the app
agent = graph.compile()

In [None]:
# Display the graph
from IPython.display import display, Image
display(Image(agent.get_graph().draw_mermaid_png()))

## Run and Test the Agent

In [None]:
from IPython.display import display, Markdown

for state in agent.stream({"topic": "Artificial Intelligence"}):
    print(state)
    if 'compile_report' in state:
        display(Markdown(state['compile_report']['report']))

In [None]:
for state in agent.stream({"topic": "Impact of AI on jobs"}):
    print(state)
    if 'compile_report' in state:
        display(Markdown(state['compile_report']['report']))
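
The loops above treat each streamed item as a dict keyed by node name (hence the `'compile_report' in state` check). Under that assumption, the stream can be folded into a single summary. This is a sketch: `summarize_stream` is a hypothetical helper, and `fake_chunks` is hand-written sample data rather than real model output.

```python
from typing import Any, Iterable


def summarize_stream(chunks: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Collect questions, answers and the final report from streamed node updates."""
    summary: dict[str, Any] = {"questions": [], "answers": [], "report": None}
    for chunk in chunks:
        for node_name, update in chunk.items():
            if node_name == "generate_questions":
                summary["questions"] = update["questions"]
            elif node_name == "generate_answer":
                # One update per parallel branch; accumulate them all.
                summary["answers"].extend(update["answers"])
            elif node_name == "compile_report":
                summary["report"] = update["report"]
    return summary


# Hand-written chunks, assumed to mirror the shape printed by the loops above.
fake_chunks = [
    {"generate_questions": {"questions": ["Q1", "Q2"]}},
    {"generate_answer": {"answers": [{"question": "Q1", "answer": "A1"}]}},
    {"generate_answer": {"answers": [{"question": "Q2", "answer": "A2"}]}},
    {"compile_report": {"report": "# Report"}},
]

summary = summarize_stream(fake_chunks)
print(len(summary["answers"]), summary["report"])
```

With the real agent, the same helper could be called as `summarize_stream(agent.stream({"topic": "..."}))`, provided the streamed items have the shape assumed here.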