refactor: major cleanup and simplification of protocols#83
Conversation
Captures design decisions from discussion on four areas: - with_options() always returns new executor instance (immutability) - Remove execution_engine_opts from FunctionNode - CachedFunctionPod for pod-level caching alongside CachedPacketFunction - Type-safe executor dispatch via Generic[E] + __init_subclass__ https://claude.ai/code/session_01NMDJZkGHVLHikRr9FcMY8f
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
This PR refactors core Orcapod protocols and operator hierarchy around a new StaticOutputOperatorPod abstraction, introduces async channel-based execution primitives, and expands executors/config/docs to support the simplified architecture (noting the PR description states the package is only partially functioning while pipeline/caching rewrites are pending).
Changes:
- Introduces
StaticOutputOperatorPod+DynamicPodStream, and rewrites unary/binary operators to the new schema-based interfaces (output_schema,*_static_process, async execution hooks). - Adds async execution primitives (
channels.py) and new packet-function executors (LocalExecutor,RayExecutor). - Updates context/spec schema to use a semantic hasher + type handler registry, and adds MkDocs + docs/examples.
Reviewed changes
Copilot reviewed 1 out of 1 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| src/orcapod/core/operators/static_output_pod.py | Adds StaticOutputOperatorPod and DynamicPodStream, including default barrier-mode async execution. |
| src/orcapod/core/operators/semijoin.py | Reworks SemiJoin to new operator base APIs; adds a streaming-ish async execution path. |
| src/orcapod/core/operators/merge_join.py | Adds new MergeJoin operator (schema prediction + Polars join + collision merging). |
| src/orcapod/core/operators/mappers.py | Ports mapper operators to new base APIs; adds row-wise async execution and config serialization. |
| src/orcapod/core/operators/filters.py | Ports filtering/selection ops to new base APIs; adds config serialization and predicate reconstruction. |
| src/orcapod/core/operators/batch.py | Ports Batch operator to new base APIs; adds streaming batch async execution and config serialization. |
| src/orcapod/core/operators/base.py | Replaces legacy kernel/operator base with UnaryOperator/BinaryOperator pods + barrier-mode async execution. |
| src/orcapod/core/operators/init.py | Updates operator exports; adds MergeJoin. |
| src/orcapod/core/nodes/source_node.py | Adds SourceNode as a graph root wrapper with read-only load mode and async execution. |
| src/orcapod/core/nodes/init.py | Adds nodes package exports and GraphNode alias. |
| src/orcapod/core/kernels.py | Removes legacy kernel framework. |
| src/orcapod/core/executors/ray.py | Adds Ray-backed packet function executor. |
| src/orcapod/core/executors/local.py | Adds in-process packet function executor. |
| src/orcapod/core/executors/base.py | Adds executor abstract base defining sync/async execution APIs. |
| src/orcapod/core/executors/init.py | Exports executors. |
| src/orcapod/core/datagrams/base.py | Removes legacy datagram base. |
| src/orcapod/core/datagrams/init.py | Re-exports new unified datagram/tag/packet types. |
| src/orcapod/core/base.py | Refactors core mixins around semantic hashing + pipeline hashing + temporal tracking. |
| src/orcapod/core/arrow_data_utils.py | Removes legacy Arrow helpers (now in utils). |
| src/orcapod/core/init.py | Updates tracker manager export import path. |
| src/orcapod/contexts/registry.py | Updates context component bootstrapping to semantic hasher + handler registry, supports spec component refs. |
| src/orcapod/contexts/data/v0.1.json | Updates default context spec (semantic hasher, type handlers, file hasher, etc.). |
| src/orcapod/contexts/data/schemas/context_schema.json | Updates JSON schema for new context spec fields and ObjectSpec variants. |
| src/orcapod/contexts/core.py | Updates DataContext to use semantic hasher + type handler registry protocols. |
| src/orcapod/contexts/init.py | Updates public context helpers (get_default_semantic_hasher, protocol imports). |
| src/orcapod/channels.py | Adds async Channel + BroadcastChannel primitives used by async pipeline execution. |
| src/orcapod/init.py | Cleans top-level re-exports and public API surface. |
| pyproject.toml | Adds dependencies and dev tooling (mkdocs, pre-commit, pytest-asyncio, etc.). |
| mkdocs.yml | Adds MkDocs Material configuration and API doc generation config. |
| function-execution-improvements-plan.md | Adds a design/plan doc for executor and caching improvements. |
| examples/save_and_load_pipelines.py | Adds example for pipeline serialization/deserialization. |
| examples/async_vs_sync_pipeline.py | Adds demonstration comparing sync vs async execution behavior. |
| docs/index.md | Adds documentation landing page and quick example. |
| docs/getting-started.md | Adds a getting started guide. |
| docs/concepts/streams.md | Adds stream concept documentation. |
| docs/concepts/sources.md | Adds sources concept documentation. |
| docs/concepts/operators.md | Adds operators concept documentation. |
| docs/concepts/identity.md | Adds identity/hash concept documentation. |
| docs/concepts/function-pods.md | Adds function pod concept documentation. |
| docs/api/types.md | Adds API reference page for types. |
| docs/api/streams.md | Adds API reference page for streams. |
| docs/api/sources.md | Adds API reference page for sources. |
| docs/api/pipeline.md | Adds API reference page for pipeline. |
| docs/api/operators.md | Adds API reference page for operators. |
| docs/api/nodes.md | Adds API reference page for nodes. |
| docs/api/index.md | Adds API reference index page. |
| docs/api/function-pods.md | Adds API reference page for function pods. |
| docs/api/databases.md | Adds API reference page for databases. |
| design/async-execution-implementation-plan.md | Adds detailed async execution implementation plan doc. |
| demo_pipeline.py | Adds end-to-end demo script for pipeline compilation/persistence. |
| demo_caching.py | Adds end-to-end demo script for caching strategies. |
| CLAUDE.md | Adds repo-specific agent/dev workflow documentation. |
| .zed/rules | Adds editor/agent rules mirroring CLAUDE.md. |
| .pre-commit-config.yaml | Adds pre-commit configuration (ruff format, uv sync, etc.). |
| .github/workflows/run-tests.yml | Updates CI branches/python versions; installs graphviz deps. |
| .github/workflows/run-objective-tests.yml | Adds objective test workflow with coverage upload. |
| .claude/settings.json | Adds Claude plugin settings. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This pull request introduces a plan for a comprehensive set of improvements to the function execution chain, focusing on executor immutability, clearer option ownership, pod-level caching, and type-safe executor dispatch. These changes are designed to enhance consistency, performance, and type safety across the execution pipeline. The most important changes are grouped below by theme:
Executor immutability and option ownership:
with_options()method onPacketFunctionExecutorBasenow always returns a new instance, making executors immutable and ensuring side-effect-free option configuration.execution_engine_optsattribute is removed fromFunctionNode, centralizing option ownership in the pipeline's executor-assignment logic.Pod-level caching:
CachedFunctionPodclass for pod-level caching, allowing cache keys to include tag metadata and complementing existing packet-level caching.function_poddecorator gains apod_cache_databaseparameter to enable pod-level caching when provided.Type-safe executor dispatch:
PacketFunctionBaseis parameterized withGeneric[E]and uses__init_subclass__to resolve executor protocol at class definition time, moving type checks out of the hot path and into executor assignment.Related to PLT-920