diff --git a/.changeset/gold-crews-fly.md b/.changeset/gold-crews-fly.md new file mode 100644 index 000000000..46fea641e --- /dev/null +++ b/.changeset/gold-crews-fly.md @@ -0,0 +1,5 @@ +--- +"inngest": patch +--- + +Add selective header forwarding when sending events, allowing context propagation when tracing fanouts 👀 diff --git a/packages/inngest/src/components/Inngest.ts b/packages/inngest/src/components/Inngest.ts index ded418d1c..5dee1c8eb 100644 --- a/packages/inngest/src/components/Inngest.ts +++ b/packages/inngest/src/components/Inngest.ts @@ -351,6 +351,20 @@ export class Inngest { public async send>>( payload: Payload ): Promise> { + return this._send({ payload }); + } + + /** + * Internal method for sending an event, used to allow Inngest internals to + * further customize the request sent to an Inngest Server. + */ + private async _send>>({ + payload, + headers, + }: { + payload: Payload; + headers?: Record; + }): Promise> { const hooks = await getHookStack( this.middleware, "onSendEvent", @@ -465,7 +479,7 @@ export class Inngest { const response = await this.fetch(url, { method: "POST", body: stringify(payloads), - headers: { ...this.headers }, + headers: { ...this.headers, ...headers }, }); let body: SendEventResponse | undefined; diff --git a/packages/inngest/src/components/InngestCommHandler.ts b/packages/inngest/src/components/InngestCommHandler.ts index 5d6d1f523..c15dd5a81 100644 --- a/packages/inngest/src/components/InngestCommHandler.ts +++ b/packages/inngest/src/components/InngestCommHandler.ts @@ -735,12 +735,37 @@ export class InngestCommHandler< (await getQuerystring("processing run request", queryKeys.StepId)) || null; + const headersToFetch = [headerKeys.TraceParent, headerKeys.TraceState]; + + const headerPromises = headersToFetch.map(async (header) => { + const value = await actions.headers( + `fetching ${header} for forwarding`, + header + ); + + return { header, value }; + }); + + const fetchedHeaders = await Promise.all(headerPromises); + + const headersToForward = fetchedHeaders.reduce>( + (acc, { header, value }) => { + if (value) { + acc[header] = value; + } + + return acc; + }, + {} + ); + const { version, result } = this.runStep({ functionId: fnId, data: body, stepId, timer, reqArgs, + headers: headersToForward, }); const stepOutput = await result; @@ -759,6 +784,7 @@ export class InngestCommHandler< status: result.retriable ? 500 : 400, headers: { "Content-Type": "application/json", + ...headersToForward, [headerKeys.NoRetry]: result.retriable ? "false" : "true", ...(typeof result.retriable === "string" ? { [headerKeys.RetryAfter]: result.retriable } @@ -773,6 +799,7 @@ export class InngestCommHandler< status: 200, headers: { "Content-Type": "application/json", + ...headersToForward, }, body: stringify(undefinedToNull(result.data)), version, @@ -783,6 +810,7 @@ export class InngestCommHandler< status: 500, headers: { "Content-Type": "application/json", + ...headersToForward, [headerKeys.NoRetry]: "false", }, body: stringify({ @@ -800,6 +828,7 @@ export class InngestCommHandler< status: 206, headers: { "Content-Type": "application/json", + ...headersToForward, ...(typeof result.retriable !== "undefined" ? { [headerKeys.NoRetry]: result.retriable ? "false" : "true", @@ -818,7 +847,10 @@ export class InngestCommHandler< return { status: 206, - headers: { "Content-Type": "application/json" }, + headers: { + "Content-Type": "application/json", + ...headersToForward, + }, body: stringify(steps), version, }; @@ -941,12 +973,14 @@ export class InngestCommHandler< data, timer, reqArgs, + headers, }: { functionId: string; stepId: string | null; data: unknown; timer: ServerTiming; reqArgs: unknown[]; + headers: Record; }): { version: ExecutionVersion; result: Promise } { const fn = this.fns[functionId]; if (!fn) { @@ -1012,6 +1046,7 @@ export class InngestCommHandler< isFailureHandler: fn.onFailure, stepCompletionOrder: ctx?.stack?.stack ?? [], reqArgs, + headers, }, }; }, @@ -1047,6 +1082,7 @@ export class InngestCommHandler< disableImmediateExecution: ctx?.disable_immediate_execution, stepCompletionOrder: ctx?.stack?.stack ?? [], reqArgs, + headers, }, }; }, diff --git a/packages/inngest/src/components/InngestFunction.test.ts b/packages/inngest/src/components/InngestFunction.test.ts index 8a805c861..e8e8c3e27 100644 --- a/packages/inngest/src/components/InngestFunction.test.ts +++ b/packages/inngest/src/components/InngestFunction.test.ts @@ -156,6 +156,7 @@ describe("runFn", () => { stepState: {}, stepCompletionOrder: [], reqArgs: [], + headers: {}, }, }); @@ -200,6 +201,7 @@ describe("runFn", () => { runId: "run", stepCompletionOrder: [], reqArgs: [], + headers: {}, }, }); diff --git a/packages/inngest/src/components/InngestStepTools.ts b/packages/inngest/src/components/InngestStepTools.ts index 9f8e1ae85..1405a2e92 100644 --- a/packages/inngest/src/components/InngestStepTools.ts +++ b/packages/inngest/src/components/InngestStepTools.ts @@ -30,6 +30,7 @@ import { } from "./Inngest"; import { InngestFunction } from "./InngestFunction"; import { InngestFunctionReference } from "./InngestFunctionReference"; +import { type InngestExecution } from "./execution/InngestExecution"; export interface FoundStep extends HashedOp { hashedId: string; @@ -113,6 +114,7 @@ export const createStepTools = < TTriggers extends TriggersFromClient = TriggersFromClient, >( client: TClient, + execution: InngestExecution, stepHandler: StepHandler ) => { /** @@ -190,7 +192,10 @@ export const createStepTools = < }, { fn: (idOrOptions, payload) => { - return client.send(payload); + return client["_send"]({ + payload, + headers: execution["options"]["headers"], + }); }, } ), diff --git a/packages/inngest/src/components/execution/InngestExecution.ts b/packages/inngest/src/components/execution/InngestExecution.ts index 341429bf6..e1ad9dd5c 100644 --- a/packages/inngest/src/components/execution/InngestExecution.ts +++ b/packages/inngest/src/components/execution/InngestExecution.ts @@ -54,6 +54,11 @@ export interface InngestExecutionOptions { data: Omit; stepState: Record; stepCompletionOrder: string[]; + + /** + * Headers to be sent with any request to Inngest during this execution. + */ + headers: Record; requestedRunStep?: string; timer?: ServerTiming; isFailureHandler?: boolean; @@ -68,7 +73,6 @@ export class InngestExecution { protected debug: Debugger; constructor(protected options: InngestExecutionOptions) { - this.options = options; this.debug = Debug(debugPrefix).extend(this.options.runId); } } diff --git a/packages/inngest/src/components/execution/v0.ts b/packages/inngest/src/components/execution/v0.ts index 0e31739e7..2ba08da70 100644 --- a/packages/inngest/src/components/execution/v0.ts +++ b/packages/inngest/src/components/execution/v0.ts @@ -413,7 +413,7 @@ export class V0InngestExecution }); }; - const step = createStepTools(this.options.client, stepHandler); + const step = createStepTools(this.options.client, this, stepHandler); let fnArg = { ...(this.options.data as { event: EventPayload }), diff --git a/packages/inngest/src/components/execution/v1.ts b/packages/inngest/src/components/execution/v1.ts index 3c4430691..50a12f126 100644 --- a/packages/inngest/src/components/execution/v1.ts +++ b/packages/inngest/src/components/execution/v1.ts @@ -854,7 +854,7 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution { return promise; }; - return createStepTools(this.options.client, stepHandler); + return createStepTools(this.options.client, this, stepHandler); } private getUserFnToRun(): Handler.Any { diff --git a/packages/inngest/src/helpers/consts.ts b/packages/inngest/src/helpers/consts.ts index c26b29412..4a3ab62e2 100644 --- a/packages/inngest/src/helpers/consts.ts +++ b/packages/inngest/src/helpers/consts.ts @@ -127,6 +127,8 @@ export enum headerKeys { RetryAfter = "retry-after", InngestServerKind = "x-inngest-server-kind", InngestExpectedServerKind = "x-inngest-expected-server-kind", + TraceParent = "traceparent", + TraceState = "tracestate", } export const defaultInngestApiBaseUrl = "https://api.inngest.com/"; diff --git a/packages/inngest/src/test/helpers.ts b/packages/inngest/src/test/helpers.ts index 051cccc37..d8f0d69d8 100644 --- a/packages/inngest/src/test/helpers.ts +++ b/packages/inngest/src/test/helpers.ts @@ -11,6 +11,8 @@ import { } from "@local/components/InngestStepTools"; import { ExecutionVersion, + IInngestExecution, + InngestExecution, InngestExecutionOptions, PREFERRED_EXECUTION_VERSION, } from "@local/components/execution/InngestExecution"; @@ -63,9 +65,31 @@ export const createClient = >( export const testClientId = "__test_client__"; export const getStepTools = ( - client: Inngest.Any = createClient({ id: testClientId }) + client: Inngest.Any = createClient({ id: testClientId }), + executionOptions: Partial = {} ) => { - const step = createStepTools(client, ({ args, matchOp }) => { + const execution = client + .createFunction({ id: "test" }, { event: "test" }, () => undefined) + ["createExecution"]({ + version: PREFERRED_EXECUTION_VERSION, + partialOptions: { + data: fromPartial({ + event: { name: "foo", data: {} }, + }), + runId: "run", + stepState: {}, + stepCompletionOrder: [], + isFailureHandler: false, + requestedRunStep: undefined, + timer: new ServerTiming(), + disableImmediateExecution: false, + reqArgs: [], + headers: {}, + ...executionOptions, + }, + }) as IInngestExecution & InngestExecution; + + const step = createStepTools(client, execution, ({ args, matchOp }) => { const stepOptions = getStepOptions(args[0]); return Promise.resolve(matchOp(stepOptions, ...args.slice(1))); }); @@ -105,6 +129,7 @@ export const runFnWithStack = ( timer: new ServerTiming(), disableImmediateExecution: opts?.disableImmediateExecution, reqArgs: [], + headers: {}, }, });