From c0cdefd583e71dd7903497e3c5c3d8bd8d8b7072 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 25 Sep 2025 19:26:23 +0100 Subject: [PATCH 1/2] feat(server): add two admin endpoints for queue and environment concurrency debugging and repairing feat(run-engine): ability to repair runs in QUEUED, SUSPENDED, and FINISHED execution status --- ...nts.$environmentId.engine.repair-queues.ts | 105 +++++ ...vironments.$environmentId.engine.report.ts | 95 ++++ internal-packages/run-engine/package.json | 3 +- .../run-engine/src/engine/index.ts | 424 +++++++++++++++++- .../run-engine/src/engine/types.ts | 9 + .../run-engine/src/engine/workerCatalog.ts | 8 + .../run-engine/src/run-queue/index.ts | 128 ++++++ pnpm-lock.yaml | 3 + 8 files changed, 770 insertions(+), 5 deletions(-) create mode 100644 apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts create mode 100644 apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts diff --git a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts new file mode 100644 index 0000000000..30d60197f9 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts @@ -0,0 +1,105 @@ +import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; +import pMap from "p-map"; +import { z } from "zod"; +import { $replica, prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { determineEngineVersion } from "~/v3/engineVersion.server"; +import { engine } from "~/v3/runEngine.server"; + +const ParamsSchema = z.object({ + environmentId: z.string(), +}); + +const BodySchema = z.object({ + dryRun: z.boolean().default(true), + queues: z.array(z.string()).default([]), +}); + +export async function action({ request, params }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + const parsedParams = ParamsSchema.parse(params); + + const environment = await prisma.runtimeEnvironment.findFirst({ + where: { + id: parsedParams.environmentId, + }, + include: { + organization: true, + project: true, + orgMember: true, + }, + }); + + if (!environment) { + return json({ error: "Environment not found" }, { status: 404 }); + } + + const engineVersion = await determineEngineVersion({ environment }); + + if (engineVersion === "V1") { + return json({ error: "Engine version is V1" }, { status: 400 }); + } + + const body = await request.json(); + const parsedBody = BodySchema.parse(body); + + const queues = await $replica.taskQueue.findMany({ + where: { + runtimeEnvironmentId: environment.id, + version: "V2", + name: parsedBody.queues.length > 0 ? { in: parsedBody.queues } : undefined, + }, + select: { + friendlyId: true, + name: true, + concurrencyLimit: true, + type: true, + paused: true, + }, + orderBy: { + orderableName: "asc", + }, + }); + + const repairEnvironmentResults = await engine.repairEnvironment(environment, parsedBody.dryRun); + + const repairResults = await pMap( + queues, + async (queue) => { + const repair = await engine.repairQueue( + environment, + queue.name, + parsedBody.dryRun, + repairEnvironmentResults.runIds + ); + + return { + queue: queue.name, + ...repair, + }; + }, + { concurrency: 5 } + ); + + return json({ environment: repairEnvironmentResults, queues: repairResults }); +} diff --git a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts new file mode 100644 index 0000000000..3ea9576899 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts @@ -0,0 +1,95 @@ +import { json, LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica, prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { determineEngineVersion } from "~/v3/engineVersion.server"; +import { engine } from "~/v3/runEngine.server"; + +const ParamsSchema = z.object({ + environmentId: z.string(), +}); + +const SearchParamsSchema = z.object({ + verbose: z.string().default("0"), + page: z.coerce.number().optional(), + per_page: z.coerce.number().optional(), +}); + +export async function loader({ request, params }: LoaderFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + const parsedParams = ParamsSchema.parse(params); + + const environment = await prisma.runtimeEnvironment.findFirst({ + where: { + id: parsedParams.environmentId, + }, + include: { + organization: true, + project: true, + orgMember: true, + }, + }); + + if (!environment) { + return json({ error: "Environment not found" }, { status: 404 }); + } + + const engineVersion = await determineEngineVersion({ environment }); + + if (engineVersion === "V1") { + return json({ error: "Engine version is V1" }, { status: 400 }); + } + + const url = new URL(request.url); + const searchParams = SearchParamsSchema.parse(Object.fromEntries(url.searchParams)); + + const page = searchParams.page ?? 1; + const perPage = searchParams.per_page ?? 50; + + const queues = await $replica.taskQueue.findMany({ + where: { + runtimeEnvironmentId: environment.id, + version: "V2", + }, + select: { + friendlyId: true, + name: true, + concurrencyLimit: true, + type: true, + paused: true, + }, + orderBy: { + orderableName: "asc", + }, + skip: (page - 1) * perPage, + take: perPage, + }); + + const report = await engine.generateEnvironmentReport( + environment, + queues, + searchParams.verbose === "1" + ); + + return json(report); +} diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index a12abc73e2..680d385ca4 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -30,7 +30,8 @@ "nanoid": "3.3.8", "redlock": "5.0.0-beta.2", "seedrandom": "^3.0.5", - "zod": "3.25.76" + "zod": "3.25.76", + "p-map": "^6.0.0" }, "devDependencies": { "@internal/testcontainers": "workspace:*", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index f9b3061e91..9911f03a49 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -30,7 +30,7 @@ import { EventEmitter } from "node:events"; import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrategy.js"; import { RunQueue } from "../run-queue/index.js"; import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js"; -import { MinimalAuthenticatedEnvironment } from "../shared/index.js"; +import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js"; import { BillingCache } from "./billingCache.js"; import { NotImplementedError, RunDuplicateIdempotencyKeyError } from "./errors.js"; import { EventBus, EventBusEvents } from "./eventBus.js"; @@ -53,8 +53,15 @@ import { RunAttemptSystem } from "./systems/runAttemptSystem.js"; import { SystemResources } from "./systems/systems.js"; import { TtlSystem } from "./systems/ttlSystem.js"; import { WaitpointSystem } from "./systems/waitpointSystem.js"; -import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js"; +import { + EngineWorker, + HeartbeatTimeouts, + ReportableQueue, + RunEngineOptions, + TriggerParams, +} from "./types.js"; import { workerCatalog } from "./workerCatalog.js"; +import pMap from "p-map"; export class RunEngine { private runLockRedis: Redis; @@ -64,6 +71,7 @@ export class RunEngine { private tracer: Tracer; private meter: Meter; private heartbeatTimeouts: HeartbeatTimeouts; + private repairSnapshotTimeoutMs: number; prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; @@ -184,6 +192,9 @@ export class RunEngine { heartbeatSnapshot: async ({ payload }) => { await this.#handleStalledSnapshot(payload); }, + repairSnapshot: async ({ payload }) => { + await this.#handleRepairSnapshot(payload); + }, expireRun: async ({ payload }) => { await this.ttlSystem.expireRun({ runId: payload.runId }); }, @@ -234,6 +245,8 @@ export class RunEngine { ...(options.heartbeatTimeoutsMs ?? {}), }; + this.repairSnapshotTimeoutMs = options.repairSnapshotTimeoutMs ?? 60_000; + const resources: SystemResources = { prisma: this.prisma, worker: this.worker, @@ -1162,6 +1175,202 @@ export class RunEngine { } } + async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) { + const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment); + + return this.#repairRuns(runIds, dryRun); + } + + async repairQueue( + environment: AuthenticatedEnvironment, + queue: string, + dryRun: boolean, + ignoreRunIds: string[] + ) { + const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue); + + const runIdsToRepair = runIds.filter((runId) => !ignoreRunIds.includes(runId)); + + return this.#repairRuns(runIdsToRepair, dryRun); + } + + async #repairRuns(runIds: string[], dryRun: boolean) { + if (runIds.length === 0) { + return { + runIds, + repairs: [], + dryRun, + }; + } + + const repairs = await pMap( + runIds, + async (runId) => { + return this.#repairRun(runId, dryRun); + }, + { concurrency: 5 } + ); + + return { + runIds, + repairs, + dryRun, + }; + } + + async #repairRun(runId: string, dryRun: boolean) { + const snapshot = await getLatestExecutionSnapshot(this.prisma, runId); + + if ( + snapshot.executionStatus === "QUEUED" || + snapshot.executionStatus === "SUSPENDED" || + snapshot.executionStatus === "FINISHED" + ) { + if (!dryRun) { + // Schedule the repair job + await this.worker.enqueueOnce({ + id: `repair-in-progress-run:${runId}`, + job: "repairSnapshot", + payload: { runId, snapshotId: snapshot.id, executionStatus: snapshot.executionStatus }, + availableAt: new Date(Date.now() + this.repairSnapshotTimeoutMs), + }); + } + + return { + action: "repairSnapshot", + runId, + snapshotStatus: snapshot.executionStatus, + snapshotId: snapshot.id, + }; + } + + return { + action: "ignore", + runId, + snapshotStatus: snapshot.executionStatus, + snapshotId: snapshot.id, + }; + } + + async generateEnvironmentReport( + environment: AuthenticatedEnvironment, + queues: ReportableQueue[], + verbose: boolean + ) { + const [ + concurrencyLimit, // env limit (no burst) + concurrencyLimitWithBurstFactor, // env limit * burst + currentDequeued, + currentConcurrency, + burstFactor, + ] = await Promise.all([ + this.runQueue.getEnvConcurrencyLimit(environment), + this.runQueue.getEnvConcurrencyLimitWithBurstFactor(environment), + this.runQueue.currentConcurrencyOfEnvironment(environment), // "currentDequeued" in your label terminology + this.runQueue.operationalCurrentConcurrencyOfEnvironment(environment), + this.runQueue.getEnvConcurrencyBurstFactor(environment), + ]); + + const envMetrics = { + envCurrent: currentConcurrency, + envLimit: concurrencyLimit, + envLimitWithBurst: concurrencyLimitWithBurstFactor, + burstFactor, + }; + + const envAnalysis = analyzeEnvironment(envMetrics); + + const queueReports = await pMap( + queues, + async (queue) => { + return this.#generateReportForQueue(environment, queue, envMetrics, verbose); + }, + { concurrency: 5 } + ); + + return { + concurrencyLimit: { + value: concurrencyLimit, + key: verbose ? this.runQueue.keys.envConcurrencyLimitKey(environment) : undefined, + }, + concurrencyLimitWithBurstFactor: { + value: concurrencyLimitWithBurstFactor, + key: verbose + ? this.runQueue.keys.envConcurrencyLimitBurstFactorKey(environment) + : undefined, + }, + currentDequeued: { + value: currentDequeued, + key: verbose ? this.runQueue.keys.envCurrentDequeuedKey(environment) : undefined, + label: "Env current dequeued, this is what is displayed to the user", + }, + currentConcurrency: { + value: currentConcurrency, + key: verbose ? this.runQueue.keys.envCurrentConcurrencyKey(environment) : undefined, + label: + "Env current concurrency, this is what is used to determine if the environment can be dequeued from", + }, + analysis: envAnalysis, + queues: queueReports, + }; + } + + async #generateReportForQueue( + environment: AuthenticatedEnvironment, + queue: ReportableQueue, + envMetrics: EnvInputs, + verbose: boolean + ) { + const currentConcurrency = await this.runQueue.currentConcurrencyOfQueue( + environment, + queue.name + ); + const currentDequeued = await this.runQueue.currentDequeuedOfQueue(environment, queue.name); + const concurrencyLimit = await this.runQueue.getQueueConcurrencyLimit(environment, queue.name); + const messagesDueCount = await this.runQueue.lengthOfQueueAvailableMessages( + environment, + queue.name + ); + + const queueAnalysis = analyzeQueue({ + paused: queue.paused === true, + envLimit: envMetrics.envLimit, + envLimitWithBurst: envMetrics.envLimitWithBurst, + queueLimit: typeof concurrencyLimit === "number" ? concurrencyLimit : undefined, + queueCurrent: currentConcurrency, + envCurrent: envMetrics.envCurrent, + dueCount: messagesDueCount, + }); + + return { + name: queue.name, + friendlyId: queue.friendlyId, + type: queue.type, + paused: queue.paused, + dbConcurrencyLimit: queue.concurrencyLimit, + key: this.runQueue.keys.queueKey(environment, queue.name), + analysis: queueAnalysis, + concurrencyLimit: { + value: typeof concurrencyLimit === "number" ? concurrencyLimit : null, + key: verbose + ? this.runQueue.keys.queueConcurrencyLimitKey(environment, queue.name) + : undefined, + }, + currentConcurrency: { + value: currentConcurrency, + key: verbose + ? this.runQueue.keys.queueCurrentConcurrencyKey(environment, queue.name) + : undefined, + }, + currentDequeued: { + value: currentDequeued, + key: verbose + ? this.runQueue.keys.queueCurrentDequeuedKey(environment, queue.name) + : undefined, + }, + }; + } + async #handleStalledSnapshot({ runId, snapshotId, @@ -1435,14 +1644,126 @@ export class RunEngine { }); } + async #handleRepairSnapshot({ + runId, + snapshotId, + executionStatus, + }: { + runId: string; + snapshotId: string; + executionStatus: string; + }) { + return await this.runLock.lock("handleRepairSnapshot", [runId], async () => { + const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId); + + if (latestSnapshot.id !== snapshotId) { + this.logger.log( + "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.", + { + runId, + snapshotId, + latestSnapshotExecutionStatus: latestSnapshot.executionStatus, + repairExecutionStatus: executionStatus, + } + ); + + return; + } + + // Okay, so this means we haven't transitioned to a new status yes, so we need to do something + switch (latestSnapshot.executionStatus) { + case "EXECUTING": + case "EXECUTING_WITH_WAITPOINTS": + case "FINISHED": + case "PENDING_CANCEL": + case "PENDING_EXECUTING": + case "QUEUED_EXECUTING": + case "RUN_CREATED": { + // Do nothing; + return; + } + case "QUEUED": { + this.logger.log("RunEngine.handleRepairSnapshot QUEUED", { + runId, + snapshotId, + }); + + //it will automatically be requeued X times depending on the queue retry settings + const gotRequeued = await this.runQueue.nackMessage({ + orgId: latestSnapshot.organizationId, + messageId: runId, + }); + + if (!gotRequeued) { + this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", { + runId, + snapshot: latestSnapshot, + }); + } else { + this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", { + runId, + snapshot: latestSnapshot, + }); + } + + break; + } + case "SUSPENDED": { + this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", { + runId, + snapshotId, + }); + + const taskRun = await this.prisma.taskRun.findFirst({ + where: { id: runId }, + select: { + queue: true, + }, + }); + + if (!taskRun) { + this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", { + runId, + snapshotId, + }); + return; + } + + // We need to clear this run from the current concurrency sets + await this.runQueue.clearMessageFromConcurrencySets({ + runId, + orgId: latestSnapshot.organizationId, + queue: taskRun.queue, + env: { + id: latestSnapshot.environmentId, + type: latestSnapshot.environmentType, + project: { + id: latestSnapshot.projectId, + }, + organization: { + id: latestSnapshot.organizationId, + }, + }, + }); + + break; + } + default: { + assertNever(latestSnapshot.executionStatus); + } + } + }); + } + async #concurrencySweeperCallback( - runIds: string[] + runIds: string[], + completedAtOffsetMs: number = 1000 * 60 * 10 ): Promise> { const runs = await this.readOnlyPrisma.taskRun.findMany({ where: { id: { in: runIds }, completedAt: { - lte: new Date(Date.now() - 1000 * 60 * 10), // This only finds runs that were completed more than 10 minutes ago + lte: new Date(Date.now() - completedAtOffsetMs), // This only finds runs that were completed more than 10 minutes ago }, organizationId: { not: null, @@ -1483,3 +1804,98 @@ export class RunEngine { this.billingCache.invalidate(orgId); } } + +type EnvInputs = { + envCurrent: number; + envLimit: number; + envLimitWithBurst: number; + burstFactor?: number; +}; + +function analyzeEnvironment(inputs: EnvInputs) { + const { envCurrent, envLimit, envLimitWithBurst, burstFactor } = inputs; + + const reasons: string[] = []; + const envAvailableCapacity = Math.max(0, envLimitWithBurst - envCurrent); + const canDequeue = envAvailableCapacity > 0; + + if (!canDequeue) { + reasons.push( + `Environment concurrency (${envCurrent}) has reached the limit with burst (${envLimitWithBurst}).` + ); + } + + return { + canDequeue, + reasons, + metrics: { + envAvailableCapacity, + }, + }; +} + +type QueueInputs = { + paused?: boolean; + envLimit: number; + envLimitWithBurst: number; + queueLimit?: number; // undefined => no explicit queue limit (Lua uses a huge default) + queueCurrent: number; + envCurrent: number; + dueCount?: number; // optional (if you implement countDueMessages) +}; + +function analyzeQueue(inputs: QueueInputs) { + const { paused, envLimit, envLimitWithBurst, queueLimit, queueCurrent, envCurrent, dueCount } = + inputs; + + const reasons: string[] = []; + + // Effective queue limit mirrors the Lua: min(queueLimit || 1_000_000, envLimit) + const queueLimitCapped = typeof queueLimit === "number" ? queueLimit : 1_000_000; + const effectiveQueueLimit = Math.min(queueLimitCapped, envLimit); + + const envAvailable = Math.max(0, envLimitWithBurst - envCurrent); + const queueAvailable = Math.max(0, effectiveQueueLimit - queueCurrent); + + // Mirror Lua's actualMaxCount = min(maxCount, envAvailable, queueAvailable). + // Here we only need to know if capacity exists at all (maxCount >= 1 assumed). + const hasCapacity = envAvailable > 0 && queueAvailable > 0; + + // High-signal reasons (ordered) + if (paused) { + reasons.push("Queue is paused."); + } + + if (envAvailable <= 0) { + reasons.push( + `Environment concurrency (${envCurrent}) has reached the limit with burst (${envLimitWithBurst}).` + ); + } + + if (queueAvailable <= 0) { + reasons.push( + `Queue concurrency (${queueCurrent}) has reached the effective queue limit (${effectiveQueueLimit}).` + ); + } + + // Optional visibility: no due messages (score > now or empty queue) + if (typeof dueCount === "number" && dueCount <= 0) { + reasons.push("No due messages in the queue (nothing scored ≤ now)."); + } + + // Final decision: + // - Not paused + // - Has capacity (both env and queue) + // - And (optionally) has work due + const canDequeue = !paused && hasCapacity && (typeof dueCount === "number" ? dueCount > 0 : true); + + return { + canDequeue, + reasons: canDequeue ? [] : reasons, + metrics: { + effectiveQueueLimit, + queueAvailableCapacity: queueAvailable, + messagesDueCount: typeof dueCount === "number" ? dueCount : null, + }, + }; +} diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 5125be560e..040cb3cd09 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -71,6 +71,7 @@ export type RunEngineOptions = { /** If not set then checkpoints won't ever be used */ retryWarmStartThresholdMs?: number; heartbeatTimeoutsMs?: Partial; + repairSnapshotTimeoutMs?: number; treatProductionExecutionStallsAsOOM?: boolean; suspendedHeartbeatRetriesConfig?: { maxCount?: number; @@ -150,3 +151,11 @@ export type TriggerParams = { }; export type EngineWorker = Worker; + +export type ReportableQueue = { + name: string; + concurrencyLimit: number | null; + type: string; + paused: boolean; + friendlyId: string; +}; diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index 81918ac119..2ed6f5076b 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -16,6 +16,14 @@ export const workerCatalog = { }), visibilityTimeoutMs: 30_000, }, + repairSnapshot: { + schema: z.object({ + runId: z.string(), + snapshotId: z.string(), + executionStatus: z.string(), + }), + visibilityTimeoutMs: 30_000, + }, expireRun: { schema: z.object({ runId: z.string(), diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 81cbb0379c..20185b74cc 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -40,6 +40,7 @@ import { OutputPayload, OutputPayloadV2, RunQueueKeyProducer, + RunQueueKeyProducerEnvironment, RunQueueSelectionStrategy, } from "./types.js"; import { WorkerQueueResolver } from "./workerQueueResolver.js"; @@ -355,6 +356,24 @@ export class RunQueue { return Math.floor(limit * burstFactor); } + public async getEnvConcurrencyBurstFactor(env: MinimalAuthenticatedEnvironment) { + const result = await this.redis.get(this.keys.envConcurrencyLimitBurstFactorKey(env)); + + const burstFactor = result + ? Number(result) + : this.options.defaultEnvConcurrencyBurstFactor ?? 1; + + return burstFactor; + } + + public async getCurrentConcurrencyOfEnvironment(env: MinimalAuthenticatedEnvironment) { + return this.redis.smembers(this.keys.envCurrentConcurrencyKey(env)); + } + + public async getCurrentConcurrencyOfQueue(env: MinimalAuthenticatedEnvironment, queue: string) { + return this.redis.smembers(this.keys.queueCurrentConcurrencyKey(env, queue)); + } + public async lengthOfQueue( env: MinimalAuthenticatedEnvironment, queue: string, @@ -363,6 +382,19 @@ export class RunQueue { return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey)); } + public async lengthOfQueueAvailableMessages( + env: MinimalAuthenticatedEnvironment, + queue: string, + currentTime: Date = new Date(), + concurrencyKey?: string + ) { + return this.redis.zcount( + this.keys.queueKey(env, queue, concurrencyKey), + "-inf", + String(currentTime.getTime()) + ); + } + public async lengthOfEnvQueue(env: MinimalAuthenticatedEnvironment) { return this.redis.zcard(this.keys.envQueueKey(env)); } @@ -419,6 +451,14 @@ export class RunQueue { return this.redis.scard(this.keys.queueCurrentConcurrencyKey(env, queue, concurrencyKey)); } + public async currentDequeuedOfQueue( + env: MinimalAuthenticatedEnvironment, + queue: string, + concurrencyKey?: string + ) { + return this.redis.scard(this.keys.queueCurrentDequeuedKey(env, queue, concurrencyKey)); + } + public async currentConcurrencyOfQueues( env: MinimalAuthenticatedEnvironment, queues: string[] @@ -502,6 +542,15 @@ export class RunQueue { return this.redis.scard(this.keys.envCurrentDequeuedKey(env)); } + /** + * Get the operational current concurrency of the environment + * @param env - The environment to get the current concurrency of + * @returns The current concurrency of the environment + */ + public async operationalCurrentConcurrencyOfEnvironment(env: MinimalAuthenticatedEnvironment) { + return this.redis.scard(this.keys.envCurrentConcurrencyKey(env)); + } + public async messageExists(orgId: string, messageId: string) { return this.redis.exists(this.keys.messageKey(orgId, messageId)); } @@ -882,6 +931,15 @@ export class RunQueue { }); } + public async clearMessageFromConcurrencySets(params: { + runId: string; + orgId: string; + queue: string; + env: RunQueueKeyProducerEnvironment; + }) { + return this.#callClearMessageFromConcurrencySets(params); + } + async quit() { this.abortController.abort(); @@ -1750,6 +1808,45 @@ export class RunQueue { ); } + async #callClearMessageFromConcurrencySets({ + runId, + orgId, + queue, + env, + }: { + runId: string; + orgId: string; + queue: string; + env: RunQueueKeyProducerEnvironment; + }) { + const messageId = runId; + const messageKey = this.keys.messageKey(orgId, messageId); + const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKey(env, queue); + const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKey(env); + const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKey(env, queue); + const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKey(env); + + this.logger.debug("Calling clearMessageFromConcurrencySets", { + messageKey, + queue, + env, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + messageId, + service: this.name, + }); + + return this.redis.clearMessageFromConcurrencySets( + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + messageId + ); + } + async #callNackMessage({ message, retryAt }: { message: OutputPayload; retryAt?: number }) { const messageId = message.runId; const messageKey = this.keys.messageKey(message.orgId, message.runId); @@ -2587,6 +2684,26 @@ end return results `, }); + + this.redis.defineCommand("clearMessageFromConcurrencySets", { + numberOfKeys: 4, + lua: ` +-- Keys: +local queueCurrentConcurrencyKey = KEYS[1] +local envCurrentConcurrencyKey = KEYS[2] +local queueCurrentDequeuedKey = KEYS[3] +local envCurrentDequeuedKey = KEYS[4] + +-- Args: +local messageId = ARGV[1] + +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) +`, + }); } } @@ -2671,6 +2788,17 @@ declare module "@internal/redis" { callback?: Callback ): Result; + clearMessageFromConcurrencySets( + // keys + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + // args + messageId: string, + callback?: Callback + ): Result; + nackMessage( // keys masterQueueKey: string, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6c7ca99a06..a7f137024e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1129,6 +1129,9 @@ importers: nanoid: specifier: 3.3.8 version: 3.3.8 + p-map: + specifier: ^6.0.0 + version: 6.0.0 redlock: specifier: 5.0.0-beta.2 version: 5.0.0-beta.2(patch_hash=rwyegdki7iserrd7fgjwxkhnlu) From a2bd982c27047d2ba65436b102973942826c88d9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 26 Sep 2025 15:15:01 +0100 Subject: [PATCH 2/2] Handle FINISHED snapshot in the repair --- internal-packages/run-engine/src/engine/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index b3138462d3..0c38d1bf44 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1682,7 +1682,6 @@ export class RunEngine { switch (latestSnapshot.executionStatus) { case "EXECUTING": case "EXECUTING_WITH_WAITPOINTS": - case "FINISHED": case "PENDING_CANCEL": case "PENDING_EXECUTING": case "QUEUED_EXECUTING": @@ -1716,8 +1715,9 @@ export class RunEngine { break; } + case "FINISHED": case "SUSPENDED": { - this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", { + this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED", { runId, snapshotId, }); @@ -1730,7 +1730,7 @@ export class RunEngine { }); if (!taskRun) { - this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", { + this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", { runId, snapshotId, });