fix(flows): inherit flow_env in sub-flow predicates#9042
Conversation
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
/ai review this PR |
Deploying windmill with
|
| Latest commit: |
732e123
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://6253cd94.windmill.pages.dev |
| Branch Preview URL: | https://flow-env-skip-eval.windmill.pages.dev |
|
Claude finished @hugocasa's task in 4m 32s —— View job ReviewSolid, well-scoped fix with strong test coverage. The architectural split between Suggestions1.
2. Silent error in let mini = match get_mini_pulled_job(db, &flow_job_id).await {
Ok(Some(j)) => j,
_ => return Some(env),
};The catch-all 3. Recursive CTE omits The walk uses 4. Empty-env optimization inconsistency —
5. Unnecessary clone in let env = if let Some(ref e) = flow_value.flow_env {
e.clone()
} else { ... };When local env is set, the entire 6. Magic number The cycle/runaway guard Things I checked and look good
|
…en gate Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Claude finished @hugocasa's task in 1m 35s —— View job Re-review (commits since prior review:
|
| Prior finding | Status |
|---|---|
#1 needs_flow_env over-eager for retry without retry_if |
✅ Fixed via retry_uses_flow_env closure (worker_flow.rs:473-482) |
#2 Silent error on get_mini_pulled_job failure |
✅ Fixed — Err(e) now matched explicitly with tracing::warn! (worker_flow.rs:2406-2409) |
#3 Recursive CTE omits root_job |
✅ Fixed — COALESCE(c.root_job, c.flow_innermost_root_job, c.parent_job) aligns with jobs.rs:475 and jobs.rs:581 (worker_flow.rs:2459) |
#6 Magic number 50 |
✅ Fixed — extracted as MAX_FLOW_ENV_LOOKUP_DEPTH constant with comment (worker_flow.rs:2429-2431), passed as $3 parameter |
Spot-checks on the new commit
- CTE select-list / column-list match: the
chainCTE column list (id, root_job, parent_job, flow_innermost_root_job, runnable_id, runnable_path, raw_flow, depth) matches both the anchor and recursiveSELECTlists. ✓ - Parameter binding:
MAX_FLOW_ENV_LOOKUP_DEPTH: i32bound as$3;c.depth < $3is well-typed (depth is implicitlyint4from the0literal ++ 1). ✓ - Sqlx cache regenerated: three new query JSONs added, the prior one deleted — cache is consistent with the SQL change.
- Tightened gate is sound:
evaluate_retrybody atworker_flow.rs:2274-2286confirmsflow_envis only consulted inside theretry_ifbranch — gating onretry_if.is_some()is exact, no false negatives.
Remaining minor nits (not blocking — same as prior review)
P2 — Empty-env optimization inconsistency — worker_flow.rs:2400-2402 vs handle_flow:2632-2654
resolve_flow_env_for_status_update short-circuits on env.is_empty() before the transform_json call; handle_flow doesn't. Either harmonize or just drop the optimization — transform_json on an empty map is cheap.
P2 — Unnecessary clone of local flow_env — worker_flow.rs:2395-2399
let env = if let Some(ref e) = flow_value.flow_env {
e.clone()
} else {
fetch_root_flow_env(db, flow_job_id, workspace_id).await?
};When the local env is set we clone the entire HashMap<String, Box<RawValue>> even though transform_json only needs &HashMap<…>. A Cow<'_, HashMap<…>> or two passes-by-reference would avoid the copy. Not a hot path, so this is purely a polish item.
AGENTS.md compliance
backend/CLAUDE.mdrules: validation, sqlx cache discipline — followed (3 query JSONs added, 1 removed in latest commit, all consistent with the schema change).- No banned patterns triggered.
Test coverage
Backend Rust tests in flow_engine_parity.rs already cover the new behavior comprehensively (9 regression tests pinning branches/nested branches/parallel for-loops/QuickJS-fallback transforms/imported flows/stop_after_if/retry_if/stop_after_all_iters_if). The latest commit is a pure refactor of internal lookup logic — no new test surface needed; the existing tests exercise both the pre-root_job and post-root_job walks via the parallel-loop scenario where flow_innermost_root_job resets to NULL.
Manual verification: the test plan in the PR body is appropriate. Worth specifically re-running the parallel for-loop case (flow_innermost_root_job=NULL) end-to-end in the UI, since the CTE's preference change (now starting from root_job) affects that path; the unit test confirms correctness but a manual sanity check of "create a flow with flow_env, wrap a step in a parallel for-loop with skip_if: flow_env.X === true, verify skip fires" covers the integration path.
· branch: flow-env-skip-eval
·
Codex ReviewShould address issues before merging: imported-flow own env skipped in nested subflows Findings [P1] Test coverage Backend coverage is strong for root inheritance, branch/loop predicates, retry/stop predicates, and top-level imported flow env precedence. It is missing the nested imported-flow case above: imported flow defines its own Manual verification should cover that scenario through the flow runner: run a parent flow that imports a flow with its own |
Pi ReviewGood to merge The second commit (0ac94de) addressed four of the six suggestions from the prior Claude review:
The remaining two suggestions from Claude (empty-env short-circuit consistency, unnecessary clone) are truly minor and don't rise to even P2 under the review policy — What I verified
Test coverageBackend — 9 regression tests added to Manual verification needed
|
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Now I have a complete understanding of the code. Let me write the review. Pi ReviewShould address issues before merging: imported flow's P1 — BranchOne/loop predicates inside imported flows see parent's
|
… scope Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…re era Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pi ReviewGood to merge — the two latest commits (54915e5 and 732e123) addressed all P1 and P2 findings from prior reviews. The code is correct, well-tested, and clean. What changed across the PR (5 commits)The PR progressed through three design iterations driven by prior review findings:
What I verified
Test coverageBackend — 10 regression tests in Manual verification: the test plan in the PR body is appropriate. The critical scenario to verify end-to-end in the UI is the imported flow with nested BranchOne (the scenario uncovered in the prior review round): create a parent flow that imports a flow with its own |
Summary
Predicates inside a BranchOne / for-loop sub-flow couldn't read
flow_env(e.g.,skip_if: flow_env.X === truereturned false even whenXwas true). Sub-flows constructed bypayload_from_modulesfor branches/loops don't carry the parent'sflow_envin their ownFlowValue, sohandle_flowsawflow.flow_env = Noneand propagatedNoneto every eval. Input transforms with simpleflow_env.Xhappened to work because they hit a regex API fast-path that walks to the root, but anything wrapped inBoolean(...)(skip_if, stop_after_if, retry_if, branchone predicates) or any complex expression (Math.min(flow_env.X, 10)) fell through to QuickJS and used the localNone.Same root cause also broke four eval calls in
update_flow_status_after_job_completion_internalthat hardcodedNoneforflow_env:stop_after_if,stop_after_all_iters_if, and bothevaluate_retrypaths.Changes
fetch_root_flow_envinworker_flow.rs— a recursive CTE walksCOALESCE(flow_innermost_root_job, parent_job)up the job chain (depth ≤ 50), prefersflow_version.value, falls back toraw_flow, and returns the closest ancestor'sflow_env. Recursion is needed because parallel for-loop iterations and imported sub-flows resetflow_innermost_root_jobtoNone.handle_flow— whenflow.flow_env.is_none()and the job has aparent_job, inherit via the lookup before resolving$var:/$res:.update_flow_status_after_job_completion_internal— same lookup (lazy: only when current/failure module hasstop_after_if/stop_after_all_iters_if/retry), then thread the env into:compute_bool_from_exprforstop_after_ifevaluate_stop_after_all_iters_if(newflow_envparam, both call sites)evaluate_retrycalls (continue_on_error + should_retry closure)FlowModuleValue::Flow { path }) keep their ownflow_envif defined (status quo: replace, no merge); if undefined they now inherit from the parent.flow_engine_parity.rs, all verified to fail onmainand pass with the fix:test_flow_env_skip_if_inside_branchone— the original bugtest_flow_env_skip_if_nested_branchone— recursion past depth 1 (branch-in-branch)test_flow_env_skip_if_in_parallel_forloop—flow_innermost_root_job=Nonecasetest_flow_env_complex_input_transform_in_branch— QuickJS fallback path inside sub-flowtest_flow_env_imported_flow_uses_own_env— pins option (i): own env winstest_flow_env_imported_flow_inherits_when_unset— imported flow with no env inherits from parenttest_flow_env_in_stop_after_if,test_flow_env_in_retry_if,test_flow_env_in_stop_after_all_iters_ifTest plan
flow_env: { ENV: "production" }, add a BranchOne with an inner step whose Skip condition isflow_env.ENV === "production". Run the flow → inner step is skipped.Math.min(flow_env.LIMIT, 10)→ resolves correctly.flow_envfrom a parent flow with a differentflow_env→ imported flow uses its own values.Generated with Claude Code
Summary by cubic
Fixes missing
flow_envin sub-flow predicates and respects imported-flow scoping. Predicates and stop/retry logic now inherit the nearestflow_env, with DB lookups only when a predicate needs it.Bug Fixes
fetch_root_flow_envthat recursively walksflow_innermost_root_job -> parent_job(depth-capped) and returns the closest ancestor’sflow_env, preferringflow_version.valueoverraw_flow; avoids jumping viaroot_jobso imported flows keep their scope.handle_flow, inheritflow_envfor sub-flows before resolving$var:/$res:.update_flow_status_after_job_completion_internal, lazily resolveflow_envonly ifstop_after_if/stop_after_all_iters_if/retry_ifare set, then pass it tocompute_bool_from_expr,evaluate_stop_after_all_iters_if(new param), andevaluate_retry.flow_env; if unset, they inherit from the parent. Predicates inside imported flows and nested branches now see the correct scope.stop_after_if/stop_after_all_iters_if/retry_if.Refactors
GET /jobs/flow_env_by_flow_job_idAPI and client helper; no HTTP calls forflow_env.results.*;flow_envis read directly in QuickJS (no wrapping/await).Written for commit 732e123. Summary will update on new commits.