
Fix concurrent streaming in ConcurrentExecution #638

Open
gtopper wants to merge 1 commit into mlrun:development from gtopper:ML-12378

Conversation


@gtopper gtopper commented Mar 31, 2026

ML-12378

Problem

When ConcurrentExecution processes streaming (generator) event processors with max_in_flight > 1, the generators are iterated serially in the FIFO worker coroutine. This blocks the worker from picking up other events until the current generator is fully consumed, negating the concurrency benefit of max_in_flight.

Root cause

_handle_completed called _emit_streaming_chunks (inherited from _StreamingStepMixin), which iterates the generator inline — holding the worker for the entire duration of each generator. With 4 events and a 0.5s generator, total time is ~2.0s instead of ~0.5s.
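The serial behavior described above can be reproduced with a minimal standalone sketch (the worker loop and generator here are illustrative stand-ins, not the actual storey code):

```python
import asyncio
import time

async def slow_gen():
    # Hypothetical streaming processor: one chunk after 0.1s of "work"
    await asyncio.sleep(0.1)
    yield "chunk"

async def serial_worker(gens):
    # Inline iteration, as in the pre-fix worker: each generator is fully
    # consumed before the next event is picked up
    for gen in gens:
        async for _ in gen:
            pass

start = time.monotonic()
asyncio.run(serial_worker([slow_gen() for _ in range(4)]))
print(f"elapsed: {time.monotonic() - start:.2f}s")  # ~0.4s total, not ~0.1s
```

Four events take roughly four generator durations, exactly the 4 × 0.5s ≈ 2.0s pattern the PR observed.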

Solution

Offload generator iteration to background asyncio.Tasks that feed an asyncio.Queue, freeing the FIFO worker to process other events concurrently.

New internal types:

  • _GeneratorDone(error) — sentinel placed on the queue when the generator is exhausted (or errors).
  • _StreamingQueue(queue, task) — wraps the queue and its background task for lifecycle management.

New methods on ConcurrentExecution:

  • _iterate_generator — background task that consumes a generator (sync or async) into an asyncio.Queue. Sync generators use run_in_executor per next() call to avoid blocking the event loop.
  • _emit_streaming_from_queue — mirrors _emit_streaming_chunks but reads from the queue, emitting StreamChunk events and a final StreamCompletion (with error if the generator raised).
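A simplified sketch of this producer/consumer pair (names and error handling are simplified stand-ins for the PR's `_iterate_generator` and `_emit_streaming_from_queue`; the real code emits StreamChunk/StreamCompletion events rather than returning a list):

```python
import asyncio
import inspect

_DONE = object()  # stand-in for the PR's _GeneratorDone sentinel

def _next_or_done(gen):
    # Runs in a worker thread. StopIteration is translated into a sentinel
    # because it does not survive a round-trip through an awaited Future
    # cleanly (PEP 479 would turn it into a RuntimeError).
    try:
        return next(gen)
    except StopIteration:
        return _DONE

async def iterate_generator(gen, queue: asyncio.Queue) -> None:
    """Background task: drain a sync or async generator into the queue."""
    try:
        if inspect.isasyncgen(gen):
            async for chunk in gen:
                await queue.put(chunk)
        else:
            loop = asyncio.get_running_loop()
            while True:
                # Each next() call runs in a thread, never blocking the loop
                chunk = await loop.run_in_executor(None, _next_or_done, gen)
                if chunk is _DONE:
                    break
                await queue.put(chunk)
        await queue.put(_DONE)
    except Exception as e:
        await queue.put(e)  # simplification: the PR wraps this as _GeneratorDone(error)

async def emit_from_queue(queue: asyncio.Queue) -> list:
    """Reader side: collect chunks until the sentinel, re-raising any error."""
    chunks = []
    while True:
        item = await queue.get()
        if item is _DONE:
            return chunks
        if isinstance(item, Exception):
            raise item
        chunks.append(item)
```

Because each generator is driven by its own task, two 0.3s generators started together finish in roughly 0.3s total rather than 0.6s.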

Process-based mechanisms (process_pool, dedicated_process) use the existing IPC pattern: _concurrent_streaming_run_in_subprocess runs the generator in the subprocess and streams chunks via multiprocessing.Queue, which _async_read_streaming_queue consumes as an async generator on the parent side.
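The parent-side reader can be sketched as follows (a hedged approximation of `_async_read_streaming_queue`; the sentinel choice and the absence of explicit error framing are assumptions, not the PR's actual protocol):

```python
import asyncio
import multiprocessing

_END_OF_STREAM = None  # assumption: subprocess enqueues None when the generator is exhausted

async def async_read_streaming_queue(mp_queue: multiprocessing.Queue):
    """Consume a multiprocessing.Queue as an async generator.

    Queue.get() is a blocking call, so each read is offloaded to a thread
    via run_in_executor to keep the parent's event loop responsive while
    the subprocess streams chunks.
    """
    loop = asyncio.get_running_loop()
    while True:
        chunk = await loop.run_in_executor(None, mp_queue.get)
        if chunk is _END_OF_STREAM:
            return
        yield chunk
```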

Task lifecycle: _StreamingQueue holds the asyncio.Task reference, and _handle_completed awaits it after draining the queue. This prevents GC of fire-and-forget tasks and surfaces unexpected errors.

Additional changes

  • Migrated ConcurrentExecution to ParallelExecutionMechanisms: replaces the old _supported_concurrency_mechanisms list with the shared enum. Legacy names ("threading", "multiprocessing") are mapped automatically for backward compatibility.
  • Detects generator functions early via inspect.isgeneratorfunction in __init__, used to route process-based mechanisms to the IPC streaming path in _process_event.
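For reference, `inspect.isgeneratorfunction` matches only sync generator functions; async generators need a separate check, which matters when routing to the IPC path (these example functions are illustrative only):

```python
import inspect

def sync_chunks():          # sync generator function
    yield "a"

async def async_chunks():   # async generator function
    yield "b"

def plain(event):           # ordinary function
    return event

assert inspect.isgeneratorfunction(sync_chunks)
assert not inspect.isgeneratorfunction(plain)
assert not inspect.isgeneratorfunction(async_chunks)  # NOT detected by this check
assert inspect.isasyncgenfunction(async_chunks)
```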

Tests

  • Concurrency tests (test_concurrent_streaming_asyncio_async_gen, test_concurrent_streaming_sync_gen parametrized over thread_pool/process_pool/dedicated_process/naive): verify generators run concurrently across events.
  • Error tests (test_concurrent_streaming_error_asyncio_async_gen, test_concurrent_streaming_error_sync_gen parametrized over all mechanisms, test_concurrent_streaming_error_mixed_with_healthy): verify mid-stream errors propagate correctly and don't poison healthy concurrent events.

[ML-12378](https://iguazio.atlassian.net/browse/ML-12378)

Streaming generators were iterated serially in the FIFO worker, negating `max_in_flight` concurrency. Offload generator iteration to background tasks feeding an `asyncio.Queue`, enabling concurrent streaming across events.

@royischoss royischoss left a comment


Hey, LGTM, minor comments.

Comment thread: storey/flow.py
args,
mp_queue,
)
ipc_gen = _async_read_streaming_queue(mp_queue)


What does IPC stand for?

Comment thread: tests/test_streaming.py

assert result == ["re_test_0", "re_test_1"]

# -- ML-12378 concurrency tests ----------------------------------------


Do we need the task reference in the docs and variable names?

