test(pipeline): orchestrator × executor concurrency matrix (ENG-298)#100
Merged
brian-arnold merged 3 commits into revive_async · Mar 24, 2026
Conversation
Add a comprehensive test suite covering all four combinations of pipeline orchestrator (sync/async) and function executor (`def`/`async def`):

1. Sync orchestrator + sync function: sequential baseline
2. Sync orchestrator + async function: graceful no-deadlock bridging
3. Async orchestrator + sync function: thread-pool concurrency
4. Async orchestrator + async function: maximum native concurrency

Each combination is tested for correctness and for either sequential or concurrent behaviour. Cross-combination perf tests assert that async+async and async+sync are measurably faster (>40%) than sync+sync for I/O-bound workloads, validating the concurrency benefit restored by PR #99 (ENG-297).

Closes ENG-298.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
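The four bridging strategies can be sketched in miniature. The `work_sync`/`work_async` helpers and the four wrapper functions below are invented for illustration; this is not the project's orchestrator API:

```python
import asyncio
import time


def work_sync() -> str:
    """Blocking function (plain def)."""
    time.sleep(0.01)
    return "done"


async def work_async() -> str:
    """Native coroutine (async def)."""
    await asyncio.sleep(0.01)
    return "done"


# 1. Sync orchestrator + sync function: plain call, sequential baseline.
def sync_sync() -> str:
    return work_sync()


# 2. Sync orchestrator + async function: bridge via asyncio.run.
#    There is no already-running loop here, so this cannot deadlock.
def sync_async() -> str:
    return asyncio.run(work_async())


# 3. Async orchestrator + sync function: off-load to the default thread
#    pool so the event loop stays responsive while the function blocks.
async def async_sync() -> str:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, work_sync)


# 4. Async orchestrator + async function: await natively, no threads.
async def async_async() -> str:
    return await work_async()


if __name__ == "__main__":
    print(sync_sync(), sync_async())
    print(asyncio.run(async_sync()), asyncio.run(async_async()))
```

Cells 3 and 4 are the concurrent ones: many `run_in_executor` futures or coroutines can be awaited at once, which is what the perf tests measure.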
Codecov Report ✅ All modified and coverable lines are covered by tests.
Demonstrate that async+async (asyncio.sleep) outperforms async+sync (for-loop time.sleep) when the thread pool is saturated. The sync function uses a for loop of time.sleep calls — unambiguously blocking with no event-loop cooperation — and runs through run_in_executor, consuming one thread slot per concurrent packet. By capping the event loop's default executor at 2 workers while dispatching 6 packets, the async+sync path serialises into multiple rounds (~0.3 s) while async+async runs all coroutines concurrently in a single sleep period (~0.1 s). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
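A minimal standalone reproduction of that saturation setup might look like this (shortened sleeps; `N_PACKETS`, `blocking_packet`, and `native_packet` are invented names, and the timings are approximate, not the test suite's actual thresholds):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

N_PACKETS = 6
SLEEP = 0.05  # total sleep per packet, shortened for the sketch


def blocking_packet() -> None:
    # For-loop of time.sleep calls: unambiguously blocking,
    # with no event-loop cooperation.
    for _ in range(5):
        time.sleep(SLEEP / 5)


async def native_packet() -> None:
    await asyncio.sleep(SLEEP)


async def timed(awaitables) -> float:
    start = time.perf_counter()
    await asyncio.gather(*awaitables)
    return time.perf_counter() - start


async def main() -> None:
    loop = asyncio.get_running_loop()
    # Cap the default executor at 2 workers: 6 blocking packets must
    # serialise into ceil(6 / 2) = 3 rounds (~3 × SLEEP).
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))

    sync_time = await timed(
        loop.run_in_executor(None, blocking_packet) for _ in range(N_PACKETS)
    )
    # Native coroutines need no thread slot: one round (~1 × SLEEP).
    async_time = await timed(native_packet() for _ in range(N_PACKETS))

    print(f"async+sync:  {sync_time:.2f}s")   # ~3 × SLEEP
    print(f"async+async: {async_time:.2f}s")  # ~1 × SLEEP
    assert async_time < sync_time


if __name__ == "__main__":
    asyncio.run(main())
```

The key lever is `set_default_executor`: by shrinking the pool below the packet count, the thread-slot cost of the sync path becomes visible even with short sleeps.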
…sync orch

Add TestAsyncOrchestratorFunctionTypeDifference which directly tests that the function type (sync vs async) produces a measurable performance difference under the async orchestrator, the exact scenario that went undetected in the PLT-930 regression.

With a single-worker thread pool:

- async+sync (for-loop `time.sleep`): only 1 thread → packets are truly sequential (~N × sleep)
- async+async (`asyncio.sleep`): no thread needed → all N packets run concurrently (~1 × sleep)

If `async_execute` loses its TaskGroup concurrency, the async function also becomes sequential (~0.8s), exceeding the < 0.30s threshold and failing this test, directly catching the regression.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Collaborator
These tests look great and are a nice addition to PR #99!
brian-arnold
approved these changes
Mar 24, 2026
Summary
Closes ENG-298 — Review PR #99 concurrency regression fix and validate orchestrator × executor matrix.
PR #99 Review
PR #99 (fix(function_node): restore within-node concurrent packet execution) restores `asyncio.TaskGroup`-based concurrency in `FunctionNode.async_execute`. The diff was reviewed and found correct:

- `sem.acquire()` before `tg.create_task(...)`, `sem.release()` in `finally`
- `t: TagProtocol = tag` avoids closure variable capture bugs
- `_process_one_db` helper defined once outside the loop, so no closure aliasing
- `getattr(self._function_pod, "node_config", NodeConfig())` is safe (attribute always present)
- `asyncio.TaskGroup` propagates task exceptions correctly

What this PR adds
A new test file `tests/test_pipeline/test_orchestrator_executor_matrix.py` covering all four combinations at the pipeline level: sync/async orchestrator × sync (`def`) / async (`async def`) function. Each cell is tested for correctness and expected sequential/concurrent behaviour.
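For illustration, the timing assertion for the async+async cell might take roughly this shape (a standalone sketch using `asyncio.gather`; the names and thresholds are invented, not the repository's actual test code):

```python
import asyncio
import time

SLEEP = 0.05
N = 5


async def async_packet(x: int) -> int:
    await asyncio.sleep(SLEEP)
    return x


async def test_async_async_cell_is_concurrent() -> None:
    # All N packets should overlap: total ≈ 1 × SLEEP, well under N × SLEEP.
    start = time.perf_counter()
    results = await asyncio.gather(*(async_packet(i) for i in range(N)))
    elapsed = time.perf_counter() - start
    assert results == list(range(N))       # correctness
    assert elapsed < N * SLEEP             # concurrency, not sequential


if __name__ == "__main__":
    asyncio.run(test_async_async_cell_is_concurrent())
```

Each cell pairs a correctness assertion with a wall-clock bound, so a silent loss of concurrency fails the test even when the results stay correct.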
Regression guard (key addition)
`TestAsyncOrchestratorFunctionTypeDifference` is specifically designed to catch the PLT-930 class of regression, where async functions silently lose their concurrency benefit. It uses a single-worker thread pool to isolate the function-type dimension:

- async+sync (`time.sleep`): only 1 thread → packets forced sequential → ≥ 0.60s
- async+async (`asyncio.sleep`): no thread needed → all N concurrent → < 0.30s

If `async_execute` ever loses its `TaskGroup` again, the async function serialises to ~0.80s and the fast-path assertion fires, directly catching the regression. The previous node-level tests only validated orchestrator type, not function type; this test validates both simultaneously.

Performance comparison tests
`TestAsyncAsyncVsAsyncSync` additionally shows async+async outperforms async+sync under thread-pool saturation (2 workers, 6 packets): the sync path needs multiple rounds (~0.3s) while the async path runs all coroutines in one go (~0.1s).

Test results
Test plan
- `test_orchestrator_executor_matrix.py`: 18 tests, all pass
- `test_channels/test_node_async_execute.py`: 22 tests, all pass
- `test_channels/test_copilot_review_issues.py`: 9 tests, all pass
- `tests/test_channels/` + `tests/test_pipeline/` suite: 521 passed
- Removing `TaskGroup` from `async_execute` causes `TestAsyncOrchestratorFunctionTypeDifference` to fail (0.80s vs < 0.30s threshold)

🤖 Generated with Claude Code