Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/cancel-dequeued-runs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Show the cancel button in the runs list for runs in `DEQUEUED` status. `DEQUEUED` was missing from `NON_FINAL_RUN_STATUSES` so the list hid the button even though the single run page allowed it.
1 change: 1 addition & 0 deletions apps/webapp/app/v3/taskStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const NON_FINAL_RUN_STATUSES = [
"PENDING",
"PENDING_VERSION",
"WAITING_FOR_DEPLOY",
"DEQUEUED",
"EXECUTING",
"WAITING_TO_RESUME",
"RETRYING_AFTER_FAILURE",
Expand Down
127 changes: 127 additions & 0 deletions internal-packages/run-engine/src/engine/tests/cancelling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,132 @@ describe("RunEngine cancelling", () => {
}
});

containerTest("Cancelling a run (dequeued)", async ({ prisma, redisOptions }) => {
//create environment
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
masterQueueConsumersDisabled: true,
processWorkerQueueDebounceMs: 50,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0001,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const parentTask = "parent-task";

//create background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask]);

//trigger the run
const parentRun = await engine.trigger(
{
number: 1,
friendlyId: "run_p1234",
environment: authenticatedEnvironment,
taskIdentifier: parentTask,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
workerQueue: "main",
queue: `task/${parentTask}`,
isTest: false,
tags: [],
},
prisma
);

//dequeue the run, but don't start an attempt — this leaves TaskRun.status = DEQUEUED
//and execution snapshot = PENDING_EXECUTING (a worker has claimed the run)
await setTimeout(500);
const dequeued = await engine.dequeueFromWorkerQueue({
consumerId: "test_12345",
workerQueue: "main",
});
expect(dequeued.length).toBe(1);

const dequeuedRun = await prisma.taskRun.findFirstOrThrow({
where: { id: parentRun.id },
});
expect(dequeuedRun.status).toBe("DEQUEUED");

//cancel the dequeued run — a worker has already claimed it, so the snapshot goes to
//PENDING_CANCEL pending the worker ack. TaskRun.status flips to CANCELED immediately
//so the UI reflects cancellation without waiting.
const result = await engine.cancelRun({
runId: parentRun.id,
completedAt: new Date(),
reason: "Cancelled by the user",
});
expect(result.snapshot.executionStatus).toBe("PENDING_CANCEL");

const pendingCancel = await engine.getRunExecutionData({ runId: parentRun.id });
expect(pendingCancel?.snapshot.executionStatus).toBe("PENDING_CANCEL");
expect(pendingCancel?.run.status).toBe("CANCELED");

let cancelledEventData: EventBusEventArgs<"runCancelled">[0][] = [];
engine.eventBus.on("runCancelled", (result) => {
cancelledEventData.push(result);
});

//simulate worker acknowledging the cancellation
const completeResult = await engine.completeRunAttempt({
runId: parentRun.id,
snapshotId: pendingCancel!.snapshot.id,
completion: {
ok: false,
id: parentRun.id,
error: {
type: "INTERNAL_ERROR" as const,
code: "TASK_RUN_CANCELLED" as const,
},
},
});
expect(completeResult.snapshot.executionStatus).toBe("FINISHED");
expect(completeResult.run.status).toBe("CANCELED");

//check emitted event after worker ack
expect(cancelledEventData.length).toBe(1);
const parentEvent = cancelledEventData.find((r) => r.run.id === parentRun.id);
assertNonNullable(parentEvent);
expect(parentEvent.run.spanId).toBe(parentRun.spanId);

//concurrency should have been released
const envConcurrencyCompleted = await engine.runQueue.currentConcurrencyOfEnvironment(
authenticatedEnvironment
);
expect(envConcurrencyCompleted).toBe(0);
} finally {
await engine.quit();
}
});

//todo bulk cancelling runs
});
Loading