Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/pending-version-clickhouse-lookup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

PendingVersionSystem now discovers PENDING_VERSION run ids via ClickHouse and re-validates each by primary key in Postgres, reducing read load on the TaskRun status index. Uses a dedicated `RUN_ENGINE_CLICKHOUSE_*` client so it doesn't contend with the main analytics pool.
15 changes: 15 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,21 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
EVENTS_CLICKHOUSE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
// ClickHouse client used by @internal/run-engine's PendingVersionSystem.
// Kept on its own URL + pool so this low-QPS path can't contend with
// the main analytics client (CLICKHOUSE_URL). Falls back to the main
// URL when unset so unconfigured environments still work.
RUN_ENGINE_CLICKHOUSE_URL: z
.string()
.optional()
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(5),
RUN_ENGINE_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),
Expand Down
49 changes: 48 additions & 1 deletion apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,36 @@ function initializeSessionsReplicationClickhouseClient(): ClickHouse {
});
}

/**
 * ClickHouse client for the run-engine PendingVersionSystem lookup, built
 * once per process from the `RUN_ENGINE_CLICKHOUSE_*` settings.
 */
const defaultRunEngineClickhouseClient = singleton(
  "runEngineClickhouseClient",
  initializeRunEngineClickhouseClient
);

/**
 * Builds the run-engine ClickHouse client.
 *
 * If `env.RUN_ENGINE_CLICKHOUSE_URL` is empty the shared default client is
 * returned as-is. Otherwise a dedicated client named "run-engine-clickhouse"
 * is created with its own keep-alive, compression, log-level and
 * connection-limit settings.
 */
function initializeRunEngineClickhouseClient(): ClickHouse {
  const configuredUrl = env.RUN_ENGINE_CLICKHOUSE_URL;

  if (!configuredUrl) {
    return defaultClickhouseClient;
  }

  // The `secure` query parameter is removed before the URL reaches the client.
  const parsedUrl = new URL(configuredUrl);
  parsedUrl.searchParams.delete("secure");

  const keepAlive = {
    enabled: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
    idleSocketTtl: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
  };
  const compression = {
    request: env.RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST === "1",
  };

  return new ClickHouse({
    url: parsedUrl.toString(),
    name: "run-engine-clickhouse",
    keepAlive,
    logLevel: env.RUN_ENGINE_CLICKHOUSE_LOG_LEVEL,
    compression,
    maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
  });
}

/** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */
const defaultEventsClickhouseClient = singleton(
"eventsClickhouseClient",
Expand Down Expand Up @@ -226,7 +256,8 @@ export type ClientType =
| "sessions_replication"
| "logs"
| "query"
| "admin";
| "admin"
| "engine";

function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse {
const parsed = new URL(url);
Expand Down Expand Up @@ -285,6 +316,20 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou
maxOpenConnections: env.CLICKHOUSE_MAX_OPEN_CONNECTIONS,
clickhouseSettings: getLogsListClickhouseSettings(),
});
case "engine":
return new ClickHouse({
url: parsed.toString(),
name,
keepAlive: {
enabled: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
idleSocketTtl: env.RUN_ENGINE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
},
logLevel: env.RUN_ENGINE_CLICKHOUSE_LOG_LEVEL,
compression: {
request: env.RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST === "1",
},
maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
case "standard":
case "query":
case "admin":
Expand Down Expand Up @@ -351,6 +396,8 @@ export class ClickhouseFactory {
return defaultQueryClickhouseClient;
case "admin":
return defaultAdminClickhouseClient;
case "engine":
return defaultRunEngineClickhouseClient;
}
}

Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { logger } from "~/services/logger.server";
import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server";
import { singleton } from "~/utils/singleton";
import { allMachines } from "./machinePresets.server";
import { runEnginePendingVersionLookup } from "./runEnginePendingVersionLookup.server";
import { meter, tracer } from "./tracer.server";

export const engine = singleton("RunEngine", createRunEngine);
Expand Down Expand Up @@ -131,6 +132,7 @@ function createRunEngine() {
factor: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR,
},
retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
pendingVersionRunIdLookup: runEnginePendingVersionLookup,
billing: {
getCurrentPlan: async (orgId: string) => {
const plan = await getCurrentPlan(orgId);
Expand Down
24 changes: 24 additions & 0 deletions apps/webapp/app/v3/runEnginePendingVersionLookup.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { type PendingVersionRunIdLookup } from "@internal/run-engine";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { ClickhousePendingVersionLookup } from "./services/clickhousePendingVersionLookup.server";

/**
 * Process-wide lookup handed to `@internal/run-engine`'s
 * `PendingVersionSystem` so it can find `PENDING_VERSION` TaskRun ids via
 * ClickHouse, removing the need for Postgres index #13
 * (`TaskRun_status_runtimeEnvironmentId_createdAt_id_idx`).
 *
 * The lookup receives the shared {@link clickhouseFactory} rather than a
 * fixed client, so the ClickHouse client is resolved on every call using the
 * `"engine"` client type (configured by `RUN_ENGINE_CLICKHOUSE_*` env vars)
 * and can be routed per-organization for customers with HIPAA / data
 * sovereignty data stores.
 */
export const runEnginePendingVersionLookup = singleton(
  "runEnginePendingVersionLookup",
  (): PendingVersionRunIdLookup =>
    new ClickhousePendingVersionLookup({ clickhouseFactory, logger })
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import {
type PendingVersionRunIdLookup,
type PendingVersionRunIdLookupOptions,
type PendingVersionRunIdLookupResult,
} from "@internal/run-engine";
import { Logger } from "@trigger.dev/core/logger";
import type { ClickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";

/** Collaborators required by {@link ClickhousePendingVersionLookup}. */
export type ClickhousePendingVersionLookupOptions = {
  /** Resolves the ClickHouse client to query for a given organization. */
  clickhouseFactory: ClickhouseFactory;
  /** Receives factory-resolution errors (`error`) and query failures (`warn`). */
  logger: Logger;
};

/**
 * ClickHouse-backed lookup for `PENDING_VERSION` TaskRun ids.
 *
 * Resolves the ClickHouse client per call via the
 * {@link ClickhouseFactory}, which honors per-organization data-store
 * routing (HIPAA / data-sovereignty customers get their own instance,
 * everyone else lands on the shared `engine` client configured by
 * `RUN_ENGINE_CLICKHOUSE_*` env vars).
 *
 * Best-effort by design: replication lag against `task_runs_v2` can
 * produce stale candidates. The run-engine consumer re-validates every
 * id against Postgres by primary key with a `status = 'PENDING_VERSION'`
 * guard before any mutation, so stale ids are dropped at the source of
 * truth. On ClickHouse error we log and return an empty result; the
 * pending-version re-enqueue tail loop retries on the next event.
 */
export class ClickhousePendingVersionLookup implements PendingVersionRunIdLookup {
  readonly name = "clickhouse";

  constructor(private readonly opts: ClickhousePendingVersionLookupOptions) {}

  /**
   * Returns candidate `PENDING_VERSION` run ids for the given organization,
   * project, environment, task identifiers and queues, oldest first.
   *
   * @param options Scope filters plus the row `limit` passed to the query.
   * @returns Distinct run ids in first-seen (`created_at ASC`) order. The list
   *   is empty when either filter list is empty, when the ClickHouse client
   *   cannot be resolved, or when the query fails. This method never rejects
   *   for those failures.
   */
  async lookupPendingVersionRunIds(
    options: PendingVersionRunIdLookupOptions
  ): Promise<PendingVersionRunIdLookupResult> {
    // Empty IN-lists would be a no-op; bail before issuing the query.
    if (options.taskIdentifiers.length === 0 || options.queues.length === 0) {
      return { runIds: [] };
    }

    let clickhouse;
    try {
      clickhouse = await this.opts.clickhouseFactory.getClickhouseForOrganization(
        options.organizationId,
        "engine"
      );
    } catch (error) {
      // Factory resolution failures usually mean a real configuration
      // problem (registry misload, missing data store, ClientType mismatch).
      // These are not transient — log at error so ops sees them in dashboards
      // and incident hooks. Query-level errors below stay at warn because
      // those are expected to be transient.
      this.opts.logger.error("ClickhousePendingVersionLookup factory resolution failed", {
        error,
        organizationId: options.organizationId,
      });
      return { runIds: [] };
    }

    const builder = clickhouse.taskRuns
      .pendingVersionIdsQueryBuilder()
      // `organization_id` MUST be the leading filter — it is the leading
      // sort-key column on `task_runs_v2` and the only thing that prunes
      // granules cheaply on a multi-tenant table.
      .where("organization_id = {organizationId: String}", {
        organizationId: options.organizationId,
      })
      .where("project_id = {projectId: String}", { projectId: options.projectId })
      .where("environment_id = {environmentId: String}", {
        environmentId: options.environmentId,
      })
      .where("status = 'PENDING_VERSION'")
      .where("task_identifier IN {taskIdentifiers: Array(String)}", {
        taskIdentifiers: options.taskIdentifiers,
      })
      .where("queue IN {queues: Array(String)}", { queues: options.queues })
      .where("_is_deleted = 0")
      .orderBy("created_at ASC")
      .limit(options.limit);

    const [queryError, rows] = await builder.execute();

    if (queryError) {
      this.opts.logger.warn("ClickhousePendingVersionLookup query failed", {
        error: queryError,
        organizationId: options.organizationId,
        projectId: options.projectId,
        environmentId: options.environmentId,
      });
      return { runIds: [] };
    }

    // The pending-version query reads `task_runs_v2` without `FINAL`, so the
    // same run can surface as more than one row version. Collapse repeats so
    // the caller re-validates each id only once; a Set keeps first-seen order,
    // which preserves the `created_at ASC` ordering.
    const distinctRunIds = [...new Set(rows.map((row) => row.run_id))];

    return { runIds: distinctRunIds };
  }
}
2 changes: 2 additions & 0 deletions internal-packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
getTaskUsageByOrganization,
getTaskRunsCountQueryBuilder,
getTaskRunTagsQueryBuilder,
getPendingVersionIdsQueryBuilder,
} from "./taskRuns.js";
import {
getSpanDetailsQueryBuilder,
Expand Down Expand Up @@ -224,6 +225,7 @@ export class ClickHouse {
queryBuilder: getTaskRunsQueryBuilder(this.reader),
countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader),
tagQueryBuilder: getTaskRunTagsQueryBuilder(this.reader),
pendingVersionIdsQueryBuilder: getPendingVersionIdsQueryBuilder(this.reader),
getTaskActivity: getTaskActivityQueryBuilder(this.reader),
getCurrentRunningStats: getCurrentRunningStats(this.reader),
getAverageDurations: getAverageDurations(this.reader),
Expand Down
19 changes: 19 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,25 @@ export function getTaskRunsQueryBuilder(ch: ClickhouseReader, settings?: ClickHo
});
}

/**
 * Query-builder factory used by the run-engine `PendingVersionSystem`
 * lookup. The base query selects only `run_id` from `task_runs_v2`; callers
 * add their own filters, ordering and limit on the returned builder.
 *
 * `FINAL` is intentionally omitted: the run-engine re-validates each
 * candidate against Postgres by primary key, so a stale `PENDING_VERSION`
 * row from a not-yet-merged part is harmless, and `FINAL` would be too
 * expensive for this hot path.
 *
 * @param ch Reader the builder is created from.
 * @param settings Optional ClickHouse settings forwarded to the builder.
 */
export function getPendingVersionIdsQueryBuilder(
  ch: ClickhouseReader,
  settings?: ClickHouseSettings
) {
  const baseQuery = "SELECT run_id FROM trigger_dev.task_runs_v2";

  return ch.queryBuilder({
    name: "getPendingVersionIds",
    baseQuery,
    schema: TaskRunV2QueryResult,
    settings,
  });
}

export function getTaskRunsCountQueryBuilder(ch: ClickhouseReader, settings?: ClickHouseSettings) {
return ch.queryBuilder({
name: "getTaskRunsCount",
Expand Down
9 changes: 8 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
import { PendingVersionSystem } from "./systems/pendingVersionSystem.js";
import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js";
import { RunAttemptSystem } from "./systems/runAttemptSystem.js";
import { NoopPendingVersionRunIdLookup } from "./services/pendingVersionLookup.js";
import { SystemResources } from "./systems/systems.js";
import { TtlSystem } from "./systems/ttlSystem.js";
import { WaitpointSystem } from "./systems/waitpointSystem.js";
Expand Down Expand Up @@ -244,7 +245,8 @@ export class RunEngine {
},
queueRunsPendingVersion: async ({ payload }) => {
await this.pendingVersionSystem.enqueueRunsForBackgroundWorker(
payload.backgroundWorkerId
payload.backgroundWorkerId,
payload.attempt
);
},
tryCompleteBatch: async ({ payload }) => {
Expand Down Expand Up @@ -295,6 +297,8 @@ export class RunEngine {
runLock: this.runLock,
runQueue: this.runQueue,
raceSimulationSystem: this.raceSimulationSystem,
pendingVersionRunIdLookup:
options.pendingVersionRunIdLookup ?? new NoopPendingVersionRunIdLookup(),
};

this.executionSnapshotSystem = new ExecutionSnapshotSystem({
Expand Down Expand Up @@ -332,6 +336,9 @@ export class RunEngine {
this.pendingVersionSystem = new PendingVersionSystem({
resources,
enqueueSystem: this.enqueueSystem,
queueRunsPendingVersionBatchSize: options.queueRunsWaitingForWorkerBatchSize,
lagRetryDelayMs: options.pendingVersionLagRetryDelayMs,
lagMaxRetries: options.pendingVersionLagMaxRetries,
});
Comment thread
ericallam marked this conversation as resolved.

this.waitpointSystem = new WaitpointSystem({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
 * Lookup contract for discovering TaskRun ids that are currently in the
 * `PENDING_VERSION` status for a given background-worker filter.
 *
 * The run-engine package only owns the contract; concrete implementations
 * are provided by the consumer and injected via {@link SystemResources}.
 * NOTE(review): an earlier revision of this comment described the webapp
 * implementation as Postgres-backed, while the `name` example below mentions
 * "clickhouse" — confirm which backing store the webapp actually injects.
 *
 * Best-effort by design: implementations may return stale ids (rows that
 * have since transitioned) or omit recently-inserted rows (replication
 * lag against an analytical store). The caller MUST re-validate every
 * returned id against the source-of-truth database before mutating it.
 */
export type PendingVersionRunIdLookupOptions = {
// Scope of the search: organization, project and environment.
organizationId: string;
projectId: string;
environmentId: string;
// Candidate task identifiers and queue names to search for.
// NOTE(review): presumably matched as "any of" per list — confirm against implementations.
taskIdentifiers: string[];
queues: string[];
/** Maximum number of ids to return. Implementations must respect this cap. */
limit: number;
};

/** Result of a lookup: the candidate run ids, still subject to re-validation by the caller. */
export type PendingVersionRunIdLookupResult = {
runIds: string[];
};

/** Contract implemented by every pending-version run-id lookup. */
export interface PendingVersionRunIdLookup {
/** Stable identifier for logs and metrics, e.g. "clickhouse", "test-noop". */
readonly name: string;

/**
 * Finds candidate `PENDING_VERSION` run ids matching `options`.
 * Results are best-effort and may be stale or incomplete (see above).
 */
lookupPendingVersionRunIds(
options: PendingVersionRunIdLookupOptions
): Promise<PendingVersionRunIdLookupResult>;
}

/**
 * Fallback lookup for when no real implementation is wired up (tests that
 * don't exercise the lookup, or engine instances that don't run the
 * pending-version resolver). It ignores its input and always resolves to an
 * empty id list, so the system no-ops cleanly.
 */
export class NoopPendingVersionRunIdLookup implements PendingVersionRunIdLookup {
  readonly name = "noop";

  /** Resolves to a fresh, empty result on every call. */
  async lookupPendingVersionRunIds(): Promise<PendingVersionRunIdLookupResult> {
    const runIds: string[] = [];
    return { runIds };
  }
}
Loading