Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-start-external-trace-context-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Fix external trace context leaking across runs on warm-started workers with `processKeepAlive` enabled. Every subsequent run's attempt span was being exported with the first run's `traceId` and `parentSpanId`, breaking causal-chain navigation in external APM tools. Runs without an external trace context are unaffected.
93 changes: 44 additions & 49 deletions packages/core/src/v3/otel/tracingSDK.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,12 @@ export class TracingSDK {
);

const externalTraceId = idGenerator.generateTraceId();
const externalTraceContext = traceContext.getExternalTraceContext();

for (const exporter of config.exporters ?? []) {
spanProcessors.push(
getEnvVar("TRIGGER_OTEL_BATCH_PROCESSING_ENABLED") === "1"
? new BatchSpanProcessor(
new ExternalSpanExporterWrapper(exporter, externalTraceId, externalTraceContext),
new ExternalSpanExporterWrapper(exporter, externalTraceId),
{
maxExportBatchSize: parseInt(
getEnvVar("TRIGGER_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"
Expand All @@ -192,7 +191,7 @@ export class TracingSDK {
}
)
: new SimpleSpanProcessor(
new ExternalSpanExporterWrapper(exporter, externalTraceId, externalTraceContext)
new ExternalSpanExporterWrapper(exporter, externalTraceId)
)
);
}
Expand Down Expand Up @@ -245,11 +244,7 @@ export class TracingSDK {
logProcessors.push(
getEnvVar("TRIGGER_OTEL_BATCH_PROCESSING_ENABLED") === "1"
? new BatchLogRecordProcessor(
new ExternalLogRecordExporterWrapper(
externalLogExporter,
externalTraceId,
externalTraceContext
),
new ExternalLogRecordExporterWrapper(externalLogExporter, externalTraceId),
{
maxExportBatchSize: parseInt(
getEnvVar("TRIGGER_OTEL_LOG_MAX_EXPORT_BATCH_SIZE") ?? "64"
Expand All @@ -264,11 +259,7 @@ export class TracingSDK {
}
)
: new SimpleLogRecordProcessor(
new ExternalLogRecordExporterWrapper(
externalLogExporter,
externalTraceId,
externalTraceContext
)
new ExternalLogRecordExporterWrapper(externalLogExporter, externalTraceId)
)
);
}
Expand Down Expand Up @@ -417,32 +408,32 @@ function setLogLevel(level: TracingDiagnosticLogLevel) {
diag.setLogger(new DiagConsoleLogger(), diagLogLevel);
}

class ExternalSpanExporterWrapper {
private readonly _isExternallySampled: boolean;

export class ExternalSpanExporterWrapper {
constructor(
private underlyingExporter: SpanExporter,
private externalTraceId: string,
private externalTraceContext:
| { traceId: string; spanId: string; traceFlags: number; tracestate?: string }
| undefined
) {
this._isExternallySampled = externalTraceContext
? isTraceFlagSampled(externalTraceContext.traceFlags)
: !!externalTraceId;
}
private externalTraceId: string
) {}

private transformSpan(span: ReadableSpan): ReadableSpan | undefined {
if (!this._isExternallySampled) {
// Read external context live, so per-run reassignment of
// standardTraceContextManager.traceContext is honoured on warm-started
// workers that reuse a single TracingSDK across runs.
const externalTraceContext = traceContext.getExternalTraceContext();

const isExternallySampled = externalTraceContext
? isTraceFlagSampled(externalTraceContext.traceFlags)
: !!this.externalTraceId;

if (!isExternallySampled) {
return;
}

if (isSpanInternalOnly(span)) {
return;
}

const externalTraceId = this.externalTraceContext
? this.externalTraceContext.traceId
const externalTraceId = externalTraceContext
? externalTraceContext.traceId
: this.externalTraceId;

const isAttemptSpan = span.attributes[SemanticInternalAttributes.SPAN_ATTEMPT];
Expand All @@ -457,15 +448,15 @@ class ExternalSpanExporterWrapper {
};
}

if (isAttemptSpan && this.externalTraceContext) {
if (isAttemptSpan && externalTraceContext) {
parentSpanContext = {
...parentSpanContext,
traceId: externalTraceId,
spanId: this.externalTraceContext.spanId,
traceState: this.externalTraceContext.tracestate
? new TraceState(this.externalTraceContext.tracestate)
spanId: externalTraceContext.spanId,
traceState: externalTraceContext.tracestate
? new TraceState(externalTraceContext.tracestate)
: undefined,
traceFlags: this.externalTraceContext.traceFlags,
traceFlags: externalTraceContext.traceFlags,
};
} else if (isAttemptSpan) {
parentSpanContext = undefined;
Expand Down Expand Up @@ -502,28 +493,27 @@ class ExternalSpanExporterWrapper {
}

class ExternalLogRecordExporterWrapper {
private readonly _isExternallySampled: boolean;

constructor(
private underlyingExporter: LogRecordExporter,
private externalTraceId: string,
private externalTraceContext:
| { traceId: string; spanId: string; tracestate?: string; traceFlags: number }
| undefined
) {
this._isExternallySampled = externalTraceContext
? isTraceFlagSampled(externalTraceContext.traceFlags)
: !!externalTraceId;
}
private externalTraceId: string
) {}

export(logs: any[], resultCallback: (result: any) => void): void {
if (!this._isExternallySampled) {
const externalTraceContext = traceContext.getExternalTraceContext();

const isExternallySampled = externalTraceContext
? isTraceFlagSampled(externalTraceContext.traceFlags)
: !!this.externalTraceId;

if (!isExternallySampled) {
this.underlyingExporter.export([], resultCallback);

return;
}

const modifiedLogs = logs.map(this.transformLogRecord.bind(this));
const modifiedLogs = logs.map((log) =>
this.transformLogRecord(log, externalTraceContext)
);

this.underlyingExporter.export(modifiedLogs, resultCallback);
}
Expand All @@ -532,11 +522,16 @@ class ExternalLogRecordExporterWrapper {
return this.underlyingExporter.shutdown();
}

transformLogRecord(logRecord: ReadableLogRecord): ReadableLogRecord {
transformLogRecord(
logRecord: ReadableLogRecord,
externalTraceContext:
| { traceId: string; spanId: string; tracestate?: string; traceFlags: number }
| undefined
): ReadableLogRecord {
// Capture externalTraceId for use within the proxy's scope.
// Use externalTraceContext.traceId if available, otherwise fall back to generated externalTraceId
const externalTraceId = this.externalTraceContext
? this.externalTraceContext.traceId
const externalTraceId = externalTraceContext
? externalTraceContext.traceId
: this.externalTraceId;

// If there's no spanContext, or if the externalTraceId is not set, return the original logRecord.
Expand Down
83 changes: 83 additions & 0 deletions packages/core/test/externalSpanExporterWrapper.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { SpanKind, SpanStatusCode, TraceFlags } from "@opentelemetry/api";
import type { ReadableSpan, SpanExporter } from "@opentelemetry/sdk-trace-node";
import { beforeEach, describe, expect, it } from "vitest";
import { ExternalSpanExporterWrapper } from "../src/v3/otel/tracingSDK.js";
import { SemanticInternalAttributes } from "../src/v3/semanticInternalAttributes.js";
import { traceContext } from "../src/v3/trace-context-api.js";
import { StandardTraceContextManager } from "../src/v3/traceContext/manager.js";
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const TRACEPARENT_RUN_A = "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-1111111111111111-01";
const TRACEPARENT_RUN_B = "00-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb-2222222222222222-01";

function createAttemptSpan(): ReadableSpan {
const spanCtx = {
traceId: "cccccccccccccccccccccccccccccccc",
spanId: "3333333333333333",
traceFlags: TraceFlags.SAMPLED,
};
return {
name: "Attempt 1",
kind: SpanKind.CONSUMER,
spanContext: () => spanCtx,
parentSpanContext: undefined,
startTime: [0, 0],
endTime: [0, 0],
status: { code: SpanStatusCode.UNSET },
attributes: { [SemanticInternalAttributes.SPAN_ATTEMPT]: true },
links: [],
events: [],
duration: [0, 0],
ended: true,
resource: {} as any,
instrumentationLibrary: { name: "test" } as any,
droppedAttributesCount: 0,
droppedEventsCount: 0,
droppedLinksCount: 0,
} as unknown as ReadableSpan;
}

function makeCapturingExporter(): { exporter: SpanExporter; captured: ReadableSpan[][] } {
const captured: ReadableSpan[][] = [];
const exporter: SpanExporter = {
export: (spans, cb) => {
captured.push(spans);
cb({ code: 0 } as any);
},
shutdown: () => Promise.resolve(),
forceFlush: () => Promise.resolve(),
};
return { exporter, captured };
}

describe("ExternalSpanExporterWrapper warm-start regression", () => {
let manager: StandardTraceContextManager;

beforeEach(() => {
manager = new StandardTraceContextManager();
traceContext.setGlobalManager(manager);
});

it("rewrites attempt spans using the manager's current external context, not the value captured at construction", () => {
const { exporter, captured } = makeCapturingExporter();

manager.traceContext = { external: { traceparent: TRACEPARENT_RUN_A } };

const wrapper = new ExternalSpanExporterWrapper(
exporter,
"ffffffffffffffffffffffffffffffff"
);

manager.traceContext = { external: { traceparent: TRACEPARENT_RUN_B } };

wrapper.export([createAttemptSpan()], () => {});

expect(captured).toHaveLength(1);
expect(captured[0]).toHaveLength(1);

const span = captured[0]![0]!;
expect(span.parentSpanContext?.spanId).toBe("2222222222222222");
expect(span.parentSpanContext?.spanId).not.toBe("1111111111111111");
expect(span.parentSpanContext?.traceId).toBe("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
expect(span.spanContext().traceId).toBe("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});
Loading