diff --git a/.changeset/add-bulk-cancel-and-status-filter.md b/.changeset/add-bulk-cancel-and-status-filter.md new file mode 100644 index 0000000000..be65c4d7c5 --- /dev/null +++ b/.changeset/add-bulk-cancel-and-status-filter.md @@ -0,0 +1,5 @@ +--- +"@workflow/cli": patch +--- + +Add bulk cancel (`workflow cancel --status=<status>`) and `--status` filter for `inspect runs`. Fix step I/O hydration in JSON output. diff --git a/packages/cli/src/commands/cancel.ts b/packages/cli/src/commands/cancel.ts index d6494da00f..210ef2f257 100644 --- a/packages/cli/src/commands/cancel.ts +++ b/packages/cli/src/commands/cancel.ts @@ -1,16 +1,26 @@ -import { Args } from '@oclif/core'; +import readline from 'node:readline'; +import { Args, Flags } from '@oclif/core'; import { cancelRun } from '@workflow/core/runtime'; +import { parseWorkflowName } from '@workflow/utils/parse-name'; +import chalk from 'chalk'; +import Table from 'easy-table'; import { BaseCommand } from '../base.js'; -import { LOGGING_CONFIG } from '../lib/config/log.js'; +import { LOGGING_CONFIG, logger } from '../lib/config/log.js'; import { cliFlags } from '../lib/inspect/flags.js'; import { setupCliWorld } from '../lib/inspect/setup.js'; export default class Cancel extends BaseCommand { - static description = 'Cancel a workflow'; + static description = + 'Cancel a workflow run, or bulk-cancel runs by status/name'; static aliases = ['c']; - static examples = ['$ workflow cancel <runId>', '$ wf cancel <runId>']; + static examples = [ + '$ workflow cancel <runId>', + '$ workflow cancel --status=running', + '$ workflow cancel --status=running --workflowName=myWorkflow', + '$ workflow cancel --status=running -y', + ]; async catch(error: any) { if (LOGGING_CONFIG.VERBOSE_MODE) { @@ -21,12 +31,44 @@ export default class Cancel extends BaseCommand { static args = { runId: Args.string({ - description: 'ID of the run to cancel.', - required: true, + description: 'ID of the run to cancel (omit for bulk mode with filters)', + required: false, }), } as 
const; - static flags = cliFlags; + static flags = { + ...cliFlags, + status: Flags.string({ + description: 'Filter runs by status for bulk cancel', + required: false, + options: ['running', 'completed', 'failed', 'cancelled', 'pending'], + helpGroup: 'Bulk Cancel', + helpLabel: '--status', + }), + workflowName: Flags.string({ + description: 'Filter runs by workflow name for bulk cancel', + required: false, + char: 'n', + helpGroup: 'Bulk Cancel', + helpLabel: '-n, --workflowName', + }), + limit: Flags.integer({ + description: 'Max runs to cancel in bulk mode', + required: false, + default: 50, + helpGroup: 'Bulk Cancel', + helpLabel: '--limit', + helpValue: 'NUMBER', + }), + confirm: Flags.boolean({ + description: 'Skip interactive confirmation prompt', + required: false, + char: 'y', + default: false, + helpGroup: 'Bulk Cancel', + helpLabel: '-y, --confirm', + }), + }; public async run(): Promise<void> { const { flags, args } = await this.parse(Cancel); @@ -38,6 +80,110 @@ export default class Cancel extends BaseCommand { ); } - await cancelRun(world, args.runId); + // Single-run cancel (existing behavior) + if (args.runId) { + await cancelRun(world, args.runId); + logger.log(chalk.green(`Cancelled run ${args.runId}`)); + return; + } + + // Bulk mode requires at least one filter + if (!flags.status && !flags.workflowName) { + logger.error( + 'Provide a run ID or use --status/--workflowName to bulk cancel.\n' + + 'Examples:\n' + + ' workflow cancel <runId>\n' + + ' workflow cancel --status=running\n' + + ' workflow cancel --status=running --workflowName=myWorkflow' + ); + process.exit(1); + } + + // Fetch matching runs + const runs = await world.runs.list({ + status: flags.status as any, + workflowName: flags.workflowName, + pagination: { limit: flags.limit || 50 }, + resolveData: 'none', + }); + + if (runs.data.length === 0) { + logger.warn('No matching runs found.'); + return; + } + + // Display what will be cancelled + const table = new Table(); + for (const run of 
runs.data) { + const shortName = + parseWorkflowName(run.workflowName)?.shortName || run.workflowName; + table.cell('runId', run.runId); + table.cell('workflow', chalk.blueBright(shortName)); + table.cell('status', run.status); + table.cell( + 'startedAt', + run.startedAt ? new Date(run.startedAt).toISOString() : '-' + ); + table.newRow(); + } + logger.log(`\nFound ${chalk.bold(runs.data.length)} runs to cancel:\n`); + logger.log(table.toString()); + + if (runs.hasMore) { + logger.warn( + `More runs match these filters. Increase --limit (currently ${flags.limit || 50}) or re-run to cancel additional runs.` + ); + } + + // Confirm unless --confirm/-y + if (!flags.confirm) { + const confirmed = await promptConfirm( + `Cancel ${runs.data.length} run${runs.data.length === 1 ? '' : 's'}?` + ); + if (!confirmed) { + logger.log('Aborted.'); + return; + } + } + + // Cancel each run with progress + let cancelled = 0; + let failed = 0; + for (const run of runs.data) { + try { + await cancelRun(world, run.runId); + cancelled++; + logger.log( + chalk.green(` ✓ ${run.runId}`) + + chalk.gray(` (${cancelled}/${runs.data.length})`) + ); + } catch (err: any) { + failed++; + logger.warn(` ✗ ${run.runId}: ${err.message || String(err)}`); + } + } + + logger.log( + `\nDone: ${chalk.green(`${cancelled} cancelled`)}${failed > 0 ? 
`, ${chalk.red(`${failed} failed`)}` : ''}` + ); } } + +async function promptConfirm(message: string): Promise<boolean> { + // Non-TTY: abort since user cannot confirm interactively (use -y/--confirm to skip prompt) + if (!process.stdin.isTTY) { + return false; + } + + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + }); + + return new Promise((resolve) => { + rl.question(`${message} [y/N] `, (answer) => { + rl.close(); + resolve(answer.trim().toLowerCase() === 'y'); + }); + }); +} diff --git a/packages/cli/src/commands/inspect.ts b/packages/cli/src/commands/inspect.ts index db5d2eff3f..fd555a6c98 100644 --- a/packages/cli/src/commands/inspect.ts +++ b/packages/cli/src/commands/inspect.ts @@ -116,6 +116,13 @@ export default class Inspect extends BaseCommand { helpGroup: 'Filtering', helpLabel: '-n, --workflowName', }), + status: Flags.string({ + description: 'filter runs by status (only for runs)', + required: false, + options: ['running', 'completed', 'failed', 'cancelled', 'pending'], + helpGroup: 'Filtering', + helpLabel: '--status', + }), withData: Flags.boolean({ description: 'include full input/output data in list views', required: false, @@ -259,6 +266,7 @@ function toInspectOptions(flags: any): InspectCLIOptions { sort: flags.sort as 'asc' | 'desc' | undefined, limit: flags.limit, workflowName: flags.workflowName, + status: flags.status, withData: flags.withData, decrypt: flags.decrypt, backend: flags.backend, diff --git a/packages/cli/src/lib/config/types.ts b/packages/cli/src/lib/config/types.ts index 89f87a481b..1885bf3167 100644 --- a/packages/cli/src/lib/config/types.ts +++ b/packages/cli/src/lib/config/types.ts @@ -18,6 +18,7 @@ export type InspectCLIOptions = { sort?: 'asc' | 'desc'; limit?: number; workflowName?: string; + status?: string; withData?: boolean; backend?: string; disableRelativeDates?: boolean; diff --git a/packages/cli/src/lib/inspect/output.ts b/packages/cli/src/lib/inspect/output.ts index 
e0d7500143..d11ee3587d 100644 --- a/packages/cli/src/lib/inspect/output.ts +++ b/packages/cli/src/lib/inspect/output.ts @@ -563,6 +563,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => { try { const runs = await world.runs.list({ workflowName: opts.workflowName, + status: opts.status as any, pagination: { sortOrder: opts.sort || 'desc', cursor: opts.cursor, @@ -590,6 +591,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => { try { const runs = await world.runs.list({ workflowName: opts.workflowName, + status: opts.status as any, pagination: { sortOrder: opts.sort || 'desc', cursor, @@ -721,7 +723,10 @@ export const listSteps = async ( }, resolveData, }); - showJson(stepChunks.data); + const stepsWithHydratedIO = await Promise.all( + stepChunks.data.map((s) => hydrateResourceIO(s, resolveKey)) + ); + showJson(stepsWithHydratedIO); return; } catch (error) { if (handleApiError(error, opts.backend)) { diff --git a/packages/cli/test-bulk-cancel.sh b/packages/cli/test-bulk-cancel.sh new file mode 100644 index 0000000000..95543273e8 --- /dev/null +++ b/packages/cli/test-bulk-cancel.sh @@ -0,0 +1,244 @@ +#!/bin/bash +# E2E tests for bulk cancel + --status filter + step JSON hydration +# Prerequisites: pnpm build (in packages/cli), workbench running on :3000 +# Usage: bash test-bulk-cancel.sh [--record] +# +# NOTE on starting workflows: +# - addTenWorkflow: started via HTTP API (completes fast, headers arrive immediately) +# - sleepingWorkflow: started via CLI `workflow start ` in background +# (HTTP API blocks until workflow completes since headers wait for first stream chunk) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../.." 
&& pwd)" +WORKBENCH_DIR="$REPO_ROOT/workbench/nextjs-turbopack" +CLI_BIN="$SCRIPT_DIR/bin/run.js" + +cli() { (cd "$WORKBENCH_DIR" && WORKFLOW_NO_UPDATE_CHECK=1 node "$CLI_BIN" "$@"); } + +WORKBENCH="http://localhost:3000" +BK="--backend local" +PASS=0; FAIL=0; TOTAL=0 + +# Record mode +LOGFILE="" +if [[ "${1:-}" == "--record" ]]; then + LOGFILE="$SCRIPT_DIR/test-results-$(date +%Y%m%d-%H%M%S).log" + echo "Recording to $LOGFILE" +fi +log() { + if [[ -n "$LOGFILE" ]]; then echo "$@" | tee -a "$LOGFILE"; else echo "$@"; fi +} +run_test() { + TOTAL=$((TOTAL + 1)) + log ""; log "══════════════════════════════════════════" + log "TEST $TOTAL: $1"; log "══════════════════════════════════════════" +} +pass() { PASS=$((PASS + 1)); log "PASS ✅"; } +fail() { FAIL=$((FAIL + 1)); log "FAIL ❌: $1"; } + +# Start addTenWorkflow via HTTP (fast — completes in <1s, headers arrive immediately) +start_fast() { + local arg="$1" + node -e ' + const http = require("http"); + const body = JSON.stringify({workflowName: "addTenWorkflow", args: ['"$arg"']}); + const req = http.request({ + hostname: "localhost", port: 3000, path: "/api/workflows/start", method: "POST", + headers: {"Content-Type": "application/json", "Content-Length": Buffer.byteLength(body)} + }, (res) => { process.stdout.write(res.headers["x-workflow-run-id"] || ""); res.destroy(); process.exit(0); }); + req.on("error", () => process.exit(1)); + req.end(body); + setTimeout(() => process.exit(1), 30000); + ' 2>/dev/null +} + +# Start sleepingWorkflow via HTTP API (fire-and-forget, get run ID from inspect) +# The HTTP API blocks until first stream chunk (= workflow completion for sleeping), +# so we fire the request in background and discover the run ID via inspect. 
+SEED_RUN_ID="" # unused now but kept for reference +start_sleeping() { + local duration="${1:-30000}" + # Fire HTTP request in background (don't wait for response) + node -e ' + const http = require("http"); + const body = JSON.stringify({workflowName: "sleepingWorkflow", args: ['"$duration"']}); + const req = http.request({ + hostname: "localhost", port: 3000, path: "/api/workflows/start", method: "POST", + headers: {"Content-Type": "application/json", "Content-Length": Buffer.byteLength(body)} + }, () => {}); + req.on("error", () => {}); + req.end(body); + // Keep process alive briefly so server receives the request + setTimeout(() => process.exit(0), 2000); + ' 2>/dev/null & + local node_pid=$! + # Wait for the run to appear in the local world + sleep 3 + # Get the most recent running sleepingWorkflow run + local run_id=$(cli inspect runs --status=running --json $BK --limit 10 2>/dev/null | python3 -c " +import sys, json +d = json.load(sys.stdin) +for r in d['data']: + if 'sleepingWorkflow' in r.get('workflowName', ''): + print(r['runId']) + break +" 2>/dev/null) + wait $node_pid 2>/dev/null || true + if [[ -z "$run_id" ]]; then + log "ERROR: No running sleepingWorkflow found after start" + return 1 + fi + echo "$run_id" +} + +# ── Preflight ── +log "Checking workbench at $WORKBENCH..." +if ! curl -s -o /dev/null -w "%{http_code}" "$WORKBENCH" 2>/dev/null | grep -q "200\|302"; then + log "ERROR: Workbench not running. Start with: cd workbench/nextjs-turbopack && pnpm dev" + exit 1 +fi +log "Workbench is up." + +# Warmup — first API call triggers route compilation +log "Warming up API routes..." +start_fast 1 > /dev/null || { log "ERROR: Warmup failed"; exit 1; } +log "Warmup done." +sleep 2 + +# Cleanup +log "Cleaning up leftover runs..." 
+cli cancel --status=running -y $BK 2>&1 || true +sleep 1 + +# ── Test 1: Single cancel regression ── +run_test "Single cancel (regression)" +RUN_ID=$(start_sleeping 30000) +log "Started: $RUN_ID" +sleep 2 +cli cancel "$RUN_ID" $BK 2>&1 +sleep 1 +STATUS=$(cli inspect run "$RUN_ID" --json $BK 2>/dev/null | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])") +log "Status after cancel: $STATUS" +[[ "$STATUS" == "cancelled" ]] && pass || fail "expected cancelled, got $STATUS" + +# ── Test 2: --status filter on inspect runs ── +run_test "--status filter on inspect runs" +RUN_ID=$(start_fast 100) +log "Started: $RUN_ID" +sleep 3 +COMPLETED=$(cli inspect runs --status=completed --json $BK 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('data',[])))") +log "Completed runs found: $COMPLETED" +[[ "$COMPLETED" -gt 0 ]] && pass || fail "no completed runs found" + +# ── Test 3: Bulk cancel --status=running ── +run_test "Bulk cancel --status=running" +# Fire 3 sleepingWorkflow requests in parallel +for i in 1 2 3; do + node -e ' + const http = require("http"); + const body = JSON.stringify({workflowName: "sleepingWorkflow", args: [30000]}); + const req = http.request({hostname: "localhost", port: 3000, path: "/api/workflows/start", method: "POST", + headers: {"Content-Type": "application/json", "Content-Length": Buffer.byteLength(body)}}, () => {}); + req.on("error", () => {}); req.end(body); + setTimeout(() => process.exit(0), 2000); + ' 2>/dev/null & +done +sleep 4 +RUNNING_BEFORE=$(cli inspect runs --status=running --json $BK 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('data',[])))") +log "Running before cancel: $RUNNING_BEFORE" +log "--- Cancelling all running ---" +cli cancel --status=running -y $BK 2>&1 +sleep 1 +RUNNING=$(cli inspect runs --status=running --json $BK 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('data',[])))") +log "Remaining 
running: $RUNNING" +[[ "$RUNNING" -eq 0 ]] && pass || fail "$RUNNING runs still running" + +# ── Test 4: Bulk cancel --workflowName filter ── +# NOTE: workflowName must be the full WDK path (e.g. workflow//./workflows/99_e2e//sleepingWorkflow) +# Short names like "sleepingWorkflow" won't match. This is expected — the World API does exact match. +# On Vercel backend, names look different (deployment-based). This test uses the local backend path. +run_test "Bulk cancel --workflowName filter" +# Start 2 sleepingWorkflows +for i in 1 2; do + node -e ' + const http = require("http"); + const body = JSON.stringify({workflowName: "sleepingWorkflow", args: [30000]}); + const req = http.request({hostname: "localhost", port: 3000, path: "/api/workflows/start", method: "POST", + headers: {"Content-Type": "application/json", "Content-Length": Buffer.byteLength(body)}}, () => {}); + req.on("error", () => {}); req.end(body); + setTimeout(() => process.exit(0), 2000); + ' 2>/dev/null & +done +sleep 4 +# Get the full workflowName from a running run +FULL_WF_NAME=$(cli inspect runs --status=running --json $BK --limit 1 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['data'][0]['workflowName'] if d['data'] else '')" 2>/dev/null) +RUNNING_BEFORE=$(cli inspect runs --status=running --json $BK 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('data',[])))") +log "Running before cancel: $RUNNING_BEFORE (name: $FULL_WF_NAME)" +log "--- Cancelling by workflowName ---" +cli cancel --workflowName="$FULL_WF_NAME" --status=running -y $BK 2>&1 +sleep 1 +RUNNING_AFTER=$(cli inspect runs --status=running --json $BK 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('data',[])))") +log "Running after cancel: $RUNNING_AFTER" +[[ "$RUNNING_AFTER" -eq 0 ]] && pass || fail "$RUNNING_AFTER runs still running" + +# ── Test 5: No matching runs ── +run_test "No matching runs warning" +cli cancel --status=running 
-y $BK 2>&1 || true +sleep 1 +OUTPUT=$(cli cancel --status=running -y $BK 2>&1 || true) +log "$OUTPUT" +echo "$OUTPUT" | grep -qi "no matching" && pass || fail "expected 'no matching' warning" + +# ── Test 6: Step JSON hydration ── +run_test "Step JSON hydration fix" +RUN_ID=$(start_fast 100) +log "Started: $RUN_ID" +sleep 4 +STEP_IO=$(cli inspect steps --runId="$RUN_ID" --withData --json $BK 2>/dev/null) +log "Checking step IO hydration..." +if echo "$STEP_IO" | python3 -c " +import sys, json +data = json.load(sys.stdin) +found = False +for step in data: + out = step.get('output') + if out is not None: + found = True + if isinstance(out, dict) and '0' in out: + print(f'RAW_BYTES: {out}') + sys.exit(1) + print(f'HYDRATED: {out}') +if not found: + print('NO_OUTPUT') + sys.exit(1) +" 2>/dev/null; then + pass +else + fail "raw byte arrays or no output" +fi + +# ── Test 7: hasMore warning ── +run_test "hasMore warning with --limit=1" +start_fast 200 > /dev/null || true +sleep 3 +OUTPUT=$(cli cancel --status=completed --limit=1 -y $BK 2>&1 || true) +log "$OUTPUT" +echo "$OUTPUT" | grep -qi "more runs\|increase --limit\|More runs match" && pass || fail "no hasMore warning" + +# ── Test 8: Error on no args/flags ── +run_test "Error when no runId or filters" +OUTPUT=$(cli cancel $BK 2>&1 || true) +log "$OUTPUT" +echo "$OUTPUT" | grep -qi "provide a run id\|--status\|--workflowName" && pass || fail "no usage error" + +# ── Cleanup background start processes ── +pkill -f "workflow.*start.*--backend local" 2>/dev/null || true + +# ── Summary ── +log ""; log "══════════════════════════════════════════" +log "RESULTS: $PASS passed, $FAIL failed, $TOTAL total" +log "══════════════════════════════════════════" +[[ -n "$LOGFILE" ]] && log "Output saved to: $LOGFILE" +[[ $FAIL -eq 0 ]] && exit 0 || exit 1