Observability reader [ENG-347] #121
Pull request overview
Adds a new ObservabilityReader utility to discover and query pipeline observability artifacts (status + logs) stored as Delta Lake tables under a results directory, plus initial tests covering discovery and basic validation.
Changes:
- Introduce `ObservabilityReader` for auto-discovery of status/log Delta tables and for returning cleaned Polars DataFrames.
- Implement DataFrame cleaning (drop system columns, rename key fields) and a `status()` view that deduplicates to the latest record per (node, tag).
- Add a new test module that writes representative Delta tables and validates node discovery, tag inference, and error handling.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `src/orcapod/pipeline/observability_reader.py` | New reader that discovers Delta tables under a root and exposes `status()` / `logs()` query helpers. |
| `tests/test_pipeline/test_observability_reader.py` | New tests creating local Delta tables and validating discovery/tag inference + basic invalid-root behaviors. |
```python
if "status" in parts:
    idx = parts.index("status")
    if idx + 1 < len(parts):
        node_name = parts[idx + 1]
        self._status_tables[node_name] = table_dir
elif "logs" in parts:
    idx = parts.index("logs")
    if idx + 1 < len(parts):
        node_name = parts[idx + 1]
        self._log_tables[node_name] = table_dir
```
_discover_tables stores only a single table_dir per node_name; if multiple Delta tables exist for the same node (e.g., different schema_hash / node hash, multiple pipeline versions), later discoveries overwrite earlier ones and data is silently dropped. Consider storing a list of table dirs per node and concatenating all of them (or selecting deterministically by version/timestamp).
Fixed — _discover_tables now stores a list[Path] per node name, and _get_status_df/_get_logs_df iterate over all tables for each node before concatenating. This handles multiple Delta tables per node (e.g. different schema hashes or pipeline versions).
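The list-per-node pattern described in the fix can be sketched standalone. This is a minimal illustration, not the PR's actual implementation: `discover_tables` and the path layout (`…/status/<node>/<table>`, `…/logs/<node>/<table>`) are assumptions mirroring the quoted snippet, with the single-`Path` assignment replaced by an append.

```python
from collections import defaultdict
from pathlib import Path

def discover_tables(
    table_dirs: list[Path],
) -> tuple[dict[str, list[Path]], dict[str, list[Path]]]:
    """Group candidate Delta table directories by node name."""
    status_tables: dict[str, list[Path]] = defaultdict(list)
    log_tables: dict[str, list[Path]] = defaultdict(list)
    for table_dir in table_dirs:
        parts = table_dir.parts
        if "status" in parts:
            idx = parts.index("status")
            if idx + 1 < len(parts):
                # Append instead of assign, so multiple tables per node
                # (e.g. different schema hashes or pipeline versions) survive.
                status_tables[parts[idx + 1]].append(table_dir)
        elif "logs" in parts:
            idx = parts.index("logs")
            if idx + 1 < len(parts):
                log_tables[parts[idx + 1]].append(table_dir)
    return dict(status_tables), dict(log_tables)
```

With this shape, a reader can iterate over every table for a node and concatenate, rather than silently keeping only the last-discovered directory.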
```python
"""Sorted list of discovered node names."""
return sorted(self._status_tables.keys())
```
nodes currently returns only status-table node names. If a results directory contains logs but no status for a node (or only logs at all), that node will be omitted from nodes even though logs() could potentially serve it. Consider returning the sorted union of status+log discovered nodes (and keeping ordering stable).
Suggested change:
```diff
- """Sorted list of discovered node names."""
- return sorted(self._status_tables.keys())
+ """Sorted list of discovered node names (from status and logs)."""
+ return sorted(set(self._status_tables) | set(self._log_tables))
```
By design — nodes is derived from status tables because the StatusObserver emits status for every node the orchestrator executes. A node without status entries was not executed. Logs are secondary — they capture stdout/stderr but are only written after packet execution completes, so a node could have status but no logs (e.g. still running). Keeping nodes status-derived gives a consistent, authoritative view of what ran.
```python
def tag_columns(self) -> list[str]:
    """Inferred user tag column names."""
    status = self._get_status_df()
    return sorted(
        col for col in status.columns
        if not col.startswith("__")
        and not col.startswith("_status_")
        and not col.startswith("_log_")
        and not col.startswith("_tag_")
        and not col.startswith("_tag::")
        and col != "node_label"
    )
```
tag_columns is inferred exclusively from the status DataFrame. If the root contains only log tables (or status tables are temporarily absent), tag_columns will be empty even though logs may contain user tag columns. Consider falling back to logs schema when status is empty/unavailable, or inferring from whichever table type is present.
Same reasoning as nodes — status tables are the primary/authoritative data source. The constructor requires at least one Delta table to exist (raises ValueError otherwise), and status() now short-circuits on empty DataFrames. If a results directory has only log tables and no status, tag_columns returns empty and status() returns an empty DataFrame, which is correct behavior — no status means no execution state to report.
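The prefix-based inference in `tag_columns` can be illustrated in isolation. This is a hypothetical standalone version, not the PR's code: the column names in the example are invented, and `str.startswith` with a tuple collapses the chained checks into one.

```python
# Internal prefixes mirrored from the snippet above; "_tag_" does not
# match "_tag::" (the fifth character differs), so both are listed.
INTERNAL_PREFIXES = ("__", "_status_", "_log_", "_tag_", "_tag::")

def infer_tag_columns(columns: list[str]) -> list[str]:
    """Return sorted column names that look like user tag columns."""
    return sorted(
        col for col in columns
        if not col.startswith(INTERNAL_PREFIXES) and col != "node_label"
    )

print(infer_tag_columns(
    ["subject", "session_date", "_status_state", "_tag::run", "__sys", "node_label"]
))  # → ['session_date', 'subject']
```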
```python
df = self._get_status_df()
df = self._clean_status_df(df)

# Deduplicate to latest status per (node, input)
group_cols = ["node_label"] + self.tag_columns
group_cols = [c for c in group_cols if c in df.columns]
df = df.sort("timestamp").unique(subset=group_cols, keep="last")
```
status() assumes a non-empty status table with a "timestamp" column; if the root contains no status tables (but does contain log tables), _get_status_df() returns an empty DataFrame and df.sort("timestamp") will raise. Consider short-circuiting when df is empty / missing required columns and returning an empty DataFrame (ideally with the expected schema).
Fixed — status() now short-circuits with if df.is_empty(): return df before attempting sort("timestamp"). Added a test (test_empty_status_returns_empty_df) that creates a results directory with only log tables and verifies status() returns an empty DataFrame.
```python
if node not in self._status_tables:
    raise KeyError(
        f"Node {node!r} not found. Available nodes: {self.nodes}"
    )
```
logs() validates node existence against _status_tables, not against discovered log tables (or the union). This will raise KeyError for nodes that have logs but no status table, and the error message’s Available nodes list may be misleading. Consider checking against self.nodes once nodes includes log-only nodes, or against _log_tables directly.
Suggested change:
```diff
- if node not in self._status_tables:
-     raise KeyError(
-         f"Node {node!r} not found. Available nodes: {self.nodes}"
+ if node not in self._status_tables and node not in self._log_tables:
+     available_nodes = sorted(
+         set(self._status_tables.keys()) | set(self._log_tables.keys())
+     )
+     raise KeyError(
+         f"Node {node!r} not found. Available nodes: {available_nodes}"
```
Fixed — logs() now checks both _status_tables and _log_tables before raising KeyError. A node that has logs but no status will still be accessible via logs(). The nodes property intentionally remains status-derived (see comment #2), but logs() is more permissive.
```python
empty = tmp_path / "empty"
empty.mkdir()
with pytest.raises(ValueError, match="No.*Delta.*tables"):
    ObservabilityReader(empty)
```
The new tests cover discovery/tag inference and error cases, but they don’t exercise the main public behaviors: status() (deduplication to latest per tag and CACHED→SUCCESS mapping) and logs() filtering/column cleaning. Adding assertions for those outputs would protect the core functionality this PR introduces.
Suggested addition (after the existing `ObservabilityReader(empty)` test):
```python
def test_status_deduplicates_and_maps_cached_to_success(
    self, results_root: Path
):
    # Write an additional status table for a dedicated test node so we can
    # control the rows used to exercise deduplication and CACHED→SUCCESS mapping.
    _write_status_table(
        results_root,
        pipeline_name="example_pipeline",
        node_name="node_x",
        rows=[
            # Older row: should be ignored after deduplication.
            _status_row(
                node_label="node_x",
                state="RUNNING",
                timestamp="2026-01-01T00:00:00+00:00",
            ),
            # Newer row with CACHED state: should be selected and mapped to SUCCESS.
            _status_row(
                node_label="node_x",
                state="CACHED",
                timestamp="2026-01-01T01:00:00+00:00",
            ),
        ],
    )
    reader = ObservabilityReader(results_root)
    # Restrict to the synthetic node_x we just wrote.
    df = reader.status(node_labels=["node_x"])
    assert isinstance(df, pl.DataFrame)
    # There should be exactly one row for the tag combination after
    # deduplication to the latest record.
    assert df.height == 1
    # The state of the latest CACHED row should be exposed as SUCCESS.
    assert "state" in df.columns
    assert df["state"].to_list() == ["SUCCESS"]

def test_logs_filters_internal_columns_and_preserves_tags(
    self, results_root: Path
):
    # Write an additional log table for a dedicated test node so we can
    # verify that ObservabilityReader.logs() cleans internal columns.
    _write_log_table(
        results_root,
        pipeline_name="example_pipeline",
        node_name="node_y",
        rows=[
            _log_row(
                node_label="node_y",
                success=False,
                stdout="some stdout",
                stderr="some stderr",
                python_logs="some python logs",
                traceback="Traceback (most recent call last): ...",
            ),
        ],
    )
    reader = ObservabilityReader(results_root)
    df = reader.logs(node_labels=["node_y"])
    assert isinstance(df, pl.DataFrame)
    # Internal bookkeeping columns should have been removed.
    for col in df.columns:
        assert not col.startswith("_log_")
        assert not col.startswith("_tag::")
        assert not col.startswith("__")
    # User-visible tag columns should still be present.
    assert "subject" in df.columns
    assert "session_date" in df.columns
```
Added 10 new tests covering the core behaviors:

TestStatus (5 tests):
- `test_deduplicates_to_latest_state` — verifies RUNNING rows are deduplicated away
- `test_maps_cached_to_success` — verifies CACHED → SUCCESS mapping
- `test_returns_clean_columns` — no `_status_*`, `_tag::*`, `__*` columns leak through
- `test_includes_failed_with_error_summary` — `error_summary` present on FAILED rows
- `test_empty_status_returns_empty_df` — log-only directory returns empty status

TestLogs (5 tests):
- `test_returns_clean_columns` — no `_log_*`, `_tag::*`, `__*` columns leak; tag columns preserved
- `test_filters_to_requested_node` — only the requested node's entries returned
- `test_contains_failure_traceback` — traceback and stderr content accessible
- `test_unknown_node_raises` — KeyError with available nodes listed
- `test_node_with_no_logs_returns_empty` — node with status but no logs returns empty DataFrame
- Store `list[Path]` per node to handle multiple Delta tables per node
- Short-circuit `status()` on empty DataFrame to avoid sort errors
- `logs()` validates against both status and log tables
- Add tests for `status()` dedup, CACHED->SUCCESS, column cleanup, failures
- Add tests for `logs()` column cleanup, filtering, traceback content

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds an `ObservabilityReader` class for fetching all statuses and logs from a given pipeline results path.