Worker processing comm#3

Merged
merceod merged 13 commits into main from worker_processing_comm on Feb 28, 2026
@merceod merceod commented Feb 27, 2026

Collaborator

Here's a summary of everything implemented:

Files Created

  1. mminf/engine/__init__.py — Empty package init
  2. mminf/engine/base.py — BaseEngine ABC, EngineType enum, StageBatch and StageOutput data classes
  3. mminf/engine/enc_dec_engine.py — EncoderDecoderEngine for stateless forward passes (ViT, text emb, VAE)
  4. mminf/engine/flow_engine.py — FlowEngine for single denoising step execution
  5. mminf/engine/ar_engine.py — AREngine with PageAllocator, KVRequestState, FlashInfer-guarded prefill/decode, pause/resume
  6. mminf/worker/engine_manager.py — EngineManager mapping stage names to engine instances
  7. mminf/worker/micro_scheduler.py — MicroScheduler with priority-based stage selection (AR > Flow > EncDec)
  8. mminf/worker/worker.py — Real Worker integrating SubgraphsManager, EngineManager, MicroScheduler, and MooncakeCommunicationManager
  9. test/test_phase1.py — 20 tests covering PageAllocator, all engines, EngineManager, image_gen loop, SubgraphsManager integration, MicroScheduler, and prefill graph
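
The priority rule described for MicroScheduler (AR > Flow > EncDec) can be illustrated with a minimal sketch. This is not the code from mminf/worker/micro_scheduler.py: the enum member names, the `pick_next_stage` helper, and the stage names are assumptions made for illustration only.

```python
from enum import IntEnum
from typing import Optional


class EngineType(IntEnum):
    # Hypothetical members; a lower value means a higher scheduling priority,
    # mirroring the "AR > Flow > EncDec" ordering from the PR description.
    AR = 0
    FLOW = 1
    ENC_DEC = 2


def pick_next_stage(ready: dict[str, EngineType]) -> Optional[str]:
    """Return the ready stage whose engine type has the highest priority.

    `ready` maps a stage name to the engine type that would run it.
    Ties are broken by stage name so the choice is deterministic.
    """
    if not ready:
        return None
    return min(ready, key=lambda name: (ready[name], name))


ready_stages = {
    "vit_encode": EngineType.ENC_DEC,
    "denoise_step": EngineType.FLOW,
    "llm_decode": EngineType.AR,
}
next_stage = pick_next_stage(ready_stages)  # the AR stage is selected first
```

Encoding the priority in the enum value keeps the selection a single `min` call; a real scheduler would likely also weigh batch sizes or starvation, which this sketch ignores.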

Files Modified

  1. mminf/communication/tensors.py — Completed start_read_tensors() and get_ready_tensors(), changed self.pending from dict to list[EventAndPointers], added request_id to EventAndPointers, made NameAndRequestId frozen/hashable, guarded mooncake import
  2. mminf/worker/dummy_worker.py — Fixed 3 bugs:
    - SubgraphQueues.add_request/reset: used deepcopy(self.subgraph.section) instead of deepcopy(self.subgraph) (Subgraph vs GraphSection)
    - SubgraphQueues.is_done: now checks waiting is None AND ready is empty (was prematurely marking subgraphs complete)
    - SubgraphsManager.process_stage_outputs: skip back_to_conductor pointers and unknown stages when routing to workers
  3. mminf/graph/request_queues.py — Fixed process_new_inputs to return ProcessedInputs instead of raw list when waiting is None
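
The corrected SubgraphQueues.is_done condition (waiting is None AND ready is empty) can be sketched as below. The `QueueState` class and its field types are hypothetical stand-ins, not the real SubgraphQueues; only the two-part condition comes from the description above.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class QueueState:
    # Hypothetical stand-in for the per-request queues of one subgraph.
    waiting: Optional[Any] = None                 # section still awaiting inputs
    ready: deque = field(default_factory=deque)   # stages ready to execute

    def is_done(self) -> bool:
        # Done only when nothing is waiting AND nothing is left to run.
        return self.waiting is None and not self.ready


state = QueueState(waiting=None, ready=deque(["decode"]))
still_running = not state.is_done()   # a ready stage remains, so not done
state.ready.popleft()
finished = state.is_done()            # both conditions now hold
```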

…approach would be to raise an error or log a warning instead of silently skipping
@kamahori

Collaborator

What's the distinction between engine and worker?

@merceod merceod merged commit 3d788ca into main Feb 28, 2026
@kamahori kamahori deleted the worker_processing_comm branch March 14, 2026 00:38
shg8 added a commit that referenced this pull request May 5, 2026
PR #78 review issue #3. _apply_pending_stops_to_batch built the
"loop-back inputs to drop from this iter's routing" set as every
self-loop edge on the running node:

  stopped_loop_backs[rid] = {
      edge.name for edge in node.outputs
      if edge.next_node == node.name
  }

That works for in-tree models (each AR node owns exactly one decode
loop, so {self-loop edges} == {this loop's loop-back edges}), but
contradicts the docstring's "the stopped loop's loop-back" promise.
A node that participates in two distinct loops with disjoint loop-
back edges would lose the SURVIVING loop's loop-back tensor when
the OTHER loop stopped, since the broad self-loop filter doesn't
know which edge belongs to which loop.

Fix: thread per-loop attribution through stop_loops.

- WorkerGraphQueues.stop_loops now returns the
  (edge.name, edge.next_node) pairs of the loop-back signals
  belonging to the loops that actually matched loop_names. The
  walker already finds matching DynamicLoops to call
  register_finished on; collect their _loop_back_signals at the
  same point.

- WorkerGraphsManager.stop_loops aggregates and returns the union
  across worker graphs. Existing void callers
  (_drain_orphan_pending_stops) ignore the return — backward-
  compatible.

- _apply_pending_stops_to_batch intersects node.outputs against
  the returned set: now drops only edges that are BOTH self-loops
  AND loop-back of a stopped loop. The self-loop guard preserves
  the existing consumer's filter shape (kept_for_routing only
  drops e.next_node == node.name && e.name in consumed_names).
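
The difference between the broad self-loop filter and the narrowed one can be sketched as follows. `Edge`, `Node`, and both helper functions are hypothetical stand-ins for illustration; the real _apply_pending_stops_to_batch works on the project's own graph types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    name: str
    next_node: str


@dataclass
class Node:
    name: str
    outputs: list


def broad_filter(node: Node) -> set:
    # Old behavior: every self-loop edge on the node is dropped.
    return {e.name for e in node.outputs if e.next_node == node.name}


def narrowed_filter(node: Node, stopped_pairs: set) -> set:
    # New behavior: drop only edges that are BOTH self-loops AND
    # loop-back signals of a loop that actually stopped.
    return {
        e.name
        for e in node.outputs
        if e.next_node == node.name and (e.name, e.next_node) in stopped_pairs
    }


# A node "n" in two loops with disjoint loop-back edges "a" and "b".
node = Node("n", [Edge("a", "n"), Edge("b", "n"), Edge("out", "m")])
dropped_old = broad_filter(node)                    # loses "b" as well
dropped_new = narrowed_filter(node, {("a", "n")})   # only loop_a stopped
```

With a single loop per node the two filters return the same set, which is why in-tree models see no behavior change.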

No in-tree model has the multi-loop-per-node shape today, so the
visible behavior on Orpheus / BAGEL / Q3-Omni is unchanged. The
fix tightens the docstring contract to match the code so the next
model with that shape doesn't silently drop loop-back tensors.

test/modular/test_stop_loops_filter.py drives WorkerGraphQueues
.stop_loops directly with a hand-built section to verify the
attribution. Five cases:

  - returns only stopped loops' loop-back signals
  - returns union when multiple loops stopped
  - empty when no loops match
  - register_finished only fires on matched loops
  - shared-node-name case from the PR comment: two loops, same
    GraphNode name "n", disjoint self-loop edges {a} and {b};
    stopping loop_a returns {("a","n")} and never {("b","n")}
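
The attribution contract exercised by these cases can be sketched with a toy model. `FakeLoop` and this free-standing `stop_loops` function are assumptions for illustration; they are not WorkerGraphQueues.stop_loops or DynamicLoop, whose real signatures are not shown here.

```python
from dataclasses import dataclass


@dataclass
class FakeLoop:
    # Hypothetical stand-in for a dynamic loop and its loop-back signals,
    # stored as (edge name, next node name) pairs.
    name: str
    loop_back_signals: set
    finished: bool = False

    def register_finished(self) -> None:
        self.finished = True


def stop_loops(loops, loop_names) -> set:
    """Mark matching loops finished and return their loop-back pairs."""
    stopped = set()
    for loop in loops:
        if loop.name in loop_names:
            loop.register_finished()
            stopped |= loop.loop_back_signals
    return stopped


# Shared-node-name case: two loops on node "n" with disjoint self-loop edges.
loop_a = FakeLoop("loop_a", {("a", "n")})
loop_b = FakeLoop("loop_b", {("b", "n")})
result = stop_loops([loop_a, loop_b], {"loop_a"})
```

Here `result` contains only loop_a's pair and loop_b is left unfinished, matching the last case in the list above.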