Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,4 @@ Implementation parity rules:
- The async helpers (`run_workflow_async` and `resume_workflow_async` in `cli/run.py`) must wire up the same event emitter, JSONL event log subscriber, console event subscriber, and `WebDashboard` lifecycle.
- The `WorkflowEngine` constructor receives the same kwargs in both paths (`event_emitter`, `web_dashboard`, `run_context`, `interrupt_event`, `keyboard_listener`, `instructions_preamble`).
- Background-process forking lives in `cli/bg_runner.py`. `run --web-bg` calls `launch_background()` and `resume --web-bg` calls `launch_background_resume()`. Both must forward equivalent options and write a PID file via `cli/pid.py`.
- Note: on resume, the dashboard only shows events from the resumed agent forward — events from agents that completed before the checkpoint were emitted in the original process and are not replayed.
- Note: on resume, the dashboard is seeded with prior events before it starts accepting clients. The CLI prepends a fresh `workflow_started` event built from the **current** workflow YAML (via `WorkflowEngine.build_workflow_started_data()`) so historical events apply to the correct topology; it then either replays the original JSONL event log (`WebDashboard.replay_events_from_jsonl()` — when the checkpoint records an `event_log_path` and the file exists) or synthesises minimal `*_started` / `*_completed` pairs from the restored `WorkflowContext` (`replay_synthetic_from_context()`). The resumed engine's own `workflow_started` emit is suppressed via `engine.suppress_workflow_started_emit()` so the dashboard sees exactly one root `workflow_started` (no `wfDepth` double-count). Root-level lifecycle events from the original JSONL (`workflow_started` / `workflow_completed` / `workflow_failed` / `checkpoint_saved`) are filtered out on replay; subworkflow-level lifecycle events are preserved so frontend `wfDepth` stays balanced. The resumed `EventLogSubscriber` opens the original JSONL in append mode (when available) so a multi-resume session produces one continuous log file and `run_id` stays stable for log-correlation tools.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased](https://github.com/microsoft/conductor/compare/v0.1.16...HEAD)

### Fixed
- `conductor resume … --web` and `--web-bg` no longer open an empty
dashboard. Checkpoints now record the original `run_id` and JSONL
`event_log_path`. On resume the dashboard's history is seeded BEFORE
it accepts clients: the CLI prepends a fresh `workflow_started` event
built from the current YAML (so historical events apply to the
correct topology), then replays the original JSONL log line-by-line
(or, when no log file is available, synthesises minimal
`*_started`/`*_completed` pairs from the restored execution history).
The resumed engine's own `workflow_started` emit is suppressed so the
dashboard sees exactly one root start — no `wfDepth` double-counting.
Root-level `workflow_completed` / `workflow_failed` /
`checkpoint_saved` events from the original run are filtered out on
replay; subworkflow lifecycle events are preserved so the frontend's
context tracking stays balanced. The resumed `EventLogSubscriber`
appends to the original log, preserving `run_id` across resume
generations so log/timeline correlation tools see one continuous run
(#167).
- Workflows that configure `reasoning.effort` (or workflow-wide
`runtime.default_reasoning_effort`) on the Copilot provider were broken
for **every named Copilot model** when running against
Expand Down
139 changes: 100 additions & 39 deletions src/conductor/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -1583,9 +1584,12 @@ async def resume_workflow_async(
log_file: Optional path to write full debug output to a file.
no_interactive: If True, disables the keyboard interrupt listener.
web: If True, start a real-time web dashboard for the resumed run.
Note: the dashboard only shows events from the resumed agent
forward; agent runs that completed before the checkpoint are
not replayed.
The dashboard is seeded with the original timeline by replaying
the JSONL event log captured during the previous run (or by
synthesising minimal events from the restored ``WorkflowContext``
when the log file is unavailable), so previously completed
agents remain visible alongside live events from the resumed
run.
web_port: Port for the web dashboard (0 = auto-select).
web_bg: If True, auto-shutdown dashboard after workflow + client
disconnect.
Expand Down Expand Up @@ -1667,30 +1671,8 @@ async def resume_workflow_async(
f"Checkpoint created: {cp.created_at} (failed at: {cp.failure.get('agent', 'unknown')})"
)

# Start web dashboard now that we have the workflow path
if web:
from conductor.web.server import WebDashboard

bg_mode = web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1"
dashboard = WebDashboard(
emitter,
host="127.0.0.1",
port=web_port,
bg=bg_mode,
workflow_root=resolved_workflow_path.resolve().parent,
)

try:
await dashboard.start()
_verbose_console.print(f"[bold cyan]Dashboard:[/bold cyan] {dashboard.url}")
except Exception as e:
_verbose_console.print(
f"[bold yellow]Warning:[/bold yellow] "
f"Dashboard failed to start: {e}. Continuing without dashboard."
)
dashboard = None

# Load workflow config
# Load workflow config first — needed both to construct the dashboard
# (workflow_root) and to seed the synthetic replay fallback.
config = load_config(resolved_workflow_path)

# Merge CLI metadata on top of YAML-declared metadata (parity with run)
Expand All @@ -1702,16 +1684,6 @@ async def resume_workflow_async(
verbose_log(f"Provider override: {provider_override}", style="yellow")
config.workflow.runtime.provider = provider_override # type: ignore[assignment]

# Start JSONL event log subscriber (parity with run)
from conductor.engine.event_log import EventLogSubscriber

event_log_subscriber = EventLogSubscriber(config.workflow.name)
emitter.subscribe(event_log_subscriber.on_event)

# Subscribe console output to the event emitter (parity with run)
console_subscriber = ConsoleEventSubscriber()
emitter.subscribe(console_subscriber.on_event)

# Verify the current_agent exists in the workflow
agent_names = {a.name for a in config.agents}
parallel_names = {g.name for g in config.parallel} if config.parallel else set()
Expand All @@ -1734,6 +1706,25 @@ async def resume_workflow_async(
timeout_seconds=config.workflow.limits.timeout_seconds,
)

# Construct the web dashboard early (subscribes to the emitter on
# construction) but defer ``dashboard.start()`` until after we have
# seeded ``_event_history`` with the current-config
# ``workflow_started`` event plus the original run's replay. That
# way the very first ``GET /api/state`` and the first WebSocket
# client both see a fully populated, topology-correct history —
# no race window where a client connects mid-replay.
if web:
from conductor.web.server import WebDashboard

bg_mode = web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1"
dashboard = WebDashboard(
emitter,
host="127.0.0.1",
port=web_port,
bg=bg_mode,
workflow_root=resolved_workflow_path.resolve().parent,
)

# Build MCP servers config (same as run_workflow_async)
mcp_servers = await _build_mcp_servers(config)

Expand Down Expand Up @@ -1761,6 +1752,33 @@ async def resume_workflow_async(

from conductor.engine.workflow import RunContext

# Resume-mode log path: append to the original log when available
# so a multi-resume session produces one continuous file and
# ``run_id`` stays stable across resume generations.
existing_log_path: Path | None = None
if cp.event_log_path:
candidate = Path(cp.event_log_path)
if candidate.exists() and candidate.is_file():
existing_log_path = candidate

# Build the JSONL subscriber BEFORE the engine so RunContext
# carries the resolved ``run_id`` and ``log_file`` (used by
# the engine to populate the ``workflow_started`` event payload).
# When the checkpoint has the original log info, the subscriber
# appends to it and reuses run_id; otherwise it generates fresh.
from conductor.engine.event_log import EventLogSubscriber

event_log_subscriber = EventLogSubscriber(
config.workflow.name,
existing_path=existing_log_path,
existing_run_id=cp.run_id or None,
)
emitter.subscribe(event_log_subscriber.on_event)

# Subscribe console output to the event emitter (parity with run)
console_subscriber = ConsoleEventSubscriber()
emitter.subscribe(console_subscriber.on_event)

engine = WorkflowEngine(
config,
registry=registry,
Expand All @@ -1772,15 +1790,58 @@ async def resume_workflow_async(
web_dashboard=dashboard,
instructions_preamble=cp.instructions_preamble,
run_context=RunContext(
run_id=event_log_subscriber.run_id if event_log_subscriber else "",
log_file=str(event_log_subscriber.path) if event_log_subscriber else "",
run_id=event_log_subscriber.run_id,
log_file=str(event_log_subscriber.path),
dashboard_port=(dashboard.port if dashboard is not None else None),
bg_mode=web_bg or os.environ.get("CONDUCTOR_WEB_BG") == "1",
),
)
engine.set_context(restored_context)
engine.set_limits(restored_limits)

# Seed the dashboard with the original timeline so previously
# completed agents remain visible. Order matters:
# 1. Prepend a fresh ``workflow_started`` built from the
# current config so historical events apply to the
# correct topology.
# 2. Replay the original JSONL log (root-level lifecycle
# events are filtered to keep frontend ``wfDepth`` balanced).
# 3. If no JSONL is available, fall back to synthesised
# events from the restored context.
# 4. Suppress the engine's own ``workflow_started`` emit on
# resume — without this the dashboard would see two root
# starts and treat the live run as a child workflow.
if dashboard is not None:
dashboard.prepend_workflow_started(engine.build_workflow_started_data())
replayed = 0
if existing_log_path is not None:
replayed = dashboard.replay_events_from_jsonl(existing_log_path)
if replayed == 0:
try:
cp_ts: float | None = datetime.fromisoformat(cp.created_at).timestamp()
except (TypeError, ValueError):
cp_ts = None
replayed = dashboard.replay_synthetic_from_context(
restored_context, config, checkpoint_timestamp=cp_ts
)
verbose_log(f"Seeded dashboard with {replayed} prior event(s)")
engine.suppress_workflow_started_emit()

try:
await dashboard.start()
_verbose_console.print(f"[bold cyan]Dashboard:[/bold cyan] {dashboard.url}")
except Exception as e:
_verbose_console.print(
f"[bold yellow]Warning:[/bold yellow] "
f"Dashboard failed to start: {e}. Continuing without dashboard."
)
# Drop the dashboard everywhere it's been wired up.
# The engine + DialogHandler captured it at construction
# time and would otherwise block waiting on a never-
# running WebSocket for human gates / dialogs.
engine.clear_web_dashboard()
dashboard = None

# Share interrupt_event with dashboard so POST /api/stop can abort agents
if dashboard is not None and interrupt_event is not None:
dashboard.set_interrupt_event(interrupt_event)
Expand Down
29 changes: 29 additions & 0 deletions src/conductor/engine/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ class CheckpointData:
limits: Serialized ``LimitEnforcer`` state.
copilot_session_ids: Mapping of agent names to Copilot session IDs.
file_path: Path where the checkpoint file is stored.
instructions_preamble: Workspace instructions preamble that was
active during the original run, or ``None``.
run_id: Original run identifier from the ``EventLogSubscriber``.
Empty string when the checkpoint was written by a version
of Conductor that predated this field.
event_log_path: Filesystem path to the original JSONL event log.
Used by the CLI resume path to seed the web dashboard with
the original timeline and to append further events to the
same log. Empty string when the checkpoint was written by a
version of Conductor that predated this field or when the
log file was unavailable at checkpoint time.
"""

version: int
Expand All @@ -93,6 +104,13 @@ class CheckpointData:
file_path: Path = field(default_factory=lambda: Path())
instructions_preamble: str | None = None
"""Workspace instructions preamble that was active during the original run."""
run_id: str = ""
"""Original run identifier from ``EventLogSubscriber``. Empty for
checkpoints written before this field was introduced."""
event_log_path: str = ""
"""Filesystem path to the original JSONL event log. Empty for
checkpoints written before this field was introduced, or when the
log file was unavailable at checkpoint time."""


class CheckpointManager:
Expand Down Expand Up @@ -140,6 +158,8 @@ def save_checkpoint(
copilot_session_ids: dict[str, str] | None = None,
system_metadata: dict[str, Any] | None = None,
instructions_preamble: str | None = None,
run_id: str = "",
event_log_path: str = "",
) -> Path | None:
"""Serialize workflow state to a checkpoint file.

Expand All @@ -159,6 +179,11 @@ def save_checkpoint(
copilot_session_ids: Optional mapping of agent names to session IDs.
system_metadata: Optional system metadata captured at workflow start.
instructions_preamble: Optional workspace instructions preamble to persist.
run_id: Original run identifier (from ``EventLogSubscriber``).
Persisted so resume can keep run-correlation stable.
event_log_path: Filesystem path to the original JSONL event log.
Persisted so resume can replay prior events into the
dashboard and append further events to the same log.

Returns:
Path to the saved checkpoint file, or ``None`` if saving failed.
Expand Down Expand Up @@ -201,6 +226,8 @@ def save_checkpoint(
"copilot_session_ids": copilot_session_ids or {},
"system": system_metadata or {},
"instructions_preamble": instructions_preamble,
"run_id": run_id,
"event_log_path": event_log_path,
}

# Serialize to JSON
Expand Down Expand Up @@ -317,6 +344,8 @@ def load_checkpoint(checkpoint_path: Path) -> CheckpointData:
copilot_session_ids=data.get("copilot_session_ids", {}),
file_path=checkpoint_path,
instructions_preamble=data.get("instructions_preamble"),
run_id=data.get("run_id", "") or "",
event_log_path=data.get("event_log_path", "") or "",
)

@staticmethod
Expand Down
53 changes: 50 additions & 3 deletions src/conductor/engine/event_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,60 @@ class EventLogSubscriber:
Each line is a JSON object with ``type``, ``timestamp``, and ``data``
fields — the same shape as ``WorkflowEvent.to_dict()``.

Args:
workflow_name: Used in the filename for easy identification.
By default a fresh log file is created under
``$TMPDIR/conductor/`` with a random ``run_id`` suffix. When the
optional ``existing_path``/``existing_run_id`` kwargs are provided
and the file is writable, the subscriber appends to the existing log
and reuses the run id — used by the CLI's resume flow so a workflow
that is paused and resumed (possibly multiple times) produces a
single continuous log instead of one file per resume generation.
"""

def __init__(self, workflow_name: str) -> None:
def __init__(
self,
workflow_name: str,
*,
existing_path: Path | None = None,
existing_run_id: str | None = None,
) -> None:
"""Initialise the subscriber.

Args:
workflow_name: Used in the default filename for easy
identification when no ``existing_path`` is provided.
existing_path: When provided alongside ``existing_run_id``
and the file is writable, open it in append mode and
continue writing to the original log instead of creating
a new one. Used by ``resume_workflow_async`` so a
resumed run produces one continuous JSONL log across
resume generations.
existing_run_id: The run identifier associated with
``existing_path``. Reused (not regenerated) so log /
timeline correlation tools see one continuous run.
"""
import secrets

if (
existing_path is not None
and existing_run_id
and existing_path.exists()
and existing_path.is_file()
):
try:
# Append mode preserves the original events; rely on the
# caller (the dashboard replay step) to seed the in-memory
# state from the existing contents.
self._handle = open(existing_path, "a", encoding="utf-8") # noqa: SIM115
self._path = existing_path
self._run_id = existing_run_id
return
except OSError:
logger.warning(
"Cannot append to existing event log %s; creating a new log instead",
existing_path,
exc_info=True,
)

ts = time.strftime("%Y%m%d-%H%M%S")
# Append random suffix to avoid filename collisions
# when multiple runs start in the same second
Expand Down
Loading
Loading