Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/orcapod/core/nodes/operator_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,13 @@ def from_descriptor(
node._stored_pipeline_hash = descriptor.get("pipeline_hash")
node._stored_pipeline_path = tuple(descriptor.get("pipeline_path", ()))

# Determine load status based on DB availability
# Determine load status based on DB availability and cache mode.
# An uncached operator (cache_mode=OFF) never writes records to the
# database, so even when a pipeline_db exists there is nothing to
# read back. Only operators that actively persist results
# (LOG or REPLAY mode) can legitimately serve data in read-only mode.
node._load_status = LoadStatus.UNAVAILABLE
if pipeline_db is not None:
if pipeline_db is not None and cache_mode != CacheMode.OFF:
node._load_status = LoadStatus.READ_ONLY

return node
Expand Down
194 changes: 186 additions & 8 deletions tests/test_pipeline/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,12 @@ def test_read_only_function_returns_stored_hashes(self, simple_pipeline):
assert fn.pipeline_hash().to_string() == orig_pipeline

def test_read_only_operator_with_join(self, tmp_path):
"""Read-only mode preserves operator node metadata for multi-input operators."""
"""Read-only mode preserves operator node metadata for multi-input operators.

An uncached Join (cache_mode=OFF) has no records in the database, so it
resolves to UNAVAILABLE regardless of load mode — stored schema metadata
is still accessible via output_schema(). (PLT-1158)
"""
db = DeltaTableDatabase(base_path=str(tmp_path / "db"))
source1 = DictSource(
data=[{"a": 1, "b": 10}], tag_columns=["a"], source_id="s1"
Expand All @@ -660,9 +665,10 @@ def test_read_only_operator_with_join(self, tmp_path):
loaded = Pipeline.load(str(path), mode="read_only")

op_node = loaded.compiled_nodes["my_join"]
assert op_node.load_status == LoadStatus.READ_ONLY
# Uncached Join (cache_mode=OFF) → UNAVAILABLE even in read_only mode
assert op_node.load_status == LoadStatus.UNAVAILABLE

# Metadata should be accessible
# Stored metadata is still accessible
tag_schema, packet_schema = op_node.output_schema()
assert isinstance(tag_schema, Schema)
assert isinstance(packet_schema, Schema)
Expand Down Expand Up @@ -776,7 +782,13 @@ def test_full_mode_function_degrades_when_source_unavailable(self, simple_pipeli
assert fn.load_status == LoadStatus.CACHE_ONLY

def test_full_mode_operator_degrades_when_sources_unavailable(self, tmp_path):
"""Operator degrades to READ_ONLY when upstream sources are UNAVAILABLE."""
"""Uncached operator degrades to UNAVAILABLE when upstream sources are UNAVAILABLE.

An operator without explicit DB caching (cache_mode=OFF) never writes
records to the database, so even when a pipeline_db exists there is
nothing to read back. The correct status is therefore UNAVAILABLE,
not READ_ONLY. (PLT-1158)
"""
db = DeltaTableDatabase(base_path=str(tmp_path / "db"))
source1 = DictSource(
data=[{"a": 1, "b": 10}], tag_columns=["a"], source_id="s1"
Expand All @@ -795,8 +807,8 @@ def test_full_mode_operator_degrades_when_sources_unavailable(self, tmp_path):
loaded = Pipeline.load(str(path), mode="full")

op_node = loaded.compiled_nodes["my_join"]
# Both sources are DictSource (UNAVAILABLE), so operator degrades
assert op_node.load_status == LoadStatus.READ_ONLY
# Uncached Join (cache_mode=OFF) with UNAVAILABLE sources → UNAVAILABLE
assert op_node.load_status == LoadStatus.UNAVAILABLE

def test_full_mode_preserves_pipeline_name(self, simple_pipeline):
"""Full mode preserves the pipeline name."""
Expand Down Expand Up @@ -999,14 +1011,14 @@ def test_load_multi_source_operator_pipeline_read_only(self, tmp_path):
for node in loaded._persistent_node_map.values():
assert hasattr(node, "load_status")

# Sources are UNAVAILABLE, operator is READ_ONLY
# Sources are UNAVAILABLE, uncached operator is also UNAVAILABLE (PLT-1158)
sources = [
n for n in loaded._persistent_node_map.values() if n.node_type == "source"
]
assert all(s.load_status == LoadStatus.UNAVAILABLE for s in sources)

op = loaded.compiled_nodes["join_node"]
assert op.load_status == LoadStatus.READ_ONLY
assert op.load_status == LoadStatus.UNAVAILABLE

def test_load_preserves_all_edge_pairs(self, multi_source_pipeline):
"""Loaded pipeline has the same edge pairs as the original."""
Expand Down Expand Up @@ -1435,3 +1447,169 @@ async def test_cache_only_mode_async_execute_yields_cached_data(self, tmp_path):
key=lambda r: r["x"],
)
assert recovered == original


# ---------------------------------------------------------------------------
# PLT-1158: Uncached operator with UNAVAILABLE sources must not be READ_ONLY
# ---------------------------------------------------------------------------


def _adder(x: int, y: int, z: int) -> int:
"""Simple adder used in the PLT-1158 reproduction pipeline."""
return x + y + z


class TestPLT1158UncachedOperatorStatus:
"""Regression tests for PLT-1158.

An implicit operator node (e.g. the join between two source nodes) that
has no explicit database caching (cache_mode=OFF) must resolve to
LoadStatus.UNAVAILABLE when its parent sources cannot be reconstructed.
Previously it incorrectly resolved to READ_ONLY, which caused downstream
function nodes to skip CACHE_ONLY mode and attempt computation — failing
because the operator had no live data stream.
"""

def _build_pipeline(self, tmp_path):
"""Build and save the pipeline from the PLT-1158 issue example.

Two DictSources are joined (implicit, uncached Join) and passed to
a function pod that sums three fields. The pipeline is run, flushed,
and saved to disk before returning the saved path and the expected
results.
"""
db = DeltaTableDatabase(base_path=str(tmp_path / "db"))

data1 = DictSource(
[
{"id": 1, "x": 10, "y": 20},
{"id": 2, "x": 30, "y": 40},
{"id": 3, "x": 60, "y": 10},
],
tag_columns=["id"],
source_id="data1",
)
data2 = DictSource(
[
{"id": 2, "z": 30},
{"id": 3, "z": 50},
],
tag_columns=["id"],
source_id="data2",
)

pf = PythonPacketFunction(
function=_adder,
output_keys=["result"],
function_name="_adder",
)
pod = FunctionPod(packet_function=pf)

pipeline = Pipeline(name="my_sample_pipeline", pipeline_database=db)
with pipeline:
pod.process(data1, data2, label="adder")

pipeline.run()
db.flush()

json_path = str(tmp_path / "pipeline.json")
pipeline.save(json_path)

# Collect expected results for later comparison
adder_node = pipeline.compiled_nodes["adder"]
expected = sorted(
[p.as_dict()["result"] for _, p in adder_node.iter_packets()]
)
return json_path, expected

# ------------------------------------------------------------------
# Status propagation
# ------------------------------------------------------------------

def test_uncached_operator_is_unavailable_not_read_only(self, tmp_path):
"""Uncached join resolves to UNAVAILABLE (not READ_ONLY) after load.

This is the core regression guard for PLT-1158: previously the join
got READ_ONLY because a pipeline_db existed, even though no records
were ever written to it (cache_mode=OFF).
"""
json_path, _ = self._build_pipeline(tmp_path)
loaded = Pipeline.load(json_path, mode="full")

# The implicit join operator has no DB caching → UNAVAILABLE.
# Find operator nodes via node type (the join is upstream of the function node).
op_nodes = [
n for n in loaded._persistent_node_map.values()
if n.node_type == "operator"
]
assert len(op_nodes) >= 1
for op in op_nodes:
assert op.load_status == LoadStatus.UNAVAILABLE, (
f"Expected UNAVAILABLE for uncached operator, got {op.load_status}"
)

def test_source_nodes_are_unavailable(self, tmp_path):
"""Source nodes (DictSource) are UNAVAILABLE after load — pre-condition."""
json_path, _ = self._build_pipeline(tmp_path)
loaded = Pipeline.load(json_path, mode="full")

source_nodes = [
n for n in loaded._persistent_node_map.values()
if n.node_type == "source"
]
assert len(source_nodes) >= 1
for src in source_nodes:
assert src.load_status == LoadStatus.UNAVAILABLE

def test_function_node_gets_cache_only_when_operator_is_unavailable(
self, tmp_path
):
"""Function node downstream of an UNAVAILABLE operator gets CACHE_ONLY.

When the operator is correctly UNAVAILABLE, _load_function_node enters
the UNAVAILABLE branch and wires up a proxy pod in CACHE_ONLY mode,
allowing the function node to serve all previously cached results.
"""
json_path, _ = self._build_pipeline(tmp_path)
loaded = Pipeline.load(json_path, mode="full")

fn = loaded.compiled_nodes["adder"]
assert fn.load_status == LoadStatus.CACHE_ONLY

def test_cache_only_function_node_serves_cached_results(self, tmp_path):
"""CACHE_ONLY function node returns the same results as the original run.

This is the end-to-end regression guard from the PLT-1158 issue: in a
fresh session the pipeline is loaded, the adder function node (backed
by a DB cache) serves all previously computed results without touching
the unavailable operator or sources.
"""
json_path, expected = self._build_pipeline(tmp_path)
loaded = Pipeline.load(json_path, mode="full")

fn = loaded.compiled_nodes["adder"]
assert fn.load_status == LoadStatus.CACHE_ONLY

# Must not raise; must return same values as the original run
recovered = sorted(
[p.as_dict()["result"] for _, p in fn.iter_packets()]
)
assert recovered == expected

def test_read_only_mode_uncached_operator_is_also_unavailable(self, tmp_path):
"""In read_only mode, an uncached operator is still UNAVAILABLE.

The UNAVAILABLE status for cache_mode=OFF operators is not mode-specific:
the operator never wrote records to the DB, so READ_ONLY would be
misleading regardless of the load mode.
"""
json_path, _ = self._build_pipeline(tmp_path)
loaded = Pipeline.load(json_path, mode="read_only")

op_nodes = [
n for n in loaded._persistent_node_map.values()
if n.node_type == "operator"
]
assert len(op_nodes) >= 1
for op in op_nodes:
assert op.load_status == LoadStatus.UNAVAILABLE
Loading