In [1]:
%pip install -qU "langchain[openai]"
%pip install -qU langchain-openai
%pip install -qU langchain-chroma
%pip install --upgrade --quiet langgraph langchain-community beautifulsoup4


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;4

In [2]:
import os 
import getpass

# if the api key doesnt exist get it 
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass()

 ········


In [51]:
from langchain.chat_models import init_chat_model
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma 

llm = init_chat_model("openai:gpt-5-nano")
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = Chroma(
    collection_name="qna",
    embedding_function=embeddings
)


In [4]:
os.environ["LANGSMITH_TRACING"] = "true"
if not os.environ.get("LANGSMITH_API_KEY"):
    os.environ["LANGSMITH_API_KEY"] = getpass.getpass()

 ········


In [52]:
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing_extensions import List, TypedDict

# Load and chunk contents of the blog
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)

In [53]:
_ = vector_store.add_documents(all_splits)
print(f"Loaded {len(all_splits)} document chunks")


Loaded 63 document chunks


In [54]:
from langgraph.graph import MessagesState, StateGraph

graph_builder = StateGraph(MessagesState)

Conversational experiences can be tracked through messages. Retrieved documents and other artifacts can be retrieved through tool messages. Specifically we will have:

1. User input as **HumanMessage**
2. Vector store query as an **AIMessage** with tool call
3. Retrieved documents as a **ToolMessage**
4. Final response as a **AIMessage**

Tool calling is a way for the AI to interact with not just the user, but also databases and API's. 

In [55]:
from langchain_core.tools import tool


@tool(response_format="content_and_artifact")
def retrieve(query: str):
    """Retrieve information related to a query."""
    retrieved_docs = vector_store.similarity_search(query, k=2)
    serialized = "\n\n".join(
        (f"Source: {doc.metadata}\nContent: {doc.page_content}")
        for doc in retrieved_docs
    )
    return serialized, retrieved_docs

Our graph will have 3 nodes: 

1. A node that fields user input; either generating a query for the retriever or responding directly
2. A node for the retrieval tool that executes the retrieval step
3. A node that generates the final response using the retrieved context

In [56]:
from langchain_core.messages import SystemMessage
from langgraph.prebuilt import ToolNode


# Step 1: Generate an AIMessage that may include a tool-call to be sent.
def query_or_respond(state: MessagesState):
    """Generate tool call for retrieval or respond."""
    llm_with_tools = llm.bind_tools([retrieve])
    response = llm_with_tools.invoke(state["messages"], config={"temperature": 0})
    # MessagesState appends messages to state instead of overwriting
    return {"messages": [response]}


# Step 2: Execute the retrieval.
tools = ToolNode([retrieve])


# Step 3: Generate a response using the retrieved content.
def generate(state: MessagesState):
    """Generate answer."""
    # Get generated ToolMessages
    recent_tool_messages = []
    for message in reversed(state["messages"]):
        if message.type == "tool":
            recent_tool_messages.append(message)
        else:
            break
    tool_messages = recent_tool_messages[::-1]

    # Format into prompt
    docs_content = "\n\n".join(doc.content for doc in tool_messages)
    system_message_content = (
        "You are an assistant for question-answering tasks. "
        "Use the following pieces of retrieved context to answer "
        "the question. If you don't know the answer, say that you "
        "don't know. Use three sentences maximum and keep the "
        "answer concise."
        "\n\n"
        f"{docs_content}"
    )
    conversation_messages = [
        message
        for message in state["messages"]
        if message.type in ("human", "system")
        or (message.type == "ai" and not message.tool_calls)
    ]
    prompt = [SystemMessage(system_message_content)] + conversation_messages

    # Run
    response = llm.invoke(prompt)
    return {"messages": [response]}

Finally we create a graph object to connect the steps in the sequence. We also allow **query_or_respond** to respond directly to the user if it does not generate a tool call 

In [57]:
from langgraph.graph import END
from langgraph.prebuilt import ToolNode, tools_condition

graph_builder.add_node("query_or_respond", query_or_respond)
graph_builder.add_node("tools", tools)
graph_builder.add_node("generate", generate)

graph_builder.set_entry_point("query_or_respond")
graph_builder.add_conditional_edges(
    "query_or_respond",
    tools_condition,
    {END: END, "tools": "tools"},
)
graph_builder.add_edge("tools", "generate")
graph_builder.add_edge("generate", END)

graph = graph_builder.compile()

In [58]:
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

ValueError: Failed to reach https://mermaid.ink/ API while trying to render your graph. Status code: 502.

To resolve this issue:
1. Check your internet connection and try again
2. Try with higher retry settings: `draw_mermaid_png(..., max_retries=5, retry_delay=2.0)`
3. Use the Pyppeteer rendering method which will render your graph locally in a browser: `draw_mermaid_png(..., draw_method=MermaidDrawMethod.PYPPETEER)`

In [60]:
input_message = "What is Task Decomposition based off of the article provided?"

for step in graph.stream(
    {"messages": [{"role": "user", "content": input_message}]},
    stream_mode="values",
):
    step["messages"][-1].pretty_print()


What is Task Decomposition based off of the article provided?

I don’t have the article you’re referring to. Could you paste the article or share the key section? I can summarize exactly what it says about task decomposition once I can see it.

If you just want a quick, general answer in the meantime, here it is:

- Task decomposition is the process of breaking a complex task or problem into smaller, more manageable subtasks.
- It typically follows a top-down or hierarchical approach: start with the overall goal, subdivide it into subgoals, and continue splitting until each task is actionable (often atomic).
- The resulting structure is often organized as a Work Breakdown Structure (WBS) or similar hierarchy, showing dependencies and the sequence of work.
- Benefits: easier estimation, assignment, parallel work, risk reduction, and clearer scope.
- Common considerations: deciding on the right level of granularity, ensuring each subtask has a defined input/output and owner, and trackin

In [14]:
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# Specify an ID for the thread
config = {"configurable": {"thread_id": "abc123"}}

In [15]:
input_message = "What is Task Decomposition?"

for step in graph.stream(
    {"messages": [{"role": "user", "content": input_message}]},
    stream_mode="values",
    config=config,
):
    step["messages"][-1].pretty_print()


What is Task Decomposition?

Task decomposition is a problem‑solving technique where a complex task or problem is broken down into smaller, more manageable subtasks or components. Each subtask is easier to plan, implement, test, and integrate than the whole task at once.

Key points:
- Purpose: reduce complexity, clarify work, improve estimates, and allow parallel work.
- How it’s done (typical steps):
  1) Define the overall goal or outcome.
  2) Identify major components or deliverables.
  3) Break each component into smaller tasks, repeating until tasks are actionable (with clear acceptance criteria).
  4) Define interfaces, dependencies, owners, and how you’ll know “done.”
  5) Sequence and estimate the tasks.
- Common framework: Work Breakdown Structure (WBS) in project management.
- Example: Building a user registration feature
  - Overall: Implement user registration
  - Subtasks: design UI, implement frontend form, create backend API, add validation, implement email verificati