Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/add-bulk-cancel-and-status-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/cli": patch
---

Add bulk cancel (`workflow cancel --status=<status>`) and `--status` filter for `inspect runs`. Fix step I/O hydration in JSON output.
162 changes: 154 additions & 8 deletions packages/cli/src/commands/cancel.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
import { Args } from '@oclif/core';
import readline from 'node:readline';
import { Args, Flags } from '@oclif/core';
import { cancelRun } from '@workflow/core/runtime';
import { parseWorkflowName } from '@workflow/utils/parse-name';
import chalk from 'chalk';
import Table from 'easy-table';
import { BaseCommand } from '../base.js';
import { LOGGING_CONFIG } from '../lib/config/log.js';
import { LOGGING_CONFIG, logger } from '../lib/config/log.js';
import { cliFlags } from '../lib/inspect/flags.js';
import { setupCliWorld } from '../lib/inspect/setup.js';

export default class Cancel extends BaseCommand {
static description = 'Cancel a workflow';
static description =
'Cancel a workflow run, or bulk-cancel runs by status/name';

static aliases = ['c'];

static examples = ['$ workflow cancel <run-id>', '$ wf cancel <run-id>'];
static examples = [
'$ workflow cancel <run-id>',
'$ workflow cancel --status=running',
'$ workflow cancel --status=running --workflowName=myWorkflow',
'$ workflow cancel --status=running -y',
];

async catch(error: any) {
if (LOGGING_CONFIG.VERBOSE_MODE) {
Expand All @@ -21,12 +31,44 @@ export default class Cancel extends BaseCommand {

static args = {
runId: Args.string({
description: 'ID of the run to cancel.',
required: true,
description: 'ID of the run to cancel (omit for bulk mode with filters)',
required: false,
}),
} as const;

static flags = cliFlags;
static flags = {
...cliFlags,
status: Flags.string({
description: 'Filter runs by status for bulk cancel',
required: false,
options: ['running', 'completed', 'failed', 'cancelled', 'pending'],
helpGroup: 'Bulk Cancel',
helpLabel: '--status',
}),
workflowName: Flags.string({
description: 'Filter runs by workflow name for bulk cancel',
required: false,
char: 'n',
helpGroup: 'Bulk Cancel',
helpLabel: '-n, --workflowName',
}),
limit: Flags.integer({
description: 'Max runs to cancel in bulk mode',
required: false,
default: 50,
helpGroup: 'Bulk Cancel',
helpLabel: '--limit',
helpValue: 'NUMBER',
}),
confirm: Flags.boolean({
description: 'Skip interactive confirmation prompt',
required: false,
char: 'y',
default: false,
helpGroup: 'Bulk Cancel',
helpLabel: '-y, --confirm',
}),
};

public async run(): Promise<void> {
const { flags, args } = await this.parse(Cancel);
Expand All @@ -38,6 +80,110 @@ export default class Cancel extends BaseCommand {
);
}

await cancelRun(world, args.runId);
// Single-run cancel (existing behavior)
if (args.runId) {
await cancelRun(world, args.runId);
logger.log(chalk.green(`Cancelled run ${args.runId}`));
return;
}

// Bulk mode requires at least one filter
if (!flags.status && !flags.workflowName) {
logger.error(
'Provide a run ID or use --status/--workflowName to bulk cancel.\n' +
'Examples:\n' +
' workflow cancel <run-id>\n' +
' workflow cancel --status=running\n' +
' workflow cancel --status=running --workflowName=myWorkflow'
);
process.exit(1);
}

// Fetch matching runs
const runs = await world.runs.list({
status: flags.status as any,
workflowName: flags.workflowName,
pagination: { limit: flags.limit || 50 },
resolveData: 'none',
});

if (runs.data.length === 0) {
logger.warn('No matching runs found.');
return;
}

// Display what will be cancelled
const table = new Table();
for (const run of runs.data) {
const shortName =
parseWorkflowName(run.workflowName)?.shortName || run.workflowName;
table.cell('runId', run.runId);
table.cell('workflow', chalk.blueBright(shortName));
table.cell('status', run.status);
table.cell(
'startedAt',
run.startedAt ? new Date(run.startedAt).toISOString() : '-'
);
table.newRow();
}
logger.log(`\nFound ${chalk.bold(runs.data.length)} runs to cancel:\n`);
logger.log(table.toString());

if (runs.hasMore) {
logger.warn(
`More runs match these filters. Increase --limit (currently ${flags.limit || 50}) or re-run to cancel additional runs.`
);
}

// Confirm unless --confirm/-y
if (!flags.confirm) {
const confirmed = await promptConfirm(
`Cancel ${runs.data.length} run${runs.data.length === 1 ? '' : 's'}?`
);
if (!confirmed) {
logger.log('Aborted.');
return;
}
}

// Cancel each run with progress
let cancelled = 0;
let failed = 0;
for (const run of runs.data) {
try {
await cancelRun(world, run.runId);
cancelled++;
logger.log(
chalk.green(` ✓ ${run.runId}`) +
chalk.gray(` (${cancelled}/${runs.data.length})`)
);
} catch (err: any) {
failed++;
logger.warn(` ✗ ${run.runId}: ${err.message || String(err)}`);
}
}

logger.log(
`\nDone: ${chalk.green(`${cancelled} cancelled`)}${failed > 0 ? `, ${chalk.red(`${failed} failed`)}` : ''}`
);
}
}

async function promptConfirm(message: string): Promise<boolean> {
// Non-TTY: abort since user cannot confirm interactively (use -y/--confirm to skip prompt)
if (!process.stdin.isTTY) {
return false;
}

const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});

return new Promise<boolean>((resolve) => {
rl.question(`${message} [y/N] `, (answer) => {
rl.close();
resolve(answer.trim().toLowerCase() === 'y');
});
});
}
8 changes: 8 additions & 0 deletions packages/cli/src/commands/inspect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ export default class Inspect extends BaseCommand {
helpGroup: 'Filtering',
helpLabel: '-n, --workflowName',
}),
status: Flags.string({
description: 'filter runs by status (only for runs)',
required: false,
options: ['running', 'completed', 'failed', 'cancelled', 'pending'],
helpGroup: 'Filtering',
helpLabel: '--status',
}),
withData: Flags.boolean({
description: 'include full input/output data in list views',
required: false,
Expand Down Expand Up @@ -259,6 +266,7 @@ function toInspectOptions(flags: any): InspectCLIOptions {
sort: flags.sort as 'asc' | 'desc' | undefined,
limit: flags.limit,
workflowName: flags.workflowName,
status: flags.status,
withData: flags.withData,
decrypt: flags.decrypt,
backend: flags.backend,
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/lib/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type InspectCLIOptions = {
sort?: 'asc' | 'desc';
limit?: number;
workflowName?: string;
status?: string;
withData?: boolean;
backend?: string;
disableRelativeDates?: boolean;
Expand Down
7 changes: 6 additions & 1 deletion packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
try {
const runs = await world.runs.list({
workflowName: opts.workflowName,
status: opts.status as any,
pagination: {
sortOrder: opts.sort || 'desc',
cursor: opts.cursor,
Expand Down Expand Up @@ -590,6 +591,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
try {
const runs = await world.runs.list({
workflowName: opts.workflowName,
status: opts.status as any,
pagination: {
sortOrder: opts.sort || 'desc',
cursor,
Expand Down Expand Up @@ -721,7 +723,10 @@ export const listSteps = async (
},
resolveData,
});
showJson(stepChunks.data);
const stepsWithHydratedIO = await Promise.all(
stepChunks.data.map((s) => hydrateResourceIO(s, resolveKey))
);
showJson(stepsWithHydratedIO);
return;
} catch (error) {
if (handleApiError(error, opts.backend)) {
Expand Down
Loading
Loading