From ded54f6971040e129bca103038af68d71fb062e4 Mon Sep 17 00:00:00 2001 From: Brian Arnold Date: Thu, 26 Mar 2026 01:52:51 +0000 Subject: [PATCH 1/6] feat(resultsreader): add prototype --- src/orcapod/pipeline/results_reader.py | 310 +++++++++++++++++++++ tests/test_pipeline/test_results_reader.py | 174 ++++++++++++ 2 files changed, 484 insertions(+) create mode 100644 src/orcapod/pipeline/results_reader.py create mode 100644 tests/test_pipeline/test_results_reader.py diff --git a/src/orcapod/pipeline/results_reader.py b/src/orcapod/pipeline/results_reader.py new file mode 100644 index 00000000..9e5976a0 --- /dev/null +++ b/src/orcapod/pipeline/results_reader.py @@ -0,0 +1,310 @@ +"""Read-only viewer for pipeline observability results stored in Delta Lake.""" + +from __future__ import annotations + +from pathlib import Path +from typing import ClassVar, TYPE_CHECKING + +import polars as pl + +if TYPE_CHECKING: + from upath import UPath + + +class ResultsReader: + """Auto-discovers and queries pipeline status and log Delta tables. + + Args: + root: Path to the results output directory. Supports local paths, + ``pathlib.Path``, and ``UPath`` for cloud storage. + + Raises: + ValueError: If ``root`` does not exist or contains no Delta tables. + """ + + def __init__(self, root: str | Path | UPath) -> None: + self._root = root if isinstance(root, Path) else Path(root) + if not self._root.exists(): + raise ValueError( + f"Results root does not exist: {self._root}" + ) + + self._status_tables: dict[str, Path] = {} + self._log_tables: dict[str, Path] = {} + self._discover_tables() + + if not self._status_tables and not self._log_tables: + raise ValueError( + f"No observability Delta tables found under: {self._root}" + ) + + self._status_df: pl.DataFrame | None = None + self._logs_df: pl.DataFrame | None = None + + def _discover_tables(self) -> None: + """Find all Delta tables and classify as status or log.""" + for delta_log_dir in self._root.rglob("_delta_log"): + if not delta_log_dir.is_dir(): + continue + table_dir = delta_log_dir.parent + parts = table_dir.relative_to(self._root).parts + + if "status" in parts: + idx = parts.index("status") + if idx + 1 < len(parts): + node_name = parts[idx + 1] + self._status_tables[node_name] = table_dir + elif "logs" in parts: + idx = parts.index("logs") + if idx + 1 < len(parts): + node_name = parts[idx + 1] + self._log_tables[node_name] = table_dir + + @property + def nodes(self) -> list[str]: + """Sorted list of discovered node names.""" + return sorted(self._status_tables.keys()) + + @property + def tag_columns(self) -> list[str]: + """Inferred user tag column names.""" + status = self._get_status_df() + return sorted( + col for col in status.columns + if not col.startswith("__") + and not col.startswith("_status_") + and not col.startswith("_log_") + and not col.startswith("_tag_") + and not col.startswith("_tag::") + and col != "node_label" + ) + + def _get_status_df(self) -> pl.DataFrame: + """Lazy-load and return the concatenated status DataFrame.""" + if self._status_df is None: + frames = [] + for node_name, table_dir in self._status_tables.items(): + df = pl.read_delta(str(table_dir)) + frames.append(df) + if frames: + self._status_df = pl.concat(frames, how="diagonal_relaxed") + else: + self._status_df = pl.DataFrame() + return self._status_df + + def _get_logs_df(self) -> pl.DataFrame: + """Lazy-load and return the concatenated logs DataFrame.""" + if self._logs_df is None: + frames = [] + for node_name, table_dir in self._log_tables.items(): + 
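+                # Eagerly read each discovered Delta table; the frames are
+                # combined below with a relaxed diagonal concat so tables
+                # with differing tag columns still align.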
df = pl.read_delta(str(table_dir)) + frames.append(df) + if frames: + self._logs_df = pl.concat(frames, how="diagonal_relaxed") + else: + self._logs_df = pl.DataFrame() + return self._logs_df + + # -- Column rename mappings ------------------------------------------------ + + _STATUS_RENAMES: ClassVar[dict[str, str]] = { + "_status_node_label": "node_label", + "_status_state": "state", + "_status_timestamp": "timestamp", + "_status_error_summary": "error_summary", + } + + _LOG_RENAMES: ClassVar[dict[str, str]] = { + "_log_node_label": "node_label", + "_log_timestamp": "timestamp", + "_log_success": "success", + "_log_stdout_log": "stdout_log", + "_log_stderr_log": "stderr_log", + "_log_python_logs": "python_logs", + "_log_traceback": "traceback", + } + + _DROP_PREFIXES: ClassVar[tuple[str, ...]] = ("__", "_tag_", "_tag::") + _STATUS_DROP_EXACT: ClassVar[set[str]] = { + "_status_id", "_status_run_id", "_status_pipeline_uri", "_status_node_hash", + } + _LOG_DROP_EXACT: ClassVar[set[str]] = { + "_log_id", "_log_run_id", "_log_node_hash", + } + + _LOG_TERSE_COLUMNS: ClassVar[tuple[str, ...]] = ( + "node_label", "traceback", "success", "timestamp", + ) + + # -- Internal helpers ------------------------------------------------------ + + def _clean_status_df(self, df: pl.DataFrame) -> pl.DataFrame: + """Strip system columns and rename status columns.""" + drop_cols = [ + col for col in df.columns + if any(col.startswith(p) for p in self._DROP_PREFIXES) + or col in self._STATUS_DROP_EXACT + ] + drop_cols.extend( + col for col in df.columns + if col.startswith("_status_") and col not in self._STATUS_RENAMES + ) + df = df.drop([c for c in drop_cols if c in df.columns]) + return df.rename( + {k: v for k, v in self._STATUS_RENAMES.items() if k in df.columns} + ) + + def _clean_logs_df(self, df: pl.DataFrame) -> pl.DataFrame: + """Strip system columns and rename log columns.""" + drop_cols = [ + col for col in df.columns + if any(col.startswith(p) for p in self._DROP_PREFIXES) + or col in self._LOG_DROP_EXACT + ] + drop_cols.extend( + col for col in df.columns + if col.startswith("_log_") and col not in self._LOG_RENAMES + ) + df = df.drop([c for c in drop_cols if c in df.columns]) + return df.rename( + {k: v for k, v in self._LOG_RENAMES.items() if k in df.columns} + ) + + def _validate_node(self, node: str) -> None: + """Raise KeyError if node is not a known node.""" + if node not in self._status_tables: + raise KeyError( + f"Node {node!r} not found. Available nodes: {self.nodes}" + ) + + def _apply_filters( + self, df: pl.DataFrame, filters: dict[str, str], + ) -> pl.DataFrame: + """Filter DataFrame by tag column values.""" + tag_cols = self.tag_columns + for key, value in filters.items(): + if key not in tag_cols: + raise KeyError( + f"Filter column {key!r} is not a tag column. " + f"Available tag columns: {tag_cols}" + ) + df = df.filter(pl.col(key) == value) + return df + + def _deduplicate_status(self, df: pl.DataFrame) -> pl.DataFrame: + """Keep only the latest status row per (node_label, tag_columns).""" + group_cols = ["node_label"] + self.tag_columns + group_cols = [c for c in group_cols if c in df.columns] + return df.sort("timestamp").unique(subset=group_cols, keep="last") + + # -- Public query methods -------------------------------------------------- + + def summary(self, group_by: list[str] | None = None) -> pl.DataFrame: + """Node-level summary with counts by execution state. + + Args: + group_by: Optional tag columns to group by in addition to + ``node_label``. 
If ``None``, groups by ``node_label`` only. + + Returns: + DataFrame with columns: ``node_label``, state count columns + (``SUCCESS``, ``FAILED``, ``RUNNING``, ``CACHED``), + ``last_updated``, and any ``group_by`` columns. + """ + df = self._get_status_df() + df = self._clean_status_df(df) + df = self._deduplicate_status(df) + + group_cols = ["node_label"] + if group_by: + group_cols.extend(c for c in group_by if c != "node_label") + + return ( + df.group_by(group_cols) + .agg( + *[ + pl.col("state").eq(s).sum().alias(s) + for s in ("SUCCESS", "FAILED", "RUNNING", "CACHED") + ], + pl.col("timestamp").max().alias("last_updated"), + ) + .sort(group_cols) + ) + + def details( + self, node: str | None = None, **filters: str, + ) -> pl.DataFrame: + """Per-input rows showing the latest state for each input. + + Args: + node: Optional node name to filter to. + **filters: Tag column filters, e.g. ``subject="Goliath"``. + + Returns: + DataFrame with columns: ``node_label``, tag columns, ``state``, + ``timestamp``, ``error_summary``. + + Raises: + KeyError: If ``node`` is not found or a filter key is invalid. + """ + df = self._get_status_df() + df = self._clean_status_df(df) + if node is not None: + self._validate_node(node) + df = df.filter(pl.col("node_label") == node) + df = self._apply_filters(df, filters) + return self._deduplicate_status(df) + + def failures( + self, node: str | None = None, **filters: str, + ) -> pl.DataFrame: + """All failed rows across nodes. + + Shorthand for ``details()`` filtered to ``state == "FAILED"``. + + Args: + node: Optional node name to filter to. + **filters: Tag column filters, e.g. ``subject="Goliath"``. + + Returns: + DataFrame with same columns as ``details()``, filtered to failures. + """ + df = self.details(node=node, **filters) + return df.filter(pl.col("state") == "FAILED") + + def logs(self, node: str, **filters: str) -> pl.DataFrame: + """Log entries for a node with terse output. + + Returns ``node_label``, tag columns, ``traceback``, ``success``, + and ``timestamp``. Use ``full_logs()`` for stdout/stderr/python_logs. + + Args: + node: Node name to query. + **filters: Tag column filters, e.g. ``subject="Goliath"``. + + Raises: + KeyError: If ``node`` is not found or a filter key is invalid. + """ + df = self.full_logs(node, **filters) + if df.is_empty(): + return df + keep = list(self._LOG_TERSE_COLUMNS) + self.tag_columns + return df.select([c for c in keep if c in df.columns]) + + def full_logs(self, node: str, **filters: str) -> pl.DataFrame: + """Full log entries for a node including stdout/stderr/python_logs. + + Args: + node: Node name to query. + **filters: Tag column filters, e.g. ``subject="Goliath"``. + + Raises: + KeyError: If ``node`` is not found or a filter key is invalid. 
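+
+        Example (illustrative; the node and tag names here come from
+        the test fixtures, not a real pipeline)::
+
+            reader = ResultsReader("results_out")
+            df = reader.full_logs("node_b", subject="subj_B")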
+ """ + self._validate_node(node) + df = self._get_logs_df() + if df.is_empty(): + return df + df = self._clean_logs_df(df) + df = df.filter(pl.col("node_label") == node) + return self._apply_filters(df, filters) diff --git a/tests/test_pipeline/test_results_reader.py b/tests/test_pipeline/test_results_reader.py new file mode 100644 index 00000000..53a55dbd --- /dev/null +++ b/tests/test_pipeline/test_results_reader.py @@ -0,0 +1,174 @@ +"""Tests for ResultsReader.""" + +from __future__ import annotations + +from pathlib import Path + +import polars as pl +import pytest + +from orcapod.pipeline.results_reader import ResultsReader + + +def _write_status_table( + root: Path, + pipeline_name: str, + node_name: str, + rows: list[dict], +) -> None: + """Write a status Delta table mimicking StatusObserver output.""" + table_dir = ( + root / pipeline_name / "status" / node_name / "hash_a" / "v0" + / "python.function.v0" / "node:hash_b" + ) + table_dir.mkdir(parents=True, exist_ok=True) + df = pl.DataFrame(rows) + # Cast any Null-typed columns to String so Delta Lake accepts them. + for col_name in df.columns: + if df[col_name].dtype == pl.Null: + df = df.with_columns(pl.col(col_name).cast(pl.String)) + df.write_delta(str(table_dir)) + + +def _write_log_table( + root: Path, + pipeline_name: str, + node_name: str, + rows: list[dict], +) -> None: + """Write a log Delta table mimicking LoggingObserver output.""" + table_dir = ( + root / pipeline_name / "logs" / node_name / "hash_a" / "v0" + / "python.function.v0" / "node:hash_b" + ) + table_dir.mkdir(parents=True, exist_ok=True) + df = pl.DataFrame(rows) + # Cast any Null-typed columns to String so Delta Lake accepts them. + for col_name in df.columns: + if df[col_name].dtype == pl.Null: + df = df.with_columns(pl.col(col_name).cast(pl.String)) + df.write_delta(str(table_dir)) + + +def _status_row( + node_label: str, + state: str, + *, + subject: str = "subj_A", + session_date: str = "2026-01-01", + error_summary: str | None = None, + timestamp: str = "2026-01-01T00:00:00+00:00", +) -> dict: + """Build a single status row dict.""" + return { + "__record_id": f"rec_{node_label}_{state}_{subject}", + "_status_id": f"sid_{node_label}_{state}_{subject}", + "_status_run_id": "run_001", + "_status_pipeline_uri": "test_pipeline@abc123", + "_status_node_label": node_label, + "_status_node_hash": f"hash_{node_label}", + "_status_state": state, + "_status_timestamp": timestamp, + "_status_error_summary": error_summary, + "subject": subject, + "session_date": session_date, + "_tag::source_id::abc123:0": "tag_val", + "_tag::record_id::abc123:0": "tag_rec", + } + + +def _log_row( + node_label: str, + *, + success: bool = True, + subject: str = "subj_A", + session_date: str = "2026-01-01", + traceback: str | None = None, + stdout: str = "", + stderr: str = "", + python_logs: str = "", + timestamp: str = "2026-01-01T00:00:00+00:00", +) -> dict: + """Build a single log row dict.""" + return { + "__record_id": f"rec_{node_label}_{subject}", + "_log_id": f"lid_{node_label}_{subject}", + "_log_run_id": "run_001", + "_log_node_label": node_label, + "_log_node_hash": f"hash_{node_label}", + "_log_stdout_log": stdout, + "_log_stderr_log": stderr, + "_log_python_logs": python_logs, + "_log_traceback": traceback, + "_log_success": success, + "_log_timestamp": timestamp, + "subject": subject, + "session_date": session_date, + "_tag::source_id::abc123:0": "tag_val", + "_tag::record_id::abc123:0": "tag_rec", + } + + +@pytest.fixture() +def results_root(tmp_path: Path) -> Path: 
+ """Create a realistic results directory with status and log tables.""" + root = tmp_path / "results_out" / "op_pipeline" + + # Node A: 2 inputs, both succeed + _write_status_table(root, "my_pipeline", "node_a", [ + _status_row("node_a", "RUNNING", subject="subj_A", timestamp="2026-01-01T00:00:01+00:00"), + _status_row("node_a", "SUCCESS", subject="subj_A", timestamp="2026-01-01T00:00:02+00:00"), + _status_row("node_a", "RUNNING", subject="subj_B", timestamp="2026-01-01T00:00:03+00:00"), + _status_row("node_a", "SUCCESS", subject="subj_B", timestamp="2026-01-01T00:00:04+00:00"), + ]) + _write_log_table(root, "my_pipeline", "node_a", [ + _log_row("node_a", subject="subj_A"), + _log_row("node_a", subject="subj_B"), + ]) + + # Node B: 2 inputs, one succeeds, one fails + _write_status_table(root, "my_pipeline", "node_b", [ + _status_row("node_b", "RUNNING", subject="subj_A", timestamp="2026-01-01T00:01:01+00:00"), + _status_row("node_b", "SUCCESS", subject="subj_A", timestamp="2026-01-01T00:01:02+00:00"), + _status_row("node_b", "RUNNING", subject="subj_B", timestamp="2026-01-01T00:01:03+00:00"), + _status_row( + "node_b", "FAILED", subject="subj_B", + timestamp="2026-01-01T00:01:04+00:00", + error_summary="ValueError: bad input", + ), + ]) + _write_log_table(root, "my_pipeline", "node_b", [ + _log_row("node_b", subject="subj_A"), + _log_row( + "node_b", subject="subj_B", success=False, + traceback="Traceback (most recent call last):\n ...\nValueError: bad input", + stderr="Error processing subj_B", + ), + ]) + + # Node C: status only, no logs (e.g. still running or logs not yet written) + _write_status_table(root, "my_pipeline", "node_c", [ + _status_row("node_c", "RUNNING", subject="subj_A", timestamp="2026-01-01T00:02:01+00:00"), + ]) + + return tmp_path / "results_out" + + +class TestDiscovery: + def test_discovers_nodes(self, results_root: Path): + reader = ResultsReader(results_root) + assert reader.nodes == ["node_a", "node_b", "node_c"] + + def test_discovers_tag_columns(self, results_root: Path): + reader = ResultsReader(results_root) + assert reader.tag_columns == ["session_date", "subject"] + + def test_raises_on_missing_root(self, tmp_path: Path): + with pytest.raises(ValueError, match="does not exist"): + ResultsReader(tmp_path / "nonexistent") + + def test_raises_on_empty_root(self, tmp_path: Path): + empty = tmp_path / "empty" + empty.mkdir() + with pytest.raises(ValueError, match="No.*Delta.*tables"): + ResultsReader(empty) From a2f0094303bc8654e03dc7f161c4c9cf676467e9 Mon Sep 17 00:00:00 2001 From: Brian Arnold Date: Fri, 27 Mar 2026 23:27:42 +0000 Subject: [PATCH 2/6] feat(resultsreader): change summary to status) --- src/orcapod/pipeline/results_reader.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/orcapod/pipeline/results_reader.py b/src/orcapod/pipeline/results_reader.py index 9e5976a0..16b22ebd 100644 --- a/src/orcapod/pipeline/results_reader.py +++ b/src/orcapod/pipeline/results_reader.py @@ -199,17 +199,19 @@ def _deduplicate_status(self, df: pl.DataFrame) -> pl.DataFrame: # -- Public query methods -------------------------------------------------- - def summary(self, group_by: list[str] | None = None) -> pl.DataFrame: - """Node-level summary with counts by execution state. + def status(self, group_by: list[str] | None = None) -> pl.DataFrame: + """Node-level status overview with counts by execution state. + + CACHED results are counted as SUCCESS since they represent + previously computed successful results. 
Args: group_by: Optional tag columns to group by in addition to ``node_label``. If ``None``, groups by ``node_label`` only. Returns: - DataFrame with columns: ``node_label``, state count columns - (``SUCCESS``, ``FAILED``, ``RUNNING``, ``CACHED``), - ``last_updated``, and any ``group_by`` columns. + DataFrame with columns: ``node_label``, ``SUCCESS``, ``FAILED``, + ``RUNNING``, ``last_updated``, and any ``group_by`` columns. """ df = self._get_status_df() df = self._clean_status_df(df) @@ -222,10 +224,9 @@ def summary(self, group_by: list[str] | None = None) -> pl.DataFrame: return ( df.group_by(group_cols) .agg( - *[ - pl.col("state").eq(s).sum().alias(s) - for s in ("SUCCESS", "FAILED", "RUNNING", "CACHED") - ], + pl.col("state").is_in(["SUCCESS", "CACHED"]).sum().alias("SUCCESS"), + pl.col("state").eq("FAILED").sum().alias("FAILED"), + pl.col("state").eq("RUNNING").sum().alias("RUNNING"), pl.col("timestamp").max().alias("last_updated"), ) .sort(group_cols) From ddd7d3ffba90bd1bda6879d31b894b192bac3f85 Mon Sep 17 00:00:00 2001 From: Brian Arnold Date: Sat, 28 Mar 2026 01:56:47 +0000 Subject: [PATCH 3/6] Simplify ResultsReader --- src/orcapod/pipeline/results_reader.py | 157 ++++++------------------- 1 file changed, 38 insertions(+), 119 deletions(-) diff --git a/src/orcapod/pipeline/results_reader.py b/src/orcapod/pipeline/results_reader.py index 16b22ebd..429a19eb 100644 --- a/src/orcapod/pipeline/results_reader.py +++ b/src/orcapod/pipeline/results_reader.py @@ -14,6 +14,10 @@ class ResultsReader: """Auto-discovers and queries pipeline status and log Delta tables. + Provides two DataFrames: ``status()`` for execution state and + ``logs()`` for execution logs. Both return clean polars DataFrames + ready for further analysis with standard polars operations. + Args: root: Path to the results output directory. Supports local paths, ``pathlib.Path``, and ``UPath`` for cloud storage. @@ -132,10 +136,6 @@ def _get_logs_df(self) -> pl.DataFrame: "_log_id", "_log_run_id", "_log_node_hash", } - _LOG_TERSE_COLUMNS: ClassVar[tuple[str, ...]] = ( - "node_label", "traceback", "success", "timestamp", - ) - # -- Internal helpers ------------------------------------------------------ def _clean_status_df(self, df: pl.DataFrame) -> pl.DataFrame: @@ -170,142 +170,61 @@ def _clean_logs_df(self, df: pl.DataFrame) -> pl.DataFrame: {k: v for k, v in self._LOG_RENAMES.items() if k in df.columns} ) - def _validate_node(self, node: str) -> None: - """Raise KeyError if node is not a known node.""" - if node not in self._status_tables: - raise KeyError( - f"Node {node!r} not found. Available nodes: {self.nodes}" - ) - - def _apply_filters( - self, df: pl.DataFrame, filters: dict[str, str], - ) -> pl.DataFrame: - """Filter DataFrame by tag column values.""" - tag_cols = self.tag_columns - for key, value in filters.items(): - if key not in tag_cols: - raise KeyError( - f"Filter column {key!r} is not a tag column. 
" - f"Available tag columns: {tag_cols}" - ) - df = df.filter(pl.col(key) == value) - return df - - def _deduplicate_status(self, df: pl.DataFrame) -> pl.DataFrame: - """Keep only the latest status row per (node_label, tag_columns).""" - group_cols = ["node_label"] + self.tag_columns - group_cols = [c for c in group_cols if c in df.columns] - return df.sort("timestamp").unique(subset=group_cols, keep="last") - # -- Public query methods -------------------------------------------------- - def status(self, group_by: list[str] | None = None) -> pl.DataFrame: - """Node-level status overview with counts by execution state. + def status(self) -> pl.DataFrame: + """Latest execution status for every (node, input) combination. - CACHED results are counted as SUCCESS since they represent + Returns one row per (node, input) with the most recent status. + CACHED states are mapped to SUCCESS since they represent previously computed successful results. - Args: - group_by: Optional tag columns to group by in addition to - ``node_label``. If ``None``, groups by ``node_label`` only. - Returns: - DataFrame with columns: ``node_label``, ``SUCCESS``, ``FAILED``, - ``RUNNING``, ``last_updated``, and any ``group_by`` columns. + DataFrame with columns: ``node_label``, tag columns, + ``state``, ``timestamp``, ``error_summary``. """ df = self._get_status_df() df = self._clean_status_df(df) - df = self._deduplicate_status(df) - - group_cols = ["node_label"] - if group_by: - group_cols.extend(c for c in group_by if c != "node_label") - - return ( - df.group_by(group_cols) - .agg( - pl.col("state").is_in(["SUCCESS", "CACHED"]).sum().alias("SUCCESS"), - pl.col("state").eq("FAILED").sum().alias("FAILED"), - pl.col("state").eq("RUNNING").sum().alias("RUNNING"), - pl.col("timestamp").max().alias("last_updated"), - ) - .sort(group_cols) - ) - - def details( - self, node: str | None = None, **filters: str, - ) -> pl.DataFrame: - """Per-input rows showing the latest state for each input. - Args: - node: Optional node name to filter to. - **filters: Tag column filters, e.g. ``subject="Goliath"``. - - Returns: - DataFrame with columns: ``node_label``, tag columns, ``state``, - ``timestamp``, ``error_summary``. + # Deduplicate to latest status per (node, input) + group_cols = ["node_label"] + self.tag_columns + group_cols = [c for c in group_cols if c in df.columns] + df = df.sort("timestamp").unique(subset=group_cols, keep="last") + + # Map CACHED -> SUCCESS + df = df.with_columns( + pl.when(pl.col("state") == "CACHED") + .then(pl.lit("SUCCESS")) + .otherwise(pl.col("state")) + .alias("state") + ) - Raises: - KeyError: If ``node`` is not found or a filter key is invalid. - """ - df = self._get_status_df() - df = self._clean_status_df(df) - if node is not None: - self._validate_node(node) - df = df.filter(pl.col("node_label") == node) - df = self._apply_filters(df, filters) - return self._deduplicate_status(df) + return df - def failures( - self, node: str | None = None, **filters: str, - ) -> pl.DataFrame: - """All failed rows across nodes. + def logs(self, node: str) -> pl.DataFrame: + """Full log entries for a node. - Shorthand for ``details()`` filtered to ``state == "FAILED"``. + Returns all log fields: stdout, stderr, python logs, traceback, + success status, and timestamp, alongside tag columns. Args: - node: Optional node name to filter to. - **filters: Tag column filters, e.g. ``subject="Goliath"``. + node: Node name to query. Use ``reader.nodes`` to see + available names. 
Returns: - DataFrame with same columns as ``details()``, filtered to failures. - """ - df = self.details(node=node, **filters) - return df.filter(pl.col("state") == "FAILED") - - def logs(self, node: str, **filters: str) -> pl.DataFrame: - """Log entries for a node with terse output. - - Returns ``node_label``, tag columns, ``traceback``, ``success``, - and ``timestamp``. Use ``full_logs()`` for stdout/stderr/python_logs. - - Args: - node: Node name to query. - **filters: Tag column filters, e.g. ``subject="Goliath"``. - - Raises: - KeyError: If ``node`` is not found or a filter key is invalid. - """ - df = self.full_logs(node, **filters) - if df.is_empty(): - return df - keep = list(self._LOG_TERSE_COLUMNS) + self.tag_columns - return df.select([c for c in keep if c in df.columns]) - - def full_logs(self, node: str, **filters: str) -> pl.DataFrame: - """Full log entries for a node including stdout/stderr/python_logs. - - Args: - node: Node name to query. - **filters: Tag column filters, e.g. ``subject="Goliath"``. + DataFrame with columns: ``node_label``, tag columns, + ``stdout_log``, ``stderr_log``, ``python_logs``, + ``traceback``, ``success``, ``timestamp``. Raises: - KeyError: If ``node`` is not found or a filter key is invalid. + KeyError: If ``node`` is not found. """ - self._validate_node(node) + if node not in self._status_tables: + raise KeyError( + f"Node {node!r} not found. Available nodes: {self.nodes}" + ) df = self._get_logs_df() if df.is_empty(): return df df = self._clean_logs_df(df) - df = df.filter(pl.col("node_label") == node) - return self._apply_filters(df, filters) + return df.filter(pl.col("node_label") == node) From 95367573e88ca86b3ad463931c9a1a33b0cfe41f Mon Sep 17 00:00:00 2001 From: Brian Arnold Date: Mon, 30 Mar 2026 18:17:05 +0000 Subject: [PATCH 4/6] refactor(results_reader): rename ResultsReader to ObservabilityReade --- src/orcapod/pipeline/results_reader.py | 2 +- tests/test_pipeline/test_results_reader.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/orcapod/pipeline/results_reader.py b/src/orcapod/pipeline/results_reader.py index 429a19eb..6ec5108f 100644 --- a/src/orcapod/pipeline/results_reader.py +++ b/src/orcapod/pipeline/results_reader.py @@ -11,7 +11,7 @@ from upath import UPath -class ResultsReader: +class ObservabilityReader: """Auto-discovers and queries pipeline status and log Delta tables. 
Provides two DataFrames: ``status()`` for execution state and diff --git a/tests/test_pipeline/test_results_reader.py b/tests/test_pipeline/test_results_reader.py index 53a55dbd..50e7cac2 100644 --- a/tests/test_pipeline/test_results_reader.py +++ b/tests/test_pipeline/test_results_reader.py @@ -1,4 +1,4 @@ -"""Tests for ResultsReader.""" +"""Tests for ObservabilityReader.""" from __future__ import annotations @@ -7,7 +7,7 @@ import polars as pl import pytest -from orcapod.pipeline.results_reader import ResultsReader +from orcapod.pipeline.results_reader import ObservabilityReader def _write_status_table( @@ -156,19 +156,19 @@ def results_root(tmp_path: Path) -> Path: class TestDiscovery: def test_discovers_nodes(self, results_root: Path): - reader = ResultsReader(results_root) + reader = ObservabilityReader(results_root) assert reader.nodes == ["node_a", "node_b", "node_c"] def test_discovers_tag_columns(self, results_root: Path): - reader = ResultsReader(results_root) + reader = ObservabilityReader(results_root) assert reader.tag_columns == ["session_date", "subject"] def test_raises_on_missing_root(self, tmp_path: Path): with pytest.raises(ValueError, match="does not exist"): - ResultsReader(tmp_path / "nonexistent") + ObservabilityReader(tmp_path / "nonexistent") def test_raises_on_empty_root(self, tmp_path: Path): empty = tmp_path / "empty" empty.mkdir() with pytest.raises(ValueError, match="No.*Delta.*tables"): - ResultsReader(empty) + ObservabilityReader(empty) From 5cab032192b7325e700594033c80c845ae198803 Mon Sep 17 00:00:00 2001 From: Brian Arnold Date: Mon, 30 Mar 2026 18:53:04 +0000 Subject: [PATCH 5/6] Rename source files --- .../pipeline/{results_reader.py => observability_reader.py} | 0 .../{test_results_reader.py => test_observability_reader.py} | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename src/orcapod/pipeline/{results_reader.py => observability_reader.py} (100%) rename tests/test_pipeline/{test_results_reader.py => test_observability_reader.py} (98%) diff --git a/src/orcapod/pipeline/results_reader.py b/src/orcapod/pipeline/observability_reader.py similarity index 100% rename from src/orcapod/pipeline/results_reader.py rename to src/orcapod/pipeline/observability_reader.py diff --git a/tests/test_pipeline/test_results_reader.py b/tests/test_pipeline/test_observability_reader.py similarity index 98% rename from tests/test_pipeline/test_results_reader.py rename to tests/test_pipeline/test_observability_reader.py index 50e7cac2..b9be1a4d 100644 --- a/tests/test_pipeline/test_results_reader.py +++ b/tests/test_pipeline/test_observability_reader.py @@ -7,7 +7,7 @@ import polars as pl import pytest -from orcapod.pipeline.results_reader import ObservabilityReader +from orcapod.pipeline.observability_reader import ObservabilityReader def _write_status_table( From 79d7940a219fac05bdafc01cc483b706a2b2bf2b Mon Sep 17 00:00:00 2001 From: Brian Arnold Date: Mon, 30 Mar 2026 21:43:07 +0000 Subject: [PATCH 6/6] fix(observability_reader): address PR review feedback - Store list[Path] per node to handle multiple Delta tables per node - Short-circuit status() on empty DataFrame to avoid sort errors - logs() validates against both status and log tables - Add tests for status() dedup, CACHED->SUCCESS, column cleanup, failures - Add tests for logs() column cleanup, filtering, traceback content Co-Authored-By: Claude Opus 4.6 (1M context) --- src/orcapod/pipeline/observability_reader.py | 28 +++--- .../test_observability_reader.py | 92 +++++++++++++++++++ 2 files changed, 109 
insertions(+), 11 deletions(-) diff --git a/src/orcapod/pipeline/observability_reader.py b/src/orcapod/pipeline/observability_reader.py index 6ec5108f..cf752830 100644 --- a/src/orcapod/pipeline/observability_reader.py +++ b/src/orcapod/pipeline/observability_reader.py @@ -33,8 +33,8 @@ def __init__(self, root: str | Path | UPath) -> None: f"Results root does not exist: {self._root}" ) - self._status_tables: dict[str, Path] = {} - self._log_tables: dict[str, Path] = {} + self._status_tables: dict[str, list[Path]] = {} + self._log_tables: dict[str, list[Path]] = {} self._discover_tables() if not self._status_tables and not self._log_tables: @@ -57,12 +57,16 @@ def _discover_tables(self) -> None: idx = parts.index("status") if idx + 1 < len(parts): node_name = parts[idx + 1] - self._status_tables[node_name] = table_dir + self._status_tables.setdefault(node_name, []).append( + table_dir + ) elif "logs" in parts: idx = parts.index("logs") if idx + 1 < len(parts): node_name = parts[idx + 1] - self._log_tables[node_name] = table_dir + self._log_tables.setdefault(node_name, []).append( + table_dir + ) @property def nodes(self) -> list[str]: @@ -87,9 +91,9 @@ def _get_status_df(self) -> pl.DataFrame: """Lazy-load and return the concatenated status DataFrame.""" if self._status_df is None: frames = [] - for node_name, table_dir in self._status_tables.items(): - df = pl.read_delta(str(table_dir)) - frames.append(df) + for table_dirs in self._status_tables.values(): + for table_dir in table_dirs: + frames.append(pl.read_delta(str(table_dir))) if frames: self._status_df = pl.concat(frames, how="diagonal_relaxed") else: @@ -100,9 +104,9 @@ def _get_logs_df(self) -> pl.DataFrame: """Lazy-load and return the concatenated logs DataFrame.""" if self._logs_df is None: frames = [] - for node_name, table_dir in self._log_tables.items(): - df = pl.read_delta(str(table_dir)) - frames.append(df) + for table_dirs in self._log_tables.values(): + for table_dir in table_dirs: + frames.append(pl.read_delta(str(table_dir))) if frames: self._logs_df = pl.concat(frames, how="diagonal_relaxed") else: @@ -184,6 +188,8 @@ def status(self) -> pl.DataFrame: ``state``, ``timestamp``, ``error_summary``. """ df = self._get_status_df() + if df.is_empty(): + return df df = self._clean_status_df(df) # Deduplicate to latest status per (node, input) @@ -219,7 +225,7 @@ def logs(self, node: str) -> pl.DataFrame: Raises: KeyError: If ``node`` is not found. """ - if node not in self._status_tables: + if node not in self._status_tables and node not in self._log_tables: raise KeyError( f"Node {node!r} not found. 
Available nodes: {self.nodes}" ) diff --git a/tests/test_pipeline/test_observability_reader.py b/tests/test_pipeline/test_observability_reader.py index b9be1a4d..0c10fdcc 100644 --- a/tests/test_pipeline/test_observability_reader.py +++ b/tests/test_pipeline/test_observability_reader.py @@ -172,3 +172,95 @@ def test_raises_on_empty_root(self, tmp_path: Path): empty.mkdir() with pytest.raises(ValueError, match="No.*Delta.*tables"): ObservabilityReader(empty) + + +class TestStatus: + def test_deduplicates_to_latest_state(self, results_root: Path): + reader = ObservabilityReader(results_root) + df = reader.status() + # node_a has RUNNING then SUCCESS for each input — only SUCCESS should remain + node_a = df.filter(pl.col("node_label") == "node_a") + assert len(node_a) == 2 + assert all(s == "SUCCESS" for s in node_a["state"].to_list()) + + def test_maps_cached_to_success(self, tmp_path: Path): + root = tmp_path / "results_out" / "pipeline" + _write_status_table(root, "p", "cached_node", [ + _status_row("cached_node", "RUNNING", timestamp="2026-01-01T00:00:01+00:00"), + _status_row("cached_node", "CACHED", timestamp="2026-01-01T00:00:02+00:00"), + ]) + reader = ObservabilityReader(tmp_path / "results_out") + df = reader.status() + assert len(df) == 1 + assert df["state"].item() == "SUCCESS" + + def test_returns_clean_columns(self, results_root: Path): + reader = ObservabilityReader(results_root) + df = reader.status() + assert "node_label" in df.columns + assert "state" in df.columns + assert "timestamp" in df.columns + assert "error_summary" in df.columns + for col in df.columns: + assert not col.startswith("_status_") + assert not col.startswith("_tag::") + assert not col.startswith("_tag_") + assert not col.startswith("__") + + def test_includes_failed_with_error_summary(self, results_root: Path): + reader = ObservabilityReader(results_root) + failed = reader.status().filter(pl.col("state") == "FAILED") + assert len(failed) == 1 + assert failed["node_label"].item() == "node_b" + assert failed["subject"].item() == "subj_B" + assert "ValueError" in failed["error_summary"].item() + + def test_empty_status_returns_empty_df(self, tmp_path: Path): + root = tmp_path / "results_out" / "pipeline" + _write_log_table(root, "p", "log_only_node", [ + _log_row("log_only_node"), + ]) + reader = ObservabilityReader(tmp_path / "results_out") + df = reader.status() + assert df.is_empty() + + +class TestLogs: + def test_returns_clean_columns(self, results_root: Path): + reader = ObservabilityReader(results_root) + df = reader.logs("node_a") + assert "node_label" in df.columns + assert "traceback" in df.columns + assert "success" in df.columns + assert "stdout_log" in df.columns + assert "stderr_log" in df.columns + assert "timestamp" in df.columns + assert "subject" in df.columns + assert "session_date" in df.columns + for col in df.columns: + assert not col.startswith("_log_") + assert not col.startswith("_tag::") + assert not col.startswith("_tag_") + assert not col.startswith("__") + + def test_filters_to_requested_node(self, results_root: Path): + reader = ObservabilityReader(results_root) + df = reader.logs("node_b") + assert all(n == "node_b" for n in df["node_label"].to_list()) + + def test_contains_failure_traceback(self, results_root: Path): + reader = ObservabilityReader(results_root) + df = reader.logs("node_b").filter(pl.col("success") == False) + assert len(df) == 1 + assert "ValueError: bad input" in df["traceback"].item() + assert df["stderr_log"].item() == "Error processing subj_B" + + def 
test_unknown_node_raises(self, results_root: Path): + reader = ObservabilityReader(results_root) + with pytest.raises(KeyError, match="not_a_node"): + reader.logs("not_a_node") + + def test_node_with_no_logs_returns_empty(self, results_root: Path): + reader = ObservabilityReader(results_root) + df = reader.logs("node_c") + assert df.is_empty()
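
Usage sketch for the API as it stands after this series (a minimal example,
assuming a ``results_out`` directory shaped like the test fixtures above;
node and tag names are illustrative):

    import polars as pl

    from orcapod.pipeline.observability_reader import ObservabilityReader

    reader = ObservabilityReader("results_out")
    print(reader.nodes)  # e.g. ["node_a", "node_b", "node_c"]

    # Latest state per (node, input); CACHED rows are reported as SUCCESS.
    status = reader.status()
    failed = status.filter(pl.col("state") == "FAILED")

    # Full logs (stdout/stderr/traceback) for each failing node.
    for node in failed["node_label"].unique().to_list():
        print(reader.logs(node))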