feat(run-engine): ability to repair runs in QUEUED, SUSPENDED, and FINISHED execution status #2564
Walkthrough
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60–75 minutes
Pre-merge checks and finishing touches: ❌ 2 warnings, ✅ 1 passed
Compared commits: 66cafd2 to c0cdefd
Actionable comments posted: 2
🧹 Nitpick comments (3)
internal-packages/run-engine/src/engine/workerCatalog.ts (1)
19-26
: Constrain executionStatus with z.enum

Use an explicit enum for executionStatus to prevent invalid values and tighten validation.

```diff
 repairSnapshot: {
-  schema: z.object({
-    runId: z.string(),
-    snapshotId: z.string(),
-    executionStatus: z.string(),
-  }),
+  schema: z.object({
+    runId: z.string(),
+    snapshotId: z.string(),
+    executionStatus: z.enum([
+      "RUN_CREATED",
+      "QUEUED_EXECUTING",
+      "PENDING_EXECUTING",
+      "EXECUTING",
+      "EXECUTING_WITH_WAITPOINTS",
+      "SUSPENDED",
+      "PENDING_CANCEL",
+      "FINISHED",
+      "QUEUED",
+    ]),
+  }),
   visibilityTimeoutMs: 30_000,
 },
```

internal-packages/run-engine/src/run-queue/index.ts (2)
934-941
: Public API to clear concurrency sets: good addition

Surface area is minimal and delegates to the private call. Consider adding basic tracing attributes for observability (optional).
2693-2712
: Lua command definition is sound

Command signature matches the TS declaration and usage. Optionally return a count of removed entries for metrics.
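As an illustrative model of what the clear operation does, removing a message from the concurrency sets amounts to deleting the run id from each "current"/"dequeued" set. This is a pure in-memory stand-in (JS Sets, not the Redis-backed Lua script), and it returns the removal count the comment above suggests:

```typescript
// Pure in-memory stand-in for the Lua SREM-based clear operation.
// Set names mirror the key producer's concurrency/dequeued keys;
// the real implementation operates on Redis sets, not JS Sets.
type ConcurrencySets = {
  queueCurrentConcurrency: Set<string>;
  envCurrentConcurrency: Set<string>;
  queueCurrentDequeued: Set<string>;
  envCurrentDequeued: Set<string>;
};

function clearRunFromSets(sets: ConcurrencySets, runId: string): number {
  let removed = 0;
  for (const set of [
    sets.queueCurrentConcurrency,
    sets.envCurrentConcurrency,
    sets.queueCurrentDequeued,
    sets.envCurrentDequeued,
  ]) {
    if (set.delete(runId)) removed++; // Set.delete returns true if the id was present
  }
  return removed; // a count like this would enable removal metrics
}
```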
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts (1 hunks)
- internal-packages/run-engine/src/engine/index.ts (5 hunks)
- internal-packages/run-engine/src/engine/types.ts (1 hunks)
- internal-packages/run-engine/src/engine/workerCatalog.ts (1 hunks)
- internal-packages/run-engine/src/run-queue/index.ts (5 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}
: Always prefer using isomorphic code like fetch, ReadableStream, etc. instead of Node.js specific code
For TypeScript, we usually use types over interfaces
Avoid enums
No default exports, use function declarations
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
internal-packages/run-engine/src/run-queue/index.ts
internal-packages/run-engine/src/engine/types.ts
internal-packages/run-engine/src/engine/workerCatalog.ts
internal-packages/run-engine/src/engine/index.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
We use zod a lot in packages/core and in the webapp
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/**/*.{ts,tsx}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
When importing from @trigger.dev/core in the webapp, never import the root package path; always use one of the documented subpath exports from @trigger.dev/core’s package.json
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
{apps/webapp/app/**/*.server.{ts,tsx},apps/webapp/app/routes/**/*.ts}
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Access environment variables only via the env export from app/env.server.ts; do not reference process.env directly
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
apps/webapp/app/**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)
Modules intended for test consumption under apps/webapp/app/**/*.ts must not read environment variables; accept configuration via options instead
Files:
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
🧬 Code graph analysis (2)
internal-packages/run-engine/src/run-queue/index.ts (3)
- internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (2): runId (1462-1591), runId (1849-1899)
- internal-packages/run-engine/src/run-queue/types.ts (1): RunQueueKeyProducerEnvironment (49-52)
- internal-packages/run-engine/src/run-queue/keyProducer.ts (5): messageKey (254-258), queueCurrentConcurrencyKey (157-165), envCurrentConcurrencyKey (199-217), queueCurrentDequeuedKey (149-155), envCurrentDequeuedKey (231-247)

internal-packages/run-engine/src/engine/index.ts (3)
- internal-packages/run-engine/src/run-queue/index.ts (1): runId (1812-1849)
- internal-packages/run-engine/src/shared/index.ts (1): AuthenticatedEnvironment (4-6)
- internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1): getLatestExecutionSnapshot (95-113)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: typecheck / typecheck
🔇 Additional comments (11)
internal-packages/run-engine/src/engine/types.ts (1)
74-74
: Config option addition looks good; ensure it’s wired into defaults/docs

The optional repairSnapshotTimeoutMs aligns with the new repair workflow. Confirm it’s documented and defaulted as intended in the engine constructor.

internal-packages/run-engine/src/run-queue/index.ts (3)
43-44
: Type import looks correctImporting RunQueueKeyProducerEnvironment improves API clarity for environment-scoped operations.
1812-1850
: Lua-backed clear operation is correct

Key selection and SREM updates are consistent with your key schema. The log includes useful context.
Please confirm there’s an automated test covering the SUSPENDED-repair path calling this API (see engine #handleRepairSnapshot). If missing, I can help scaffold one.
2796-2806
: Commander interface extended correctly

The new clearMessageFromConcurrencySets signature matches the Lua script and call sites.

internal-packages/run-engine/src/engine/index.ts (6)
74-74
: Private timeout field added: OK

Matches the new RunEngineOptions and is used to schedule repair jobs.
195-197
: New repairSnapshot worker job: OK

Job wiring matches workerCatalog and the private handler.
248-249
: Default timeout applied

Reasonable default of 60s when not provided.
1183-1184
: Delegation to #repairRuns is clean

Returns a unified shape for environment repairs.
1186-1197
: Queue repair with ignore list: good dedupe

Filters pre-repaired runIds to avoid double work per queue.
Confirm the admin route always calls repairEnvironment before per-queue repair to ensure ignore lists are correct (current route does).
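The dedupe described above can be sketched as a plain filter. This is illustrative only (the helper name and signature are hypothetical, not the engine's actual API):

```typescript
// Illustrative sketch of the ignore-list dedupe: drop run ids that the
// environment-level repair already handled before repairing a queue.
// (Hypothetical helper; not the engine's actual function.)
function runsToRepair(queueRunIds: string[], alreadyRepaired: string[]): string[] {
  const repaired = new Set(alreadyRepaired); // Set lookup keeps this O(n + m)
  return queueRunIds.filter((runId) => !repaired.has(runId));
}
```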
1199-1221
: Repair orchestration via pMap: OK

Bounded concurrency (5) is sensible.
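For reference, bounded-concurrency mapping in the style of pMap can be sketched in a few lines (a minimal stand-in under simplifying assumptions, not the p-map package itself):

```typescript
// Minimal bounded-concurrency map: at most `concurrency` promises in
// flight at once, results returned in input order (stand-in for pMap).
async function mapWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  async function worker(): Promise<void> {
    // Each worker repeatedly claims the next unprocessed index.
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  }
  const workers = Array.from(
    { length: Math.min(concurrency, items.length) },
    () => worker()
  );
  await Promise.all(workers);
  return results;
}
```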
apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts (1)
89-99
: Queue-level repair results shape: OK

Passing environment repair runIds to ignore avoids redundant queue repairs; returning { queue, ...repair } is a clear response shape.

Consider including friendlyId alongside queue for UX parity with other endpoints (optional).
```ts
async #handleRepairSnapshot({
  runId,
  snapshotId,
  executionStatus,
}: {
  runId: string;
  snapshotId: string;
  executionStatus: string;
}) {
  return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
    const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);

    if (latestSnapshot.id !== snapshotId) {
      this.logger.log(
        "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
        {
          runId,
          snapshotId,
          latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
          repairExecutionStatus: executionStatus,
        }
      );

      return;
    }

    // Okay, so this means we haven't transitioned to a new status yet, so we need to do something
    switch (latestSnapshot.executionStatus) {
      case "EXECUTING":
      case "EXECUTING_WITH_WAITPOINTS":
      case "FINISHED":
      case "PENDING_CANCEL":
      case "PENDING_EXECUTING":
      case "QUEUED_EXECUTING":
      case "RUN_CREATED": {
        // Do nothing
        return;
      }
      case "QUEUED": {
        this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
          runId,
          snapshotId,
        });

        // It will automatically be requeued X times depending on the queue retry settings
        const gotRequeued = await this.runQueue.nackMessage({
          orgId: latestSnapshot.organizationId,
          messageId: runId,
        });

        if (!gotRequeued) {
          this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
            runId,
            snapshot: latestSnapshot,
          });
        } else {
          this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
            runId,
            snapshot: latestSnapshot,
          });
        }

        break;
      }
      case "SUSPENDED": {
        this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", {
          runId,
          snapshotId,
        });

        const taskRun = await this.prisma.taskRun.findFirst({
          where: { id: runId },
          select: {
            queue: true,
          },
        });

        if (!taskRun) {
          this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", {
            runId,
            snapshotId,
          });
          return;
        }

        // We need to clear this run from the current concurrency sets
        await this.runQueue.clearMessageFromConcurrencySets({
          runId,
          orgId: latestSnapshot.organizationId,
          queue: taskRun.queue,
          env: {
            id: latestSnapshot.environmentId,
            type: latestSnapshot.environmentType,
            project: {
              id: latestSnapshot.projectId,
            },
            organization: {
              id: latestSnapshot.organizationId,
            },
          },
        });

        break;
      }
      default: {
        assertNever(latestSnapshot.executionStatus);
      }
    }
  });
}
```
🛠️ Refactor suggestion | 🟠 Major
Handle FINISHED runs: attempt ack or clear sets
Currently FINISHED is grouped under a “do nothing” branch. Add explicit FINISHED handling to ack the message if present, else clear concurrency sets, to actually repair stuck finished runs.
```diff
-      switch (latestSnapshot.executionStatus) {
-        case "EXECUTING":
-        case "EXECUTING_WITH_WAITPOINTS":
-        case "FINISHED":
+      switch (latestSnapshot.executionStatus) {
+        case "EXECUTING":
+        case "EXECUTING_WITH_WAITPOINTS":
         case "PENDING_CANCEL":
         case "PENDING_EXECUTING":
         case "QUEUED_EXECUTING":
         case "RUN_CREATED": {
           // Do nothing;
           return;
         }
         case "QUEUED": {
           ...
           break;
         }
+        case "FINISHED": {
+          this.logger.log("RunEngine.handleRepairSnapshot FINISHED", {
+            runId,
+            snapshotId,
+          });
+
+          // If the message still exists, ack to fully clean up queue and concurrency artifacts.
+          const hasMessage = await this.runQueue.messageExists(
+            latestSnapshot.organizationId,
+            runId
+          );
+          if (hasMessage) {
+            await this.runQueue.acknowledgeMessage(latestSnapshot.organizationId, runId, {
+              skipDequeueProcessing: true,
+              removeFromWorkerQueue: true,
+            });
+            break;
+          }
+
+          // Fallback: ensure concurrency sets are cleared if the message is gone.
+          const taskRun = await this.prisma.taskRun.findFirst({
+            where: { id: runId },
+            select: { queue: true },
+          });
+          if (!taskRun) {
+            this.logger.error("RunEngine.handleRepairSnapshot FINISHED task run not found", {
+              runId,
+              snapshotId,
+            });
+            return;
+          }
+          await this.runQueue.clearMessageFromConcurrencySets({
+            runId,
+            orgId: latestSnapshot.organizationId,
+            queue: taskRun.queue,
+            env: {
+              id: latestSnapshot.environmentId,
+              type: latestSnapshot.environmentType,
+              project: { id: latestSnapshot.projectId },
+              organization: { id: latestSnapshot.organizationId },
+            },
+          });
+          break;
+        }
         case "SUSPENDED": {
           ...
           break;
         }
         default: {
           assertNever(latestSnapshot.executionStatus);
         }
       }
```
This makes FINISHED repair effective without relying on the periodic sweeper.
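The resulting repair behavior across statuses can be distilled into a pure decision table. The action names below are illustrative, not engine APIs:

```typescript
// Hypothetical decision table distilled from the suggested diff.
// "nack" requeues the message, "ack" cleans up queue + concurrency
// artifacts, "clearConcurrency" removes the run from concurrency sets only.
type RepairAction = "none" | "nack" | "ack" | "clearConcurrency";

function repairActionFor(executionStatus: string, messageExists: boolean): RepairAction {
  switch (executionStatus) {
    case "QUEUED":
      return "nack"; // requeued per the queue's retry settings
    case "FINISHED":
      return messageExists ? "ack" : "clearConcurrency";
    case "SUSPENDED":
      return "clearConcurrency";
    default:
      return "none"; // EXECUTING, PENDING_*, QUEUED_EXECUTING, RUN_CREATED
  }
}
```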
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
async #handleRepairSnapshot({
  runId,
  snapshotId,
  executionStatus,
}: {
  runId: string;
  snapshotId: string;
  executionStatus: string;
}) {
  return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
    const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);
    if (latestSnapshot.id !== snapshotId) {
      this.logger.log(
        "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
        {
          runId,
          snapshotId,
          latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
          repairExecutionStatus: executionStatus,
        }
      );
      return;
    }
    // Okay, so this means we haven't transitioned to a new status yet, so we need to do something
    switch (latestSnapshot.executionStatus) {
      case "EXECUTING":
      case "EXECUTING_WITH_WAITPOINTS":
      case "PENDING_CANCEL":
      case "PENDING_EXECUTING":
      case "QUEUED_EXECUTING":
      case "RUN_CREATED": {
        // Do nothing
        return;
      }
      case "QUEUED": {
        this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
          runId,
          snapshotId,
        });
        // It will automatically be requeued X times depending on the queue retry settings
        const gotRequeued = await this.runQueue.nackMessage({
          orgId: latestSnapshot.organizationId,
          messageId: runId,
        });
        if (!gotRequeued) {
          this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
            runId,
            snapshot: latestSnapshot,
          });
        } else {
          this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
            runId,
            snapshot: latestSnapshot,
          });
        }
        break;
      }
      case "FINISHED": {
        this.logger.log("RunEngine.handleRepairSnapshot FINISHED", {
          runId,
          snapshotId,
        });
        // If the message still exists, ack to fully clean up queue and concurrency artifacts.
        const hasMessage = await this.runQueue.messageExists(
          latestSnapshot.organizationId,
          runId
        );
        if (hasMessage) {
          await this.runQueue.acknowledgeMessage(latestSnapshot.organizationId, runId, {
            skipDequeueProcessing: true,
            removeFromWorkerQueue: true,
          });
          break;
        }
        // Fallback: ensure concurrency sets are cleared if the message is gone.
        const taskRun = await this.prisma.taskRun.findFirst({
          where: { id: runId },
          select: { queue: true },
        });
        if (!taskRun) {
          this.logger.error("RunEngine.handleRepairSnapshot FINISHED task run not found", {
            runId,
            snapshotId,
          });
          return;
        }
        await this.runQueue.clearMessageFromConcurrencySets({
          runId,
          orgId: latestSnapshot.organizationId,
          queue: taskRun.queue,
          env: {
            id: latestSnapshot.environmentId,
            type: latestSnapshot.environmentType,
            project: { id: latestSnapshot.projectId },
            organization: { id: latestSnapshot.organizationId },
          },
        });
        break;
      }
      case "SUSPENDED": {
        this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", {
          runId,
          snapshotId,
        });
        const taskRun = await this.prisma.taskRun.findFirst({
          where: { id: runId },
          select: {
            queue: true,
          },
        });
        if (!taskRun) {
          this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", {
            runId,
            snapshotId,
          });
          return;
        }
        // We need to clear this run from the current concurrency sets
        await this.runQueue.clearMessageFromConcurrencySets({
          runId,
          orgId: latestSnapshot.organizationId,
          queue: taskRun.queue,
          env: {
            id: latestSnapshot.environmentId,
            type: latestSnapshot.environmentType,
            project: {
              id: latestSnapshot.projectId,
            },
            organization: {
              id: latestSnapshot.organizationId,
            },
          },
        });
        break;
      }
      default: {
        assertNever(latestSnapshot.executionStatus);
      }
    }
  });
}
```
🤖 Prompt for AI Agents
internal-packages/run-engine/src/engine/index.ts around lines 1655 to 1764: the
FINISHED case is currently lumped into the "do nothing" branch; implement
explicit FINISHED handling that first tries to ack the queue message (await
this.runQueue.ackMessage({ orgId: latestSnapshot.organizationId, messageId:
runId })) and if ack returns false fall back to clearing the run from
concurrency sets (fetch taskRun.queue like in SUSPENDED, log and return if not
found, then call this.runQueue.clearMessageFromConcurrencySets with the same env
shape used in SUSPENDED); log success/failure paths similarly to the
QUEUED/SUSPENDED branches.
…NISHED execution status (#2564) * feat(server): add two admin endpoints for queue and environment concurrency debugging and repairing feat(run-engine): ability to repair runs in QUEUED, SUSPENDED, and FINISHED execution status * Handle FINISHED snapshot in the repair
No description provided.