Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .changeset/conditional-step-execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Add conditional step execution with skip infrastructure

**Schema Changes:**

- New columns: required_input_pattern, forbidden_input_pattern, when_unmet, when_failed, skip_reason, skipped_at
- New columns: required_input_pattern, forbidden_input_pattern, when_unmet, when_exhausted, skip_reason, skipped_at
- New step status: 'skipped'
- New function: cascade_skip_steps() for skip propagation
- FlowShape condition fields for auto-compilation drift detection
1 change: 1 addition & 0 deletions .ignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
!.claude
!.claude/*
!.notes
4 changes: 2 additions & 2 deletions pkgs/core/schemas/0050_tables_definitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ create table pgflow.steps (
required_input_pattern jsonb, -- JSON pattern for @> containment check (if)
forbidden_input_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
when_failed text not null default 'fail', -- What to do when handler fails after retries
when_exhausted text not null default 'fail', -- What to do when handler fails after retries
created_at timestamptz not null default now(),
primary key (flow_slug, step_slug),
unique (flow_slug, step_index), -- Ensure step_index is unique within a flow
Expand All @@ -38,7 +38,7 @@ create table pgflow.steps (
constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0),
constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0),
constraint when_unmet_is_valid check (when_unmet in ('fail', 'skip', 'skip-cascade')),
constraint when_failed_is_valid check (when_failed in ('fail', 'skip', 'skip-cascade'))
constraint when_exhausted_is_valid check (when_exhausted in ('fail', 'skip', 'skip-cascade'))
);

-- Dependencies table - stores relationships between steps
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- _cascade_force_skip_steps: Skip a step and cascade to all downstream dependents
-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade)
-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenExhausted: skip-cascade)
create or replace function pgflow._cascade_force_skip_steps(
run_id uuid,
step_slug text,
Expand Down
6 changes: 3 additions & 3 deletions pkgs/core/schemas/0100_function_add_step.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ create or replace function pgflow.add_step(
required_input_pattern jsonb default null,
forbidden_input_pattern jsonb default null,
when_unmet text default 'skip',
when_failed text default 'fail'
when_exhausted text default 'fail'
)
returns pgflow.steps
language plpgsql
Expand Down Expand Up @@ -41,7 +41,7 @@ BEGIN
INSERT INTO pgflow.steps (
flow_slug, step_slug, step_type, step_index, deps_count,
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
required_input_pattern, forbidden_input_pattern, when_unmet, when_failed
required_input_pattern, forbidden_input_pattern, when_unmet, when_exhausted
)
VALUES (
add_step.flow_slug,
Expand All @@ -56,7 +56,7 @@ BEGIN
add_step.required_input_pattern,
add_step.forbidden_input_pattern,
add_step.when_unmet,
add_step.when_failed
add_step.when_exhausted
)
ON CONFLICT ON CONSTRAINT steps_pkey
DO UPDATE SET step_slug = EXCLUDED.step_slug
Expand Down
10 changes: 5 additions & 5 deletions pkgs/core/schemas/0100_function_compare_flow_shapes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ BEGIN
);
END IF;

-- Compare whenFailed (structural - affects DAG execution semantics)
IF v_local_step->>'whenFailed' != v_db_step->>'whenFailed' THEN
-- Compare whenExhausted (structural - affects DAG execution semantics)
IF v_local_step->>'whenExhausted' != v_db_step->>'whenExhausted' THEN
v_differences := array_append(
v_differences,
format(
$$Step at index %s: whenFailed differs '%s' vs '%s'$$,
$$Step at index %s: whenExhausted differs '%s' vs '%s'$$,
v_idx,
v_local_step->>'whenFailed',
v_db_step->>'whenFailed'
v_local_step->>'whenExhausted',
v_db_step->>'whenExhausted'
)
);
END IF;
Expand Down
2 changes: 1 addition & 1 deletion pkgs/core/schemas/0100_function_create_flow_from_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ BEGIN
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_failed => v_step->>'whenFailed',
when_exhausted => v_step->>'whenExhausted',
required_input_pattern => CASE
WHEN (v_step->'requiredInputPattern'->>'defined')::boolean
THEN v_step->'requiredInputPattern'->'value'
Expand Down
62 changes: 31 additions & 31 deletions pkgs/core/schemas/0100_function_fail_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ DECLARE
v_run_failed boolean;
v_step_failed boolean;
v_step_skipped boolean;
v_when_failed text;
v_when_exhausted text;
v_task_exhausted boolean; -- True if task has exhausted retries
v_flow_slug_for_deps text; -- Used for decrementing remaining_deps on plain skip
begin
Expand Down Expand Up @@ -63,11 +63,11 @@ flow_info AS (
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
),
config AS (
config AS (
SELECT
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay,
s.when_failed
s.when_exhausted
FROM pgflow.steps s
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
Expand Down Expand Up @@ -95,53 +95,53 @@ fail_or_retry_task as (
AND task.status = 'started'
RETURNING *
),
-- Determine if task exhausted retries and get when_failed mode
task_status AS (
SELECT
(select status from fail_or_retry_task) AS new_task_status,
(select when_failed from config) AS when_failed_mode,
-- Determine if task exhausted retries and get when_exhausted mode
task_status AS (
SELECT
(select status from fail_or_retry_task) AS new_task_status,
(select when_exhausted from config) AS when_exhausted_mode,
-- Task is exhausted when it's failed (no more retries)
((select status from fail_or_retry_task) = 'failed') AS is_exhausted
),
maybe_fail_step AS (
UPDATE pgflow.step_states
SET
-- Status logic:
-- - If task not exhausted (retrying): keep current status
-- - If exhausted AND when_failed='fail': set to 'failed'
-- - If exhausted AND when_failed IN ('skip', 'skip-cascade'): set to 'skipped'
status = CASE
WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status
WHEN (select when_failed_mode from task_status) = 'fail' THEN 'failed'
ELSE 'skipped' -- skip or skip-cascade
END,
-- Status logic:
-- - If task not exhausted (retrying): keep current status
-- - If exhausted AND when_exhausted='fail': set to 'failed'
-- - If exhausted AND when_exhausted IN ('skip', 'skip-cascade'): set to 'skipped'
status = CASE
WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status
WHEN (select when_exhausted_mode from task_status) = 'fail' THEN 'failed'
ELSE 'skipped' -- skip or skip-cascade
END,
failed_at = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) = 'fail' THEN now()
ELSE NULL
END,
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) = 'fail' THEN now()
ELSE NULL
END,
error_message = CASE
WHEN (select is_exhausted from task_status) THEN fail_task.error_message
ELSE NULL
END,
skip_reason = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed'
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed'
ELSE pgflow.step_states.skip_reason
END,
skipped_at = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN now()
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN now()
ELSE pgflow.step_states.skipped_at
END,
-- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint)
remaining_tasks = CASE
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL
WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL
ELSE pgflow.step_states.remaining_tasks
END
FROM fail_or_retry_task
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
)
-- Update run status: only fail when when_failed='fail' and step was failed
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
Expand All @@ -159,9 +159,9 @@ SET status = CASE
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;

-- Capture when_failed mode and check if step was skipped for later processing
SELECT s.when_failed INTO v_when_failed
FROM pgflow.steps s
-- Capture when_exhausted mode and check if step was skipped for later processing
SELECT s.when_exhausted INTO v_when_exhausted
FROM pgflow.steps s
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;
Expand Down Expand Up @@ -194,8 +194,8 @@ IF v_step_failed THEN
);
END IF;

-- Handle step skipping (when_failed = 'skip' or 'skip-cascade')
IF v_step_skipped THEN
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_step_skipped THEN
-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
Expand All @@ -212,8 +212,8 @@ IF v_step_skipped THEN
false
);

-- For skip-cascade: cascade skip to all downstream dependents
IF v_when_failed = 'skip-cascade' THEN
-- For skip-cascade: cascade skip to all downstream dependents
IF v_when_exhausted = 'skip-cascade' THEN
PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed');
ELSE
-- For plain 'skip': decrement remaining_deps on dependent steps
Expand Down
2 changes: 1 addition & 1 deletion pkgs/core/schemas/0100_function_get_flow_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ as $$
'[]'::jsonb
),
'whenUnmet', step.when_unmet,
'whenFailed', step.when_failed,
'whenExhausted', step.when_exhausted,
'requiredInputPattern', CASE
WHEN step.required_input_pattern IS NULL
THEN '{"defined": false}'::jsonb
Expand Down
10 changes: 5 additions & 5 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ export type Database = {
step_index: number
step_slug: string
step_type: string
when_failed: string
when_exhausted: string
when_unmet: string
}
Insert: {
Expand All @@ -315,7 +315,7 @@ export type Database = {
step_index?: number
step_slug: string
step_type?: string
when_failed?: string
when_exhausted?: string
when_unmet?: string
}
Update: {
Expand All @@ -331,7 +331,7 @@ export type Database = {
step_index?: number
step_slug?: string
step_type?: string
when_failed?: string
when_exhausted?: string
when_unmet?: string
}
Relationships: [
Expand Down Expand Up @@ -431,7 +431,7 @@ export type Database = {
step_slug: string
step_type?: string
timeout?: number
when_failed?: string
when_exhausted?: string
when_unmet?: string
}
Returns: {
Expand All @@ -447,7 +447,7 @@ export type Database = {
step_index: number
step_slug: string
step_type: string
when_failed: string
when_exhausted: string
when_unmet: string
}
SetofOptions: {
Expand Down
Loading