Skip to content
This repository has been archived by the owner on Nov 8, 2021. It is now read-only.

Commit

Permalink
fix: bugs uncovered by production use (#21)
Browse files Browse the repository at this point in the history
* Fix bugs uncovered when running with Snowflake in production

* Fix linting and unit test errors
  • Loading branch information
villebro committed Dec 12, 2019
1 parent d3152b6 commit 8918145
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 20 deletions.
2 changes: 1 addition & 1 deletion sqltask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqltask.base.row_source import BaseRowSource
from sqltask.base.table import BaseOutputRow, BaseTableContext

__version__ = '0.6.1'
__version__ = '0.6.2'

# initialize logging
logger = logging.getLogger(__name__)
Expand Down
16 changes: 9 additions & 7 deletions sqltask/engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class BaseEngineSpec:
engine: Optional[str] = None
default_upload_type = UploadType.SQL_INSERT
supported_uploads: Set[UploadType] = {UploadType.SQL_INSERT}
supports_primary_keys = True
supports_column_comments = True
supports_table_comments = True
supports_schemas = True
Expand Down Expand Up @@ -184,7 +185,7 @@ def add_column(cls,
stmt += " NULL"
else:
stmt += " NOT NULL"
if column.primary_key is True:
if cls.supports_primary_keys and column.primary_key is True:
stmt += " PRIMARY KEY"
if cls.supports_column_comments and column.comment:
comment = get_escaped_string_value(column.comment)
Expand Down Expand Up @@ -262,16 +263,17 @@ def validate_column_value(cls, value: Any, column: Column) -> None:
elif not column.nullable and value is None:
raise ValueError(f"Column {name} cannot be null")
elif valid_types is None:
# type checking not valid
# type checking not defined for this type
pass
else:
type_ = column.type
length = type_.length if hasattr(type_, "length") else None # type: ignore
if type(value) not in valid_types:
raise ValueError(f"Column {name} type {column.type} is not compatible "
f"with value: {value}")
if isinstance(value, str) and hasattr(column.type, "length") and \
column.type.length is not None \
and len(value) > column.type.length: # type: ignore
f"with type {type(value).__name__}: {value}")
if isinstance(value, str) and length is not None \
and len(value) > length:
raise ValueError(f"Column {name} only supports "
f"{column.type.length} " # type: ignore
f"{length} "
f"character strings, given value is {len(value)} "
f"characters.")
1 change: 1 addition & 0 deletions sqltask/engine_specs/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class SqliteEngineSpec(BaseEngineSpec):
supported_uploads = {UploadType.SQL_INSERT}
default_upload_type = UploadType.SQL_INSERT
supports_column_comments = False
supports_primary_keys = False
supports_table_comments = False
supports_schemas = False

Expand Down
23 changes: 16 additions & 7 deletions sqltask/utils/engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@ def create_tmp_csv(table_context: BaseTableContext, delimiter: str = "\t") -> st
"""
csv_rows = []
metadata = table_context.engine_context.metadata
if table_context.name not in metadata.tables:
metadata.reflect(only=[table_context.name])
columns = metadata.tables[table_context.name].columns
# The table schema is reflected because the reflected columns might be in a
# different order than in the table context definition. Since some engines use
# the fully qualified table name (schema.table) as the key in the tables dict,
# while others use only the table name, we loop through the tables until we
# find the correct one. Slightly hackish.
metadata.reflect(only=[table_context.name], schema=table_context.schema)
target_table = None
for table in metadata.tables.values():
if table.name == table_context.name:
target_table = table

if target_table is None:
raise Exception(f"Table {target_table} not found in schema despite reflection.")

for row in table_context.output_rows:
csv_row = []
for column in columns:
for column in target_table.columns:
csv_row.append(row[column.name])
csv_rows.append(csv_row)

table = table_context.table

epoch = str(datetime.utcnow().timestamp())
file_path = f"{tempfile.gettempdir()}/{table.name}_{epoch}.csv"
file_path = f"{tempfile.gettempdir()}/{target_table.name}_{epoch}.csv"
logger.info(f"Creating temporary file `{file_path}`")

with open(file_path, 'w', encoding="utf-8", newline='') as csv_file:
Expand Down
16 changes: 14 additions & 2 deletions tests/base/test_table.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
from unittest import TestCase

from tests.fixtures import get_table_context


class TestBaseTableContext(TestCase):
    """Tests for BaseTableContext schema handling."""

    def test_row_source(self):
        # Placeholder: row-source behavior is not exercised yet.
        pass

    def test_migration_add_and_remove_columns(self):
        """Migrate an existing table whose columns differ from the context.

        A table is pre-created by hand with an extra column
        (``redundant_field``) and without all columns the fixture context
        defines; ``migrate_schema()`` is then expected to reconcile the
        schema without raising.
        """
        table_context = get_table_context()
        engine = table_context.engine_context.engine
        # Create the table up front with a column layout that diverges from
        # the fixture's definition so the migration has work to do.
        engine.execute("""
        CREATE TABLE tbl (
        customer_name VARCHAR(10) NOT NULL,
        report_date DATE NOT NULL,
        birthdate DATE NULL,
        redundant_field VARCHAR(128) NOT NULL
        )
        """)
        table_context.migrate_schema()
9 changes: 6 additions & 3 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date
from typing import List, Optional

from sqlalchemy.schema import Column
from sqlalchemy.types import Date, String
Expand All @@ -7,12 +8,14 @@
from sqltask.base.table import BaseTableContext


def get_table_context() -> BaseTableContext:
def get_table_context(name: Optional[str] = None,
columns: Optional[List[Column]] = None,
) -> BaseTableContext:
engine_context = EngineContext("source", "sqlite://")
return BaseTableContext(
name="table",
name=name or "tbl",
engine_context=engine_context,
columns=[
columns=columns or [
Column("report_date", Date, primary_key=True),
Column("customer_name", String(10), comment="Name", primary_key=True),
Column("birthdate", Date, comment="Birthday", nullable=True),
Expand Down

0 comments on commit 8918145

Please sign in to comment.