diff --git a/internal-packages/run-engine/src/engine/statuses.ts b/internal-packages/run-engine/src/engine/statuses.ts index 5eb923fa3d..93a4428cac 100644 --- a/internal-packages/run-engine/src/engine/statuses.ts +++ b/internal-packages/run-engine/src/engine/statuses.ts @@ -31,6 +31,11 @@ export function isCheckpointable(status: TaskRunExecutionStatus): boolean { return checkpointableStatuses.includes(status); } +export function isFinishedOrPendingFinished(status: TaskRunExecutionStatus): boolean { + const finishedStatuses: TaskRunExecutionStatus[] = ["FINISHED", "PENDING_CANCEL"]; + return finishedStatuses.includes(status); +} + export function isFinalRunStatus(status: TaskRunStatus): boolean { const finalStatuses: TaskRunStatus[] = [ "CANCELED", diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 425ae8262d..9dfb9659fc 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -10,7 +10,7 @@ import { } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import { sendNotificationToWorker } from "../eventBus.js"; -import { isExecuting } from "../statuses.js"; +import { isExecuting, isFinishedOrPendingFinished } from "../statuses.js"; import { EnqueueSystem } from "./enqueueSystem.js"; import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; @@ -512,6 +512,14 @@ export class WaitpointSystem { await this.$.runLock.lock([runId], 5000, async () => { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); + if (isFinishedOrPendingFinished(snapshot.executionStatus)) { + this.$.logger.debug(`#continueRunIfUnblocked: run is finished, skipping`, { + runId, + snapshot, + }); + return; + } + //run is still executing, send a message to the worker if (isExecuting(snapshot.executionStatus)) { const result = await this.$.runQueue.reacquireConcurrency( @@ -573,6 +581,11 @@ export class WaitpointSystem { } else { if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) { // TODO: We're screwed, should probably fail the run immediately + this.$.logger.error(`#continueRunIfUnblocked: run has no checkpoint`, { + runId: run.id, + snapshot, + blockingWaitpoints, + }); throw new Error(`#continueRunIfUnblocked: run has no checkpoint: ${run.id}`); } diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index 92eddce19e..f3900d0f8f 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -6,20 +6,20 @@ export const workerCatalog = { waitpointId: z.string(), error: z.string().optional(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, heartbeatSnapshot: { schema: z.object({ runId: z.string(), snapshotId: z.string(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, expireRun: { schema: z.object({ runId: z.string(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, cancelRun: { schema: z.object({ @@ -27,30 +27,30 @@ export const workerCatalog = { completedAt: z.coerce.date(), reason: z.string().optional(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, queueRunsPendingVersion: { schema: z.object({ backgroundWorkerId: z.string(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 60_000, }, tryCompleteBatch: { schema: z.object({ batchId: z.string(), }), - visibilityTimeoutMs: 10_000, + visibilityTimeoutMs: 30_000, }, continueRunIfUnblocked: { schema: z.object({ runId: z.string(), }), - visibilityTimeoutMs: 10_000, + visibilityTimeoutMs: 30_000, }, enqueueDelayedRun: { schema: z.object({ runId: z.string(), }), - visibilityTimeoutMs: 10_000, + visibilityTimeoutMs: 30_000, }, };