Conversation
Pull request overview
Adds pipeline serialization support by introducing a JSON manifest format plus object/database config serialization, enabling Pipeline.save() and Pipeline.load(..., mode="cache_only") to reconstruct a read-only pipeline that can serve cached results without recomputation.
Changes:
- Introduces serialization primitives (`ConfigRegistry`, `object_to_config`/`object_from_config`) and a v0.1 manifest format with graph + node metadata.
- Adds `Pipeline.save()`/`Pipeline.load()` (cache-only) and a `CacheOnlyNode` implementation to access cached records.
- Adds `to_config()`/`from_config()` support for databases and comprehensive save/load tests.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| tests/test_pipeline/test_save_load.py | End-to-end and unit coverage for manifest IO, registry, object/db configs, save/load, and cache-only nodes |
| src/orcapod/serialization/object_config.py | Introspection-based object serialization + reconstruction helpers |
| src/orcapod/serialization/manifest.py | Manifest schema (v0.1) + graph/node/database serialization and IO helpers |
| src/orcapod/serialization/config_registry.py | Class resolution + construction registry (explicit registration + auto-import) |
| src/orcapod/serialization/__init__.py | Exposes serialization public API |
| src/orcapod/protocols/database_protocols.py | Adds ConfigurableProtocol for to_config/from_config |
| src/orcapod/pipeline/graph.py | Adds Pipeline.save() and Pipeline.load() (cache-only) plumbing |
| src/orcapod/pipeline/cache_only_node.py | Implements read-only node for serving cached results |
| src/orcapod/databases/noop_database.py | Adds config serialization for NoOp DB |
| src/orcapod/databases/in_memory_databases.py | Adds config serialization for in-memory DB |
| src/orcapod/databases/delta_lake_databases.py | Adds config serialization for Delta DB (but currently breaks `__init__`) |
Comments suppressed due to low confidence (1)
src/orcapod/databases/delta_lake_databases.py:94
- In `DeltaTableDatabase.from_config`, initialization of pending-batch state (`_pending_batches`, `_pending_record_ids`, etc.) is currently indented under `from_config` after `return cls(**params)`, making it unreachable and leaving instances without required attributes. Move this batch-management initialization back into `__init__` (as it was previously), and keep `from_config` limited to parsing params + constructing the class.
```python
@classmethod
def from_config(cls, config: dict[str, object]) -> "DeltaTableDatabase":
    """Reconstruct from a config dict."""
    params = config.get("params", {})
    assert isinstance(params, dict)
    return cls(**params)
    # Everything below the return is unreachable (the bug flagged above):
    # Batch management
    self._pending_batches: dict[str, pa.Table] = {}
    self._pending_record_ids: dict[str, set[str]] = defaultdict(set)
    self._existing_ids_cache: dict[str, set[str]] = defaultdict(set)
    # TODO: reconsider this approach as this is NOT serializable
    self._cache_dirty: dict[str, bool] = defaultdict(lambda: True)
```
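A minimal sketch of the fix the reviewer suggests: the batch-management state lives in `__init__`, so every construction path (direct or via `from_config`) initializes it, while `from_config` only parses params and constructs. This is a hypothetical stand-in class, not the real `DeltaTableDatabase`; the `table_uri` parameter is assumed for illustration.

```python
from collections import defaultdict
from typing import Any


class DeltaTableDatabase:
    """Stand-in illustrating the suggested structure (not the real class)."""

    def __init__(self, table_uri: str = "memory://") -> None:
        self.table_uri = table_uri
        # Batch-management state belongs here so it is always initialized,
        # regardless of which construction path was used.
        self._pending_batches: dict[str, Any] = {}
        self._pending_record_ids: dict[str, set[str]] = defaultdict(set)
        self._existing_ids_cache: dict[str, set[str]] = defaultdict(set)
        self._cache_dirty: dict[str, bool] = defaultdict(lambda: True)

    @classmethod
    def from_config(cls, config: dict[str, object]) -> "DeltaTableDatabase":
        """Only parse params and construct; no state setup here."""
        params = config.get("params", {})
        assert isinstance(params, dict)
        return cls(**params)


db = DeltaTableDatabase.from_config({"params": {"table_uri": "memory://t"}})
```

Reconstructed instances now carry the batch state without any unreachable code.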
Add `SourceCacheMode` enum (FULL/OFF) to control whether source nodes persist their output to the cache database during pipeline execution.

- FULL (default): current behavior — materializes tags + packets to DB
- OFF: pass-through — source data flows directly to downstream nodes without any database interaction

`Pipeline` accepts a `source_cache_mode` parameter and passes it through `compile()` to `PersistentSourceNode`. When OFF, `run()` is a no-op, `iter_packets()`/`as_table()` delegate directly to the wrapped stream, and `get_all_records()` returns None.

https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
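The FULL/OFF behavior described in the commit message can be sketched as follows. This is a simplified illustration, not the project's actual implementation; `run_source` and the list-like `persist` callback are hypothetical.

```python
from enum import Enum


class SourceCacheMode(Enum):
    """Mirrors the two modes named in the commit message."""
    FULL = "full"  # materialize tags + packets to the cache database
    OFF = "off"    # pass-through: no database interaction


def run_source(mode: SourceCacheMode, persist, packets):
    """Hypothetical helper: persist packets only when caching is enabled."""
    if mode is SourceCacheMode.OFF:
        # Mirrors run() being a no-op when caching is off.
        return None
    for packet in packets:
        persist(packet)
    return len(packets)


stored = []
count = run_source(SourceCacheMode.FULL, stored.append, [1, 2, 3])
skipped = run_source(SourceCacheMode.OFF, stored.append, [4])
```

With OFF, downstream consumers still see the data (it flows through the stream), but nothing touches the database.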
Force-pushed `6731463` to `40d28e5`
Move source caching from pipeline-level (`PersistentSourceNode`) to source-level (`CachedSource`). `Pipeline.compile()` now wraps leaf streams in a plain `SourceNode` (thin graph-vertex wrapper) instead of `PersistentSourceNode`, eliminating double-caching when composing pipelines.

Key changes:
- Remove `PersistentSourceNode` and `SourceCacheMode` from pipeline
- `Pipeline.compile()` wraps leaf streams in `SourceNode` (no caching)
- Rename `PersistentSource` → `CachedSource` for clarity
- Add `RootSource.cached()` convenience method
- Update all tests and demo

Source caching is now a source-level concern:

    cached = source.cached(cache_database=db)
    # or
    cached = CachedSource(source, cache_database=db)

https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
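The opt-in wrapping pattern above can be sketched with minimal stand-ins. These are hypothetical classes illustrating the composition only; the real `CachedSource` persists to an `ArrowDatabaseProtocol` database, whereas here a plain list stands in for the cache.

```python
class Source:
    """Stand-in for RootSource (hypothetical minimal interface)."""

    def __init__(self, rows):
        self._rows = list(rows)

    def iter_packets(self):
        return iter(self._rows)

    def cached(self, cache_database):
        """Convenience wrapper, mirroring RootSource.cached()."""
        return CachedSource(self, cache_database=cache_database)


class CachedSource(Source):
    """Wraps a source and persists each packet as it flows through."""

    def __init__(self, source, cache_database):
        self._source = source
        self._db = cache_database  # list-like stand-in for the real DB

    def iter_packets(self):
        for packet in self._source.iter_packets():
            self._db.append(packet)  # cache on the way through
            yield packet


db: list = []
cached = Source([1, 2]).cached(cache_database=db)
result = list(cached.iter_packets())
```

The pipeline never needs to know whether a source is cached: both expose the same `iter_packets()` surface.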
…ce pipeline integration

- `SourceNode.as_table()` delegates to wrapped stream
- `SourceNode.iter_packets()` delegates to wrapped stream
- `SourceNode.run()` is a no-op
- Pipeline with `CachedSource` input works end-to-end (source caching in `source_db`, pipeline execution in `pipeline_db`)

https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
…ching

`CachedSource.run()` was not part of the `StreamProtocol` or `SourceProtocol` interfaces. Replace it with `flow()` (inherited from `StreamBase`), which triggers the same caching via `iter_packets()` → `_ensure_stream()`. Updated all tests.

https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
`CachedSource._ensure_stream()` now checks whether the wrapped source's `last_modified` is newer than the cached stream's timestamp. If stale, the in-memory cache is discarded and rebuilt from the DB + live data.

Adds `test_source_modified_time_triggers_rebuild`, verifying that updating the source's modified time causes `CachedSource` to rebuild on next access.

https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
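The staleness check described in this commit can be sketched as a timestamp comparison inside the lazy-build method. `CachedStream` and `FakeSource` below are hypothetical stand-ins; the real implementation rebuilds from the DB plus live data rather than copying a list.

```python
import time


class CachedStream:
    """Sketch of the last_modified staleness check (names hypothetical)."""

    def __init__(self, source):
        self._source = source
        self._stream = None
        self._built_at = None

    def _ensure_stream(self):
        # Rebuild when there is no cache yet, or the wrapped source was
        # modified after the cache was built.
        if self._stream is None or self._source.last_modified > self._built_at:
            self._built_at = time.time()
            self._stream = list(self._source.data)
        return self._stream


class FakeSource:
    def __init__(self):
        self.data = [1]
        self.last_modified = 0.0


src = FakeSource()
cs = CachedStream(src)
first = cs._ensure_stream()          # builds the cache
src.data = [1, 2]
src.last_modified = time.time() + 10  # newer than the cache → stale
second = cs._ensure_stream()          # rebuilds with the new data
```

This is the behavior the new test exercises: bumping the source's modified time forces a rebuild on the next access.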
…e method

Replace `_is_source_stale()` with a proper `is_stale` property override on `StreamBase`. `CachedSource` is a `RootSource` (no upstreams/producer) but still depends on the wrapped source's modification time.

https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
```diff
 During the ``with`` block, operator and function pod invocations are
 recorded as non-persistent nodes (same as ``GraphTracker``). On context
 exit, ``compile()`` replaces every node with its persistent variant:

-- Leaf streams → ``PersistentSourceNode``
+- Leaf streams → ``SourceNode`` (thin wrapper for graph vertex)
 - Function pod invocations → ``PersistentFunctionNode``
 - Operator invocations → ``PersistentOperatorNode``
```
Pipeline docstring still says compile() "replaces every node with its persistent variant", but leaf streams are now wrapped in SourceNode (non-persistent, no caching). Please adjust the wording to avoid implying leaf nodes are persisted/cached by the pipeline.
```python
    cache_database=self._pipeline_database,
    cache_path_prefix=self._pipeline_path_prefix,
)
persistent_node = SourceNode(stream=stream)
```
When wrapping leaf streams, SourceNode is constructed without inheriting the wrapped stream's data_context/config. Because StreamBase.content_hash()/pipeline_hash() depend on the node's own data context, this can make the SourceNode hash differ from the original stream hash for non-default contexts, which can break graph rewiring and cache/table addressing. Consider defaulting SourceNode's data_context/config from the wrapped stream (or pass stream.data_context_key and stream.orcapod_config here).
```diff
-persistent_node = SourceNode(stream=stream)
+persistent_node = SourceNode(
+    stream=stream,
+    data_context_key=getattr(stream, "data_context_key", None),
+    orcapod_config=getattr(stream, "orcapod_config", None),
+)
```
```python
def cached(
    self,
    cache_database: Any,
    cache_path_prefix: tuple[str, ...] = (),
    **kwargs: Any,
) -> "RootSource":
    """Return a ``CachedSource`` wrapping this source.
```
RootSource.cached() is annotated to return "RootSource" and accepts cache_database: Any, but it always constructs and returns a CachedSource and requires an ArrowDatabaseProtocol. Tightening the return type (to CachedSource or RootSource & CachedSource union) and parameter type will improve type-safety for callers and align with the actual behavior.
…ilot review
Nodes are now transparent wrappers for data context — they always
delegate to their primary wrapped entity instead of holding their own:
- SourceNode → wrapped stream
- FunctionNode → function pod
- OperatorNode → operator pod
This ensures consistent hashing (content_hash/pipeline_hash use the
same semantic hasher as the wrapped entity) and eliminates the risk
of context mismatch when Pipeline.compile() constructs nodes.
Also addresses Copilot PR review comments:
- Fix misleading Pipeline docstring ("persistent variant" → "execution-ready nodes")
- Tighten RootSource.cached() type annotations (Any → ArrowDatabaseProtocol,
RootSource → CachedSource)
- Add DESIGN_ISSUES.md entry T2 for config/context delegation chain review
https://claude.ai/code/session_016x3vkNoCTPW6GdzVNRZeAZ
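The transparent-wrapper pattern described in this commit — a node never stores its own data context, it always reads through to its wrapped entity — can be sketched with a delegating property. `Pod` and `FunctionNode` here are hypothetical minimal stand-ins, not the project's real classes.

```python
class Pod:
    """Stand-in for a function pod that owns its data context."""

    def __init__(self, data_context_key: str):
        self.data_context_key = data_context_key


class FunctionNode:
    """Transparent wrapper: no context of its own, always delegates."""

    def __init__(self, pod: Pod):
        self._pod = pod

    @property
    def data_context_key(self) -> str:
        # Reading through to the wrapped entity means the node's hash
        # inputs can never drift out of sync with the pod's context.
        return self._pod.data_context_key


pod = Pod("ctx-v1")
node = FunctionNode(pod)
pod.data_context_key = "ctx-v2"  # change propagates; nothing is cached on the node
```

Because nothing is copied at construction time, `Pipeline.compile()` cannot produce a node whose context disagrees with its wrapped stream or pod.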
Closes PLT-896
This pull request refactors the pipeline's approach to source node caching by removing the `PersistentSourceNode` and `PersistentSource` abstractions in favor of a more explicit and composable `CachedSource` class. The pipeline now uses `SourceNode` as a simple graph-vertex wrapper, and caching is handled by wrapping sources in `CachedSource` when needed. This change simplifies the pipeline's node structure and clarifies the separation between graph structure and caching behavior. All relevant code, documentation, and tests have been updated to reflect this new approach.

**Pipeline Node and Caching Refactor:**
- Replaced `PersistentSourceNode` in the pipeline with `SourceNode`, making source nodes thin wrappers without built-in caching. Caching is now opt-in and handled by wrapping sources in `CachedSource` before adding them to the pipeline. (`src/orcapod/pipeline/graph.py`, `demo_pipeline.py`, `src/orcapod/pipeline/__init__.py`)

**Core Sources and Caching API:**
- Renamed `PersistentSource` to `CachedSource` and updated all references, imports, and documentation accordingly. Added a `cached()` convenience method to `RootSource` for easy wrapping. (`src/orcapod/core/sources/persistent_source.py` → `src/orcapod/core/sources/cached_source.py`, `src/orcapod/core/sources/base.py`)

**Documentation and Comments:**
- Docstrings and comments now describe `CachedSource`. Pipeline documentation now explicitly states that source caching is not a pipeline concern. (`src/orcapod/pipeline/graph.py`, `src/orcapod/core/sources/cached_source.py`)

**Tests Update:**
- Tests now use `CachedSource` instead of `PersistentSource`, ensuring test coverage and clarity for the new caching approach. (`tests/test_core/sources/test_persistent_source.py` → `tests/test_core/sources/test_cached_source.py`)

**Cleanup:**
- Removed the `PersistentSourceNode` class and its related code from the codebase. (`src/orcapod/pipeline/nodes.py`)

These changes modernize and simplify the pipeline's approach to source node management and caching, making the codebase easier to understand and maintain.