diff --git a/src/orcapod/databases/connector_arrow_database.py b/src/orcapod/databases/connector_arrow_database.py index 0a044928..99962f3f 100644 --- a/src/orcapod/databases/connector_arrow_database.py +++ b/src/orcapod/databases/connector_arrow_database.py @@ -62,20 +62,25 @@ def __init__( self, connector: DBConnectorProtocol, max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] = (), + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, + _shared_pending_skip_existing: "dict[str, bool] | None" = None, ) -> None: self._connector = connector self.max_hierarchy_depth = max_hierarchy_depth - self._pending_batches: dict[str, pa.Table] = {} - self._pending_record_ids: dict[str, set[str]] = defaultdict(set) + self._path_prefix = _path_prefix + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) # Per-batch flag: True when the batch was added with skip_duplicates=True, # so flush() can pass skip_existing=True to the connector and let it use # native INSERT-OR-IGNORE semantics rather than Python-side prefiltering. - self._pending_skip_existing: dict[str, bool] = {} + self._pending_skip_existing: dict[str, bool] = _shared_pending_skip_existing if _shared_pending_skip_existing is not None else {} # ── Path helpers ────────────────────────────────────────────────────────── def _get_record_key(self, record_path: tuple[str, ...]) -> str: - return "/".join(record_path) + return "/".join(self._path_prefix + record_path) def _path_to_table_name(self, record_path: tuple[str, ...]) -> str: """Map a record_path to a safe SQL table name. @@ -93,10 +98,11 @@ def _path_to_table_name(self, record_path: tuple[str, ...]) -> str: def _validate_record_path(self, record_path: tuple[str, ...]) -> None: if not record_path: raise ValueError("record_path cannot be empty") - if len(record_path) > self.max_hierarchy_depth: + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: raise ValueError( f"record_path depth {len(record_path)} exceeds maximum " - f"{self.max_hierarchy_depth}" + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" ) for i, component in enumerate(record_path): if not component or not isinstance(component, str): @@ -165,7 +171,7 @@ def _get_committed_table( self, record_path: tuple[str, ...] ) -> pa.Table | None: """Fetch all committed records for a path from the connector.""" - table_name = self._path_to_table_name(record_path) + table_name = self._path_to_table_name(self._path_prefix + record_path) if table_name not in self._connector.get_table_names(): return None batches = list( @@ -268,6 +274,42 @@ def add_records( if flush: self.flush() + # ── base_path / at ──────────────────────────────────────────────────────── + + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix + + def at(self, *path_components: str) -> "ConnectorArrowDatabase": + """Return a new ConnectorArrowDatabase scoped to the given sub-path. + + The returned instance shares the connector and all three pending dicts + (_pending_batches, _pending_record_ids, _pending_skip_existing) by reference. 
+ Calling flush() on any view drains the entire shared pending queue. + + Raises: + ValueError: If any path component is empty, not a str, or contains + ``'/'`` or ``'\\0'`` (which would corrupt the ``'/'``-separated + record key scheme and break ``flush()``'s key reconstruction). + """ + for i, component in enumerate(path_components): + if not component or not isinstance(component, str): + raise ValueError(f"at() path component {i} is invalid: {repr(component)}") + if "/" in component or "\0" in component: + raise ValueError( + f"at() path component {repr(component)} contains an invalid character " + "('/' or '\\0')" + ) + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + _shared_pending_skip_existing=self._pending_skip_existing, + ) + # ── Flush ───────────────────────────────────────────────────────────────── def flush(self) -> None: @@ -420,6 +462,7 @@ def to_config(self) -> dict[str, Any]: return { "type": "connector_arrow_database", "connector": self._connector.to_config(), + "base_path": list(self._path_prefix), "max_hierarchy_depth": self.max_hierarchy_depth, } diff --git a/src/orcapod/databases/delta_lake_databases.py b/src/orcapod/databases/delta_lake_databases.py index 9c98d2c1..5da84276 100644 --- a/src/orcapod/databases/delta_lake_databases.py +++ b/src/orcapod/databases/delta_lake_databases.py @@ -50,22 +50,24 @@ def __init__( batch_size: int = 1000, max_hierarchy_depth: int = 10, allow_schema_evolution: bool = True, + _path_prefix: tuple[str, ...] = (), ): - self._base_uri, self._storage_options = parse_base_path(base_path, storage_options) - self._is_cloud: bool = is_cloud_uri(self._base_uri) + self._root_uri, self._storage_options = parse_base_path(base_path, storage_options) + self._is_cloud: bool = is_cloud_uri(self._root_uri) + self._path_prefix = _path_prefix self.batch_size = batch_size self.max_hierarchy_depth = max_hierarchy_depth self.allow_schema_evolution = allow_schema_evolution if not self._is_cloud: - # Keep self.base_path for local-path operations (list_sources, etc.) - # NOTE: do NOT access self.base_path on cloud instances. - self.base_path = Path(self._base_uri) + # _local_root is the absolute filesystem root (for list_sources, mkdir, etc.) + # NOTE: do NOT access self._local_root on cloud instances. + self._local_root = Path(self._root_uri) if create_base_path: - self.base_path.mkdir(parents=True, exist_ok=True) - elif not self.base_path.exists(): + self._local_root.mkdir(parents=True, exist_ok=True) + elif not self._local_root.exists(): raise ValueError( - f"Base path {self.base_path} does not exist and create_base_path=False" + f"Base path {self._local_root} does not exist and create_base_path=False" ) # For cloud paths: create_base_path is silently ignored (no directory needed). @@ -100,17 +102,18 @@ def _sanitize_path_component(component: str) -> str: return component def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: - """Get the URI for a given record path (works for local and cloud). + """Get the URI for a given record path, incorporating base_path prefix. Args: - record_path: Tuple of path components. + record_path: Tuple of path components (relative to base_path). create_dir: If True, create the local directory (no-op for cloud paths). 
""" + full_path = self._path_prefix + record_path # prefix applied once, here only if self._is_cloud: - return self._base_uri.rstrip("/") + "/" + "/".join(record_path) + return self._root_uri.rstrip("/") + "/" + "/".join(full_path) else: - path = Path(self._base_uri) - for subpath in record_path: + path = self._local_root + for subpath in full_path: path = path / self._sanitize_path_component(subpath) if create_dir: path.mkdir(parents=True, exist_ok=True) @@ -130,9 +133,11 @@ def _validate_record_path(self, record_path: tuple[str, ...]) -> None: if not record_path: raise ValueError("Source path cannot be empty") - if len(record_path) > self.max_hierarchy_depth: + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: raise ValueError( - f"Source path depth {len(record_path)} exceeds maximum {self.max_hierarchy_depth}" + f"Source path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" ) # Validate path components @@ -833,7 +838,8 @@ def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" config: dict[str, Any] = { "type": "delta_table", - "base_path": self._base_uri, + "root_uri": self._root_uri, # renamed from "base_path" + "base_path": list(self._path_prefix), # new: relative prefix tuple "batch_size": self.batch_size, "max_hierarchy_depth": self.max_hierarchy_depth, "allow_schema_evolution": self.allow_schema_evolution, @@ -843,15 +849,30 @@ def to_config(self) -> dict[str, Any]: return config @classmethod - def from_config(cls, config: dict[str, Any]) -> DeltaTableDatabase: - """Reconstruct a DeltaTableDatabase from a config dict.""" + def from_config(cls, config: dict[str, Any]) -> "DeltaTableDatabase": + """Reconstruct a DeltaTableDatabase from a config dict. + + Supports both the current format (``"root_uri"`` for the storage root, + ``"base_path"`` as a list for the scoping prefix) and the legacy format + produced before ENG-341 (``"base_path"`` as a URI string, no prefix). + """ + if "root_uri" in config: + # Current format (post-ENG-341) + root_uri = config["root_uri"] + base_path_value = config.get("base_path", []) + _path_prefix = tuple(base_path_value) if isinstance(base_path_value, list) else () + else: + # Legacy format (pre-ENG-341): "base_path" was the root URI string + root_uri = config["base_path"] + _path_prefix = () return cls( - base_path=config["base_path"], + base_path=root_uri, storage_options=config.get("storage_options"), create_base_path=True, batch_size=config.get("batch_size", 1000), max_hierarchy_depth=config.get("max_hierarchy_depth", 10), allow_schema_evolution=config.get("allow_schema_evolution", True), + _path_prefix=_path_prefix, ) def flush(self) -> None: @@ -937,6 +958,50 @@ def flush_batch(self, record_path: tuple[str, ...]) -> None: self._pending_record_ids[record_key] = pending_ids raise + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix + + def at(self, *path_components: str) -> "DeltaTableDatabase": + """Return a new DeltaTableDatabase scoped to the given sub-path. + + The returned instance uses the same underlying filesystem root but + all reads and writes are relative to the extended prefix. Unlike + InMemoryArrowDatabase and ConnectorArrowDatabase, DeltaTableDatabase + does NOT share pending state — the filesystem is the shared storage. 
+ + Raises: + TypeError: If any component is not a str. + ValueError: If any component is empty, is ``'.'`` or ``'..'``, or + contains filesystem-unsafe characters (``/``, ``\\``, ``*``, + ``?``, ``"``, ``<``, ``>``, ``|``, ``\\0``). + """ + _unsafe_chars = ["/", "\\", "*", "?", '"', "<", ">", "|", "\0"] + for i, component in enumerate(path_components): + if not isinstance(component, str): + raise TypeError( + f"at() path component {i} must be str, got {type(component)!r}" + ) + if not component: + raise ValueError(f"at() path component {i} must not be empty") + if component in (".", ".."): + raise ValueError( + f"at() path component {repr(component)}: '.' and '..' are not allowed" + ) + if any(char in component for char in _unsafe_chars): + raise ValueError( + f"at() path component {repr(component)} contains invalid characters" + ) + return DeltaTableDatabase( + base_path=self._root_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + _path_prefix=self._path_prefix + path_components, + ) + def list_sources(self) -> list[tuple[str, ...]]: """ List all record paths that contain a valid Delta table under base_path. @@ -972,5 +1037,10 @@ def _scan(current_path: Path, path_components: tuple[str, ...]) -> None: except deltalake.exceptions.TableNotFoundError: _scan(item, components) - _scan(self.base_path, ()) + # Build the effective scoped root directory + scoped_root = self._local_root + for component in self._path_prefix: + scoped_root = scoped_root / self._sanitize_path_component(component) + + _scan(scoped_root, ()) return sources diff --git a/src/orcapod/databases/in_memory_databases.py b/src/orcapod/databases/in_memory_databases.py index 9e31e0ba..be77878d 100644 --- a/src/orcapod/databases/in_memory_databases.py +++ b/src/orcapod/databases/in_memory_databases.py @@ -29,26 +29,36 @@ class InMemoryArrowDatabase: RECORD_ID_COLUMN = "__record_id" - def __init__(self, max_hierarchy_depth: int = 10): + def __init__( + self, + max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] 
= (), + _shared_tables: "dict[str, pa.Table] | None" = None, + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, + ): + self._path_prefix = _path_prefix self.max_hierarchy_depth = max_hierarchy_depth - self._tables: dict[str, pa.Table] = {} - self._pending_batches: dict[str, pa.Table] = {} - self._pending_record_ids: dict[str, set[str]] = defaultdict(set) + self._tables: dict[str, pa.Table] = _shared_tables if _shared_tables is not None else {} + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) # ------------------------------------------------------------------ # Path helpers # ------------------------------------------------------------------ def _get_record_key(self, record_path: tuple[str, ...]) -> str: - return "/".join(record_path) + return "/".join(self._path_prefix + record_path) def _validate_record_path(self, record_path: tuple[str, ...]) -> None: if not record_path: raise ValueError("record_path cannot be empty") - if len(record_path) > self.max_hierarchy_depth: + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: raise ValueError( - f"record_path depth {len(record_path)} exceeds maximum {self.max_hierarchy_depth}" + f"record_path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" ) # Only restrict characters that break the "/".join(record_path) key scheme. @@ -248,6 +258,43 @@ def flush(self) -> None: kept = committed.filter(mask) self._tables[record_key] = pa.concat_tables([kept, pending]) + # ------------------------------------------------------------------ + # Path scoping + # ------------------------------------------------------------------ + + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix + + def at(self, *path_components: str) -> "InMemoryArrowDatabase": + """Return a new InMemoryArrowDatabase scoped to the given sub-path. + + The returned instance shares the underlying storage dicts (_tables, + _pending_batches, _pending_record_ids) by reference, so writes + through any view are visible to all views of the same root database. + + Raises: + ValueError: If any path component is empty, not a str, or contains + ``'/'`` or ``'\\0'`` (which would corrupt the ``'/'``-separated + record key scheme). 
+ """ + for i, component in enumerate(path_components): + if not component or not isinstance(component, str): + raise ValueError(f"at() path component {i} is invalid: {repr(component)}") + if "/" in component or "\0" in component: + raise ValueError( + f"at() path component {repr(component)} contains an invalid character " + "('/' or '\\0')" + ) + return InMemoryArrowDatabase( + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_tables=self._tables, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + ) + # ------------------------------------------------------------------ # Read helpers # ------------------------------------------------------------------ @@ -338,6 +385,7 @@ def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" return { "type": "in_memory", + "base_path": list(self._path_prefix), "max_hierarchy_depth": self.max_hierarchy_depth, } @@ -346,6 +394,7 @@ def from_config(cls, config: dict[str, Any]) -> "InMemoryArrowDatabase": """Reconstruct an InMemoryArrowDatabase from a config dict.""" return cls( max_hierarchy_depth=config.get("max_hierarchy_depth", 10), + _path_prefix=tuple(config.get("base_path", [])), ) def get_records_with_column_value( diff --git a/src/orcapod/databases/noop_database.py b/src/orcapod/databases/noop_database.py index a65ef88e..26556ddd 100644 --- a/src/orcapod/databases/noop_database.py +++ b/src/orcapod/databases/noop_database.py @@ -22,6 +22,9 @@ class NoOpArrowDatabase: or benchmarking pure compute overhead. """ + def __init__(self, _path_prefix: tuple[str, ...] = ()) -> None: + self._path_prefix = _path_prefix + def add_record( self, record_path: tuple[str, ...], @@ -79,11 +82,27 @@ def get_records_with_column_value( def flush(self) -> None: pass + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix + + def at(self, *path_components: str) -> "NoOpArrowDatabase": + """Return a new NoOpArrowDatabase scoped to the given sub-path. + + All reads and writes are still discarded; the prefix only affects + the reported base_path of the returned instance. + """ + return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) + def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" - return {"type": "noop"} + return { + "type": "noop", + "base_path": list(self._path_prefix), + } @classmethod def from_config(cls, config: dict[str, Any]) -> "NoOpArrowDatabase": """Reconstruct a NoOpArrowDatabase from a config dict.""" - return cls() + return cls(_path_prefix=tuple(config.get("base_path", []))) diff --git a/src/orcapod/protocols/database_protocols.py b/src/orcapod/protocols/database_protocols.py index c6dfb6a6..c4b0cb42 100644 --- a/src/orcapod/protocols/database_protocols.py +++ b/src/orcapod/protocols/database_protocols.py @@ -65,6 +65,41 @@ def flush(self) -> None: """Flush any buffered writes to the underlying storage.""" ... + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view. + + Always () for a root (non-scoped) instance. Extended by at(). + The absolute storage root (filesystem URI, SQL connector, etc.) + is a separate, backend-specific implementation detail. + """ + ... 
+ + def at(self, *path_components: str) -> "ArrowDatabaseProtocol": + """Return a new database scoped to the given sub-path. + + All reads and writes on the returned database are relative to + this database's base_path extended by path_components. The + original is unmodified. + + Calling at() with no arguments returns a new view equivalent + to the current one (same base_path, fresh or shared state + depending on the backend). + + For backends with shared pending state (InMemoryArrowDatabase, + ConnectorArrowDatabase), calling flush() on any view drains + the entire shared pending queue — not just the caller's prefix. + This is intentional: all views share the same underlying store. + + Args: + *path_components: Zero or more path components to append. + + Returns: + A new database instance with + base_path == self.base_path + path_components. + """ + ... + def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict. diff --git a/superpowers/plans/2026-04-01-database-path-contextualization.md b/superpowers/plans/2026-04-01-database-path-contextualization.md new file mode 100644 index 00000000..e5ba8eaa --- /dev/null +++ b/superpowers/plans/2026-04-01-database-path-contextualization.md @@ -0,0 +1,1007 @@ +# Database Path Contextualization Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add `at(*path_components)` and `base_path` to `ArrowDatabaseProtocol` and all four database backends, enabling sub-scoped database views that namespace reads/writes under a relative path prefix. + +**Architecture:** Each backend stores a `_path_prefix: tuple[str, ...]` (starts as `()`) and exposes it via a `base_path` property. `at()` returns a new instance with an extended prefix. `InMemoryArrowDatabase` and `ConnectorArrowDatabase` share their underlying storage dicts by reference; `DeltaTableDatabase` creates a fresh instance (the filesystem is the shared storage). Prefix application happens in `_get_record_key` for InMemory/Connector and in `_get_table_uri` for Delta (to avoid a double-prefix bug in `flush()`). 
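The target semantics, as a quick usage sketch (illustrative only, not one of the plan's steps; it uses the in-memory backend since that needs no storage setup, with the import path taken from the file map below):

```python
import pyarrow as pa

from orcapod.databases.in_memory_databases import InMemoryArrowDatabase

root = InMemoryArrowDatabase()
scoped = root.at("pipeline", "node1")  # new view; root is unmodified
assert root.base_path == ()
assert scoped.base_path == ("pipeline", "node1")

# Writes through the scoped view land under the prefixed record key...
scoped.add_record(("outputs",), "id1", pa.table({"value": [42]}))
scoped.flush()

# ...so the shared root sees them only at the fully qualified path.
assert root.get_record_by_id(("outputs",), "id1") is None
assert root.get_record_by_id(("pipeline", "node1", "outputs"), "id1") is not None
```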
+ +**Tech Stack:** Python 3.11+, PyArrow, deltalake, uv (run all commands via `uv run`) + +**Spec:** `superpowers/specs/2026-04-01-database-path-contextualization-design.md` + +--- + +## File Map + +| Action | File | What changes | +|--------|------|-------------| +| Modify | `src/orcapod/protocols/database_protocols.py` | Add `base_path` property and `at()` to `ArrowDatabaseProtocol` | +| Modify | `src/orcapod/databases/noop_database.py` | Add `_path_prefix`, `base_path`, `at()`, update config | +| Modify | `src/orcapod/databases/in_memory_databases.py` | Add `_path_prefix`, shared-dict `at()`, update `_get_record_key`, `_validate_record_path`, config | +| Modify | `src/orcapod/databases/connector_arrow_database.py` | Add `_path_prefix`, shared-dict `at()`, update `_get_record_key`, `_validate_record_path`, config | +| Modify | `src/orcapod/databases/delta_lake_databases.py` | Rename `_base_uri`→`_root_uri`, `base_path`→`_local_root`; add `_path_prefix`; update `_get_table_uri`, `_validate_record_path`, `list_sources()`, `at()`, config | +| Modify | `tests/test_databases/test_noop_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_in_memory_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_connector_arrow_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_delta_table_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_database_config.py` | Update `DeltaTableDatabase` config tests for renamed key; add `base_path` round-trip tests for all backends | + +--- + +## Task 1: Protocol + Stubs + +Add `base_path` and `at()` to `ArrowDatabaseProtocol`, then add minimal stubs to all four backends so existing conformance tests (`isinstance(db, ArrowDatabaseProtocol)`) keep passing. + +**Files:** +- Modify: `src/orcapod/protocols/database_protocols.py` +- Modify: `src/orcapod/databases/noop_database.py` +- Modify: `src/orcapod/databases/in_memory_databases.py` +- Modify: `src/orcapod/databases/connector_arrow_database.py` +- Modify: `src/orcapod/databases/delta_lake_databases.py` + +- [ ] **Step 1: Add `base_path` and `at()` to `ArrowDatabaseProtocol`** + +In `src/orcapod/protocols/database_protocols.py`, add after the `flush` method: + +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view. + + Always () for a root (non-scoped) instance. Extended by at(). + The absolute storage root (filesystem URI, SQL connector, etc.) + is a separate, backend-specific implementation detail. + """ + ... + +def at(self, *path_components: str) -> "ArrowDatabaseProtocol": + """Return a new database scoped to the given sub-path. + + All reads and writes on the returned database are relative to + this database's base_path extended by path_components. The + original is unmodified. + + Calling at() with no arguments returns a new view equivalent + to the current one (same base_path, fresh or shared state + depending on the backend). + + For backends with shared pending state (InMemoryArrowDatabase, + ConnectorArrowDatabase), calling flush() on any view drains + the entire shared pending queue — not just the caller's prefix. + This is intentional: all views share the same underlying store. + + Args: + *path_components: Zero or more path components to append. + + Returns: + A new database instance with + base_path == self.base_path + path_components. + """ + ... 
+``` + +Also add both names to `__all__` at the bottom (they're part of the protocol, no separate export needed — `ArrowDatabaseProtocol` is already exported). + +- [ ] **Step 2: Add stubs to all four backends** + +In each of the four backend files, add these two members (exact bodies shown; full implementations come in later tasks): + +**`noop_database.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase() +``` + +**`in_memory_databases.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase(max_hierarchy_depth=self.max_hierarchy_depth) +``` + +**`connector_arrow_database.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + ) +``` + +**`delta_lake_databases.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "DeltaTableDatabase": + return DeltaTableDatabase( + base_path=self._base_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + ) +``` + +- [ ] **Step 3: Run the full test suite to verify nothing is broken** + +```bash +uv run pytest tests/test_databases/ -x -q +``` + +Expected: all previously passing tests still pass. No new failures. 
+ +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/protocols/database_protocols.py \ + src/orcapod/databases/noop_database.py \ + src/orcapod/databases/in_memory_databases.py \ + src/orcapod/databases/connector_arrow_database.py \ + src/orcapod/databases/delta_lake_databases.py +git commit -m "feat(databases): add at() and base_path stubs to ArrowDatabaseProtocol and all backends" +``` + +--- + +## Task 2: `NoOpArrowDatabase` — full implementation + +**Files:** +- Modify: `src/orcapod/databases/noop_database.py` +- Modify: `tests/test_databases/test_noop_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_noop_database.py`: + +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self): + db = NoOpArrowDatabase() + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self): + db = NoOpArrowDatabase() + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self): + db = NoOpArrowDatabase() + db.at("a", "b") + assert db.base_path == () + + def test_scoped_reads_still_return_none(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "node1") + import pyarrow as pa + scoped.add_record(("outputs",), "id1", pa.table({"v": [1]})) + assert scoped.get_record_by_id(("outputs",), "id1") is None + assert scoped.get_all_records(("outputs",)) is None +``` + +Add to `TestNoOpDatabaseConfig` in `tests/test_databases/test_database_config.py`: + +```python +def test_to_config_includes_base_path(self): + db = NoOpArrowDatabase() + assert db.to_config()["base_path"] == [] + +def test_round_trip_preserves_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = NoOpArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) +``` + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_noop_database.py::TestAtMethod \ + tests/test_databases/test_database_config.py::TestNoOpDatabaseConfig -x -q +``` + +Expected: FAIL — `base_path` returns `()` for scoped instances, `to_config` missing `base_path`. + +- [ ] **Step 3: Implement full `NoOpArrowDatabase`** + +Replace the stub `base_path` and `at()` in `src/orcapod/databases/noop_database.py` and update config: + +Add `_path_prefix` to `__init__`: +```python +def __init__(self, _path_prefix: tuple[str, ...] 
= ()) -> None: + self._path_prefix = _path_prefix +``` + +Replace the stub `base_path` property: +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +Replace the stub `at()`: +```python +def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) +``` + +Replace `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize database configuration to a JSON-compatible dict.""" + return { + "type": "noop", + "base_path": list(self._path_prefix), + } +``` + +Replace `from_config`: +```python +@classmethod +def from_config(cls, config: dict[str, Any]) -> "NoOpArrowDatabase": + """Reconstruct a NoOpArrowDatabase from a config dict.""" + return cls(_path_prefix=tuple(config.get("base_path", []))) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_noop_database.py \ + tests/test_databases/test_database_config.py::TestNoOpDatabaseConfig -q +``` + +Expected: all pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/noop_database.py \ + tests/test_databases/test_noop_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for NoOpArrowDatabase" +``` + +--- + +## Task 3: `InMemoryArrowDatabase` — full implementation + +**Files:** +- Modify: `src/orcapod/databases/in_memory_databases.py` +- Modify: `tests/test_databases/test_in_memory_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_in_memory_database.py`: + +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42])) + # Parent sees ("outputs",) as a different key from ("pipeline","node1","outputs") + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", make_table(value=[99])) + view_a.flush() + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [99] + + def test_validate_record_path_checks_combined_depth(self, db): + # max_hierarchy_depth=10 (default). Fill prefix with 9 components. 
+ deep_db = InMemoryArrowDatabase(max_hierarchy_depth=10) + scoped = deep_db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") + # 9 prefix + 1 record_path = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 9 prefix + 2 record_path = 11: should raise + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) +``` + +Add to `TestInMemoryDatabaseConfig` in `tests/test_databases/test_database_config.py`: + +```python +def test_to_config_includes_base_path(self): + db = InMemoryArrowDatabase() + assert db.to_config()["base_path"] == [] + +def test_round_trip_preserves_base_path(self): + db = InMemoryArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = InMemoryArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) +``` + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_in_memory_database.py::TestAtMethod \ + tests/test_databases/test_database_config.py::TestInMemoryDatabaseConfig -x -q +``` + +Expected: FAIL. + +- [ ] **Step 3: Implement full `InMemoryArrowDatabase`** + +Update `__init__` signature in `src/orcapod/databases/in_memory_databases.py`: + +```python +def __init__( + self, + max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] = (), + _shared_tables: "dict[str, pa.Table] | None" = None, + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, +): + self._path_prefix = _path_prefix + self.max_hierarchy_depth = max_hierarchy_depth + self._tables: dict[str, pa.Table] = _shared_tables if _shared_tables is not None else {} + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) +``` + +Replace `_get_record_key`: +```python +def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) +``` + +Update `_validate_record_path` — change the depth check line to: +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError( + f"record_path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" + ) +``` + +Replace stub `base_path` property: +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +Replace stub `at()`: +```python +def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase( + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_tables=self._tables, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + ) +``` + +Replace `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize database configuration to a JSON-compatible dict.""" + return { + "type": "in_memory", + "base_path": list(self._path_prefix), + "max_hierarchy_depth": self.max_hierarchy_depth, + } +``` + +Replace `from_config`: +```python +@classmethod +def from_config(cls, config: dict[str, Any]) -> "InMemoryArrowDatabase": + """Reconstruct 
an InMemoryArrowDatabase from a config dict.""" + return cls( + max_hierarchy_depth=config.get("max_hierarchy_depth", 10), + _path_prefix=tuple(config.get("base_path", [])), + ) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_in_memory_database.py \ + tests/test_databases/test_database_config.py::TestInMemoryDatabaseConfig -q +``` + +Expected: all pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/in_memory_databases.py \ + tests/test_databases/test_in_memory_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for InMemoryArrowDatabase" +``` + +--- + +## Task 4: `ConnectorArrowDatabase` — full implementation + +**Files:** +- Modify: `src/orcapod/databases/connector_arrow_database.py` +- Modify: `tests/test_databases/test_connector_arrow_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_connector_arrow_database.py`. Note: look at the top of the test file for the `MockDBConnector` fixture — use the same pattern as other tests in that file to get a `ConnectorArrowDatabase` instance backed by the mock connector. + +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = pa.table({"value": pa.array([42])}) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", pa.table({"v": pa.array([99])}), flush=True) + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("v").to_pylist() == [99] + + def test_prefix_appears_in_sql_table_name(self, db): + """Prefix components are included in the SQL table name via _path_to_table_name.""" + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + # Table name should be pipeline__node1__outputs + table_names = db._connector.get_table_names() + assert "pipeline__node1__outputs" in table_names + + def test_validate_record_path_checks_combined_depth(self, db): + scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix components + scoped.add_record(("z",), "id1", pa.table({"v": pa.array([1])})) + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", pa.table({"v": pa.array([2])})) +``` + +Add to 
`TestConnectorArrowDatabaseConfig` (or the config section) in `test_database_config.py`: + +```python +# Note: ConnectorArrowDatabase.from_config raises NotImplementedError, +# so only test to_config here. +def test_connector_to_config_includes_base_path(): + from unittest.mock import MagicMock + from orcapod.databases import ConnectorArrowDatabase + mock_connector = MagicMock() + mock_connector.to_config.return_value = {"type": "mock"} + db = ConnectorArrowDatabase(connector=mock_connector) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["base_path"] == ["pipeline", "v1"] +``` + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_connector_arrow_database.py::TestAtMethod -x -q +``` + +Expected: FAIL. + +- [ ] **Step 3: Implement full `ConnectorArrowDatabase`** + +Update `__init__` in `src/orcapod/databases/connector_arrow_database.py`: + +```python +def __init__( + self, + connector: DBConnectorProtocol, + max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] = (), + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, + _shared_pending_skip_existing: "dict[str, bool] | None" = None, +) -> None: + self._connector = connector + self.max_hierarchy_depth = max_hierarchy_depth + self._path_prefix = _path_prefix + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) + self._pending_skip_existing: dict[str, bool] = _shared_pending_skip_existing if _shared_pending_skip_existing is not None else {} +``` + +Replace `_get_record_key`: +```python +def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) +``` + +Update `_validate_record_path` depth check: +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError( + f"record_path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" + ) +``` + +Replace stub `base_path` property: +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +Replace stub `at()`: +```python +def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + _shared_pending_skip_existing=self._pending_skip_existing, + ) +``` + +Update `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize configuration to a JSON-compatible dict.""" + return { + "type": "connector_arrow_database", + "connector": self._connector.to_config(), + "base_path": list(self._path_prefix), + "max_hierarchy_depth": self.max_hierarchy_depth, + } +``` + +(`from_config` still raises `NotImplementedError` — no change needed.) + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_connector_arrow_database.py -q +``` + +Expected: all pass. 
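For intuition on how the prefix reaches the SQL layer, here is the flow that `test_prefix_appears_in_sql_table_name` pins down, sketched in isolation (conceptual only; the real `_path_to_table_name` also sanitizes components before joining):

```python
# Conceptual key/table-name flow for db.at("pipeline", "node1"), simplified.
prefix = ("pipeline", "node1")                 # held by the scoped view
record_path = ("outputs",)                     # supplied by the caller

record_key = "/".join(prefix + record_path)    # pending-dict key: "pipeline/node1/outputs"
table_name = "__".join(record_key.split("/"))  # SQL table name: "pipeline__node1__outputs"
assert table_name == "pipeline__node1__outputs"
```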
- [ ] **Step 5: Commit**

```bash
git add src/orcapod/databases/connector_arrow_database.py \
    tests/test_databases/test_connector_arrow_database.py \
    tests/test_databases/test_database_config.py
git commit -m "feat(databases): implement at() and base_path for ConnectorArrowDatabase"
```

---

## Task 5: `DeltaTableDatabase` — full implementation

This is the most complex task. It involves attribute renames, `_get_table_uri` changes, `list_sources()` update, and a config key rename that requires updating an existing test.

**Files:**
- Modify: `src/orcapod/databases/delta_lake_databases.py`
- Modify: `tests/test_databases/test_delta_table_database.py`
- Modify: `tests/test_databases/test_database_config.py`

- [ ] **Step 1: Write failing tests**

Add this class to `tests/test_databases/test_delta_table_database.py`:

```python
class TestAtMethod:
    def test_base_path_is_empty_on_root_instance(self, db):
        assert db.base_path == ()
        assert isinstance(db.base_path, tuple)

    def test_at_sets_base_path(self, db):
        scoped = db.at("pipeline", "node1")
        assert scoped.base_path == ("pipeline", "node1")
        assert isinstance(scoped.base_path, tuple)

    def test_at_chaining_equivalent_to_multi_component(self, db):
        assert db.at("a").at("b").base_path == db.at("a", "b").base_path

    def test_at_does_not_modify_original(self, db):
        db.at("pipeline", "node1")
        assert db.base_path == ()

    def test_writes_through_scoped_view_readable_from_same_view(self, db):
        scoped = db.at("pipeline", "node1")
        record = make_table(value=[42])
        scoped.add_record(("outputs",), "id1", record, flush=True)
        result = scoped.get_record_by_id(("outputs",), "id1", flush=True)
        assert result is not None
        assert result.column("value").to_pylist() == [42]

    def test_scoped_write_not_visible_via_parent_at_same_path(self, db):
        scoped = db.at("pipeline", "node1")
        scoped.add_record(("outputs",), "id1", make_table(value=[42]), flush=True)
        assert db.get_record_by_id(("outputs",), "id1") is None

    def test_scoped_write_stored_under_correct_filesystem_path(self, db, tmp_path):
        scoped = db.at("pipeline", "node1")
        scoped.add_record(("outputs",), "id1", make_table(value=[1]), flush=True)
        # Delta table should be at <tmp_path>/db/pipeline/node1/outputs/
        expected_path = tmp_path / "db" / "pipeline" / "node1" / "outputs"
        assert expected_path.exists()

    def test_validate_record_path_checks_combined_depth(self, tmp_path):
        db = DeltaTableDatabase(base_path=tmp_path / "db", max_hierarchy_depth=10)
        scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i")  # 9 prefix
        # 9 + 1 = 10: OK
        scoped.add_record(("z",), "id1", make_table(value=[1]))
        # 9 + 2 = 11: should raise
        with pytest.raises(ValueError):
            scoped.add_record(("z", "extra"), "id2", make_table(value=[2]))

    def test_list_sources_on_scoped_instance(self, tmp_path):
        db = DeltaTableDatabase(base_path=tmp_path / "db")
        scoped = db.at("pipeline")
        scoped.add_record(("node1",), "id1", make_table(value=[1]), flush=True)
        scoped.add_record(("node2",), "id2", make_table(value=[2]), flush=True)
        sources = scoped.list_sources()
        assert ("node1",) in sources
        assert ("node2",) in sources
        # Root db should NOT see these under bare ("node1",) — they live under pipeline/
        root_sources = db.list_sources()
        assert ("node1",) not in root_sources
        assert ("pipeline", "node1") in root_sources
```

Update `TestDeltaTableDatabaseConfig` in `test_database_config.py` — the existing `test_to_config_includes_base_path`
test **must be updated** (it now tests the wrong key): + +```python +# REPLACE the existing test_to_config_includes_base_path with: +def test_to_config_includes_root_uri(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + config = db.to_config() + assert config["root_uri"] == str(tmp_path / "delta_db") + +def test_to_config_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + config = db.to_config() + assert config["base_path"] == [] + +def test_to_config_scoped_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["root_uri"] == str(tmp_path / "delta_db") + assert config["base_path"] == ["pipeline", "v1"] + +def test_round_trip_preserves_base_path(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = DeltaTableDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) +``` + +Also update the existing `test_round_trip` to use `root_uri`: +```python +def test_round_trip(self, tmp_path): + db = DeltaTableDatabase( + base_path=str(tmp_path / "delta_db"), + batch_size=500, + max_hierarchy_depth=5, + ) + config = db.to_config() + restored = DeltaTableDatabase.from_config(config) + assert restored.to_config() == config +``` +(This test doesn't check keys by name so it should still pass once the implementation is consistent.) + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_delta_table_database.py::TestAtMethod \ + tests/test_databases/test_database_config.py::TestDeltaTableDatabaseConfig -x -q +``` + +Expected: FAIL — `base_path` not scoped, config key still `"base_path"` for URI, etc. + +- [ ] **Step 3: Implement `DeltaTableDatabase` — rename attributes** + +In `src/orcapod/databases/delta_lake_databases.py`, update `__init__`: + +```python +def __init__( + self, + base_path: str | Path | UPath, + storage_options: dict[str, str] | None = None, + create_base_path: bool = True, + batch_size: int = 1000, + max_hierarchy_depth: int = 10, + allow_schema_evolution: bool = True, + _path_prefix: tuple[str, ...] = (), +): + self._root_uri, self._storage_options = parse_base_path(base_path, storage_options) + self._is_cloud: bool = is_cloud_uri(self._root_uri) + self._path_prefix = _path_prefix + self.batch_size = batch_size + self.max_hierarchy_depth = max_hierarchy_depth + self.allow_schema_evolution = allow_schema_evolution + + if not self._is_cloud: + # _local_root is the absolute filesystem root (for list_sources, mkdir, etc.) + # NOTE: do NOT access self._local_root on cloud instances. 
+ self._local_root = Path(self._root_uri) + if create_base_path: + self._local_root.mkdir(parents=True, exist_ok=True) + elif not self._local_root.exists(): + raise ValueError( + f"Base path {self._local_root} does not exist and create_base_path=False" + ) + + self._delta_table_cache: dict[str, deltalake.DeltaTable] = {} + self._pending_batches: dict[str, pa.Table] = {} + self._pending_record_ids: dict[str, set[str]] = defaultdict(set) + self._existing_ids_cache: dict[str, set[str]] = defaultdict(set) + self._cache_dirty: dict[str, bool] = defaultdict(lambda: True) +``` + +Add `base_path` property (replace stub): +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +- [ ] **Step 4: Update `_get_table_uri` to apply prefix** + +Replace the existing `_get_table_uri` method: + +```python +def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: + """Get the URI for a given record path, incorporating base_path prefix. + + Args: + record_path: Tuple of path components (relative to base_path). + create_dir: If True, create the local directory (no-op for cloud paths). + """ + full_path = self._path_prefix + record_path # prefix applied once, here only + if self._is_cloud: + return self._root_uri.rstrip("/") + "/" + "/".join(full_path) + else: + path = self._local_root + for subpath in full_path: + path = path / self._sanitize_path_component(subpath) + if create_dir: + path.mkdir(parents=True, exist_ok=True) + return str(path) +``` + +- [ ] **Step 5: Update `_validate_record_path` for combined depth** + +Change the depth check line in `_validate_record_path`: + +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError( + f"Source path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" + ) +``` + +- [ ] **Step 6: Update `list_sources()` to scan from scoped root** + +Replace `_scan(self.base_path, ())` at the end of `list_sources()` with: + +```python +# Build the effective scoped root directory +scoped_root = self._local_root +for component in self._path_prefix: + scoped_root = scoped_root / self._sanitize_path_component(component) + +_scan(scoped_root, ()) +return sources +``` + +Also update the `_scan` closure's depth guard to use relative depth (not combined): +```python +def _scan(current_path: Path, path_components: tuple[str, ...]) -> None: + if len(path_components) >= self.max_hierarchy_depth: + return + ... 
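    # Using relative (not combined) depth here is deliberate: _scan walks
    # directories relative to the scoped root, and list_sources() returns
    # tuples relative to base_path. The combined prefix+path limit is still
    # enforced on writes by _validate_record_path (Step 5).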
+``` + +- [ ] **Step 7: Implement `at()` and update config** + +Replace the stub `at()`: +```python +def at(self, *path_components: str) -> "DeltaTableDatabase": + return DeltaTableDatabase( + base_path=self._root_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + _path_prefix=self._path_prefix + path_components, + ) +``` + +Replace `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize database configuration to a JSON-compatible dict.""" + config: dict[str, Any] = { + "type": "delta_table", + "root_uri": self._root_uri, # renamed from "base_path" + "base_path": list(self._path_prefix), # new: relative prefix tuple + "batch_size": self.batch_size, + "max_hierarchy_depth": self.max_hierarchy_depth, + "allow_schema_evolution": self.allow_schema_evolution, + } + if self._storage_options: + config["storage_options"] = self._storage_options + return config +``` + +Replace `from_config`: +```python +@classmethod +def from_config(cls, config: dict[str, Any]) -> "DeltaTableDatabase": + """Reconstruct a DeltaTableDatabase from a config dict.""" + return cls( + base_path=config["root_uri"], + storage_options=config.get("storage_options"), + create_base_path=True, + batch_size=config.get("batch_size", 1000), + max_hierarchy_depth=config.get("max_hierarchy_depth", 10), + allow_schema_evolution=config.get("allow_schema_evolution", True), + _path_prefix=tuple(config.get("base_path", [])), + ) +``` + +- [ ] **Step 8: Run the full database test suite** + +```bash +uv run pytest tests/test_databases/ -q --ignore=tests/test_databases/test_delta_table_database_s3.py +``` + +Expected: all pass. The S3 tests are excluded because they require a live MinIO server. + +- [ ] **Step 9: Commit** + +```bash +git add src/orcapod/databases/delta_lake_databases.py \ + tests/test_databases/test_delta_table_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for DeltaTableDatabase" +``` + +--- + +## Task 6: Full suite verification and PR + +- [ ] **Step 1: Run the complete test suite** + +```bash +uv run pytest tests/ -q --ignore=tests/test_databases/test_delta_table_database_s3.py \ + --ignore=tests/test_databases/test_spiraldb_connector_integration.py \ + --ignore=tests/test_databases/test_postgresql_connector_integration.py \ + --ignore=tests/test_databases/test_sqlite_connector_integration.py +``` + +Expected: all pass. (Integration tests are excluded as they require live external services.) + +- [ ] **Step 2: Authenticate with GitHub and push branch** + +```bash +gh-app-token-generator nauticalab | gh auth login --with-token +git checkout -b eywalker/eng-341-feat-database-path-contextualization-create-sub-scoped +git push -u origin eywalker/eng-341-feat-database-path-contextualization-create-sub-scoped +``` + +- [ ] **Step 3: Create PR** + +```bash +gh pr create \ + --base dev \ + --title "feat(databases): add at() and base_path for sub-scoped database views (ENG-341)" \ + --body "$(cat <<'EOF' +## Summary + +Implements ENG-341: database path contextualization — sub-scoped database views. 
+ +- Adds `at(*path_components)` and `base_path: tuple[str, ...]` to `ArrowDatabaseProtocol` +- All four backends implemented: `DeltaTableDatabase`, `InMemoryArrowDatabase`, `ConnectorArrowDatabase`, `NoOpArrowDatabase` +- `InMemoryArrowDatabase` and `ConnectorArrowDatabase` share underlying storage by reference; `DeltaTableDatabase` creates a fresh instance (filesystem is shared storage) +- `to_config`/`from_config` round-trips `base_path` for all backends +- `DeltaTableDatabase` config key rename: `"base_path"` (URI root) → `"root_uri"`; `"base_path"` now carries the scoping prefix tuple +- `_validate_record_path` updated to check combined `len(base_path) + len(record_path)` depth +- `list_sources()` on scoped `DeltaTableDatabase` instances scans from the scoped root + +Closes ENG-341 +EOF +)" +``` + +- [ ] **Step 4: Update Linear issue to In Progress → In Review** + +```bash +# Use Linear MCP tool or leave for reviewer to handle +``` diff --git a/superpowers/specs/2026-04-01-database-path-contextualization-design.md b/superpowers/specs/2026-04-01-database-path-contextualization-design.md new file mode 100644 index 00000000..1c894ec5 --- /dev/null +++ b/superpowers/specs/2026-04-01-database-path-contextualization-design.md @@ -0,0 +1,472 @@ +# Database Path Contextualization — Design Spec + +**Linear issue:** ENG-341 +**Date:** 2026-04-01 +**Status:** Approved + +--- + +## Overview + +Databases need a general mechanism to produce a new database instance scoped to a +sub-path, without modifying the original. The primitive is `at(*path_components)`, +which returns a new database view whose root is the caller's root extended by the +given path components. + +This is the foundational primitive needed by ENG-340 (ScopedDatabase / +`pipeline_path` decoupling) and is useful anywhere a component needs its own +storage namespace derived from a parent database. + +--- + +## Core Concepts + +### Two-level path model + +Every database instance carries two path concepts: + +| Concept | Type | Meaning | +|---------|------|---------| +| **Absolute root** | backend-specific | The storage location configured at construction (filesystem URI, SQL connector, none). Fixed forever. Implementation detail of each backend. | +| **`base_path`** | `tuple[str, ...]` | The relative "cwd" within the absolute root. Starts as `()`. Extended by `at()`. Public, queryable. | + +When a record is written at `record_path`, its full resolved location is: + +``` +absolute_root / base_path / record_path +``` + +Each backend translates this concatenated tuple into its own format: + +| Backend | Translation | +|---------|-------------| +| `DeltaTableDatabase` | `root_uri / "p0" / "p1" / ... / "r0" / "r1" / ...` (filesystem path) | +| `InMemoryArrowDatabase` | `"p0/p1/.../r0/r1/..."` (dict key) | +| `ConnectorArrowDatabase` | `"p0__p1__...__r0__r1__..."` (SQL table name) | +| `NoOpArrowDatabase` | discarded (prefix tracked for introspection only) | + +### Prefix application: where it happens + +Each backend applies the prefix at a different point in its stack: + +- **`DeltaTableDatabase`**: prefix is applied only inside `_get_table_uri()`. + `_get_record_key()` is NOT changed — it continues to return `"/".join(record_path)`. + This preserves the correctness of `flush()`, which reconstructs `record_path` from + the key and then calls `flush_batch(record_path)` → `_get_table_uri(record_path)`. + If the prefix were baked into `_get_record_key`, `_get_table_uri` would apply it a + second time. 
+  Keeping the prefix out of `_get_record_key` avoids this double application.
+
+- **`InMemoryArrowDatabase`** and **`ConnectorArrowDatabase`**: prefix is applied inside
+  `_get_record_key()`. Both backends' `flush()` methods reconstruct `record_path` from
+  the key and then call a terminal translation function (`_tables` dict access for
+  in-memory; `_path_to_table_name` for connector). Neither re-applies any prefix in
+  that terminal step, so including the prefix in the key is safe and correct.
+
+### Consistency with existing pipeline path pattern
+
+The existing `pipeline_path_prefix: tuple[str, ...]` pattern in `FunctionNode` and
+`CachedFunctionPod` already uses this model: a single shared database receives
+writes at namespaced keys derived by prepending a prefix tuple to `record_path`.
+`at()` formalizes this pattern — instead of threading a prefix tuple through every
+call site, callers create a scoped view once.
+
+---
+
+## Interface Changes
+
+### `ArrowDatabaseProtocol`
+
+Two new members added to the protocol:
+
+```python
+@property
+def base_path(self) -> tuple[str, ...]:
+    """The current relative root of this database view.
+
+    Always () for a root (non-scoped) instance. Extended by at().
+    The absolute storage root (filesystem URI, SQL connector, etc.)
+    is a separate, backend-specific implementation detail.
+    """
+    ...
+
+def at(self, *path_components: str) -> "ArrowDatabaseProtocol":
+    """Return a new database scoped to the given sub-path.
+
+    All reads and writes on the returned database are relative to
+    this database's base_path extended by path_components. The
+    original is unmodified.
+
+    Calling at() with no arguments returns a new view equivalent
+    to the current one (same base_path, fresh or shared state
+    depending on the backend).
+
+    For backends with shared pending state (InMemoryArrowDatabase,
+    ConnectorArrowDatabase), calling flush() on any view drains
+    the entire shared pending queue — not just the caller's prefix.
+    This is intentional: all views share the same underlying store.
+
+    Args:
+        *path_components: Zero or more path components to append.
+
+    Returns:
+        A new database instance with
+        base_path == self.base_path + path_components.
+    """
+    ...
+```
+
+Concrete implementations return their own type (e.g. `DeltaTableDatabase`) so that
+call sites keep the narrower concrete type, even though the protocol signature uses
+`ArrowDatabaseProtocol`.
+
+The `_path_prefix` constructor parameter used internally by each backend is
+prefixed with `_` to signal it is not part of the public API. Callers should
+always use `at()` to create scoped views.
+
+---
+
+## Per-Backend Implementation
+
+### `DeltaTableDatabase`
+
+**Naming clarification:**
+
+- The constructor parameter remains `base_path: str | Path | UPath` (unchanged).
+- The internal attribute `self._base_uri: str` is renamed to `self._root_uri: str`.
+- The existing `self.base_path: Path` (local-only, the absolute root as a `Path`
+  object) is renamed to `self._local_root: Path`. This frees the name `base_path`
+  for the new `tuple[str, ...]` property.
+- The new `base_path` property returns `self._path_prefix: tuple[str, ...]`.
+
+**Depth validation:**
+
+`_validate_record_path` is updated to check the combined depth:
+
+```python
+if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth:
+    raise ValueError(...)
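+
+# Example: with max_hierarchy_depth=10 and a 9-component prefix, a
+# one-component record_path passes (9 + 1 = 10) while a two-component
+# one raises (9 + 2 = 11); the new TestAtMethod cases hit exactly this boundary.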
+```
+
+**`_get_table_uri` update (prefix applied here only):**
+
+```python
+def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str:
+    full_path = self._path_prefix + record_path  # prefix applied once, here
+    if self._is_cloud:
+        return self._root_uri.rstrip("/") + "/" + "/".join(full_path)
+    else:
+        path = self._local_root
+        for component in full_path:
+            path = path / self._sanitize_path_component(component)
+        if create_dir:
+            path.mkdir(parents=True, exist_ok=True)
+        return str(path)
+```
+
+`_get_record_key` is **not changed** — it continues to return `"/".join(record_path)`
+(without prefix). This keeps `flush()` correct: it reconstructs the original
+`record_path` from the pending key and calls `_get_table_uri(record_path)`, which
+applies the prefix exactly once.
+
+**`list_sources()` on scoped instances:**
+
+For a scoped instance, `list_sources()` scans from `self._local_root` joined with
+the prefix components (i.e. the effective scoped root directory) and returns paths
+relative to that scoped root — consistent with `base_path` semantics. Cloud paths
+continue to raise `NotImplementedError`.
+
+**`at()` implementation:**
+
+Returns a new independent `DeltaTableDatabase` instance (fresh pending state, fresh
+caches). The underlying filesystem is the shared storage — no dict sharing needed.
+A write through `db.at("x")` that is flushed will be visible to a second `db.at("x")`
+only after it is read back from disk (no in-process cache sharing). As in the other
+backends, `at()` validates its components (rejecting non-str, empty, `'.'`/`'..'`,
+and `'/'`- or `'\0'`-containing values) before constructing the view; the validation
+is elided below for brevity.
+
+```python
+def at(self, *path_components: str) -> "DeltaTableDatabase":
+    return DeltaTableDatabase(
+        base_path=self._root_uri,  # constructor param name unchanged
+        storage_options=self._storage_options,
+        batch_size=self.batch_size,
+        max_hierarchy_depth=self.max_hierarchy_depth,
+        allow_schema_evolution=self.allow_schema_evolution,
+        _path_prefix=self._path_prefix + path_components,
+    )
+```
+
+**Config serialization:**
+
+The existing `to_config` key `"base_path"` (which stored the filesystem URI root)
+is renamed to `"root_uri"` to free `"base_path"` for the tuple. The new config
+shape:
+
+```python
+{
+    "type": "delta_table",
+    "root_uri": self._root_uri,              # renamed from "base_path"
+    "base_path": list(self._path_prefix),    # new; () serializes as []
+    "batch_size": ...,
+    "max_hierarchy_depth": ...,
+    "allow_schema_evolution": ...,
+}
+```
+
+`from_config` reads both keys, and must also accept the legacy (pre-ENG-341) shape
+where `"base_path"` held the root-URI string and `"root_uri"` was absent:
+```python
+root_uri = config.get("root_uri")
+if root_uri is None:
+    # Legacy config: "base_path" held the root-URI string, no prefix.
+    root_uri, prefix = config["base_path"], ()
+else:
+    prefix = tuple(config.get("base_path", []))
+# then: base_path=root_uri, _path_prefix=tuple(prefix)
+```
+
+**Existing test update required:** `test_database_config.py::test_to_config_includes_base_path`
+currently asserts `config["base_path"] == str(tmp_path / "delta_db")`. After the
+rename this key becomes `"root_uri"`. The test must be updated to assert
+`config["root_uri"] == str(tmp_path / "delta_db")` and also assert
+`config["base_path"] == []`. A new test, `test_from_config_accepts_legacy_base_path`,
+covers the legacy config shape described above.
+
+---
+
+### `InMemoryArrowDatabase`
+
+**Internal changes:**
+
+- Add `_path_prefix: tuple[str, ...]` (default `()`).
+- Add `base_path` property returning `_path_prefix`.
+- Update `_get_record_key` to prepend the prefix:
+
+  ```python
+  def _get_record_key(self, record_path: tuple[str, ...]) -> str:
+      return "/".join(self._path_prefix + record_path)
+  ```
+
+- Update `_validate_record_path` to check combined depth:
+
+  ```python
+  if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth:
+      raise ValueError(...)
+ ``` + +**`at()` implementation:** + +Returns a new `InMemoryArrowDatabase` that **shares** the underlying storage dicts +(`_tables`, `_pending_batches`, `_pending_record_ids`) by reference. The prefix +ensures all keys are namespaced; there is no collision. Calling `flush()` on any +view drains the **entire** shared pending queue (all prefixes), not just the +caller's. This is intentional and is documented in the `at()` docstring. + +```python +def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase( + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_tables=self._tables, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + ) +``` + +The `_path_prefix` and `_shared_*` parameters are internal (leading `_`). +A root instance is always created with no positional arguments: +`InMemoryArrowDatabase()`. + +**Config serialization:** + +```python +{ + "type": "in_memory", + "base_path": list(self._path_prefix), # new + "max_hierarchy_depth": ..., +} +``` + +`from_config` restores the prefix as a tuple: +```python +_path_prefix=tuple(config.get("base_path", [])), +``` + +Note: `from_config` reconstructs a root instance with empty `_tables` (in-memory +data does not survive serialization). The `base_path` is preserved so the scoped +position is restored even though data is not. + +--- + +### `ConnectorArrowDatabase` + +**Internal changes:** + +- Add `_path_prefix: tuple[str, ...]` (default `()`). +- Add `base_path` property returning `_path_prefix`. +- Update `_get_record_key` to prepend the prefix: + + ```python + def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) + ``` + +- Update `_validate_record_path` to check combined depth. +- Add `_shared_pending_skip_existing` as a shared-dict parameter in `__init__` + (alongside `_shared_pending_batches` and `_shared_pending_record_ids`). + +**`flush()` correctness:** + +`flush()` reconstructs `record_path = tuple(record_key.split("/"))`. Because +`_get_record_key` now includes the prefix, the reconstructed tuple is +`_path_prefix + original_record_path`. `_path_to_table_name` translates the +full tuple in one step (e.g. `("pipeline", "node1")` → `"pipeline__node1"`). +There is no second prefix application, so `flush()` remains correct with no +changes needed. + +**`at()` implementation:** + +Returns a new `ConnectorArrowDatabase` sharing the same `_connector` (SQL +database is the shared storage) and all three pending dicts: + +```python +def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + _shared_pending_skip_existing=self._pending_skip_existing, + ) +``` + +**Config serialization:** + +```python +{ + "type": "connector_arrow_database", + "connector": ..., + "base_path": list(self._path_prefix), # new + "max_hierarchy_depth": ..., +} +``` + +`from_config` currently raises `NotImplementedError` (PLT-1074/1075/1076); the +`"base_path"` key is added to the config shape for forward-compatibility but +`from_config` itself is not implemented in this issue. + +--- + +### `NoOpArrowDatabase` + +**Internal changes:** + +- Add `_path_prefix: tuple[str, ...]` (default `()`). 
+- Add `base_path` property returning `_path_prefix`. +- All read/write behaviour unchanged (discards writes, returns `None` for reads). + +**`at()` implementation:** + +```python +def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) +``` + +**Config serialization:** + +```python +def to_config(self) -> dict[str, Any]: + return { + "type": "noop", + "base_path": list(self._path_prefix), + } + +@classmethod +def from_config(cls, config: dict[str, Any]) -> "NoOpArrowDatabase": + return cls(_path_prefix=tuple(config.get("base_path", []))) +``` + +--- + +## Data Flow Example + +```python +db = DeltaTableDatabase("/experiments") +# db.base_path == () + +run_db = db.at("run_2026") +# run_db.base_path == ("run_2026",) +# run_db._root_uri == "/experiments" (unchanged) + +node_db = run_db.at("transform_fn", "v1") +# node_db.base_path == ("run_2026", "transform_fn", "v1") +# node_db == db.at("run_2026", "transform_fn", "v1") (equivalent) + +node_db.add_record(("outputs",), record_id="abc", record=table) +# resolves to: /experiments/run_2026/transform_fn/v1/outputs/ +``` + +--- + +## Config Round-Trip + +```python +node_db = DeltaTableDatabase("/experiments").at("run_2026", "transform_fn", "v1") +config = node_db.to_config() +# { +# "type": "delta_table", +# "root_uri": "/experiments", +# "base_path": ["run_2026", "transform_fn", "v1"], +# "batch_size": 1000, +# "max_hierarchy_depth": 10, +# "allow_schema_evolution": True, +# } +reconstructed = DeltaTableDatabase.from_config(config) +assert reconstructed.base_path == ("run_2026", "transform_fn", "v1") +assert isinstance(reconstructed.base_path, tuple) # not list +``` + +--- + +## Testing + +Each backend's existing test file gains a `TestAtMethod` class. The `DeltaTableDatabase` +tests use `tmp_path` fixtures; the others need no fixtures. + +**Test cases per backend:** + +1. `base_path` is `()` on a fresh root instance. +2. `at("a", "b")` produces `base_path == ("a", "b")` and `isinstance(base_path, tuple)`. +3. `at("a").at("b")` is equivalent to `at("a", "b")`. +4. Writes through a scoped view are readable from the same scoped view. +5. Writes through a scoped view are **not** visible via the parent at the same + bare `record_path` (different namespace). +6. **(InMemory and Connector only)** Writes through `db.at("x")` are visible from + a second `db.at("x")` view created from the same root instance (shared storage). + `DeltaTableDatabase` explicitly does NOT require this — scoped views are independent + instances with no shared in-process caches. +7. `to_config` / `from_config` round-trips `base_path` correctly, including: + - Correct value + - `isinstance(restored.base_path, tuple)` (not list) +8. `_validate_record_path` raises `ValueError` when `len(base_path) + len(record_path)` + exceeds `max_hierarchy_depth`. + +**Existing test update:** + +`test_database_config.py::test_to_config_includes_base_path` must be updated: +- Assert `config["root_uri"] == str(tmp_path / "delta_db")` (renamed key) +- Assert `config["base_path"] == []` (new key, empty tuple for root instance) + +--- + +## Scope + +**In scope (ENG-341):** + +- `ArrowDatabaseProtocol`: add `base_path` property and `at()` method. +- `DeltaTableDatabase`: implement `at()` and `base_path`; rename internal attrs; + update `_get_table_uri`, `_validate_record_path`, `list_sources()`, config. 
+- `InMemoryArrowDatabase`: implement `at()` and `base_path` with shared storage; + update `_get_record_key`, `_validate_record_path`, config. +- `ConnectorArrowDatabase`: implement `at()` and `base_path` with shared storage; + update `_get_record_key`, `_validate_record_path`, config. +- `NoOpArrowDatabase`: implement `at()` and `base_path`; update config. +- Unit tests for all backends including existing config test update. + +**Out of scope:** + +- Integrating `at()` into node/pipeline wiring (ENG-340). +- Cloud-path `list_sources()` (pre-existing limitation; scoped local `list_sources()` + is in scope). +- `from_config` for `ConnectorArrowDatabase` (PLT-1074/1075/1076). diff --git a/tests/test_databases/test_connector_arrow_database.py b/tests/test_databases/test_connector_arrow_database.py index 4b8bd69a..75e18acb 100644 --- a/tests/test_databases/test_connector_arrow_database.py +++ b/tests/test_databases/test_connector_arrow_database.py @@ -17,7 +17,7 @@ 10. Hierarchical record_path + _path_to_table_name 11. Flush behaviour (pending cleared, connector receives data) 12. Config (to_config shape, from_config raises NotImplementedError) -13. Context-manager lifecycle (connector.close is called) +13. at() method and base_path attribute """ from __future__ import annotations @@ -713,3 +713,73 @@ def test_from_config_raises_not_implemented(self, db): config = db.to_config() with pytest.raises(NotImplementedError): ConnectorArrowDatabase.from_config(config) + + +# =========================================================================== +# 13. at() and base_path +# =========================================================================== + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = pa.table({"value": pa.array([42])}) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", pa.table({"v": pa.array([99])}), flush=True) + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("v").to_pylist() == [99] + + def test_prefix_appears_in_sql_table_name(self, db): + """Prefix components are included in the SQL table name via _path_to_table_name.""" + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + # Table name should be pipeline__node1__outputs + table_names = db._connector.get_table_names() + assert 
"pipeline__node1__outputs" in table_names + + def test_validate_record_path_checks_combined_depth(self, db): + scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix components + scoped.add_record(("z",), "id1", pa.table({"v": pa.array([1])})) + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", pa.table({"v": pa.array([2])})) + + def test_at_rejects_slash_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe/line") + + def test_at_rejects_null_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe\x00line") + + def test_at_rejects_empty_component(self, db): + with pytest.raises(ValueError): + db.at("") diff --git a/tests/test_databases/test_database_config.py b/tests/test_databases/test_database_config.py index 5b2fca07..3b76f534 100644 --- a/tests/test_databases/test_database_config.py +++ b/tests/test_databases/test_database_config.py @@ -11,10 +11,30 @@ def test_to_config_includes_type(self, tmp_path): config = db.to_config() assert config["type"] == "delta_table" - def test_to_config_includes_base_path(self, tmp_path): + def test_to_config_includes_root_uri(self, tmp_path): db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) config = db.to_config() - assert config["base_path"] == str(tmp_path / "delta_db") + assert config["root_uri"] == str(tmp_path / "delta_db") + + def test_to_config_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + config = db.to_config() + assert config["base_path"] == [] + + def test_to_config_scoped_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["root_uri"] == str(tmp_path / "delta_db") + assert config["base_path"] == ["pipeline", "v1"] + + def test_round_trip_preserves_base_path(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = DeltaTableDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) def test_to_config_includes_all_settings(self, tmp_path): db = DeltaTableDatabase( @@ -38,6 +58,19 @@ def test_round_trip(self, tmp_path): restored = DeltaTableDatabase.from_config(config) assert restored.to_config() == config + def test_from_config_accepts_legacy_base_path(self, tmp_path): + """from_config accepts pre-ENG-341 configs where 'base_path' was the root URI string.""" + legacy_config = { + "type": "delta_table", + "base_path": str(tmp_path / "delta_db"), + "batch_size": 1000, + "max_hierarchy_depth": 10, + "allow_schema_evolution": True, + } + db = DeltaTableDatabase.from_config(legacy_config) + assert db.base_path == () + assert db.to_config()["root_uri"] == str(tmp_path / "delta_db") + class TestInMemoryDatabaseConfig: def test_to_config_includes_type(self): @@ -51,6 +84,29 @@ def test_round_trip(self): restored = InMemoryArrowDatabase.from_config(config) assert restored.to_config() == config + def test_to_config_includes_base_path(self): + db = InMemoryArrowDatabase() + assert db.to_config()["base_path"] == [] + + def test_round_trip_preserves_base_path(self): + db = InMemoryArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = InMemoryArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", 
"v1") + assert isinstance(restored.base_path, tuple) + + +def test_connector_to_config_includes_base_path(): + from unittest.mock import MagicMock + from orcapod.databases import ConnectorArrowDatabase + mock_connector = MagicMock() + mock_connector.to_config.return_value = {"type": "mock"} + db = ConnectorArrowDatabase(connector=mock_connector) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["base_path"] == ["pipeline", "v1"] + class TestNoOpDatabaseConfig: def test_to_config_includes_type(self): @@ -63,3 +119,15 @@ def test_round_trip(self): config = db.to_config() restored = NoOpArrowDatabase.from_config(config) assert restored.to_config() == config + + def test_to_config_includes_base_path(self): + db = NoOpArrowDatabase() + assert db.to_config()["base_path"] == [] + + def test_round_trip_preserves_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = NoOpArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) diff --git a/tests/test_databases/test_delta_table_database.py b/tests/test_databases/test_delta_table_database.py index 4407dc79..c74a6ed2 100644 --- a/tests/test_databases/test_delta_table_database.py +++ b/tests/test_databases/test_delta_table_database.py @@ -402,3 +402,84 @@ def test_schema_evolution(tmp_path): result = db.get_all_records(("src",)) assert result is not None assert "b" in result.column_names + + +# --------------------------------------------------------------------------- +# 12. at() method and base_path scoping +# --------------------------------------------------------------------------- + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("pipeline", "node1") + assert scoped.base_path == ("pipeline", "node1") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("pipeline", "node1") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1", flush=True) + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42]), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_scoped_write_stored_under_correct_filesystem_path(self, db, tmp_path): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[1]), flush=True) + # Delta table should be at /db/pipeline/node1/outputs/ + expected_path = tmp_path / "db" / "pipeline" / "node1" / "outputs" + assert expected_path.exists() + + def test_validate_record_path_checks_combined_depth(self, tmp_path): + db = DeltaTableDatabase(base_path=tmp_path / "db", max_hierarchy_depth=10) + scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix + # 9 + 1 = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 
9 + 2 = 11: should raise + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) + + def test_list_sources_on_scoped_instance(self, tmp_path): + db = DeltaTableDatabase(base_path=tmp_path / "db") + scoped = db.at("pipeline") + scoped.add_record(("node1",), "id1", make_table(value=[1]), flush=True) + scoped.add_record(("node2",), "id2", make_table(value=[2]), flush=True) + sources = scoped.list_sources() + assert ("node1",) in sources + assert ("node2",) in sources + # Root db should NOT see these under bare ("node1",) — they live under pipeline/ + root_sources = db.list_sources() + assert ("node1",) not in root_sources + assert ("pipeline", "node1") in root_sources + + def test_at_rejects_slash_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe/line") + + def test_at_rejects_dotdot_component(self, db): + with pytest.raises(ValueError, match="not allowed"): + db.at("..") + + def test_at_rejects_empty_component(self, db): + with pytest.raises(ValueError): + db.at("") + + def test_at_rejects_non_str_component(self, db): + with pytest.raises(TypeError): + db.at(42) # type: ignore[arg-type] diff --git a/tests/test_databases/test_in_memory_database.py b/tests/test_databases/test_in_memory_database.py index c37a157e..29e4ec80 100644 --- a/tests/test_databases/test_in_memory_database.py +++ b/tests/test_databases/test_in_memory_database.py @@ -331,3 +331,71 @@ def test_multiple_flushes_accumulate_records(self, db): result = db.get_all_records(self.PATH) assert result is not None assert result.num_rows == 2 + + +# --------------------------------------------------------------------------- +# 10. at() and base_path +# --------------------------------------------------------------------------- + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42])) + # Parent sees ("outputs",) as a different key from ("pipeline","node1","outputs") + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", make_table(value=[99])) + view_a.flush() + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [99] + + def test_validate_record_path_checks_combined_depth(self, db): + # max_hierarchy_depth=10 (default). Fill prefix with 9 components. 
+ deep_db = InMemoryArrowDatabase(max_hierarchy_depth=10) + scoped = deep_db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") + # 9 prefix + 1 record_path = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 9 prefix + 2 record_path = 11: should raise + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) + + def test_at_rejects_slash_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe/line") + + def test_at_rejects_null_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe\x00line") + + def test_at_rejects_empty_component(self, db): + with pytest.raises(ValueError): + db.at("") diff --git a/tests/test_databases/test_noop_database.py b/tests/test_databases/test_noop_database.py index 7db9b2bd..c4e0eff7 100644 --- a/tests/test_databases/test_noop_database.py +++ b/tests/test_databases/test_noop_database.py @@ -167,3 +167,37 @@ def test_different_paths_all_return_none(self, db): def test_reads_on_unwritten_path_return_none(self, db): db.add_records(("written",), make_table(x=[1])) assert db.get_all_records(("never_written",)) is None + + +# --------------------------------------------------------------------------- +# 5. at() method and base_path +# --------------------------------------------------------------------------- + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self): + db = NoOpArrowDatabase() + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self): + db = NoOpArrowDatabase() + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self): + db = NoOpArrowDatabase() + db.at("a", "b") + assert db.base_path == () + + def test_scoped_reads_still_return_none(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": [1]})) + assert scoped.get_record_by_id(("outputs",), "id1") is None + assert scoped.get_all_records(("outputs",)) is None