
feat(pipeline): normalized save format with DB deduplication and save levels (ENG-256/ENG-320/ENG-342)#120

Merged
eywalker merged 18 commits into dev from
eywalker/eng-256-design-normalize-pipeline-save-format-deduplicate-databases
Apr 1, 2026

Conversation

Contributor

kurodo3 Bot commented Mar 28, 2026

Summary

Fixes ENG-256, ENG-320, ENG-342

  • ENG-256: Design and spec for normalized pipeline save format (four save levels: minimal, definition, standard, full; database registry deduplication; node_uri identity model)
  • ENG-320: Implement Pipeline.save(level=) with new normalized JSON structure; top-level databases registry; node_uri stored at all levels
  • ENG-342: Fix pipeline_path two-level formula (schema:{pipeline_hash} / instance:{content_hash}) in FunctionNode and OperatorNode; add node_uri property to all three node types

Key changes

Pipeline save format

  • New top-level databases dict keyed by db_{sha256[:8]} — nodes reference databases by key string, deduplicating identical configs
  • Save levels: minimal (topology+identity only), definition (node configs, no DBs), standard (full round-trip), full (aspirational, same as standard for now)
  • run_id/snapshot_time are null (placeholder); pipeline_path never stored (reconstructed on load)
  • Uses function_config / operator_config keys — old keys (function_pod, operator) are not supported; loading old-format files raises a clear error
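
The registry-key scheme and save-document shape described above can be sketched as follows. The `db_{sha256[:8]}` key format and the canonical-JSON hashing come from this PR's description; the field names (`database_type`, `path`, `cache_database`) and the node layout are illustrative, not the real save schema.

```python
import hashlib
import json


def db_key(config: dict) -> str:
    # "db_" + first 8 hex chars of the sha256 of the canonical JSON config.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return "db_" + hashlib.sha256(canonical.encode()).hexdigest()[:8]


# Two nodes pointing at the same database config share one registry entry.
cache_db = {"database_type": "delta", "path": "/data/cache"}
key = db_key(cache_db)
save_doc = {
    "databases": {key: cache_db},
    "nodes": [
        {"name": "source_a", "source_config": {"cache_database": key}},
        {"name": "source_b", "source_config": {"cache_database": key}},
    ],
}
```

Because the key is derived from the config content, re-registering an identical config always yields the same key, which is what makes deduplication stable across saves.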

Node identity

  • node_uri property on FunctionNode, OperatorNode, SourceNode — canonical identity tuple
  • _stored_node_uri populated in from_descriptor() read-only paths on all three node types
  • pipeline_path formula fixed: [..., f"schema:{pipeline_hash}", f"instance:{content_hash}"] (was single node:{hash} component)
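
The two-level formula fixed by ENG-342 can be illustrated with a small sketch. Only the `schema:{pipeline_hash}` / `instance:{content_hash}` suffix shape comes from the PR; the helper name and example values are assumptions.

```python
def build_pipeline_path(
    prefix: tuple[str, ...],
    uri: tuple[str, ...],
    pipeline_hash: str,
    content_hash: str,
) -> tuple[str, ...]:
    # Previously the suffix was a single f"node:{hash}" component; the fix
    # appends two components instead.
    return (*prefix, *uri, f"schema:{pipeline_hash}", f"instance:{content_hash}")


path = build_pipeline_path(
    prefix=("my_pipeline",),
    uri=("function", "normalize_v1"),
    pipeline_hash="aaa111",
    content_hash="bbb222",
)
```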

Database registry

  • DatabaseRegistryProtocol in database_protocols.py — clean protocol with register() and resolve() used by all source to_config/from_config signatures
  • DatabaseRegistry class in serialization.py implements the protocol; handles deduplication and stable key generation (db_abc, db_abc_2, ...)
  • All sources accept db_registry: DatabaseRegistryProtocol | None = None — forwarded unconditionally; only CachedSource uses it
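
A minimal sketch of the protocol/implementation split described above, assuming the hashing and collision-suffix behavior stated in this PR; the real classes carry more surface than shown here.

```python
import hashlib
import json
from typing import Any, Protocol


class DatabaseRegistryProtocol(Protocol):
    def register(self, config: dict[str, Any]) -> str: ...
    def resolve(self, key: str) -> dict[str, Any]: ...


class DatabaseRegistry:
    """Deduplicates DB configs and hands out stable db_{sha256[:8]} keys."""

    def __init__(self) -> None:
        self._entries: dict[str, dict[str, Any]] = {}
        self._reverse: dict[str, str] = {}  # canonical JSON -> assigned key

    def register(self, config: dict[str, Any]) -> str:
        canonical = json.dumps(config, sort_keys=True)
        if canonical in self._reverse:  # idempotent re-registration, O(1)
            return self._reverse[canonical]
        base = "db_" + hashlib.sha256(canonical.encode()).hexdigest()[:8]
        key, n = base, 2
        while key in self._entries:  # collision suffix: db_abc, db_abc_2, ...
            key = f"{base}_{n}"
            n += 1
        self._entries[key] = config
        self._reverse[canonical] = key
        return key

    def resolve(self, key: str) -> dict[str, Any]:
        return self._entries[key]
```

The `_reverse` map makes repeated registration of the same config O(1), the optimization also discussed in the review below.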

Load path

  • Pipeline.load() requires the new format (top-level databases registry + pipeline.pipeline_database key); old inline-dict format raises a clear ValueError
  • Pipeline.load(path, pipeline_database=...) supported for definition-level saves (no DB embedded in JSON)
  • Source reconstruction failures fall back to SourceProxy (identity-preserving placeholder)
  • Function reconstruction failures fall back to PacketFunctionProxy — node reaches READ_ONLY status
  • Minimal-level saves raise a clear ValueError on load
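
Taken together, the load-path rules above amount to a guard roughly like the following. Key names such as `save_level` and `databases`, and the error wording, are assumptions about the JSON layout, not confirmed by the PR.

```python
from typing import Any


def check_loadable(doc: dict[str, Any], pipeline_database: Any = None) -> None:
    # Sketch of Pipeline.load()'s level handling as described above.
    level = doc.get("save_level")
    if level == "minimal":
        raise ValueError(
            "Cannot load a 'minimal'-level save: it contains topology and "
            "identity only, not enough to reconstruct the pipeline."
        )
    if level == "definition" and pipeline_database is None:
        raise ValueError(
            "definition-level saves embed no database config; pass "
            "pipeline_database=... to Pipeline.load()."
        )
    if level != "definition" and "databases" not in doc:
        raise ValueError("Missing 'databases' block in pipeline save file.")
```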

Test plan

  • All existing serialization tests updated for new format
  • 9 new DatabaseRegistry tests (incl. idempotency, collision suffix)
  • Round-trip integration tests: standard level, definition level, definition level with unloadable function (PacketFunctionProxy)
  • Full test suite: 2928 passed, 49 skipped, 0 failures

🤖 Generated with Claude Code

kurodo3 Bot changed the base branch from main to dev on March 28, 2026 06:23

codecov Bot commented Mar 28, 2026

Contributor

Copilot AI left a comment


Pull request overview

Implements a new normalized pipeline serialization format to support multiple save “levels”, deduplicate database configs via a top-level registry, and standardize node identity (node_uri) and pipeline_path semantics (two-level schema: / instance: suffix).

Changes:

  • Added DatabaseRegistry and updated source (de)serialization to support DB key references.
  • Reworked Pipeline.save(level=...) / Pipeline.load() to emit/consume the new normalized JSON structure and remove persisted pipeline_path.
  • Updated node identity/path behavior (node_uri, two-level pipeline_path) and refreshed/expanded test coverage + added design/plan docs.

Reviewed changes

Copilot reviewed 17 out of 18 changed files in this pull request and generated 4 comments.

Summary per file:

  • src/orcapod/pipeline/graph.py: Adds save(level=...) normalized format and updates load() for new registry + no stored pipeline_path.
  • src/orcapod/pipeline/serialization.py: Introduces DatabaseRegistry, threads optional db_registry, and extends SourceProxy creation to accept node_descriptor.
  • src/orcapod/core/sources/cached_source.py: Supports DB registry key references in to_config/from_config and removes identity fields from source config.
  • src/orcapod/core/nodes/function_node.py: Switches pipeline_path to schema:/instance: and adds node_uri support + new-format load hints.
  • src/orcapod/core/nodes/operator_node.py: Switches pipeline_path to schema:/instance: and adds node_uri support + cache_mode compat parsing.
  • src/orcapod/core/nodes/source_node.py: Adds node_uri support and stores it during descriptor-based (read-only) construction.
  • tests/test_pipeline/test_serialization.py: Updates existing serialization assertions and adds tests for save levels + new invariants.
  • tests/test_pipeline/test_serialization_helpers.py: Adds unit tests for DatabaseRegistry, cached source registry behavior, node_uri, and proxy loading compat.
  • tests/test_core/...: Updates caching/path assumptions to reflect the two-level pipeline_path formula.
  • .gitignore: Ignores superpowers/* while allowing superpowers/specs and superpowers/plans.
  • superpowers/specs/... / superpowers/plans/...: Adds design spec + implementation plan documentation for the new save format.
Comments suppressed due to low confidence (1)

src/orcapod/pipeline/serialization.py:358

  • resolve_source_from_config(..., fallback_to_proxy=True) still falls back to _source_proxy_from_config(config) without a node_descriptor. In the new pipeline save format, source_config intentionally strips identity fields (content_hash, pipeline_hash, schemas), so this fallback path can now raise ValueError instead of returning a proxy. Consider extending resolve_source_from_config to accept an optional node_descriptor (and pass it through), or make the fallback error message explicitly say identity fields are required when no descriptor is provided.
    _ensure_registries()
    source_type = config.get("source_type")
    if source_type not in SOURCE_REGISTRY:
        if fallback_to_proxy:
            return _source_proxy_from_config(config)
        raise ValueError(


Comment on lines +765 to +768
if level == "minimal":
    raise ValueError(
        "Cannot load a 'minimal'-level save: it contains topology and identity "
        "only, not enough to reconstruct the pipeline. "

Copilot AI Mar 30, 2026


Pipeline.load() explicitly rejects level == "minimal", but level == "definition" files produced by save(level="definition") also omit pipeline_database and pipeline.databases. As written, loading a definition-level file will fall through to the generic "Missing 'databases' block" error, which is misleading. Consider either supporting definition-level loads via an API for supplying pipeline/function databases, or (if not supported) adding a clear ValueError for level == "definition" similar to the minimal-level guard.

Suggested change

Before:
    if level == "minimal":
        raise ValueError(
            "Cannot load a 'minimal'-level save: it contains topology and identity "
            "only, not enough to reconstruct the pipeline. "

After:
    if level in ("minimal", "definition"):
        raise ValueError(
            "Cannot load a 'minimal' or 'definition'-level save: these contain "
            "only topology and identity and do not include database configuration, "
            "which is required to reconstruct the pipeline. "

Contributor Author


Fixed in commit 5ab7c22. Added an explicit ValueError for level == 'definition' immediately after the existing minimal check, with a message that clearly names the level and explains why loading is not supported (no pipeline database config). Also added a regression test: test_load_definition_level_raises_clear_error.

Comment on lines +896 to +899
dbs = {
    "pipeline": pipeline_db,
    "pipeline_path_prefix": pipeline_path_prefix,
}

Copilot AI Mar 30, 2026


Pipeline.load() passes pipeline_path_prefix through the databases dict, but OperatorNode.from_descriptor() currently derives its prefix only from a stored descriptor["pipeline_path"]. In the new save format, pipeline_path is never stored, so loaded operator nodes will compute an empty prefix and use an incorrect DB scope. Either populate a derived pipeline_path in the descriptor during load, or update OperatorNode.from_descriptor() to use the provided pipeline_path_prefix hint when pipeline_path is missing.

Contributor Author


Fixed in commit 5ab7c22 (operator_node.py). OperatorNode.from_descriptor() full-mode path now checks for databases["pipeline_path_prefix"] first (the hint already passed by graph.load()) and only falls back to deriving the prefix from the stored pipeline_path for backward compatibility with old-format saves. Regression test: test_load_operator_node_pipeline_path_has_pipeline_name_prefix.

Comment thread src/orcapod/core/nodes/operator_node.py Outdated
Comment on lines +181 to +187
if operator is not None and input_streams:
# Full mode: construct normally
pipeline_path = tuple(descriptor.get("pipeline_path", ()))
# Derive pipeline_path_prefix by stripping the suffix that
# __init__ appends: operator.uri (2 elements) + node:{hash} (1 element).
uri_len = len(operator.uri) + 1 # +1 for node:{hash}
prefix = pipeline_path[:-uri_len] if len(pipeline_path) > uri_len else ()
# __init__ appends: operator.uri + schema:{hash} + instance:{hash} (2 elements).
uri_len = len(operator.uri) + 2 # +2 for schema/instance components
if len(pipeline_path) > uri_len:

Copilot AI Mar 30, 2026


In full-mode reconstruction, OperatorNode.from_descriptor() derives pipeline_path_prefix by stripping a suffix from descriptor.get("pipeline_path"). With the new save format, pipeline_path is not stored, so this will produce an empty prefix and mis-scope operator DB paths. Use a pipeline_path_prefix hint from the databases mapping (similar to FunctionNode), or derive the prefix from the pipeline name + stored node_uri/hashes instead of relying on stored pipeline_path.

Contributor Author


Fixed in commit 5ab7c22 — same fix as the companion comment on graph.py. OperatorNode.from_descriptor() now preferentially uses databases['pipeline_path_prefix'] passed from the loader, falling back to the stored pipeline_path derivation only for old-format saves that don't pass the hint.

cache_db = resolve_database_from_config(cache_db_ref)

inner_source = resolve_source_from_config(
    config["inner_source"], fallback_to_proxy=True

Copilot AI Mar 30, 2026


CachedSource.from_config() resolves cache_database via db_registry, but it does not pass db_registry through when reconstructing inner_source. If inner_source is itself a source type that can emit DB registry keys (e.g., a nested CachedSource), those keys won’t be resolvable and deserialization can fail. Forward db_registry to resolve_source_from_config(..., db_registry=db_registry) so nested sources can resolve embedded DB references consistently.

Suggested change

Before:
    config["inner_source"], fallback_to_proxy=True

After:
    config["inner_source"],
    fallback_to_proxy=True,
    db_registry=db_registry,

Contributor Author


Fixed in commit 5ab7c22 (cached_source.py). Both sides addressed: (1) to_config() now checks via inspect.signature whether the inner source's to_config accepts db_registry and forwards it if so — ensuring nested CachedSource instances serialize their database as a registry key too. (2) from_config() now passes db_registry=db_registry to resolve_source_from_config() for the inner source. Regression tests: test_cached_source_to_config_forwards_db_registry_to_inner_source and test_nested_cached_source_from_config_forwards_db_registry.
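
The inspect.signature check described in (1) can be sketched like this; the toy sources and the helper name stand in for CachedSource and its real inner sources.

```python
import inspect


def forward_to_config(source, db_registry=None) -> dict:
    # Only pass db_registry if the inner source's to_config accepts it,
    # mirroring the compatibility check described above.
    params = inspect.signature(source.to_config).parameters
    if "db_registry" in params:
        return source.to_config(db_registry=db_registry)
    return source.to_config()


class LegacySource:
    def to_config(self):
        return {"source_type": "legacy"}


class RegistryAwareSource:
    def to_config(self, db_registry=None):
        return {"source_type": "aware", "used_registry": db_registry is not None}
```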

kurodo3 Bot added a commit that referenced this pull request Mar 30, 2026
- graph.py: add explicit ValueError for definition-level loads
  (previously fell through to generic "Missing 'databases' block" error)
- operator_node.py: from_descriptor() full mode now uses
  databases["pipeline_path_prefix"] hint instead of deriving prefix from
  the stored pipeline_path key (absent in new format), preventing empty
  prefix and wrong DB scope
- cached_source.py: to_config() forwards db_registry to inner source;
  from_config() forwards db_registry to resolve_source_from_config so
  nested CachedSource instances can resolve registry keys

Tests added (TDD red-green):
- test_load_definition_level_raises_clear_error
- test_load_operator_node_pipeline_path_has_pipeline_name_prefix
- test_cached_source_to_config_forwards_db_registry_to_inner_source
- test_nested_cached_source_from_config_forwards_db_registry

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
eywalker requested a review from Copilot on March 30, 2026 19:04
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 17 out of 18 changed files in this pull request and generated 5 comments.



Comment thread src/orcapod/pipeline/graph.py Outdated
Comment on lines +792 to +796
function_db = (
    resolve_database_from_config(db_registry_data[fn_db_key])
    if fn_db_key is not None and fn_db_key in db_registry_data
    else None
)

Copilot AI Mar 30, 2026


When loading the new-format registry, an invalid function_database key is silently treated as None (if fn_db_key is not None and fn_db_key in db_registry_data else None). If the JSON is corrupted/mismatched, this can lead to running with the wrong DB without any error. Prefer raising a clear ValueError when function_database is non-null but missing from the databases registry (similar to how pipeline_database is handled).

Suggested change

Before:
    function_db = (
        resolve_database_from_config(db_registry_data[fn_db_key])
        if fn_db_key is not None and fn_db_key in db_registry_data
        else None
    )

After:
    if fn_db_key is None:
        function_db = None
    else:
        if fn_db_key not in db_registry_data:
            raise ValueError(
                f"Function database key {fn_db_key!r} not found in databases "
                f"registry. Available keys: {sorted(db_registry_data.keys())}"
            )
        function_db = resolve_database_from_config(db_registry_data[fn_db_key])

Contributor Author


Fixed in commit d3548ed. Replaced the silent ternary with an explicit if/elif/else: fn_db_key is None → function_db = None; key present but not in registry → raise ValueError naming the bad key and listing available keys; otherwise resolve normally. Regression test: test_load_raises_on_missing_function_database_registry_key.

Comment thread src/orcapod/core/nodes/function_node.py Outdated
Comment on lines +249 to +253
# __init__. The suffix added is:
# pf.uri + (f"schema:{pipeline_hash}", f"instance:{content_hash}")
pf_uri_len = len(function_pod.packet_function.uri) + 2  # +2 for schema/instance
if pipeline_path:
    if len(pipeline_path) > pf_uri_len:

Copilot AI Mar 30, 2026


FunctionNode.from_descriptor() derives pipeline_path_prefix by assuming the stored pipeline_path suffix is pf.uri + (schema:, instance:) (i.e., +2). This breaks backward-compat loading of older saves that stored pipeline_path ending with a single node: component, and can drop part/all of the prefix. Since Pipeline.load() already passes a pipeline_path_prefix hint, consider preferring that hint when present, or detect the stored suffix format (e.g., last element startswith node: vs schema:) to strip the correct number of components.

Contributor Author


Fixed in commit d3548ed. FunctionNode.from_descriptor() now checks hint_prefix first (same pattern as the OperatorNode fix from the previous round): when databases['pipeline_path_prefix'] is provided by Pipeline.load(), it is used directly without any derivation from the stored pipeline_path. The old derivation logic is kept as a fallback only when no hint is available (e.g. direct from_descriptor calls without a loader hint). Regression test: test_load_function_node_pipeline_path_prefers_hint_over_stored_pipeline_path.

Comment thread src/orcapod/pipeline/graph.py Outdated
Comment on lines +587 to +592
# node_uri: at all levels
if isinstance(node, SourceNode):
    descriptor.update(self._build_source_descriptor(node))
elif isinstance(node, FunctionNode):
    descriptor.update(self._build_function_descriptor(node))
elif isinstance(node, OperatorNode):
    descriptor.update(self._build_operator_descriptor(node))
stream = node.stream
if isinstance(stream, cp.SourceProtocol):
    cfg = stream.to_config()
    stream_type = cfg.get("source_type", "unknown")

Copilot AI Mar 30, 2026


In Pipeline.save(), Source node node_uri is re-derived by calling stream.to_config() and extracting source_type/source_id. Since SourceNode now has a node_uri property that encapsulates this logic (including the read-only fallback), consider using node.node_uri here as well to avoid duplication and reduce the chance of the two implementations diverging.

Contributor Author


Fixed in commit d3548ed. The SourceNode special-case in save() is replaced by a single descriptor["node_uri"] = list(node.node_uri) for all node types. SourceNode.node_uri already encodes the same to_config()-based derivation (including the non-SourceProtocol fallback).

Comment on lines +543 to +544
- ``"definition"``: adds full pod/stream configs. No pipeline-level DBs.
- ``"standard"`` (default): adds pipeline-level DB registry. Round-trippable.

Copilot AI Mar 30, 2026


The save(level="definition") docstring implies it’s a valid save level but doesn’t mention that Pipeline.load() explicitly rejects definition files. Consider updating this docstring to clarify that definition-level saves are not loadable with the current API (unless/until an API for supplying pipeline/function databases is added), to avoid misleading users.

Suggested change

Before:
- ``"definition"``: adds full pod/stream configs. No pipeline-level DBs.
- ``"standard"`` (default): adds pipeline-level DB registry. Round-trippable.

After:
- ``"definition"``: adds full pod/stream configs, but omits pipeline-level
  databases. Intended for definition export only; not round-trippable
  and not currently loadable via :meth:`Pipeline.load`.
- ``"standard"`` (default): adds pipeline-level DB registry. Round-trippable
  with :meth:`Pipeline.load`.

Contributor Author


Fixed in commit d3548ed. Updated the definition level docstring entry to note: Not loadable via Pipeline.load (no database config included).

Comment thread src/orcapod/pipeline/serialization.py Outdated
Comment on lines +56 to +60
# Check if already registered under any key (handles collision-suffix keys)
for existing_key, existing_config in self._entries.items():
    if existing_config == config:
        return existing_key


Copilot AI Mar 30, 2026


DatabaseRegistry.register() does a linear scan over all existing entries to detect whether the config is already registered. When many sources/operators embed DB configs, repeated register() calls can become O(n^2) during Pipeline.save(). Consider keeping an auxiliary mapping from a canonicalized config representation (e.g., the canonical JSON string or full sha256) to the assigned key to make re-registration O(1) while preserving collision-suffix handling.

Contributor Author


Fixed in commit d3548ed. Added a _reverse: dict[str, str] map (canonical JSON → assigned key) to DatabaseRegistry. register() now does an O(1) lookup in _reverse before any key search. from_dict() also populates _reverse so loaded registries benefit too. The collision-suffix logic is unchanged.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 17 out of 18 changed files in this pull request and generated 1 comment.



Comment thread src/orcapod/core/nodes/function_node.py Outdated
Comment on lines +244 to +268
pipeline_path = tuple(descriptor.get("pipeline_path", ()))
# Derive pipeline_path_prefix by stripping the suffix that
# __init__ appends (packet_function.uri + node hash element).
# __init__ appends (packet_function.uri + two hash elements).
# The descriptor stores the complete pipeline_path; we need
# to reconstruct the prefix that was originally passed to
# __init__. The suffix added is: pf.uri + (f"node:{hash}",).
pf_uri_len = len(function_pod.packet_function.uri) + 1 # +1 for node:hash
prefix = (
pipeline_path[:-pf_uri_len] if len(pipeline_path) > pf_uri_len else ()
)
# __init__. The suffix added is:
# pf.uri + (f"schema:{pipeline_hash}", f"instance:{content_hash}")
pf_uri_len = len(function_pod.packet_function.uri) + 2 # +2 for schema/instance
if pipeline_path:
if len(pipeline_path) > pf_uri_len:
prefix = pipeline_path[:-pf_uri_len]
elif len(pipeline_path) == pf_uri_len:
prefix = ()
else:
import logging as _logging
_logging.getLogger(__name__).warning(
"pipeline_path %r is shorter than expected (uri_len=%d); "
"using empty prefix — DB path may be incorrect.",
pipeline_path,
pf_uri_len,
)
prefix = hint_prefix if hint_prefix is not None else ()
else:
# New format: pipeline_path not stored; use hint_prefix if available
prefix = hint_prefix if hint_prefix is not None else ()

Copilot AI Mar 30, 2026


FunctionNode.from_descriptor() derives pipeline_path_prefix from descriptor["pipeline_path"] when present, using len(pf.uri) + 2 (schema/instance) as the suffix length. For old-format saves where the stored pipeline_path ends with a single node:{hash} component, this will strip one extra component and can drop the pipeline-name prefix, causing loaded nodes to scope DB reads/writes to the wrong path. Since Pipeline.load() already passes an explicit pipeline_path_prefix hint, prefer that hint when provided (and/or detect node: vs schema: suffix when deriving from stored pipeline_path) to preserve backward compatibility.

Contributor Author


Addressed in the same commit d3548ed — this is the same issue raised in the companion comment on the prior round. The fix is identical to the OperatorNode pattern: hint_prefix is checked first and used directly when available, making the stored pipeline_path derivation a fallback-only path.

)

@property
def node_uri(self) -> tuple[str, ...]:
Contributor


Let's make sure node_uri is officially part of the protocol

Contributor Author


Added node_uri to SourceNodeProtocol, FunctionNodeProtocol, and OperatorNodeProtocol in node_protocols.py. Placed there rather than on PipelineElementProtocol: PipelineElementProtocol is used in a pipeline_hash() isinstance gate in base.py, and adding node_uri to it would have caused FunctionPod / ArrowTableStream to fail that check, silently changing pipeline hash values.
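
A sketch of what the protocol addition looks like, reduced to the single new member; the runtime_checkable decorator and the toy node are illustrative, and the real protocols in node_protocols.py declare the full node surface.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SourceNodeProtocol(Protocol):
    @property
    def node_uri(self) -> tuple[str, ...]: ...


class ToySourceNode:
    # Any object exposing node_uri structurally satisfies the protocol.
    @property
    def node_uri(self) -> tuple[str, ...]:
        return ("source", "dict_source", "my_source")
```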

return tuple(getattr(self, "_stored_node_uri", ()))
stream = self.stream
if hasattr(stream, "to_config"):
cfg = stream.to_config()
Contributor


Let's add logging here to capture which one is getting used

Contributor Author


Added debug-level logging to all three branches of SourceNode.node_uri: read-only (stored URI), live stream with to_config(), and live stream without to_config().

# -------------------------------------------------------------------------

def to_config(self) -> dict[str, Any]:
def to_config(self, db_registry=None) -> dict[str, Any]:
Contributor


give proper type hint to db_registry

Contributor Author


Typed as DatabaseRegistry | None (imported under TYPE_CHECKING to avoid circular import).


@classmethod
def from_config(cls, config: dict[str, Any]) -> CachedSource:
def from_config(cls, config: dict[str, Any], db_registry=None) -> "CachedSource":
Contributor


Consistently remove any quotes around type hints. Make sure to add from __future__ import annotations if missing

Contributor Author


from __future__ import annotations was already present. Removed the remaining quoted return type ("CachedSource" → CachedSource) from from_config.


@classmethod
def from_config(cls, config: dict[str, Any]) -> CachedSource:
def from_config(cls, config: dict[str, Any], db_registry=None) -> "CachedSource":
Contributor


make sure to provide detailed type hints for db_registry

Contributor Author


Typed db_registry: DatabaseRegistry | None = None on both to_config and from_config.

Comment thread src/orcapod/pipeline/graph.py Outdated
else None
)
elif "databases" in pipeline_meta:
# Old format: pipeline.databases.{pipeline_database: {...}}
Contributor


drop support for old format

Contributor Author


Dropped. Same old-format branch removed (items 10 and 11 addressed in the same code block).

Comment thread src/orcapod/pipeline/graph.py Outdated
descriptor: dict[str, Any],
mode: str,
resolve_source_from_config: Any,
db_registry: Any = None,
Contributor


Use a proper type hint that's not Any.

Contributor Author


Typed as Callable[..., Any] in the _load_source_node signature.

Comment thread src/orcapod/pipeline/graph.py Outdated
from orcapod.core.function_pod import FunctionPod
from orcapod.pipeline.serialization import LoadStatus

# Support both new key 'function_config' and old key 'function_pod'
Contributor


limit to only new format. Generally go through and drop support for old format.

Contributor Author


Dropped both fallback keys: function_pod → fn_config = descriptor.get("function_config") and operator → op_config = descriptor.get("operator_config"). Also dropped the uppercase cache_mode fallback in operator_node.py — now uses CacheMode(value) directly (new format uses lowercase values).

Comment thread src/orcapod/pipeline/serialization.py Outdated
config: dict[str, Any],
*,
fallback_to_proxy: bool = False,
db_registry: "DatabaseRegistry | None" = None,
Contributor


do not put quotes around type hint

Contributor Author


Removed: from_dict return type is now DatabaseRegistry (unquoted). from __future__ import annotations was already present at the top of the file.

output_schema = node_descriptor.get("output_schema", {})
tag_schema_dict = output_schema.get("tag", {})
packet_schema_dict = output_schema.get("packet", {})
else:
Contributor

drop support for the old format

Contributor Author

Updated _source_proxy_from_config to prefer node_descriptor for top-level pipeline nodes (new format). The else branch is retained specifically for inner sources (e.g. the DictSource wrapped inside a CachedSource) which always embed identity via _identity_config() in their own config dict — dropping it entirely would break CachedSource round-trip loading. Raises with a clear message when neither source provides identity fields.

kurodo3 Bot and others added 13 commits March 30, 2026 22:03
Add normalized pipeline save format design document covering:
- Four save levels: minimal, definition, standard, full
- Database deduplication via top-level registry
- Node identity model with formal content_hash and pipeline_hash definitions
- PLT-1161 DAG snapshot alignment at minimal level
- pipeline_path derivation formula (never stored)

Also update .gitignore to track superpowers/specs/ for shared design artifacts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…20/ENG-342)

Covers ENG-342 (pipeline_path two-level formula fix) and ENG-320
(normalized save format with database registry and save levels).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ENG-342)

FunctionNode was using only pipeline_hash; OperatorNode was using only
content_hash. Both now emit (schema:{pipeline_hash}, instance:{content_hash})
to enable hierarchical result storage grouping.

Also fixes from_descriptor in both node types to strip two suffix elements
(schema: + instance:) instead of one when deriving pipeline_path_prefix
from stored descriptors, ensuring loaded nodes resolve the correct DB path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
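For illustration, the two-level formula this commit describes can be sketched as below. The helper names and prefix shape here are hypothetical, not orcapod's actual API; only the schema:/instance: suffix structure comes from the PR.

```python
def pipeline_path(prefix: list[str], pipeline_hash: str, content_hash: str) -> list[str]:
    # schema: groups nodes that share a pipeline_hash; instance: separates
    # content_hash variants within that group (hierarchical result storage).
    return [*prefix, f"schema:{pipeline_hash}", f"instance:{content_hash}"]

def pipeline_path_prefix(stored_path: list[str]) -> list[str]:
    # from_descriptor must strip BOTH suffix elements (schema: + instance:),
    # not one, when recovering the prefix from a stored descriptor.
    return stored_path[:-2]
```

Stripping only one element, as the old code did, would leave a stale schema: component in the prefix and point loaded nodes at the wrong DB path.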
…om_descriptor guard (ENG-342)

- Remove _pipeline_node_hash (set but never read after pipeline_path fix)
- Add warning log when stored pipeline_path is shorter than expected
- Update stale OperatorNode docstring and test fixture to new two-level format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… (ENG-320)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ffix keys

Re-registering a config that landed under a collision-suffix key (e.g.
db_abc_2) now correctly returns the existing key instead of creating
a duplicate entry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…, node_uri property (ENG-320)

- CachedSource.to_config(db_registry=None): emits key or inline dict
- CachedSource.from_config(config, db_registry=None): resolves key or inline
- Remove identity fields from CachedSource.to_config output
- _source_proxy_from_config(config, node_descriptor=None): reads identity from
  node_descriptor (new format) or config (old format, backward compat)
- FunctionNode.node_uri property: returns packet_function.uri or _stored_node_uri
- OperatorNode.node_uri property: returns operator.uri or _stored_node_uri

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…th (ENG-320)

node_uri property was always returning () for deserialized nodes because
_stored_node_uri was never assigned from the descriptor dict.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add level parameter: minimal/definition/standard/full
- Top-level databases registry (SHA-256 keys); pipeline block holds key references
- node_uri at all levels; data_context_key/configs at definition+; cache_mode at standard+
- Rename function_pod->function_config, operator->operator_config
- Remove pipeline_path and result_record_path from all saved output
- Add run_id/snapshot_time (null) to pipeline block
- Update load() to handle both old and new format (backward compatible)
- Pass db_registry to resolve_source_from_config for CachedSource reconstruction
- Derive pipeline_path_prefix and result_path_prefix from pipeline name on load
- Handle lowercase cache_mode values (new .value format) in OperatorNode.from_descriptor

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…llback (ENG-320)

- SourceNode: add node_uri property and populate _stored_node_uri in from_descriptor
- _load_source_node: route reconstruction failures through _source_proxy_from_config
  so identity is preserved as SourceProxy instead of lost as UNAVAILABLE
- load(): raise clear ValueError for minimal-level saves (not round-trippable)
- Add test for level='full' save

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace node_pipeline_identity terminology with node_uri and clarify
that the data-sensitivity difference between content_hash and
pipeline_hash comes entirely from what is chained in from upstream.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Require presenting a revision plan (with verdict per comment) and
waiting for user approval before implementing any changes or posting
replies to reviewers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- graph.py: add explicit ValueError for definition-level loads
  (previously fell through to generic "Missing 'databases' block" error)
- operator_node.py: from_descriptor() full mode now uses
  databases["pipeline_path_prefix"] hint instead of deriving prefix from
  the stored pipeline_path key (absent in new format), preventing empty
  prefix and wrong DB scope
- cached_source.py: to_config() forwards db_registry to inner source;
  from_config() forwards db_registry to resolve_source_from_config so
  nested CachedSource instances can resolve registry keys

Tests added (TDD red-green):
- test_load_definition_level_raises_clear_error
- test_load_operator_node_pipeline_path_has_pipeline_name_prefix
- test_cached_source_to_config_forwards_db_registry_to_inner_source
- test_nested_cached_source_from_config_forwards_db_registry

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@kurodo3 Bot force-pushed the eywalker/eng-256-design-normalize-pipeline-save-format-deduplicate-databases branch from 5ab7c22 to 3c24342 on March 30, 2026 22:05
kurodo3 Bot and others added 2 commits March 30, 2026 22:25
- graph.py: raise ValueError when function_database key is non-null but
  missing from the databases registry (previously silently returned None)
- graph.py: prefer databases["pipeline_path_prefix"] hint in
  FunctionNode.from_descriptor() over deriving from stored pipeline_path
  (mirrors the OperatorNode fix; avoids over-stripping on old-format saves)
- graph.py: use node.node_uri property for SourceNode in save() to remove
  duplicated derivation logic
- graph.py: update save() docstring to note definition level is not loadable
- serialization.py: add _reverse map to DatabaseRegistry for O(1) re-registration
  (avoids linear scan on repeated register() calls during save)

Tests added (TDD red-green):
- test_load_raises_on_missing_function_database_registry_key
- test_load_function_node_pipeline_path_prefers_hint_over_stored_pipeline_path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Behavioral changes (with TDD tests):
- definition-level saves are now loadable via Pipeline.load(); accepts
  optional pipeline_database/function_database kwargs; nodes load as
  CACHE_ONLY/UNAVAILABLE when no DB is supplied
- dropped old format support: pipeline.databases dict structure (pre-v0.1.0)
  raises ValueError — only top-level databases registry is accepted
- dropped function_pod and operator fallback keys from load path; only
  function_config and operator_config keys are recognised
- _source_proxy_from_config: prefers node_descriptor for top-level nodes,
  falls back to _identity_config() fields for inner sources (CachedSource),
  raises when neither provides identity

Non-behavioural cleanups:
- node_uri added to SourceNodeProtocol / FunctionNodeProtocol /
  OperatorNodeProtocol (avoids mutating PipelineElementProtocol which
  would break pipeline_hash isinstance routing)
- SourceNode.node_uri: debug logging added to all three code paths
- CachedSource.to_config / from_config: proper DatabaseRegistry type hints
- graph.py save(): level param typed as Literal[...]
- graph.py _build_source_descriptor: db_registry typed; isinstance replaced
  with duck-type hasattr(stream, "to_config") check
- graph.py _load_source_node: resolve_source_from_config typed Callable,
  db_registry typed DatabaseRegistry; node_descriptor threaded through
- serialization.py: DatabaseRegistry.from_dict return type unquoted;
  resolve_source_from_config gains node_descriptor param
- Pipeline.__init__: pipeline_database now Optional to support definition-
  level loads; flush() guards against None
- operator_node.py: cache_mode loading drops uppercase fallback, uses
  CacheMode(value) directly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Collaborator

@brian-arnold left a comment

Review comments

1. inspect.signature for db_registry dispatch is fragile

The PR introduces inspect.signature() in three places to detect whether a source's to_config() / from_config() accepts a db_registry parameter:

  • cached_source.py:to_config() — introspects inner source
  • serialization.py:resolve_source_from_config() — introspects cls.from_config
  • graph.py:_build_source_descriptor() — introspects stream.to_config

Runtime signature introspection can break with decorators, wrappers, __wrapped__, or **kwargs signatures. Since this is a greenfield project and only CachedSource needs db_registry, consider one of these simpler alternatives:

  • isinstance check: if isinstance(stream, CachedSource): stream.to_config(db_registry=...) — explicit and zero introspection overhead.
  • Add db_registry=None to the base source protocol: all sources accept it, only CachedSource uses it. No dispatch logic needed anywhere.
  • try/except TypeError: attempt the call with db_registry, fall back on TypeError.

Any of these would eliminate the import inspect dependency and the implicit contract that only shows up at runtime.
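The fragility is easy to reproduce: a decorator that omits functools.wraps hides the wrapped function's parameters from inspect.signature, so signature-based dispatch silently concludes db_registry is unsupported. A minimal self-contained demonstration (class and method names are illustrative):

```python
import inspect

def logged(fn):
    # A decorator that forgets functools.wraps -- a common real-world slip.
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper

class Source:
    @logged
    def to_config(self, db_registry=None):
        return {}

# Introspection now sees the wrapper's (*args, **kwargs) signature, so a
# "does this accept db_registry?" check answers no and skips the argument:
params = inspect.signature(Source.to_config).parameters
print("db_registry" in params)  # False
```

With the protocol-based approach (every source declares db_registry=None), no such runtime check exists to be fooled.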


2. pipeline_database=None opens crash paths in FunctionNode

The PR makes pipeline_database optional in Pipeline.__init__ (to support definition-level loads). Pipeline.flush() was correctly guarded, and OperatorNode has proper None checks throughout its execution paths.

However, FunctionNode's caching methods don't have the same guards. If a user loads a definition-level save without supplying a database and then attempts to execute the pipeline, these methods will raise AttributeError: 'NoneType' object has no attribute ...:

  • get_cached_results() — calls self._pipeline_database.get_all_records()
  • add_pipeline_record() — calls self._pipeline_database.get_record_by_id() and .add_record()

The basic execution path is likely safe (guarded by self._cached_function_pod is not None), but the above methods are reachable. Consider either:

  • Adding None guards to these methods (raise a clear RuntimeError instead of AttributeError)
  • Documenting that definition-level loaded pipelines must have databases attached before execution

3. Missing save→load→run round-trip integration test

The test suite thoroughly covers the new save format structure, DatabaseRegistry, and node-level behavior. However, there's no end-to-end test that:

  1. Builds and runs a pipeline
  2. Saves it at standard level
  3. Loads it via Pipeline.load()
  4. Runs the loaded pipeline
  5. Verifies results match the original

This is the golden path for the entire PR and would catch integration issues between the save format, database registry resolution, and pipeline reconstruction. A similar test for definition-level save→load (with caller-supplied databases) would also be valuable.


🤖 Generated with Claude Code

…tocol, None guards, round-trip tests

Comment 1 (inspect.signature fragility):
- Add DatabaseRegistryProtocol to database_protocols.py with register()/resolve()
  methods, cleanly defining the db_registry contract for both save and load
- Add db_registry: DatabaseRegistryProtocol | None = None to SourceProtocol
  and all 11 source to_config/from_config implementations — those that don't
  embed DB references simply ignore the parameter
- Remove all dispatch logic (inspect.signature, isinstance, source_type checks)
  from graph._build_source_descriptor, serialization.resolve_source_from_config,
  and cached_source.to_config — always forward db_registry unconditionally
- DatabaseRegistry (save-time deduplication class) implicitly satisfies
  DatabaseRegistryProtocol; no changes to its implementation

Comment 2 (pipeline_database=None crash paths):
- Add FunctionNode._require_pipeline_database() helper raising RuntimeError
  with an actionable message instead of AttributeError: NoneType
- Guard get_cached_results() and add_pipeline_record() with the helper

Comment 3 (missing round-trip integration test):
- Add TestSaveLoadRunRoundtrip with two end-to-end tests:
  - test_standard_save_load_run_roundtrip: build+run → save → load → run → compare
  - test_definition_save_load_run_roundtrip: save at definition level → load with
    fresh caller-supplied database → run → compare results match original

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@kurodo3
Contributor Author


kurodo3 Bot commented Apr 1, 2026

Addressed in commit 70cc641. Here's what changed for each item:


1. inspect.signature fragility → DatabaseRegistryProtocol

Replaced all three inspect.signature() dispatch points with a proper protocol-based approach:

  • New DatabaseRegistryProtocol added to database_protocols.py — defines register(config) -> str (save-time deduplication) and resolve(key) -> dict (load-time lookup). The existing DatabaseRegistry class implicitly satisfies it with no changes.
  • SourceProtocol.to_config/from_config now declare db_registry: DatabaseRegistryProtocol | None = None — all sources accept the parameter by contract.
  • All 11 concrete sources (CSVSource, DictSource, ArrowTableSource, etc.) have db_registry=None added to both to_config and from_config — ignored by sources that don't embed DB references.
  • CachedSource uses DatabaseRegistryProtocol in its signatures; inner source is called unconditionally with db_registry=db_registry — no dispatch.
  • graph._build_source_descriptor and serialization.resolve_source_from_config now always forward db_registry — zero dispatch logic, no import inspect.

2. pipeline_database=None crash paths

Added FunctionNode._require_pipeline_database() — a private helper that raises RuntimeError with an actionable message ("supply a pipeline_database via Pipeline.load(..., pipeline_database=)") instead of AttributeError: 'NoneType' object has no attribute ....

Called at the start of both get_cached_results() (after the _cached_function_pod is None early return) and add_pipeline_record().
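The guard pattern, reduced to its essentials (a toy sketch; the real FunctionNode carries far more machinery, and StubDB-style databases are purely illustrative):

```python
class FunctionNodeSketch:
    """Hypothetical reduction of the None-guard discussed above."""

    def __init__(self, pipeline_database=None):
        self._pipeline_database = pipeline_database

    def _require_pipeline_database(self):
        if self._pipeline_database is None:
            # Actionable RuntimeError instead of
            # AttributeError: 'NoneType' object has no attribute ...
            raise RuntimeError(
                "No pipeline database attached; supply one via "
                "Pipeline.load(..., pipeline_database=...) before executing."
            )
        return self._pipeline_database

    def get_cached_results(self):
        db = self._require_pipeline_database()
        return db.get_all_records()
```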


3. Save→load→run round-trip integration test

Added TestSaveLoadRunRoundtrip class with two end-to-end tests:

  • test_standard_save_load_run_roundtrip: builds a CSVSource pipeline, runs it, captures results, saves at standard level, loads, runs the loaded pipeline, asserts output matches the original run.
  • test_definition_save_load_run_roundtrip: saves at definition level (no embedded DB), loads with a fresh caller-supplied DeltaTableDatabase, asserts the function node reaches FULL status and produces identical results.

Both pass (2927 tests pass total, 0 failures).

…loadable function

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 32 out of 33 changed files in this pull request and generated 5 comments.



"run_id": None,
"snapshot_time": None,
}
if include_pipeline_dbs:
Copilot AI Apr 1, 2026

Pipeline.save() can crash when saving at standard/full if pipeline_database is None (now allowed by Pipeline.__init__). The call to self._pipeline_database.to_config() will raise an AttributeError instead of a clear error. Consider validating that self._pipeline_database is present when include_pipeline_dbs is true and raising a ValueError that tells the caller to pass pipeline_database= (or save at definition level).

Suggested change
if include_pipeline_dbs:
if include_pipeline_dbs:
if self._pipeline_database is None:
raise ValueError(
"Cannot save pipeline at level 'standard' or 'full' without a "
"pipeline_database. Either pass pipeline_database= when "
"constructing the Pipeline, or save with level='definition'."
)

Contributor Author

Fixed in 28ce24b. Added an explicit ValueError at the top of save() when include_pipeline_dbs is true and self._pipeline_database is None. The error message tells the caller to either pass pipeline_database= at construction time or save with level='definition'.

# Standard/full format: top-level databases registry
db_registry_data = data["databases"]
load_db_registry = DatabaseRegistry.from_dict(db_registry_data)
pipeline_db_key = pipeline_meta["pipeline_database"]
Copilot AI Apr 1, 2026

Pipeline.load() resolves pipeline_database via pipeline_db_config = db_registry_data[pipeline_db_key] without checking that pipeline_db_key exists in the registry. If the JSON is corrupted (or edited) this will raise a KeyError with a less helpful message than the explicit ValueError used for function_database. Consider adding the same explicit validation/ValueError for pipeline_database keys.

Suggested change
pipeline_db_key = pipeline_meta["pipeline_database"]
pipeline_db_key = pipeline_meta["pipeline_database"]
if pipeline_db_key not in db_registry_data:
raise ValueError(
f"Pipeline database key {pipeline_db_key!r} not found in databases registry. "
f"Available keys: {sorted(db_registry_data.keys())}"
)

Contributor Author

Fixed in 28ce24b. Added the same explicit existence check and ValueError that function_database already had. pipeline_db_key is now validated against db_registry_data before lookup, giving a clear error message with the available keys instead of a bare KeyError.

Comment on lines +172 to +174
cache_mode = CacheMode(cache_mode_str)
except ValueError:
cache_mode = CacheMode.OFF
Copilot AI Apr 1, 2026

OperatorNode.from_descriptor() only parses cache_mode using CacheMode(cache_mode_str) (i.e., expecting values like "off"/"log"/"replay"). Older descriptors (including some tests) use enum names like "OFF"; for non-OFF modes ("LOG"/"REPLAY") this will silently downgrade to OFF. Consider accepting both representations (e.g., try value parsing first, then fall back to CacheMode[cache_mode_str] when cache_mode_str matches a member name).

Suggested change
cache_mode = CacheMode(cache_mode_str)
except ValueError:
cache_mode = CacheMode.OFF
# Prefer value-based parsing (e.g. "off", "log", "replay").
cache_mode = CacheMode(cache_mode_str)
except ValueError:
# Backwards compatibility: accept enum names (e.g. "OFF", "LOG", "REPLAY").
try:
cache_mode = CacheMode[cache_mode_str.upper()]
except (KeyError, AttributeError):
cache_mode = CacheMode.OFF

Contributor Author

Not implementing. Eywalker's review explicitly requested dropping the uppercase/name-based cache_mode fallback (comment: 'drop support for old format'). We intentionally replaced the try/except-on-names approach with value-only parsing (CacheMode(cache_mode_str) expecting lowercase values like 'off'/'log'/'replay'). Restoring the CacheMode[name.upper()] path would contradict that architectural decision. Descriptors produced by the new save() always write lowercase values.
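The distinction at issue is value-based versus name-based Enum lookup. A self-contained illustration (member names and values assumed from the thread, not taken from orcapod's source):

```python
from enum import Enum

class CacheMode(Enum):
    OFF = "off"
    LOG = "log"
    REPLAY = "replay"

# New-format descriptors store the lowercase .value, so value-based
# construction is the only supported path:
print(CacheMode("log"))  # CacheMode.LOG

# Old-format enum *names* are intentionally rejected, per review:
try:
    CacheMode("LOG")
except ValueError:
    print("old-format name rejected")
```

Restoring name-based lookup would just be CacheMode[s] (square brackets), which is exactly the fallback the review asked to remove.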

Comment on lines +783 to +810
load_db_registry: DatabaseRegistryProtocol | None = None
if "databases" in data and "pipeline_database" in pipeline_meta:
# Standard/full format: top-level databases registry
db_registry_data = data["databases"]
load_db_registry = DatabaseRegistry.from_dict(db_registry_data)
pipeline_db_key = pipeline_meta["pipeline_database"]
pipeline_db_config = db_registry_data[pipeline_db_key]
pipeline_db = resolve_database_from_config(pipeline_db_config)
fn_db_key = pipeline_meta.get("function_database")
if fn_db_key is None:
function_db = None
elif fn_db_key not in db_registry_data:
raise ValueError(
f"Function database key {fn_db_key!r} not found in databases registry. "
f"Available keys: {sorted(db_registry_data.keys())}"
)
else:
function_db = resolve_database_from_config(db_registry_data[fn_db_key])
elif level == "definition":
# Definition-level: no embedded DB config; caller may supply databases
pipeline_db = pipeline_database
function_db = function_database
else:
raise ValueError(
"Cannot determine database configuration from pipeline JSON. "
"Expected a top-level 'databases' registry with a "
"'pipeline_database' key in the pipeline block."
)
Copilot AI Apr 1, 2026

PR description says Pipeline.load() is backward-compatible with old save formats (inline pipeline.databases and legacy node keys like function_pod/operator), but the current load() implementation raises unless it finds a top-level databases registry + pipeline.pipeline_database, and _load_function_node/_load_operator_node only read function_config/operator_config. Either update the PR description to match the implemented behavior (no backward-compat) or restore the compatibility paths.

Contributor Author

The PR description has been updated to reflect the intentional decision (made during eywalker's review) to drop backward compatibility with the old format. The description previously claimed backward-compat load paths for function_pod/operator keys and inline pipeline.databases dict — these are no longer supported and loading old-format files now raises a clear ValueError. The description now accurately states this.

Comment thread src/orcapod/pipeline/serialization.py Outdated
Comment on lines +348 to +354
Requires *node_descriptor* to contain ``content_hash``,
``pipeline_hash``, and ``output_schema`` fields.
db_registry: Optional registry forwarded to ``from_config`` for all
source types. Sources that do not embed database references ignore
it; ``CachedSource`` uses it to resolve its cache database key.
node_descriptor: Node descriptor dict containing identity fields used
when creating a ``SourceProxy`` fallback.
Copilot AI Apr 1, 2026

resolve_source_from_config(..., fallback_to_proxy=True) docstring says the proxy fallback "requires node_descriptor" to provide identity fields, but _source_proxy_from_config() can also read identity from config (used for inner sources like CachedSource.inner_source). Consider updating the docstring to reflect that either node_descriptor or identity fields in config are sufficient, depending on the call site.

Suggested change
Requires *node_descriptor* to contain ``content_hash``,
``pipeline_hash``, and ``output_schema`` fields.
db_registry: Optional registry forwarded to ``from_config`` for all
source types. Sources that do not embed database references ignore
it; ``CachedSource`` uses it to resolve its cache database key.
node_descriptor: Node descriptor dict containing identity fields used
when creating a ``SourceProxy`` fallback.
Identity fields (``content_hash``, ``pipeline_hash``, and
``output_schema``) are taken from *node_descriptor* when provided,
or from *config* when available.
db_registry: Optional registry forwarded to ``from_config`` for all
source types. Sources that do not embed database references ignore
it; ``CachedSource`` uses it to resolve its cache database key.
node_descriptor: Optional node descriptor dict containing identity
fields used when creating a ``SourceProxy`` fallback. If omitted,
identity is inferred from ``config`` where possible.

Contributor Author

Fixed in 28ce24b. Updated the docstring to clarify that identity fields are taken from node_descriptor when provided, or inferred from config when available — the latter is the path used for inner sources (e.g. a DictSource embedded inside a CachedSource always includes identity via _identity_config()). The previous wording incorrectly implied node_descriptor was always required.

…k, docstring

- Pipeline.save() raises clear ValueError at standard/full level when
  pipeline_database is None, instead of AttributeError on .to_config()
- Pipeline.load() validates pipeline_db_key exists in the databases registry
  with an explicit ValueError (matching the existing function_database guard)
- resolve_source_from_config docstring corrected: SourceProxy fallback reads
  identity from node_descriptor when provided, or from config for inner sources
  (e.g. CachedSource.inner_source embeds identity via _identity_config())

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@eywalker eywalker merged commit 49186da into dev Apr 1, 2026
9 checks passed
eywalker pushed a commit that referenced this pull request Apr 1, 2026
- graph.py: add explicit ValueError for definition-level loads
  (previously fell through to generic "Missing 'databases' block" error)
- operator_node.py: from_descriptor() full mode now uses
  databases["pipeline_path_prefix"] hint instead of deriving prefix from
  the stored pipeline_path key (absent in new format), preventing empty
  prefix and wrong DB scope
- cached_source.py: to_config() forwards db_registry to inner source;
  from_config() forwards db_registry to resolve_source_from_config so
  nested CachedSource instances can resolve registry keys

Tests added (TDD red-green):
- test_load_definition_level_raises_clear_error
- test_load_operator_node_pipeline_path_has_pipeline_name_prefix
- test_cached_source_to_config_forwards_db_registry_to_inner_source
- test_nested_cached_source_from_config_forwards_db_registry

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
eywalker pushed a commit that referenced this pull request Apr 1, 2026
- graph.py: raise ValueError when function_database key is non-null but
  missing from the databases registry (previously silently returned None)
- graph.py: prefer databases["pipeline_path_prefix"] hint in
  FunctionNode.from_descriptor() over deriving from stored pipeline_path
  (mirrors the OperatorNode fix; avoids over-stripping on old-format saves)
- graph.py: use node.node_uri property for SourceNode in save() to remove
  duplicated derivation logic
- graph.py: update save() docstring to note definition level is not loadable
- serialization.py: add _reverse map to DatabaseRegistry for O(1) re-registration
  (avoids linear scan on repeated register() calls during save)

Tests added (TDD red-green):
- test_load_raises_on_missing_function_database_registry_key
- test_load_function_node_pipeline_path_prefers_hint_over_stored_pipeline_path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
eywalker pushed a commit that referenced this pull request Apr 1, 2026
Behavioral changes (with TDD tests):
- definition-level saves are now loadable via Pipeline.load(); accepts
  optional pipeline_database/function_database kwargs; nodes load as
  CACHE_ONLY/UNAVAILABLE when no DB is supplied
- dropped old format support: pipeline.databases dict structure (pre-v0.1.0)
  raises ValueError — only top-level databases registry is accepted
- dropped function_pod and operator fallback keys from load path; only
  function_config and operator_config keys are recognised
- _source_proxy_from_config: prefers node_descriptor for top-level nodes,
  falls back to _identity_config() fields for inner sources (CachedSource),
  raises when neither provides identity

Non-behavioural cleanups:
- node_uri added to SourceNodeProtocol / FunctionNodeProtocol /
  OperatorNodeProtocol (avoids mutating PipelineElementProtocol which
  would break pipeline_hash isinstance routing)
- SourceNode.node_uri: debug logging added to all three code paths
- CachedSource.to_config / from_config: proper DatabaseRegistry type hints
- graph.py save(): level param typed as Literal[...]
- graph.py _build_source_descriptor: db_registry typed; isinstance replaced
  with duck-type hasattr(stream, "to_config") check
- graph.py _load_source_node: resolve_source_from_config typed Callable,
  db_registry typed DatabaseRegistry; node_descriptor threaded through
- serialization.py: DatabaseRegistry.from_dict return type unquoted;
  resolve_source_from_config gains node_descriptor param
- Pipeline.__init__: pipeline_database now Optional to support definition-
  level loads; flush() guards against None
- operator_node.py: cache_mode loading drops uppercase fallback, uses
  CacheMode(value) directly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@eywalker eywalker deleted the eywalker/eng-256-design-normalize-pipeline-save-format-deduplicate-databases branch April 1, 2026 02:09