Skip to content
This repository has been archived by the owner on Nov 8, 2021. It is now read-only.

Commit

Permalink
Add comment altering and add disabling logging to DqOutputRow (#18)
Browse files Browse the repository at this point in the history
* Add comment updating logic and bump version to 0.5

* Add option to disable dq logging in DqOutputRow

* Bump to 0.5.2

* Escape single quotes on comment statements

* Escape single quotes on column comment statements

* Fix escape bug
  • Loading branch information
villebro committed Nov 12, 2019
1 parent cb7b674 commit 874790a
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 7 deletions.
2 changes: 1 addition & 1 deletion sqltask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqltask.base.row_source import BaseRowSource
from sqltask.base.table import BaseOutputRow, BaseTableContext

__version__ = '0.4.5'
__version__ = '0.5.2'

# initialize logging
logger = logging.getLogger(__name__)
Expand Down
Empty file added sqltask/base/row_target.py
Empty file.
46 changes: 42 additions & 4 deletions sqltask/base/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,33 @@ def migrate_schema(self) -> None:
"""
table = self.table
engine = self.engine_context.engine
engine_spec = self.engine_context.engine_spec
metadata = self.engine_context.metadata
if engine.has_table(table.name, schema=self.schema):
# table exists, add column
inspector = sa.inspect(engine)
cols_existing = [col['name'] for col in inspector.get_columns(table.name)]

# update table comment if different from current comment
if engine_spec.supports_table_comments:
table_comment = inspector.get_table_comment(table.name)
if table.comment != table_comment:
engine_spec.update_table_comment(self, table.comment)

# check if existing columns are up to date
cols_existing = {col['name']: col
for col in inspector.get_columns(table.name)}
for column in table.columns:
if column.name not in cols_existing:
col_existing = cols_existing.get(column.name)

# add columns if not in table
if not col_existing:
self.engine_context.engine_spec.add_column(self, column)
else:
if engine_spec.supports_column_comments and \
column.comment is not None and \
col_existing["comment"] != column.comment:
# update column comment if different from current comment
engine_spec.update_column_comment(
self, column.name, column.comment)

# remove redundant columns
cols_new = {col.name: col for col in table.columns}
Expand Down Expand Up @@ -316,11 +335,27 @@ def append(self) -> None:


class DqOutputRow(BaseOutputRow):
def __init__(self, table_context: DqTableContext):
    """
    Output row for a data-quality-aware target table.

    :param table_context: table context of the target table this row
        will be appended to.
    """
    super().__init__(table_context)
    # Data quality issues are logged by default; `log_dq` returns early
    # when this is set to False via `set_logging_enabled`.
    self.logging_enabled = True

def set_logging_enabled(self, enabled: bool) -> None:
    """
    Toggle whether data quality issues are passed to the log table.

    Disabling logging is useful for rows with lower-priority data,
    e.g. inactive users, whose data quality may be poorer due to being
    stale. While disabled, calls to `log_dq` are ignored.

    :param enabled: set to True to log issues; False to ignore calls
        to `log_dq`.
    """
    self.logging_enabled = enabled

def log_dq(self, column_name: Optional[str], category: dq.Category,
priority: dq.Priority, source: dq.Source,
message: Optional[str] = None) -> None:
"""
Log data quality issue to be recorded in data quality table.
Log data quality issue to be recorded in data quality table. If logging
has been disabled by calling `set_logging_enabled(False)`, data quality
issues will be ignored.
:param column_name: Name of affected column in target table.
:param category: The type of data quality issue.
Expand All @@ -329,6 +364,9 @@ def log_dq(self, column_name: Optional[str], category: dq.Category,
Should be None for aggregate data quality issues.
:param message: Verbose description of observed issue.
"""
if self.logging_enabled is False:
return

if column_name not in self.table_context.table.columns:
raise Exception(f"Column `{column_name}` not in table "
f"`{self.table_context.table.name}`")
Expand Down
38 changes: 37 additions & 1 deletion sqltask/engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,42 @@ def drop_column(cls,
:return:
"""
table_name = table_context.table.name
logging.debug(f"Drop column `{column_name}` from table `{table_name}`")
logging.info(f"Drop column `{column_name}` from table `{table_name}`")
stmt = f'ALTER TABLE {table_name} DROP COLUMN {column_name}'
table_context.engine_context.engine.execute(stmt)

@classmethod
def update_table_comment(cls,
                         table_context: BaseTableContext,
                         comment: str):
    """
    Set a new comment on the target table.

    :param table_context: table which to alter
    :param comment: new comment
    :return:
    """
    table_name = table_context.table.name
    logging.info(f"Change comment on table `{table_name}`")
    # Escape single quotes so they don't terminate the SQL string literal.
    # NOTE(review): backslash escaping is non-ANSI (ANSI doubles the quote
    # to ''); engines that reject backslash escapes should override this.
    escaped = comment.replace("'", "\\'")
    table_context.engine_context.engine.execute(
        f"COMMENT ON TABLE {table_name} IS '{escaped}'"
    )

@classmethod
def update_column_comment(cls,
                          table_context: BaseTableContext,
                          column_name: str,
                          comment: str):
    """
    Update the comment of a column.

    :param table_context: table which to alter
    :param column_name: column whose comment is to be updated
    :param comment: new comment
    :return:
    """
    table_name = table_context.table.name
    # Bug fix: the previous message claimed a *table* comment was being
    # changed; name the actual column being altered.
    logging.info(f"Change comment on column `{table_name}`.`{column_name}`")
    # Escape single quotes so they don't terminate the SQL string literal.
    # NOTE(review): backslash escaping is non-ANSI (ANSI doubles the quote
    # to ''); engines that reject backslash escapes should override this.
    comment = comment.replace("'", "\\'")
    stmt = f"COMMENT ON COLUMN {table_name}.{column_name} IS '{comment}'"
    table_context.engine_context.engine.execute(stmt)
1 change: 1 addition & 0 deletions sqltask/engine_specs/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class SnowflakeEngineSpec(BaseEngineSpec):
UploadType.CSV,
}
default_upload_type = UploadType.CSV
supports_table_comments = False

@classmethod
def _insert_rows_csv(cls, table_context: BaseTableContext) -> None:
Expand Down
2 changes: 1 addition & 1 deletion sqltask/sources/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __iter__(self):
row_number = 0
logger.debug(
f"Start reading CSV row source: {self}")
with open(self.file_path, newline="") as csvfile:
with open(self.file_path, newline="", encoding=self.encoding) as csvfile:
csvreader = csv.reader(csvfile, delimiter=self.delimiter)
for in_row in csvreader:
row_number += 1
Expand Down

0 comments on commit 874790a

Please sign in to comment.