LangSmith is a platform built specifically for debugging, testing, and monitoring LLM applications. For LangGraph, it's an indispensable tool that gives you an X-ray view into your graph's execution, turning a complex black box into a transparent, explorable flowchart.

**What LangSmith Provides**<br>
Instead of trying to follow a messy stream of print statements, LangSmith gives you a rich, interactive UI where you can see:

A full trace of your graph's execution path, node by node.

The exact inputs (the state) each node received.

The exact outputs (the state updates) each node produced.

The full prompt and response for every LLM call.

The inputs and outputs for every tool call.

A "diff" view showing precisely how the state changed at each step.

In [1]:
import os
from dotenv import load_dotenv
load_dotenv()

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGSMITH_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "Hierarchical Agent Teams"

In [2]:
import os
import json
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.tools.tavily_search import TavilySearchResults



tool = TavilySearchResults(max_results=3)
model = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0, gemini_api_key=os.getenv("GOOGLE_API_KEY"))

# ==============================================================================
# PART 1: CREATE THE WORKER GRAPH (Our Adaptive RAG Agent)
# This is a self-contained agent that can research a topic and self-correct.
# ==============================================================================

class AdaptiveRAGState(TypedDict):
    """The state for our worker agent."""
    messages: Annotated[List[BaseMessage], add_messages]

def create_adaptive_rag_graph():
    """Factory function to create the worker graph."""
    
    # Define worker nodes
    def retrieval_node(state: AdaptiveRAGState):
        print("---WORKER: RETRIEVAL---")
        query = state['messages'][-1].content
        retrieved_docs = tool.invoke({"query": query})
        doc_text = "\n\n".join(str(d) for d in retrieved_docs)
        return {"messages": [HumanMessage(content=doc_text, name="Retriever")]}

    def assessment_node(state: AdaptiveRAGState):
        print("---WORKER: ASSESSMENT---")
        system_prompt = """You are a relevance assessor. Your task is to evaluate if the retrieved documents are sufficient to answer the user's question.
        Respond with a JSON object with one key, 'is_relevant': a boolean."""
        retrieved_docs = state['messages'][-1].content
        user_question = state['messages'][-2].content
        prompt = f"User Question: {user_question}\n\nRetrieved Documents:\n{retrieved_docs}"
        response = model.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)])
        return {"messages": [HumanMessage(content=response.content, name="Assessor")]}

    def generation_node(state: AdaptiveRAGState):
        print("---WORKER: GENERATION---")
        prompt = f"Based on the following documents, please provide a comprehensive answer to this question:\n\nQuestion: {state['messages'][0].content}\n\nDocuments:\n{state['messages'][-2].content}"
        response = model.invoke(prompt)
        return {"messages": [response]}

    # Define worker router
    def relevance_router(state: AdaptiveRAGState) -> str:
        print("---WORKER ROUTER---")
        assessment_message = state['messages'][-1].content
        try:
            assessment_json = json.loads(assessment_message)
            if assessment_json.get('is_relevant'):
                return "generate"
            else:
                return "retrieve" # Loop back to retrieve with the same query for simplicity
        except (json.JSONDecodeError, KeyError):
            return "end"

    # Build the worker graph
    builder = StateGraph(AdaptiveRAGState)
    builder.add_node("retriever", retrieval_node)
    builder.add_node("assessor", assessment_node)
    builder.add_node("generator", generation_node)
    
    builder.set_entry_point("retriever")
    builder.add_edge("retriever", "assessor")
    builder.add_conditional_edges("assessor", relevance_router, {
        "generate": "generator",
        "retrieve": "retriever",
        "end": END,
    })
    builder.add_edge("generator", END)
    
    return builder.compile()

# ==============================================================================
# PART 2: CREATE THE MANAGER GRAPH
# This graph manages a list of tasks and delegates each one to the worker.
# ==============================================================================

# First, create an instance of our worker graph.
research_worker_graph = create_adaptive_rag_graph()

# Define the Manager's state
class ManagerState(TypedDict):
    topics_to_research: List[str]
    current_topic: str
    results: Annotated[List[str], lambda x, y: x + y] # Reducer to append results

# Define the Manager's nodes
def select_topic_node(state: ManagerState):
    """Selects the next topic from the list to be researched."""
    print(f"---MANAGER: {len(state['topics_to_research'])} topics left---")
    topic = state['topics_to_research'][0]
    remaining_topics = state['topics_to_research'][1:]
    return {"current_topic": topic, "topics_to_research": remaining_topics}

def researcher_proxy_node(state: ManagerState):
    """This is the key node. It invokes the worker graph."""
    topic = state['current_topic']
    print(f"---MANAGER: DELEGATING '{topic}' TO WORKER---")
    
    # Invoke the worker graph with the current topic
    worker_response = research_worker_graph.invoke(
        {"messages": [HumanMessage(content=topic)]}
    )
    
    # The worker's final answer is in its last message.
    final_answer = worker_response['messages'][-1].content
    
    # Append the result to the manager's list of results
    return {"results": [f"Topic: {topic}\n\n{final_answer}"]}

# Define the Manager's router
def manager_router(state: ManagerState):
    """Routes to the end if there are no more topics to research."""
    if not state['topics_to_research']:
        return "end"
    else:
        return "continue"

# Build the Manager graph
manager_builder = StateGraph(ManagerState)
manager_builder.add_node("select_topic", select_topic_node)
manager_builder.add_node("research_worker", researcher_proxy_node)

manager_builder.set_entry_point("select_topic")
manager_builder.add_edge("select_topic", "research_worker")
manager_builder.add_conditional_edges("research_worker", manager_router, {
    "continue": "select_topic",
    "end": END
})

manager_graph = manager_builder.compile()

# ==============================================================================
# PART 3: EXECUTE THE HIERARCHY
# ==============================================================================

# The list of topics for the manager to delegate.
topics = ["The future of AI hardware", "Recent advancements in large language models"]
initial_state = {"topics_to_research": topics, "results": []}

# Stream the results from the manager graph.
print("\n---EXECUTING HIERARCHICAL TEAM---")
final_result = manager_graph.invoke(initial_state)

print("\n\n" + "="*80)
print("--- HIERARCHY EXECUTION COMPLETE ---")
print("Collected results:")
for i, result in enumerate(final_result['results']):
    print(f"\n--- RESULT {i+1} ---")
    print(result)

  tool = TavilySearchResults(max_results=3)
Unexpected argument 'gemini_api_key' provided to ChatGoogleGenerativeAI. Did you mean: 'google_api_key'?
                gemini_api_key was transferred to model_kwargs.
                Please confirm that gemini_api_key is what you intended.
  exec(code_obj, self.user_global_ns, self.user_ns)



---EXECUTING HIERARCHICAL TEAM---
---MANAGER: 2 topics left---
---MANAGER: DELEGATING 'The future of AI hardware' TO WORKER---
---WORKER: RETRIEVAL---
---WORKER: ASSESSMENT---
---WORKER ROUTER---
---MANAGER: 1 topics left---
---MANAGER: DELEGATING 'Recent advancements in large language models' TO WORKER---
---WORKER: RETRIEVAL---
---WORKER: ASSESSMENT---
---WORKER ROUTER---


--- HIERARCHY EXECUTION COMPLETE ---
Collected results:

--- RESULT 1 ---
Topic: The future of AI hardware

```json
{
  'is_relevant': True
}
```

--- RESULT 2 ---
Topic: Recent advancements in large language models

```json
{
  "is_relevant": true
}
```


### Streaming and Asynchronous Operations
### For Real-Time Token Streaming:

**If you want to see individual tokens as they're generated (like ChatGPT), you need astream_log() because:**

<ol><li>graph.astream() only shows you the final result after each node finishes</li>
<li>graph.astream_log() shows you every intermediate yield from your generator functions</li></ul>


 you use yield to turn a regular function into a generator that can stream out multiple values over time instead of just returning one value at the end.

using astream_log() for in depth metadata data

In [None]:
import asyncio
import os
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI

# --- 1. State and Model ---
class StreamState(TypedDict):
    messages: Annotated[List[BaseMessage], ...]
    poem: Annotated[str, lambda old, new: new]  # REPLACE, not add

model = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash", 
    temperature=0.7,
    gemini_api_key=os.getenv("GOOGLE_API_KEY")
)

# --- 2. The Streaming Node ---
async def writer_node(state: StreamState):
    """A node that streams the output of an LLM."""
    print("---STREAMING WRITER---")
    prompt = f"Write a short, four-line poem about {state['messages'][-1].content}."
    stream = model.astream(prompt)
    
    async for chunk in stream:
        yield {**state, "poem": chunk.content}

# --- 3. Build The Graph ---
builder = StateGraph(StreamState)
builder.set_entry_point("writer")
builder.add_node("writer", writer_node)
builder.add_edge("writer", END)
graph = builder.compile()

async def run_agent():
    """Runs the agent and prints the streamed output in real-time."""
    print("---STARTING STREAMING AGENT---")
    async for patch in graph.astream_log(
        {"messages": [HumanMessage(content="the moon")], "poem": ""},
        config={"include_values": True}
    ):
        i=0
        if i==0:
            print(patch)
            i+=1
        for op in patch.ops:
            # Look for our writer node's streamed output
            if (op["op"] == "add" and 
                "/logs/writer" in op["path"] and 
                "/streamed_output/-" in op["path"]):
                # Extract the poem content from the streamed output
                value = op.get("value", {})
                if isinstance(value, dict) and "poem" in value:
                    poem_chunk = value["poem"]
                    print(poem_chunk, end="", flush=True)
            
            # Also catch the final complete poem
            elif (op["op"] == "replace" and op["path"] == "/final_output"):
                final_state = op.get("value", {})
                if "poem" in final_state:
                    print(f"\n\n--- FINAL POEM ---\n{final_state['poem']}")
    
    print("\n---STREAMING COMPLETE---")

await run_agent()

Unexpected argument 'gemini_api_key' provided to ChatGoogleGenerativeAI. Did you mean: 'google_api_key'?
                gemini_api_key was transferred to model_kwargs.
                Please confirm that gemini_api_key is what you intended.
  exec(code_obj, self.user_global_ns, self.user_ns)


---STARTING STREAMING AGENT---
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': 'd0f345f5-dc0e-4db8-b186-c2d58ca69410',
            'logs': {},
            'name': 'LangGraph',
            'streamed_output': [],
            'type': 'chain'}})
RunLogPatch({'op': 'add',
  'path': '/logs/writer',
  'value': {'end_time': None,
            'final_output': None,
            'id': '5c3153f4-c487-4a46-afb4-543705d89eaf',
            'metadata': {'include_values': True,
                         'langgraph_checkpoint_ns': 'writer:5a5da2d7-588f-81d0-b49f-754390e7690b',
                         'langgraph_node': 'writer',
                         'langgraph_path': ('__pregel_pull', 'writer'),
                         'langgraph_step': 1,
                         'langgraph_triggers': ('branch:to:writer',)},
            'name': 'writer',
            'start_time': '2025-07-26T13:46:38.520+00:00',
            'streamed_output': [],
            'streamed_

for astream()

In [52]:
import asyncio
import os
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI

# --- 1. State and Model ---
class StreamState(TypedDict):
    messages: Annotated[List[BaseMessage], ...]
    poem: Annotated[str, lambda old, new: new]  # REPLACE, not add

model = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash", 
    temperature=0.7,
    gemini_api_key=os.getenv("GOOGLE_API_KEY")
)

# --- 2. The Streaming Node ---
async def writer_node(state: StreamState):
    """A node that streams the output of an LLM."""
    print("---STREAMING WRITER---")
    prompt = f"Write a short, four-line poem about {state['messages'][-1].content}."
    stream = model.astream(prompt)
    
    async for chunk in stream:
        yield {**state, "poem": chunk.content}

# --- 3. Build The Graph ---
builder = StateGraph(StreamState)
builder.set_entry_point("writer")
builder.add_node("writer", writer_node)
builder.add_edge("writer", END)
graph = builder.compile()

async def run_agent_simple():
    """Simple streaming - just gets the final state of each step."""
    print("---STARTING SIMPLE STREAMING---")
    async for step in graph.astream(
        {"messages": [HumanMessage(content="the moon")], "poem": ""}
    ):
        # Each 'step' is a dict with node_name: final_state
        i=0
        if i==0:
            print(step)
            i+=1
        for node_name, state in step.items():
            if "poem" in state and state["poem"]:
                print(f"[{node_name.upper()}]: {state['poem']}")
    
    print("\n---SIMPLE STREAMING COMPLETE---")

await run_agent_simple()

Unexpected argument 'gemini_api_key' provided to ChatGoogleGenerativeAI. Did you mean: 'google_api_key'?
                gemini_api_key was transferred to model_kwargs.
                Please confirm that gemini_api_key is what you intended.
  exec(code_obj, self.user_global_ns, self.user_ns)


---STARTING SIMPLE STREAMING---
---STREAMING WRITER---
{'writer': {'messages': [HumanMessage(content='the moon', additional_kwargs={}, response_metadata={})], 'poem': ' with gentle sighs,\nWhile the world softly sleeps.\n'}}
[WRITER]:  with gentle sighs,
While the world softly sleeps.


---SIMPLE STREAMING COMPLETE---


In [16]:
def generator_function():
    print("Sending first chunk")
    yield "First" # Pauses here
    
    print("Resuming and sending second chunk")
    yield "Second" # Pauses here
    
    print("Resuming and sending final chunk")
    yield "Third" # Pauses and ends

# How you use it:
for chunk in generator_function():
    print(f"Received: {chunk}")

# Output:
# Sending first chunk
# Received: First
# Resuming and sending second chunk
# Received: Second
# Resuming and sending final chunk
# Received: Third

Sending first chunk
Received: First
Resuming and sending second chunk
Received: Second
Resuming and sending final chunk
Received: Third


In [47]:
import os
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI

class StreamState(TypedDict):
    messages: Annotated[List[BaseMessage], ...]
    poem: Annotated[str, lambda old, new: new]

model = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0.7,
    google_api_key=os.getenv("GOOGLE_API_KEY")
)

async def writer_node(state: StreamState):
    print("---STREAMING WRITER STARTED---")
    prompt = f"Write a short, four-line poem about {state['messages'][-1].content}."
    stream = model.astream(prompt)
    chunk_count = 0
    async for chunk in stream:
        chunk_count += 1
        print(f"[CHUNK {chunk_count}]: '{chunk.content}'")  # Debug print
        yield {**state, "poem": chunk.content}
    print(f"---STREAMING WRITER FINISHED--- (Total chunks: {chunk_count})")

builder = StateGraph(StreamState)
builder.set_entry_point("writer")
builder.add_node("writer", writer_node)
builder.add_edge("writer", END)
graph = builder.compile()

async def run_agent():
    print("---STARTING AGENT---")
    async for patch in graph.astream_log(
        {"messages": [HumanMessage(content="the moon")], "poem": ""},
        config={"include_values": True}
    ):
        for op in patch.ops:
            # Look for the actual streaming output path
            if op["op"] == "add" and "/streamed_output/-" in op["path"]:
                # Check if it's a poem update (from our writer node)
                if "poem" in str(op.get("value", {})):
                    poem_content = op["value"].get("poem", "")
                    if poem_content:
                        print(poem_content, end="", flush=True)
            
            # Alternative: Look for final output
            elif op["op"] == "replace" and op["path"] == "/final_output":
                final_state = op.get("value", {})
                if "poem" in final_state:
                    print(f"\n\nFINAL POEM:\n{final_state['poem']}")
    
    print("\n---STREAMING COMPLETE---")

await run_agent()


---STARTING AGENT---
---STREAMING WRITER STARTED---
[CHUNK 1]: 'Silver'
Silver[CHUNK 2]: ' orb in velvet skies,
A watchful eye, where darkness lies.
Pull'
 orb in velvet skies,
A watchful eye, where darkness lies.
Pull[CHUNK 3]: 'ing tides with gentle grace,
A silent smile on night's dark face.
'
ing tides with gentle grace,
A silent smile on night's dark face.
---STREAMING WRITER FINISHED--- (Total chunks: 3)
ing tides with gentle grace,
A silent smile on night's dark face.

---STREAMING COMPLETE---
