
refactor: extract shared sub-workflow resolution primitives for heterogeneous batch planning #334

@spinje

Context

The PR fixing GH #333 (sort_keys cache bug) also addressed the heterogeneous batch sub-workflow planner gap. Batch nodes with workflow: ${item.workflow} (like emotional-reviews in lyrics-generator) were marked opaque because the planner couldn't resolve per-item workflow paths.

The fix adds per-item workflow resolution to the planner, which mirrors logic already in the runtime's WorkflowExecutor (compile-once cache, per-item dispatch). This is intentional duplication — tracked here for cleanup.

What's Duplicated

1. Per-item workflow resolution + compile cache

Planner: src/pflow/execution/plan.py::_plan_heterogeneous_batch_items (~70 LOC)

  • For each batch item: injects item into shared, resolves templates, extracts workflow path
  • Compiles each unique path once (local compile_cache: dict[str, _PreparedSubWorkflow])
  • Plans each item against its compiled child via _build_plan_with_shared

Runtime: src/pflow/runtime/workflow_executor.py::WorkflowExecutor

  • _compiled_workflow_cache (class-level dict keyed by resolved path)
  • _loaded_ir_cache (class-level dict keyed by raw workflow ref)
  • Per-item dispatch happens in exec() via the batch executor callback

What's shared: The per-item template resolution → workflow path → compile → cache pattern. The planner reimplements this because it can't call WorkflowExecutor.exec() (no side effects allowed).
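The shared pattern can be sketched in isolation as below. This is a minimal illustration, not the code in plan.py or workflow_executor.py: `resolve_ref`, `resolve_and_compile`, and `compile_fn` are hypothetical names, and the regex substitution is only a stand-in for pflow's real template resolution.

```python
import re
from typing import Any, Callable

# Hypothetical stand-in for template resolution: handles only ${item.<field>}.
_ITEM_REF = re.compile(r"\$\{item\.(\w+)\}")


def resolve_ref(template: str, item: dict[str, Any]) -> str:
    """Substitute ${item.<field>} references with values from one batch item."""
    return _ITEM_REF.sub(lambda m: str(item[m.group(1)]), template)


def resolve_and_compile(
    items: list[dict[str, Any]],
    workflow_ref_template: str,
    compile_fn: Callable[[str], Any],
) -> tuple[list[str], dict[str, Any]]:
    """Resolve each item's workflow path and compile each unique path once."""
    compile_cache: dict[str, Any] = {}
    resolved_paths: list[str] = []
    for item in items:
        path = resolve_ref(workflow_ref_template, item)
        if path not in compile_cache:
            # First time this path is seen: compile and remember the result.
            compile_cache[path] = compile_fn(path)
        resolved_paths.append(path)
    return resolved_paths, compile_cache
```

With three items where two point at the same child workflow, `compile_fn` runs twice rather than three times, which is the property both the planner and the runtime rely on.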

2. Output population (#321 item A)

Planner: plan.py::_extract_child_outputs, _resolve_declared_outputs + _mirror_child_shared (~30 LOC)
Runtime: workflow_executor.py::_expose_child_outputs (~27 LOC)

Already documented in #321.

3. Cycle detection (#321 item B)

Planner: plan.py uses visited_paths argument threading
Runtime: workflow_executor.py uses shared["_pflow_stack"] list

Already documented in #321.
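For readers without #321 open, the two cycle-detection styles differ roughly as sketched here. Only the `visited_paths` argument and the `shared["_pflow_stack"]` list come from this issue; the function names, the `children` mapping, and the traversal itself are assumptions made for illustration.

```python
def walk_threaded(path, children, visited_paths=frozenset()):
    """Planner style: ancestors are threaded down as an immutable argument."""
    if path in visited_paths:
        raise ValueError(f"cycle detected at {path}")
    for child in children.get(path, []):
        # Each recursive call gets its own extended ancestor set.
        walk_threaded(child, children, visited_paths | {path})


def walk_with_stack(path, children, shared):
    """Runtime style: ancestors live in a mutable list inside the shared store."""
    stack = shared.setdefault("_pflow_stack", [])
    if path in stack:
        raise ValueError(f"cycle detected at {path}")
    stack.append(path)
    try:
        for child in children.get(path, []):
            walk_with_stack(child, children, shared)
    finally:
        # Always unwind so sibling branches do not see stale ancestors.
        stack.pop()
```

Both variants reject a workflow that reaches itself through its own descendants while allowing two siblings to reference the same child. The threaded form needs no cleanup, which suits a planner that must avoid side effects.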

What to Extract

A shared abstraction that both the runtime and planner call. Candidate shape:

# Shared: resolve batch items to per-item workflow targets
def resolve_heterogeneous_batch_workflows(
    items: list[Any],
    workflow_ref_template: str,
    item_alias: str,
    template_config: TemplateConfig,
    shared: dict,
    base_path: Path,
    registry: Registry,
) -> dict[str, CompiledWorkflow]:
    """Resolve and compile unique child workflows for a heterogeneous batch.
    
    Returns a dict keyed by resolved path → compiled workflow.
    """

The runtime's WorkflowExecutor and the planner's _plan_heterogeneous_batch_items would both call this instead of reimplementing the resolve → compile → cache loop.
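One way the two call sites might differ is in cache lifetime: the planner uses a cache local to one planning pass, while the runtime keeps a class-level one. The sketch below assumes a hypothetical optional `cache` parameter to cover both cases. It is not part of the candidate signature above, and `compile_unique` and `compile_fn` are illustrative names only.

```python
from typing import Any, Callable, Optional


def compile_unique(
    resolved_paths: list[str],
    compile_fn: Callable[[str], Any],
    cache: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Compile each unique path once, reusing entries from `cache` if given."""
    # Planner-style callers omit `cache` and get a fresh, local one.
    # Runtime-style callers pass a long-lived dict that is updated in place.
    cache = {} if cache is None else cache
    for path in resolved_paths:
        if path not in cache:
            cache[path] = compile_fn(path)
    # Return only the entries relevant to this batch.
    return {path: cache[path] for path in resolved_paths}
```

A planner-style call would be `compile_unique(paths, compile_fn)`. A runtime-style call would pass its existing cache dict as the third argument, so repeated batches skip recompilation.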

Where the Code Lives

  • Planner heterogeneous path: src/pflow/execution/plan.py::_plan_heterogeneous_batch_items (search for compile_cache)
  • Planner dispatch: src/pflow/execution/plan.py::_plan_batch_sub_workflow — branches on is_per_item_workflow
  • Runtime compile cache: src/pflow/runtime/workflow_executor.py::_compiled_workflow_cache, _loaded_ir_cache
  • Runtime per-item dispatch: src/pflow/runtime/workflow_executor.py::exec() — resolved inside the batch executor callback
  • Shared primitives already extracted: WorkflowExecutor.is_exposable_child_key, WorkflowExecutor.MAX_DEPTH_DEFAULT, is_clean_termination (from PR #320, "feat: --dry-run with plan + cost/duration estimates (fixes #310)")

Why It Was Shipped as Duplication

No users yet. Correct dry-run behavior (showing per-item cache status for heterogeneous batches) is more valuable now than architectural purity. The duplication is localized (~70 LOC in one function) and tracked here.

Acceptance Criteria

  • Shared primitive extracted and used by both planner and runtime
  • make test && make check clean
  • test_plan_drift.py (30+ cases) passes unchanged
  • No behavioral change — same dry-run output, same runtime behavior
