Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rdl/BatchDataLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ def load_batch(self, batch_key_tracker):
f"sync_version: '{self.change_tracking_info.sync_version}', "
f"last_sync_version: '{self.change_tracking_info.last_sync_version}'.")

data_frame = self.source_db.get_next_data_frame(self.source_table_config, self.columns,
self.batch_config, batch_tracker, batch_key_tracker,
self.full_refresh, self.change_tracking_info)
data_frame = self.source_db.get_table_data_frame(self.source_table_config, self.columns,
self.batch_config, batch_tracker, batch_key_tracker,
self.full_refresh, self.change_tracking_info)

if data_frame is None or len(data_frame) == 0:
self.logger.debug("There are no more rows to import.")
Expand Down
119 changes: 57 additions & 62 deletions rdl/data_sources/MsSqlDataSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,29 @@ def __init__(self, connection_string, logger=None):
def can_handle_connection_string(connection_string):
    """Return True when *connection_string* matches the MSSQL connection-string pattern."""
    match = MsSqlDataSource.__connection_string_regex_match(connection_string)
    return match is not None

@staticmethod
def __connection_string_regex_match(connection_string):
return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)
def get_table_info(self, table_config, last_known_sync_version):
    """Collect column metadata and change-tracking state for one source table.

    Returns a SourceTableInfo combining the table's column names and the
    change-tracking info computed from *last_known_sync_version*.
    """
    return SourceTableInfo(
        self.__get_table_columns(table_config),
        self.__get_change_tracking_info(table_config, last_known_sync_version))

@staticmethod
def connection_string_prefix():
return 'mssql+pyodbc://'
@prevent_senstive_data_logging
def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
                         full_refresh, change_tracking_info):
    """Run the batch SELECT for this table and return the rows as a pandas DataFrame."""
    sql = self.__build_select_statement(
        table_config, columns, batch_config, batch_key_tracker, full_refresh, change_tracking_info)

    self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
    data_frame = pandas.read_sql_query(sql, self.database_engine)
    self.logger.debug("Completed read")

    # Report the batch's row count to the tracker before handing the frame back.
    batch_tracker.extract_completed_successfully(len(data_frame))

    return data_frame

@staticmethod
def prefix_column(column_name, full_refresh, primary_key_column_names):
if not isinstance(primary_key_column_names, (list, tuple)):
raise TypeError(f"Argument 'primary_key_column_names' must be a list or tuple")
if column_name in primary_key_column_names and not full_refresh:
return f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.{column_name}"
else:
return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
def __connection_string_regex_match(connection_string):
    """Match *connection_string* against MSSQL_STRING_REGEX; returns a match object or None."""
    pattern = MsSqlDataSource.MSSQL_STRING_REGEX
    return re.match(pattern, connection_string)

def __create_connection_with_failover(self):
conn_string_data = MsSqlDataSource.__connection_string_regex_match(self.connection_string)
Expand Down Expand Up @@ -83,41 +90,6 @@ def __create_connection_with_failover(self):
return pyodbc.connect(dsn, server=failover)
raise e

def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
change_tracking_info):
column_array = list(
map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_config['primary_keys']),
columns))
column_names = ", ".join(column_array)

if full_refresh:
select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
else:
select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
from_sql = f"FROM CHANGETABLE(CHANGES" \
f" {table_config['schema']}.{table_config['name']}," \
f" {change_tracking_info.last_sync_version})" \
f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])

return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"

def get_table_info(self, table_config, last_known_sync_version):
columns_in_database = self.__get_table_columns(table_config)
change_tracking_info = self.__get_change_tracking_info(table_config, last_known_sync_version)
source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
return source_table_info

def __get_table_columns(self, table_config):
metadata = MetaData()
self.logger.debug(f"Reading definition for source table "
Expand All @@ -126,20 +98,6 @@ def __get_table_columns(self, table_config):
autoload_with=self.database_engine)
return list(map(lambda column: column.name, table.columns))

@prevent_senstive_data_logging
def get_next_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
full_refresh, change_tracking_info):
sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
full_refresh, change_tracking_info)

self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
data_frame = pandas.read_sql_query(sql, self.database_engine)
self.logger.debug("Completed read")

batch_tracker.extract_completed_successfully(len(data_frame))

return data_frame

def __get_change_tracking_info(self, table_config, last_known_sync_version):

if last_known_sync_version is None:
Expand Down Expand Up @@ -216,6 +174,43 @@ def __get_change_tracking_info(self, table_config, last_known_sync_version):
return ChangeTrackingInfo(row["last_sync_version"], row["sync_version"],
row["force_full_load"], row["data_changed_since_last_sync"])

def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
                             change_tracking_info):
    """Compose the paged SELECT used to pull one batch of rows from the source table.

    A full refresh reads the source table directly. An incremental load reads
    from CHANGETABLE(CHANGES ...) LEFT JOINed back to the source table, adding
    the change version and an IS_DELETED flag so deletions are surfaced.
    Paging is TOP(batch size) plus a keyset WHERE clause over the primary keys.
    """
    column_names = ", ".join(
        MsSqlDataSource.prefix_column(cfg['source_name'], full_refresh, table_config['primary_keys'])
        for cfg in columns)

    if full_refresh:
        table_alias = MsSqlDataSource.SOURCE_TABLE_ALIAS
        select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
        from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
    else:
        table_alias = MsSqlDataSource.CHANGE_TABLE_ALIAS
        select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
                     f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
                     f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
                     f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
                     f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
        from_sql = f"FROM CHANGETABLE(CHANGES" \
                   f" {table_config['schema']}.{table_config['name']}," \
                   f" {change_tracking_info.last_sync_version})" \
                   f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
                   f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
                   f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"

    where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, table_alias)}"
    # Qualify EVERY primary-key column with the alias. The previous join-based
    # string left the first key bare ("ORDER BY a, ct.b"), which is ambiguous
    # once the change table is joined to the source table.
    order_by_sql = "ORDER BY " + ", ".join(
        f"{table_alias}.{pk}" for pk in table_config['primary_keys'])

    return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"

@staticmethod
def prefix_column(column_name, full_refresh, primary_key_column_names):
    """Qualify *column_name* with the table alias it should be selected from.

    During an incremental load, primary-key columns are read from the change
    table alias (deleted rows appear only there — the source-table side of the
    LEFT JOIN is NULL); every other column, and all columns during a full
    refresh, are read from the source-table alias.

    Raises:
        TypeError: if *primary_key_column_names* is not a list or tuple.
    """
    if not isinstance(primary_key_column_names, (list, tuple)):
        # Plain literal: the original used an f-string with no placeholders.
        raise TypeError("Argument 'primary_key_column_names' must be a list or tuple")
    if column_name in primary_key_column_names and not full_refresh:
        return f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.{column_name}"
    return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"

@staticmethod
def __build_where_clause(batch_key_tracker, table_alias):
has_value = False
Expand Down