feat: pipeline serialization and node simplification (#81)
Conversation
Design for Pipeline.save()/Pipeline.load() with read-only mode support, node descriptor serialization, and registry-based reconstruction. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ocols Make serialization methods part of ArrowDatabaseProtocol and SourceProtocol rather than ad-hoc per-class additions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use "python.function.v0" instead of "python" in all examples to match the real PythonPacketFunction.packet_function_type_id value. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
13-task TDD plan covering protocol extensions, registries, node from_descriptor(), Pipeline.save()/load(), and integration tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nd implementations Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… all operators Adds serialization support to the operator protocol and all concrete operator implementations. StaticOutputOperatorPod provides a default implementation for parameter-free operators (Join, MergeJoin, SemiJoin). Subclasses with constructor parameters override to_config() to capture their state. PolarsFilter handles both serializable Expr predicates and non-reconstructable ones gracefully. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
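The split described above — a parameter-free default on `StaticOutputOperatorPod` plus `to_config()` overrides for parameterized operators — can be sketched roughly as follows. Class names follow the commit message; the `Batch` operator and all method bodies are hypothetical illustrations, not the actual orcapod implementation:

```python
# Illustrative sketch of the to_config()/from_config() split described above.

class StaticOutputOperatorPod:
    def to_config(self) -> dict:
        # Default for parameter-free operators (Join, MergeJoin, SemiJoin):
        # nothing to capture beyond the class itself.
        return {}

    @classmethod
    def from_config(cls, config: dict):
        return cls(**config)


class Join(StaticOutputOperatorPod):
    pass  # inherits the empty-config default


class Batch(StaticOutputOperatorPod):
    # Hypothetical parameterized operator: overrides to_config()
    # to capture its constructor state.
    def __init__(self, batch_size: int = 1):
        self.batch_size = batch_size

    def to_config(self) -> dict:
        return {"batch_size": self.batch_size}


op = Batch(batch_size=16)
restored = Batch.from_config(op.to_config())
assert restored.batch_size == 16
```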
…tocol and PythonPacketFunction Adds serialization support to PacketFunctionProtocol, PythonPacketFunction, and PacketFunctionWrapper so packet functions can be saved and reconstructed from JSON-compatible config dicts as part of pipeline serialization. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… and resolver helpers Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…and FunctionPod Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…urce implementations

- Add to_config/from_config abstract methods to SourceProtocol
- CSVSource and DeltaTableSource: full round-trip serialization (file-backed)
- DictSource, ListSource, ArrowTableSource, DataFrameSource: metadata-only to_config() with NotImplementedError from_config() (in-memory data not serializable)
- CachedSource: serializes inner source + cache database config, from_config() delegates to resolve_source_from_config/resolve_database_from_config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
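The file-backed vs. in-memory split described in this commit can be sketched like so. The class names come from the commit message, but the bodies are simplified stand-ins for illustration only:

```python
# Hypothetical sketch of the two source-serialization behaviors above.

class CSVSource:
    # File-backed: a full round-trip is possible from the path alone.
    def __init__(self, path: str):
        self.path = path

    def to_config(self) -> dict:
        return {"path": self.path}

    @classmethod
    def from_config(cls, config: dict) -> "CSVSource":
        return cls(path=config["path"])


class ListSource:
    # In-memory: the data itself can't be serialized, so to_config()
    # carries metadata only and from_config() refuses reconstruction.
    def __init__(self, data: list):
        self.data = data

    def to_config(self) -> dict:
        return {"num_items": len(self.data)}

    @classmethod
    def from_config(cls, config: dict) -> "ListSource":
        raise NotImplementedError("in-memory data is not serializable")


src = CSVSource("data/items.csv")
assert CSVSource.from_config(src.to_config()).path == "data/items.csv"
```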
Enable SourceNode reconstruction from serialized pipeline descriptors. Full mode delegates to the live stream; read-only mode returns stored schema/hash metadata and raises RuntimeError on data-access methods. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ading Add PersistentFunctionNode.from_descriptor() following the same pattern as SourceNode.from_descriptor(). Full mode constructs normally via __init__; read-only mode bypasses __init__ using __new__ and initializes all fields from the inheritance chain manually, returning stored hashes and schema from the descriptor metadata. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
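The `__new__`-based read-only path described above can be sketched minimally like this. Field and method names here are illustrative, not the actual node attributes:

```python
# Sketch: bypass __init__ with __new__ and populate fields from
# stored descriptor metadata (read-only mode).

class Node:
    def __init__(self, live_stream):
        # Full mode: normal construction, requires live resources.
        self.live_stream = live_stream
        self.schema = dict(live_stream.schema)

    @classmethod
    def from_descriptor(cls, descriptor, live_stream=None):
        if live_stream is not None:
            return cls(live_stream)           # full mode via __init__
        node = cls.__new__(cls)               # read-only: bypass __init__
        node.live_stream = None
        node.schema = descriptor["schema"]    # stored metadata only
        return node

    def iter_packets(self):
        if self.live_stream is None:
            raise RuntimeError("node was loaded read-only; no data access")
        yield from self.live_stream


ro = Node.from_descriptor({"schema": {"x": "int64"}})
assert ro.schema == {"x": "int64"}
```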
…ading Implements PersistentOperatorNode.from_descriptor() following the same pattern as SourceNode and PersistentFunctionNode. Supports full mode (with live operator + input streams) and read-only mode (metadata only, with stored schema/hashes). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement save() method that serializes a compiled pipeline to JSON, including pipeline metadata, node descriptors, and edge list. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement Pipeline.load() classmethod that reconstructs a compiled pipeline from a JSON file produced by Pipeline.save(). Supports "full" mode (attempts live reconstruction with fallback to read-only) and "read_only" mode (metadata only). Includes tests for both modes, version validation, label preservation, and graph structure integrity. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
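The saved artifact is described as JSON containing pipeline metadata, node descriptors, and an edge list. A hypothetical shape, with made-up key names purely to illustrate the structure (the actual schema is not shown in this PR text):

```python
import json

# Hypothetical layout; only "pipeline metadata, node descriptors, and
# an edge list" plus a format version are stated in the PR.
saved = {
    "format_version": 1,
    "pipeline": {"label": "demo"},
    "nodes": {
        "source": {"class": "CSVSource", "config": {"path": "data.csv"}},
        "step": {"class": "FunctionNode", "config": {}},
    },
    "edges": [["source", "step"]],
}

loaded = json.loads(json.dumps(saved))
assert loaded["edges"] == [["source", "step"]]
assert set(loaded["nodes"]) == {"source", "step"}
```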
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ipeline package Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 31 new tests across three test classes:

- TestReadOnlyMode (13 tests): verifies load_status on all node types, stored schema/keys/hashes on read-only nodes, RuntimeError on data access (iter_packets/as_table), operator metadata preservation, and that reconstructable sources are still UNAVAILABLE in read_only mode.
- TestFullMode (12 tests): verifies CSVSource reconstruction with live stream, iter_packets/as_table on fully loaded nodes, pipeline re-run, graceful degradation when sources are non-reconstructable, database type preservation, and attribute access on loaded pipelines.
- TestLoadEdgeCases (6 tests): verifies default mode, multi-source operator pipelines, edge pair preservation, hash graph attributes, same JSON with different modes gives different statuses, and function_database round-trip.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ixture The bare `dict` type annotation is not supported by the type converter. Use `dict[str, int]` with explicit type parameters instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…n signatures Bare dict, list, set, and tuple without type parameters are now rejected at construction time with a clear error message. Previously they silently passed through and only failed later in the type converter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
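The construction-time check described above can be sketched as follows. `validate_annotation` is a hypothetical helper name, not the orcapod API; the error message mirrors the one quoted later in this PR, with a type-specific example:

```python
# Sketch: reject bare dict/list/set/tuple annotations early, with a
# clear, type-specific error message.

BARE_CONTAINERS = {dict, list, set, tuple}


def validate_annotation(name, annot):
    """Hypothetical helper: reject bare container annotations."""
    if annot in BARE_CONTAINERS:
        example = "[str, int]" if annot is dict else "[int]"
        raise TypeError(
            f"Type annotation for '{name}' is bare {annot.__name__} "
            f"without type parameters. Use e.g. {annot.__name__}{example}."
        )


validate_annotation("counts", dict[str, int])  # parameterized: accepted
try:
    validate_annotation("counts", dict)        # bare: rejected early
except TypeError as exc:
    print(exc)
```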
Pull request overview
This PR adds Pipeline.save() and Pipeline.load() methods for serializing compiled pipelines to/from JSON, with support for full and read-only load modes. It introduces to_config()/from_config() serialization across all core entities (databases, sources, operators, packet functions, function pods), from_descriptor() classmethods on node types for reconstruction, and rejection of bare container types in PythonPacketFunction.
Changes:
- Add serialization infrastructure: registries, `LoadStatus` enum, resolver helpers, and schema serialization in `src/orcapod/pipeline/serialization.py`
- Add `to_config()`/`from_config()` to all databases, sources, operators, packet functions, and function pods; add `from_descriptor()` to all node types with read-only mode support
- Add `Pipeline.save()`/`Pipeline.load()` in `graph.py` with full and read-only modes, plus bare container type rejection in `schema_utils.py`
Reviewed changes
Copilot reviewed 40 out of 40 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `src/orcapod/pipeline/serialization.py` | New module with registries, `LoadStatus` enum, resolver helpers, schema serialization |
| `src/orcapod/pipeline/graph.py` | `Pipeline.save()` and `Pipeline.load()` methods |
| `src/orcapod/pipeline/__init__.py` | Export `LoadStatus` and `PIPELINE_FORMAT_VERSION` |
| `src/orcapod/protocols/database_protocols.py` | Add `to_config`/`from_config` to protocol |
| `src/orcapod/protocols/core_protocols/sources.py` | Add `to_config`/`from_config` to protocol |
| `src/orcapod/protocols/core_protocols/packet_function.py` | Add `to_config`/`from_config` to protocol |
| `src/orcapod/protocols/core_protocols/operator_pod.py` | Add `to_config`/`from_config` to protocol |
| `src/orcapod/protocols/core_protocols/function_pod.py` | Add `to_config`/`from_config` to protocol |
| `src/orcapod/databases/*.py` | Implement `to_config`/`from_config` on all database classes |
| `src/orcapod/core/sources/*.py` | Implement `to_config`/`from_config` on all source classes |
| `src/orcapod/core/packet_function.py` | Implement `to_config`/`from_config` on `PythonPacketFunction` and `PacketFunctionWrapper` |
| `src/orcapod/core/function_pod.py` | Implement `to_config`/`from_config` on `FunctionPod` |
| `src/orcapod/core/operators/*.py` | Implement `to_config`/`from_config` on all operators |
| `src/orcapod/core/nodes/source_node.py` | Add `from_descriptor()`, `load_status`, read-only guards |
| `src/orcapod/core/nodes/function_node.py` | Add `from_descriptor()`, `load_status`, read-only overrides |
| `src/orcapod/core/nodes/operator_node.py` | Add `from_descriptor()`, `load_status`, read-only overrides |
| `src/orcapod/utils/schema_utils.py` | Reject bare container types |
| `tests/test_pipeline/test_serialization.py` | End-to-end save/load tests |
| `tests/test_pipeline/test_serialization_helpers.py` | Registry and helper tests |
| `tests/test_pipeline/test_node_descriptors.py` | Node `from_descriptor` tests |
| `tests/test_databases/test_database_config.py` | Database config round-trip tests |
| `tests/test_core/sources/test_source_config.py` | Source config round-trip tests |
| `tests/test_core/operators/test_operator_config.py` | Operator config round-trip tests |
| `tests/test_core/packet_function/test_packet_function_config.py` | Packet function config tests |
| `tests/test_core/function_pod/test_function_pod_config.py` | Function pod config tests |
| `tests/test_core/packet_function/test_packet_function.py` | Bare container type rejection tests |
| `docs/superpowers/specs/2026-03-12-pipeline-serialization-design.md` | Design spec |
| `docs/superpowers/plans/2026-03-12-pipeline-serialization.md` | Implementation plan |
```python
    f"Type annotation for '{name}' is bare {type_annot.__name__} "
    f"without type parameters. Use e.g. {type_annot.__name__}[str, int] "
    f"or {type_annot.__name__}[int] instead."
)
```
The error message suggests dict[str, int] as an example for all container types (via the first branch {type_annot.__name__}[str, int]), but list[str, int], set[str, int], and tuple[str, int] are not valid parameterizations. For list and set, the correct example would be list[int] / set[int]. Consider making the example type-specific rather than using a single template.
> # Pipeline Serialization Design
>
> **Date**: 2026-03-12
> **Status**: Approved design, pending implementation
The status says "pending implementation" but the implementation is included in this PR. Consider updating to reflect the current state (e.g., "Implemented").
Suggested change:

```diff
-**Status**: Approved design, pending implementation
+**Status**: Approved design, implemented
```
> ### Mode behavior summary
>
> | Mode | Sources | Operators | Function Pods |
> |------|---------|-----------|---------------|
> | `"full"` | Reconstruct if possible, degrade gracefully | Reconstruct (usually succeeds) | Attempt import via `from_config()`, degrade to read-only on failure |
> | `"read_only"` | Reconstruct if possible, degrade gracefully | Reconstruct (usually succeeds) | Skip reconstruction, always read-only |
>
> Both modes attempt source and operator reconstruction since these are typically file-backed or built-in. The key difference is whether function pod import is attempted.
The design spec (lines 474-481) states "Both modes attempt source and operator reconstruction" but the implementation skips reconstruction when mode == "read_only". The tests confirm the implementation behavior, so the spec table at lines 475-478 should be updated to match the actual implementation where read_only mode skips all reconstruction.
…sign spec - Use type-specific examples in bare container error messages (e.g. list[int] instead of list[str, int]) - Update design spec status to "implemented" - Fix mode behavior table: read_only skips all reconstruction Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 40 out of 40 changed files in this pull request and generated no new comments.
- Emit warning when function/operator node falls back to read-only due to upstream unavailability (previously silent)
- Use packet function's canonical name as FunctionPod computed_label instead of falling back to class name "FunctionPod"
- Serialize output schemas as Arrow type strings (e.g. "int64") instead of Python type repr (e.g. "<class 'int'>") by passing type_converter to serialize_schema()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Loaded pipelines now expose source nodes via attribute access and compiled_nodes, so users can inspect load_status and metadata for all nodes including unavailable/stub sources. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ction name Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Introduce explicit ContentHash typing across core nodes and add Self typing for generic return types
- Expose DeltaTableDatabase, InMemoryArrowDatabase, NoOpArrowDatabase, and Pipeline in package exports
- Improve RootSource API: accept label, data_context, and config
- Simplify source_id error message
- Tighten typing for ArrowTableSource: Self and pa.Table
- Update from_config and table property return types
- Update Join operator to pass explicit columns to as_table
- Simplify rename path logic
- Update docstrings and minor wording tweaks
- Update PacketFunction label docstring and add Claude settings
- Add Claude plugin settings file
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
14-task plan covering: source inheritance refactor, source label cleanup, FunctionNode/OperatorNode merge, GraphTracker → Pipeline merge, compile() simplification, and source nodes as first-class pipeline members. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tern Section 7 of the pipeline simplification spec now uses a SourceStreamBuilder class instead of ArrowTableSource inheritance. This avoids the init order problem where data_context isn't available before super().__init__(). Also defers resolve_field redesign (NotImplementedError everywhere). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace inheritance-based source refactor with SourceStreamBuilder composition. Defer resolve_field to NotImplementedError. Preserve Chunks 2-3 unchanged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
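The composition pattern adopted here can be sketched minimally. With inheritance, a subclass would need `data_context` before `super().__init__()` runs; with a builder, the source finishes its own setup first and then constructs the stream. All names below are illustrative simplifications of the pattern, not the real orcapod classes:

```python
class SourceStreamBuilder:
    """Illustrative builder; the real one adds system columns,
    provenance, and system tags to the table."""

    def __init__(self, data_context):
        self.data_context = data_context

    def build(self, table):
        return {"table": table, "context": self.data_context}


class DictSource:
    def __init__(self, data, data_context):
        # With composition, data_context is fully set up *before* the
        # stream is built -- no super().__init__() ordering problem.
        self.data_context = data_context
        self._stream = SourceStreamBuilder(data_context).build(data)


src = DictSource({"a": 1}, data_context="default")
assert src._stream["context"] == "default"
```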
…hment Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rce gains defaults ArrowTableSource now delegates table enrichment to SourceStreamBuilder instead of doing it inline. RootSource provides default implementations for identity_structure(), output_schema(), keys(), iter_packets(), and as_table() that delegate to self._stream. resolve_field() now raises NotImplementedError by default instead of FieldNotResolvableError. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…lder Replace internal _arrow_source delegation with direct SourceStreamBuilder usage. Both sources now store _stream and inherit all stream/identity methods from RootSource defaults, eliminating ~7 delegation methods each. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…lder Replace internal _arrow_source delegation with SourceStreamBuilder, matching the pattern already used by DictSource and DataFrameSource. Both sources now set self._stream directly and rely on RootSource's default stream delegation. Add integration tests for both sources. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace ArrowTableSource delegation with SourceStreamBuilder, setting self._stream so RootSource default delegation handles all stream methods. Keeps custom identity_structure() with tag function hash. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…chedSource resolve_field delegation Sources without an explicit label now fall back to __class__.__name__ via LabelableMixin instead of returning source_id. CachedSource inherits the default NotImplementedError for resolve_field from RootSource. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
FunctionNode now accepts optional pipeline_database/result_database params and exposes attach_databases() for deferred DB attachment. All persistent behavior (two-phase iteration, pipeline records, CachedPacketFunction wrapping, as_source, get_all_records) is integrated with guards for the non-DB case. PersistentFunctionNode is retained as a temporary alias until Task 8 updates all import sites. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
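The optional-database pattern described above, sketched with illustrative names (the guard and `FakeDB` are hypothetical; only the `attach_databases()` entry point and optional DB params come from the commit text):

```python
class FunctionNode:
    def __init__(self, fn, pipeline_database=None, result_database=None):
        self.fn = fn
        self.pipeline_database = pipeline_database
        self.result_database = result_database

    def attach_databases(self, pipeline_database, result_database):
        # Late binding: called during compile() after recording.
        self.pipeline_database = pipeline_database
        self.result_database = result_database

    def get_all_records(self):
        # Guard for the non-DB case.
        if self.result_database is None:
            raise RuntimeError("no result database attached")
        return self.result_database.all_records()


class FakeDB:
    def all_records(self):
        return []


node = FunctionNode(fn=lambda x: x + 1)    # recorded without persistence
node.attach_databases(FakeDB(), FakeDB())  # attached later by compile()
assert node.get_all_records() == []
```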
…rts to FunctionNode After Task 7 merged PersistentFunctionNode into FunctionNode, this removes the temporary alias and updates all import sites across source, tests, and demos to use FunctionNode directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Merge PersistentOperatorNode into OperatorNode with optional DB params, following the same pattern as the FunctionNode merge (Task 7). OperatorNode now supports attach_databases() for late DB binding. PersistentOperatorNode is kept as a temporary alias for backward compatibility (Task 10 removes it). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rts to OperatorNode Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move all GraphTracker state (_node_lut, _upstreams, _graph_edges, _hash_graph) and methods (record_*, nodes, graph, reset, compile) directly into Pipeline. Delete GraphTracker class from tracker.py. Update all test references to use Pipeline with auto_compile=False. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Instead of creating new FunctionNode/OperatorNode instances during compile(), reuse the recorded node objects by rewiring their upstreams and calling attach_databases() to add DB persistence. This preserves object identity between recording and compilation phases. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
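The identity-preserving compile step can be sketched as follows; structure names (`_node_lut`, `_upstreams`, `attach_databases`) follow the commit messages, while the bodies are illustrative:

```python
class Node:
    def attach_databases(self, db):
        self.db = db


class Pipeline:
    def __init__(self):
        self._node_lut = {}    # label -> recorded node object
        self._upstreams = {}   # label -> recorded upstream list

    def record(self, label, node, upstreams=()):
        self._node_lut[label] = node
        self._upstreams[label] = list(upstreams)

    def compile(self, db):
        # Reuse the recorded objects: rewire upstreams and attach DBs
        # in place rather than constructing new nodes.
        for label, node in self._node_lut.items():
            node.upstreams = self._upstreams[label]
            node.attach_databases(db)
        return self._node_lut  # same objects as recorded


p = Pipeline()
n = Node()
p.record("step", n)
compiled = p.compile(db="delta")
assert compiled["step"] is n  # identity preserved across compile()
```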
Include source nodes in compiled_nodes dict and label assignment. Simplify label resolution in compile() — node.label already falls back through _label -> computed_label() -> __class__.__name__. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Mark pipeline simplification design spec as implemented. Update DESIGN_ISSUES.md to reflect PersistentFunctionNode → FunctionNode merge. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ation ArrowTableSource.resolve_field() is not yet re-implemented after the source refactor (SourceStreamBuilder). Mark the 7 objective tests as strict xfail so they track the missing implementation and will fail loudly once resolve_field is restored. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CachedSource now accepts any SourceProtocol as its inner source, not just RootSource. Data context defaults from the wrapped source unless explicitly provided. Config is passed through to super without trying to extract it from the source. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ion, and CachedSource graceful degradation

- Add SourceProxy class that preserves content_hash, pipeline_hash, source_id, and output_schema for non-reconstructable sources, with bind/unbind delegation
- Add _identity_config() to RootSource base class; all source to_config() methods now include identity hashes and serialized schemas
- Implement cross-language Arrow type string parser (parse_arrow_type_string) with support for primitives and nested types (list, struct, map, recursive nesting)
- Add proper serialize_schema/deserialize_schema round-trip using Arrow type strings
- Update resolve_source_from_config() with fallback_to_proxy parameter
- CachedSource gracefully serves cached data when inner source is SourceProxy
- Add comprehensive tests for SourceProxy, CachedSource with proxy, schema serialization round-trip, and Arrow type string parsing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
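The bind/unbind-with-identity-validation behavior of the proxy can be sketched like this. `SourceProxy`, `bind`, and `unbind` are named in the commit; the validation details and `FakeSource` are illustrative assumptions:

```python
class SourceProxy:
    """Stand-in for a non-reconstructable source (illustrative sketch)."""

    def __init__(self, source_id, content_hash, output_schema):
        self.source_id = source_id
        self.content_hash = content_hash
        self.output_schema = output_schema
        self._live = None

    def bind(self, source):
        # Identity validation: refuse a live source whose identity
        # doesn't match the stored metadata.
        if (source.source_id, source.content_hash) != (
            self.source_id,
            self.content_hash,
        ):
            raise ValueError("source identity mismatch")
        self._live = source

    def unbind(self):
        self._live = None

    def iter_packets(self):
        if self._live is None:
            raise RuntimeError("proxy has no live source bound")
        return self._live.iter_packets()


class FakeSource:
    source_id = "s1"
    content_hash = "abc"

    def iter_packets(self):
        yield {"x": 1}


proxy = SourceProxy("s1", "abc", {"x": "int64"})
proxy.bind(FakeSource())
assert next(proxy.iter_packets()) == {"x": 1}
```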
Pull request overview
Copilot reviewed 99 out of 99 changed files in this pull request and generated no new comments.
Closes ENG-252
Summary
Pipeline Serialization
- `Pipeline.save()` and `Pipeline.load()` for serializing compiled pipelines to/from JSON
- Two load modes: `full` (reconstruct live nodes) and `read_only` (metadata-only, no code/data access needed)
- `to_config()`/`from_config()` serialization added to all core entities: databases, sources, operators, packet functions, function pods
- `from_descriptor()` classmethods on `SourceNode`, `FunctionNode`, and `OperatorNode` for reconstruction from JSON descriptors
- `LoadStatus` enum (FULL / READ_ONLY / UNAVAILABLE) and `PIPELINE_FORMAT_VERSION` for version management
- Reject bare container types (`dict`, `list`, `set`, `tuple` without type parameters) at `PythonPacketFunction` construction time

SourceProxy and Identity Preservation

- `SourceProxy` class that preserves `content_hash`, `pipeline_hash`, `source_id`, and `output_schema` for non-reconstructable sources (e.g. in-memory `ArrowTableSource`, `DictSource`)
- `SourceProxy.bind(source)`/`unbind()` for substituting a live source later, with identity validation (source_id, content_hash, pipeline_hash, class name)
- `ContentHash` terminal semantics in the hasher -- `identity_structure()` returns `ContentHash` objects directly, no need to override `content_hash()`/`pipeline_hash()`
- Add `_identity_config()` to `RootSource` base class; all source `to_config()` methods now include identity hashes and serialized schemas
- `resolve_source_from_config()` gains `fallback_to_proxy` parameter -- returns `SourceProxy` on failure instead of raising

Cross-Language Schema Serialization

- `parse_arrow_type_string()` -- recursive parser for Arrow type strings supporting primitives (`int64`, `large_string`, etc.) and nested types (`list<item: T>`, `struct<a: T, b: U>`, `map<K, V>`, and deeply nested compositions)
- `serialize_schema()`/`deserialize_schema()` round-trip using human-readable Arrow type strings (not binary blobs), enabling pipeline JSON to be loaded by non-Python Arrow implementations

CachedSource Graceful Degradation

- `CachedSource` gracefully serves cached data when the inner source is a `SourceProxy` (cannot provide live data)
- Split `_build_merged_stream` into `_ingest_live_data()` (may fail) + cache reading (always works)

Pipeline and Node Simplification

- Merge `PersistentFunctionNode` into `FunctionNode`: optional DB params at construction or via `attach_databases()`
- Merge `PersistentOperatorNode` into `OperatorNode`: same pattern with `attach_databases()`
- Merge `GraphTracker` into `Pipeline`: recording state (`_node_lut`, `_upstreams`, `_graph_edges`, `_hash_graph`) now lives directly on Pipeline
- `SourceStreamBuilder`: extracts enrichment logic (system columns, provenance, system tags) into a reusable builder; all table-backed sources use it directly instead of delegating to an internal `_arrow_source`
- `compile()` mutates in place: recorded nodes are rewired and get DBs attached via `attach_databases()` instead of creating new objects -- preserves object identity
- Source nodes are first-class pipeline members: included in `compiled_nodes` and accessible by label (e.g., `pipeline.my_source`)
- Remove `computed_label()` from `RootSource`, simplify label resolution

Test plan

- `test_serialization.py` covering save structure, load modes, integration, read-only behavior, full-mode reconstruction, CachedSource with SourceProxy round-trip, and edge cases
- `test_serialization_helpers.py` covering Arrow type string parsing (23 primitives, nested types), schema serialization/deserialization round-trip, registries, and resolvers
- `from_descriptor()` tests on all node types
- `TestCompileMutatesNodes`: verifies compile() reuses recorded node objects and attaches DBs
- `TestSourceNodesInPipeline`: verifies source nodes in `compiled_nodes`, accessible by label
- `TestPipelineRecording`/`TestPipelineCompile`: tracker tests updated from `GraphTracker` to `Pipeline`
- `SourceStreamBuilder` unit tests and source builder integration tests
- `FunctionNode.attach_databases()` and `OperatorNode.attach_databases()` tests
- `resolve_field` tests marked `xfail(strict)` pending re-implementation after source refactor

Generated with Claude Code