From efe15f5804649ac233ad85dee5eff18ed624c94d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 4 Oct 2025 07:19:13 -0700 Subject: [PATCH] fix(otel): propagate the task event store to run descendants --- .../concerns/idempotencyKeys.server.ts | 6 +- .../runEngine/concerns/traceEvents.server.ts | 32 +-- .../runEngine/services/triggerTask.server.ts | 195 +++++++++--------- apps/webapp/app/runEngine/types.ts | 2 + .../app/v3/eventRepository/index.server.ts | 11 +- .../app/v3/services/triggerTask.server.ts | 9 +- .../app/v3/services/triggerTaskV1.server.ts | 8 +- apps/webapp/test/engine/triggerTask.test.ts | 2 + 8 files changed, 138 insertions(+), 127 deletions(-) diff --git a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts index 3080ae871a..d22c8020d2 100644 --- a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts +++ b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts @@ -17,7 +17,10 @@ export class IdempotencyKeyConcern { private readonly traceEventConcern: TraceEventConcern ) {} - async handleTriggerRequest(request: TriggerTaskRequest): Promise { + async handleTriggerRequest( + request: TriggerTaskRequest, + parentStore: string | undefined + ): Promise { const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey; const idempotencyKeyExpiresAt = request.options?.idempotencyKeyExpiresAt ?? @@ -83,6 +86,7 @@ export class IdempotencyKeyConcern { if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) { await this.traceEventConcern.traceIdempotentRun( request, + parentStore, { existingRun, idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/traceEvents.server.ts b/apps/webapp/app/runEngine/concerns/traceEvents.server.ts index 7d880a5e57..634df34e4a 100644 --- a/apps/webapp/app/runEngine/concerns/traceEvents.server.ts +++ b/apps/webapp/app/runEngine/concerns/traceEvents.server.ts @@ -1,39 +1,26 @@ -import { EventRepository } from "~/v3/eventRepository/eventRepository.server"; -import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types"; import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes"; import { TaskRun } from "@trigger.dev/database"; -import { getTaskEventStore } from "~/v3/taskEventStore.server"; -import { ClickhouseEventRepository } from "~/v3/eventRepository/clickhouseEventRepository.server"; import { IEventRepository } from "~/v3/eventRepository/eventRepository.types"; -import { FEATURE_FLAG, flags } from "~/v3/featureFlags.server"; -import { env } from "~/env.server"; import { getEventRepository } from "~/v3/eventRepository/index.server"; +import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types"; export class DefaultTraceEventsConcern implements TraceEventConcern { - private readonly eventRepository: EventRepository; - private readonly clickhouseEventRepository: ClickhouseEventRepository; - - constructor( - eventRepository: EventRepository, - clickhouseEventRepository: ClickhouseEventRepository - ) { - this.eventRepository = eventRepository; - this.clickhouseEventRepository = clickhouseEventRepository; - } - async #getEventRepository( - request: TriggerTaskRequest + request: TriggerTaskRequest, + parentStore: string | undefined ): Promise<{ repository: IEventRepository; store: string }> { return await getEventRepository( - request.environment.organization.featureFlags as Record + request.environment.organization.featureFlags as Record, + parentStore ); } async traceRun( request: TriggerTaskRequest, + parentStore: string | undefined, callback: (span: TracedEventSpan, store: string) => Promise ): Promise { - const { repository, store } = await this.#getEventRepository(request); + const { repository, store } = await this.#getEventRepository(request, parentStore); return await repository.traceEvent( request.taskId, @@ -73,6 +60,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern { async traceIdempotentRun( request: TriggerTaskRequest, + parentStore: string | undefined, options: { existingRun: TaskRun; idempotencyKey: string; @@ -82,7 +70,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern { callback: (span: TracedEventSpan, store: string) => Promise ): Promise { const { existingRun, idempotencyKey, incomplete, isError } = options; - const { repository, store } = await this.#getEventRepository(request); + const { repository, store } = await this.#getEventRepository(request, parentStore); return await repository.traceEvent( `${request.taskId} (cached)`, @@ -107,7 +95,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern { }, async (event, traceContext, traceparent) => { //log a message - await this.eventRepository.recordEvent( + await repository.recordEvent( `There's an existing run for idempotencyKey: ${idempotencyKey}`, { taskSlug: request.taskId, diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 4916e237bb..144d9b3178 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -197,7 +197,8 @@ export class RunEngineTriggerTaskService { } const idempotencyKeyConcernResult = await this.idempotencyKeyConcern.handleTriggerRequest( - triggerRequest + triggerRequest, + parentRun?.taskEventStore ); if (idempotencyKeyConcernResult.isCached) { @@ -266,105 +267,109 @@ export class RunEngineTriggerTaskService { const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); try { - return await this.traceEventConcern.traceRun(triggerRequest, async (event, store) => { - const result = await this.runNumberIncrementer.incrementRunNumber( - triggerRequest, - async (num) => { - event.setAttribute("queueName", queueName); - span.setAttribute("queueName", queueName); - event.setAttribute("runId", runFriendlyId); - span.setAttribute("runId", runFriendlyId); - - const payloadPacket = await this.payloadProcessor.process(triggerRequest); - - const taskRun = await this.engine.trigger( - { - number: num, - friendlyId: runFriendlyId, - environment: environment, - idempotencyKey, - idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, - taskIdentifier: taskId, - payload: payloadPacket.data ?? "", - payloadType: payloadPacket.dataType, - context: body.context, - traceContext: this.#propagateExternalTraceContext( - event.traceContext, - parentRun?.traceContext, - event.traceparent?.spanId - ), - traceId: event.traceId, - spanId: event.spanId, - parentSpanId: - options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId, - replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId, - lockedToVersionId: lockedToBackgroundWorker?.id, - taskVersion: lockedToBackgroundWorker?.version, - sdkVersion: lockedToBackgroundWorker?.sdkVersion, - cliVersion: lockedToBackgroundWorker?.cliVersion, - concurrencyKey: body.options?.concurrencyKey, - queue: queueName, - lockedQueueId, - workerQueue, - isTest: body.options?.test ?? false, - delayUntil, - queuedAt: delayUntil ? undefined : new Date(), - maxAttempts: body.options?.maxAttempts, - taskEventStore: store, - ttl, - tags, - oneTimeUseToken: options.oneTimeUseToken, - parentTaskRunId: parentRun?.id, - rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, - batch: options?.batchId - ? { - id: options.batchId, - index: options.batchIndex ?? 0, - } - : undefined, - resumeParentOnCompletion: body.options?.resumeParentOnCompletion, - depth, - metadata: metadataPacket?.data, - metadataType: metadataPacket?.dataType, - seedMetadata: metadataPacket?.data, - seedMetadataType: metadataPacket?.dataType, - maxDurationInSeconds: body.options?.maxDuration - ? clampMaxDuration(body.options.maxDuration) - : undefined, - machine: body.options?.machine, - priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, - queueTimestamp: - options.queueTimestamp ?? - (parentRun && body.options?.resumeParentOnCompletion - ? parentRun.queueTimestamp ?? undefined - : undefined), - scheduleId: options.scheduleId, - scheduleInstanceId: options.scheduleInstanceId, - createdAt: options.overrideCreatedAt, - bulkActionId: body.options?.bulkActionId, - planType, - }, - this.prisma - ); - - const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined; - - if (error) { - event.failWithError(error); + return await this.traceEventConcern.traceRun( + triggerRequest, + parentRun?.taskEventStore, + async (event, store) => { + const result = await this.runNumberIncrementer.incrementRunNumber( + triggerRequest, + async (num) => { + event.setAttribute("queueName", queueName); + span.setAttribute("queueName", queueName); + event.setAttribute("runId", runFriendlyId); + span.setAttribute("runId", runFriendlyId); + + const payloadPacket = await this.payloadProcessor.process(triggerRequest); + + const taskRun = await this.engine.trigger( + { + number: num, + friendlyId: runFriendlyId, + environment: environment, + idempotencyKey, + idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, + taskIdentifier: taskId, + payload: payloadPacket.data ?? "", + payloadType: payloadPacket.dataType, + context: body.context, + traceContext: this.#propagateExternalTraceContext( + event.traceContext, + parentRun?.traceContext, + event.traceparent?.spanId + ), + traceId: event.traceId, + spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId, + replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId, + lockedToVersionId: lockedToBackgroundWorker?.id, + taskVersion: lockedToBackgroundWorker?.version, + sdkVersion: lockedToBackgroundWorker?.sdkVersion, + cliVersion: lockedToBackgroundWorker?.cliVersion, + concurrencyKey: body.options?.concurrencyKey, + queue: queueName, + lockedQueueId, + workerQueue, + isTest: body.options?.test ?? false, + delayUntil, + queuedAt: delayUntil ? undefined : new Date(), + maxAttempts: body.options?.maxAttempts, + taskEventStore: store, + ttl, + tags, + oneTimeUseToken: options.oneTimeUseToken, + parentTaskRunId: parentRun?.id, + rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, + batch: options?.batchId + ? { + id: options.batchId, + index: options.batchIndex ?? 0, + } + : undefined, + resumeParentOnCompletion: body.options?.resumeParentOnCompletion, + depth, + metadata: metadataPacket?.data, + metadataType: metadataPacket?.dataType, + seedMetadata: metadataPacket?.data, + seedMetadataType: metadataPacket?.dataType, + maxDurationInSeconds: body.options?.maxDuration + ? clampMaxDuration(body.options.maxDuration) + : undefined, + machine: body.options?.machine, + priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, + queueTimestamp: + options.queueTimestamp ?? + (parentRun && body.options?.resumeParentOnCompletion + ? parentRun.queueTimestamp ?? undefined + : undefined), + scheduleId: options.scheduleId, + scheduleInstanceId: options.scheduleInstanceId, + createdAt: options.overrideCreatedAt, + bulkActionId: body.options?.bulkActionId, + planType, + }, + this.prisma + ); + + const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined; + + if (error) { + event.failWithError(error); + } + + return { run: taskRun, error, isCached: false }; } + ); - return { run: taskRun, error, isCached: false }; + if (result?.error) { + throw new ServiceValidationError( + taskRunErrorToString(taskRunErrorEnhancer(result.error)) + ); } - ); - if (result?.error) { - throw new ServiceValidationError( - taskRunErrorToString(taskRunErrorEnhancer(result.error)) - ); + return result; } - - return result; - }); + ); } catch (error) { if (error instanceof RunDuplicateIdempotencyKeyError) { //retry calling this function, because this time it will return the idempotent run diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index 2324edc6b8..0aa52d0a40 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -143,10 +143,12 @@ export type TracedEventSpan = { export interface TraceEventConcern { traceRun( request: TriggerTaskRequest, + parentStore: string | undefined, callback: (span: TracedEventSpan, store: string) => Promise ): Promise; traceIdempotentRun( request: TriggerTaskRequest, + parentStore: string | undefined, options: { existingRun: TaskRun; idempotencyKey: string; diff --git a/apps/webapp/app/v3/eventRepository/index.server.ts b/apps/webapp/app/v3/eventRepository/index.server.ts index cda9e58940..3bf77fcd76 100644 --- a/apps/webapp/app/v3/eventRepository/index.server.ts +++ b/apps/webapp/app/v3/eventRepository/index.server.ts @@ -18,8 +18,17 @@ export function resolveEventRepositoryForStore(store: string | undefined): IEven } export async function getEventRepository( - featureFlags: Record | undefined + featureFlags: Record | undefined, + parentStore: string | undefined ): Promise<{ repository: IEventRepository; store: string }> { + if (typeof parentStore === "string") { + if (parentStore === "clickhouse") { + return { repository: clickhouseEventRepository, store: "clickhouse" }; + } else { + return { repository: eventRepository, store: getTaskEventStore() }; + } + } + const taskEventRepository = await resolveTaskEventRepositoryFlag(featureFlags); if (taskEventRepository === "clickhouse") { diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 5f56a35af2..235dddd7d6 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -1,5 +1,6 @@ import { TriggerTaskRequestBody } from "@trigger.dev/core/v3"; import { RunEngineVersion, TaskRun } from "@trigger.dev/database"; +import { env } from "~/env.server"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultPayloadProcessor } from "~/runEngine/concerns/payloads.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; @@ -9,12 +10,9 @@ import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.se import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskValidator"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { determineEngineVersion } from "../engineVersion.server"; -import { eventRepository } from "../eventRepository/eventRepository.server"; import { tracer } from "../tracer.server"; import { WithRunEngine } from "./baseService.server"; import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; -import { env } from "~/env.server"; -import { clickhouseEventRepository } from "../eventRepository/clickhouseEventRepositoryInstance.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -94,10 +92,7 @@ export class TriggerTaskService extends WithRunEngine { body: TriggerTaskRequestBody, options: TriggerTaskServiceOptions = {} ): Promise { - const traceEventConcern = new DefaultTraceEventsConcern( - eventRepository, - clickhouseEventRepository - ); + const traceEventConcern = new DefaultTraceEventsConcern(); const service = new RunEngineTriggerTaskService({ prisma: this._prisma, diff --git a/apps/webapp/app/v3/services/triggerTaskV1.server.ts b/apps/webapp/app/v3/services/triggerTaskV1.server.ts index c193f142d6..5e6ac7c9f1 100644 --- a/apps/webapp/app/v3/services/triggerTaskV1.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV1.server.ts @@ -179,6 +179,7 @@ export class TriggerTaskServiceV1 extends BaseService { depth: true, queueTimestamp: true, queue: true, + taskEventStore: true, }, }, }, @@ -216,6 +217,7 @@ export class TriggerTaskServiceV1 extends BaseService { taskIdentifier: true, rootTaskRunId: true, depth: true, + taskEventStore: true, }, }, }, @@ -237,6 +239,7 @@ export class TriggerTaskServiceV1 extends BaseService { depth: true, queueTimestamp: true, queue: true, + taskEventStore: true, }, }, }, @@ -289,7 +292,10 @@ export class TriggerTaskServiceV1 extends BaseService { : undefined; const { repository, store } = await getEventRepository( - environment.organization.featureFlags as Record + environment.organization.featureFlags as Record, + dependentAttempt?.taskRun.taskEventStore ?? + parentAttempt?.taskRun.taskEventStore ?? + dependentBatchRun?.dependentTaskAttempt?.taskRun.taskEventStore ); try { diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index 36dabd008c..aa0e059156 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -79,6 +79,7 @@ class MockTriggerTaskValidator implements TriggerTaskValidator { class MockTraceEventConcern implements TraceEventConcern { async traceRun( request: TriggerTaskRequest, + parentStore: string | undefined, callback: (span: TracedEventSpan, store: string) => Promise ): Promise { return await callback( @@ -96,6 +97,7 @@ class MockTraceEventConcern implements TraceEventConcern { async traceIdempotentRun( request: TriggerTaskRequest, + parentStore: string | undefined, options: { existingRun: TaskRun; idempotencyKey: string;