Skip to content

Commit 94d96b8

Browse files
ericallamsamejr
authored andcommitted
feat(run-engine): ability to repair runs in QUEUED, SUSPENDED, and FINISHED execution status (#2564)
* feat(server): add two admin endpoints for queue and environment concurrency debugging and repairing feat(run-engine): ability to repair runs in QUEUED, SUSPENDED, and FINISHED execution status * Handle FINISHED snapshot in the repair
1 parent f23c3c1 commit 94d96b8

File tree

5 files changed

+264
-52
lines changed

5 files changed

+264
-52
lines changed

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,17 @@ export async function action({ request, params }: ActionFunctionArgs) {
8686
const repairResults = await pMap(
8787
queues,
8888
async (queue) => {
89-
return engine.repairQueue(environment, queue.name, parsedBody.dryRun);
89+
const repair = await engine.repairQueue(
90+
environment,
91+
queue.name,
92+
parsedBody.dryRun,
93+
repairEnvironmentResults.runIds
94+
);
95+
96+
return {
97+
queue: queue.name,
98+
...repair,
99+
};
90100
},
91101
{ concurrency: 5 }
92102
);

internal-packages/run-engine/src/engine/index.ts

Lines changed: 164 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export class RunEngine {
7171
private tracer: Tracer;
7272
private meter: Meter;
7373
private heartbeatTimeouts: HeartbeatTimeouts;
74+
private repairSnapshotTimeoutMs: number;
7475

7576
prisma: PrismaClient;
7677
readOnlyPrisma: PrismaReplicaClient;
@@ -191,6 +192,9 @@ export class RunEngine {
191192
heartbeatSnapshot: async ({ payload }) => {
192193
await this.#handleStalledSnapshot(payload);
193194
},
195+
repairSnapshot: async ({ payload }) => {
196+
await this.#handleRepairSnapshot(payload);
197+
},
194198
expireRun: async ({ payload }) => {
195199
await this.ttlSystem.expireRun({ runId: payload.runId });
196200
},
@@ -241,6 +245,8 @@ export class RunEngine {
241245
...(options.heartbeatTimeoutsMs ?? {}),
242246
};
243247

248+
this.repairSnapshotTimeoutMs = options.repairSnapshotTimeoutMs ?? 60_000;
249+
244250
const resources: SystemResources = {
245251
prisma: this.prisma,
246252
worker: this.worker,
@@ -1174,81 +1180,77 @@ export class RunEngine {
11741180
async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) {
11751181
const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment);
11761182

1177-
const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
1183+
return this.#repairRuns(runIds, dryRun);
1184+
}
11781185

1179-
if (dryRun) {
1180-
return {
1181-
runIds,
1182-
completedRunIds: completedRuns.map((r) => r.id),
1183-
dryRun,
1184-
};
1185-
}
1186+
async repairQueue(
1187+
environment: AuthenticatedEnvironment,
1188+
queue: string,
1189+
dryRun: boolean,
1190+
ignoreRunIds: string[]
1191+
) {
1192+
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);
1193+
1194+
const runIdsToRepair = runIds.filter((runId) => !ignoreRunIds.includes(runId));
1195+
1196+
return this.#repairRuns(runIdsToRepair, dryRun);
1197+
}
11861198

1187-
if (completedRuns.length === 0) {
1199+
async #repairRuns(runIds: string[], dryRun: boolean) {
1200+
if (runIds.length === 0) {
11881201
return {
11891202
runIds,
1190-
completedRunIds: [],
1203+
repairs: [],
11911204
dryRun,
11921205
};
11931206
}
11941207

1195-
await pMap(
1196-
completedRuns,
1197-
async (run) => {
1198-
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
1199-
skipDequeueProcessing: true,
1200-
removeFromWorkerQueue: false,
1201-
});
1208+
const repairs = await pMap(
1209+
runIds,
1210+
async (runId) => {
1211+
return this.#repairRun(runId, dryRun);
12021212
},
12031213
{ concurrency: 5 }
12041214
);
12051215

12061216
return {
12071217
runIds,
1208-
completedRunIds: completedRuns.map((r) => r.id),
1218+
repairs,
12091219
dryRun,
12101220
};
12111221
}
12121222

1213-
async repairQueue(environment: AuthenticatedEnvironment, queue: string, dryRun: boolean) {
1214-
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);
1215-
1216-
const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
1217-
1218-
if (dryRun) {
1219-
return {
1220-
queue,
1221-
runIds,
1222-
completedRunIds: completedRuns.map((r) => r.id),
1223-
dryRun,
1224-
};
1225-
}
1223+
async #repairRun(runId: string, dryRun: boolean) {
1224+
const snapshot = await getLatestExecutionSnapshot(this.prisma, runId);
1225+
1226+
if (
1227+
snapshot.executionStatus === "QUEUED" ||
1228+
snapshot.executionStatus === "SUSPENDED" ||
1229+
snapshot.executionStatus === "FINISHED"
1230+
) {
1231+
if (!dryRun) {
1232+
// Schedule the repair job
1233+
await this.worker.enqueueOnce({
1234+
id: `repair-in-progress-run:${runId}`,
1235+
job: "repairSnapshot",
1236+
payload: { runId, snapshotId: snapshot.id, executionStatus: snapshot.executionStatus },
1237+
availableAt: new Date(Date.now() + this.repairSnapshotTimeoutMs),
1238+
});
1239+
}
12261240

1227-
if (completedRuns.length === 0) {
12281241
return {
1229-
queue,
1230-
runIds,
1231-
completedRunIds: [],
1232-
dryRun,
1242+
action: "repairSnapshot",
1243+
runId,
1244+
snapshotStatus: snapshot.executionStatus,
1245+
snapshotId: snapshot.id,
12331246
};
12341247
}
12351248

1236-
await pMap(
1237-
completedRuns,
1238-
async (run) => {
1239-
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
1240-
skipDequeueProcessing: true,
1241-
removeFromWorkerQueue: false,
1242-
});
1243-
},
1244-
{ concurrency: 5 }
1245-
);
1246-
12471249
return {
1248-
queue,
1249-
runIds,
1250-
completedRunIds: completedRuns.map((r) => r.id),
1251-
dryRun,
1250+
action: "ignore",
1251+
runId,
1252+
snapshotStatus: snapshot.executionStatus,
1253+
snapshotId: snapshot.id,
12521254
};
12531255
}
12541256

@@ -1650,6 +1652,117 @@ export class RunEngine {
16501652
});
16511653
}
16521654

1655+
async #handleRepairSnapshot({
1656+
runId,
1657+
snapshotId,
1658+
executionStatus,
1659+
}: {
1660+
runId: string;
1661+
snapshotId: string;
1662+
executionStatus: string;
1663+
}) {
1664+
return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
1665+
const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);
1666+
1667+
if (latestSnapshot.id !== snapshotId) {
1668+
this.logger.log(
1669+
"RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
1670+
{
1671+
runId,
1672+
snapshotId,
1673+
latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
1674+
repairExecutionStatus: executionStatus,
1675+
}
1676+
);
1677+
1678+
return;
1679+
}
1680+
1681+
// Okay, so this means we haven't transitioned to a new status yes, so we need to do something
1682+
switch (latestSnapshot.executionStatus) {
1683+
case "EXECUTING":
1684+
case "EXECUTING_WITH_WAITPOINTS":
1685+
case "PENDING_CANCEL":
1686+
case "PENDING_EXECUTING":
1687+
case "QUEUED_EXECUTING":
1688+
case "RUN_CREATED": {
1689+
// Do nothing;
1690+
return;
1691+
}
1692+
case "QUEUED": {
1693+
this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
1694+
runId,
1695+
snapshotId,
1696+
});
1697+
1698+
//it will automatically be requeued X times depending on the queue retry settings
1699+
const gotRequeued = await this.runQueue.nackMessage({
1700+
orgId: latestSnapshot.organizationId,
1701+
messageId: runId,
1702+
});
1703+
1704+
if (!gotRequeued) {
1705+
this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
1706+
runId,
1707+
snapshot: latestSnapshot,
1708+
});
1709+
} else {
1710+
this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
1711+
runId,
1712+
snapshot: latestSnapshot,
1713+
});
1714+
}
1715+
1716+
break;
1717+
}
1718+
case "FINISHED":
1719+
case "SUSPENDED": {
1720+
this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED", {
1721+
runId,
1722+
snapshotId,
1723+
});
1724+
1725+
const taskRun = await this.prisma.taskRun.findFirst({
1726+
where: { id: runId },
1727+
select: {
1728+
queue: true,
1729+
},
1730+
});
1731+
1732+
if (!taskRun) {
1733+
this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", {
1734+
runId,
1735+
snapshotId,
1736+
});
1737+
return;
1738+
}
1739+
1740+
// We need to clear this run from the current concurrency sets
1741+
await this.runQueue.clearMessageFromConcurrencySets({
1742+
runId,
1743+
orgId: latestSnapshot.organizationId,
1744+
queue: taskRun.queue,
1745+
env: {
1746+
id: latestSnapshot.environmentId,
1747+
type: latestSnapshot.environmentType,
1748+
project: {
1749+
id: latestSnapshot.projectId,
1750+
},
1751+
organization: {
1752+
id: latestSnapshot.organizationId,
1753+
},
1754+
},
1755+
});
1756+
1757+
break;
1758+
}
1759+
default: {
1760+
assertNever(latestSnapshot.executionStatus);
1761+
}
1762+
}
1763+
});
1764+
}
1765+
16531766
async #concurrencySweeperCallback(
16541767
runIds: string[],
16551768
completedAtOffsetMs: number = 1000 * 60 * 10

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export type RunEngineOptions = {
7171
/** If not set then checkpoints won't ever be used */
7272
retryWarmStartThresholdMs?: number;
7373
heartbeatTimeoutsMs?: Partial<HeartbeatTimeouts>;
74+
repairSnapshotTimeoutMs?: number;
7475
treatProductionExecutionStallsAsOOM?: boolean;
7576
suspendedHeartbeatRetriesConfig?: {
7677
maxCount?: number;

internal-packages/run-engine/src/engine/workerCatalog.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ export const workerCatalog = {
1616
}),
1717
visibilityTimeoutMs: 30_000,
1818
},
19+
repairSnapshot: {
20+
schema: z.object({
21+
runId: z.string(),
22+
snapshotId: z.string(),
23+
executionStatus: z.string(),
24+
}),
25+
visibilityTimeoutMs: 30_000,
26+
},
1927
expireRun: {
2028
schema: z.object({
2129
runId: z.string(),

0 commit comments

Comments
 (0)