diff --git a/.changeset/cyan-ducks-wonder.md b/.changeset/cyan-ducks-wonder.md new file mode 100644 index 000000000..123897616 --- /dev/null +++ b/.changeset/cyan-ducks-wonder.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Allow step retrying if it fails without proper cleanup diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 3a7c66fff..b05d366d7 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -640,12 +640,13 @@ export const stepEntrypoint = let result: unknown; const attempt = step.attempt + 1; try { - if (step.status !== 'pending') { - // We should only be running the step if it's pending - // (initial state, or state set on re-try), so the step has been - // invoked erroneously. + if (!['pending', 'running'].includes(step.status)) { + // We should only be running the step if it's either + // a) pending - initial state, or state set on re-try + // b) running - if a step fails mid-execution, like a function timeout + // otherwise, the step has been invoked erroneously console.error( - `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending", got "${step.status}" instead, skipping execution` + `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution` ); span?.setAttributes({ ...Attribute.StepSkipped(true), @@ -698,11 +699,23 @@ export const stepEntrypoint = () => stepFn(...args) ); + // NOTE: None of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step result = dehydrateStepReturnValue(result, ops, workflowRunId); waitUntil(Promise.all(ops)); - // Update the event log with the step result + // Mark the step as completed first. This order is important. If a concurrent + // execution marked the step as complete, this request should throw, and + // this prevent the step_completed event in the event log + // TODO: this should really be atomic and handled by the world + await world.steps.update(workflowRunId, stepId, { + status: 'completed', + output: result as Serializable, + }); + + // Then, append the event log with the step result await world.events.create(workflowRunId, { eventType: 'step_completed', correlationId: stepId, @@ -711,11 +724,6 @@ export const stepEntrypoint = }, }); - await world.steps.update(workflowRunId, stepId, { - status: 'completed', - output: result as Serializable, - }); - span?.setAttributes({ ...Attribute.StepStatus('completed'), ...Attribute.StepResultType(typeof result),