fix(function_node): restore within-node concurrent packet execution (#99)

eywalker merged 8 commits into nauticalab:dev
Conversation
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Pull request overview
Restores within-node concurrent packet execution for FunctionNode.async_execute in the async channel execution engine, reintroducing semaphore-based concurrency limiting consistent with FunctionPod.async_execute.
Changes:
- Re-adds concurrency resolution (`resolve_concurrency`) and per-node semaphore gating in `FunctionNode.async_execute` for both DB-backed and simple execution paths using `asyncio.TaskGroup`.
- Updates/adds async execution tests to assert concurrent behavior and verify `max_concurrency=1` enforces sequential processing.
- Updates Copilot regression tests to reflect restored concurrency and to expect `resolve_concurrency` to reject `max_concurrency <= 0`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/orcapod/core/nodes/function_node.py` | Restores concurrent task execution with optional semaphore limiting in async execution paths. |
| `tests/test_channels/test_node_async_execute.py` | Adds/updates concurrency tests for DB and non-DB paths, including a sequential limiting case. |
| `tests/test_channels/test_copilot_review_issues.py` | Updates concurrency expectations and renames/adjusts the `max_concurrency=0` test to expect a `ValueError`. |
```python
# Resolve concurrency limit from node config (pipeline config is not
# threaded through the orchestrator, so we fall back to defaults).
node_config = getattr(self._function_pod, "node_config", NodeConfig())
max_concurrency = resolve_concurrency(node_config, PipelineConfig())

sem = (
    asyncio.Semaphore(max_concurrency)
    if max_concurrency is not None
    else None
)

try:
```
resolve_concurrency(...) (and semaphore construction) happen before the try/finally that closes output. If resolve_concurrency raises (e.g., max_concurrency <= 0), output.close() is never called, which can leave downstream consumers blocked waiting for closure. Consider moving concurrency resolution/semaphore creation inside the try (or adding an outer try/finally) so output is always closed even on early validation errors.
Suggested change:

```diff
-# Resolve concurrency limit from node config (pipeline config is not
-# threaded through the orchestrator, so we fall back to defaults).
-node_config = getattr(self._function_pod, "node_config", NodeConfig())
-max_concurrency = resolve_concurrency(node_config, PipelineConfig())
-sem = (
-    asyncio.Semaphore(max_concurrency)
-    if max_concurrency is not None
-    else None
-)
-try:
+try:
+    # Resolve concurrency limit from node config (pipeline config is not
+    # threaded through the orchestrator, so we fall back to defaults).
+    node_config = getattr(self._function_pod, "node_config", NodeConfig())
+    max_concurrency = resolve_concurrency(node_config, PipelineConfig())
+    sem = (
+        asyncio.Semaphore(max_concurrency)
+        if max_concurrency is not None
+        else None
+    )
```
```python
# 5 packets × 0.2s each should complete well under 1s with concurrency=5
assert elapsed < 0.5, f"Expected concurrent execution but took {elapsed:.2f}s"
```
These wall-clock timing assertions (elapsed < 0.5) are likely to be flaky on CI under load (scheduler jitter, slower runners). Since this repo already has a deterministic concurrency counter test (see TestDeterministicConcurrencyTracking in test_copilot_review_issues.py), consider switching this test to track peak in-flight tasks instead of asserting on elapsed time, or relax the threshold significantly.
```python
# 5 packets × 0.2s each, sequential = ~1.0s minimum
assert elapsed >= 0.9, f"Expected sequential execution but took {elapsed:.2f}s"
```
This test uses a runtime threshold (elapsed >= 0.9) to infer sequential behavior, which can also be flaky across environments. A more robust approach is to deterministically track peak concurrency (e.g., with an asyncio.Lock()-protected counter) and assert that peak == 1 when max_concurrency=1.
```python
# 5 packets × 0.2s each should complete well under 1s with concurrency=5
assert elapsed < 0.5, f"Expected concurrent execution but took {elapsed:.2f}s"
```
Same concern as the earlier concurrency timing test: elapsed < 0.5 is a tight wall-clock bound and can become flaky on slower CI runners. Prefer a deterministic concurrency/overlap assertion (peak in-flight tasks > 1) rather than elapsed-time thresholds, or loosen the bound enough to accommodate runner variance.
Add a comprehensive test suite covering all four combinations of pipeline orchestrator (sync/async) and function executor (sync/async def):

1. Sync orchestrator + sync function — sequential baseline
2. Sync orchestrator + async function — graceful no-deadlock bridging
3. Async orchestrator + sync function — thread-pool concurrency
4. Async orchestrator + async function — maximum native concurrency

Each combination is tested for correctness and either sequential or concurrent behaviour. Cross-combination perf tests assert that async+async and async+sync are measurably faster (>40%) than sync+sync for I/O-bound workloads, validating the concurrency benefit restored by PR nauticalab#99 (ENG-297).

Closes ENG-298.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Demonstrate that async+async (asyncio.sleep) outperforms async+sync (for-loop time.sleep) when the thread pool is saturated.

The sync function uses a for loop of time.sleep calls — unambiguously blocking with no event-loop cooperation — and runs through run_in_executor, consuming one thread slot per concurrent packet. By capping the event loop's default executor at 2 workers while dispatching 6 packets, the async+sync path serialises into multiple rounds (~0.3 s) while async+async runs all coroutines concurrently in a single sleep period (~0.1 s).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
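The saturation scenario above can be sketched as follows. This is an illustrative sketch, not the actual test: `blocking_work`, `async_work`, and the worker and packet counts are assumed values chosen to mirror the described setup (2 executor workers, 6 packets).

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work():
    # A for loop of time.sleep calls: unambiguously blocking,
    # no event-loop cooperation, holds one thread slot while it runs.
    for _ in range(2):
        time.sleep(0.05)


async def async_work():
    await asyncio.sleep(0.1)  # yields to the event loop while "working"


async def main():
    loop = asyncio.get_running_loop()
    # Cap the default executor so 6 packets saturate it
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))

    t0 = time.perf_counter()
    await asyncio.gather(
        *(loop.run_in_executor(None, blocking_work) for _ in range(6))
    )
    sync_elapsed = time.perf_counter() - t0  # 6 jobs over 2 threads: ~3 rounds

    t0 = time.perf_counter()
    await asyncio.gather(*(async_work() for _ in range(6)))
    async_elapsed = time.perf_counter() - t0  # all coroutines overlap

    return sync_elapsed, async_elapsed


sync_elapsed, async_elapsed = asyncio.run(main())
```

Because the blocking path needs roughly three executor rounds while the coroutines share one sleep period, the gap is large enough to assert without a tight wall-clock bound.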
…sync orch

Add TestAsyncOrchestratorFunctionTypeDifference which directly tests that the function type (sync vs async) produces a measurable performance difference under the async orchestrator — the exact scenario that went undetected in the PLT-930 regression.

With a single-worker thread pool:

- async+sync (for-loop time.sleep): only 1 thread → packets are truly sequential (~N × sleep)
- async+async (asyncio.sleep): no thread needed → all N packets run concurrently (~1 × sleep)

If async_execute loses its TaskGroup concurrency, the async function also becomes sequential (~0.8s), exceeding the < 0.30s threshold and failing this test — directly catching the regression.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…re init inside try/finally, use deterministic concurrency assertions
All 4 Copilot comments fixed:

2-4. test_node_async_execute.py — Replaced all wall-clock timing assertions with deterministic peak-concurrency assertions.

No impact on PR #100 — it only adds a new test file (test_orchestrator_executor_matrix.py) and doesn't touch any
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
```diff
-# Concurrency limiting was removed in PLT-922 (deferred to PLT-930).
-# Packets are now processed sequentially, so peak should be 1.
-assert peak == 1, f"Expected sequential execution (peak=1) but peak was {peak}"
+# With concurrency restored (PLT-930), peak should match max_concurrency.
```
The comment says “peak should match max_concurrency”, but the assertions only check peak > 1 and peak <= 5 (not equality). Either tighten the assertion to peak == 5 (if deterministic here) or adjust the comment to reflect the weaker guarantee being tested.
Suggested change:

```diff
-# With concurrency restored (PLT-930), peak should match max_concurrency.
+# With concurrency restored (PLT-930), peak should demonstrate
+# concurrent execution and must not exceed max_concurrency.
```
…view-pr-99-concurrency-regression-fix-validate

test(pipeline): orchestrator × executor concurrency matrix (ENG-298)
…side try/finally, use deterministic concurrency assertions
Here's a summary of the changes:

- src/orcapod/core/nodes/function_node.py:
- tests/test_channels/test_node_async_execute.py:
- tests/test_channels/test_copilot_review_issues.py: