Add concrete asynchronous orchestrator #71

Merged
eywalker merged 1 commit into dev from claude/async-orchestrator-KoOH7
Mar 4, 2026

@eywalker eywalker commented Mar 4, 2026

This pull request introduces asynchronous pipeline execution to the Orcapod framework by implementing an async orchestrator and adding async execution methods to core node classes. The main focus is enabling push-based, channel-driven execution of the pipeline DAG using asyncio, allowing concurrent node execution and improved scalability.

Key changes include:

Async pipeline orchestration:

  • Added a new AsyncPipelineOrchestrator class that compiles the pipeline DAG into channels and launches all nodes concurrently using asyncio.TaskGroup. This orchestrator manages channel wiring, node task launching, and terminal output collection. (src/orcapod/pipeline/orchestrator.py)
  • Updated the Pipeline class to support an ExecutorType.ASYNC_CHANNELS mode, which delegates execution to the new orchestrator when specified in the pipeline config. (src/orcapod/pipeline/graph.py)
  • Exposed AsyncPipelineOrchestrator in the pipeline module's public API. (src/orcapod/pipeline/__init__.py)

Async execution in core nodes:

  • Added an async_execute method to FunctionNode, implementing streaming async execution with concurrency control via semaphores. (src/orcapod/core/function_pod.py)
  • Added an async_execute method to OperatorNode that delegates to the wrapped operator's async execution. (src/orcapod/core/operator_node.py)
  • Added an async_execute method to SourceNode, enabling it to push all packets from its stream to the output channel asynchronously. (src/orcapod/core/tracker.py)
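
The semaphore-based concurrency control described for FunctionNode can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the code from this PR: plain `asyncio.Queue` objects stand in for Orcapod's channels, and `_DONE`, `process_packet`, `stats`, and `max_concurrency` are hypothetical names chosen for illustration.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream sentinel (not an Orcapod API)
stats = {"active": 0, "peak": 0}  # records how many packets run at once


async def process_packet(packet: int) -> int:
    """Stand-in for a packet function; it doubles the value."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # simulate asynchronous work
    stats["active"] -= 1
    return packet * 2


async def async_execute(
    inputs: asyncio.Queue, output: asyncio.Queue, max_concurrency: int = 2
) -> None:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def worker(packet: int) -> None:
        # At most max_concurrency packets are processed at the same time.
        async with semaphore:
            result = await process_packet(packet)
        await output.put(result)

    tasks = []
    # Stream packets in as they arrive instead of waiting for the full input.
    while (packet := await inputs.get()) is not _DONE:
        tasks.append(asyncio.create_task(worker(packet)))
    await asyncio.gather(*tasks)
    await output.put(_DONE)  # signal downstream that no more packets follow


async def main() -> list:
    inputs, output = asyncio.Queue(), asyncio.Queue()
    for packet in (1, 2, 3):
        inputs.put_nowait(packet)
    inputs.put_nowait(_DONE)
    await async_execute(inputs, output)
    results = []
    while (item := output.get_nowait()) is not _DONE:
        results.append(item)
    return sorted(results)  # completion order is not guaranteed
```

Results are sorted here only because concurrently processed packets may finish out of order; whether the real node preserves ordering is not stated in the description.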

Type and import updates:

  • Updated imports to include Sequence, ReadableChannel, and WritableChannel where needed for async execution signatures. (src/orcapod/core/operator_node.py, src/orcapod/core/tracker.py)

These changes collectively enable fully asynchronous, concurrent execution of Orcapod pipelines using a push-based channel model.

…G execution

Implements Phase 3 of the async execution system design:

- Add async_execute to SourceNode, OperatorNode, and FunctionNode
- Implement AsyncPipelineOrchestrator that compiles a GraphTracker DAG
  into bounded channels and launches all nodes concurrently via TaskGroup
- Support fan-out via BroadcastChannel when one node feeds multiple downstream
- Add Pipeline.run() integration with ExecutorType.ASYNC_CHANNELS config
- Export AsyncPipelineOrchestrator from pipeline package

Tests cover: linear pipeline, operator pipeline, diamond DAG (join),
fan-out, run_async entry point, and PipelineConfig integration.
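
Fan-out, where one node feeds several downstream nodes, might look roughly like the sketch below. It assumes each reader gets its own bounded queue and that the writer delivers every item to all of them. `BroadcastSketch`, `_CLOSE`, and `drain` are hypothetical names, and the real BroadcastChannel may be designed differently.

```python
import asyncio

_CLOSE = object()  # hypothetical close marker


class BroadcastSketch:
    """Hypothetical fan-out helper: every reader owns a bounded queue."""

    def __init__(self, n_readers: int, buffer_size: int = 4) -> None:
        self.readers = [asyncio.Queue(maxsize=buffer_size) for _ in range(n_readers)]

    async def send(self, item) -> None:
        # Deliver to every reader; a full queue makes the writer wait.
        for queue in self.readers:
            await queue.put(item)

    async def close(self) -> None:
        await self.send(_CLOSE)


async def drain(queue: asyncio.Queue) -> list:
    items = []
    while (item := await queue.get()) is not _CLOSE:
        items.append(item)
    return items


async def main():
    channel = BroadcastSketch(n_readers=2)

    async def producer() -> None:
        for i in range(3):
            await channel.send(i)
        await channel.close()

    _, left, right = await asyncio.gather(
        producer(), drain(channel.readers[0]), drain(channel.readers[1])
    )
    return left, right
```

In this design the slowest reader sets the pace for the writer, which keeps memory bounded at the cost of coupling the downstream branches.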

https://claude.ai/code/session_01XVj6P27QtZvdazJ13kQFHp
Copilot AI review requested due to automatic review settings March 4, 2026 23:49
@eywalker eywalker changed the base branch from main to dev March 4, 2026 23:49
@eywalker eywalker changed the title from "Refactor protocols and enhance core functionality with new features" to "Add concrete asynchronous orchestrator" Mar 4, 2026
@eywalker eywalker merged commit a06c718 into dev Mar 4, 2026
2 checks passed
@codecov-commenter

Codecov Report

❌ Patch coverage is 88.49558% with 13 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/orcapod/pipeline/orchestrator.py | 89.85% | 7 Missing ⚠️ |
| src/orcapod/pipeline/graph.py | 66.66% | 4 Missing ⚠️ |
| src/orcapod/core/function_pod.py | 89.47% | 2 Missing ⚠️ |

Copilot AI left a comment

Pull request overview

This PR refactors Orcapod’s core stream/source/operator abstractions to a new protocol- and schema-based API, while introducing new runtime features (async channels/executors, persistent operator nodes) and updating developer automation (pre-commit + CI) and agent documentation.

Changes:

  • Replace legacy stream/source/kernel/datagram implementations with ArrowTableStream/ArrowTableSource-centric architecture and Schema/ColumnConfig APIs.
  • Add async execution foundations: channel primitives, StaticOutputPod.async_execute, and executor abstractions (local + Ray).
  • Update tooling and docs: agent instruction files, pre-commit hooks, CI matrix/deps, and dependency set.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.

| File | Description |
|---|---|
| src/orcapod/core/streams/arrow_table_stream.py | Refactors immutable table-backed stream to protocol + ColumnConfig API and new identity/pipeline hashing behavior. |
| src/orcapod/core/streams/__init__.py | Updates public stream exports to new ArrowTableStream/StreamBase surface. |
| src/orcapod/core/static_output_pod.py | Introduces StaticOutputPod + DynamicPodStream with barrier-mode async execution support. |
| src/orcapod/core/sources/source_registry.py | Reworks source registry semantics (idempotent register + replace) and error behavior. |
| src/orcapod/core/sources/persistent_source.py | Adds DB-backed caching wrapper for sources that merges live + cached records. |
| src/orcapod/core/sources/list_source.py | Refactors list-backed source to delegate to ArrowTableSource and new schema APIs. |
| src/orcapod/core/sources/dict_source.py | Refactors dict-backed source to delegate to ArrowTableSource and new schema APIs. |
| src/orcapod/core/sources/derived_source.py | Adds DerivedSource to expose DB-backed node records as immutable sources. |
| src/orcapod/core/sources/delta_table_source.py | Refactors Delta source to delegate to ArrowTableSource and adds stable record-id support. |
| src/orcapod/core/sources/data_frame_source.py | Refactors DataFrame source to delegate to ArrowTableSource; keeps object-column coercion. |
| src/orcapod/core/sources/csv_source.py | Refactors CSV source to delegate to ArrowTableSource and add record-id support. |
| src/orcapod/core/sources/arrow_table_source.py | Rebuilds core arrow-table source enrichment (source-info + system tags) and adds resolve_field. |
| src/orcapod/core/sources/__init__.py | Updates public exports to new RootSource-based source hierarchy. |
| src/orcapod/core/operators/semijoin.py | Refactors SemiJoin to StreamProtocol + Schema APIs and new operator base hooks. |
| src/orcapod/core/operators/merge_join.py | Adds new MergeJoin operator implementing merged-list semantics and commutative behavior. |
| src/orcapod/core/operators/mappers.py | Refactors tag/packet mappers to new stream/schema APIs. |
| src/orcapod/core/operators/join.py | Refactors Join to new schema APIs, adds system-tag prediction, and canonical ordering by pipeline hash. |
| src/orcapod/core/operators/filters.py | Refactors PolarsFilter and packet-column selection to new stream/schema APIs. |
| src/orcapod/core/operators/column_selection.py | Refactors tag/packet column selection/drop ops to new stream/schema APIs. |
| src/orcapod/core/operators/batch.py | Refactors Batch operator to new stream/schema APIs. |
| src/orcapod/core/operators/__init__.py | Updates operator exports and adds MergeJoin to public API. |
| src/orcapod/core/operator_node.py | Adds OperatorNode and PersistentOperatorNode for (optional) DB-backed operator caching/replay. |
| src/orcapod/core/kernels.py | Removes legacy kernel base implementation. |
| src/orcapod/core/executors/base.py | Adds executor abstraction for packet-function execution backends. |
| src/orcapod/core/executors/local.py | Adds in-process executor implementation. |
| src/orcapod/core/executors/ray.py | Adds Ray-backed packet-function executor implementation with lazy init. |
| src/orcapod/core/executors/__init__.py | Exposes executor types. |
| src/orcapod/core/execution_engine.py | Adds execution-engine protocol abstraction. |
| src/orcapod/core/datagrams/base.py | Removes legacy datagram base implementation. |
| src/orcapod/core/datagrams/__init__.py | Switches datagram exports to unified Datagram, Tag, Packet. |
| src/orcapod/core/base.py | Introduces new shared mixins (LabelableMixin, DataContextMixin, hashing + pipeline identity, temporal tracking). |
| src/orcapod/core/arrow_data_utils.py | Removes legacy Arrow data utilities in favor of orcapod.utils.* equivalents. |
| src/orcapod/core/__init__.py | Updates exports to new tracker module location. |
| src/orcapod/contexts/registry.py | Updates context spec parsing to auto-instantiate components and renames object hasher → semantic hasher. |
| src/orcapod/contexts/core.py | Updates DataContext to include semantic_hasher + type_handler_registry. |
| src/orcapod/contexts/__init__.py | Updates helper getters (default semantic hasher, type converter protocol typing). |
| src/orcapod/contexts/data/v0.1.json | Adds semantic hashing components (file_hasher, type_handler_registry, semantic_hasher). |
| src/orcapod/contexts/data/schemas/context_schema.json | Updates JSON schema to require semantic hasher + type handler registry and support _ref/_type. |
| src/orcapod/channels.py | Adds async channel primitives and broadcast (fan-out) support. |
| src/orcapod/__init__.py | Refactors public package exports to new core nodes/sources. |
| pyproject.toml | Updates runtime/dev dependencies (adds pre-commit, basedpyright, etc.). |
| .pre-commit-config.yaml | Adds pre-commit hooks (uv sync, ruff-format, whitespace/yaml checks). |
| .github/workflows/run-tests.yml | Adjusts CI branch triggers, Python matrix, and installs Graphviz system deps. |
| CLAUDE.md | Adds Claude agent instructions and architecture overview (new file). |
| .zed/rules | Adds Zed agent rules synchronized with Claude instructions (new file). |
| design/async-execution-implementation-plan.md | Adds async-execution implementation plan (design doc). |
| demo_pipeline.py | Adds demo showcasing persistent wrapping and pipeline usage. |
| src/orcapod/core/sources/manual_table_source.py | Removes manual Delta table source implementation. |

Comment on lines 303 to 321
Copilot AI Mar 4, 2026

DynamicPodStream._update_cache_status() assigns to self._cached_results, but this class initializes _cached_stream (not _cached_results). This will raise AttributeError (or silently create a new attribute) and fails to invalidate the actual cached stream. Replace both _cached_results assignments with _cached_stream (and consider also resetting _cached_time when invalidating).
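
The "silently create a new attribute" case can be reproduced with a small stand-in class. This is a sketch only: `CachedStreamSketch` and its methods are hypothetical, and it assumes the real class does not define `__slots__` (with `__slots__`, the misnamed assignment would raise AttributeError instead).

```python
class CachedStreamSketch:
    """Hypothetical stand-in for a stream wrapper that caches its output."""

    def __init__(self) -> None:
        self._cached_stream = None
        self._cached_time = None

    def store(self, stream, timestamp: float) -> None:
        self._cached_stream = stream
        self._cached_time = timestamp

    def invalidate_wrong_name(self) -> None:
        # No error is raised: Python creates a new attribute, so the
        # real cache held in _cached_stream is left untouched.
        self._cached_results = None

    def invalidate(self) -> None:
        # Reset the attribute the class actually initializes, plus its timestamp.
        self._cached_stream = None
        self._cached_time = None


node = CachedStreamSketch()
node.store(["packet"], timestamp=1.0)

node.invalidate_wrong_name()
stale = node._cached_stream  # still the old cached value

node.invalidate()
fresh = node._cached_stream  # cleared
```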

Copilot AI Mar 4, 2026

This overlap error message reports packet_schema.keys() (the union), not the actual colliding keys that triggered the error. Use intersection_packet_schema.keys() (or the specific set of colliding packet keys) so the message points to the real conflict.

Suggested change
f"Packets should not have overlapping keys, but {intersection_packet_schema.keys()} found in {stream} and {other_stream}."
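
A minimal sketch of the idea behind this suggestion, using plain dicts in place of Orcapod's schema objects: compute the intersection of the two key sets and report only those keys. The function name `merge_packet_schemas` and the exact message wording are assumptions for illustration.

```python
def merge_packet_schemas(left: dict, right: dict) -> dict:
    """Merge two packet schemas, rejecting any shared keys."""
    # dict.keys() views support set operations, so & yields only the collisions.
    colliding = left.keys() & right.keys()
    if colliding:
        raise ValueError(
            f"Packets should not have overlapping keys, but {sorted(colliding)} found."
        )
    return {**left, **right}
```

Reporting the union instead would list every key from both schemas and hide which ones actually conflict.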

Comment thread src/orcapod/core/base.py
Copilot AI Mar 4, 2026

The docstring says the property returns str | None, but the implementation always returns a str (falls back to class name). Update the docstring to match the actual return type to avoid misleading API users.

Suggested change
str: The label of the object. If no label is explicitly set and no computed label is available,
the class name is used as a fallback.

Comment on lines 81 to 83
Copilot AI Mar 4, 2026

The inline ignore comment # type: ignore[mehod] has a misspelled code and (if you're using Pyright/BasedPyright) the bracketed form doesn't match Pyright's ignore syntax, so the suppression likely won't work. Prefer proper Pyright ignores (e.g., # type: ignore or # pyright: ignore[...]) and/or use explicit narrowing/casting after checking the protocol supports as_dict().
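
The explicit-narrowing option could look something like the sketch below, which uses a `runtime_checkable` protocol so the type checker accepts the call without any ignore comment. `SupportsAsDict` and `to_dict` are hypothetical names, not identifiers from the Orcapod codebase.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsAsDict(Protocol):
    """Hypothetical protocol for objects exposing as_dict()."""

    def as_dict(self) -> dict[str, Any]: ...


def to_dict(obj: object) -> dict[str, Any]:
    # isinstance() against a runtime_checkable protocol checks that the
    # method exists and narrows the static type of obj in this branch.
    if isinstance(obj, SupportsAsDict):
        return obj.as_dict()
    raise TypeError(f"{type(obj).__name__} does not provide as_dict()")
```

The runtime check only verifies that an `as_dict` attribute is present, not its signature, so it complements static checking rather than replacing it.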

Comment on lines 134 to 141
Copilot AI Mar 4, 2026

identity_structure() materializes self.as_table(all_info=True), which can be expensive (hstacking context/source tables, dropping columns, and potentially sorting via defaults). Since this structure is used for hashing, consider hashing a cheaper representation (e.g., the already-held self._table plus _data_context_table/_source_info_table references, or a precomputed arrow table hash) to avoid extra table construction work and memory churn.

Comment on lines 116 to 122
Copilot AI Mar 4, 2026

table.to_pylist() converts the entire table into Python objects, which can be a major time/memory cost for large sources. Since _make_record_id() only needs either the row index or a single record_id_column value, consider avoiding full-row conversion by reading just that column (e.g., table.column(record_id_column).to_pylist() when set) and constructing source_info from that.

Comment thread src/orcapod/channels.py
Comment on lines 138 to 145
Copilot AI Mar 4, 2026

This PR introduces new concurrency primitives (Channel, BroadcastChannel, close semantics, backpressure) but no accompanying tests are included in the diff. Since correctness here is subtle, please add unit tests covering: close/drain behavior, send-after-close errors, backpressure/blocking semantics, and broadcast fan-out guarantees.
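
To make the requested coverage concrete, here is a sketch of checks for backpressure, drain-after-close, and send-after-close. It runs against a small hypothetical channel built on `asyncio.Condition`, not the Channel class in src/orcapod/channels.py, whose API and exception types may differ. `ChannelSketch`, `ChannelClosed`, and `check_semantics` are assumed names.

```python
import asyncio
import collections


class ChannelClosed(Exception):
    """Hypothetical error for send-after-close and receive-after-drain."""


class ChannelSketch:
    def __init__(self, buffer_size: int) -> None:
        self._items = collections.deque()
        self._size = buffer_size
        self._closed = False
        self._cond = asyncio.Condition()

    async def send(self, item) -> None:
        async with self._cond:
            # Wait for free buffer space (backpressure) or for closure.
            await self._cond.wait_for(
                lambda: self._closed or len(self._items) < self._size
            )
            if self._closed:
                raise ChannelClosed("send on closed channel")
            self._items.append(item)
            self._cond.notify_all()

    async def close(self) -> None:
        async with self._cond:
            self._closed = True
            self._cond.notify_all()  # wake any blocked senders and receivers

    async def receive(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._closed or bool(self._items))
            if self._items:  # buffered items remain readable after close
                item = self._items.popleft()
                self._cond.notify_all()
                return item
            raise ChannelClosed("channel closed and drained")


async def check_semantics():
    ch = ChannelSketch(buffer_size=1)
    await ch.send("a")
    blocked = asyncio.create_task(ch.send("b"))  # buffer is full
    await asyncio.sleep(0.01)
    backpressure_held = not blocked.done()
    first = await ch.receive()  # frees a slot so the pending send can finish
    await blocked
    await ch.close()
    second = await ch.receive()  # drains the item buffered before close
    try:
        await ch.send("c")
        send_rejected = False
    except ChannelClosed:
        send_rejected = True
    try:
        await ch.receive()
        drained_raises = False
    except ChannelClosed:
        drained_raises = True
    return backpressure_held, first, second, send_rejected, drained_raises
```

In a real suite each check would likely be its own test case. They are combined here only to keep the sketch short.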

@eywalker eywalker deleted the claude/async-orchestrator-KoOH7 branch March 11, 2026 04:09

4 participants