feat: breadcrumb navigation, subworkflow isolation, and stop reliability #113
@@ -0,0 +1,174 @@
# Conductor Dashboard Deep-Link Specification

## Overview

The conductor web dashboard (`conductor run --web`) accepts URL query parameters
that deep-link into specific nodes of the workflow graph. This enables external
tools (e.g., the conductor-dashboard meta-dashboard) to generate clickable links
that open the UI focused on a particular agent or subworkflow.

## Query Parameters

| Parameter     | Format                     | Description                                  |
|---------------|----------------------------|----------------------------------------------|
| `subworkflow` | slash-separated agent path | Navigate into a subworkflow context          |
| `agent`       | agent name                 | Select and center an agent node in the graph |

Both parameters are optional. When both are present, subworkflow navigation
happens first; the agent is then selected within that subworkflow's graph.

## URL Format

```
http://localhost:{port}[?subworkflow={path}][&agent={name}]
```
## Subworkflow Path

The `subworkflow` parameter is a `/`-separated path of segments, starting from
the root workflow. Each segment is matched against sibling subworkflow contexts
in priority order:

### 1. Exact slot key

Matches the engine-emitted `slot_key` verbatim. For sequential subworkflows the
slot key equals the agent name. For `for_each` iterations the slot key includes
the item key in brackets, e.g. `plan_child[item-0]`.

```
?subworkflow=plan_child[item-0]/design
```

### 2. Positional index (`agent#N`, 0-based)

Matches the Nth iteration among siblings sharing that `parentAgent`. Useful when
the caller doesn't know the exact `item_key` values emitted by the engine.

```
# First for_each iteration of plan_child
?subworkflow=plan_child%230

# Third iteration, then into its "design" child
?subworkflow=plan_child%232/design
```

> **Note:** `#` must be percent-encoded as `%23` in URLs.
### 3. Bare agent name

Matches if **exactly one** sibling has that `parentAgent`. Works for sequential
(non-`for_each`) subworkflows and single-iteration `for_each` groups. Returns an
**ambiguous** error when multiple iterations exist; the error message lists the
valid exact slot keys and positional alternatives.

```
# Works when there is only one "planning" subworkflow
?subworkflow=planning
```

Given this workflow nesting:

```
root
├── intake (agent)
├── planning (workflow agent → planning.yaml)
│   ├── architect (agent)
│   └── design (workflow agent → design.yaml)
│       ├── reviewer (agent)
│       └── writer (agent)
├── plan_child (for_each workflow agent → child.yaml)
│   ├── plan_child[item-0] (iteration 0)
│   └── plan_child[item-1] (iteration 1)
└── close_out (agent)
```

| URL | Result |
|--------------------------------------------------|--------------------------------------------|
| `?subworkflow=planning`                          | View planning.yaml's graph                 |
| `?subworkflow=planning/design`                   | View design.yaml's graph                   |
| `?subworkflow=planning/design&agent=reviewer`    | View design.yaml, select reviewer node     |
| `?subworkflow=plan_child[item-0]`                | View child.yaml iteration 0                |
| `?subworkflow=plan_child%230`                    | Same, by positional (0-based) index        |
| `?subworkflow=plan_child%231`                    | View child.yaml iteration 1                |

Each path segment is matched using the priority rules above (exact slot key →
positional → bare name).
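The three-tier matching can be sketched as follows. This is a simplified model rather than the dashboard's actual code; the `Context` shape and the `resolve_segment` name are assumptions for illustration:

```python
from dataclasses import dataclass


@dataclass
class Context:
    slot_key: str      # engine-emitted identity, e.g. "plan_child[item-0]"
    parent_agent: str  # owning agent name, e.g. "plan_child"


def resolve_segment(segment: str, siblings: list[Context]) -> Context:
    # 1. Exact slot key match wins outright.
    for ctx in siblings:
        if ctx.slot_key == segment:
            return ctx
    # 2. Positional index "agent#N" (0-based; '#' arrives decoded from %23).
    name, sep, idx = segment.rpartition("#")
    if sep and idx.isdigit():
        group = [c for c in siblings if c.parent_agent == name]
        if int(idx) < len(group):
            return group[int(idx)]
    # 3. Bare agent name: only valid when exactly one sibling matches.
    group = [c for c in siblings if c.parent_agent == segment]
    if len(group) == 1:
        return group[0]
    if len(group) > 1:
        options = ", ".join(c.slot_key for c in group)
        raise LookupError(f"ambiguous segment {segment!r}; use one of: {options}")
    raise LookupError(f"segment {segment!r} not found")
```

Resolving a full `subworkflow` path is then a left fold of `resolve_segment` over the `/`-split segments.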
## Agent Selection

The `agent` parameter selects and centers a node in the **currently viewed**
workflow graph:

- **Root agent** (no subworkflow context): `?agent=intake`
- **Agent inside a subworkflow**: `?subworkflow=planning&agent=architect`

**Important:** An agent that lives inside a subworkflow will NOT be found
by `?agent=reviewer` alone; you must also provide the `subworkflow` path
to navigate to the correct context first:

```
# ✗ WRONG — reviewer doesn't exist in the root workflow
?agent=reviewer

# ✓ CORRECT — navigate into planning/design, then select reviewer
?subworkflow=planning/design&agent=reviewer
```
## Behavior

1. **Parse** — On initial page load, read `subworkflow` and `agent` from
   `window.location.search`.

2. **Wait** — Do nothing until the workflow graph has been populated
   (agents arrive via WebSocket late-joiner replay).

3. **Navigate** — If `subworkflow` is present, split on `/` and call
   `navigateIntoSubworkflow()` for each segment sequentially. Each call is
   synchronous (zustand `set`/`get`), so the viewed context updates between
   calls.

4. **Select** — If `agent` is present, call `selectNode(agent)` then
   `fitView({ nodes: [{ id: agent }] })` to center the graph on the node
   with a smooth animation.

5. **Once** — Deep-link application fires exactly once per page load.
   Subsequent WebSocket events do not re-trigger navigation.
## Edge Cases

| Scenario | Behavior |
|----------------------------------------------------|---------------------------------------------------|
| Unknown subworkflow path segment                   | Error banner with "not found" + notation hint     |
| Ambiguous bare name (multiple for_each iterations) | Error banner listing valid alternatives           |
| Unknown agent name                                 | No node selected, error banner displayed          |
| Subworkflow hasn't started yet                     | Navigation fails with "not found" error           |
| Page refresh                                       | Deep-link re-applied from URL (full state replay) |
| Combined with breadcrumb navigation                | User can freely navigate after deep-link applies  |
## Example URLs

```
# Root workflow — default view
http://localhost:49123

# Select an agent in the root workflow
http://localhost:49123?agent=intake

# Drill into a subworkflow
http://localhost:49123?subworkflow=planning

# Drill two levels deep
http://localhost:49123?subworkflow=planning/design

# Drill into subworkflow and select an agent within it
http://localhost:49123?subworkflow=planning/design&agent=reviewer

# for_each iteration by exact slot key
http://localhost:49123?subworkflow=plan_child[item-0]

# for_each iteration by positional index (# → %23 in URL)
http://localhost:49123?subworkflow=plan_child%230

# Nested: for_each iteration, then into a child subworkflow
http://localhost:49123?subworkflow=plan_child%230/design&agent=writer
```
@@ -285,6 +285,7 @@ def __init__(
        web_dashboard: WebDashboard | None = None,
        _subworkflow_depth: int = 0,
        run_context: RunContext | None = None,
        _dashboard_context_path: list[str] | None = None,
    ) -> None:
        """Initialize the WorkflowEngine.
@@ -314,6 +315,13 @@ def __init__(
            _subworkflow_depth: Current nesting depth for sub-workflow composition.
                Used internally to enforce MAX_SUBWORKFLOW_DEPTH. Callers should
                not set this directly.
            _dashboard_context_path: Slot-key path identifying this engine's
                position in the recursive sub-workflow tree. Root engine = ``[]``.
                Sub-workflow engines spawned via ``_execute_subworkflow`` get
                ``[*parent_path, slot_key]``. Used by ``_emit`` to auto-stamp
                ``subworkflow_path`` on outgoing events so the dashboard can
                route per-context state under concurrency. Callers should not
                set this directly.

        Note:
            If both provider and registry are provided, registry takes precedence.
@@ -381,6 +389,13 @@ def __init__(
        self._bg_mode = self._run_context.bg_mode
        self._system_metadata: dict[str, Any] = {}

        # Recursive sub-workflow context path for dashboard routing.
        # Root engine = []. Child engines spawned via _execute_subworkflow get
        # [*parent_path, slot_key]. _emit auto-stamps non-empty paths onto
        # outgoing events so the frontend can resolve the owning context
        # without inferring parentage from activeContextPath.
        self._dashboard_context_path: list[str] = list(_dashboard_context_path or [])

    def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None:
        """Build pricing overrides from workflow cost configuration.
|
|
@@ -416,6 +431,11 @@ def _emit(self, event_type: str, data: dict[str, Any]) -> None: | |
| """ | ||
| if self._event_emitter is None: | ||
| return | ||
| # Auto-stamp subworkflow_path on every event from sub-engines so the | ||
| # dashboard can route per-context state under concurrency. Root engine | ||
| # has an empty path and emits no stamp (preserving legacy event shape). | ||
| if self._dashboard_context_path and "subworkflow_path" not in data: | ||
| data = {**data, "subworkflow_path": list(self._dashboard_context_path)} | ||
| event = WorkflowEvent(type=event_type, timestamp=_time.time(), data=data) | ||
| self._event_emitter.emit(event) | ||
|
|
||
|
|
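The stamping rule is small enough to isolate. Here is a standalone sketch of the same copy-on-write logic (the real code lives inside `_emit`; this free function is purely illustrative):

```python
def stamp_subworkflow_path(data: dict, context_path: list[str]) -> dict:
    """Sub-engines (non-empty path) stamp subworkflow_path onto a copy of
    the payload; the root engine (empty path) returns it untouched, and an
    explicit caller-provided path is never overwritten."""
    if context_path and "subworkflow_path" not in data:
        return {**data, "subworkflow_path": list(context_path)}
    return data
```

Because the root engine's path is empty, root events keep the legacy payload shape byte-for-byte.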
@@ -598,6 +618,7 @@ async def _execute_subworkflow(
        self,
        agent: AgentDef,
        context: dict[str, Any],
        slot_key: str | None = None,
    ) -> dict[str, Any]:
        """Execute a sub-workflow as a black-box step.
@@ -608,6 +629,11 @@ async def _execute_subworkflow(
        Args:
            agent: Workflow agent definition with ``workflow`` path.
            context: Workflow context for template rendering (used as sub-workflow input).
            slot_key: Identity of this sub-workflow run within the parent's
                slot-key path. Defaults to ``agent.name`` for the sequential
                path; for_each/parallel paths supply per-iteration keys
                (e.g. ``"<group>[<key>]"``) so concurrent runs get distinct
                identities.

        Returns:
            The sub-workflow's final output dict.
@@ -673,6 +699,10 @@ async def _execute_subworkflow(
            keyboard_listener=self._keyboard_listener,
            web_dashboard=self._web_dashboard,
            _subworkflow_depth=self._subworkflow_depth + 1,
            _dashboard_context_path=[
                *self._dashboard_context_path,
                slot_key or agent.name,
            ],
        )

        return await child_engine.run(sub_inputs)
@@ -1115,10 +1145,11 @@ async def _check_interrupt(self, current_agent_name: str) -> InterruptResult | N

        # In web mode, the interrupt was already handled at the provider level
        # (partial output → _handle_web_pause). Consume the stale flag silently.
        # We check for dashboard presence only (not has_connections) because in
        # --web/--web-bg mode the CLI interactive handler is never appropriate,
        # even if clients are transiently disconnected.
        # EXCEPTION: in subworkflows (depth > 0), propagate the interrupt so it
        # unwinds the child engine back to the parent, stopping the workflow.
        if self._web_dashboard is not None:
            if self._subworkflow_depth > 0:
                raise InterruptError(agent_name=current_agent_name)
            return None

        # Build output preview from last stored output
@@ -1222,6 +1253,32 @@ async def _handle_web_pause(self, agent_name: str, partial_output: AgentOutput)
        disconnect_task = asyncio.create_task(disconnect_event.wait())
        tasks = {resume_task, kill_task, disconnect_task}

        # In subworkflows, also watch the interrupt_event so that a second
        # Stop click while paused will stop the workflow without requiring
        # the user to first Resume then wait for the next between-agent check.
        #
        # INTENTIONAL ROOT-vs-SUBWORKFLOW ASYMMETRY:
        # At root depth, we deliberately do NOT subscribe to interrupt_event
        # here — pause is exited only by Resume or Kill. Inside a sub-workflow
        # we DO subscribe so a single Stop click cleanly unwinds the child
        # engine back to the parent (Stop-during-pause is otherwise a no-op
        # because the partial-output handler owns the only between-agent
        # interrupt check, and the sub-engine is currently sitting in this
        # pause loop instead of stepping through its main loop).
        #
        # Pre-clearing interrupt_event below means a Stop click that lands
        # *between* clear() and the asyncio.create_task() below is silently
        # discarded — but a Stop click that lands during the wait is honored.
        # That window is tiny (microseconds), and the alternative (not
        # clearing) would carry a stale Stop signal from a prior pause cycle
        # into this one. We accept the narrow race in favor of correctness
        # across cycles. See PR #113 review thread for the discussion.
        stop_task = None
        if self._subworkflow_depth > 0 and self._interrupt_event is not None:
            self._interrupt_event.clear()
**Collaborator:** Pause+Stop UX diverges between root and subworkflow, and pre-clearing `interrupt_event` opens a small race window. At minimum, add a code comment explaining the intentional asymmetry. Ideally, document the divergence in the dashboard UX docs (or align behavior between root and subworkflow).

**Author (Member):** Fixed in 1c3d199 with an inline comment (the "INTENTIONAL ROOT-vs-SUBWORKFLOW ASYMMETRY" block above). Aligning behavior between root and subworkflow Stop semantics feels out of scope for this PR; happy to file a follow-up if you want them unified one way or the other.
            stop_task = asyncio.create_task(self._interrupt_event.wait())
            tasks.add(stop_task)

        # If any event was set between clear() and task creation, the task
        # will already be done — no need to wait, but we still fall through
        # to the normal done/pending handling below.
@@ -1245,6 +1302,12 @@ async def _handle_web_pause(self, agent_name: str, partial_output: AgentOutput)
        if kill_task in done:
            raise InterruptError(agent_name=agent_name)

        # Stop-while-paused in a subworkflow: treat as interrupt
        if stop_task is not None and stop_task in done:
            if self._interrupt_event is not None:
                self._interrupt_event.clear()
            raise InterruptError(agent_name=agent_name)

        if disconnect_task in done:
            logger.info(
                "All dashboard clients disconnected while '%s' was paused — auto-resuming",
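The pause-loop mechanics reduce to a small asyncio pattern: several event waiters race under `FIRST_COMPLETED`, and the stop waiter is only registered at subworkflow depth. The toy model below illustrates that pattern only; all names are stand-ins, not the engine's API:

```python
import asyncio


async def pause_until(resume: asyncio.Event, stop: asyncio.Event,
                      depth: int) -> str:
    """Return which signal ended the pause: 'resume', or 'stop' (depth > 0 only)."""
    tasks = {asyncio.create_task(resume.wait(), name="resume")}
    if depth > 0:
        stop.clear()  # drop a stale Stop left over from a prior pause cycle
        tasks.add(asyncio.create_task(stop.wait(), name="stop"))
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    return next(iter(done)).get_name()


async def demo() -> str:
    resume, stop = asyncio.Event(), asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, stop.set)  # simulate Stop click
    return await pause_until(resume, stop, depth=1)


print(asyncio.run(demo()))  # stop
```

At depth 0 the stop waiter is never created, so a Stop click during pause cannot end the wait there, which is exactly the asymmetry the comment block documents.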
|
@@ -1793,6 +1856,8 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: | |
| "agent_name": agent.name, | ||
| "iteration": sub_execution_count, | ||
| "workflow": agent.workflow, | ||
| "parent_path": list(self._dashboard_context_path), | ||
| "slot_key": agent.name, | ||
| }, | ||
| ) | ||
|
|
||
|
|
@@ -1807,6 +1872,8 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: | |
| "elapsed": _sub_elapsed, | ||
| "error_type": type(exc).__name__, | ||
| "message": str(exc), | ||
| "parent_path": list(self._dashboard_context_path), | ||
| "slot_key": agent.name, | ||
| }, | ||
| ) | ||
| raise | ||
|
|
@@ -1818,6 +1885,8 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: | |
| "agent_name": agent.name, | ||
| "elapsed": _sub_elapsed, | ||
| "output": sub_output, | ||
| "parent_path": list(self._dashboard_context_path), | ||
| "slot_key": agent.name, | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
**Reviewer:** Headline feature is untested. The new `if self._subworkflow_depth > 0: raise InterruptError(...)` branch — and the matching new `stop_task` logic in `_handle_web_pause` (lines 1077-1110) — have zero test coverage. Every test in `TestCheckInterrupt` (`tests/test_engine/test_workflow_interrupt.py:572-658`) constructs an engine with `_subworkflow_depth=0`, so this branch is dead code under tests. You could delete lines 969-970 and all 45 tests in the suite would still pass.

Since "Stop button reliability during subworkflows" is a headline feature of this PR, please add at minimum:

1. `test_check_interrupt_raises_in_subworkflow` — engine with `_subworkflow_depth=1` and `_web_dashboard` set; assert `InterruptError` is raised.
2. `test_handle_web_pause_stop_event_in_subworkflow` — pause a child engine, set `_interrupt_event`, assert `InterruptError`.
3. `test_nested_subworkflow_path_accumulates` — depth ≥ 2; assert `subworkflow_path` chains correctly across multiple levels.
4. `test_subworkflow_failed_event_carries_parent_path_and_slot_key` — make the child raise; assert the failed event payload (currently only the success path is asserted at `tests/test_engine/test_subworkflow.py:1014-1015`).
5. A for_each-of-workflow test asserting distinct `slot_key` per iteration — the existing test only covers the trivial sequential case (`slot_key == agent.name`).
**Author (Member):** Fixed in fef6b36. Added the test cases you specified (and one bonus):

- `TestCheckInterruptSubworkflow::test_check_interrupt_raises_in_subworkflow` — engine with `_subworkflow_depth=1` and `_web_dashboard` set; asserts `InterruptError` is raised and `interrupt_event` is cleared.
- `TestCheckInterruptSubworkflow::test_check_interrupt_consumed_silently_at_root` — regression guard for the root-depth silent-consume branch.
- `TestHandleWebPauseSubworkflow::test_handle_web_pause_stop_event_in_subworkflow` — pauses a child engine, sets `_interrupt_event` mid-pause, asserts `InterruptError`.
- `TestHandleWebPauseSubworkflow::test_handle_web_pause_root_ignores_interrupt_event` — documents the intentional root-vs-subworkflow asymmetry as an executable spec.
- `TestSubWorkflowDashboardPath::test_subworkflow_failed_event_carries_parent_path_and_slot_key` — child raises; asserts the failed event payload (`agent_name`, `parent_path`, `slot_key`, `error_type`, `message`).
- `TestSubWorkflowDashboardPath::test_nested_subworkflow_path_accumulates` — depth 2 (parent -> mid -> leaf); asserts `subworkflow_path` chains correctly across both levels and that the root engine still emits `workflow_completed` without a stamp.
- `TestSubWorkflowDashboardPath::test_concurrent_for_each_subworkflow_emits_distinct_slot_keys` — your point 5. The existing `test_for_each_subworkflow_emits_distinct_slot_keys` already covers slot-key uniqueness for for_each-of-workflow, but it ran with `max_concurrent=1` so iterations were sequential. The new test uses `max_concurrent=3` so iterations actually overlap, proving uniqueness is not an artifact of serial execution.

Engine suite: 53 passed (was 46); one pre-existing Windows path failure in `test_event_log.py` is unrelated.
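For readers outside the repo, the shape of the depth-branch test can be sketched against a toy model. The real tests construct an actual `WorkflowEngine`; everything below is a stand-in that models only the branch under discussion:

```python
class InterruptError(Exception):
    def __init__(self, agent_name: str):
        super().__init__(agent_name)
        self.agent_name = agent_name


class MiniEngine:
    """Toy stand-in modeling only the _check_interrupt depth branch."""

    def __init__(self, depth: int, has_dashboard: bool = True):
        self._subworkflow_depth = depth
        self._web_dashboard = object() if has_dashboard else None

    def check_interrupt(self, agent_name: str) -> None:
        if self._web_dashboard is not None:
            if self._subworkflow_depth > 0:
                # Subworkflow: propagate so the child engine unwinds.
                raise InterruptError(agent_name=agent_name)
            return None  # Root: consume the stale flag silently.


# Depth 1 propagates; depth 0 consumes silently.
raised = False
try:
    MiniEngine(depth=1).check_interrupt("reviewer")
except InterruptError as exc:
    raised = exc.agent_name == "reviewer"
assert raised
assert MiniEngine(depth=0).check_interrupt("intake") is None
```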