OpenTelemetry-based tracing SDK for AI applications.
Using pip:
```bash
pip install msgtrace-sdk
```

Using uv (recommended):

```bash
uv add msgtrace-sdk
```

```python
import os
from msgtrace.sdk import Spans, MsgTraceAttributes

# Enable tracing
os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"
os.environ["MSGTRACE_OTLP_ENDPOINT"] = "http://localhost:8000/api/v1/traces/export"

# Mock function for demonstration
def chat_completion(prompt):
    """Simulate LLM API call."""
    return {"content": "AI is artificial intelligence", "tokens": {"input": 100, "output": 50}}

# Trace your AI operations
with Spans.span_context(name="chat_completion"):
    MsgTraceAttributes.set_model("gpt-5")
    MsgTraceAttributes.set_operation_name("chat")

    # Your AI logic here
    response = chat_completion("What is AI?")

    MsgTraceAttributes.set_usage(
        input_tokens=response["tokens"]["input"],
        output_tokens=response["tokens"]["output"]
    )
    MsgTraceAttributes.set_cost(input_cost=0.003, output_cost=0.0015)
```

- ✅ Zero-overhead when disabled
- ✅ Thread-safe singleton pattern
- ✅ Async-first with sync support
- ✅ 60+ OpenTelemetry attributes for AI/GenAI
- ✅ Context managers and decorators
All configuration via environment variables:
```bash
# Enable/disable tracing
MSGTRACE_TELEMETRY_ENABLED=true

# OTLP endpoint
MSGTRACE_OTLP_ENDPOINT=http://localhost:8000/api/v1/traces/export

# Exporter type (otlp or console)
MSGTRACE_EXPORTER=otlp

# Service name
MSGTRACE_SERVICE_NAME=my-ai-app

# Capture platform info
MSGTRACE_CAPTURE_PLATFORM=true
```

```python
from msgtrace.sdk import Spans

# Basic span
with Spans.span_context("operation_name"):
# Your code here
pass
# Flow-level span (top-level operation)
with Spans.init_flow("user_query_flow"):
# Flow logic
pass
# Module-level span
with Spans.init_module("vector_search"):
# Module logic
pass
# Async spans
async with Spans.aspan_context("async_operation"):
await some_async_function()
# Decorators
@Spans.instrument("process_data")
def process(data: str):
return data.upper()
@Spans.ainstrument("async_process")
async def async_process(data: str):
return await process_async(data)All attributes follow OpenTelemetry GenAI semantic conventions:
```python
from msgtrace.sdk import MsgTraceAttributes

# Operation
MsgTraceAttributes.set_operation_name("chat")  # chat, tool, agent, embedding
MsgTraceAttributes.set_system("openai")        # openai, anthropic, google

# Model & Parameters
MsgTraceAttributes.set_model("gpt-5")
MsgTraceAttributes.set_temperature(0.7)
MsgTraceAttributes.set_max_tokens(1000)

# Prompt & Completion
MsgTraceAttributes.set_prompt("What is AI?")
MsgTraceAttributes.set_prompt([
    {"role": "system", "content": "You are helpful"},
    {"role": "user", "content": "What is AI?"}
])
MsgTraceAttributes.set_completion("AI is artificial intelligence...")

# Usage & Cost
MsgTraceAttributes.set_usage(input_tokens=100, output_tokens=50)
MsgTraceAttributes.set_cost(input_cost=0.003, output_cost=0.0015, currency="USD")

# Tools
MsgTraceAttributes.set_tool_name("search_web")
MsgTraceAttributes.set_tool_call_arguments({"query": "AI", "limit": 5})
MsgTraceAttributes.set_tool_response({"results": ["a", "b", "c"]})

# Agent
MsgTraceAttributes.set_agent_name("research_agent")
MsgTraceAttributes.set_agent_id("agent_001")
MsgTraceAttributes.set_agent_type("autonomous")

# Workflow
MsgTraceAttributes.set_workflow_name("user_query_flow")
MsgTraceAttributes.set_workflow_id("wf_123")
MsgTraceAttributes.set_user_id("user_456")
MsgTraceAttributes.set_session_id("session_789")

# Custom attributes
MsgTraceAttributes.set_custom("business_metric", 99.9)
MsgTraceAttributes.set_custom("metadata", {"key": "value"})
```

```python
import os
from msgtrace.sdk import Spans, MsgTraceAttributes
os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"
# Mock LLM API call
def call_llm(prompt):
    """Simulate OpenAI API call."""
    return {
        "id": "resp_123",
        "content": "AI is artificial intelligence...",
        "usage": {"input_tokens": 10, "output_tokens": 50}
    }

with Spans.span_context("chat_completion"):
    # Request
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_system("openai")
    MsgTraceAttributes.set_model("gpt-5")
    MsgTraceAttributes.set_temperature(0.7)

    prompt = "What is AI?"
    MsgTraceAttributes.set_prompt(prompt)

    # API call
    response = call_llm(prompt)

    # Response
    MsgTraceAttributes.set_response_id(response["id"])
    MsgTraceAttributes.set_finish_reason("stop")
    MsgTraceAttributes.set_completion(response["content"])

    MsgTraceAttributes.set_usage(
        input_tokens=response["usage"]["input_tokens"],
        output_tokens=response["usage"]["output_tokens"]
    )
    MsgTraceAttributes.set_cost(input_cost=0.0015, output_cost=0.0005)
```

```python
with Spans.init_flow("research_flow"):
MsgTraceAttributes.set_workflow_name("research_agent")
MsgTraceAttributes.set_user_id("user_123")
# Tool execution
with Spans.init_module("tool_search"):
MsgTraceAttributes.set_operation_name("tool")
MsgTraceAttributes.set_tool_name("search_web")
MsgTraceAttributes.set_tool_call_arguments({"query": "AI"})
# Execute tool
# results = search_web("AI")
MsgTraceAttributes.set_tool_response({"results": [...]})
# LLM processing
with Spans.init_module("llm_synthesis"):
MsgTraceAttributes.set_operation_name("chat")
MsgTraceAttributes.set_model("gpt-5")
MsgTraceAttributes.set_usage(input_tokens=200, output_tokens=100)
MsgTraceAttributes.set_cost(input_cost=0.006, output_cost=0.003)@Spans.set_tool_attributes("search_db", description="Search database")
@Spans.instrument("database_search")
def search(query: str):
MsgTraceAttributes.set_tool_call_arguments({"query": query})
# Database search
results = db.search(query)
MsgTraceAttributes.set_tool_response({"count": len(results)})
return results
# Call it
results = search("AI research")import asyncio
from msgtrace.sdk import Spans, MsgTraceAttributes
# Mock async API call
async def async_api_call(prompt):
    """Simulate async LLM API call."""
    await asyncio.sleep(0.1)
    return {"content": "AI response", "tokens": {"input": 50, "output": 30}}

@Spans.ainstrument("async_chat")
async def chat_completion(prompt: str):
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_model("gpt-5")

    # Async API call
    response = await async_api_call(prompt)

    MsgTraceAttributes.set_usage(
        input_tokens=response["tokens"]["input"],
        output_tokens=response["tokens"]["output"]
    )
    return response["content"]

# Use it
async def main():
    async with Spans.ainit_flow("async_flow"):
        result = await chat_completion("What is AI?")
        print(result)

# Run
asyncio.run(main())
```

Create custom decorators to capture function arguments and outputs:

```python
from functools import wraps
from msgtrace.sdk import Spans, MsgTraceAttributes
def trace_function(operation_name: str = None):
    """Custom decorator that captures function arguments and output."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Use function name if operation_name not provided
            span_name = operation_name or func.__name__

            with Spans.span_context(span_name):
                # Capture function arguments
                MsgTraceAttributes.set_custom("function.args", list(args))
                MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                # Execute function
                result = func(*args, **kwargs)

                # Capture output (be careful with large outputs)
                MsgTraceAttributes.set_custom("function.output", str(result)[:1000])

                return result
        return wrapper
    return decorator

# Usage
@trace_function("calculate_price")
def calculate_price(base_price: float, discount: float = 0.0):
    return base_price * (1 - discount)

result = calculate_price(100.0, discount=0.2)
# Traces: function.args=[100.0], function.kwargs={'discount': 0.2}, function.output='80.0'
```

```python
import asyncio
from functools import wraps
from msgtrace.sdk import Spans, MsgTraceAttributes
def trace_async_function(operation_name: str = None):
    """Custom decorator for async functions."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            span_name = operation_name or func.__name__

            async with Spans.aspan_context(span_name):
                # Capture inputs
                MsgTraceAttributes.set_custom("function.args", list(args))
                MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                # Execute async function
                result = await func(*args, **kwargs)

                # Capture output
                MsgTraceAttributes.set_custom("function.output", str(result)[:1000])

                return result
        return wrapper
    return decorator

# Usage
@trace_async_function("fetch_user_data")
async def fetch_user_data(user_id: str):
    # Simulate async API call
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": "John Doe"}
```

```python
from functools import wraps

from msgtrace.sdk import Spans, MsgTraceAttributes

def trace_llm_call(model: str, provider: str = "openai"):
"""Specialized decorator for LLM calls."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
with Spans.span_context(f"llm_{func.__name__}"):
# Set LLM attributes
MsgTraceAttributes.set_operation_name("chat")
MsgTraceAttributes.set_model(model)
MsgTraceAttributes.set_system(provider)
# Capture prompt (first argument)
if args:
MsgTraceAttributes.set_prompt(str(args[0]))
# Execute LLM call
result = func(*args, **kwargs)
# Capture completion
if isinstance(result, dict) and "content" in result:
MsgTraceAttributes.set_completion(result["content"])
# Capture usage if available
if "usage" in result:
usage = result["usage"]
MsgTraceAttributes.set_usage(
input_tokens=usage.get("input_tokens", 0),
output_tokens=usage.get("output_tokens", 0)
)
return result
return wrapper
return decorator
# Usage
@trace_llm_call(model="gpt-5", provider="openai")
def ask_llm(prompt: str):
# Your LLM API call here
return {
"content": "AI is artificial intelligence...",
"usage": {"input_tokens": 10, "output_tokens": 50}
}def trace_with_error_handling(operation_name: str = None):
"""Decorator that captures exceptions and function metadata."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
span_name = operation_name or func.__name__
with Spans.span_context(span_name):
# Set function metadata
MsgTraceAttributes.set_custom("function.name", func.__name__)
MsgTraceAttributes.set_custom("function.module", func.__module__)
try:
# Capture inputs
MsgTraceAttributes.set_custom("function.args", list(args))
MsgTraceAttributes.set_custom("function.kwargs", kwargs)
# Execute function
result = func(*args, **kwargs)
# Mark as successful
MsgTraceAttributes.set_custom("function.success", True)
MsgTraceAttributes.set_custom("function.output_type", type(result).__name__)
return result
except Exception as e:
# Capture error details
MsgTraceAttributes.set_custom("function.success", False)
MsgTraceAttributes.set_custom("error.type", type(e).__name__)
MsgTraceAttributes.set_custom("error.message", str(e))
raise
return wrapper
return decorator
# Usage
@trace_with_error_handling("risky_operation")
def divide(a: float, b: float):
return a / b
try:
result = divide(10, 0) # Will trace the error
except ZeroDivisionError:
pass- Limit captured data size: Truncate large strings/objects
- Sanitize sensitive data: Don't capture passwords, API keys, etc.
- Use appropriate attribute names: Clear, descriptive keys
- Handle exceptions properly: Let exceptions propagate after capturing
- Combine with built-in decorators: Stack with `@Spans.instrument()`, as in the sketch below
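A minimal sketch that combines these practices — truncation, redaction of sensitive keys, and stacking a custom decorator under `@Spans.instrument()`. The `_safe_repr`, `_redact`, and `SENSITIVE_KEYS` helpers are illustrative, not part of the SDK:

```python
from functools import wraps

from msgtrace.sdk import Spans, MsgTraceAttributes

# Illustrative helpers -- not part of the SDK.
SENSITIVE_KEYS = {"password", "api_key", "token", "authorization"}
MAX_VALUE_LENGTH = 500

def _safe_repr(value, limit=MAX_VALUE_LENGTH):
    """Truncate large values before attaching them to a span."""
    text = repr(value)
    return text if len(text) <= limit else text[:limit] + "...<truncated>"

def _redact(kwargs):
    """Replace values of sensitive keys instead of capturing them."""
    return {k: ("<redacted>" if k.lower() in SENSITIVE_KEYS else _safe_repr(v))
            for k, v in kwargs.items()}

def capture_io(func):
    """Attach sanitized inputs/outputs to the span opened by @Spans.instrument()."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        MsgTraceAttributes.set_custom("function.kwargs", _redact(kwargs))
        result = func(*args, **kwargs)
        MsgTraceAttributes.set_custom("function.output", _safe_repr(result))
        return result
    return wrapper

@Spans.instrument("create_session")   # built-in decorator opens the span
@capture_io                           # custom decorator adds sanitized attributes
def create_session(user: str, api_key: str = ""):
    return {"user": user, "session": "sess_001"}
```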
For better visualization in the msgtrace frontend, follow these naming conventions:
Use the `module.type` attribute to categorize spans for specialized visualizations:
```python
from msgtrace.sdk import Spans, MsgTraceAttributes

# Agent visualization
with Spans.init_module("research_agent"):
    MsgTraceAttributes.set_custom("module.type", "Agent")
    MsgTraceAttributes.set_agent_name("research_agent")
    # Agent logic here

# Tool visualization
with Spans.init_module("web_search"):
    MsgTraceAttributes.set_custom("module.type", "Tool")
    MsgTraceAttributes.set_tool_name("search_web")
    # Tool execution here

# Transcriber visualization
with Spans.init_module("speech_to_text"):
    MsgTraceAttributes.set_custom("module.type", "Transcriber")
    # Transcription logic here

# LLM visualization
with Spans.init_module("llm_call"):
    MsgTraceAttributes.set_custom("module.type", "LLM")
    MsgTraceAttributes.set_model("gpt-5")
    # LLM call here
```

| Type | Description | Visualization |
|---|---|---|
| `Agent` | Autonomous agents | Agent flow diagram |
| `Tool` | Tool executions | Tool analytics |
| `LLM` | LLM API calls | Token/cost analysis |
| `Transcriber` | Speech-to-text | Audio processing view |
| `Retriever` | Vector/DB search | Retrieval metrics |
| `Embedder` | Text embedding | Embedding analytics |
| `Custom` | Custom operations | Generic span view |
```python
# ✅ Good: Descriptive and consistent
with Spans.init_module("data_retrieval"):
    MsgTraceAttributes.set_custom("module.type", "Retriever")
    MsgTraceAttributes.set_custom("module.name", "vector_search")

# ✅ Good: Clear hierarchy
with Spans.init_flow("user_query"):
    with Spans.init_module("intent_classifier"):
        MsgTraceAttributes.set_custom("module.type", "LLM")
    with Spans.init_module("response_generator"):
        MsgTraceAttributes.set_custom("module.type", "Agent")

# ❌ Bad: Vague names
with Spans.init_module("process"):  # What process?
    pass

# ❌ Bad: Inconsistent typing
with Spans.init_module("tool_call"):
    MsgTraceAttributes.set_custom("module.type", "tool")  # Should be "Tool"
```

```python
from msgtrace.sdk import Spans, MsgTraceAttributes

with Spans.init_flow("customer_support_query"):
MsgTraceAttributes.set_workflow_name("support_agent")
MsgTraceAttributes.set_user_id("user_123")
# Step 1: Classify intent
with Spans.init_module("intent_classification"):
MsgTraceAttributes.set_custom("module.type", "LLM")
MsgTraceAttributes.set_custom("module.name", "intent_classifier")
MsgTraceAttributes.set_model("gpt-5")
# Classification logic
# Step 2: Search knowledge base
with Spans.init_module("knowledge_retrieval"):
MsgTraceAttributes.set_custom("module.type", "Retriever")
MsgTraceAttributes.set_custom("module.name", "vector_db")
# Vector search logic
# Step 3: Execute tool if needed
with Spans.init_module("order_lookup"):
MsgTraceAttributes.set_custom("module.type", "Tool")
MsgTraceAttributes.set_custom("module.name", "order_api")
MsgTraceAttributes.set_tool_name("get_order_status")
# Tool execution
# Step 4: Generate response
with Spans.init_module("response_generation"):
MsgTraceAttributes.set_custom("module.type", "Agent")
MsgTraceAttributes.set_custom("module.name", "response_agent")
MsgTraceAttributes.set_agent_name("support_responder")
# Agent response logicThese conventions enable the msgtrace frontend to:
- Group related operations by type
- Generate specialized visualizations (agent flows, tool analytics)
- Calculate type-specific metrics (LLM costs, tool latencies)
- Provide better filtering and search capabilities
- Enable conditionally: Use environment variables to control tracing
- Set attributes early: Set operation/model before execution
- Use decorators: For frequently instrumented functions
- Nest properly: Flow → Module → Span hierarchy
- Handle errors: Let context managers auto-record exceptions
- Shutdown gracefully: Call `tracer_manager.shutdown()` at exit (see the sketch below)
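A minimal sketch of the first and last items; the `tracer_manager` import path is an assumption and may differ in your installation:

```python
import atexit
import os

from msgtrace.sdk import Spans, MsgTraceAttributes

# Enable conditionally: tracing follows the environment, the code stays unchanged
os.environ.setdefault("MSGTRACE_TELEMETRY_ENABLED", "true")

# Shutdown gracefully: flush pending spans at interpreter exit.
# NOTE: the tracer_manager import path below is assumed, not documented here.
from msgtrace.sdk import tracer_manager
atexit.register(tracer_manager.shutdown)

with Spans.init_flow("user_query_flow"):
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_model("gpt-5")
```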
All operations are thread-safe:
- TracerManager uses RLock for initialization
- OpenTelemetry SDK is thread-safe
- Multiple threads can create spans simultaneously
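For example, worker threads can each open their own spans with no extra coordination (the handler function and pool size here are illustrative):

```python
import os
from concurrent.futures import ThreadPoolExecutor

from msgtrace.sdk import Spans, MsgTraceAttributes

os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"

def handle_request(request_id: int) -> str:
    # Each worker thread creates its own span concurrently.
    with Spans.span_context("handle_request"):
        MsgTraceAttributes.set_custom("request.id", request_id)
        return f"done-{request_id}"

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(handle_request, range(8)))
```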
When `MSGTRACE_TELEMETRY_ENABLED=false`:
- Tracer initialization is lazy (no cost until used)
- No-op tracer is created (minimal overhead)
- Attribute setters check `span.is_recording()` (fast path)
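In practice this means instrumented code can stay in place and be switched off via the environment alone, for example:

```python
import os

# Disable tracing; the instrumented code still runs, spans become no-ops
os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "false"

from msgtrace.sdk import Spans, MsgTraceAttributes

with Spans.span_context("chat_completion"):       # no-op span
    MsgTraceAttributes.set_model("gpt-5")         # fast path: span is not recording
    response = {"content": "AI is artificial intelligence"}
```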
We welcome contributions! Please see CONTRIBUTING.md for:
- Development setup
- Testing and code quality guidelines
- Pull request process
- Release workflow
For automation details (CI/CD, bots, etc.), see AUTOMATION.md.
MIT License - see LICENSE file for details.