# Distributed Tracing

## Setup

Let's start by loading in our environment variables, and setting up an LLM

In [1]:
# Using a .env file
from dotenv import load_dotenv
load_dotenv(dotenv_path="../.env", override=True)

from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

Distributed tracing lets you associate traces even if they take place on different systems or processes. We'll demonstrate this concept by creating a trace that spans a Server-Client call. 

The trace will originate in the Client, which will serve as the parent run. We will then trace the Server's execution to appear as a child run under the parent run.

## Creating the Client

Let's start by setting up the client, which will originate the trace and send a request to the server. When sending the request, the client will include information about the current trace in its request headers, using the run_tree.to_headers() function.

In [2]:
import httpx
import nest_asyncio
from typing_extensions import TypedDict
from langsmith import Client
from langsmith.run_helpers import tracing_context, get_current_run_tree
from langgraph.graph import StateGraph, START, END

# Client (Parent) ------------------------------------------------------------ #

class ParentState(TypedDict):
    input_value: int
    output_value: int

async def parent_node(state: ParentState) -> ParentState:
    print(f"Parent graph received input: {state['input_value']}")
    headers = {}
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        run_tree = get_current_run_tree()
        headers.update(run_tree.to_headers())
        response = await client.post("/tracing", headers=headers, json={"value": state["input_value"]})
        result = response.json()
        print(f"Child graph returned: {result['value']}")
        return {"input_value": state["input_value"], "output_value": result["value"]}

parent_builder = StateGraph(ParentState)
parent_builder.add_node("parent_node", parent_node)
parent_builder.add_edge(START, "parent_node")
parent_builder.add_edge("parent_node", END)
parent_graph = parent_builder.compile()

async def run_client():
    # Run the parent graph with initial input
    result = await parent_graph.ainvoke({"input_value": 10, "output_value": 0})
    return result["output_value"]


## Creating the Server

Next, we'll create the server, which will receive the trace. It will take in trace metadata sent from the client in the headers, which it will use to set the context of its own trace.

This will allow any tracing done in the server to still be associated with the request from the client.

In [3]:
# server.py
import asyncio
import threading
from typing_extensions import TypedDict

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request

from langgraph.graph import StateGraph, START, END
from langsmith.run_helpers import tracing_context, get_current_run_tree
from langsmith import Client


# Server (Child) ------------------------------------------------------------ #
class ChildState(TypedDict):
    value: int

async def child_node(state: ChildState):
    generation = llm.invoke("What is " + str(state["value"]) + " 1? Respond with a single number, no extra text.")
    return {"value": int(generation.content)}

child_builder = StateGraph(ChildState)
child_builder.add_node("child_node", child_node)
child_builder.add_edge(START, "child_node")
child_builder.add_edge("child_node", END)
child_graph = child_builder.compile()

app = FastAPI() 

@app.post("/tracing")
async def tracing(request: Request):
    parent_headers = {
        "langsmith-trace": request.headers.get("langsmith-trace"),
        "baggage": request.headers.get("baggage"),
    }

    with tracing_context(parent=parent_headers):
        data = await request.json()
        result = await child_graph.ainvoke({"value": data["value"]})
        return result

def run_server():
    uvicorn.run(app, host="0.0.0.0", port=8000, loop="asyncio")

## Running Our Distributed System

Let's put this all into action!

In [4]:
# ---------- Main ---------- #
import nest_asyncio
nest_asyncio.apply()

async def main():
    thread = threading.Thread(target=run_server, daemon=True)
    thread.start()
    await asyncio.sleep(1)

    result = await run_client()
    print("Server replied:", result)
    await asyncio.sleep(1)

await main()

INFO:     Started server process [64819]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


Parent graph received input: 10
INFO:     127.0.0.1:56716 - "POST /tracing HTTP/1.1" 200 OK
Child graph returned: 11
Server replied: 11


### Now let's view the trace in LangSmith!