From 56f9b5098e9a02feacab559798dbd0ac8efccda2 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 02:50:17 +0000 Subject: [PATCH 01/12] docs(spec): add ENG-341 database path contextualization design --- ...-database-path-contextualization-design.md | 330 ++++++++++++++++++ 1 file changed, 330 insertions(+) create mode 100644 superpowers/specs/2026-04-01-database-path-contextualization-design.md diff --git a/superpowers/specs/2026-04-01-database-path-contextualization-design.md b/superpowers/specs/2026-04-01-database-path-contextualization-design.md new file mode 100644 index 00000000..127c717d --- /dev/null +++ b/superpowers/specs/2026-04-01-database-path-contextualization-design.md @@ -0,0 +1,330 @@ +# Database Path Contextualization — Design Spec + +**Linear issue:** ENG-341 +**Date:** 2026-04-01 +**Status:** Approved + +--- + +## Overview + +Databases need a general mechanism to produce a new database instance scoped to a +sub-path, without modifying the original. The primitive is `at(*path_components)`, +which returns a new database view whose root is the caller's root extended by the +given path components. + +This is the foundational primitive needed by ENG-340 (ScopedDatabase / +`pipeline_path` decoupling) and is useful anywhere a component needs its own +storage namespace derived from a parent database. + +--- + +## Core Concepts + +### Two-level path model + +Every database instance carries two path concepts: + +| Concept | Type | Meaning | +|---------|------|---------| +| **Absolute root** | backend-specific | The storage location configured at construction (filesystem URI, SQL connector, none). Fixed forever. Implementation detail. | +| **`base_path`** | `tuple[str, ...]` | The relative "cwd" within the absolute root. Starts as `()`. Extended by `at()`. Public, queryable. 
| + +When a record is written at `record_path`, its full resolved location is: + +``` +absolute_root / base_path / record_path +``` + +Each backend translates this concatenated tuple into its own format: + +| Backend | Translation | +|---------|-------------| +| `DeltaTableDatabase` | `root_uri / "p0" / "p1" / ... / "r0" / "r1" / ...` | +| `InMemoryArrowDatabase` | `"p0/p1/.../r0/r1/..."` (dict key) | +| `ConnectorArrowDatabase` | `"p0__p1__...__r0__r1__..."` (SQL table name) | +| `NoOpArrowDatabase` | discarded (prefix tracked for introspection) | + +### Consistency with existing pipeline path pattern + +The existing `pipeline_path_prefix: tuple[str, ...]` pattern in `FunctionNode` and +`CachedFunctionPod` already uses this model: a single shared database receives +writes at namespaced keys derived by prepending a prefix tuple to `record_path`. +`at()` formalizes this pattern — instead of threading a prefix tuple through every +call site, callers create a scoped view once. + +--- + +## Interface Changes + +### `ArrowDatabaseProtocol` + +Two new members added to the protocol: + +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view. + + Always () for a root (non-scoped) instance. Extended by at(). + The absolute storage root (filesystem URI, SQL connector, etc.) + is a separate, backend-specific implementation detail. + """ + ... + +def at(self, *path_components: str) -> "ArrowDatabaseProtocol": + """Return a new database scoped to the given sub-path. + + All reads and writes on the returned database are relative to + this database's base_path extended by path_components. The + original is unmodified. + + Args: + *path_components: One or more path components to append. + + Returns: + A new database instance with base_path = self.base_path + path_components. + """ + ... +``` + +Concrete implementations return their own type (e.g. 
`DeltaTableDatabase`) for +type narrowness, even though the protocol signature uses `ArrowDatabaseProtocol`. + +--- + +## Per-Backend Implementation + +### `DeltaTableDatabase` + +**Internal changes:** + +- Rename `_base_uri` → `_root_uri` (the absolute filesystem/cloud root; private). +- Rename existing `self.base_path: Path` (local-only, absolute root as `Path`) → + `self._local_root: Path` (private, used only by `list_sources()`). +- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `base_path` property returning `_path_prefix`. +- Update `_get_table_uri(record_path)` to insert `_path_prefix` between the root + and `record_path`: + + ```python + def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: + full_path = self._path_prefix + record_path + if self._is_cloud: + return self._root_uri.rstrip("/") + "/" + "/".join(full_path) + else: + path = self._local_root + for component in full_path: + path = path / self._sanitize_path_component(component) + if create_dir: + path.mkdir(parents=True, exist_ok=True) + return str(path) + ``` + +**`at()` implementation:** + +Returns a new independent `DeltaTableDatabase` instance (fresh pending state). +The underlying filesystem is the shared storage — no dict sharing needed. + +```python +def at(self, *path_components: str) -> "DeltaTableDatabase": + return DeltaTableDatabase( + base_path=self._root_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + _path_prefix=self._path_prefix + path_components, + ) +``` + +**Config serialization:** + +`to_config` adds `"base_path": list(self._path_prefix)` (serialized as a list for +JSON compatibility). `from_config` reads it back as a tuple. + +--- + +### `InMemoryArrowDatabase` + +**Internal changes:** + +- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `base_path` property returning `_path_prefix`. 
+- Update `_get_record_key` to prepend the prefix: + + ```python + def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) + ``` + +**`at()` implementation:** + +Returns a new `InMemoryArrowDatabase` that **shares** the underlying storage dicts +(`_tables`, `_pending_batches`, `_pending_record_ids`) by reference. The prefix +ensures all keys are namespaced; there is no collision. Calling `flush()` on any +view drains the entire shared pending queue. + +```python +def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase( + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_tables=self._tables, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + ) +``` + +The shared-dict parameters are internal (prefixed with `_`). A root instance is +always created with no arguments: `InMemoryArrowDatabase()`. + +**Config serialization:** + +`to_config` adds `"base_path": list(self._path_prefix)`. Note that `from_config` +always reconstructs a root instance (empty `_tables`) since in-memory data does +not survive serialization; the `base_path` is preserved so the scoped position is +restored even though data is not. + +--- + +### `ConnectorArrowDatabase` + +**Internal changes:** + +- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `base_path` property returning `_path_prefix`. +- Update `_get_record_key` to prepend the prefix: + + ```python + def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) + ``` + + Because `flush()` reconstructs `record_path = tuple(record_key.split("/"))` and + then calls `_path_to_table_name`, the prefix components are naturally included in + the SQL table name (e.g. prefix `("pipeline",)` + path `("node1",)` → + `"pipeline__node1"`). No changes to `flush()` are needed. 
+ +**`at()` implementation:** + +Returns a new `ConnectorArrowDatabase` sharing the same `_connector` (SQL database +is the shared storage) and the pending dicts, with an extended `_path_prefix`: + +```python +def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + _shared_pending_skip_existing=self._pending_skip_existing, + ) +``` + +**Config serialization:** + +`to_config` adds `"base_path": list(self._path_prefix)`. `from_config` currently +raises `NotImplementedError` (PLT-1074/1075/1076); no change to that. + +--- + +### `NoOpArrowDatabase` + +**Internal changes:** + +- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `base_path` property returning `_path_prefix`. +- All read/write behaviour unchanged (discards writes, returns `None` for reads). + +**`at()` implementation:** + +Returns a new `NoOpArrowDatabase` with the extended prefix. Prefix is tracked for +introspection but has no effect on storage (there is none). + +```python +def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) +``` + +**Config serialization:** + +`to_config` adds `"base_path": list(self._path_prefix)`. `from_config` restores +the prefix. 
+ +--- + +## Data Flow Example + +```python +db = DeltaTableDatabase("/experiments") +# db.base_path == () + +run_db = db.at("run_2026") +# run_db.base_path == ("run_2026",) +# run_db._root_uri == "/experiments" (unchanged) + +node_db = run_db.at("transform_fn", "v1") +# node_db.base_path == ("run_2026", "transform_fn", "v1") + +node_db.add_record(("outputs",), record_id="abc", record=table) +# resolves to: /experiments/run_2026/transform_fn/v1/outputs/ +``` + +--- + +## Config Round-Trip + +```python +node_db = DeltaTableDatabase("/experiments").at("run_2026", "transform_fn", "v1") +config = node_db.to_config() +# { +# "type": "delta_table", +# "base_path": ["run_2026", "transform_fn", "v1"], +# "root_uri": "/experiments", +# ... +# } +reconstructed = DeltaTableDatabase.from_config(config) +assert reconstructed.base_path == ("run_2026", "transform_fn", "v1") +``` + +--- + +## Testing + +Each backend's existing test file gains a `TestAtMethod` class covering: + +1. `base_path` is `()` on a fresh root instance. +2. `at("a", "b")` produces `base_path == ("a", "b")`. +3. `at("a").at("b")` is equivalent to `at("a", "b")`. +4. Writes through a scoped view are readable from the same view. +5. Writes through a scoped view are **not** visible via the parent at the same + `record_path` (they live under the prefix namespace). +6. Writes through a scoped view **are** visible from a second view created with + `at()` using the same path (shared storage test — applies to InMemory and + Connector backends). +7. `to_config` / `from_config` round-trips `base_path`. + +For `DeltaTableDatabase`, tests use `tmp_path` fixtures (pytest). +For `InMemoryArrowDatabase` and `ConnectorArrowDatabase`, no fixtures needed. + +--- + +## Scope + +**In scope (ENG-341):** + +- `ArrowDatabaseProtocol`: add `base_path` property and `at()` method. +- `DeltaTableDatabase`: implement `at()` and `base_path`; rename internal attrs. 
+- `InMemoryArrowDatabase`: implement `at()` and `base_path` with shared storage. +- `ConnectorArrowDatabase`: implement `at()` and `base_path` with shared storage. +- `NoOpArrowDatabase`: implement `at()` and `base_path`. +- `to_config` / `from_config` updated for all backends. +- Unit tests for all backends. + +**Out of scope:** + +- Integrating `at()` into node/pipeline wiring (ENG-340). +- Cloud-path `list_sources()` (pre-existing limitation). +- `from_config` for `ConnectorArrowDatabase` (PLT-1074/1075/1076). From 1137c2b5a2b604c046a40d3d459db71c99a6a5a3 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 02:54:43 +0000 Subject: [PATCH 02/12] docs(spec): address reviewer issues in ENG-341 design spec --- ...-database-path-contextualization-design.md | 290 +++++++++++++----- 1 file changed, 216 insertions(+), 74 deletions(-) diff --git a/superpowers/specs/2026-04-01-database-path-contextualization-design.md b/superpowers/specs/2026-04-01-database-path-contextualization-design.md index 127c717d..1c894ec5 100644 --- a/superpowers/specs/2026-04-01-database-path-contextualization-design.md +++ b/superpowers/specs/2026-04-01-database-path-contextualization-design.md @@ -27,7 +27,7 @@ Every database instance carries two path concepts: | Concept | Type | Meaning | |---------|------|---------| -| **Absolute root** | backend-specific | The storage location configured at construction (filesystem URI, SQL connector, none). Fixed forever. Implementation detail. | +| **Absolute root** | backend-specific | The storage location configured at construction (filesystem URI, SQL connector, none). Fixed forever. Implementation detail of each backend. | | **`base_path`** | `tuple[str, ...]` | The relative "cwd" within the absolute root. Starts as `()`. Extended by `at()`. Public, queryable. 
| When a record is written at `record_path`, its full resolved location is: @@ -40,10 +40,27 @@ Each backend translates this concatenated tuple into its own format: | Backend | Translation | |---------|-------------| -| `DeltaTableDatabase` | `root_uri / "p0" / "p1" / ... / "r0" / "r1" / ...` | +| `DeltaTableDatabase` | `root_uri / "p0" / "p1" / ... / "r0" / "r1" / ...` (filesystem path) | | `InMemoryArrowDatabase` | `"p0/p1/.../r0/r1/..."` (dict key) | | `ConnectorArrowDatabase` | `"p0__p1__...__r0__r1__..."` (SQL table name) | -| `NoOpArrowDatabase` | discarded (prefix tracked for introspection) | +| `NoOpArrowDatabase` | discarded (prefix tracked for introspection only) | + +### Prefix application: where it happens + +Each backend applies the prefix at a different point in its stack: + +- **`DeltaTableDatabase`**: prefix is applied only inside `_get_table_uri()`. + `_get_record_key()` is NOT changed — it continues to return `"/".join(record_path)`. + This preserves the correctness of `flush()`, which reconstructs `record_path` from + the key and then calls `flush_batch(record_path)` → `_get_table_uri(record_path)`. + If the prefix were baked into `_get_record_key`, `_get_table_uri` would apply it a + second time. Keeping the prefix out of `_get_record_key` avoids the double-prefix. + +- **`InMemoryArrowDatabase`** and **`ConnectorArrowDatabase`**: prefix is applied inside + `_get_record_key()`. Both backends' `flush()` methods reconstruct `record_path` from + the key and then call a terminal translation function (`_tables` dict access for + in-memory; `_path_to_table_name` for connector). Neither re-applies any prefix in + that terminal step, so including the prefix in the key is safe and correct. ### Consistency with existing pipeline path pattern @@ -79,11 +96,21 @@ def at(self, *path_components: str) -> "ArrowDatabaseProtocol": this database's base_path extended by path_components. The original is unmodified. 
+ Calling at() with no arguments returns a new view equivalent + to the current one (same base_path, fresh or shared state + depending on the backend). + + For backends with shared pending state (InMemoryArrowDatabase, + ConnectorArrowDatabase), calling flush() on any view drains + the entire shared pending queue — not just the caller's prefix. + This is intentional: all views share the same underlying store. + Args: - *path_components: One or more path components to append. + *path_components: Zero or more path components to append. Returns: - A new database instance with base_path = self.base_path + path_components. + A new database instance with + base_path == self.base_path + path_components. """ ... ``` @@ -91,45 +118,73 @@ def at(self, *path_components: str) -> "ArrowDatabaseProtocol": Concrete implementations return their own type (e.g. `DeltaTableDatabase`) for type narrowness, even though the protocol signature uses `ArrowDatabaseProtocol`. +The `_path_prefix` constructor parameter used internally by each backend is +prefixed with `_` to signal it is not part of the public API. Callers should +always use `at()` to create scoped views. + --- ## Per-Backend Implementation ### `DeltaTableDatabase` -**Internal changes:** +**Naming clarification:** -- Rename `_base_uri` → `_root_uri` (the absolute filesystem/cloud root; private). -- Rename existing `self.base_path: Path` (local-only, absolute root as `Path`) → - `self._local_root: Path` (private, used only by `list_sources()`). -- Add `_path_prefix: tuple[str, ...]= ()`. -- Add `base_path` property returning `_path_prefix`. -- Update `_get_table_uri(record_path)` to insert `_path_prefix` between the root - and `record_path`: +- The constructor parameter remains `base_path: str | Path | UPath` (unchanged). +- The internal attribute `self._base_uri: str` is renamed to `self._root_uri: str`. 
+- The existing `self.base_path: Path` (local-only, the absolute root as a `Path` + object) is renamed to `self._local_root: Path`. This frees the name `base_path` + for the new `tuple[str, ...]` property. +- The new `base_path` property returns `self._path_prefix: tuple[str, ...]`. - ```python - def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: - full_path = self._path_prefix + record_path - if self._is_cloud: - return self._root_uri.rstrip("/") + "/" + "/".join(full_path) - else: - path = self._local_root - for component in full_path: - path = path / self._sanitize_path_component(component) - if create_dir: - path.mkdir(parents=True, exist_ok=True) - return str(path) - ``` +**Depth validation:** + +`_validate_record_path` is updated to check the combined depth: + +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError(...) +``` + +**`_get_table_uri` update (prefix applied here only):** + +```python +def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: + full_path = self._path_prefix + record_path # prefix applied once, here + if self._is_cloud: + return self._root_uri.rstrip("/") + "/" + "/".join(full_path) + else: + path = self._local_root + for component in full_path: + path = path / self._sanitize_path_component(component) + if create_dir: + path.mkdir(parents=True, exist_ok=True) + return str(path) +``` + +`_get_record_key` is **not changed** — it continues to return `"/".join(record_path)` +(without prefix). This keeps `flush()` correct: it reconstructs the original +`record_path` from the pending key and calls `_get_table_uri(record_path)`, which +applies the prefix exactly once. + +**`list_sources()` on scoped instances:** + +For a scoped instance, `list_sources()` scans from `self._local_root` joined with +the prefix components (i.e. 
the effective scoped root directory) and returns paths +relative to that scoped root — consistent with `base_path` semantics. Cloud paths +continue to raise `NotImplementedError`. **`at()` implementation:** -Returns a new independent `DeltaTableDatabase` instance (fresh pending state). -The underlying filesystem is the shared storage — no dict sharing needed. +Returns a new independent `DeltaTableDatabase` instance (fresh pending state, fresh +caches). The underlying filesystem is the shared storage — no dict sharing needed. +A write through `db.at("x")` that is flushed will be visible to a second `db.at("x")` +only after it is read back from disk (no in-process cache sharing). ```python def at(self, *path_components: str) -> "DeltaTableDatabase": return DeltaTableDatabase( - base_path=self._root_uri, + base_path=self._root_uri, # constructor param name unchanged storage_options=self._storage_options, batch_size=self.batch_size, max_hierarchy_depth=self.max_hierarchy_depth, @@ -140,8 +195,32 @@ def at(self, *path_components: str) -> "DeltaTableDatabase": **Config serialization:** -`to_config` adds `"base_path": list(self._path_prefix)` (serialized as a list for -JSON compatibility). `from_config` reads it back as a tuple. +The existing `to_config` key `"base_path"` (which stored the filesystem URI root) +is renamed to `"root_uri"` to free `"base_path"` for the tuple. The new config +shape: + +```python +{ + "type": "delta_table", + "root_uri": self._root_uri, # renamed from "base_path" + "base_path": list(self._path_prefix), # new; () serializes as [] + "batch_size": ..., + "max_hierarchy_depth": ..., + "allow_schema_evolution": ..., +} +``` + +`from_config` reads both keys: +```python +base_path=config["root_uri"], +_path_prefix=tuple(config.get("base_path", [])), +``` + +**Existing test update required:** `test_database_config.py::test_to_config_includes_base_path` +currently asserts `config["base_path"] == str(tmp_path / "delta_db")`. 
After the +rename this key becomes `"root_uri"`. The test must be updated to assert +`config["root_uri"] == str(tmp_path / "delta_db")` and also assert +`config["base_path"] == []`. --- @@ -149,7 +228,7 @@ JSON compatibility). `from_config` reads it back as a tuple. **Internal changes:** -- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `_path_prefix: tuple[str, ...]` (default `()`). - Add `base_path` property returning `_path_prefix`. - Update `_get_record_key` to prepend the prefix: @@ -158,12 +237,20 @@ JSON compatibility). `from_config` reads it back as a tuple. return "/".join(self._path_prefix + record_path) ``` +- Update `_validate_record_path` to check combined depth: + + ```python + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError(...) + ``` + **`at()` implementation:** Returns a new `InMemoryArrowDatabase` that **shares** the underlying storage dicts (`_tables`, `_pending_batches`, `_pending_record_ids`) by reference. The prefix ensures all keys are namespaced; there is no collision. Calling `flush()` on any -view drains the entire shared pending queue. +view drains the **entire** shared pending queue (all prefixes), not just the +caller's. This is intentional and is documented in the `at()` docstring. ```python def at(self, *path_components: str) -> "InMemoryArrowDatabase": @@ -176,15 +263,28 @@ def at(self, *path_components: str) -> "InMemoryArrowDatabase": ) ``` -The shared-dict parameters are internal (prefixed with `_`). A root instance is -always created with no arguments: `InMemoryArrowDatabase()`. +The `_path_prefix` and `_shared_*` parameters are internal (leading `_`). +A root instance is always created with no positional arguments: +`InMemoryArrowDatabase()`. **Config serialization:** -`to_config` adds `"base_path": list(self._path_prefix)`. 
Note that `from_config` -always reconstructs a root instance (empty `_tables`) since in-memory data does -not survive serialization; the `base_path` is preserved so the scoped position is -restored even though data is not. +```python +{ + "type": "in_memory", + "base_path": list(self._path_prefix), # new + "max_hierarchy_depth": ..., +} +``` + +`from_config` restores the prefix as a tuple: +```python +_path_prefix=tuple(config.get("base_path", [])), +``` + +Note: `from_config` reconstructs a root instance with empty `_tables` (in-memory +data does not survive serialization). The `base_path` is preserved so the scoped +position is restored even though data is not. --- @@ -192,7 +292,7 @@ restored even though data is not. **Internal changes:** -- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `_path_prefix: tuple[str, ...]` (default `()`). - Add `base_path` property returning `_path_prefix`. - Update `_get_record_key` to prepend the prefix: @@ -201,15 +301,23 @@ restored even though data is not. return "/".join(self._path_prefix + record_path) ``` - Because `flush()` reconstructs `record_path = tuple(record_key.split("/"))` and - then calls `_path_to_table_name`, the prefix components are naturally included in - the SQL table name (e.g. prefix `("pipeline",)` + path `("node1",)` → - `"pipeline__node1"`). No changes to `flush()` are needed. +- Update `_validate_record_path` to check combined depth. +- Add `_shared_pending_skip_existing` as a shared-dict parameter in `__init__` + (alongside `_shared_pending_batches` and `_shared_pending_record_ids`). + +**`flush()` correctness:** + +`flush()` reconstructs `record_path = tuple(record_key.split("/"))`. Because +`_get_record_key` now includes the prefix, the reconstructed tuple is +`_path_prefix + original_record_path`. `_path_to_table_name` translates the +full tuple in one step (e.g. `("pipeline", "node1")` → `"pipeline__node1"`). 
+There is no second prefix application, so `flush()` remains correct with no +changes needed. **`at()` implementation:** -Returns a new `ConnectorArrowDatabase` sharing the same `_connector` (SQL database -is the shared storage) and the pending dicts, with an extended `_path_prefix`: +Returns a new `ConnectorArrowDatabase` sharing the same `_connector` (SQL +database is the shared storage) and all three pending dicts: ```python def at(self, *path_components: str) -> "ConnectorArrowDatabase": @@ -225,8 +333,18 @@ def at(self, *path_components: str) -> "ConnectorArrowDatabase": **Config serialization:** -`to_config` adds `"base_path": list(self._path_prefix)`. `from_config` currently -raises `NotImplementedError` (PLT-1074/1075/1076); no change to that. +```python +{ + "type": "connector_arrow_database", + "connector": ..., + "base_path": list(self._path_prefix), # new + "max_hierarchy_depth": ..., +} +``` + +`from_config` currently raises `NotImplementedError` (PLT-1074/1075/1076); the +`"base_path"` key is added to the config shape for forward-compatibility but +`from_config` itself is not implemented in this issue. --- @@ -234,15 +352,12 @@ raises `NotImplementedError` (PLT-1074/1075/1076); no change to that. **Internal changes:** -- Add `_path_prefix: tuple[str, ...]= ()`. +- Add `_path_prefix: tuple[str, ...]` (default `()`). - Add `base_path` property returning `_path_prefix`. - All read/write behaviour unchanged (discards writes, returns `None` for reads). **`at()` implementation:** -Returns a new `NoOpArrowDatabase` with the extended prefix. Prefix is tracked for -introspection but has no effect on storage (there is none). - ```python def at(self, *path_components: str) -> "NoOpArrowDatabase": return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) @@ -250,8 +365,17 @@ def at(self, *path_components: str) -> "NoOpArrowDatabase": **Config serialization:** -`to_config` adds `"base_path": list(self._path_prefix)`. 
`from_config` restores -the prefix. +```python +def to_config(self) -> dict[str, Any]: + return { + "type": "noop", + "base_path": list(self._path_prefix), + } + +@classmethod +def from_config(cls, config: dict[str, Any]) -> "NoOpArrowDatabase": + return cls(_path_prefix=tuple(config.get("base_path", []))) +``` --- @@ -267,6 +391,7 @@ run_db = db.at("run_2026") node_db = run_db.at("transform_fn", "v1") # node_db.base_path == ("run_2026", "transform_fn", "v1") +# node_db == db.at("run_2026", "transform_fn", "v1") (equivalent) node_db.add_record(("outputs",), record_id="abc", record=table) # resolves to: /experiments/run_2026/transform_fn/v1/outputs/ @@ -281,33 +406,47 @@ node_db = DeltaTableDatabase("/experiments").at("run_2026", "transform_fn", "v1" config = node_db.to_config() # { # "type": "delta_table", -# "base_path": ["run_2026", "transform_fn", "v1"], # "root_uri": "/experiments", -# ... +# "base_path": ["run_2026", "transform_fn", "v1"], +# "batch_size": 1000, +# "max_hierarchy_depth": 10, +# "allow_schema_evolution": True, # } reconstructed = DeltaTableDatabase.from_config(config) assert reconstructed.base_path == ("run_2026", "transform_fn", "v1") +assert isinstance(reconstructed.base_path, tuple) # not list ``` --- ## Testing -Each backend's existing test file gains a `TestAtMethod` class covering: +Each backend's existing test file gains a `TestAtMethod` class. The `DeltaTableDatabase` +tests use `tmp_path` fixtures; the others need no fixtures. + +**Test cases per backend:** 1. `base_path` is `()` on a fresh root instance. -2. `at("a", "b")` produces `base_path == ("a", "b")`. +2. `at("a", "b")` produces `base_path == ("a", "b")` and `isinstance(base_path, tuple)`. 3. `at("a").at("b")` is equivalent to `at("a", "b")`. -4. Writes through a scoped view are readable from the same view. +4. Writes through a scoped view are readable from the same scoped view. 5. 
Writes through a scoped view are **not** visible via the parent at the same - `record_path` (they live under the prefix namespace). -6. Writes through a scoped view **are** visible from a second view created with - `at()` using the same path (shared storage test — applies to InMemory and - Connector backends). -7. `to_config` / `from_config` round-trips `base_path`. - -For `DeltaTableDatabase`, tests use `tmp_path` fixtures (pytest). -For `InMemoryArrowDatabase` and `ConnectorArrowDatabase`, no fixtures needed. + bare `record_path` (different namespace). +6. **(InMemory and Connector only)** Writes through `db.at("x")` are visible from + a second `db.at("x")` view created from the same root instance (shared storage). + `DeltaTableDatabase` explicitly does NOT require this — scoped views are independent + instances with no shared in-process caches. +7. `to_config` / `from_config` round-trips `base_path` correctly, including: + - Correct value + - `isinstance(restored.base_path, tuple)` (not list) +8. `_validate_record_path` raises `ValueError` when `len(base_path) + len(record_path)` + exceeds `max_hierarchy_depth`. + +**Existing test update:** + +`test_database_config.py::test_to_config_includes_base_path` must be updated: +- Assert `config["root_uri"] == str(tmp_path / "delta_db")` (renamed key) +- Assert `config["base_path"] == []` (new key, empty tuple for root instance) --- @@ -316,15 +455,18 @@ For `InMemoryArrowDatabase` and `ConnectorArrowDatabase`, no fixtures needed. **In scope (ENG-341):** - `ArrowDatabaseProtocol`: add `base_path` property and `at()` method. -- `DeltaTableDatabase`: implement `at()` and `base_path`; rename internal attrs. -- `InMemoryArrowDatabase`: implement `at()` and `base_path` with shared storage. -- `ConnectorArrowDatabase`: implement `at()` and `base_path` with shared storage. -- `NoOpArrowDatabase`: implement `at()` and `base_path`. -- `to_config` / `from_config` updated for all backends. -- Unit tests for all backends. 
+- `DeltaTableDatabase`: implement `at()` and `base_path`; rename internal attrs; + update `_get_table_uri`, `_validate_record_path`, `list_sources()`, config. +- `InMemoryArrowDatabase`: implement `at()` and `base_path` with shared storage; + update `_get_record_key`, `_validate_record_path`, config. +- `ConnectorArrowDatabase`: implement `at()` and `base_path` with shared storage; + update `_get_record_key`, `_validate_record_path`, config. +- `NoOpArrowDatabase`: implement `at()` and `base_path`; update config. +- Unit tests for all backends including existing config test update. **Out of scope:** - Integrating `at()` into node/pipeline wiring (ENG-340). -- Cloud-path `list_sources()` (pre-existing limitation). +- Cloud-path `list_sources()` (pre-existing limitation; scoped local `list_sources()` + is in scope). - `from_config` for `ConnectorArrowDatabase` (PLT-1074/1075/1076). From 0720163f137cbf4d58447950f4b8cf89bcc08a2e Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 03:21:09 +0000 Subject: [PATCH 03/12] docs(plan): add ENG-341 implementation plan --- ...6-04-01-database-path-contextualization.md | 1007 +++++++++++++++++ 1 file changed, 1007 insertions(+) create mode 100644 superpowers/plans/2026-04-01-database-path-contextualization.md diff --git a/superpowers/plans/2026-04-01-database-path-contextualization.md b/superpowers/plans/2026-04-01-database-path-contextualization.md new file mode 100644 index 00000000..e5ba8eaa --- /dev/null +++ b/superpowers/plans/2026-04-01-database-path-contextualization.md @@ -0,0 +1,1007 @@ +# Database Path Contextualization Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. 
+ +**Goal:** Add `at(*path_components)` and `base_path` to `ArrowDatabaseProtocol` and all four database backends, enabling sub-scoped database views that namespace reads/writes under a relative path prefix. + +**Architecture:** Each backend stores a `_path_prefix: tuple[str, ...]` (starts as `()`) and exposes it via a `base_path` property. `at()` returns a new instance with an extended prefix. `InMemoryArrowDatabase` and `ConnectorArrowDatabase` share their underlying storage dicts by reference; `DeltaTableDatabase` creates a fresh instance (the filesystem is the shared storage). Prefix application happens in `_get_record_key` for InMemory/Connector and in `_get_table_uri` for Delta (to avoid a double-prefix bug in `flush()`). + +**Tech Stack:** Python 3.11+, PyArrow, deltalake, uv (run all commands via `uv run`) + +**Spec:** `superpowers/specs/2026-04-01-database-path-contextualization-design.md` + +--- + +## File Map + +| Action | File | What changes | +|--------|------|-------------| +| Modify | `src/orcapod/protocols/database_protocols.py` | Add `base_path` property and `at()` to `ArrowDatabaseProtocol` | +| Modify | `src/orcapod/databases/noop_database.py` | Add `_path_prefix`, `base_path`, `at()`, update config | +| Modify | `src/orcapod/databases/in_memory_databases.py` | Add `_path_prefix`, shared-dict `at()`, update `_get_record_key`, `_validate_record_path`, config | +| Modify | `src/orcapod/databases/connector_arrow_database.py` | Add `_path_prefix`, shared-dict `at()`, update `_get_record_key`, `_validate_record_path`, config | +| Modify | `src/orcapod/databases/delta_lake_databases.py` | Rename `_base_uri`→`_root_uri`, `base_path`→`_local_root`; add `_path_prefix`; update `_get_table_uri`, `_validate_record_path`, `list_sources()`, `at()`, config | +| Modify | `tests/test_databases/test_noop_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_in_memory_database.py` | Add `TestAtMethod` class | +| Modify | 
`tests/test_databases/test_connector_arrow_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_delta_table_database.py` | Add `TestAtMethod` class | +| Modify | `tests/test_databases/test_database_config.py` | Update `DeltaTableDatabase` config tests for renamed key; add `base_path` round-trip tests for all backends | + +--- + +## Task 1: Protocol + Stubs + +Add `base_path` and `at()` to `ArrowDatabaseProtocol`, then add minimal stubs to all four backends so existing conformance tests (`isinstance(db, ArrowDatabaseProtocol)`) keep passing. + +**Files:** +- Modify: `src/orcapod/protocols/database_protocols.py` +- Modify: `src/orcapod/databases/noop_database.py` +- Modify: `src/orcapod/databases/in_memory_databases.py` +- Modify: `src/orcapod/databases/connector_arrow_database.py` +- Modify: `src/orcapod/databases/delta_lake_databases.py` + +- [ ] **Step 1: Add `base_path` and `at()` to `ArrowDatabaseProtocol`** + +In `src/orcapod/protocols/database_protocols.py`, add after the `flush` method: + +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view. + + Always () for a root (non-scoped) instance. Extended by at(). + The absolute storage root (filesystem URI, SQL connector, etc.) + is a separate, backend-specific implementation detail. + """ + ... + +def at(self, *path_components: str) -> "ArrowDatabaseProtocol": + """Return a new database scoped to the given sub-path. + + All reads and writes on the returned database are relative to + this database's base_path extended by path_components. The + original is unmodified. + + Calling at() with no arguments returns a new view equivalent + to the current one (same base_path, fresh or shared state + depending on the backend). + + For backends with shared pending state (InMemoryArrowDatabase, + ConnectorArrowDatabase), calling flush() on any view drains + the entire shared pending queue — not just the caller's prefix. 
+ This is intentional: all views share the same underlying store. + + Args: + *path_components: Zero or more path components to append. + + Returns: + A new database instance with + base_path == self.base_path + path_components. + """ + ... +``` + +No `__all__` change is needed: `base_path` and `at()` are members of `ArrowDatabaseProtocol`, which is already exported. + +- [ ] **Step 2: Add stubs to all four backends** + +In each of the four backend files, add these two members (exact bodies shown; full implementations come in later tasks): + +**`noop_database.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase() +``` + +**`in_memory_databases.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase(max_hierarchy_depth=self.max_hierarchy_depth) +``` + +**`connector_arrow_database.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + ) +``` + +**`delta_lake_databases.py`** — add after `flush`: +```python +@property +def base_path(self) -> tuple[str, ...]: + return () + +def at(self, *path_components: str) -> "DeltaTableDatabase": + return DeltaTableDatabase( + base_path=self._base_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + ) +``` + +- [ ] **Step 3: Run the full test suite to verify nothing is broken** + +```bash +uv run pytest tests/test_databases/ -x -q +``` + +Expected: all previously passing tests
still pass. No new failures. + +- [ ] **Step 4: Commit** + +```bash +git add src/orcapod/protocols/database_protocols.py \ + src/orcapod/databases/noop_database.py \ + src/orcapod/databases/in_memory_databases.py \ + src/orcapod/databases/connector_arrow_database.py \ + src/orcapod/databases/delta_lake_databases.py +git commit -m "feat(databases): add at() and base_path stubs to ArrowDatabaseProtocol and all backends" +``` + +--- + +## Task 2: `NoOpArrowDatabase` — full implementation + +**Files:** +- Modify: `src/orcapod/databases/noop_database.py` +- Modify: `tests/test_databases/test_noop_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_noop_database.py`: + +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self): + db = NoOpArrowDatabase() + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self): + db = NoOpArrowDatabase() + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self): + db = NoOpArrowDatabase() + db.at("a", "b") + assert db.base_path == () + + def test_scoped_reads_still_return_none(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "node1") + import pyarrow as pa + scoped.add_record(("outputs",), "id1", pa.table({"v": [1]})) + assert scoped.get_record_by_id(("outputs",), "id1") is None + assert scoped.get_all_records(("outputs",)) is None +``` + +Add to `TestNoOpDatabaseConfig` in `tests/test_databases/test_database_config.py`: + +```python +def test_to_config_includes_base_path(self): + db = NoOpArrowDatabase() + assert db.to_config()["base_path"] == [] + +def 
test_round_trip_preserves_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = NoOpArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) +``` + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_noop_database.py::TestAtMethod \ + tests/test_databases/test_database_config.py::TestNoOpDatabaseConfig -x -q +``` + +Expected: FAIL — `base_path` returns `()` for scoped instances, `to_config` missing `base_path`. + +- [ ] **Step 3: Implement full `NoOpArrowDatabase`** + +Replace the stub `base_path` and `at()` in `src/orcapod/databases/noop_database.py` and update config: + +Add `_path_prefix` to `__init__`: +```python +def __init__(self, _path_prefix: tuple[str, ...] = ()) -> None: + self._path_prefix = _path_prefix +``` + +Replace the stub `base_path` property: +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +Replace the stub `at()`: +```python +def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) +``` + +Replace `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize database configuration to a JSON-compatible dict.""" + return { + "type": "noop", + "base_path": list(self._path_prefix), + } +``` + +Replace `from_config`: +```python +@classmethod +def from_config(cls, config: dict[str, Any]) -> "NoOpArrowDatabase": + """Reconstruct a NoOpArrowDatabase from a config dict.""" + return cls(_path_prefix=tuple(config.get("base_path", []))) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_noop_database.py \ + tests/test_databases/test_database_config.py::TestNoOpDatabaseConfig -q +``` + 
+Expected: all pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/noop_database.py \ + tests/test_databases/test_noop_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for NoOpArrowDatabase" +``` + +--- + +## Task 3: `InMemoryArrowDatabase` — full implementation + +**Files:** +- Modify: `src/orcapod/databases/in_memory_databases.py` +- Modify: `tests/test_databases/test_in_memory_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_in_memory_database.py`: + +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42])) + # Parent sees ("outputs",) as a different key from ("pipeline","node1","outputs") + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + 
view_a.add_record(("outputs",), "id1", make_table(value=[99])) + view_a.flush() + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [99] + + def test_validate_record_path_checks_combined_depth(self, db): + # max_hierarchy_depth=10 (default). Fill prefix with 9 components. + deep_db = InMemoryArrowDatabase(max_hierarchy_depth=10) + scoped = deep_db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") + # 9 prefix + 1 record_path = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 9 prefix + 2 record_path = 11: should raise + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) +``` + +Add to `TestInMemoryDatabaseConfig` in `tests/test_databases/test_database_config.py`: + +```python +def test_to_config_includes_base_path(self): + db = InMemoryArrowDatabase() + assert db.to_config()["base_path"] == [] + +def test_round_trip_preserves_base_path(self): + db = InMemoryArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = InMemoryArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) +``` + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_in_memory_database.py::TestAtMethod \ + tests/test_databases/test_database_config.py::TestInMemoryDatabaseConfig -x -q +``` + +Expected: FAIL. + +- [ ] **Step 3: Implement full `InMemoryArrowDatabase`** + +Update `__init__` signature in `src/orcapod/databases/in_memory_databases.py`: + +```python +def __init__( + self, + max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] 
= (), + _shared_tables: "dict[str, pa.Table] | None" = None, + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, +): + self._path_prefix = _path_prefix + self.max_hierarchy_depth = max_hierarchy_depth + self._tables: dict[str, pa.Table] = _shared_tables if _shared_tables is not None else {} + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) +``` + +Replace `_get_record_key`: +```python +def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) +``` + +Update `_validate_record_path` — change the depth check line to: +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError( + f"record_path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" + ) +``` + +Replace stub `base_path` property: +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +Replace stub `at()`: +```python +def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase( + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_tables=self._tables, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + ) +``` + +Replace `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize database configuration to a JSON-compatible dict.""" + return { + "type": "in_memory", + "base_path": list(self._path_prefix), + "max_hierarchy_depth": 
self.max_hierarchy_depth, + } +``` + +Replace `from_config`: +```python +@classmethod +def from_config(cls, config: dict[str, Any]) -> "InMemoryArrowDatabase": + """Reconstruct an InMemoryArrowDatabase from a config dict.""" + return cls( + max_hierarchy_depth=config.get("max_hierarchy_depth", 10), + _path_prefix=tuple(config.get("base_path", [])), + ) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_in_memory_database.py \ + tests/test_databases/test_database_config.py::TestInMemoryDatabaseConfig -q +``` + +Expected: all pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/in_memory_databases.py \ + tests/test_databases/test_in_memory_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for InMemoryArrowDatabase" +``` + +--- + +## Task 4: `ConnectorArrowDatabase` — full implementation + +**Files:** +- Modify: `src/orcapod/databases/connector_arrow_database.py` +- Modify: `tests/test_databases/test_connector_arrow_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_connector_arrow_database.py`. Note: look at the top of the test file for the `MockDBConnector` fixture — use the same pattern as other tests in that file to get a `ConnectorArrowDatabase` instance backed by the mock connector. 
+ +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = pa.table({"value": pa.array([42])}) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", pa.table({"v": pa.array([99])}), flush=True) + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("v").to_pylist() == [99] + + def test_prefix_appears_in_sql_table_name(self, db): + """Prefix components are included in the SQL table name via _path_to_table_name.""" + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + # Table name should be pipeline__node1__outputs + table_names = db._connector.get_table_names() + assert "pipeline__node1__outputs" in table_names + + def test_validate_record_path_checks_combined_depth(self, db): + scoped = 
db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix components + scoped.add_record(("z",), "id1", pa.table({"v": pa.array([1])})) + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", pa.table({"v": pa.array([2])})) +``` + +Add to `TestConnectorArrowDatabaseConfig` (or the config section) in `test_database_config.py`: + +```python +# Note: ConnectorArrowDatabase.from_config raises NotImplementedError, +# so only test to_config here. +def test_connector_to_config_includes_base_path(): + from unittest.mock import MagicMock + from orcapod.databases import ConnectorArrowDatabase + mock_connector = MagicMock() + mock_connector.to_config.return_value = {"type": "mock"} + db = ConnectorArrowDatabase(connector=mock_connector) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["base_path"] == ["pipeline", "v1"] +``` + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_connector_arrow_database.py::TestAtMethod -x -q +``` + +Expected: FAIL. + +- [ ] **Step 3: Implement full `ConnectorArrowDatabase`** + +Update `__init__` in `src/orcapod/databases/connector_arrow_database.py`: + +```python +def __init__( + self, + connector: DBConnectorProtocol, + max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] 
= (), + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, + _shared_pending_skip_existing: "dict[str, bool] | None" = None, +) -> None: + self._connector = connector + self.max_hierarchy_depth = max_hierarchy_depth + self._path_prefix = _path_prefix + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) + self._pending_skip_existing: dict[str, bool] = _shared_pending_skip_existing if _shared_pending_skip_existing is not None else {} +``` + +Replace `_get_record_key`: +```python +def _get_record_key(self, record_path: tuple[str, ...]) -> str: + return "/".join(self._path_prefix + record_path) +``` + +Update `_validate_record_path` depth check: +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError( + f"record_path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" + ) +``` + +Replace stub `base_path` property: +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +Replace stub `at()`: +```python +def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + _shared_pending_skip_existing=self._pending_skip_existing, + ) +``` + +Update `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize configuration to a 
JSON-compatible dict.""" + return { + "type": "connector_arrow_database", + "connector": self._connector.to_config(), + "base_path": list(self._path_prefix), + "max_hierarchy_depth": self.max_hierarchy_depth, + } +``` + +(`from_config` still raises `NotImplementedError` — no change needed.) + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_connector_arrow_database.py -q +``` + +Expected: all pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/connector_arrow_database.py \ + tests/test_databases/test_connector_arrow_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for ConnectorArrowDatabase" +``` + +--- + +## Task 5: `DeltaTableDatabase` — full implementation + +This is the most complex task. It involves attribute renames, `_get_table_uri` changes, `list_sources()` update, and a config key rename that requires updating an existing test. + +**Files:** +- Modify: `src/orcapod/databases/delta_lake_databases.py` +- Modify: `tests/test_databases/test_delta_table_database.py` +- Modify: `tests/test_databases/test_database_config.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_delta_table_database.py`: + +```python +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("pipeline", "node1") + assert scoped.base_path == ("pipeline", "node1") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("pipeline", "node1") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") 
+ record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1", flush=True) + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42]), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_scoped_write_stored_under_correct_filesystem_path(self, db, tmp_path): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[1]), flush=True) + # Delta table should be at /db/pipeline/node1/outputs/ + import os + expected_path = tmp_path / "db" / "pipeline" / "node1" / "outputs" + assert expected_path.exists() + + def test_validate_record_path_checks_combined_depth(self, tmp_path): + db = DeltaTableDatabase(base_path=tmp_path / "db", max_hierarchy_depth=10) + scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix + # 9 + 1 = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 9 + 2 = 11: should raise + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) + + def test_list_sources_on_scoped_instance(self, tmp_path): + db = DeltaTableDatabase(base_path=tmp_path / "db") + scoped = db.at("pipeline") + scoped.add_record(("node1",), "id1", make_table(value=[1]), flush=True) + scoped.add_record(("node2",), "id2", make_table(value=[2]), flush=True) + sources = scoped.list_sources() + assert ("node1",) in sources + assert ("node2",) in sources + # Root db should NOT see these under bare ("node1",) — they live under pipeline/ + root_sources = db.list_sources() + assert ("node1",) not in root_sources + assert ("pipeline", "node1") in root_sources +``` + +Update `TestDeltaTableDatabaseConfig` in `test_database_config.py` — the existing 
`test_to_config_includes_base_path` test **must be updated** (it now tests the wrong key): + +```python +# REPLACE the existing test_to_config_includes_base_path with: +def test_to_config_includes_root_uri(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + config = db.to_config() + assert config["root_uri"] == str(tmp_path / "delta_db") + +def test_to_config_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + config = db.to_config() + assert config["base_path"] == [] + +def test_to_config_scoped_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["root_uri"] == str(tmp_path / "delta_db") + assert config["base_path"] == ["pipeline", "v1"] + +def test_round_trip_preserves_base_path(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = DeltaTableDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) +``` + +The existing `test_round_trip` should need no edits; keep it as a regression check that the renamed `root_uri` key round-trips through `to_config` / `from_config`: +```python +def test_round_trip(self, tmp_path): + db = DeltaTableDatabase( + base_path=str(tmp_path / "delta_db"), + batch_size=500, + max_hierarchy_depth=5, + ) + config = db.to_config() + restored = DeltaTableDatabase.from_config(config) + assert restored.to_config() == config +``` +(This test doesn't check keys by name so it should still pass once the implementation is consistent.) + +- [ ] **Step 2: Run to verify tests fail** + +```bash +uv run pytest tests/test_databases/test_delta_table_database.py::TestAtMethod \ + tests/test_databases/test_database_config.py::TestDeltaTableDatabaseConfig -x -q +``` + +Expected: FAIL — `base_path` not scoped, config key still `"base_path"` for URI, etc.
+ +- [ ] **Step 3: Implement `DeltaTableDatabase` — rename attributes** + +In `src/orcapod/databases/delta_lake_databases.py`, update `__init__`: + +```python +def __init__( + self, + base_path: str | Path | UPath, + storage_options: dict[str, str] | None = None, + create_base_path: bool = True, + batch_size: int = 1000, + max_hierarchy_depth: int = 10, + allow_schema_evolution: bool = True, + _path_prefix: tuple[str, ...] = (), +): + self._root_uri, self._storage_options = parse_base_path(base_path, storage_options) + self._is_cloud: bool = is_cloud_uri(self._root_uri) + self._path_prefix = _path_prefix + self.batch_size = batch_size + self.max_hierarchy_depth = max_hierarchy_depth + self.allow_schema_evolution = allow_schema_evolution + + if not self._is_cloud: + # _local_root is the absolute filesystem root (for list_sources, mkdir, etc.) + # NOTE: do NOT access self._local_root on cloud instances. + self._local_root = Path(self._root_uri) + if create_base_path: + self._local_root.mkdir(parents=True, exist_ok=True) + elif not self._local_root.exists(): + raise ValueError( + f"Base path {self._local_root} does not exist and create_base_path=False" + ) + + self._delta_table_cache: dict[str, deltalake.DeltaTable] = {} + self._pending_batches: dict[str, pa.Table] = {} + self._pending_record_ids: dict[str, set[str]] = defaultdict(set) + self._existing_ids_cache: dict[str, set[str]] = defaultdict(set) + self._cache_dirty: dict[str, bool] = defaultdict(lambda: True) +``` + +Add `base_path` property (replace stub): +```python +@property +def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix +``` + +- [ ] **Step 4: Update `_get_table_uri` to apply prefix** + +Replace the existing `_get_table_uri` method: + +```python +def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: + """Get the URI for a given record path, incorporating base_path 
prefix. + + Args: + record_path: Tuple of path components (relative to base_path). + create_dir: If True, create the local directory (no-op for cloud paths). + """ + full_path = self._path_prefix + record_path # prefix applied once, here only + if self._is_cloud: + return self._root_uri.rstrip("/") + "/" + "/".join(full_path) + else: + path = self._local_root + for subpath in full_path: + path = path / self._sanitize_path_component(subpath) + if create_dir: + path.mkdir(parents=True, exist_ok=True) + return str(path) +``` + +- [ ] **Step 5: Update `_validate_record_path` for combined depth** + +Change the depth check line in `_validate_record_path`: + +```python +if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: + raise ValueError( + f"Source path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" + ) +``` + +- [ ] **Step 6: Update `list_sources()` to scan from scoped root** + +Replace `_scan(self.base_path, ())` at the end of `list_sources()` with: + +```python +# Build the effective scoped root directory +scoped_root = self._local_root +for component in self._path_prefix: + scoped_root = scoped_root / self._sanitize_path_component(component) + +_scan(scoped_root, ()) +return sources +``` + +Also update the `_scan` closure's depth guard to use relative depth (not combined): +```python +def _scan(current_path: Path, path_components: tuple[str, ...]) -> None: + if len(path_components) >= self.max_hierarchy_depth: + return + ... 
+``` + +- [ ] **Step 7: Implement `at()` and update config** + +Replace the stub `at()`: +```python +def at(self, *path_components: str) -> "DeltaTableDatabase": + return DeltaTableDatabase( + base_path=self._root_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + _path_prefix=self._path_prefix + path_components, + ) +``` + +Replace `to_config`: +```python +def to_config(self) -> dict[str, Any]: + """Serialize database configuration to a JSON-compatible dict.""" + config: dict[str, Any] = { + "type": "delta_table", + "root_uri": self._root_uri, # renamed from "base_path" + "base_path": list(self._path_prefix), # new: relative prefix tuple + "batch_size": self.batch_size, + "max_hierarchy_depth": self.max_hierarchy_depth, + "allow_schema_evolution": self.allow_schema_evolution, + } + if self._storage_options: + config["storage_options"] = self._storage_options + return config +``` + +Replace `from_config`: +```python +@classmethod +def from_config(cls, config: dict[str, Any]) -> "DeltaTableDatabase": + """Reconstruct a DeltaTableDatabase from a config dict.""" + return cls( + base_path=config["root_uri"], + storage_options=config.get("storage_options"), + create_base_path=True, + batch_size=config.get("batch_size", 1000), + max_hierarchy_depth=config.get("max_hierarchy_depth", 10), + allow_schema_evolution=config.get("allow_schema_evolution", True), + _path_prefix=tuple(config.get("base_path", [])), + ) +``` + +- [ ] **Step 8: Run the full database test suite** + +```bash +uv run pytest tests/test_databases/ -q --ignore=tests/test_databases/test_delta_table_database_s3.py +``` + +Expected: all pass. The S3 tests are excluded because they require a live MinIO server. 
+ +- [ ] **Step 9: Commit** + +```bash +git add src/orcapod/databases/delta_lake_databases.py \ + tests/test_databases/test_delta_table_database.py \ + tests/test_databases/test_database_config.py +git commit -m "feat(databases): implement at() and base_path for DeltaTableDatabase" +``` + +--- + +## Task 6: Full suite verification and PR + +- [ ] **Step 1: Run the complete test suite** + +```bash +uv run pytest tests/ -q --ignore=tests/test_databases/test_delta_table_database_s3.py \ + --ignore=tests/test_databases/test_spiraldb_connector_integration.py \ + --ignore=tests/test_databases/test_postgresql_connector_integration.py \ + --ignore=tests/test_databases/test_sqlite_connector_integration.py +``` + +Expected: all pass. (Integration tests are excluded as they require live external services.) + +- [ ] **Step 2: Authenticate with GitHub and push branch** + +```bash +gh-app-token-generator nauticalab | gh auth login --with-token +git checkout -b eywalker/eng-341-feat-database-path-contextualization-create-sub-scoped +git push -u origin eywalker/eng-341-feat-database-path-contextualization-create-sub-scoped +``` + +- [ ] **Step 3: Create PR** + +```bash +gh pr create \ + --base dev \ + --title "feat(databases): add at() and base_path for sub-scoped database views (ENG-341)" \ + --body "$(cat <<'EOF' +## Summary + +Implements ENG-341: database path contextualization — sub-scoped database views. 
+ +- Adds `at(*path_components)` and `base_path: tuple[str, ...]` to `ArrowDatabaseProtocol` +- All four backends implemented: `DeltaTableDatabase`, `InMemoryArrowDatabase`, `ConnectorArrowDatabase`, `NoOpArrowDatabase` +- `InMemoryArrowDatabase` and `ConnectorArrowDatabase` share underlying storage by reference; `DeltaTableDatabase` creates a fresh instance (filesystem is shared storage) +- `to_config` serializes `base_path` for all backends; `from_config` restores it for every backend except `ConnectorArrowDatabase`, whose `from_config` still raises `NotImplementedError` +- `DeltaTableDatabase` config key rename: `"base_path"` (URI root) → `"root_uri"`; `"base_path"` now carries the scoping prefix tuple +- `_validate_record_path` updated to check combined `len(base_path) + len(record_path)` depth +- `list_sources()` on scoped `DeltaTableDatabase` instances scans from the scoped root + +Closes ENG-341 +EOF +)" +``` + +- [ ] **Step 4: Update Linear issue to In Progress → In Review** + +```bash +# Use Linear MCP tool or leave for reviewer to handle +``` From b137e45d320e6d1054ad40804148efddb05535d3 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 03:38:29 +0000 Subject: [PATCH 04/12] feat(databases): add at() and base_path stubs to ArrowDatabaseProtocol and all backends Adds base_path property (returns tuple[str, ...]) and at() method stubs to ArrowDatabaseProtocol and all four backends (NoOpArrowDatabase, InMemoryArrowDatabase, ConnectorArrowDatabase, DeltaTableDatabase) to satisfy protocol conformance for ENG-341. Also renames DeltaTableDatabase's internal self.base_path (Path) instance attribute to self._local_base_path to avoid collision with the new tuple-typed property.
Co-Authored-By: Claude Sonnet 4.6 --- .../databases/connector_arrow_database.py | 12 +++++++ src/orcapod/databases/delta_lake_databases.py | 27 ++++++++++---- src/orcapod/databases/in_memory_databases.py | 7 ++++ src/orcapod/databases/noop_database.py | 7 ++++ src/orcapod/protocols/database_protocols.py | 35 +++++++++++++++++++ 5 files changed, 81 insertions(+), 7 deletions(-) diff --git a/src/orcapod/databases/connector_arrow_database.py b/src/orcapod/databases/connector_arrow_database.py index 0a044928..e7f418fd 100644 --- a/src/orcapod/databases/connector_arrow_database.py +++ b/src/orcapod/databases/connector_arrow_database.py @@ -268,6 +268,18 @@ def add_records( if flush: self.flush() + # ── base_path / at ──────────────────────────────────────────────────────── + + @property + def base_path(self) -> tuple[str, ...]: + return () + + def at(self, *path_components: str) -> "ConnectorArrowDatabase": + return ConnectorArrowDatabase( + connector=self._connector, + max_hierarchy_depth=self.max_hierarchy_depth, + ) + # ── Flush ───────────────────────────────────────────────────────────────── def flush(self) -> None: diff --git a/src/orcapod/databases/delta_lake_databases.py b/src/orcapod/databases/delta_lake_databases.py index 9c98d2c1..e2b42a82 100644 --- a/src/orcapod/databases/delta_lake_databases.py +++ b/src/orcapod/databases/delta_lake_databases.py @@ -58,14 +58,14 @@ def __init__( self.allow_schema_evolution = allow_schema_evolution if not self._is_cloud: - # Keep self.base_path for local-path operations (list_sources, etc.) - # NOTE: do NOT access self.base_path on cloud instances. - self.base_path = Path(self._base_uri) + # Keep self._local_base_path for local-path operations (list_sources, etc.) + # NOTE: do NOT access self._local_base_path on cloud instances. 
+ self._local_base_path = Path(self._base_uri) if create_base_path: - self.base_path.mkdir(parents=True, exist_ok=True) - elif not self.base_path.exists(): + self._local_base_path.mkdir(parents=True, exist_ok=True) + elif not self._local_base_path.exists(): raise ValueError( - f"Base path {self.base_path} does not exist and create_base_path=False" + f"Base path {self._local_base_path} does not exist and create_base_path=False" ) # For cloud paths: create_base_path is silently ignored (no directory needed). @@ -937,6 +937,19 @@ def flush_batch(self, record_path: tuple[str, ...]) -> None: self._pending_record_ids[record_key] = pending_ids raise + @property + def base_path(self) -> tuple[str, ...]: + return () + + def at(self, *path_components: str) -> "DeltaTableDatabase": + return DeltaTableDatabase( + base_path=self._base_uri, + storage_options=self._storage_options, + batch_size=self.batch_size, + max_hierarchy_depth=self.max_hierarchy_depth, + allow_schema_evolution=self.allow_schema_evolution, + ) + def list_sources(self) -> list[tuple[str, ...]]: """ List all record paths that contain a valid Delta table under base_path. 
@@ -972,5 +985,5 @@ def _scan(current_path: Path, path_components: tuple[str, ...]) -> None: except deltalake.exceptions.TableNotFoundError: _scan(item, components) - _scan(self.base_path, ()) + _scan(self._local_base_path, ()) return sources diff --git a/src/orcapod/databases/in_memory_databases.py b/src/orcapod/databases/in_memory_databases.py index 9e31e0ba..272414d7 100644 --- a/src/orcapod/databases/in_memory_databases.py +++ b/src/orcapod/databases/in_memory_databases.py @@ -230,6 +230,13 @@ def add_records( # Flush # ------------------------------------------------------------------ + @property + def base_path(self) -> tuple[str, ...]: + return () + + def at(self, *path_components: str) -> "InMemoryArrowDatabase": + return InMemoryArrowDatabase(max_hierarchy_depth=self.max_hierarchy_depth) + def flush(self) -> None: for record_key in list(self._pending_batches.keys()): pending = self._pending_batches.pop(record_key) diff --git a/src/orcapod/databases/noop_database.py b/src/orcapod/databases/noop_database.py index a65ef88e..c103ff01 100644 --- a/src/orcapod/databases/noop_database.py +++ b/src/orcapod/databases/noop_database.py @@ -79,6 +79,13 @@ def get_records_with_column_value( def flush(self) -> None: pass + @property + def base_path(self) -> tuple[str, ...]: + return () + + def at(self, *path_components: str) -> "NoOpArrowDatabase": + return NoOpArrowDatabase() + def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" return {"type": "noop"} diff --git a/src/orcapod/protocols/database_protocols.py b/src/orcapod/protocols/database_protocols.py index c6dfb6a6..c4b0cb42 100644 --- a/src/orcapod/protocols/database_protocols.py +++ b/src/orcapod/protocols/database_protocols.py @@ -65,6 +65,41 @@ def flush(self) -> None: """Flush any buffered writes to the underlying storage.""" ... + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view. 
+ + Always () for a root (non-scoped) instance. Extended by at(). + The absolute storage root (filesystem URI, SQL connector, etc.) + is a separate, backend-specific implementation detail. + """ + ... + + def at(self, *path_components: str) -> "ArrowDatabaseProtocol": + """Return a new database scoped to the given sub-path. + + All reads and writes on the returned database are relative to + this database's base_path extended by path_components. The + original is unmodified. + + Calling at() with no arguments returns a new view equivalent + to the current one (same base_path, fresh or shared state + depending on the backend). + + For backends with shared pending state (InMemoryArrowDatabase, + ConnectorArrowDatabase), calling flush() on any view drains + the entire shared pending queue — not just the caller's prefix. + This is intentional: all views share the same underlying store. + + Args: + *path_components: Zero or more path components to append. + + Returns: + A new database instance with + base_path == self.base_path + path_components. + """ + ... + def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict. 
From 2af4352f7e8eec8f593675c3e7a21270f79ecb9a Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 03:53:55 +0000 Subject: [PATCH 05/12] feat(databases): implement at() and base_path for NoOpArrowDatabase Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/noop_database.py | 15 ++++++--- tests/test_databases/test_database_config.py | 12 +++++++ tests/test_databases/test_noop_database.py | 35 ++++++++++++++++++++ 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/orcapod/databases/noop_database.py b/src/orcapod/databases/noop_database.py index c103ff01..982346e3 100644 --- a/src/orcapod/databases/noop_database.py +++ b/src/orcapod/databases/noop_database.py @@ -22,6 +22,9 @@ class NoOpArrowDatabase: or benchmarking pure compute overhead. """ + def __init__(self, _path_prefix: tuple[str, ...] = ()) -> None: + self._path_prefix = _path_prefix + def add_record( self, record_path: tuple[str, ...], @@ -81,16 +84,20 @@ def flush(self) -> None: @property def base_path(self) -> tuple[str, ...]: - return () + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix def at(self, *path_components: str) -> "NoOpArrowDatabase": - return NoOpArrowDatabase() + return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" - return {"type": "noop"} + return { + "type": "noop", + "base_path": list(self._path_prefix), + } @classmethod def from_config(cls, config: dict[str, Any]) -> "NoOpArrowDatabase": """Reconstruct a NoOpArrowDatabase from a config dict.""" - return cls() + return cls(_path_prefix=tuple(config.get("base_path", []))) diff --git a/tests/test_databases/test_database_config.py b/tests/test_databases/test_database_config.py index 5b2fca07..632277b6 100644 --- 
a/tests/test_databases/test_database_config.py +++ b/tests/test_databases/test_database_config.py @@ -63,3 +63,15 @@ def test_round_trip(self): config = db.to_config() restored = NoOpArrowDatabase.from_config(config) assert restored.to_config() == config + + def test_to_config_includes_base_path(self): + db = NoOpArrowDatabase() + assert db.to_config()["base_path"] == [] + + def test_round_trip_preserves_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = NoOpArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) diff --git a/tests/test_databases/test_noop_database.py b/tests/test_databases/test_noop_database.py index 7db9b2bd..7ecc969f 100644 --- a/tests/test_databases/test_noop_database.py +++ b/tests/test_databases/test_noop_database.py @@ -167,3 +167,38 @@ def test_different_paths_all_return_none(self, db): def test_reads_on_unwritten_path_return_none(self, db): db.add_records(("written",), make_table(x=[1])) assert db.get_all_records(("never_written",)) is None + + +# --------------------------------------------------------------------------- +# 5. 
at() method and base_path +# --------------------------------------------------------------------------- + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self): + db = NoOpArrowDatabase() + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self): + db = NoOpArrowDatabase() + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self): + db = NoOpArrowDatabase() + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self): + db = NoOpArrowDatabase() + db.at("a", "b") + assert db.base_path == () + + def test_scoped_reads_still_return_none(self): + db = NoOpArrowDatabase() + scoped = db.at("pipeline", "node1") + import pyarrow as pa + scoped.add_record(("outputs",), "id1", pa.table({"v": [1]})) + assert scoped.get_record_by_id(("outputs",), "id1") is None + assert scoped.get_all_records(("outputs",)) is None From 8459f03db294bfbe9a9b9de4f872b8c295c739d7 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 03:56:52 +0000 Subject: [PATCH 06/12] fix(databases): remove redundant import, add at() docstring in NoOpArrowDatabase Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/noop_database.py | 5 +++++ tests/test_databases/test_noop_database.py | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/orcapod/databases/noop_database.py b/src/orcapod/databases/noop_database.py index 982346e3..26556ddd 100644 --- a/src/orcapod/databases/noop_database.py +++ b/src/orcapod/databases/noop_database.py @@ -88,6 +88,11 @@ def base_path(self) -> tuple[str, ...]: return self._path_prefix def at(self, *path_components: str) -> "NoOpArrowDatabase": + """Return a new NoOpArrowDatabase scoped to the given sub-path. 
+ + All reads and writes are still discarded; the prefix only affects + the reported base_path of the returned instance. + """ return NoOpArrowDatabase(_path_prefix=self._path_prefix + path_components) def to_config(self) -> dict[str, Any]: diff --git a/tests/test_databases/test_noop_database.py b/tests/test_databases/test_noop_database.py index 7ecc969f..c4e0eff7 100644 --- a/tests/test_databases/test_noop_database.py +++ b/tests/test_databases/test_noop_database.py @@ -198,7 +198,6 @@ def test_at_does_not_modify_original(self): def test_scoped_reads_still_return_none(self): db = NoOpArrowDatabase() scoped = db.at("pipeline", "node1") - import pyarrow as pa scoped.add_record(("outputs",), "id1", pa.table({"v": [1]})) assert scoped.get_record_by_id(("outputs",), "id1") is None assert scoped.get_all_records(("outputs",)) is None From 6d524ae69cd120000b8af653423b49f16d746e21 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 03:59:41 +0000 Subject: [PATCH 07/12] feat(databases): implement at() and base_path for InMemoryArrowDatabase Adds full at() and base_path implementation: scoped views share the underlying _tables/_pending_batches/_pending_record_ids dicts by reference, _get_record_key prepends the path prefix, and _validate_record_path checks combined prefix+record_path depth. to_config/from_config round-trip the base_path field. 
Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/in_memory_databases.py | 57 ++++++++++++++----- tests/test_databases/test_database_config.py | 12 ++++ .../test_databases/test_in_memory_database.py | 56 ++++++++++++++++++ 3 files changed, 111 insertions(+), 14 deletions(-) diff --git a/src/orcapod/databases/in_memory_databases.py b/src/orcapod/databases/in_memory_databases.py index 272414d7..c922062f 100644 --- a/src/orcapod/databases/in_memory_databases.py +++ b/src/orcapod/databases/in_memory_databases.py @@ -29,26 +29,36 @@ class InMemoryArrowDatabase: RECORD_ID_COLUMN = "__record_id" - def __init__(self, max_hierarchy_depth: int = 10): + def __init__( + self, + max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] = (), + _shared_tables: "dict[str, pa.Table] | None" = None, + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, + ): + self._path_prefix = _path_prefix self.max_hierarchy_depth = max_hierarchy_depth - self._tables: dict[str, pa.Table] = {} - self._pending_batches: dict[str, pa.Table] = {} - self._pending_record_ids: dict[str, set[str]] = defaultdict(set) + self._tables: dict[str, pa.Table] = _shared_tables if _shared_tables is not None else {} + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) # ------------------------------------------------------------------ # Path helpers # ------------------------------------------------------------------ def _get_record_key(self, record_path: tuple[str, ...]) -> str: - return "/".join(record_path) + return "/".join(self._path_prefix + record_path) def _validate_record_path(self, record_path: tuple[str, ...]) -> None: if not record_path: raise ValueError("record_path cannot be empty") - if len(record_path) > 
self.max_hierarchy_depth: + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: raise ValueError( - f"record_path depth {len(record_path)} exceeds maximum {self.max_hierarchy_depth}" + f"record_path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" ) # Only restrict characters that break the "/".join(record_path) key scheme. @@ -230,13 +240,6 @@ def add_records( # Flush # ------------------------------------------------------------------ - @property - def base_path(self) -> tuple[str, ...]: - return () - - def at(self, *path_components: str) -> "InMemoryArrowDatabase": - return InMemoryArrowDatabase(max_hierarchy_depth=self.max_hierarchy_depth) - def flush(self) -> None: for record_key in list(self._pending_batches.keys()): pending = self._pending_batches.pop(record_key) @@ -255,6 +258,30 @@ def flush(self) -> None: kept = committed.filter(mask) self._tables[record_key] = pa.concat_tables([kept, pending]) + # ------------------------------------------------------------------ + # Path scoping + # ------------------------------------------------------------------ + + @property + def base_path(self) -> tuple[str, ...]: + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix + + def at(self, *path_components: str) -> "InMemoryArrowDatabase": + """Return a new InMemoryArrowDatabase scoped to the given sub-path. + + The returned instance shares the underlying storage dicts (_tables, + _pending_batches, _pending_record_ids) by reference, so writes + through any view are visible to all views of the same root database. 
+ """ + return InMemoryArrowDatabase( + max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_tables=self._tables, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + ) + # ------------------------------------------------------------------ # Read helpers # ------------------------------------------------------------------ @@ -345,6 +372,7 @@ def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" return { "type": "in_memory", + "base_path": list(self._path_prefix), "max_hierarchy_depth": self.max_hierarchy_depth, } @@ -353,6 +381,7 @@ def from_config(cls, config: dict[str, Any]) -> "InMemoryArrowDatabase": """Reconstruct an InMemoryArrowDatabase from a config dict.""" return cls( max_hierarchy_depth=config.get("max_hierarchy_depth", 10), + _path_prefix=tuple(config.get("base_path", [])), ) def get_records_with_column_value( diff --git a/tests/test_databases/test_database_config.py b/tests/test_databases/test_database_config.py index 632277b6..24967b4e 100644 --- a/tests/test_databases/test_database_config.py +++ b/tests/test_databases/test_database_config.py @@ -51,6 +51,18 @@ def test_round_trip(self): restored = InMemoryArrowDatabase.from_config(config) assert restored.to_config() == config + def test_to_config_includes_base_path(self): + db = InMemoryArrowDatabase() + assert db.to_config()["base_path"] == [] + + def test_round_trip_preserves_base_path(self): + db = InMemoryArrowDatabase() + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = InMemoryArrowDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) + class TestNoOpDatabaseConfig: def test_to_config_includes_type(self): diff --git a/tests/test_databases/test_in_memory_database.py b/tests/test_databases/test_in_memory_database.py index c37a157e..7fd61bb5 
100644 --- a/tests/test_databases/test_in_memory_database.py +++ b/tests/test_databases/test_in_memory_database.py @@ -331,3 +331,59 @@ def test_multiple_flushes_accumulate_records(self, db): result = db.get_all_records(self.PATH) assert result is not None assert result.num_rows == 2 + + +# --------------------------------------------------------------------------- +# 10. at() and base_path +# --------------------------------------------------------------------------- + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42])) + # Parent sees ("outputs",) as a different key from ("pipeline","node1","outputs") + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", make_table(value=[99])) + view_a.flush() + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert 
result.column("value").to_pylist() == [99] + + def test_validate_record_path_checks_combined_depth(self, db): + # max_hierarchy_depth=10 (default). Fill prefix with 9 components. + deep_db = InMemoryArrowDatabase(max_hierarchy_depth=10) + scoped = deep_db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") + # 9 prefix + 1 record_path = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 9 prefix + 2 record_path = 11: should raise + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) From 61d643afb73aa7223ce2b4a940d9c04f4713c8bc Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 04:06:25 +0000 Subject: [PATCH 08/12] feat(databases): implement at() and base_path for ConnectorArrowDatabase Full implementation of path scoping: `at()` returns a new instance sharing the connector and pending dicts by reference; `base_path` exposes `_path_prefix`; `_get_record_key` and `_get_committed_table` prepend the prefix so scoped views transparently read/write to prefixed SQL table names. `_validate_record_path` checks combined depth (prefix + path). `to_config` serializes `base_path`. Co-Authored-By: Claude Sonnet 4.6 --- .../databases/connector_arrow_database.py | 34 ++++++++--- .../test_connector_arrow_database.py | 58 +++++++++++++++++++ tests/test_databases/test_database_config.py | 11 ++++ 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/src/orcapod/databases/connector_arrow_database.py b/src/orcapod/databases/connector_arrow_database.py index e7f418fd..92cf4501 100644 --- a/src/orcapod/databases/connector_arrow_database.py +++ b/src/orcapod/databases/connector_arrow_database.py @@ -62,20 +62,25 @@ def __init__( self, connector: DBConnectorProtocol, max_hierarchy_depth: int = 10, + _path_prefix: tuple[str, ...] 
= (), + _shared_pending_batches: "dict[str, pa.Table] | None" = None, + _shared_pending_record_ids: "dict[str, set[str]] | None" = None, + _shared_pending_skip_existing: "dict[str, bool] | None" = None, ) -> None: self._connector = connector self.max_hierarchy_depth = max_hierarchy_depth - self._pending_batches: dict[str, pa.Table] = {} - self._pending_record_ids: dict[str, set[str]] = defaultdict(set) + self._path_prefix = _path_prefix + self._pending_batches: dict[str, pa.Table] = _shared_pending_batches if _shared_pending_batches is not None else {} + self._pending_record_ids: dict[str, set[str]] = _shared_pending_record_ids if _shared_pending_record_ids is not None else defaultdict(set) # Per-batch flag: True when the batch was added with skip_duplicates=True, # so flush() can pass skip_existing=True to the connector and let it use # native INSERT-OR-IGNORE semantics rather than Python-side prefiltering. - self._pending_skip_existing: dict[str, bool] = {} + self._pending_skip_existing: dict[str, bool] = _shared_pending_skip_existing if _shared_pending_skip_existing is not None else {} # ── Path helpers ────────────────────────────────────────────────────────── def _get_record_key(self, record_path: tuple[str, ...]) -> str: - return "/".join(record_path) + return "/".join(self._path_prefix + record_path) def _path_to_table_name(self, record_path: tuple[str, ...]) -> str: """Map a record_path to a safe SQL table name. 
@@ -93,10 +98,11 @@ def _path_to_table_name(self, record_path: tuple[str, ...]) -> str: def _validate_record_path(self, record_path: tuple[str, ...]) -> None: if not record_path: raise ValueError("record_path cannot be empty") - if len(record_path) > self.max_hierarchy_depth: + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: raise ValueError( f"record_path depth {len(record_path)} exceeds maximum " - f"{self.max_hierarchy_depth}" + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" ) for i, component in enumerate(record_path): if not component or not isinstance(component, str): @@ -165,7 +171,7 @@ def _get_committed_table( self, record_path: tuple[str, ...] ) -> pa.Table | None: """Fetch all committed records for a path from the connector.""" - table_name = self._path_to_table_name(record_path) + table_name = self._path_to_table_name(self._path_prefix + record_path) if table_name not in self._connector.get_table_names(): return None batches = list( @@ -272,12 +278,23 @@ def add_records( @property def base_path(self) -> tuple[str, ...]: - return () + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix def at(self, *path_components: str) -> "ConnectorArrowDatabase": + """Return a new ConnectorArrowDatabase scoped to the given sub-path. + + The returned instance shares the connector and all three pending dicts + (_pending_batches, _pending_record_ids, _pending_skip_existing) by reference. + Calling flush() on any view drains the entire shared pending queue. 
+ """ return ConnectorArrowDatabase( connector=self._connector, max_hierarchy_depth=self.max_hierarchy_depth, + _path_prefix=self._path_prefix + path_components, + _shared_pending_batches=self._pending_batches, + _shared_pending_record_ids=self._pending_record_ids, + _shared_pending_skip_existing=self._pending_skip_existing, ) # ── Flush ───────────────────────────────────────────────────────────────── @@ -432,6 +449,7 @@ def to_config(self) -> dict[str, Any]: return { "type": "connector_arrow_database", "connector": self._connector.to_config(), + "base_path": list(self._path_prefix), "max_hierarchy_depth": self.max_hierarchy_depth, } diff --git a/tests/test_databases/test_connector_arrow_database.py b/tests/test_databases/test_connector_arrow_database.py index 4b8bd69a..6619d257 100644 --- a/tests/test_databases/test_connector_arrow_database.py +++ b/tests/test_databases/test_connector_arrow_database.py @@ -713,3 +713,61 @@ def test_from_config_raises_not_implemented(self, db): config = db.to_config() with pytest.raises(NotImplementedError): ConnectorArrowDatabase.from_config(config) + + +# =========================================================================== +# 13. 
at() and base_path +# =========================================================================== + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("a", "b") + assert scoped.base_path == ("a", "b") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("a", "b") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = pa.table({"value": pa.array([42])}) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_two_scoped_views_share_storage(self, db): + view_a = db.at("pipeline", "node1") + view_b = db.at("pipeline", "node1") + view_a.add_record(("outputs",), "id1", pa.table({"v": pa.array([99])}), flush=True) + result = view_b.get_record_by_id(("outputs",), "id1") + assert result is not None + assert result.column("v").to_pylist() == [99] + + def test_prefix_appears_in_sql_table_name(self, db): + """Prefix components are included in the SQL table name via _path_to_table_name.""" + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", pa.table({"v": pa.array([1])}), flush=True) + # Table name should be pipeline__node1__outputs + table_names = db._connector.get_table_names() + assert "pipeline__node1__outputs" in 
table_names + + def test_validate_record_path_checks_combined_depth(self, db): + scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix components + scoped.add_record(("z",), "id1", pa.table({"v": pa.array([1])})) + with pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", pa.table({"v": pa.array([2])})) diff --git a/tests/test_databases/test_database_config.py b/tests/test_databases/test_database_config.py index 24967b4e..6e912a75 100644 --- a/tests/test_databases/test_database_config.py +++ b/tests/test_databases/test_database_config.py @@ -64,6 +64,17 @@ def test_round_trip_preserves_base_path(self): assert isinstance(restored.base_path, tuple) +def test_connector_to_config_includes_base_path(): + from unittest.mock import MagicMock + from orcapod.databases import ConnectorArrowDatabase + mock_connector = MagicMock() + mock_connector.to_config.return_value = {"type": "mock"} + db = ConnectorArrowDatabase(connector=mock_connector) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["base_path"] == ["pipeline", "v1"] + + class TestNoOpDatabaseConfig: def test_to_config_includes_type(self): db = NoOpArrowDatabase() From 9ae30371beae46cc080ee9afb5c9084903de3313 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 04:18:02 +0000 Subject: [PATCH 09/12] fix(databases): update connector test module docstring for TestAtMethod section --- tests/test_databases/test_connector_arrow_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_databases/test_connector_arrow_database.py b/tests/test_databases/test_connector_arrow_database.py index 6619d257..1c22ffd8 100644 --- a/tests/test_databases/test_connector_arrow_database.py +++ b/tests/test_databases/test_connector_arrow_database.py @@ -17,7 +17,7 @@ 10. Hierarchical record_path + _path_to_table_name 11. 
Flush behaviour (pending cleared, connector receives data) 12. Config (to_config shape, from_config raises NotImplementedError) -13. Context-manager lifecycle (connector.close is called) +13. at() method and base_path attribute """ from __future__ import annotations From 38b70c35331683b9660ed887b4eb048d163c67a1 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 04:21:57 +0000 Subject: [PATCH 10/12] feat(databases): implement at() and base_path for DeltaTableDatabase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements full database path contextualization for DeltaTableDatabase (ENG-341 Task 5): renames internal attributes (_base_uri→_root_uri, _local_base_path→_local_root), adds _path_prefix support, updates _get_table_uri to apply prefix, fixes _validate_record_path for combined depth checking, updates list_sources() to scan from scoped root, implements at() to return a properly scoped instance, and renames config key "base_path" (URI) to "root_uri" while adding "base_path" for the prefix tuple. Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/delta_lake_databases.py | 63 ++++++++++++------ tests/test_databases/test_database_config.py | 24 ++++++- .../test_delta_table_database.py | 66 +++++++++++++++++++ 3 files changed, 130 insertions(+), 23 deletions(-) diff --git a/src/orcapod/databases/delta_lake_databases.py b/src/orcapod/databases/delta_lake_databases.py index e2b42a82..36234de1 100644 --- a/src/orcapod/databases/delta_lake_databases.py +++ b/src/orcapod/databases/delta_lake_databases.py @@ -50,22 +50,24 @@ def __init__( batch_size: int = 1000, max_hierarchy_depth: int = 10, allow_schema_evolution: bool = True, + _path_prefix: tuple[str, ...] 
= (), ): - self._base_uri, self._storage_options = parse_base_path(base_path, storage_options) - self._is_cloud: bool = is_cloud_uri(self._base_uri) + self._root_uri, self._storage_options = parse_base_path(base_path, storage_options) + self._is_cloud: bool = is_cloud_uri(self._root_uri) + self._path_prefix = _path_prefix self.batch_size = batch_size self.max_hierarchy_depth = max_hierarchy_depth self.allow_schema_evolution = allow_schema_evolution if not self._is_cloud: - # Keep self._local_base_path for local-path operations (list_sources, etc.) - # NOTE: do NOT access self._local_base_path on cloud instances. - self._local_base_path = Path(self._base_uri) + # _local_root is the absolute filesystem root (for list_sources, mkdir, etc.) + # NOTE: do NOT access self._local_root on cloud instances. + self._local_root = Path(self._root_uri) if create_base_path: - self._local_base_path.mkdir(parents=True, exist_ok=True) - elif not self._local_base_path.exists(): + self._local_root.mkdir(parents=True, exist_ok=True) + elif not self._local_root.exists(): raise ValueError( - f"Base path {self._local_base_path} does not exist and create_base_path=False" + f"Base path {self._local_root} does not exist and create_base_path=False" ) # For cloud paths: create_base_path is silently ignored (no directory needed). @@ -100,17 +102,18 @@ def _sanitize_path_component(component: str) -> str: return component def _get_table_uri(self, record_path: tuple[str, ...], create_dir: bool = False) -> str: - """Get the URI for a given record path (works for local and cloud). + """Get the URI for a given record path, incorporating base_path prefix. Args: - record_path: Tuple of path components. + record_path: Tuple of path components (relative to base_path). create_dir: If True, create the local directory (no-op for cloud paths). 
""" + full_path = self._path_prefix + record_path # prefix applied once, here only if self._is_cloud: - return self._base_uri.rstrip("/") + "/" + "/".join(record_path) + return self._root_uri.rstrip("/") + "/" + "/".join(full_path) else: - path = Path(self._base_uri) - for subpath in record_path: + path = self._local_root + for subpath in full_path: path = path / self._sanitize_path_component(subpath) if create_dir: path.mkdir(parents=True, exist_ok=True) @@ -130,9 +133,11 @@ def _validate_record_path(self, record_path: tuple[str, ...]) -> None: if not record_path: raise ValueError("Source path cannot be empty") - if len(record_path) > self.max_hierarchy_depth: + if len(self._path_prefix) + len(record_path) > self.max_hierarchy_depth: raise ValueError( - f"Source path depth {len(record_path)} exceeds maximum {self.max_hierarchy_depth}" + f"Source path depth {len(record_path)} exceeds maximum " + f"{self.max_hierarchy_depth - len(self._path_prefix)} " + f"(base_path uses {len(self._path_prefix)} components)" ) # Validate path components @@ -833,7 +838,8 @@ def to_config(self) -> dict[str, Any]: """Serialize database configuration to a JSON-compatible dict.""" config: dict[str, Any] = { "type": "delta_table", - "base_path": self._base_uri, + "root_uri": self._root_uri, # renamed from "base_path" + "base_path": list(self._path_prefix), # new: relative prefix tuple "batch_size": self.batch_size, "max_hierarchy_depth": self.max_hierarchy_depth, "allow_schema_evolution": self.allow_schema_evolution, @@ -843,15 +849,16 @@ def to_config(self) -> dict[str, Any]: return config @classmethod - def from_config(cls, config: dict[str, Any]) -> DeltaTableDatabase: + def from_config(cls, config: dict[str, Any]) -> "DeltaTableDatabase": """Reconstruct a DeltaTableDatabase from a config dict.""" return cls( - base_path=config["base_path"], + base_path=config["root_uri"], storage_options=config.get("storage_options"), create_base_path=True, batch_size=config.get("batch_size", 1000), 
max_hierarchy_depth=config.get("max_hierarchy_depth", 10), allow_schema_evolution=config.get("allow_schema_evolution", True), + _path_prefix=tuple(config.get("base_path", [])), ) def flush(self) -> None: @@ -939,15 +946,24 @@ def flush_batch(self, record_path: tuple[str, ...]) -> None: @property def base_path(self) -> tuple[str, ...]: - return () + """The current relative root of this database view (always () for root instances).""" + return self._path_prefix def at(self, *path_components: str) -> "DeltaTableDatabase": + """Return a new DeltaTableDatabase scoped to the given sub-path. + + The returned instance uses the same underlying filesystem root but + all reads and writes are relative to the extended prefix. Unlike + InMemoryArrowDatabase and ConnectorArrowDatabase, DeltaTableDatabase + does NOT share pending state — the filesystem is the shared storage. + """ return DeltaTableDatabase( - base_path=self._base_uri, + base_path=self._root_uri, storage_options=self._storage_options, batch_size=self.batch_size, max_hierarchy_depth=self.max_hierarchy_depth, allow_schema_evolution=self.allow_schema_evolution, + _path_prefix=self._path_prefix + path_components, ) def list_sources(self) -> list[tuple[str, ...]]: @@ -985,5 +1001,10 @@ def _scan(current_path: Path, path_components: tuple[str, ...]) -> None: except deltalake.exceptions.TableNotFoundError: _scan(item, components) - _scan(self._local_base_path, ()) + # Build the effective scoped root directory + scoped_root = self._local_root + for component in self._path_prefix: + scoped_root = scoped_root / self._sanitize_path_component(component) + + _scan(scoped_root, ()) return sources diff --git a/tests/test_databases/test_database_config.py b/tests/test_databases/test_database_config.py index 6e912a75..bda8ce5a 100644 --- a/tests/test_databases/test_database_config.py +++ b/tests/test_databases/test_database_config.py @@ -11,10 +11,30 @@ def test_to_config_includes_type(self, tmp_path): config = db.to_config() 
assert config["type"] == "delta_table" - def test_to_config_includes_base_path(self, tmp_path): + def test_to_config_includes_root_uri(self, tmp_path): db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) config = db.to_config() - assert config["base_path"] == str(tmp_path / "delta_db") + assert config["root_uri"] == str(tmp_path / "delta_db") + + def test_to_config_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + config = db.to_config() + assert config["base_path"] == [] + + def test_to_config_scoped_includes_base_path_tuple(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + assert config["root_uri"] == str(tmp_path / "delta_db") + assert config["base_path"] == ["pipeline", "v1"] + + def test_round_trip_preserves_base_path(self, tmp_path): + db = DeltaTableDatabase(base_path=str(tmp_path / "delta_db")) + scoped = db.at("pipeline", "v1") + config = scoped.to_config() + restored = DeltaTableDatabase.from_config(config) + assert restored.base_path == ("pipeline", "v1") + assert isinstance(restored.base_path, tuple) def test_to_config_includes_all_settings(self, tmp_path): db = DeltaTableDatabase( diff --git a/tests/test_databases/test_delta_table_database.py b/tests/test_databases/test_delta_table_database.py index 4407dc79..e0b9f7df 100644 --- a/tests/test_databases/test_delta_table_database.py +++ b/tests/test_databases/test_delta_table_database.py @@ -402,3 +402,69 @@ def test_schema_evolution(tmp_path): result = db.get_all_records(("src",)) assert result is not None assert "b" in result.column_names + + +# --------------------------------------------------------------------------- +# 12. 
at() method and base_path scoping +# --------------------------------------------------------------------------- + + +class TestAtMethod: + def test_base_path_is_empty_on_root_instance(self, db): + assert db.base_path == () + assert isinstance(db.base_path, tuple) + + def test_at_sets_base_path(self, db): + scoped = db.at("pipeline", "node1") + assert scoped.base_path == ("pipeline", "node1") + assert isinstance(scoped.base_path, tuple) + + def test_at_chaining_equivalent_to_multi_component(self, db): + assert db.at("a").at("b").base_path == db.at("a", "b").base_path + + def test_at_does_not_modify_original(self, db): + db.at("pipeline", "node1") + assert db.base_path == () + + def test_writes_through_scoped_view_readable_from_same_view(self, db): + scoped = db.at("pipeline", "node1") + record = make_table(value=[42]) + scoped.add_record(("outputs",), "id1", record, flush=True) + result = scoped.get_record_by_id(("outputs",), "id1", flush=True) + assert result is not None + assert result.column("value").to_pylist() == [42] + + def test_scoped_write_not_visible_via_parent_at_same_path(self, db): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[42]), flush=True) + assert db.get_record_by_id(("outputs",), "id1") is None + + def test_scoped_write_stored_under_correct_filesystem_path(self, db, tmp_path): + scoped = db.at("pipeline", "node1") + scoped.add_record(("outputs",), "id1", make_table(value=[1]), flush=True) + # Delta table should be at /db/pipeline/node1/outputs/ + import os + expected_path = tmp_path / "db" / "pipeline" / "node1" / "outputs" + assert expected_path.exists() + + def test_validate_record_path_checks_combined_depth(self, tmp_path): + db = DeltaTableDatabase(base_path=tmp_path / "db", max_hierarchy_depth=10) + scoped = db.at("a", "b", "c", "d", "e", "f", "g", "h", "i") # 9 prefix + # 9 + 1 = 10: OK + scoped.add_record(("z",), "id1", make_table(value=[1])) + # 9 + 2 = 11: should raise + with 
pytest.raises(ValueError): + scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) + + def test_list_sources_on_scoped_instance(self, tmp_path): + db = DeltaTableDatabase(base_path=tmp_path / "db") + scoped = db.at("pipeline") + scoped.add_record(("node1",), "id1", make_table(value=[1]), flush=True) + scoped.add_record(("node2",), "id2", make_table(value=[2]), flush=True) + sources = scoped.list_sources() + assert ("node1",) in sources + assert ("node2",) in sources + # Root db should NOT see these under bare ("node1",) — they live under pipeline/ + root_sources = db.list_sources() + assert ("node1",) not in root_sources + assert ("pipeline", "node1") in root_sources From d9aa9160b8d708a258e3ef0d86df52608748c830 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 04:27:32 +0000 Subject: [PATCH 11/12] fix(databases): remove unused import os in DeltaTableDatabase test --- tests/test_databases/test_delta_table_database.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_databases/test_delta_table_database.py b/tests/test_databases/test_delta_table_database.py index e0b9f7df..64131285 100644 --- a/tests/test_databases/test_delta_table_database.py +++ b/tests/test_databases/test_delta_table_database.py @@ -443,7 +443,6 @@ def test_scoped_write_stored_under_correct_filesystem_path(self, db, tmp_path): scoped = db.at("pipeline", "node1") scoped.add_record(("outputs",), "id1", make_table(value=[1]), flush=True) # Delta table should be at /db/pipeline/node1/outputs/ - import os expected_path = tmp_path / "db" / "pipeline" / "node1" / "outputs" assert expected_path.exists() From fa614412d58cb4ebb099de9ae64a24d47f0c1e54 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 05:04:21 +0000 Subject: [PATCH 12/12] fix(databases): validate at() path components; add from_config legacy support - 
InMemoryArrowDatabase.at() and ConnectorArrowDatabase.at() now reject components containing '/' or '\0', which would corrupt the '/'-separated record key scheme and break flush()'s key reconstruction via split('/') - DeltaTableDatabase.at() validates components against the full filesystem unsafe-char set plus '.' and '..' (directory traversal) - DeltaTableDatabase.from_config() now accepts both current format ('root_uri' key) and legacy pre-ENG-341 format ('base_path' as a URI string), preventing KeyError when deserializing older persisted configs - Tests added for all four changes Addresses Copilot review on PR #123 --- .../databases/connector_arrow_database.py | 13 ++++++ src/orcapod/databases/delta_lake_databases.py | 42 +++++++++++++++++-- src/orcapod/databases/in_memory_databases.py | 13 ++++++ .../test_connector_arrow_database.py | 12 ++++++ tests/test_databases/test_database_config.py | 13 ++++++ .../test_delta_table_database.py | 16 +++++++ .../test_databases/test_in_memory_database.py | 12 ++++++ 7 files changed, 118 insertions(+), 3 deletions(-) diff --git a/src/orcapod/databases/connector_arrow_database.py b/src/orcapod/databases/connector_arrow_database.py index 92cf4501..99962f3f 100644 --- a/src/orcapod/databases/connector_arrow_database.py +++ b/src/orcapod/databases/connector_arrow_database.py @@ -287,7 +287,20 @@ def at(self, *path_components: str) -> "ConnectorArrowDatabase": The returned instance shares the connector and all three pending dicts (_pending_batches, _pending_record_ids, _pending_skip_existing) by reference. Calling flush() on any view drains the entire shared pending queue. + + Raises: + ValueError: If any path component is empty, not a str, or contains + ``'/'`` or ``'\\0'`` (which would corrupt the ``'/'``-separated + record key scheme and break ``flush()``'s key reconstruction). 
""" + for i, component in enumerate(path_components): + if not component or not isinstance(component, str): + raise ValueError(f"at() path component {i} is invalid: {repr(component)}") + if "/" in component or "\0" in component: + raise ValueError( + f"at() path component {repr(component)} contains an invalid character " + "('/' or '\\0')" + ) return ConnectorArrowDatabase( connector=self._connector, max_hierarchy_depth=self.max_hierarchy_depth, diff --git a/src/orcapod/databases/delta_lake_databases.py b/src/orcapod/databases/delta_lake_databases.py index 36234de1..5da84276 100644 --- a/src/orcapod/databases/delta_lake_databases.py +++ b/src/orcapod/databases/delta_lake_databases.py @@ -850,15 +850,29 @@ def to_config(self) -> dict[str, Any]: @classmethod def from_config(cls, config: dict[str, Any]) -> "DeltaTableDatabase": - """Reconstruct a DeltaTableDatabase from a config dict.""" + """Reconstruct a DeltaTableDatabase from a config dict. + + Supports both the current format (``"root_uri"`` for the storage root, + ``"base_path"`` as a list for the scoping prefix) and the legacy format + produced before ENG-341 (``"base_path"`` as a URI string, no prefix). 
+ """ + if "root_uri" in config: + # Current format (post-ENG-341) + root_uri = config["root_uri"] + base_path_value = config.get("base_path", []) + _path_prefix = tuple(base_path_value) if isinstance(base_path_value, list) else () + else: + # Legacy format (pre-ENG-341): "base_path" was the root URI string + root_uri = config["base_path"] + _path_prefix = () return cls( - base_path=config["root_uri"], + base_path=root_uri, storage_options=config.get("storage_options"), create_base_path=True, batch_size=config.get("batch_size", 1000), max_hierarchy_depth=config.get("max_hierarchy_depth", 10), allow_schema_evolution=config.get("allow_schema_evolution", True), - _path_prefix=tuple(config.get("base_path", [])), + _path_prefix=_path_prefix, ) def flush(self) -> None: @@ -956,7 +970,29 @@ def at(self, *path_components: str) -> "DeltaTableDatabase": all reads and writes are relative to the extended prefix. Unlike InMemoryArrowDatabase and ConnectorArrowDatabase, DeltaTableDatabase does NOT share pending state — the filesystem is the shared storage. + + Raises: + TypeError: If any component is not a str. + ValueError: If any component is empty, is ``'.'`` or ``'..'``, or + contains filesystem-unsafe characters (``/``, ``\\``, ``*``, + ``?``, ``"``, ``<``, ``>``, ``|``, ``\\0``). """ + _unsafe_chars = ["/", "\\", "*", "?", '"', "<", ">", "|", "\0"] + for i, component in enumerate(path_components): + if not isinstance(component, str): + raise TypeError( + f"at() path component {i} must be str, got {type(component)!r}" + ) + if not component: + raise ValueError(f"at() path component {i} must not be empty") + if component in (".", ".."): + raise ValueError( + f"at() path component {repr(component)}: '.' and '..' 
are not allowed" + ) + if any(char in component for char in _unsafe_chars): + raise ValueError( + f"at() path component {repr(component)} contains invalid characters" + ) return DeltaTableDatabase( base_path=self._root_uri, storage_options=self._storage_options, diff --git a/src/orcapod/databases/in_memory_databases.py b/src/orcapod/databases/in_memory_databases.py index c922062f..be77878d 100644 --- a/src/orcapod/databases/in_memory_databases.py +++ b/src/orcapod/databases/in_memory_databases.py @@ -273,7 +273,20 @@ def at(self, *path_components: str) -> "InMemoryArrowDatabase": The returned instance shares the underlying storage dicts (_tables, _pending_batches, _pending_record_ids) by reference, so writes through any view are visible to all views of the same root database. + + Raises: + ValueError: If any path component is empty, not a str, or contains + ``'/'`` or ``'\\0'`` (which would corrupt the ``'/'``-separated + record key scheme). """ + for i, component in enumerate(path_components): + if not component or not isinstance(component, str): + raise ValueError(f"at() path component {i} is invalid: {repr(component)}") + if "/" in component or "\0" in component: + raise ValueError( + f"at() path component {repr(component)} contains an invalid character " + "('/' or '\\0')" + ) return InMemoryArrowDatabase( max_hierarchy_depth=self.max_hierarchy_depth, _path_prefix=self._path_prefix + path_components, diff --git a/tests/test_databases/test_connector_arrow_database.py b/tests/test_databases/test_connector_arrow_database.py index 1c22ffd8..75e18acb 100644 --- a/tests/test_databases/test_connector_arrow_database.py +++ b/tests/test_databases/test_connector_arrow_database.py @@ -771,3 +771,15 @@ def test_validate_record_path_checks_combined_depth(self, db): scoped.add_record(("z",), "id1", pa.table({"v": pa.array([1])})) with pytest.raises(ValueError): scoped.add_record(("z", "extra"), "id2", pa.table({"v": pa.array([2])})) + + def 
test_at_rejects_slash_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe/line") + + def test_at_rejects_null_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe\x00line") + + def test_at_rejects_empty_component(self, db): + with pytest.raises(ValueError): + db.at("") diff --git a/tests/test_databases/test_database_config.py b/tests/test_databases/test_database_config.py index bda8ce5a..3b76f534 100644 --- a/tests/test_databases/test_database_config.py +++ b/tests/test_databases/test_database_config.py @@ -58,6 +58,19 @@ def test_round_trip(self, tmp_path): restored = DeltaTableDatabase.from_config(config) assert restored.to_config() == config + def test_from_config_accepts_legacy_base_path(self, tmp_path): + """from_config accepts pre-ENG-341 configs where 'base_path' was the root URI string.""" + legacy_config = { + "type": "delta_table", + "base_path": str(tmp_path / "delta_db"), + "batch_size": 1000, + "max_hierarchy_depth": 10, + "allow_schema_evolution": True, + } + db = DeltaTableDatabase.from_config(legacy_config) + assert db.base_path == () + assert db.to_config()["root_uri"] == str(tmp_path / "delta_db") + class TestInMemoryDatabaseConfig: def test_to_config_includes_type(self): diff --git a/tests/test_databases/test_delta_table_database.py b/tests/test_databases/test_delta_table_database.py index 64131285..c74a6ed2 100644 --- a/tests/test_databases/test_delta_table_database.py +++ b/tests/test_databases/test_delta_table_database.py @@ -467,3 +467,19 @@ def test_list_sources_on_scoped_instance(self, tmp_path): root_sources = db.list_sources() assert ("node1",) not in root_sources assert ("pipeline", "node1") in root_sources + + def test_at_rejects_slash_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe/line") + + def test_at_rejects_dotdot_component(self, db): + with pytest.raises(ValueError, match="not allowed"): 
+ db.at("..") + + def test_at_rejects_empty_component(self, db): + with pytest.raises(ValueError): + db.at("") + + def test_at_rejects_non_str_component(self, db): + with pytest.raises(TypeError): + db.at(42) # type: ignore[arg-type] diff --git a/tests/test_databases/test_in_memory_database.py b/tests/test_databases/test_in_memory_database.py index 7fd61bb5..29e4ec80 100644 --- a/tests/test_databases/test_in_memory_database.py +++ b/tests/test_databases/test_in_memory_database.py @@ -387,3 +387,15 @@ def test_validate_record_path_checks_combined_depth(self, db): # 9 prefix + 2 record_path = 11: should raise with pytest.raises(ValueError): scoped.add_record(("z", "extra"), "id2", make_table(value=[2])) + + def test_at_rejects_slash_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe/line") + + def test_at_rejects_null_in_component(self, db): + with pytest.raises(ValueError, match="invalid character"): + db.at("pipe\x00line") + + def test_at_rejects_empty_component(self, db): + with pytest.raises(ValueError): + db.at("")