236 changes: 236 additions & 0 deletions src/orcapod/pipeline/observability_reader.py
@@ -0,0 +1,236 @@
"""Read-only viewer for pipeline observability results stored in Delta Lake."""

from __future__ import annotations

from pathlib import Path
from typing import ClassVar, TYPE_CHECKING

import polars as pl

if TYPE_CHECKING:
from upath import UPath


class ObservabilityReader:
"""Auto-discovers and queries pipeline status and log Delta tables.

Provides two DataFrames: ``status()`` for execution state and
``logs()`` for execution logs. Both return clean polars DataFrames
ready for further analysis with standard polars operations.

Args:
root: Path to the results output directory. Supports local paths,
``pathlib.Path``, and ``UPath`` for cloud storage.

Raises:
ValueError: If ``root`` does not exist or contains no Delta tables.
"""

def __init__(self, root: str | Path | UPath) -> None:
self._root = root if isinstance(root, Path) else Path(root)
if not self._root.exists():
raise ValueError(
f"Results root does not exist: {self._root}"
)

self._status_tables: dict[str, list[Path]] = {}
self._log_tables: dict[str, list[Path]] = {}
self._discover_tables()

if not self._status_tables and not self._log_tables:
raise ValueError(
f"No observability Delta tables found under: {self._root}"
)

self._status_df: pl.DataFrame | None = None
self._logs_df: pl.DataFrame | None = None

def _discover_tables(self) -> None:
"""Find all Delta tables and classify as status or log."""
for delta_log_dir in self._root.rglob("_delta_log"):
if not delta_log_dir.is_dir():
continue
table_dir = delta_log_dir.parent
parts = table_dir.relative_to(self._root).parts

if "status" in parts:
idx = parts.index("status")
if idx + 1 < len(parts):
node_name = parts[idx + 1]
self._status_tables.setdefault(node_name, []).append(
table_dir
)
elif "logs" in parts:
idx = parts.index("logs")
if idx + 1 < len(parts):
node_name = parts[idx + 1]
self._log_tables.setdefault(node_name, []).append(
table_dir
)

Comment on lines +56 to +70

Copilot AI (Mar 30, 2026):

_discover_tables stores only a single table_dir per node_name; if multiple Delta tables exist for the same node (e.g., different schema_hash / node hash, multiple pipeline versions), later discoveries overwrite earlier ones and data is silently dropped. Consider storing a list of table dirs per node and concatenating all of them (or selecting deterministically by version/timestamp).

Collaborator (author) reply:

Fixed — _discover_tables now stores a list[Path] per node name, and _get_status_df/_get_logs_df iterate over all tables for each node before concatenating. This handles multiple Delta tables per node (e.g. different schema hashes or pipeline versions).
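The accumulation pattern described in the reply can be sketched in isolation (the paths and node names below are invented for illustration):

```python
from pathlib import Path

# Hypothetical table directories: two Delta tables for the same node
# (e.g. two schema hashes), plus one table for another node.
table_dirs = [
    Path("results/status/my_node/abc123"),
    Path("results/status/my_node/def456"),
    Path("results/status/other_node/aaa111"),
]

tables: dict[str, list[Path]] = {}
for table_dir in table_dirs:
    node_name = table_dir.parts[-2]  # path segment after "status"
    # setdefault appends to an existing list instead of overwriting,
    # so later discoveries no longer drop earlier ones.
    tables.setdefault(node_name, []).append(table_dir)

print(sorted(tables))          # ['my_node', 'other_node']
print(len(tables["my_node"]))  # 2
```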

@property
def nodes(self) -> list[str]:
"""Sorted list of discovered node names."""
return sorted(self._status_tables.keys())
Comment on lines +73 to +74

Copilot AI (Mar 30, 2026):

nodes currently returns only status-table node names. If a results directory contains logs but no status for a node (or only logs at all), that node will be omitted from nodes even though logs() could potentially serve it. Consider returning the sorted union of status+log discovered nodes (and keeping ordering stable).

Suggested change:

-        """Sorted list of discovered node names."""
-        return sorted(self._status_tables.keys())
+        """Sorted list of discovered node names (from status and logs)."""
+        return sorted(set(self._status_tables) | set(self._log_tables))

Collaborator (author) reply:

By design — nodes is derived from status tables because the StatusObserver emits status for every node the orchestrator executes. A node without status entries was not executed. Logs are secondary — they capture stdout/stderr but are only written after packet execution completes, so a node could have status but no logs (e.g. still running). Keeping nodes status-derived gives a consistent, authoritative view of what ran.
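The trade-off debated in this thread is easy to see with toy dictionaries (node names are illustrative; "orphan" stands in for a node with log tables but no status tables):

```python
status_tables = {"train": ["t1"], "eval": ["t2"]}
log_tables = {"eval": ["t3"], "orphan": ["t4"]}  # logs without status

status_only = sorted(status_tables)                   # chosen design
union = sorted(set(status_tables) | set(log_tables))  # reviewer's suggestion

print(status_only)  # ['eval', 'train']
print(union)        # ['eval', 'orphan', 'train']
```

The two definitions differ only when a node has logs but no status, which the author argues cannot happen under the StatusObserver's write ordering.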


@property
def tag_columns(self) -> list[str]:
"""Inferred user tag column names."""
status = self._get_status_df()
return sorted(
col for col in status.columns
if not col.startswith("__")
and not col.startswith("_status_")
and not col.startswith("_log_")
and not col.startswith("_tag_")
and not col.startswith("_tag::")
and col != "node_label"
)

Comment on lines +77 to +89

Copilot AI (Mar 30, 2026):

tag_columns is inferred exclusively from the status DataFrame. If the root contains only log tables (or status tables are temporarily absent), tag_columns will be empty even though logs may contain user tag columns. Consider falling back to logs schema when status is empty/unavailable, or inferring from whichever table type is present.

Collaborator (author) reply:

Same reasoning as nodes — status tables are the primary/authoritative data source. The constructor requires at least one Delta table to exist (raises ValueError otherwise), and status() now short-circuits on empty DataFrames. If a results directory has only log tables and no status, tag_columns returns empty and status() returns an empty DataFrame, which is correct behavior — no status means no execution state to report.

def _get_status_df(self) -> pl.DataFrame:
"""Lazy-load and return the concatenated status DataFrame."""
if self._status_df is None:
frames = []
for table_dirs in self._status_tables.values():
for table_dir in table_dirs:
frames.append(pl.read_delta(str(table_dir)))
if frames:
self._status_df = pl.concat(frames, how="diagonal_relaxed")
else:
self._status_df = pl.DataFrame()
return self._status_df

def _get_logs_df(self) -> pl.DataFrame:
"""Lazy-load and return the concatenated logs DataFrame."""
if self._logs_df is None:
frames = []
for table_dirs in self._log_tables.values():
for table_dir in table_dirs:
frames.append(pl.read_delta(str(table_dir)))
if frames:
self._logs_df = pl.concat(frames, how="diagonal_relaxed")
else:
self._logs_df = pl.DataFrame()
return self._logs_df

# -- Column rename mappings ------------------------------------------------

_STATUS_RENAMES: ClassVar[dict[str, str]] = {
"_status_node_label": "node_label",
"_status_state": "state",
"_status_timestamp": "timestamp",
"_status_error_summary": "error_summary",
}

_LOG_RENAMES: ClassVar[dict[str, str]] = {
"_log_node_label": "node_label",
"_log_timestamp": "timestamp",
"_log_success": "success",
"_log_stdout_log": "stdout_log",
"_log_stderr_log": "stderr_log",
"_log_python_logs": "python_logs",
"_log_traceback": "traceback",
}

_DROP_PREFIXES: ClassVar[tuple[str, ...]] = ("__", "_tag_", "_tag::")
_STATUS_DROP_EXACT: ClassVar[set[str]] = {
"_status_id", "_status_run_id", "_status_pipeline_uri", "_status_node_hash",
}
_LOG_DROP_EXACT: ClassVar[set[str]] = {
"_log_id", "_log_run_id", "_log_node_hash",
}

# -- Internal helpers ------------------------------------------------------

def _clean_status_df(self, df: pl.DataFrame) -> pl.DataFrame:
"""Strip system columns and rename status columns."""
drop_cols = [
col for col in df.columns
if any(col.startswith(p) for p in self._DROP_PREFIXES)
or col in self._STATUS_DROP_EXACT
]
drop_cols.extend(
col for col in df.columns
if col.startswith("_status_") and col not in self._STATUS_RENAMES
)
df = df.drop([c for c in drop_cols if c in df.columns])
return df.rename(
{k: v for k, v in self._STATUS_RENAMES.items() if k in df.columns}
)

def _clean_logs_df(self, df: pl.DataFrame) -> pl.DataFrame:
"""Strip system columns and rename log columns."""
drop_cols = [
col for col in df.columns
if any(col.startswith(p) for p in self._DROP_PREFIXES)
or col in self._LOG_DROP_EXACT
]
drop_cols.extend(
col for col in df.columns
if col.startswith("_log_") and col not in self._LOG_RENAMES
)
df = df.drop([c for c in drop_cols if c in df.columns])
return df.rename(
{k: v for k, v in self._LOG_RENAMES.items() if k in df.columns}
)

# -- Public query methods --------------------------------------------------

def status(self) -> pl.DataFrame:
"""Latest execution status for every (node, input) combination.

Returns one row per (node, input) with the most recent status.
CACHED states are mapped to SUCCESS since they represent
previously computed successful results.

Returns:
DataFrame with columns: ``node_label``, tag columns,
``state``, ``timestamp``, ``error_summary``.
"""
df = self._get_status_df()
if df.is_empty():
return df
df = self._clean_status_df(df)

# Deduplicate to latest status per (node, input)
group_cols = ["node_label"] + self.tag_columns
group_cols = [c for c in group_cols if c in df.columns]
df = df.sort("timestamp").unique(subset=group_cols, keep="last")

Comment on lines +190 to +199

Copilot AI (Mar 30, 2026):

status() assumes a non-empty status table with a "timestamp" column; if the root contains no status tables (but does contain log tables), _get_status_df() returns an empty DataFrame and df.sort("timestamp") will raise. Consider short-circuiting when df is empty / missing required columns and returning an empty DataFrame (ideally with the expected schema).

Collaborator (author) reply:

Fixed — status() now short-circuits with if df.is_empty(): return df before attempting sort("timestamp"). Added a test (test_empty_status_returns_empty_df) that creates a results directory with only log tables and verifies status() returns an empty DataFrame.

# Map CACHED -> SUCCESS
df = df.with_columns(
pl.when(pl.col("state") == "CACHED")
.then(pl.lit("SUCCESS"))
.otherwise(pl.col("state"))
.alias("state")
)

return df

def logs(self, node: str) -> pl.DataFrame:
"""Full log entries for a node.

Returns all log fields: stdout, stderr, python logs, traceback,
success status, and timestamp, alongside tag columns.

Args:
node: Node name to query. Use ``reader.nodes`` to see
available names.

Returns:
DataFrame with columns: ``node_label``, tag columns,
``stdout_log``, ``stderr_log``, ``python_logs``,
``traceback``, ``success``, ``timestamp``.

Raises:
KeyError: If ``node`` is not found.
"""
if node not in self._status_tables and node not in self._log_tables:
raise KeyError(
f"Node {node!r} not found. Available nodes: {self.nodes}"
)
df = self._get_logs_df()
if df.is_empty():
return df
df = self._clean_logs_df(df)
return df.filter(pl.col("node_label") == node)