diff --git a/.server-changes/pending-version-clickhouse-lookup.md b/.server-changes/pending-version-clickhouse-lookup.md new file mode 100644 index 00000000000..af46540528f --- /dev/null +++ b/.server-changes/pending-version-clickhouse-lookup.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +PendingVersionSystem now discovers PENDING_VERSION run ids via ClickHouse and re-validates each by primary key in Postgres, reducing read load on the TaskRun status index. Uses a dedicated `RUN_ENGINE_CLICKHOUSE_*` client so it doesn't contend with the main analytics pool. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d2162de025c..a0b676d7602 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1471,6 +1471,21 @@ const EnvironmentSchema = z EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10), EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"), + // ClickHouse client used by @internal/run-engine's PendingVersionSystem. + // Kept on its own URL + pool so this low-QPS path can't contend with + // the main analytics client (CLICKHOUSE_URL). Falls back to the main + // URL when unset so unconfigured environments still work. + RUN_ENGINE_CLICKHOUSE_URL: z + .string() + .optional() + .transform((v) => v ?? 
process.env.CLICKHOUSE_URL), + RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"), + RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(), + RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(5), + RUN_ENGINE_CLICKHOUSE_LOG_LEVEL: z + .enum(["log", "error", "warn", "info", "debug"]) + .default("info"), + RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"), EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000), EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000), diff --git a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts index 652fc26ae35..fb7f384fd27 100644 --- a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts +++ b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts @@ -181,6 +181,36 @@ function initializeSessionsReplicationClickhouseClient(): ClickHouse { }); } +/** Run-engine PendingVersionSystem lookup (`RUN_ENGINE_CLICKHOUSE_URL`); + * falls back to the default client if unset. 
*/ +const defaultRunEngineClickhouseClient = singleton( + "runEngineClickhouseClient", + initializeRunEngineClickhouseClient +); + +function initializeRunEngineClickhouseClient(): ClickHouse { + if (!env.RUN_ENGINE_CLICKHOUSE_URL) { + return defaultClickhouseClient; + } + + const url = new URL(env.RUN_ENGINE_CLICKHOUSE_URL); + url.searchParams.delete("secure"); + + return new ClickHouse({ + url: url.toString(), + name: "run-engine-clickhouse", + keepAlive: { + enabled: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.RUN_ENGINE_CLICKHOUSE_LOG_LEVEL, + compression: { + request: env.RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST === "1", + }, + maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); +} + /** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */ const defaultEventsClickhouseClient = singleton( "eventsClickhouseClient", @@ -226,7 +256,8 @@ export type ClientType = | "sessions_replication" | "logs" | "query" - | "admin"; + | "admin" + | "engine"; function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse { const parsed = new URL(url); @@ -285,6 +316,20 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS, clickhouseSettings: getLogsListClickhouseSettings(), }); + case "engine": + return new ClickHouse({ + url: parsed.toString(), + name, + keepAlive: { + enabled: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.RUN_ENGINE_CLICKHOUSE_LOG_LEVEL, + compression: { + request: env.RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST === "1", + }, + maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS, + }); case "standard": case "query": case "admin": @@ -351,6 +396,8 @@ export class ClickhouseFactory { return 
defaultQueryClickhouseClient; case "admin": return defaultAdminClickhouseClient; + case "engine": + return defaultRunEngineClickhouseClient; } } diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index e97a1dc8ae7..7e96c5184b2 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -6,6 +6,7 @@ import { logger } from "~/services/logger.server"; import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server"; import { singleton } from "~/utils/singleton"; import { allMachines } from "./machinePresets.server"; +import { runEnginePendingVersionLookup } from "./runEnginePendingVersionLookup.server"; import { meter, tracer } from "./tracer.server"; export const engine = singleton("RunEngine", createRunEngine); @@ -131,6 +132,7 @@ function createRunEngine() { factor: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR, }, retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS, + pendingVersionRunIdLookup: runEnginePendingVersionLookup, billing: { getCurrentPlan: async (orgId: string) => { const plan = await getCurrentPlan(orgId); diff --git a/apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts b/apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts new file mode 100644 index 00000000000..407504fab72 --- /dev/null +++ b/apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts @@ -0,0 +1,24 @@ +import { type PendingVersionRunIdLookup } from "@internal/run-engine"; +import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { ClickhousePendingVersionLookup } from "./services/clickhousePendingVersionLookup.server"; + +/** + * Lookup used by `@internal/run-engine`'s `PendingVersionSystem` to find + * `PENDING_VERSION` TaskRun ids via ClickHouse, removing the need for + * Postgres index #13 
(`TaskRun_status_runtimeEnvironmentId_createdAt_id_idx`). + * + * Resolves the ClickHouse client per call via {@link clickhouseFactory} + * using the `"engine"` client type, configured by `RUN_ENGINE_CLICKHOUSE_*` + * env vars and routed per-organization for customers with HIPAA / data + * sovereignty data stores. + */ +export const runEnginePendingVersionLookup = singleton( + "runEnginePendingVersionLookup", + initializeRunEnginePendingVersionLookup +); + +function initializeRunEnginePendingVersionLookup(): PendingVersionRunIdLookup { + return new ClickhousePendingVersionLookup({ clickhouseFactory, logger }); +} diff --git a/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts b/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts new file mode 100644 index 00000000000..2cdde550e9c --- /dev/null +++ b/apps/webapp/app/v3/services/clickhousePendingVersionLookup.server.ts @@ -0,0 +1,97 @@ +import { + type PendingVersionRunIdLookup, + type PendingVersionRunIdLookupOptions, + type PendingVersionRunIdLookupResult, +} from "@internal/run-engine"; +import { Logger } from "@trigger.dev/core/logger"; +import type { ClickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server"; + +export type ClickhousePendingVersionLookupOptions = { + clickhouseFactory: ClickhouseFactory; + logger: Logger; +}; + +/** + * ClickHouse-backed lookup for `PENDING_VERSION` TaskRun ids. + * + * Resolves the ClickHouse client per call via the + * {@link ClickhouseFactory}, which honors per-organization data-store + * routing (HIPAA / data-sovereignty customers get their own instance, + * everyone else lands on the shared `engine` client configured by + * `RUN_ENGINE_CLICKHOUSE_*` env vars). + * + * Best-effort by design: replication lag against `task_runs_v2` can + * produce stale candidates. 
The run-engine consumer re-validates every + * id against Postgres by primary key with a `status = 'PENDING_VERSION'` + * guard before any mutation, so stale ids are dropped at the source of + * truth. On ClickHouse error we log and return an empty result; the + * pending-version re-enqueue tail loop retries on the next event. + */ +export class ClickhousePendingVersionLookup implements PendingVersionRunIdLookup { + readonly name = "clickhouse"; + + constructor(private readonly opts: ClickhousePendingVersionLookupOptions) {} + + async lookupPendingVersionRunIds( + options: PendingVersionRunIdLookupOptions + ): Promise<PendingVersionRunIdLookupResult> { + // Empty IN-lists would be a no-op; bail before issuing the query. + if (options.taskIdentifiers.length === 0 || options.queues.length === 0) { + return { runIds: [] }; + } + + let clickhouse; + try { + clickhouse = await this.opts.clickhouseFactory.getClickhouseForOrganization( + options.organizationId, + "engine" + ); + } catch (error) { + // Factory resolution failures usually mean a real configuration + // problem (registry misload, missing data store, ClientType mismatch). + // These are not transient — log at error so ops sees them in dashboards + // and incident hooks. Query-level errors below stay at warn because + // those are expected to be transient. + this.opts.logger.error("ClickhousePendingVersionLookup factory resolution failed", { + error, + organizationId: options.organizationId, + }); + return { runIds: [] }; + } + + const builder = clickhouse.taskRuns + .pendingVersionIdsQueryBuilder() + // `organization_id` MUST be the leading filter — it is the leading + // sort-key column on `task_runs_v2` and the only thing that prunes + // granules cheaply on a multi-tenant table. 
+ .where("organization_id = {organizationId: String}", { + organizationId: options.organizationId, + }) + .where("project_id = {projectId: String}", { projectId: options.projectId }) + .where("environment_id = {environmentId: String}", { + environmentId: options.environmentId, + }) + .where("status = 'PENDING_VERSION'") + .where("task_identifier IN {taskIdentifiers: Array(String)}", { + taskIdentifiers: options.taskIdentifiers, + }) + .where("queue IN {queues: Array(String)}", { queues: options.queues }) + .where("_is_deleted = 0") + .orderBy("created_at ASC") + .limit(options.limit); + + const [queryError, rows] = await builder.execute(); + + if (queryError) { + this.opts.logger.warn("ClickhousePendingVersionLookup query failed", { + error: queryError, + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }); + return { runIds: [] }; + } + + return { runIds: rows.map((row) => row.run_id) }; + } +} diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 45f0fa485a7..17e48623f9f 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -13,6 +13,7 @@ import { getTaskUsageByOrganization, getTaskRunsCountQueryBuilder, getTaskRunTagsQueryBuilder, + getPendingVersionIdsQueryBuilder, } from "./taskRuns.js"; import { getSpanDetailsQueryBuilder, @@ -224,6 +225,7 @@ export class ClickHouse { queryBuilder: getTaskRunsQueryBuilder(this.reader), countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader), tagQueryBuilder: getTaskRunTagsQueryBuilder(this.reader), + pendingVersionIdsQueryBuilder: getPendingVersionIdsQueryBuilder(this.reader), getTaskActivity: getTaskActivityQueryBuilder(this.reader), getCurrentRunningStats: getCurrentRunningStats(this.reader), getAverageDurations: getAverageDurations(this.reader), diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts 
index f6427359772..40b2daac206 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -379,6 +379,25 @@ export function getTaskRunsQueryBuilder(ch: ClickhouseReader, settings?: ClickHo }); } +/** + * Lookup builder for the run-engine `PendingVersionSystem`. Returns just + * `run_id` from `task_runs_v2`. No `FINAL` — the run-engine re-validates + * each candidate against Postgres by primary key, so a stale + * `PENDING_VERSION` row from a not-yet-merged part is harmless and + * `FINAL` would be too expensive for this hot path. + */ +export function getPendingVersionIdsQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getPendingVersionIds", + baseQuery: "SELECT run_id FROM trigger_dev.task_runs_v2", + schema: TaskRunV2QueryResult, + settings, + }); +} + export function getTaskRunsCountQueryBuilder(ch: ClickhouseReader, settings?: ClickHouseSettings) { return ch.queryBuilder({ name: "getTaskRunsCount", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e0af0f2c4ff..da42247111a 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -65,6 +65,7 @@ import { import { PendingVersionSystem } from "./systems/pendingVersionSystem.js"; import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js"; import { RunAttemptSystem } from "./systems/runAttemptSystem.js"; +import { NoopPendingVersionRunIdLookup } from "./services/pendingVersionLookup.js"; import { SystemResources } from "./systems/systems.js"; import { TtlSystem } from "./systems/ttlSystem.js"; import { WaitpointSystem } from "./systems/waitpointSystem.js"; @@ -244,7 +245,8 @@ export class RunEngine { }, queueRunsPendingVersion: async ({ payload }) => { await this.pendingVersionSystem.enqueueRunsForBackgroundWorker( - payload.backgroundWorkerId + 
payload.backgroundWorkerId, + payload.attempt ); }, tryCompleteBatch: async ({ payload }) => { @@ -295,6 +297,8 @@ export class RunEngine { runLock: this.runLock, runQueue: this.runQueue, raceSimulationSystem: this.raceSimulationSystem, + pendingVersionRunIdLookup: + options.pendingVersionRunIdLookup ?? new NoopPendingVersionRunIdLookup(), }; this.executionSnapshotSystem = new ExecutionSnapshotSystem({ @@ -332,6 +336,9 @@ export class RunEngine { this.pendingVersionSystem = new PendingVersionSystem({ resources, enqueueSystem: this.enqueueSystem, + queueRunsPendingVersionBatchSize: options.queueRunsWaitingForWorkerBatchSize, + lagRetryDelayMs: options.pendingVersionLagRetryDelayMs, + lagMaxRetries: options.pendingVersionLagMaxRetries, }); this.waitpointSystem = new WaitpointSystem({ diff --git a/internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts b/internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts new file mode 100644 index 00000000000..62ce67f4d9b --- /dev/null +++ b/internal-packages/run-engine/src/engine/services/pendingVersionLookup.ts @@ -0,0 +1,48 @@ +/** + * Lookup interface for discovering TaskRun ids that are currently in the + * `PENDING_VERSION` status for a given background-worker filter. + * + * The production ClickHouse-backed implementation lives in the webapp and + * is injected via {@link SystemResources}. The run-engine package only owns + * the contract; concrete implementations are provided by the consumer. + * + * Best-effort by design: implementations may return stale ids (rows that + * have since transitioned) or omit recently-inserted rows (replication + * lag against an analytical store). The caller MUST re-validate every + * returned id against the source-of-truth database before mutating it. 
+ */ +export type PendingVersionRunIdLookupOptions = { + organizationId: string; + projectId: string; + environmentId: string; + taskIdentifiers: string[]; + queues: string[]; + /** Maximum number of ids to return. Implementations must respect this cap. */ + limit: number; +}; + +export type PendingVersionRunIdLookupResult = { + runIds: string[]; +}; + +export interface PendingVersionRunIdLookup { + /** Stable identifier for logs and metrics, e.g. "clickhouse", "noop". */ + readonly name: string; + + lookupPendingVersionRunIds( + options: PendingVersionRunIdLookupOptions + ): Promise<PendingVersionRunIdLookupResult>; +} + +/** + * Default lookup used when nothing is wired up (tests that don't exercise + * the lookup, or engine instances that don't run the pending-version + * resolver). Returns an empty result so the system no-ops cleanly. + */ +export class NoopPendingVersionRunIdLookup implements PendingVersionRunIdLookup { + readonly name = "noop"; + + async lookupPendingVersionRunIds(): Promise<PendingVersionRunIdLookupResult> { + return { runIds: [] }; + } +} diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index 9007cf86b2d..6d503012fbc 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -5,8 +5,26 @@ export type PendingVersionSystemOptions = { resources: SystemResources; enqueueSystem: EnqueueSystem; queueRunsPendingVersionBatchSize?: number; + /** + * How long to wait before retrying when the lookup returned zero + * candidates. Bounded by {@link lagMaxRetries}. Defaults to 5s. + * + * The ClickHouse-backed lookup can miss runs that were just inserted + * to Postgres due to replication lag. One bounded retry gives the + * pipeline time to catch up. + */ + lagRetryDelayMs?: number; + /** + * Maximum number of times to reschedule when the lookup returned zero + * candidates. 
Defaults to 1 — first attempt + one retry. Set to 0 to + * disable lag-aware retries entirely. + */ + lagMaxRetries?: number; }; +const DEFAULT_LAG_RETRY_DELAY_MS = 5_000; +const DEFAULT_LAG_MAX_RETRIES = 1; + export class PendingVersionSystem { private readonly $: SystemResources; private readonly enqueueSystem: EnqueueSystem; @@ -16,7 +34,7 @@ export class PendingVersionSystem { this.enqueueSystem = options.enqueueSystem; } - async enqueueRunsForBackgroundWorker(backgroundWorkerId: string) { + async enqueueRunsForBackgroundWorker(backgroundWorkerId: string, attempt: number = 0) { //It could be a lot of runs, so we will process them in a batch //if there are still more to process we will enqueue this function again const maxCount = this.options.queueRunsPendingVersionBatchSize ?? 200; @@ -44,37 +62,59 @@ export class PendingVersionSystem { return; } + const taskIdentifiers = backgroundWorker.tasks.map((task) => task.slug); + const queues = backgroundWorker.queues.map((queue) => queue.name); + this.$.logger.debug("Finding PENDING_VERSION runs for background worker", { workerId: backgroundWorker.id, - taskIdentifiers: backgroundWorker.tasks.map((task) => task.slug), - queues: backgroundWorker.queues.map((queue) => queue.name), + taskIdentifiers, + queues, }); - const pendingRuns = await this.$.readOnlyPrisma.taskRun.findMany({ - where: { - runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId, + // Step 1: ask the injected lookup (typically ClickHouse-backed) for + // candidate run ids. Best-effort — results may be stale or incomplete. 
+ const { runIds: candidateIds } = await this.$.pendingVersionRunIdLookup + .lookupPendingVersionRunIds({ + organizationId: backgroundWorker.runtimeEnvironment.organizationId, projectId: backgroundWorker.projectId, + environmentId: backgroundWorker.runtimeEnvironmentId, + taskIdentifiers, + queues, + limit: maxCount + 1, + }); + + if (!candidateIds.length) { + await this.#maybeScheduleLagRetry(backgroundWorkerId, attempt, "lookup_empty"); + return; + } + + // Step 2: fetch the actual rows from the primary by id, filtered by + // `status: "PENDING_VERSION"` so any candidate whose status has moved + // is dropped. The planner uses the PK for `id IN (…)`; the status + // predicate is a residual filter and does NOT require the status + // index. + const pendingRuns = await this.$.prisma.taskRun.findMany({ + where: { + id: { in: candidateIds }, status: "PENDING_VERSION", - taskIdentifier: { - in: backgroundWorker.tasks.map((task) => task.slug), - }, - queue: { - in: backgroundWorker.queues.map((queue) => queue.name), - }, }, orderBy: { createdAt: "asc", }, - take: maxCount + 1, }); - //none to process - if (!pendingRuns.length) return; + if (!pendingRuns.length) { + // CH returned candidates but all of them have already moved past + // PENDING_VERSION (typically because a concurrent deploy or retry + // beat us to them). Don't reschedule — there's no work to wait for. 
+ return; + } this.$.logger.debug("Enqueueing PENDING_VERSION runs for background worker", { workerId: backgroundWorker.id, - taskIdentifiers: pendingRuns.map((run) => run.taskIdentifier), - queues: pendingRuns.map((run) => run.queue), + lookupName: this.$.pendingVersionRunIdLookup.name, + candidateCount: candidateIds.length, + pendingRunCount: pendingRuns.length, runs: pendingRuns.map((run) => ({ id: run.id, taskIdentifier: run.taskIdentifier, @@ -85,15 +125,21 @@ export class PendingVersionSystem { }); for (const run of pendingRuns) { - await this.$.prisma.$transaction(async (tx) => { - const updatedRun = await tx.taskRun.update({ - where: { - id: run.id, - }, - data: { - status: "PENDING", - }, + const promoted = await this.$.prisma.$transaction(async (tx) => { + // Idempotency guard: only flips PENDING_VERSION → PENDING. If another + // worker already promoted this run between our findMany and the + // update, count is 0 and we skip the enqueue. + const updateResult = await tx.taskRun.updateMany({ + where: { id: run.id, status: "PENDING_VERSION" }, + data: { status: "PENDING" }, }); + + if (updateResult.count === 0) { + return false; + } + + const updatedRun = await tx.taskRun.findFirstOrThrow({ where: { id: run.id } }); + await this.enqueueSystem.enqueueRun({ run: updatedRun, env: backgroundWorker.runtimeEnvironment, @@ -104,8 +150,12 @@ export class PendingVersionSystem { // if it sits queued waiting on a concurrency slot. includeTtl: true, }); + + return true; }); + if (!promoted) continue; + this.$.eventBus.emit("runStatusChanged", { time: new Date(), run: { @@ -126,17 +176,57 @@ export class PendingVersionSystem { }); } - //enqueue more if needed - if (pendingRuns.length > maxCount) { + // Reschedule when the lookup returned a full-plus-one batch — that's + // the signal there are more candidates to drain. 
Use `candidateIds` + // (the raw lookup result) rather than `pendingRuns` (post-status-guard) + // because runs that already left PENDING_VERSION shouldn't suppress + // the next batch. + if (candidateIds.length > maxCount) { await this.scheduleResolvePendingVersionRuns(backgroundWorkerId); } } - async scheduleResolvePendingVersionRuns(backgroundWorkerId: string): Promise<void> { + async scheduleResolvePendingVersionRuns( + backgroundWorkerId: string, + opts?: { attempt?: number; availableAt?: Date } + ): Promise<void> { //we want this to happen in the background await this.$.worker.enqueue({ job: "queueRunsPendingVersion", - payload: { backgroundWorkerId }, + payload: { backgroundWorkerId, attempt: opts?.attempt }, + availableAt: opts?.availableAt, + }); + } + + /** + * Schedule one more lookup attempt when the first found zero candidates, + * to cover ClickHouse replication lag against `task_runs_v2`. Bounded by + * `lagMaxRetries` so we never loop indefinitely. + */ + async #maybeScheduleLagRetry( + backgroundWorkerId: string, + attempt: number, + reason: "lookup_empty" + ): Promise<void> { + const maxRetries = this.options.lagMaxRetries ?? DEFAULT_LAG_MAX_RETRIES; + + if (attempt >= maxRetries) { + return; + } + + const delayMs = this.options.lagRetryDelayMs ?? 
DEFAULT_LAG_RETRY_DELAY_MS; + + this.$.logger.debug("Scheduling pending-version lag retry", { + backgroundWorkerId, + attempt: attempt + 1, + maxRetries, + delayMs, + reason, + }); + + await this.scheduleResolvePendingVersionRuns(backgroundWorkerId, { + attempt: attempt + 1, + availableAt: new Date(Date.now() + delayMs), }); } } diff --git a/internal-packages/run-engine/src/engine/systems/systems.ts b/internal-packages/run-engine/src/engine/systems/systems.ts index 909df712105..e21f95958d1 100644 --- a/internal-packages/run-engine/src/engine/systems/systems.ts +++ b/internal-packages/run-engine/src/engine/systems/systems.ts @@ -4,6 +4,7 @@ import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database"; import { RunQueue } from "../../run-queue/index.js"; import { EventBus } from "../eventBus.js"; import { RunLocker } from "../locking.js"; +import { PendingVersionRunIdLookup } from "../services/pendingVersionLookup.js"; import { EngineWorker } from "../types.js"; import { RaceSimulationSystem } from "./raceSimulationSystem.js"; @@ -18,4 +19,5 @@ export type SystemResources = { runLock: RunLocker; runQueue: RunQueue; raceSimulationSystem: RaceSimulationSystem; + pendingVersionRunIdLookup: PendingVersionRunIdLookup; }; diff --git a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts index 38eaa00b213..cd56f10c84c 100644 --- a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts +++ b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts @@ -4,6 +4,7 @@ import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js"; import { DequeuedMessage } from "@trigger.dev/core/v3"; +import { PostgresPendingVersionRunIdLookup } from "./postgresPendingVersionLookup.js"; vi.setConfig({ testTimeout: 60_000 }); @@ -44,6 +45,7 @@ describe("RunEngine 
pending version", () => { }, //set this so we have to requeue the runs in two batches queueRunsWaitingForWorkerBatchSize: 1, + pendingVersionRunIdLookup: new PostgresPendingVersionRunIdLookup(prisma), tracer: trace.getTracer("test", "0.0.0"), }); @@ -191,6 +193,7 @@ describe("RunEngine pending version", () => { }, //set this so we have to requeue the runs in two batches queueRunsWaitingForWorkerBatchSize: 1, + pendingVersionRunIdLookup: new PostgresPendingVersionRunIdLookup(prisma), tracer: trace.getTracer("test", "0.0.0"), logLevel: "debug", }); @@ -355,6 +358,7 @@ describe("RunEngine pending version", () => { }, baseCostInCents: 0.0001, }, + pendingVersionRunIdLookup: new PostgresPendingVersionRunIdLookup(prisma), tracer: trace.getTracer("test", "0.0.0"), }); diff --git a/internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts b/internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts new file mode 100644 index 00000000000..33239ea8176 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/postgresPendingVersionLookup.ts @@ -0,0 +1,43 @@ +import { PrismaClient } from "@trigger.dev/database"; +import type { + PendingVersionRunIdLookup, + PendingVersionRunIdLookupOptions, + PendingVersionRunIdLookupResult, +} from "../services/pendingVersionLookup.js"; + +/** + * Test-only Postgres-backed lookup. Performs the same query the system + * used to issue directly before the ClickHouse migration. Lets the + * existing pendingVersion tests keep exercising the end-to-end transition + * without spinning up a ClickHouse container. + * + * Not exported from the package — for in-package tests only. 
+ */ +export class PostgresPendingVersionRunIdLookup implements PendingVersionRunIdLookup { + readonly name = "test-postgres"; + + constructor(private readonly prisma: PrismaClient) {} + + async lookupPendingVersionRunIds( + options: PendingVersionRunIdLookupOptions + ): Promise<PendingVersionRunIdLookupResult> { + if (options.taskIdentifiers.length === 0 || options.queues.length === 0) { + return { runIds: [] }; + } + + const rows = await this.prisma.taskRun.findMany({ + where: { + runtimeEnvironmentId: options.environmentId, + projectId: options.projectId, + status: "PENDING_VERSION", + taskIdentifier: { in: options.taskIdentifiers }, + queue: { in: options.queues }, + }, + select: { id: true }, + orderBy: { createdAt: "asc" }, + take: options.limit, + }); + + return { runIds: rows.map((r) => r.id) }; + } +} diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 0b17262ba1c..e63b1c81f8b 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -19,6 +19,7 @@ import { LockRetryConfig } from "./locking.js"; import { workerCatalog } from "./workerCatalog.js"; import { type BillingPlan } from "./billingCache.js"; import type { DRRConfig } from "../batch-queue/types.js"; +import type { PendingVersionRunIdLookup } from "./services/pendingVersionLookup.js"; export type RunEngineOptions = { prisma: PrismaClient; @@ -178,6 +179,27 @@ export type RunEngineOptions = { factor?: number; }; queueRunsWaitingForWorkerBatchSize?: number; + /** + * Lookup used by {@link PendingVersionSystem} to discover candidate + * `PENDING_VERSION` run ids without scanning the Postgres status index. + * Defaults to a noop lookup that returns an empty result, which causes + * the pending-version resolver to short-circuit. Production callers + * should inject a ClickHouse-backed implementation. 
+ */ + pendingVersionRunIdLookup?: PendingVersionRunIdLookup; + /** + * When the pending-version lookup returns zero candidates, schedule + * one more attempt after this delay to cover ClickHouse replication + * lag. Defaults to 5_000ms. + */ + pendingVersionLagRetryDelayMs?: number; + /** + * Maximum number of lag-aware retries. Each call beyond the first + * counts against this cap. Defaults to 1 — so a deploy will fire at + * most two queries spaced by `pendingVersionLagRetryDelayMs`. Set to 0 + * to disable lag-aware retries entirely. + */ + pendingVersionLagMaxRetries?: number; /** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL * will use this as their TTL, and runs with a TTL larger than this will be clamped. */ defaultMaxTtl?: string; diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index 2ed6f5076b2..de54c1ece00 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -41,6 +41,14 @@ export const workerCatalog = { queueRunsPendingVersion: { schema: z.object({ backgroundWorkerId: z.string(), + /** + * Bounded retry counter used by {@link PendingVersionSystem} to cover + * ClickHouse replication lag. The first scheduling has no attempt; + * if the lookup returns zero candidates, the system reschedules + * itself once with `attempt = 1`. Capped by + * `pendingVersionLagMaxRetries` on `RunEngineOptions`. 
+ */ + attempt: z.number().int().nonnegative().optional(), }), visibilityTimeoutMs: 60_000, }, diff --git a/internal-packages/run-engine/src/index.ts b/internal-packages/run-engine/src/index.ts index 3f96045c13f..43ca7f177c6 100644 --- a/internal-packages/run-engine/src/index.ts +++ b/internal-packages/run-engine/src/index.ts @@ -6,6 +6,12 @@ export { } from "./engine/errors.js"; export type { EventBusEventArgs, EventBusEvents } from "./engine/eventBus.js"; export type { AuthenticatedEnvironment } from "./shared/index.js"; +export type { + PendingVersionRunIdLookup, + PendingVersionRunIdLookupOptions, + PendingVersionRunIdLookupResult, +} from "./engine/services/pendingVersionLookup.js"; +export { NoopPendingVersionRunIdLookup } from "./engine/services/pendingVersionLookup.js"; // Batch Queue exports export { BatchQueue, BatchCompletionTracker } from "./batch-queue/index.js";