Add async function support and fix concurrent packet execution #72
Conversation
PythonPacketFunction now accepts both sync and async callables:
- Async functions are detected at construction via `inspect.iscoroutinefunction`
- `direct_async_call` awaits async functions directly (no thread pool)
- `direct_call` runs async functions synchronously via `asyncio.run()`
- Sync function behavior is unchanged

Also extracts a `_build_output_packet` helper to eliminate duplication between the sync and async code paths, and fixes `get_function_components` to recognise `async def` declarations.

https://claude.ai/code/session_01Cw9C8vLr7HHeis6vkJXbtY
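For illustration, here is a minimal sketch of the detection-and-routing pattern this commit describes. The `PacketFn` class below is a hypothetical stand-in, not the actual `PythonPacketFunction` implementation; only the method names mirror the ones mentioned above:

```python
import asyncio
import inspect


class PacketFn:
    """Hypothetical stand-in for a packet-function wrapper."""

    def __init__(self, fn):
        self._function = fn
        # Detect async callables once, at construction time.
        self._is_async = inspect.iscoroutinefunction(fn)

    def direct_call(self, **kwargs):
        if self._is_async:
            # Sync context: run the coroutine to completion.
            return asyncio.run(self._function(**kwargs))
        return self._function(**kwargs)

    async def direct_async_call(self, **kwargs):
        if self._is_async:
            # Async context: await natively, no thread pool hop.
            return await self._function(**kwargs)
        # Sync functions still run in the default executor.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: self._function(**kwargs))


async def add(x: int, y: int) -> int:
    await asyncio.sleep(0)
    return x + y


assert PacketFn(add).direct_call(x=1, y=2) == 3
```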
`PersistentFunctionNode.async_execute`: Phase 2 was fully sequential, processing each packet one at a time. This defeated the purpose of async execution for I/O-bound packet functions when run through the Pipeline API. Rewrote Phase 2 to use the same Semaphore + TaskGroup pattern as the parent `FunctionNode`.

https://claude.ai/code/session_01Cw9C8vLr7HHeis6vkJXbtY
Shows all four combinations of sync/async executor × sync/async function in a single Pipeline API demo, with analysis explaining why async+sync and async+async perform similarly for I/O-bound work.

https://claude.ai/code/session_01Cw9C8vLr7HHeis6vkJXbtY
…l gap

Replace time.sleep with a pure-Python busy-wait loop that holds the GIL. This makes the async+sync vs async+async difference clearly visible: threads cannot overlap GIL-bound work, while native coroutines can.

https://claude.ai/code/session_01Cw9C8vLr7HHeis6vkJXbtY
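For context, a busy-wait of the kind this commit describes might look like the following sketch; the helper name and duration are assumptions, not the example's actual code:

```python
import time


def busy_wait(duration_s: float = 0.2) -> None:
    """Burn CPU in pure Python for `duration_s` seconds.

    Unlike time.sleep(), this keeps executing Python bytecode, so only
    one thread can make progress at a time under the GIL: two threads
    busy-waiting 0.2s each take ~0.4s of wall time, while two native
    coroutines each awaiting 0.2s of real I/O finish in ~0.2s total.
    """
    deadline = time.perf_counter() + duration_s
    while time.perf_counter() < deadline:
        pass  # pure-Python loop: never yields the GIL for the full interval
```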
Pull request overview
This PR adds end-to-end support for async (async def) packet functions in PythonPacketFunction and fixes PersistentFunctionNode.async_execute so that Phase 2 processes packets concurrently (respecting configured concurrency limits) instead of sequentially.
Changes:
- Add async-function detection and correct sync/async invocation paths in `PythonPacketFunction`.
- Make `PersistentFunctionNode.async_execute` Phase 2 concurrent via `Semaphore` + `TaskGroup` (matching `FunctionNode`).
- Add tests and an example demonstrating concurrency behavior and expected performance differences.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `src/orcapod/core/packet_function.py` | Detect async functions and route execution appropriately; refactor output-packet construction. |
| `src/orcapod/core/nodes/function_node.py` | Fix `PersistentFunctionNode.async_execute` Phase 2 to run concurrently with bounded concurrency. |
| `src/orcapod/hashing/hash_utils.py` | Ensure source hashing logic recognizes `async def` declarations. |
| `tests/test_core/packet_function/test_packet_function.py` | Add fixtures and unit tests covering async function construction + sync/async call paths. |
| `tests/test_channels/test_node_async_execute.py` | Add integration test asserting async packets overlap under concurrency. |
| `examples/async_vs_sync_pipeline.py` | Provide an illustrative 2×2 matrix example of executor/function sync vs async combinations. |
| `DESIGN_ISSUES.md` | Document the resolved FN1 issue for `PersistentFunctionNode.async_execute`. |
Comments suppressed due to low confidence (4)
tests/test_channels/test_node_async_execute.py:369

- This test relies on wall-clock timing (`elapsed < 0.6`) to prove concurrency, which is prone to flakiness on slower/contended CI machines. Consider asserting concurrency more deterministically (e.g., track max in-flight tasks with a counter/`asyncio.Event` like `tests/test_core/test_regression_fixes.py::TestAsyncExecuteBackpressure`), or relax the timing assertion substantially.

```python
t0 = time.perf_counter()
await node.async_execute([input_ch.reader], output_ch.writer)
elapsed = time.perf_counter() - t0

results = await output_ch.reader.collect()
assert len(results) == 5
values = sorted(pkt.as_dict()["result"] for _, pkt in results)
assert values == [0, 2, 4, 6, 8]

# With 5 packets at 0.2s each and max_concurrency=5,
# concurrent execution should complete in ~0.2s, not ~1.0s
assert elapsed < 0.6, f"Expected concurrent execution but took {elapsed:.2f}s"
```
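A deterministic variant along the lines of that suggestion, as a standalone sketch; the counter names are illustrative, and a real test would route `tracked_fn` through the node rather than `asyncio.gather`:

```python
import asyncio

in_flight = 0
max_in_flight = 0


async def tracked_fn(x: int) -> int:
    """Packet function instrumented to record peak concurrency."""
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    try:
        await asyncio.sleep(0.01)  # any yield point works
        return x * 2
    finally:
        in_flight -= 1


async def main() -> None:
    await asyncio.gather(*(tracked_fn(i) for i in range(5)))
    # Asserts overlap directly, with no wall-clock dependence.
    assert max_in_flight > 1, f"no overlap observed (max {max_in_flight})"


asyncio.run(main())
```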
examples/async_vs_sync_pipeline.py:173

- The analysis text says native async coroutines "bypass the GIL entirely", which is inaccurate: coroutines still execute Python bytecode under the GIL; they just yield control at `await` points (making I/O overlap possible). Please adjust the wording to avoid misleading readers (e.g., describe reduced GIL contention due to cooperative scheduling).

```python
print(f"  async+async {t4:.2f}s — branches overlap AND packets overlap")
print(f"  (native coroutines bypass the GIL entirely)")
print()
print(f"  Key insight: async+sync is much slower than async+async because")
print(f"  the sync function holds the GIL, so run_in_executor threads")
print(f"  cannot actually run in parallel. Native async coroutines release")
print(f"  control at each 'await', allowing true concurrency.")
```
src/orcapod/core/packet_function.py:442

- `_call_async_function_sync` creates a new `ThreadPoolExecutor` (and thread) on every call when a loop is already running, which can be very expensive and can exhaust threads under load. Consider reusing a shared executor (module-level or on `self`), or a dedicated long-lived background thread/loop for running these coroutines from sync contexts inside an event loop.

```python
# Already in a loop — run in a separate thread with its own loop
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(1) as pool:
    return pool.submit(asyncio.run, coro).result()
```
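One possible shape for that reuse, sketched with a hypothetical module-level name:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Created once per process instead of once per call. A single worker
# serializes concurrent sync-context callers, which may be acceptable
# here; raise max_workers if it is not.
_ASYNC_BRIDGE_POOL = ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="async-bridge"
)


def run_coro_from_sync(coro):
    """Run `coro` to completion from sync code that sits inside a running loop."""
    # asyncio.run spins up a fresh event loop inside the shared worker thread.
    return _ASYNC_BRIDGE_POOL.submit(asyncio.run, coro).result()
```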
src/orcapod/core/packet_function.py:441

- In the running-loop branch, the coroutine is created in the caller thread (`coro = self._function(...)`) and then passed into another thread for `asyncio.run`. If thread submission fails or gets cancelled, this can leave an un-awaited coroutine and raise warnings; it's also safer to construct the coroutine inside the thread that runs it. Consider passing `packet.as_dict()` into the thread and creating/awaiting the coroutine entirely within that thread.

```python
coro = self._function(**packet.as_dict())
try:
    asyncio.get_running_loop()
except RuntimeError:
    # No running loop — safe to use asyncio.run()
    return asyncio.run(coro)
else:
    # Already in a loop — run in a separate thread with its own loop
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(1) as pool:
        return pool.submit(asyncio.run, coro).result()
```
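Sketched out, the suggestion amounts to deferring coroutine creation into the worker thread; `fn` and `kwargs` here stand in for `self._function` and `packet.as_dict()`:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def call_async_in_thread(pool: ThreadPoolExecutor, fn, kwargs: dict):
    def runner():
        # The coroutine object is created *and* awaited inside this
        # thread, so a failed or cancelled submission never strands an
        # un-awaited coroutine in the caller.
        return asyncio.run(fn(**kwargs))

    return pool.submit(runner).result()
```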
```python
sem = (
    asyncio.Semaphore(max_concurrency)
    if max_concurrency is not None
    else None
)

async def process_one(
    tag: TagProtocol, packet: PacketProtocol
) -> None:
    try:
        tag_out, result_packet = await self.async_process_packet(
            tag, packet
        )
        if result_packet is not None:
            await output.send((tag_out, result_packet))
    finally:
        if sem is not None:
            sem.release()

async with asyncio.TaskGroup() as tg:
    async for tag, packet in inputs[0]:
        input_hash = packet.content_hash().to_string()
        if input_hash in computed_hashes:
            continue
        if sem is not None:
            await sem.acquire()
        tg.create_task(process_one(tag, packet))
```
`resolve_concurrency` can return 0 (it's not validated), and `asyncio.Semaphore(0)` will cause Phase 2 to deadlock on the first `await sem.acquire()`. Since `PersistentFunctionNode.async_execute` newly uses the semaphore scaffold, this introduces a potential hang for `NodeConfig(max_concurrency=0)`. Consider validating `max_concurrency` to be None or >= 1 (raising a `ValueError` otherwise) before constructing/acquiring the semaphore.
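A minimal guard along those lines, as a sketch; the `resolve_concurrency` shown here is a stand-in for the real helper, whose full signature may differ:

```python
def resolve_concurrency(max_concurrency: int | None) -> int | None:
    """Validate a concurrency limit before it reaches asyncio.Semaphore."""
    if max_concurrency is not None and max_concurrency < 1:
        raise ValueError(
            f"max_concurrency must be None or >= 1, got {max_concurrency}"
        )
    return max_concurrency
```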
- Replace misleading GIL bypass claim with an accurate cooperative-scheduling description in the async example
- Reuse a module-level `ThreadPoolExecutor` instead of creating one per call in `_call_async_function_sync`
- Construct the coroutine inside the executor thread to avoid unawaited-coroutine warnings on submission failure
- Validate that `resolve_concurrency` rejects <= 0 to prevent `Semaphore(0)` deadlock
- Add test suite exposing all five issues (TDD: tests written first, verified to fail, then fixed)

https://claude.ai/code/session_01Cw9C8vLr7HHeis6vkJXbtY
Summary
This PR adds first-class support for async packet functions and fixes a critical bug where `PersistentFunctionNode.async_execute` was executing packets sequentially instead of concurrently.

Key Changes

1. Async Packet Function Support

- Added an `is_async` property to `PythonPacketFunction` to detect async functions via `inspect.iscoroutinefunction()`
- Added a `_call_async_function_sync()` helper to safely run async functions from sync contexts using `asyncio.run()`, with a thread pool fallback when already in an event loop
- Updated `direct_call()` to handle async functions by running them synchronously
- Updated `direct_async_call()` to `await` async functions directly instead of always using `run_in_executor`
- Extracted `_build_output_packet()` to reduce duplication

2. Fixed Concurrent Packet Execution in `PersistentFunctionNode`

- `async_execute` was fully sequential — packets were awaited one at a time in a simple `async for` loop, preventing async functions from overlapping I/O operations
- Rewrote Phase 2 to use the `asyncio.Semaphore` + `TaskGroup` pattern (matching the parent `FunctionNode.async_execute`), enabling true concurrent execution of async packets
- Respects the concurrency limits configured via `NodeConfig.max_concurrency` and `PipelineConfig`

3. Test Coverage

- New async packet-function fixtures (`async_add_pf`, `async_multi_pf`)
- Added `test_concurrent_execution_with_async_function`, verifying that 5 async packets with 0.2s sleep each complete in ~0.2s (concurrent) rather than ~1.0s (sequential)

4. Example & Documentation

- Added `examples/async_vs_sync_pipeline.py`, demonstrating the 2×2 matrix of sync/async executors and sync/async packet functions
- Updated `DESIGN_ISSUES.md` to document the fix for FN1

Implementation Details

The async function support gracefully handles both sync and async contexts:

- In sync contexts (`direct_call`), async functions are executed via `asyncio.run()` or a helper thread
- In async contexts (`direct_async_call`), async functions are awaited directly for zero-copy efficiency
- Sync functions continue to use `run_in_executor` when called from async contexts

The concurrent execution fix ensures that when multiple async packets are processed, they can overlap their I/O operations rather than serializing, providing the expected performance benefit of async/await.
https://claude.ai/code/session_01Cw9C8vLr7HHeis6vkJXbtY