Conversation
… upsert params, iter_batches type inference
…batches, test clarifications
…, bytes literal, add missing test cases
…PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ld (PLT-1076)
- Remove unused `import re` from sqlite_connector.py
- Move `bool_` guard before `is_integer`/`is_floating` in `_arrow_type_to_sqlite_sql` to eliminate fragile ordering dependency
- Implement `close()` minimally (Task 2 Connection Lifecycle is already done): acquires lock, closes and nulls `self._conn` if open — `__exit__` now works
- Add `TestSQLiteConnectorScaffold.test_isinstance_dbconnector_protocol` to verify `SQLiteConnector` satisfies `DBConnectorProtocol`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace direct access to private `_conn` attribute in lifecycle tests with calls to `_require_open()`, testing connection state through the public-facing guard rather than internal implementation details. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ction (PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…not_exists (PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… (PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nector (PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion tests (PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…d mismatch test (PLT-1076) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Adds a new SQLite-backed implementation of the relational DB connector layer so ConnectorArrowDatabase can use stdlib sqlite3 as a storage backend (no extra deps), along with unit/integration tests and supporting design/plan docs.
Changes:
- Implement `SQLiteConnector` (implements `DBConnectorProtocol`) with schema introspection, read batching, upserts, lifecycle, and config serialization.
- Export `SQLiteConnector` from `orcapod.databases`.
- Add unit + integration tests validating type mappings, lifecycle, and `ConnectorArrowDatabase` round-trips on `:memory:` SQLite.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `src/orcapod/databases/sqlite_connector.py` | New `DBConnectorProtocol` implementation for SQLite (`sqlite3`) including DDL/upsert and Arrow batch reads. |
| `src/orcapod/databases/__init__.py` | Exports `SQLiteConnector` from the databases package. |
| `tests/test_databases/test_sqlite_connector.py` | Unit tests for helper functions and `SQLiteConnector` behavior. |
| `tests/test_databases/test_sqlite_connector_integration.py` | End-to-end tests exercising `ConnectorArrowDatabase` on top of `SQLiteConnector`. |
| `superpowers/specs/2026-03-23-sqlite-connector-design.md` | Design spec documenting goals, mappings, and behaviors. |
| `superpowers/plans/2026-03-23-sqlite-connector.md` | Implementation plan/checklist for the feature. |
```python
collected: list[_pa.RecordBatch] = []
with self._lock:
    conn = self._require_open()
    cursor = conn.execute(query, params or [])
```
`iter_batches()` passes `params or []` into `sqlite3`'s `execute()`. This breaks valid callers that pass an empty dict for named parameters (or other falsy-but-valid param objects), because it will be replaced by `[]` and `sqlite3` will raise a binding error. Prefer `params if params is not None else []` (or pass `params` through unchanged when not `None`).
```diff
- cursor = conn.execute(query, params or [])
+ cursor = conn.execute(query, [] if params is None else params)
```
Fixed in commit 80719b3. Changed `params or []` to `[] if params is None else params` so that empty-but-valid param objects (e.g. an empty dict for named parameters) are passed through unchanged rather than being replaced with `[]`.
```python
# Build a type lookup from all tables in this database.
# For each column name, find its Arrow type via get_column_info.
# Fallback to large_string() for computed/unknown columns.
type_lookup: dict[str, _pa.DataType] = {}
for table_name in self.get_table_names():
    for ci in self.get_column_info(table_name):
        if ci.name not in type_lookup:
            type_lookup[ci.name] = ci.arrow_type

arrow_types = [type_lookup.get(name, _pa.large_string()) for name in col_names]
```
`iter_batches()` builds a `type_lookup` by scanning all tables and mapping by column name (first occurrence wins). If multiple tables share a column name with different declared types (common with `ConnectorArrowDatabase` tables like `x`, `value`, etc.), queries can be decoded with the wrong Arrow types, leading to incorrect results or Arrow conversion errors. Resolve types from the table(s) referenced by the query (e.g., parse the first `FROM "table"` and call `get_column_info(table)`), and avoid cross-table column-name heuristics.
Fixed in commit 80719b3. `iter_batches` now parses the first `FROM "table_name"` (or unquoted `FROM table_name`) from the query using a regex and calls `get_column_info(queried_table)` on that specific table only. This eliminates cross-table column-name collisions entirely. Columns not found in the resolved table (computed columns, aliases, etc.) still fall back to `pa.large_string()`.
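For illustration, a minimal sketch of the kind of FROM-clause extraction the reply describes (the helper name and exact regex are assumptions, not the code from commit 80719b3):

```python
import re

def _extract_from_table(query: str) -> str | None:
    """Hypothetical helper: best-effort parse of the first FROM clause,
    handling both quoted ("name with spaces") and bare identifiers."""
    m = re.search(
        r'\bFROM\s+(?:"((?:[^"]|"")+)"|([A-Za-z_][A-Za-z0-9_]*))',
        query,
        flags=re.IGNORECASE,
    )
    if m is None:
        return None  # no FROM clause found; caller falls back to defaults
    quoted, bare = m.group(1), m.group(2)
    return quoted.replace('""', '"') if quoted is not None else bare

assert _extract_from_table('SELECT * FROM "my table" WHERE x = 1') == "my table"
assert _extract_from_table("SELECT x FROM users") == "users"
```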
```python
with self._lock:
    conn = self._require_open()
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
```
`get_table_names()` currently returns all entries from `sqlite_master` where `type='table'`, which includes SQLite internal tables like `sqlite_sequence`. These internal names can collide with user table names (e.g., a `record_path` that sanitizes to `sqlite_sequence`) and also make downstream logic (like type resolution in `iter_batches`) more error-prone. Consider filtering out `name LIKE 'sqlite_%'` to return only user tables.
| "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" | |
| "SELECT name FROM sqlite_master " | |
| "WHERE type='table' AND name NOT LIKE 'sqlite_%' " | |
| "ORDER BY name" |
Fixed in commit 80719b3. Added `AND name NOT LIKE 'sqlite_%'` to the `sqlite_master` query so internal tables like `sqlite_sequence` are excluded from the returned list.
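A small stdlib-only demonstration of why the filter matters: an AUTOINCREMENT table causes SQLite to create the internal `sqlite_sequence` table, which the unfiltered query would return alongside user tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# AUTOINCREMENT makes SQLite create the internal sqlite_sequence table.
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT)")
conn.execute("INSERT INTO t DEFAULT VALUES")

rows = conn.execute(
    "SELECT name FROM sqlite_master "
    "WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
).fetchall()
assert rows == [("t",)]  # sqlite_sequence is filtered out
```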
```python
self._validate_table_name(table_name)
col_defs = []
for col in columns:
    sql_type = _arrow_type_to_sqlite_sql(col.arrow_type)
    not_null = " NOT NULL" if not col.nullable else ""
    pk = " PRIMARY KEY" if col.name == pk_column else ""
    col_defs.append(f' "{col.name}" {sql_type}{not_null}{pk}')
ddl = f'CREATE TABLE IF NOT EXISTS "{table_name}" (\n'
```
`create_table_if_not_exists()` interpolates column names directly into the DDL via `"{col.name}"` and also interpolates `pk_column` (via the `col.name == pk_column` check) without validating/escaping identifier content. Arrow column names can legally contain `"`, which would break the SQL and can enable injection in DDL/PRAGMA statements. Add identifier escaping (SQLite uses `""` to escape a quote inside a quoted identifier) or reject unsafe column names consistently (and also validate `pk_column` and that it exists in `columns`).
Fixed in commit 80719b3. Column names are now escaped using the SQL standard double-quote-doubling convention (`col.name.replace('"', '""')`) before being interpolated into the DDL string. This means a column named `a"b` safely becomes `"a""b"` in SQL.
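As a self-contained sketch of the convention (the helper name is hypothetical; the actual commit inlines the `replace()` call):

```python
def _quote_ident(name: str) -> str:
    """Hypothetical helper: quote a SQLite identifier, doubling any
    embedded double-quotes per the SQL standard."""
    return '"' + name.replace('"', '""') + '"'

assert _quote_ident('a"b') == '"a""b"'  # the example from the reply above
```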
```python
cols = records.column_names
col_list = ", ".join(f'"{c}"' for c in cols)
placeholders = ", ".join(f":{c}" for c in cols)
verb = "INSERT OR IGNORE" if skip_existing else "INSERT OR REPLACE"
sql = f'{verb} INTO "{table_name}" ({col_list}) VALUES ({placeholders})'
conn.executemany(sql, records.to_pylist())
```
`upsert_records()` uses both quoted identifiers and named parameters derived from `records.column_names` (e.g. `:{c}`). Even when a column name is safe as a quoted identifier, it may be invalid as a sqlite named-parameter token (e.g., contains `-` or spaces) and will fail at runtime; if it contains `"` it can also break the SQL string. Prefer positional placeholders (`?`) with a stable column order, or map column names to safe parameter names and validate/escape identifiers before building SQL.
```diff
- cols = records.column_names
- col_list = ", ".join(f'"{c}"' for c in cols)
- placeholders = ", ".join(f":{c}" for c in cols)
- verb = "INSERT OR IGNORE" if skip_existing else "INSERT OR REPLACE"
- sql = f'{verb} INTO "{table_name}" ({col_list}) VALUES ({placeholders})'
- conn.executemany(sql, records.to_pylist())
+ cols = list(records.column_names)
+ col_list = ", ".join(f'"{c}"' for c in cols)
+ placeholders = ", ".join("?" for _ in cols)
+ verb = "INSERT OR IGNORE" if skip_existing else "INSERT OR REPLACE"
+ sql = f'{verb} INTO "{table_name}" ({col_list}) VALUES ({placeholders})'
+ rows = [tuple(row[c] for c in cols) for row in records.to_pylist()]
+ conn.executemany(sql, rows)
```
Fixed in commit 80719b3. Switched from named parameter syntax (`:col_name`) to positional `?` placeholders with tuple rows (`[tuple(row[c] for c in cols) for row in records.to_pylist()]`). Column names are still double-quoted identifiers in the `col_list` portion (with `""` escaping), while the values go through positional binding — avoiding any issue with column names that are invalid sqlite3 named-parameter tokens (e.g. names containing `-`, spaces, or `"`).
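A quick stdlib-only illustration of the named-parameter token problem the review raised (hedged: the exact sqlite3 error text varies by version, so the failing line is left commented out):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE t ("col-a" TEXT)')

# A named parameter token cannot contain '-'; sqlite3 stops parsing the
# token at the hyphen, so the following would fail at execute time:
#   conn.execute('INSERT INTO t ("col-a") VALUES (:col-a)', {"col-a": "x"})
# Positional binding sidesteps the token restriction entirely:
conn.execute('INSERT INTO t ("col-a") VALUES (?)', ("x",))
assert conn.execute("SELECT * FROM t").fetchone() == ("x",)
```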
```python
db2 = ConnectorArrowDatabase(SQLiteConnector(":memory:"))
db2.add_record(("fn", "test"), record_id="r1", record=record_v1)
with pytest.raises(ValueError):
    db2.add_record(("fn", "test"), record_id="r1", record=record_v2)
db2._connector.close()
```
This test reaches into `ConnectorArrowDatabase`'s private `_connector` attribute to close the underlying connector. Prefer keeping an explicit reference to the `SQLiteConnector` you construct (or using a fixture/context manager) and closing that, so the test doesn't depend on private internals of `ConnectorArrowDatabase`.
```diff
- db2 = ConnectorArrowDatabase(SQLiteConnector(":memory:"))
- db2.add_record(("fn", "test"), record_id="r1", record=record_v1)
- with pytest.raises(ValueError):
-     db2.add_record(("fn", "test"), record_id="r1", record=record_v2)
- db2._connector.close()
+ connector2 = SQLiteConnector(":memory:")
+ db2 = ConnectorArrowDatabase(connector2)
+ db2.add_record(("fn", "test"), record_id="r1", record=record_v1)
+ with pytest.raises(ValueError):
+     db2.add_record(("fn", "test"), record_id="r1", record=record_v2)
+ connector2.close()
```
Fixed in commit 80719b3. Introduced an explicit `connector2` variable that holds the `SQLiteConnector` directly, so `connector2.close()` is called on the object we own rather than reaching through `db2._connector`.
- `iter_batches`: resolve column types from the queried table's FROM clause instead of scanning all tables; fixes cross-table column-name collisions
- `iter_batches`: fix params handling — use `[] if params is None else params` instead of `params or []` to avoid replacing empty-but-valid param objects
- `get_table_names`: filter out SQLite internal tables (`sqlite_%`) to return only user tables
- `create_table_if_not_exists`: escape double-quotes in column names using SQL standard doubling (`"` → `""`) for safe DDL identifiers
- `upsert_records`: switch from named parameter syntax (`:col_name`) to positional `?` placeholders with tuple rows, avoiding breakage on column names that are invalid as sqlite3 named-parameter tokens
- `test_skip_duplicates`: use an explicit `connector2` variable instead of accessing the private `ConnectorArrowDatabase._connector` attribute

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review round addressed — commit 80719b3. All 6 review comments have been resolved in a single commit; the changes are summarized in the commit message above.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
```python
for col in columns:
    sql_type = _arrow_type_to_sqlite_sql(col.arrow_type)
    not_null = " NOT NULL" if not col.nullable else ""
    pk = " PRIMARY KEY" if col.name == pk_column else ""
    escaped = col.name.replace('"', '""')
    col_defs.append(f' "{escaped}" {sql_type}{not_null}{pk}')
ddl = f'CREATE TABLE IF NOT EXISTS "{table_name}" (\n'
```
`create_table_if_not_exists()` silently creates the table without any PRIMARY KEY if `pk_column` doesn't match any `ColumnInfo.name`. That would break `upsert_records(skip_existing=...)` semantics (no uniqueness constraint → duplicates inserted / IGNORE never triggers) and contradict the protocol contract that `pk_column` is used as the primary key. Consider validating that `pk_column` exists exactly once in `columns` (and raising `ValueError` if not) before executing the DDL.
Fixed in commit a16c363. Added upfront validation before the lock block: if `pk_column` is not present in the column names list, a `ValueError` is raised with a message listing the available columns. Added a corresponding test `test_raises_when_pk_column_not_in_columns` to `TestCreateTableIfNotExists`.
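A hedged sketch of what that upfront check could look like (helper name and message wording are assumptions, not the code from a16c363):

```python
from collections.abc import Sequence

def _validate_pk_column(pk_column: str, column_names: Sequence[str]) -> None:
    """Hypothetical helper mirroring the validation described above."""
    if pk_column not in column_names:
        raise ValueError(
            f"pk_column {pk_column!r} not found in table columns; "
            f"available: {list(column_names)}"
        )
```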
```python
placeholders = ", ".join("?" for _ in cols)
verb = "INSERT OR IGNORE" if skip_existing else "INSERT OR REPLACE"
sql = f'{verb} INTO "{table_name}" ({col_list}) VALUES ({placeholders})'
rows = [tuple(row[c] for c in cols) for row in records.to_pylist()]
```
`upsert_records()` materializes all rows twice (`records.to_pylist()` and then a separate `rows = [...]` list of tuples) before calling `executemany()`. For large flush batches this can be a significant memory/time overhead. If possible, avoid building an additional list (e.g., pass an iterator of tuples directly to `executemany`, or iterate column-wise from the Arrow arrays rather than `to_pylist()`).
```diff
- rows = [tuple(row[c] for c in cols) for row in records.to_pylist()]
+ rows = (tuple(row[c] for c in cols) for row in records.to_pylist())
```
Fixed in commit a16c363 as suggested. Changed the list comprehension `[tuple(...)]` to a generator expression `(tuple(...))`. `executemany` accepts any iterable, so the tuples are now constructed lazily, one at a time, as sqlite3 consumes them, avoiding a second full copy of all row data in memory.
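A standalone demonstration that `executemany()` happily consumes a generator (stdlib only; the table and data here are made up for the example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER, b TEXT)")

cols = ["a", "b"]
pylist = [{"a": i, "b": str(i)} for i in range(3)]  # stand-in for to_pylist()

# Generator expression: tuples are built lazily as sqlite3 pulls them,
# so no second full copy of the row data is ever materialized.
conn.executemany(
    "INSERT INTO t (a, b) VALUES (?, ?)",
    (tuple(row[c] for c in cols) for row in pylist),
)
assert conn.execute("SELECT COUNT(*) FROM t").fetchone() == (3,)
```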
```python
collected: list[_pa.RecordBatch] = []
with self._lock:
```
`iter_batches()` buffers the entire query result into `collected` under the connector lock and only yields after the full read finishes. This defeats the streaming intent of an iterator (higher peak memory, higher time-to-first-batch) and also blocks all other connector operations for the duration of the full scan. Consider an approach that yields batches incrementally without holding the lock across `yield` (e.g., dedicate a separate connection to the iterator, or serialize per-`fetchmany` while keeping the statement/cursor isolated from concurrent operations).
Fixed in commit a16c363. Refactored from the full-buffer-under-lock pattern to per-`fetchmany` incremental streaming. The lock is now held only during (1) the initial query execution + schema resolution + first `fetchmany`, and (2) each subsequent `fetchmany` call. Arrow batch construction and `yield` both happen outside the lock, so other connector operations are never blocked while a caller is iterating over results. The cursor object remains valid across lock releases since sqlite3 per-cursor iteration state is independent of concurrent connection use.
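For clarity, a simplified sketch of the per-`fetchmany` pattern the reply describes. It is not the actual `SQLiteConnector` method: type resolution via `get_column_info()` is omitted, and the function takes the connection and lock as parameters.

```python
import sqlite3
import threading
from collections.abc import Iterator

import pyarrow as pa

def iter_batches_sketch(
    conn: sqlite3.Connection,
    lock: threading.RLock,
    query: str,
    params=None,
    batch_size: int = 1024,
) -> Iterator[pa.RecordBatch]:
    """Illustrative only: hold the lock for execute() and each fetchmany(),
    but build and yield Arrow batches with the lock released."""
    with lock:
        cursor = conn.execute(query, [] if params is None else params)
        col_names = [d[0] for d in cursor.description]
        rows = cursor.fetchmany(batch_size)
    while rows:
        # Transpose row-major fetch results into column-major arrays.
        columns = list(zip(*rows))
        yield pa.RecordBatch.from_arrays(
            [pa.array(list(col)) for col in columns], names=col_names
        )
        with lock:  # re-acquire only for the next fetch
            rows = cursor.fetchmany(batch_size)
```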
- `create_table_if_not_exists`: validate `pk_column` exists in `columns` before executing DDL; raises `ValueError` with a descriptive message if not found
- `iter_batches`: refactor from full-buffer-under-lock to per-`fetchmany` incremental streaming; the lock is held only during query setup and each individual `fetchmany` call, so other connector operations are not blocked during Arrow batch construction or yield points
- `upsert_records`: switch the rows list comprehension to a generator expression to avoid materialising a second copy of all row data in memory

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Second review round addressed — commit a16c363. All 3 comments resolved; the changes are summarized in the commit message above.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.
Summary
- `SQLiteConnector` — a `DBConnectorProtocol` implementation backed by stdlib `sqlite3` (zero extra dependencies)
- Exports `SQLiteConnector` from `orcapod.databases`
- Enables `ConnectorArrowDatabase` to use SQLite as a storage backend

Implementation
- `SQLiteConnector` (`src/orcapod/databases/sqlite_connector.py`):
  - `isolation_level=None` (autocommit) + `threading.RLock` for thread safety
  - Type-mapping helpers `_sqlite_type_to_arrow`, `_arrow_type_to_sqlite_sql`, `_coerce_column`
  - `upsert_records`: uses named-parameter syntax (`:col_name`) with `executemany` + `to_pylist()` dicts
  - `iter_batches`: collects all batches inside the lock, yields outside — avoids the lock-across-yield anti-pattern; resolves Arrow types via a `get_column_info()` lookup (`cursor.description[i][1]` is always `None` in sqlite3)
  - `_validate_table_name`: rejects table names containing `"` to prevent SQL injection in PRAGMA/DDL statements
  - `to_config`/`from_config`: serializes to `{"connector_type": "sqlite", "db_path": "..."}`, validates on load (see the sketch below)
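A hedged sketch of that config round-trip (the method names come from the summary above; the `_db_path` attribute and validation details are assumptions):

```python
# Sketch only: assumes the connector stores its path on `self._db_path`.
def to_config(self) -> dict:
    return {"connector_type": "sqlite", "db_path": str(self._db_path)}

@classmethod
def from_config(cls, config: dict) -> "SQLiteConnector":
    if config.get("connector_type") != "sqlite":
        raise ValueError(f"expected connector_type 'sqlite', got {config!r}")
    return cls(config["db_path"])
```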
Tests
- `tests/test_databases/test_sqlite_connector.py` covering all methods, type mappings, edge cases, and protocol conformance
- `tests/test_databases/test_sqlite_connector_integration.py` exercising `ConnectorArrowDatabase` + `SQLiteConnector` end-to-end

Linear
Closes PLT-1076
🤖 Generated with Claude Code