Skip to content

Commit

Permalink
Move primary key generation for mssql backend after completely filling a table. (#190)
Browse files Browse the repository at this point in the history

* For mssql backend: Move primary key generation after completely filling a table.

Moved string type changes for index columns after creation of empty table and before filling it.

* Prepared release 0.9.1.
  • Loading branch information
windiana42 committed Apr 26, 2024
1 parent cd306e3 commit d78675a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 37 deletions.
3 changes: 2 additions & 1 deletion docs/source/changelog.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Changelog

## 0.9.1 (2024-XX-XX)
## 0.9.1 (2024-04-26)
- Support Snowflake as a backend for `SQLTableStore`.
- For the mssql backend, the primary key is now added after the table has been completely filled.
- Make polars dematerialization robust against missing connectorx. Fall back to pandas if connectorx is not available.
- Fix some bugs with pandas < 2 and sqlalchemy < 2 compatibility as well as pyarrow handling.
- Use pd.StringDtype("pyarrow") instead of pd.ArrowDtype(pa.string()) for dtype "string[pyarrow]"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pydiverse-pipedag"
version = "0.9.0"
version = "0.9.1"
description = "A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files."
authors = [
"QuantCo, Inc.",
Expand Down
61 changes: 26 additions & 35 deletions src/pydiverse/pipedag/backend/table/sql/dialects/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import sqlalchemy.dialects.mssql

from pydiverse.pipedag.backend.table.sql.ddl import (
AddIndex,
AddPrimaryKey,
ChangeColumnTypes,
CreateAlias,
_mssql_update_definition,
Expand Down Expand Up @@ -79,44 +77,13 @@ def __init__(
def _init_database(self):
self._init_database_with_database("master", {"isolation_level": "AUTOCOMMIT"})

def add_primary_key(
    self,
    table_name: str,
    schema: Schema,
    key_columns: list[str],
    *,
    name: str | None = None,
):
    """Execute an ``AddPrimaryKey`` statement for a table.

    :param table_name: Name of the table that receives the primary key.
    :param schema: Schema object the table belongs to.
    :param key_columns: Columns that make up the primary key.
    :param name: Optional constraint name (keyword-only); passed through
        unchanged to ``AddPrimaryKey``.
    """
    primary_key_statement = AddPrimaryKey(table_name, schema, key_columns, name)
    self.execute(primary_key_statement)

def add_index(
    self,
    table_name: str,
    schema: Schema,
    index_columns: list[str],
    name: str | None = None,
):
    """Create an index, first capping unbounded string columns if needed.

    The SQL types of ``index_columns`` are reflected from the table. If any
    of them is an ``sa.String`` without a length, a ``ChangeColumnTypes``
    statement with ``cap_varchar_max=1024`` is executed for the index columns
    before the ``AddIndex`` statement is executed.

    :param table_name: Name of the table to index.
    :param schema: Schema object the table belongs to.
    :param index_columns: Columns that make up the index.
    :param name: Optional index name; passed through to ``AddIndex``.
    """
    column_types = self.reflect_sql_types(index_columns, table_name, schema)
    needs_length_cap = any(
        isinstance(column_type, sa.String) and column_type.length is None
        for column_type in column_types
    )
    if needs_length_cap:
        # impose some varchar(max) limit to allow use in primary key / index
        cap_statement = ChangeColumnTypes(
            table_name, schema, index_columns, column_types, cap_varchar_max=1024
        )
        self.execute(cap_statement)
    index_statement = AddIndex(table_name, schema, index_columns, name)
    self.execute(index_statement)

def dialect_requests_empty_creation(self, table: Table, is_sql: bool) -> bool:
    """Report whether ``table`` carries constraints that trigger empty creation.

    Returns ``True`` when ``table.nullable`` or ``table.non_nullable`` is set
    (not ``None``), or when ``table.primary_key`` or ``table.indexes`` is a
    non-empty collection; otherwise ``False``.

    :param table: Table description whose ``nullable``, ``non_nullable``,
        ``primary_key`` and ``indexes`` attributes are inspected.
    :param is_sql: Ignored by this dialect.
    """
    _ = is_sql  # intentionally unused
    # NOTE(review): presumably the caller creates the table empty first so
    # column properties can be adjusted before data is inserted -- confirm.
    if table.nullable is not None:
        return True
    if table.non_nullable is not None:
        return True
    if table.primary_key is not None and len(table.primary_key) > 0:
        return True
    if table.indexes is not None and len(table.indexes) > 0:
        return True
    return False

def get_forced_nullability_columns(
Expand Down Expand Up @@ -190,8 +157,32 @@ def add_indexes_and_set_nullable(
cap_varchar_max=1024,
)
)
self.add_table_primary_key(table, schema)
index_columns = set() # type: set[str]
if table.indexes is not None:
for index in table.indexes:
index_columns |= set(index)
index_columns_list = list(index_columns)
sql_types = self.reflect_sql_types(
index_columns_list, table.name, schema
)
index_str_max_columns = [
col
for _type, col in zip(sql_types, index_columns_list)
if isinstance(_type, sa.String) and _type.length is None
]
if len(index_str_max_columns) > 0:
# impose some varchar(max) limit to allow use in primary key / index
self.execute(
ChangeColumnTypes(
table.name,
schema,
index_str_max_columns,
sql_types,
cap_varchar_max=1024,
)
)
if on_empty_table is None or not on_empty_table:
self.add_table_primary_key(table, schema)
self.add_table_indexes(table, schema)

def execute_raw_sql(self, raw_sql: RawSql):
Expand Down

0 comments on commit d78675a

Please sign in to comment.