From 179475876575352bc91673363c7a88ca01eef67e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 22 May 2026 16:12:25 +0100 Subject: [PATCH 1/5] perf(run-engine,webapp): look up PENDING_VERSION runs via ClickHouse When a background worker registers, the engine resolves runs that were queued before the worker was ready. That lookup used to scan a Postgres status index. Move it to ClickHouse: query candidate run ids from `task_runs_v2`, then refetch the actual rows from Postgres by primary key with a `status = 'PENDING_VERSION'` guard for idempotency. The lookup is a pluggable interface on the run engine (`PendingVersionRunIdLookup`); the webapp wires a ClickHouse-backed implementation through the org-scoped `clickhouseFactory` using a new "engine" client type, configured by `RUN_ENGINE_CLICKHOUSE_*` env vars. When the lookup returns no candidates, one bounded retry is scheduled ~5s later to cover ClickHouse replication lag. The Postgres status guard prevents double-promotion when retries race with concurrent deploys. --- .../pending-version-clickhouse-lookup.md | 6 + apps/webapp/app/env.server.ts | 15 ++ .../clickhouse/clickhouseFactory.server.ts | 49 ++++++- apps/webapp/app/v3/runEngine.server.ts | 2 + .../runEnginePendingVersionLookup.server.ts | 24 ++++ .../clickhousePendingVersionLookup.server.ts | 92 ++++++++++++ internal-packages/clickhouse/src/index.ts | 2 + internal-packages/clickhouse/src/taskRuns.ts | 19 +++ .../run-engine/src/engine/index.ts | 8 +- .../engine/services/pendingVersionLookup.ts | 48 +++++++ .../engine/systems/pendingVersionSystem.ts | 134 ++++++++++++++---- .../run-engine/src/engine/systems/systems.ts | 2 + .../src/engine/tests/pendingVersion.test.ts | 4 + .../tests/postgresPendingVersionLookup.ts | 43 ++++++ .../run-engine/src/engine/types.ts | 22 +++ .../run-engine/src/engine/workerCatalog.ts | 8 ++ internal-packages/run-engine/src/index.ts | 6 + 17 files changed, 456 insertions(+), 28 deletions(-) create mode 100644 .server-changes/pending-version-clickhouse-lookup.md create mode 100644 apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts create mode 100644 apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts create mode 100644 internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts create mode 100644 internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts diff --git a/.server-changes/pending-version-clickhouse-lookup.md b/.server-changes/pending-version-clickhouse-lookup.md new file mode 100644 index 00000000000..af46540528f --- /dev/null +++ b/.server-changes/pending-version-clickhouse-lookup.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +PendingVersionSystem now discovers PENDING_VERSION run ids via ClickHouse and re-validates each by primary key in Postgres, reducing read load on the TaskRun status index. Uses a dedicated `RUN_ENGINE_CLICKHOUSE_*` client so it doesn't contend with the main analytics pool. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d2162de025c..a0b676d7602 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1471,6 +1471,21 @@ const EnvironmentSchema = z EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10), EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"), + // ClickHouse client used by @internal/run-engine's PendingVersionSystem. + // Kept on its own URL + pool so this low-QPS path can't contend with + // the main analytics client (CLICKHOUSE_URL). Falls back to the main + // URL when unset so unconfigured environments still work. + RUN_ENGINE_CLICKHOUSE_URL: z + .string() + .optional() + .transform((v) => v ?? process.env.CLICKHOUSE_URL), + RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"), + RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(), + RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(5), + RUN_ENGINE_CLICKHOUSE_LOG_LEVEL: z + .enum(["log", "error", "warn", "info", "debug"]) + .default("info"), + RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"), EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000), EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000), diff --git a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts index 652fc26ae35..fb7f384fd27 100644 --- a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts +++ b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts @@ -181,6 +181,36 @@ function initializeSessionsReplicationClickhouseClient(): ClickHouse { }); } +/** Run-engine PendingVersionSystem lookup (`RUN_ENGINE_CLICKHOUSE_URL`); + * falls back to the default client if unset. */ +const defaultRunEngineClickhouseClient = singleton( + "runEngineClickhouseClient", + initializeRunEngineClickhouseClient +); + +function initializeRunEngineClickhouseClient(): ClickHouse { + if (!env.RUN_ENGINE_CLICKHOUSE_URL) { + return defaultClickhouseClient; + } + + const url = new URL(env.RUN_ENGINE_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + return new ClickHouse({ + url: url.toString(), + name: "run-engine-clickhouse", + keepAlive: { + enabled: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.RUN_ENGINE_CLICKHOUSE_LOG_LEVEL, + compression: { + request: env.RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST === "1", + }, + maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); +} + /** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */ const defaultEventsClickhouseClient = singleton( "eventsClickhouseClient", @@ -226,7 +256,8 @@ export type ClientType = | "sessions_replication" | "logs" | "query" - | "admin"; + | "admin" + | "engine"; function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse { const parsed = new URL(url); @@ -285,6 +316,20 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, clickhouseSettings: getLogsListClickhouseSettings(), }); + case "engine": + return new ClickHouse({ + url: parsed.toString(), + name, + keepAlive: { + enabled: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.RUN_ENGINE_CLICKHOUSE_LOG_LEVEL, + compression: { + request: env.RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST === "1", + }, + maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); case "standard": case "query": case "admin": @@ -351,6 +396,8 @@ export class ClickhouseFactory { return defaultQueryClickhouseClient; case "admin": return defaultAdminClickhouseClient; + case "engine": + return defaultRunEngineClickhouseClient; } } diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index e97a1dc8ae7..7e96c5184b2 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -6,6 +6,7 @@ import { logger } from "~/services/logger.server"; import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server"; import { singleton } from "~/utils/singleton"; import { allMachines } from "./machinePresets.server"; +import { runEnginePendingVersionLookup } from "./runEnginePendingVersionLookup.server"; import { meter, tracer } from "./tracer.server"; export const engine = singleton("RunEngine", createRunEngine); @@ -131,6 +132,7 @@ function createRunEngine() { factor: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR, }, retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS, + pendingVersionRunIdLookup: runEnginePendingVersionLookup, billing: { getCurrentPlan: async (orgId: string) => { const plan = await getCurrentPlan(orgId); diff --git a/apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts b/apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts new file mode 100644 index 00000000000..407504fab72 --- /dev/null +++ b/apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts @@ -0,0 +1,24 @@ +import { type PendingVersionRunIdLookup } from "@internal/run-engine"; +import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { ClickhousePendingVersionLookup } from "./services/clickhousePendingVersionLookup.server"; + +/** + * Lookup used by `@internal/run-engine`'s `PendingVersionSystem` to find + * `PENDING_VERSION` TaskRun ids via ClickHouse, removing the need for + * Postgres index #13 (`TaskRun_status_runtimeEnvironmentId_createdAt_id_idx`). + * + * Resolves the ClickHouse client per call via {@link clickhouseFactory} + * using the `"engine"` client type, configured by `RUN_ENGINE_CLICKHOUSE_*` + * env vars and routed per-organization for customers with HIPAA / data + * sovereignty data stores. + */ +export const runEnginePendingVersionLookup = singleton( + "runEnginePendingVersionLookup", + initializeRunEnginePendingVersionLookup +); + +function initializeRunEnginePendingVersionLookup(): PendingVersionRunIdLookup { + return new ClickhousePendingVersionLookup({ clickhouseFactory, logger }); +} diff --git a/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts b/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts new file mode 100644 index 00000000000..609557d1dde --- /dev/null +++ b/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts @@ -0,0 +1,92 @@ +import { + type PendingVersionRunIdLookup, + type PendingVersionRunIdLookupOptions, + type PendingVersionRunIdLookupResult, +} from "@internal/run-engine"; +import { Logger } from "@trigger.dev/core/logger"; +import type { ClickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server"; + +export type ClickhousePendingVersionLookupOptions = { + clickhouseFactory: ClickhouseFactory; + logger: Logger; +}; + +/** + * ClickHouse-backed lookup for `PENDING_VERSION` TaskRun ids. + * + * Resolves the ClickHouse client per call via the + * {@link ClickhouseFactory}, which honors per-organization data-store + * routing (HIPAA / data-sovereignty customers get their own instance, + * everyone else lands on the shared `engine` client configured by + * `RUN_ENGINE_CLICKHOUSE_*` env vars). + * + * Best-effort by design: replication lag against `task_runs_v2` can + * produce stale candidates. The run-engine consumer re-validates every + * id against Postgres by primary key with a `status = 'PENDING_VERSION'` + * guard before any mutation, so stale ids are dropped at the source of + * truth. On ClickHouse error we log and return an empty result; the + * pending-version re-enqueue tail loop retries on the next event. + */ +export class ClickhousePendingVersionLookup implements PendingVersionRunIdLookup { + readonly name = "clickhouse"; + + constructor(private readonly opts: ClickhousePendingVersionLookupOptions) {} + + async lookupPendingVersionRunIds( + options: PendingVersionRunIdLookupOptions + ): Promise { + // Empty IN-lists would be a no-op; bail before issuing the query. + if (options.taskIdentifiers.length === 0 || options.queues.length === 0) { + return { runIds: [] }; + } + + let clickhouse; + try { + clickhouse = await this.opts.clickhouseFactory.getClickhouseForOrganization( + options.organizationId, + "engine" + ); + } catch (error) { + this.opts.logger.warn("ClickhousePendingVersionLookup factory resolution failed", { + error, + organizationId: options.organizationId, + }); + return { runIds: [] }; + } + + const builder = clickhouse.taskRuns + .pendingVersionIdsQueryBuilder() + // `organization_id` MUST be the leading filter — it is the leading + // sort-key column on `task_runs_v2` and the only thing that prunes + // granules cheaply on a multi-tenant table. + .where("organization_id = {organizationId: String}", { + organizationId: options.organizationId, + }) + .where("project_id = {projectId: String}", { projectId: options.projectId }) + .where("environment_id = {environmentId: String}", { + environmentId: options.environmentId, + }) + .where("status = 'PENDING_VERSION'") + .where("task_identifier IN {taskIdentifiers: Array(String)}", { + taskIdentifiers: options.taskIdentifiers, + }) + .where("queue IN {queues: Array(String)}", { queues: options.queues }) + .where("_is_deleted = 0") + .orderBy("created_at ASC") + .limit(options.limit); + + const [queryError, rows] = await builder.execute(); + + if (queryError) { + this.opts.logger.warn("ClickhousePendingVersionLookup query failed", { + error: queryError, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }); + return { runIds: [] }; + } + + return { runIds: rows.map((row) => row.run_id) }; + } +} diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 45f0fa485a7..17e48623f9f 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -13,6 +13,7 @@ import { getTaskUsageByOrganization, getTaskRunsCountQueryBuilder, getTaskRunTagsQueryBuilder, + getPendingVersionIdsQueryBuilder, } from "./taskRuns.js"; import { getSpanDetailsQueryBuilder, @@ -224,6 +225,7 @@ export class ClickHouse { queryBuilder: getTaskRunsQueryBuilder(this.reader), countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader), tagQueryBuilder: getTaskRunTagsQueryBuilder(this.reader), + pendingVersionIdsQueryBuilder: getPendingVersionIdsQueryBuilder(this.reader), getTaskActivity: getTaskActivityQueryBuilder(this.reader), getCurrentRunningStats: getCurrentRunningStats(this.reader), getAverageDurations: getAverageDurations(this.reader), diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index f6427359772..40b2daac206 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -379,6 +379,25 @@ export function getTaskRunsQueryBuilder(ch: ClickhouseReader, settings?: ClickHo }); } +/** + * Lookup builder for the run-engine `PendingVersionSystem`. Returns just + * `run_id` from `task_runs_v2`. No `FINAL` — the run-engine re-validates + * each candidate against Postgres by primary key, so a stale + * `PENDING_VERSION` row from a not-yet-merged part is harmless and + * `FINAL` would be too expensive for this hot path. + */ +export function getPendingVersionIdsQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getPendingVersionIds", + baseQuery: "SELECT run_id FROM trigger_dev.task_runs_v2", + schema: TaskRunV2QueryResult, + settings, + }); +} + export function getTaskRunsCountQueryBuilder(ch: ClickhouseReader, settings?: ClickHouseSettings) { return ch.queryBuilder({ name: "getTaskRunsCount", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e0af0f2c4ff..ff75141aa0d 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -65,6 +65,7 @@ import { import { PendingVersionSystem } from "./systems/pendingVersionSystem.js"; import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js"; import { RunAttemptSystem } from "./systems/runAttemptSystem.js"; +import { NoopPendingVersionRunIdLookup } from "./services/pendingVersionLookup.js"; import { SystemResources } from "./systems/systems.js"; import { TtlSystem } from "./systems/ttlSystem.js"; import { WaitpointSystem } from "./systems/waitpointSystem.js"; @@ -244,7 +245,8 @@ export class RunEngine { }, queueRunsPendingVersion: async ({ payload }) => { await this.pendingVersionSystem.enqueueRunsForBackgroundWorker( - payload.backgroundWorkerId + payload.backgroundWorkerId, + payload.attempt ); }, tryCompleteBatch: async ({ payload }) => { @@ -295,6 +297,8 @@ export class RunEngine { runLock: this.runLock, runQueue: this.runQueue, raceSimulationSystem: this.raceSimulationSystem, + pendingVersionRunIdLookup: + options.pendingVersionRunIdLookup ?? new NoopPendingVersionRunIdLookup(), }; this.executionSnapshotSystem = new ExecutionSnapshotSystem({ @@ -332,6 +336,8 @@ export class RunEngine { this.pendingVersionSystem = new PendingVersionSystem({ resources, enqueueSystem: this.enqueueSystem, + lagRetryDelayMs: options.pendingVersionLagRetryDelayMs, + lagMaxRetries: options.pendingVersionLagMaxRetries, }); this.waitpointSystem = new WaitpointSystem({ diff --git a/internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts b/internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts new file mode 100644 index 00000000000..62ce67f4d9b --- /dev/null +++ b/internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts @@ -0,0 +1,48 @@ +/** + * Lookup interface for discovering TaskRun ids that are currently in the + * `PENDING_VERSION` status for a given background-worker filter. + * + * The default Postgres-backed implementation lives in the webapp and is + * injected via {@link SystemResources}. The run-engine package only owns + * the contract; concrete implementations are provided by the consumer. + * + * Best-effort by design: implementations may return stale ids (rows that + * have since transitioned) or omit recently-inserted rows (replication + * lag against an analytical store). The caller MUST re-validate every + * returned id against the source-of-truth database before mutating it. + */ +export type PendingVersionRunIdLookupOptions = { + organizationId: string; + projectId: string; + environmentId: string; + taskIdentifiers: string[]; + queues: string[]; + /** Maximum number of ids to return. Implementations must respect this cap. */ + limit: number; +}; + +export type PendingVersionRunIdLookupResult = { + runIds: string[]; +}; + +export interface PendingVersionRunIdLookup { + /** Stable identifier for logs and metrics, e.g. "clickhouse", "test-noop". */ + readonly name: string; + + lookupPendingVersionRunIds( + options: PendingVersionRunIdLookupOptions + ): Promise; +} + +/** + * Default lookup used when nothing is wired up (tests that don't exercise + * the lookup, or engine instances that don't run the pending-version + * resolver). Returns an empty result so the system no-ops cleanly. + */ +export class NoopPendingVersionRunIdLookup implements PendingVersionRunIdLookup { + readonly name = "noop"; + + async lookupPendingVersionRunIds(): Promise { + return { runIds: [] }; + } +} diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index 9007cf86b2d..c8cee819b07 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -5,8 +5,26 @@ export type PendingVersionSystemOptions = { resources: SystemResources; enqueueSystem: EnqueueSystem; queueRunsPendingVersionBatchSize?: number; + /** + * How long to wait before retrying when the lookup returned zero + * candidates. Bounded by {@link lagMaxRetries}. Defaults to 5s. + * + * The ClickHouse-backed lookup can miss runs that were just inserted + * to Postgres due to replication lag. One bounded retry gives the + * pipeline time to catch up. + */ + lagRetryDelayMs?: number; + /** + * Maximum number of times to reschedule when the lookup returned zero + * candidates. Defaults to 1 — first attempt + one retry. Set to 0 to + * disable lag-aware retries entirely. + */ + lagMaxRetries?: number; }; +const DEFAULT_LAG_RETRY_DELAY_MS = 5_000; +const DEFAULT_LAG_MAX_RETRIES = 1; + export class PendingVersionSystem { private readonly $: SystemResources; private readonly enqueueSystem: EnqueueSystem; @@ -16,7 +34,7 @@ export class PendingVersionSystem { this.enqueueSystem = options.enqueueSystem; } - async enqueueRunsForBackgroundWorker(backgroundWorkerId: string) { + async enqueueRunsForBackgroundWorker(backgroundWorkerId: string, attempt: number = 0) { //It could be a lot of runs, so we will process them in a batch //if there are still more to process we will enqueue this function again const maxCount = this.options.queueRunsPendingVersionBatchSize ?? 200; @@ -44,37 +62,59 @@ export class PendingVersionSystem { return; } + const taskIdentifiers = backgroundWorker.tasks.map((task) => task.slug); + const queues = backgroundWorker.queues.map((queue) => queue.name); + this.$.logger.debug("Finding PENDING_VERSION runs for background worker", { workerId: backgroundWorker.id, - taskIdentifiers: backgroundWorker.tasks.map((task) => task.slug), - queues: backgroundWorker.queues.map((queue) => queue.name), + taskIdentifiers, + queues, }); - const pendingRuns = await this.$.readOnlyPrisma.taskRun.findMany({ - where: { - runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId, + // Step 1: ask the injected lookup (typically ClickHouse-backed) for + // candidate run ids. Best-effort — results may be stale or incomplete. + const { runIds: candidateIds } = await this.$.pendingVersionRunIdLookup + .lookupPendingVersionRunIds({ + organizationId: backgroundWorker.runtimeEnvironment.organizationId, projectId: backgroundWorker.projectId, + environmentId: backgroundWorker.runtimeEnvironmentId, + taskIdentifiers, + queues, + limit: maxCount + 1, + }); + + if (!candidateIds.length) { + await this.#maybeScheduleLagRetry(backgroundWorkerId, attempt, "lookup_empty"); + return; + } + + // Step 2: fetch the actual rows from the primary by id, filtered by + // `status: "PENDING_VERSION"` so any candidate whose status has moved + // is dropped. The planner uses the PK for `id IN (…)`; the status + // predicate is a residual filter and does NOT require the status + // index. + const pendingRuns = await this.$.prisma.taskRun.findMany({ + where: { + id: { in: candidateIds }, status: "PENDING_VERSION", - taskIdentifier: { - in: backgroundWorker.tasks.map((task) => task.slug), - }, - queue: { - in: backgroundWorker.queues.map((queue) => queue.name), - }, }, orderBy: { createdAt: "asc", }, - take: maxCount + 1, }); - //none to process - if (!pendingRuns.length) return; + if (!pendingRuns.length) { + // CH returned candidates but all of them have already moved past + // PENDING_VERSION (typically because a concurrent deploy or retry + // beat us to them). Don't reschedule — there's no work to wait for. + return; + } this.$.logger.debug("Enqueueing PENDING_VERSION runs for background worker", { workerId: backgroundWorker.id, - taskIdentifiers: pendingRuns.map((run) => run.taskIdentifier), - queues: pendingRuns.map((run) => run.queue), + lookupName: this.$.pendingVersionRunIdLookup.name, + candidateCount: candidateIds.length, + pendingRunCount: pendingRuns.length, runs: pendingRuns.map((run) => ({ id: run.id, taskIdentifier: run.taskIdentifier, @@ -86,14 +126,20 @@ export class PendingVersionSystem { for (const run of pendingRuns) { await this.$.prisma.$transaction(async (tx) => { - const updatedRun = await tx.taskRun.update({ - where: { - id: run.id, - }, - data: { - status: "PENDING", - }, + // Idempotency guard: only flips PENDING_VERSION → PENDING. If another + // worker already promoted this run between our findMany and the + // update, count is 0 and we skip the enqueue. + const updateResult = await tx.taskRun.updateMany({ + where: { id: run.id, status: "PENDING_VERSION" }, + data: { status: "PENDING" }, }); + + if (updateResult.count === 0) { + return; + } + + const updatedRun = await tx.taskRun.findFirstOrThrow({ where: { id: run.id } }); + await this.enqueueSystem.enqueueRun({ run: updatedRun, env: backgroundWorker.runtimeEnvironment, @@ -132,11 +178,47 @@ export class PendingVersionSystem { } } - async scheduleResolvePendingVersionRuns(backgroundWorkerId: string): Promise { + async scheduleResolvePendingVersionRuns( + backgroundWorkerId: string, + opts?: { attempt?: number; availableAt?: Date } + ): Promise { //we want this to happen in the background await this.$.worker.enqueue({ job: "queueRunsPendingVersion", - payload: { backgroundWorkerId }, + payload: { backgroundWorkerId, attempt: opts?.attempt }, + availableAt: opts?.availableAt, + }); + } + + /** + * Schedule one more lookup attempt when the first found zero candidates, + * to cover ClickHouse replication lag against `task_runs_v2`. Bounded by + * `lagMaxRetries` so we never loop indefinitely. + */ + async #maybeScheduleLagRetry( + backgroundWorkerId: string, + attempt: number, + reason: "lookup_empty" + ): Promise { + const maxRetries = this.options.lagMaxRetries ?? DEFAULT_LAG_MAX_RETRIES; + + if (attempt >= maxRetries) { + return; + } + + const delayMs = this.options.lagRetryDelayMs ?? DEFAULT_LAG_RETRY_DELAY_MS; + + this.$.logger.debug("Scheduling pending-version lag retry", { + backgroundWorkerId, + attempt: attempt + 1, + maxRetries, + delayMs, + reason, + }); + + await this.scheduleResolvePendingVersionRuns(backgroundWorkerId, { + attempt: attempt + 1, + availableAt: new Date(Date.now() + delayMs), }); } } diff --git a/internal-packages/run-engine/src/engine/systems/systems.ts b/internal-packages/run-engine/src/engine/systems/systems.ts index 909df712105..e21f95958d1 100644 --- a/internal-packages/run-engine/src/engine/systems/systems.ts +++ b/internal-packages/run-engine/src/engine/systems/systems.ts @@ -4,6 +4,7 @@ import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database"; import { RunQueue } from "../../run-queue/index.js"; import { EventBus } from "../eventBus.js"; import { RunLocker } from "../locking.js"; +import { PendingVersionRunIdLookup } from "../services/pendingVersionLookup.js"; import { EngineWorker } from "../types.js"; import { RaceSimulationSystem } from "./raceSimulationSystem.js"; @@ -18,4 +19,5 @@ export type SystemResources = { runLock: RunLocker; runQueue: RunQueue; raceSimulationSystem: RaceSimulationSystem; + pendingVersionRunIdLookup: PendingVersionRunIdLookup; }; diff --git a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts index 38eaa00b213..cd56f10c84c 100644 --- a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts +++ b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts @@ -4,6 +4,7 @@ import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; import { DequeuedMessage } from "@trigger.dev/core/v3"; +import { PostgresPendingVersionRunIdLookup } from "./postgresPendingVersionLookup.js"; vi.setConfig({ testTimeout: 60_000 }); @@ -44,6 +45,7 @@ describe("RunEngine pending version", () => { }, //set this so we have to requeue the runs in two batches queueRunsWaitingForWorkerBatchSize: 1, + pendingVersionRunIdLookup: new PostgresPendingVersionRunIdLookup(prisma), tracer: trace.getTracer("test", "0.0.0"), }); @@ -191,6 +193,7 @@ describe("RunEngine pending version", () => { }, //set this so we have to requeue the runs in two batches queueRunsWaitingForWorkerBatchSize: 1, + pendingVersionRunIdLookup: new PostgresPendingVersionRunIdLookup(prisma), tracer: trace.getTracer("test", "0.0.0"), logLevel: "debug", }); @@ -355,6 +358,7 @@ describe("RunEngine pending version", () => { }, baseCostInCents: 0.0001, }, + pendingVersionRunIdLookup: new PostgresPendingVersionRunIdLookup(prisma), tracer: trace.getTracer("test", "0.0.0"), }); diff --git a/internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts b/internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts new file mode 100644 index 00000000000..33239ea8176 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts @@ -0,0 +1,43 @@ +import { PrismaClient } from "@trigger.dev/database"; +import type { + PendingVersionRunIdLookup, + PendingVersionRunIdLookupOptions, + PendingVersionRunIdLookupResult, +} from "../services/pendingVersionLookup.js"; + +/** + * Test-only Postgres-backed lookup. Performs the same query the system + * used to issue directly before the ClickHouse migration. Lets the + * existing pendingVersion tests keep exercising the end-to-end transition + * without spinning up a ClickHouse container. + * + * Not exported from the package — for in-package tests only. + */ +export class PostgresPendingVersionRunIdLookup implements PendingVersionRunIdLookup { + readonly name = "test-postgres"; + + constructor(private readonly prisma: PrismaClient) {} + + async lookupPendingVersionRunIds( + options: PendingVersionRunIdLookupOptions + ): Promise { + if (options.taskIdentifiers.length === 0 || options.queues.length === 0) { + return { runIds: [] }; + } + + const rows = await this.prisma.taskRun.findMany({ + where: { + runtimeEnvironmentId: options.environmentId, + projectId: options.projectId, + status: "PENDING_VERSION", + taskIdentifier: { in: options.taskIdentifiers }, + queue: { in: options.queues }, + }, + select: { id: true }, + orderBy: { createdAt: "asc" }, + take: options.limit, + }); + + return { runIds: rows.map((r) => r.id) }; + } +} diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 0b17262ba1c..e63b1c81f8b 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -19,6 +19,7 @@ import { LockRetryConfig } from "./locking.js"; import { workerCatalog } from "./workerCatalog.js"; import { type BillingPlan } from "./billingCache.js"; import type { DRRConfig } from "../batch-queue/types.js"; +import type { PendingVersionRunIdLookup } from "./services/pendingVersionLookup.js"; export type RunEngineOptions = { prisma: PrismaClient; @@ -178,6 +179,27 @@ export type RunEngineOptions = { factor?: number; }; queueRunsWaitingForWorkerBatchSize?: number; + /** + * Lookup used by {@link PendingVersionSystem} to discover candidate + * `PENDING_VERSION` run ids without scanning the Postgres status index. + * Defaults to a noop lookup that returns an empty result, which causes + * the pending-version resolver to short-circuit. Production callers + * should inject a ClickHouse-backed implementation. + */ + pendingVersionRunIdLookup?: PendingVersionRunIdLookup; + /** + * When the pending-version lookup returns zero candidates, schedule + * one more attempt after this delay to cover ClickHouse replication + * lag. Defaults to 5_000ms. + */ + pendingVersionLagRetryDelayMs?: number; + /** + * Maximum number of lag-aware retries. Each call beyond the first + * counts against this cap. Defaults to 1 — so a deploy will fire at + * most two queries spaced by `pendingVersionLagRetryDelayMs`. Set to 0 + * to disable lag-aware retries entirely. + */ + pendingVersionLagMaxRetries?: number; /** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL * will use this as their TTL, and runs with a TTL larger than this will be clamped. */ defaultMaxTtl?: string; diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index 2ed6f5076b2..de54c1ece00 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -41,6 +41,14 @@ export const workerCatalog = { queueRunsPendingVersion: { schema: z.object({ backgroundWorkerId: z.string(), + /** + * Bounded retry counter used by {@link PendingVersionSystem} to cover + * ClickHouse replication lag. The first scheduling has no attempt; + * if the lookup returns zero candidates, the system reschedules + * itself once with `attempt = 1`. Capped by + * `pendingVersionLagMaxRetries` on `RunEngineOptions`. + */ + attempt: z.number().int().nonnegative().optional(), }), visibilityTimeoutMs: 60_000, }, diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index 3f96045c13f..43ca7f177c6 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -6,6 +6,12 @@ export { } from "./engine/errors.js"; export type { EventBusEventArgs, EventBusEvents } from "./engine/eventBus.js"; export type { AuthenticatedEnvironment } from "./shared/index.js"; +export type { + PendingVersionRunIdLookup, + PendingVersionRunIdLookupOptions, + PendingVersionRunIdLookupResult, +} from "./engine/services/pendingVersionLookup.js"; +export { NoopPendingVersionRunIdLookup } from "./engine/services/pendingVersionLookup.js"; // Batch Queue exports export { BatchQueue, BatchCompletionTracker } from "./batch-queue/index.js"; From f53d2ca1d20940013fc12b5d386b1d207bc6761d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 22 May 2026 16:38:55 +0100 Subject: [PATCH 2/5] fix(run-engine): don't emit runStatusChanged when promotion was skipped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the idempotency guard fires (concurrent worker already promoted the run, updateMany returns count=0), the transaction returned early — but the eventBus.emit('runStatusChanged') call was outside the transaction and fired unconditionally. Have the transaction return a boolean and guard the emit on it. --- .../run-engine/src/engine/systems/pendingVersionSystem.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index c8cee819b07..96b77cb638a 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -125,7 +125,7 @@ export class PendingVersionSystem { }); for (const run of pendingRuns) { - await this.$.prisma.$transaction(async (tx) => { + const promoted = await this.$.prisma.$transaction(async (tx) => { // Idempotency guard: only flips PENDING_VERSION → PENDING. If another // worker already promoted this run between our findMany and the // update, count is 0 and we skip the enqueue. @@ -135,7 +135,7 @@ export class PendingVersionSystem { }); if (updateResult.count === 0) { - return; + return false; } const updatedRun = await tx.taskRun.findFirstOrThrow({ where: { id: run.id } }); @@ -150,8 +150,12 @@ export class PendingVersionSystem { // if it sits queued waiting on a concurrency slot. includeTtl: true, }); + + return true; }); + if (!promoted) continue; + this.$.eventBus.emit("runStatusChanged", { time: new Date(), run: { From 48c2bc437dec3384c03a0050e09f5f022b76bfa2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 22 May 2026 16:39:00 +0100 Subject: [PATCH 3/5] fix(run-engine): wire queueRunsWaitingForWorkerBatchSize to PendingVersionSystem The RunEngineOptions has carried queueRunsWaitingForWorkerBatchSize for a while but never threaded it through to PendingVersionSystem's queueRunsPendingVersionBatchSize, so the option silently no-op'd and the system always used the default of 200. --- internal-packages/run-engine/src/engine/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index ff75141aa0d..da42247111a 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -336,6 +336,7 @@ export class RunEngine { this.pendingVersionSystem = new PendingVersionSystem({ resources, enqueueSystem: this.enqueueSystem, + queueRunsPendingVersionBatchSize: options.queueRunsWaitingForWorkerBatchSize, lagRetryDelayMs: options.pendingVersionLagRetryDelayMs, lagMaxRetries: options.pendingVersionLagMaxRetries, }); From ff6db49d1ea7299bb43b0340e8a2c226ab87014a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 22 May 2026 16:53:20 +0100 Subject: [PATCH 4/5] fix(run-engine): reschedule batch drain on candidateIds count, not pendingRuns After the ClickHouse migration, pendingRuns.length is post-status-guard; runs that have already left PENDING_VERSION between the CH lookup and the Postgres refetch get filtered out. Using it as the more-work signal under-reports when more candidates exist on the worker and stops short. Switch to candidateIds.length, which is the raw lookup result. --- .../run-engine/src/engine/systems/pendingVersionSystem.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index 96b77cb638a..6d503012fbc 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -176,8 +176,12 @@ export class PendingVersionSystem { }); } - //enqueue more if needed - if (pendingRuns.length > maxCount) { + // Reschedule when the lookup returned a full-plus-one batch — that's + // the signal there are more candidates to drain. Use `candidateIds` + // (the raw lookup result) rather than `pendingRuns` (post-status-guard) + // because runs that already left PENDING_VERSION shouldn't suppress + // the next batch. + if (candidateIds.length > maxCount) { await this.scheduleResolvePendingVersionRuns(backgroundWorkerId); } } From 98e8272169de5706ba312059d2c670b4ce0913e0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 22 May 2026 16:53:29 +0100 Subject: [PATCH 5/5] fix(webapp): log run-engine ClickHouse factory failures at error level Factory resolution failures (registry misload, missing data store, ClientType mismatch) are configuration problems, not transient blips, and ops loses observability if they only surface as warnings. Query- level errors stay at warn since those are expected to be transient. --- .../v3/services/clickhousePendingVersionLookup.server.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts b/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts index 609557d1dde..2cdde550e9c 100644 --- a/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts +++ b/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts @@ -47,7 +47,12 @@ export class ClickhousePendingVersionLookup implements PendingVersionRunIdLookup "engine" ); } catch (error) { - this.opts.logger.warn("ClickhousePendingVersionLookup factory resolution failed", { + // Factory resolution failures usually mean a real configuration + // problem (registry misload, missing data store, ClientType mismatch). + // These are not transient — log at error so ops sees them in dashboards + // and incident hooks. Query-level errors below stay at warn because + // those are expected to be transient. + this.opts.logger.error("ClickhousePendingVersionLookup factory resolution failed", { error, organizationId: options.organizationId, });