# Trace Ingestion with Tinman

This notebook demonstrates how to ingest traces from various observability platforms
into Tinman for failure analysis.

Supported formats:
- OpenTelemetry (OTLP)
- Datadog APM
- AWS X-Ray
- Generic JSON

In [None]:
from tinman.ingest import (
    OTLPAdapter,
    DatadogAdapter,
    XRayAdapter,
    JSONAdapter,
    AdapterRegistry,
    parse_traces,
    Trace,
    Span,
    SpanStatus,
)

## 1. OpenTelemetry (OTLP) Traces

OTLP (the OpenTelemetry Protocol) is the native format for OpenTelemetry data. The example below uses its JSON encoding.

In [None]:
# Example OTLP data
# The payload is assembled bottom-up (event -> span -> resource -> envelope)
# so each nesting level can be read on its own.

# Exception event recorded on the span, 2.5 s after the span's start time.
otlp_exception_event = {
    "name": "exception",
    "timeUnixNano": "1704067202500000000",
    "attributes": [
        {"key": "exception.type", "value": {"stringValue": "ToolExecutionError"}},
        {
            "key": "exception.message",
            "value": {"stringValue": "Invalid parameters for search tool"},
        },
    ],
}

# The single span in this payload; its status block carries the failure message.
otlp_span = {
    "traceId": "0123456789abcdef0123456789abcdef",
    "spanId": "0123456789abcdef",
    "name": "llm.completion",
    "kind": 2,
    "startTimeUnixNano": "1704067200000000000",
    "endTimeUnixNano": "1704067203000000000",
    "status": {"code": 2, "message": "Tool execution failed"},
    "attributes": [
        {"key": "llm.model", "value": {"stringValue": "claude-3"}},
        {"key": "llm.tokens.input", "value": {"intValue": "1500"}},
        {"key": "llm.tokens.output", "value": {"intValue": "500"}},
    ],
    "events": [otlp_exception_event],
}

# Resource-level attributes naming the emitting service and its version.
otlp_resource = {
    "attributes": [
        {"key": "service.name", "value": {"stringValue": "ai-assistant"}},
        {"key": "service.version", "value": {"stringValue": "1.2.3"}},
    ]
}

otlp_data = {
    "resourceSpans": [
        {
            "resource": otlp_resource,
            "scopeSpans": [{"spans": [otlp_span]}],
        }
    ]
}

# Parse with OTLP adapter
# The result of parse() is materialised into a list so it can be both counted
# and iterated.
adapter = OTLPAdapter()
traces = list(adapter.parse(otlp_data))

print(f"Parsed {len(traces)} trace(s)")
for trace in traces:
    # One print call with a newline separator emits the three header lines.
    print(
        f"\nTrace ID: {trace.trace_id}",
        f"Spans: {trace.span_count}",
        f"Has errors: {trace.has_errors}",
        sep="\n",
    )

    for span in trace.spans:
        print(f"  - {span.name} ({span.duration_ms:.0f}ms) [{span.status.value}]")
        # Spans without exceptions have nothing more to report.
        if not span.has_exception():
            continue
        for exc in span.get_exceptions():
            print(f"    Exception: {exc['type']}: {exc['message']}")

## 2. Datadog APM Traces

Import traces from Datadog's APM format.

In [None]:
# Example Datadog data
# Two spans that share one trace_id: a failing web request and the LLM call
# made underneath it.

# Root span: the incoming HTTP request.
dd_root_span = {
    "trace_id": 12345678901234567890,
    "span_id": 9876543210987654321,
    "name": "web.request",
    "service": "ai-gateway",
    "resource": "/v1/chat/completions",
    "type": "web",
    "start": 1704067200000000000,
    "duration": 2500000000,
    "error": 1,
    "meta": {
        "http.method": "POST",
        "http.url": "/v1/chat/completions",
        "http.status_code": "500",
        "error.type": "RateLimitError",
        "error.msg": "Rate limit exceeded for model claude-3-opus",
    },
    "metrics": {"_dd.measured": 1},
}

# Child span: reuses the root's trace_id and points at it through parent_id.
dd_llm_span = {
    "trace_id": dd_root_span["trace_id"],
    "span_id": 1111111111111111111,
    "parent_id": dd_root_span["span_id"],
    "name": "llm.call",
    "service": "ai-gateway",
    "resource": "claude-3-opus",
    "type": "custom",
    "start": 1704067200100000000,
    "duration": 2400000000,
    "error": 1,
    "meta": {
        "llm.model": "claude-3-opus",
        "error.type": "RateLimitError",
        "error.msg": "429 Too Many Requests",
    },
}

# Nested list: the inner list groups the spans that belong together.
datadog_data = [[dd_root_span, dd_llm_span]]

# Parse with Datadog adapter
# The result of parse() is materialised into a list so it can be both counted
# and iterated.
adapter = DatadogAdapter()
traces = list(adapter.parse(datadog_data))

print(f"Parsed {len(traces)} trace(s)")
for trace in traces:
    # One print call with a newline separator emits the three header lines.
    print(
        f"\nTrace ID: {trace.trace_id}",
        f"Services: {trace.services}",
        f"Error spans: {len(trace.error_spans)}",
        sep="\n",
    )

    for span in trace.spans:
        # Cross for spans flagged as errors, check mark for the rest.
        if span.is_error:
            status_icon = "❌"
        else:
            status_icon = "✓"
        print(f"  {status_icon} {span.name} - {span.service_name}")

## 3. AWS X-Ray Traces

Parse traces from AWS X-Ray format.

In [None]:
# Example X-Ray data
# Built inside-out: the subsegment first, then the segment that contains it,
# then the outer envelope.

# Downstream call nested under the segment.
xray_subsegment = {
    "id": "sub-001",
    "name": "anthropic-api-call",
    "start_time": 1704067201.0,
    "end_time": 1704067204.0,
    "namespace": "remote",
    "error": True,
    "annotations": {"model": "claude-3", "tokens_used": 5000},
}

# Top-level segment: marked as a fault and carrying one recorded exception.
xray_segment = {
    "trace_id": "1-5f84c7a5-abc123def456789012345678",
    "id": "abc123def456",
    "name": "ai-assistant-lambda",
    "start_time": 1704067200.0,
    "end_time": 1704067205.0,
    "origin": "AWS::Lambda::Function",
    "fault": True,
    "http": {
        "request": {"method": "POST", "url": "https://api.example.com/invoke"},
        "response": {"status": 500},
    },
    "cause": {
        "exceptions": [
            {
                "id": "exc-001",
                "type": "TimeoutError",
                "message": "Lambda execution timed out after 30 seconds",
            }
        ]
    },
    "subsegments": [xray_subsegment],
}

xray_data = {"Traces": [{"Segments": [xray_segment]}]}

# Parse with X-Ray adapter
# The result of parse() is materialised into a list so it can be both counted
# and iterated.
adapter = XRayAdapter()
traces = list(adapter.parse(xray_data))

print(f"Parsed {len(traces)} trace(s)")
for trace in traces:
    print(
        f"\nTrace ID: {trace.trace_id}",
        f"Duration: {trace.duration_ms:.0f}ms",
        sep="\n",
    )

    for span in trace.spans:
        # Root spans are indented by two spaces, every other span by four.
        indent = "    "
        if span.is_root:
            indent = "  "
        print(f"{indent}{span.name} [{span.kind}] - {span.duration_ms:.0f}ms")
        # Spans without exceptions have nothing more to report.
        if not span.has_exception():
            continue
        for exc in span.get_exceptions():
            print(f"{indent}  ⚠️ {exc['type']}: {exc['message']}")

## 4. Auto-Detection with Registry

The adapter registry can automatically detect the trace format.

In [None]:
# Auto-detect and parse
registry = AdapterRegistry()
# Registration order is kept exactly as before.
# NOTE(review): detection may consult adapters in registration order, with the
# generic JSON adapter last as a catch-all; confirm before reordering.
for adapter_cls in (OTLPAdapter, DatadogAdapter, XRayAdapter, JSONAdapter):
    registry.register(adapter_cls)

# Test auto-detection
print("Format detection results:")
for label, payload in (
    ("OTLP", otlp_data),
    ("Datadog", datadog_data),
    ("X-Ray", xray_data),
):
    print(f"{label} data -> {registry.detect_format(payload)}")

# Parse with auto-detection
# Materialise the result before calling len(): every other parse call in this
# notebook is wrapped in list(), and len() would raise TypeError if
# parse_auto() hands back an iterator. list() is harmless if it is already a list.
traces = list(registry.parse_auto(otlp_data))
print(f"\nAuto-parsed {len(traces)} trace(s) from OTLP data")

## 5. Analyzing Traces for Failures

Extract failure patterns from ingested traces.

In [None]:
def analyze_trace_for_failures(trace: Trace) -> list[dict]:
    """Build one finding record for every error span of ``trace``.

    Args:
        trace: Parsed trace; its ``trace_id`` and ``error_spans`` are read.

    Returns:
        A list with one dict per error span, in the order ``error_spans``
        yields them. Each dict holds the trace ID together with the span's ID,
        service name, operation name, duration in milliseconds, status
        message, exceptions and attributes. The list is empty when the trace
        has no error spans.
    """
    # Note: "attributes" holds the span's own attributes object, not a copy.
    return [
        {
            "trace_id": trace.trace_id,
            "span_id": failed_span.span_id,
            "service": failed_span.service_name,
            "operation": failed_span.name,
            "duration_ms": failed_span.duration_ms,
            "status_message": failed_span.status_message,
            "exceptions": failed_span.get_exceptions(),
            "attributes": failed_span.attributes,
        }
        for failed_span in trace.error_spans
    ]


# Analyze all parsed traces
# Each payload goes through a fresh instance of its adapter, in the order
# OTLP, Datadog, X-Ray, and the results are gathered into one flat list.
all_traces = [
    parsed_trace
    for adapter_cls, payload in (
        (OTLPAdapter, otlp_data),
        (DatadogAdapter, datadog_data),
        (XRayAdapter, xray_data),
    )
    for parsed_trace in adapter_cls().parse(payload)
]

print(f"\nAnalyzing {len(all_traces)} traces for failures...\n")

# Flatten the per-trace findings into a single list, keeping trace order.
all_findings = [
    finding
    for parsed_trace in all_traces
    for finding in analyze_trace_for_failures(parsed_trace)
]

print(f"Found {len(all_findings)} potential failures:\n")

for i, finding in enumerate(all_findings, 1):
    report_lines = [
        f"{i}. {finding['service']} / {finding['operation']}",
        f"   Duration: {finding['duration_ms']:.0f}ms",
    ]
    # Only the first recorded exception is shown for each finding.
    if finding["exceptions"]:
        exc = finding["exceptions"][0]
        report_lines.append(
            f"   Error: {exc.get('type', 'Unknown')}: {exc.get('message', 'No message')}"
        )
    # The doubled newline reproduces the blank line between findings.
    print("\n".join(report_lines), end="\n\n")

## 6. Ingesting Traces into Tinman

Feed traces to Tinman for automated failure analysis.

In [None]:
from tinman import create_tinman
from tinman.config.modes import OperatingMode


async def ingest_and_analyze():
    """Ingest the sample OTLP trace into Tinman and print its analysis.

    Creates a Tinman instance in LAB mode with ``skip_db=True``, parses
    ``otlp_data`` with the OTLP adapter, stores one observation per error span
    in ``tinman.graph`` (when a graph is present), then asks Tinman to discuss
    the ingested errors and prints the response. The instance is always closed
    before returning, including when an intermediate step raises.

    Returns:
        None. All results are printed.
    """
    # Create Tinman instance
    tinman = await create_tinman(
        mode=OperatingMode.LAB,
        skip_db=True,
    )

    # try/finally guarantees close() runs even if parsing, a graph write or
    # discuss() raises, so the instance is not leaked.
    try:
        # Parse traces
        traces = list(OTLPAdapter().parse(otlp_data))

        # Feed traces to Tinman for analysis
        for trace in traces:
            # Add error spans as observations
            for span in trace.error_spans:
                # Fetch the exceptions once. Fall back to "unknown" both when
                # the span reports no exception and when the list is empty.
                exceptions = span.get_exceptions() if span.has_exception() else []
                error_type = exceptions[0].get("type") if exceptions else "unknown"

                observation = {
                    "type": "trace_error",
                    "trace_id": trace.trace_id,
                    "span_name": span.name,
                    "service": span.service_name,
                    "error_type": error_type,
                    "error_message": span.status_message or "",
                    "attributes": span.attributes,
                }

                # Store in memory graph for analysis
                if tinman.graph:
                    tinman.graph.add_observation(
                        source="trace_ingestion",
                        observation_type="error_span",
                        data=observation,
                    )

        print(f"Ingested {len(traces)} traces")
        print("\nAsking Tinman to analyze...")

        # Ask Tinman to analyze the ingested data
        response = await tinman.discuss(
            "I've just ingested trace data showing errors. "
            "Can you analyze the patterns and suggest hypotheses for investigation?"
        )

        print(f"\nTinman's Analysis:\n{response}")
    finally:
        await tinman.close()


# Top-level await: this relies on the notebook kernel allowing `await` at cell
# scope (IPython autoawait). Outside a notebook, run the coroutine with
# asyncio.run(ingest_and_analyze()) instead.
await ingest_and_analyze()

## Next Steps

- Set up continuous trace ingestion from your observability platform
- Configure alert rules based on failure patterns
- Create custom adapters for proprietary trace formats