<!-- NOTEBOOK_METADATA source: "⚠️ Jupyter Notebook" title: "Trace Temporal Workflows with Langfuse" sidebarTitle: "Temporal" logo: "/images/integrations/temporal_icon.svg" description: "Learn how to use Langfuse to monitor Temporal workflows and activities via OpenTelemetry" category: "Integrations" -->

# Trace Temporal Workflows with Langfuse

This notebook demonstrates how to **integrate Langfuse** into your **Temporal workflows** to monitor, debug, and evaluate your AI agents and LLM-powered applications.

> **What is Temporal?** [Temporal](https://temporal.io/) is a durable execution platform that guarantees the execution of your application code, even in the presence of failures. It provides reliability, scalability, and visibility into long-running workflows and distributed applications.

> **What is Langfuse?** [Langfuse](https://langfuse.com/) is an open-source observability platform for AI agents and LLM applications. It helps you visualize and monitor LLM calls, tool usage, cost, latency, and more.

## Use Case: Deep Research Agent with Temporal

In this example, we'll build a **deep research agent** that:
- Uses Temporal workflows to orchestrate long-running research tasks
- Leverages OpenAI for research planning and content generation
- Sends all observability data to Langfuse via OpenTelemetry

This setup allows you to:
- **Track workflow execution**: See all workflow runs, activities, and their status
- **Monitor LLM calls**: View prompts, completions, token usage, and costs
- **Debug failures**: Identify bottlenecks and errors in your research pipeline
- **Evaluate quality**: Assess the quality of research outputs over time

## 1. Install Dependencies

Install the Temporal SDK, the OpenAI client, the OpenTelemetry packages, and Langfuse:

In [None]:
%pip install temporalio openai opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http langfuse

## 2. Configure Environment & API Keys

Set up your Langfuse, Temporal, and OpenAI credentials. You can get Langfuse keys by signing up for a free [Langfuse Cloud](https://cloud.langfuse.com/) account or by [self-hosting Langfuse](https://langfuse.com/self-hosting).

In [None]:
import os

# Langfuse configuration
# Get keys for your project from: https://cloud.langfuse.com
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..." 
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..." 
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com" # 🇪🇺 EU region
# os.environ["LANGFUSE_HOST"] = "https://us.cloud.langfuse.com" # 🇺🇸 US region

# OpenAI API key
os.environ["OPENAI_API_KEY"] = "sk-proj-..."

# Temporal server address (use Temporal Cloud or local dev server)
TEMPORAL_HOST = "localhost:7233"  # or your Temporal Cloud endpoint

## 3. Set Up OpenTelemetry Integration

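Configure OpenTelemetry to send traces from Temporal to Langfuse. The exporter sends spans to the Langfuse OTLP endpoint over HTTP and authenticates with Basic Authentication, using the public key as the username and the secret key as the password.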

In [None]:
import base64
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

# Create authentication header for Langfuse
public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
langfuse_host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")

auth_string = f"{public_key}:{secret_key}"
auth_header = base64.b64encode(auth_string.encode()).decode()

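# Configure OTLP exporter for Langfuse
# OTLPSpanExporter uses an explicit `endpoint` argument as-is, so include the /v1/traces path
otlp_endpoint = f"{langfuse_host}/api/public/otel/v1/traces"
headers = {"Authorization": f"Basic {auth_header}"}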

# Set up OpenTelemetry with Langfuse exporter
resource = Resource(attributes={"service.name": "temporal-research-agent"})
tracer_provider = TracerProvider(resource=resource)
span_processor = BatchSpanProcessor(
    OTLPSpanExporter(endpoint=otlp_endpoint, headers=headers)
)
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)

print(f"✅ OpenTelemetry configured to send traces to Langfuse at {otlp_endpoint}")
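
The header construction above can be isolated into a small function, which also makes it easy to fail fast when a key is missing (the cell above would otherwise encode the literal string `None`). This is a minimal sketch using only the standard library; `basic_auth_header` is a hypothetical helper, not part of the Langfuse SDK, and the keys shown are placeholders.

```python
import base64


def basic_auth_header(public_key, secret_key):
    """Build an HTTP Basic Auth header from a Langfuse key pair.

    Hypothetical helper for illustration only.
    """
    if not public_key or not secret_key:
        raise ValueError(
            "LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY must both be set"
        )

    # Basic Auth is base64("<username>:<password>"); here the public key is
    # the username and the secret key is the password.
    token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    return {"Authorization": f"Basic {token}"}


# Placeholder keys, not real credentials.
example_headers = basic_auth_header("pk-lf-example", "sk-lf-example")
print(example_headers["Authorization"].split()[0])  # Basic
```

The returned dictionary has the same shape as the `headers` variable passed to `OTLPSpanExporter` above.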

## 4. Initialize Langfuse Client

Verify the Langfuse connection:

In [None]:
from langfuse import get_client
 
langfuse = get_client()
 
# Verify connection
if langfuse.auth_check():
    print("✅ Langfuse client is authenticated and ready!")
else:
    print("❌ Authentication failed. Please check your credentials and host.")

## 5. Define Temporal Activities

Create activities that will be executed as part of the research workflow. Each activity represents a discrete step in the research process.

In [None]:
from temporalio import activity
from openai import OpenAI

client = OpenAI()

@activity.defn
async def plan_research(search_term: str) -> dict:
    """Generate a research plan for the given search term."""
    tracer = trace.get_tracer(__name__)
    
    with tracer.start_as_current_span("PlannerAgent") as span:
        span.set_attribute("search_term", search_term)
        
        response = client.chat.completions.create(
            model="gpt-4.1-2025-04-14",
            messages=[
                {
                    "role": "system",
                    "content": "You are a research assistant. Given a search term, you search the web for relevant information and provide a comprehensive research plan."
                },
                {
                    "role": "user",
                    "content": f"Search term: {search_term}\nReason for searching: Look for travel tips and recommendations for April vacations in the Caribbean."
                }
            ],
            temperature=1,
        )
        
        result = {
            "search_term": search_term,
            "plan": response.choices[0].message.content,
            "model": response.model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
        }
        
        span.set_attribute("result.plan", result["plan"])
        span.set_attribute("result.total_tokens", result["usage"]["total_tokens"])
        
        return result

@activity.defn
async def execute_research(plan: dict) -> dict:
    """Execute the research plan and generate findings."""
    tracer = trace.get_tracer(__name__)
    
    with tracer.start_as_current_span("ResearchAgent") as span:
        span.set_attribute("plan", str(plan))
        
        response = client.chat.completions.create(
            model="gpt-4.1-2025-04-14",
            messages=[
                {
                    "role": "system",
                    "content": "You are a research assistant. Execute the research plan and provide comprehensive findings."
                },
                {
                    "role": "user",
                    "content": f"Research plan: {plan['plan']}\nPlease execute this plan and provide detailed findings."
                }
            ],
            temperature=1,
        )
        
        result = {
            "findings": response.choices[0].message.content,
            "model": response.model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
        }
        
        span.set_attribute("result.total_tokens", result["usage"]["total_tokens"])
        
        return result
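
Both activities assemble nearly the same result dictionary from the OpenAI response. One way to remove that duplication is a small shared helper. The sketch below is only an illustration: `summarize_completion` and the stub response are hypothetical, and it assumes the response exposes `choices[0].message.content`, `model`, and `usage` as in the activities above.

```python
from types import SimpleNamespace


def summarize_completion(response, content_key, **extra):
    """Build the result dictionary returned by an activity.

    `content_key` names the field that holds the generated text
    (for example "plan" or "findings"); `extra` is merged in as-is.
    """
    # Fall back to zero counts if the response carries no usage block.
    usage = getattr(response, "usage", None)
    return {
        **extra,
        content_key: response.choices[0].message.content,
        "model": response.model,
        "usage": {
            "prompt_tokens": getattr(usage, "prompt_tokens", 0),
            "completion_tokens": getattr(usage, "completion_tokens", 0),
            "total_tokens": getattr(usage, "total_tokens", 0),
        },
    }


# Stand-in for an OpenAI response so the sketch runs without an API call.
fake_response = SimpleNamespace(
    choices=[SimpleNamespace(message=SimpleNamespace(content="Step 1: ..."))],
    model="gpt-4.1-2025-04-14",
    usage=SimpleNamespace(prompt_tokens=12, completion_tokens=30, total_tokens=42),
)

result = summarize_completion(fake_response, "plan", search_term="Caribbean travel tips")
print(result["usage"]["total_tokens"])  # 42
```

The returned value is a plain dictionary of strings and integers, which keeps it straightforward to pass between activities and the workflow.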

## 6. Define Temporal Workflow

Create a workflow that orchestrates the research activities. Temporal ensures the workflow executes reliably, even if failures occur.

In [None]:
from temporalio import workflow
from datetime import timedelta

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, search_term: str) -> dict:
        """Execute a complete research workflow."""
        
        # Step 1: Create research plan
        plan = await workflow.execute_activity(
            plan_research,
            search_term,
            start_to_close_timeout=timedelta(seconds=30),
        )
        
        # Step 2: Execute research
        findings = await workflow.execute_activity(
            execute_research,
            plan,
            start_to_close_timeout=timedelta(seconds=60),
        )
        
        return {
            "search_term": search_term,
            "plan": plan,
            "findings": findings,
            "status": "completed"
        }
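
Much of the reliability described above comes from Temporal retrying failed activities automatically. As a rough illustration of the arithmetic only, the sketch below computes the wait before each retry under exponential backoff. It assumes what we understand to be the default retry policy (1-second initial interval, backoff coefficient 2.0, interval capped at 100 times the initial interval). `retry_delays` is a hypothetical helper, not a Temporal API.

```python
def retry_delays(retries, initial=1.0, backoff=2.0, maximum=None):
    """Return the wait in seconds before each of `retries` retry attempts.

    Assumed defaults: 1 s initial interval, coefficient 2.0, and a cap of
    100x the initial interval when `maximum` is not given.
    """
    cap = maximum if maximum is not None else initial * 100
    # The n-th retry waits initial * backoff**n seconds, limited by the cap.
    return [min(initial * backoff ** n, cap) for n in range(retries)]


print(retry_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Each attempt gets its own `start_to_close_timeout`, so a slow LLM call that times out is retried on this schedule. To bound the number of attempts, `workflow.execute_activity` accepts a `retry_policy` argument, for example `RetryPolicy(maximum_attempts=3)` from `temporalio.common`.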

## 7. Run the Workflow

Execute the research workflow. This will send all traces to Langfuse via OpenTelemetry.

**Note**: This requires a running Temporal server. You can start a local dev server with:
```bash
temporal server start-dev
```

In [None]:
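from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.worker import Worker
import asyncio

async def main():
    # Connect to Temporal. The tracing interceptor makes the SDK emit
    # OpenTelemetry spans for workflows and activities; the worker created
    # from this client inherits it.
    client = await Client.connect(TEMPORAL_HOST, interceptors=[TracingInterceptor()])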
    
    # Start a worker in the background
    async with Worker(
        client,
        task_queue="research-tasks",
        workflows=[ResearchWorkflow],
        activities=[plan_research, execute_research],
    ):
        # Execute the workflow
        result = await client.execute_workflow(
            ResearchWorkflow.run,
            "Caribbean travel tips April 2023",
            id="research-workflow-001",
            task_queue="research-tasks",
        )
        
        print("\n" + "="*50)
        print("WORKFLOW COMPLETED")
        print("="*50)
        print(f"\nSearch Term: {result['search_term']}")
        print(f"\nStatus: {result['status']}")
        print(f"\nPlan Tokens: {result['plan']['usage']['total_tokens']}")
        print(f"Findings Tokens: {result['findings']['usage']['total_tokens']}")
        print(f"\nFindings Preview: {result['findings']['findings'][:200]}...")

# Run the workflow
await main()

# Export any spans still buffered by the BatchSpanProcessor
tracer_provider.force_flush()
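
The example above uses a fixed workflow ID. Temporal rejects a start request whose ID matches a workflow that is still running, so for repeated or concurrent runs it can help to derive a unique ID per run. The following is a minimal sketch using only the standard library; `make_workflow_id` and its naming scheme are hypothetical, not a Temporal convention.

```python
import re
import uuid


def make_workflow_id(search_term, prefix="research-workflow"):
    """Return a readable, unique workflow ID for one research run."""
    # Lowercase the term and collapse anything non-alphanumeric into dashes.
    slug = re.sub(r"[^a-z0-9]+", "-", search_term.lower())
    # Keep the ID short, then trim dashes left at either end.
    slug = slug[:40].strip("-")
    # A short random suffix keeps IDs distinct across runs of the same term.
    return f"{prefix}-{slug}-{uuid.uuid4().hex[:8]}"


workflow_id = make_workflow_id("Caribbean travel tips April 2023")
print(workflow_id)
```

The result can be passed as the `id` argument of `client.execute_workflow` in place of `"research-workflow-001"`.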

## 8. View Traces in Langfuse

After running the workflow, you can view the complete trace in Langfuse. The trace will show:

- **Workflow execution**: The entire `ResearchWorkflow` with timing and status
- **Activity spans**: Each activity (`plan_research`, `execute_research`) as nested spans
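- **LLM spans**: The `PlannerAgent` and `ResearchAgent` spans with the attributes recorded in the activities, such as the search term, the generated plan, and token counts
- **Cost tracking**: Estimated costs, for observations where Langfuse can identify the model and token usage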
- **Latency metrics**: Time spent in each component

![Example trace in Langfuse](https://langfuse.com/images/cookbook/integration_temporal/temporal-research-workflow-trace.png)

**Example Trace**: [View in Langfuse](https://cloud.langfuse.com/project/cloramnkj0002jz088vzn1ja4/traces/08d49878ef6fd5985bf82b84b2dd5383?timestamp=2025-10-13T12%3A50%3A09.574Z&display=preview&observation=a29ef94f77c13acf)

The trace view helps you:
- Debug workflow failures by seeing exactly where errors occurred
- Optimize performance by identifying slow activities
- Monitor costs by tracking token usage across all LLM calls
- Evaluate output quality by reviewing prompts and completions

## 9. Advanced: Custom Trace Attributes

You can add custom attributes to your spans for better filtering and analysis in Langfuse:

In [None]:
@activity.defn
async def advanced_research(search_term: str, user_id: str, session_id: str) -> dict:
    """Research activity with custom trace attributes."""
    tracer = trace.get_tracer(__name__)
    
    with tracer.start_as_current_span("AdvancedResearch") as span:
        # Add custom Langfuse attributes (user and session IDs are trace-level fields)
        span.set_attribute("langfuse.user.id", user_id)
        span.set_attribute("langfuse.session.id", session_id)
        span.set_attribute("langfuse.trace.tags", ["research", "production"])
        span.set_attribute("langfuse.generation.name", "research-planner")
        
        # Your activity logic here
        response = client.chat.completions.create(
            model="gpt-4.1-2025-04-14",
            messages=[{"role": "user", "content": search_term}],
        )
        
        return {"result": response.choices[0].message.content}
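
OpenTelemetry attribute values must be strings, booleans, numbers, or sequences of those, so nested dictionaries and `None` cannot be attached directly. Instead of stringifying a whole dictionary, one option is to flatten it into dotted keys first. The sketch below is a standard-library illustration; `to_span_attributes` is a hypothetical helper and the `result.` prefix is just an example.

```python
def to_span_attributes(values, prefix=""):
    """Flatten a nested dict into attribute-friendly key/value pairs."""
    attributes = {}
    for key, value in values.items():
        name = f"{prefix}{key}"
        if value is None:
            # Skip missing values; None is not a valid attribute value.
            continue
        if isinstance(value, dict):
            # Recurse and join nested keys with dots.
            attributes.update(to_span_attributes(value, prefix=f"{name}."))
        elif isinstance(value, (str, bool, int, float)):
            attributes[name] = value
        elif isinstance(value, (list, tuple)) and all(isinstance(v, str) for v in value):
            attributes[name] = list(value)
        else:
            # Fall back to a string for anything else.
            attributes[name] = str(value)
    return attributes


flat = to_span_attributes(
    {"usage": {"total_tokens": 42}, "tags": ["research", "production"], "error": None},
    prefix="result.",
)
print(flat)
```

Inside an activity, the flattened dictionary could then be attached in one call with `span.set_attributes(flat)`.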

## Resources

- [Temporal Documentation](https://docs.temporal.io/)
- [Langfuse OpenTelemetry Integration Guide](https://langfuse.com/docs/integrations/opentelemetry)
- [OpenTelemetry Python SDK](https://opentelemetry.io/docs/languages/python/)

<!-- MARKDOWN_COMPONENT name: "LearnMore" path: "@/components-mdx/integration-learn-more.mdx" -->