# LangGraph Async Map-Reduce Summarization (Working Version)

## 1. Imports and Setup

In [61]:
import asyncio
from typing import List, TypedDict
from langchain_core.documents import Document
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain_core.prompts import PromptTemplate
from langgraph.graph import StateGraph, END
from langchain_community.chat_models import ChatOllama

## 2. Document Preparation

In [62]:

docs = [
    Document(page_content="LangChain is a framework for developing applications powered by language models."),
    Document(page_content="It enables applications like chatbots, agents, and summarizers."),
    Document(page_content="LangGraph adds control flow and memory to chains using a graph-based structure."),
]


## 3. LLM and Prompt Setup

In [63]:
llm = ChatOllama(model="llama3.2")

In [64]:
map_prompt = PromptTemplate.from_template("Summarize this document:\n{doc}")
map_chain = map_prompt | llm

In [76]:
reduce_prompt = PromptTemplate.from_template("Combine these summaries into one coherent summary:\n{summaries}")
reduce_chain = reduce_prompt | llm


## 4. Define State and Nodes

In [77]:

class State(TypedDict):
    docs: List[Document]
    accumulated_summaries: List[str]
    current_summary: str
    final_summary: str

In [78]:
def map_node(state: State) -> State:
    print("🧩 Running map_node")
    if state["docs"]:
        doc = state["docs"].pop(0)
        result = map_chain.invoke({"doc": doc})
        # print("📄 Map result:", result)
        state["accumulated_summaries"].append(result)
    return state

In [79]:
def reduce_node(state: State) -> State:
    print("\n🧪 Reduce step")

    # In case AIMessage objects are accidentally passed in
    summaries = []
    for msg in state["accumulated_summaries"]:
        if hasattr(msg, "content"):  # AIMessage
            summaries.append(msg.content)
        else:  # Regular string
            summaries.append(str(msg))

    combined = "\n".join(summaries)
    result = reduce_chain.invoke({"summaries": combined})
    state["final_summary"] = result["text"]
    return state


In [80]:
def should_continue(state: State) -> str:
    # print("🔁 Deciding next step. Docs left:", len(state["docs"]))
    if state["docs"]:
        return "map"
    else:
        return "reduce"


## 5. Define LangGraph

In [81]:
builder = StateGraph(State)

In [82]:
builder.add_node("map", map_node)
builder.add_node("reduce", reduce_node)

<langgraph.graph.state.StateGraph at 0x1a175df19d0>

In [83]:
builder.set_entry_point("map")
builder.add_conditional_edges("map", should_continue)
builder.add_edge("reduce", END)

<langgraph.graph.state.StateGraph at 0x1a175df19d0>

In [84]:
graph = builder.compile()

## 6. Run Graph Asynchronously

In [85]:
async def summarize_with_map_reduce(documents: List[Document]) -> str:
    initial_state: State = {
        "docs": documents.copy(),
        "accumulated_summaries": [],
        "current_summary": "",
        "final_summary": "",
    }
    async for step in graph.astream(initial_state):
        # print("🔄 Step Output:", step)
        if "final_summary" in step and step["final_summary"]:
            return step["final_summary"]
    return "❌ Final summary not generated"

# Run inside an event loop
import nest_asyncio
nest_asyncio.apply()

In [86]:
summary = await summarize_with_map_reduce(docs)

🧩 Running map_node
🧩 Running map_node
🧩 Running map_node

🧪 Reduce step


TypeError: 'AIMessage' object is not subscriptable

In [None]:
print("\n✅ Final Summary:")
print(summary)
