diff --git a/rdl/BatchDataLoader.py b/rdl/BatchDataLoader.py
index 16d89f8..6b7af71 100644
--- a/rdl/BatchDataLoader.py
+++ b/rdl/BatchDataLoader.py
@@ -31,9 +31,9 @@ def load_batch(self, batch_key_tracker):
                           f"sync_version: '{self.change_tracking_info.sync_version}', "
                           f"last_sync_version: '{self.change_tracking_info.last_sync_version}'.")
 
-        data_frame = self.source_db.get_next_data_frame(self.source_table_config, self.columns,
-                                                        self.batch_config, batch_tracker, batch_key_tracker,
-                                                        self.full_refresh, self.change_tracking_info)
+        data_frame = self.source_db.get_table_data_frame(self.source_table_config, self.columns,
+                                                         self.batch_config, batch_tracker, batch_key_tracker,
+                                                         self.full_refresh, self.change_tracking_info)
 
         if data_frame is None or len(data_frame) == 0:
             self.logger.debug("There are no more rows to import.")
diff --git a/rdl/data_sources/MsSqlDataSource.py b/rdl/data_sources/MsSqlDataSource.py
index 404596c..bc224e4 100644
--- a/rdl/data_sources/MsSqlDataSource.py
+++ b/rdl/data_sources/MsSqlDataSource.py
@@ -36,22 +36,29 @@ def __init__(self, connection_string, logger=None):
     def can_handle_connection_string(connection_string):
         return MsSqlDataSource.__connection_string_regex_match(connection_string) is not None
 
-    @staticmethod
-    def __connection_string_regex_match(connection_string):
-        return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)
+    def get_table_info(self, table_config, last_known_sync_version):
+        columns_in_database = self.__get_table_columns(table_config)
+        change_tracking_info = self.__get_change_tracking_info(table_config, last_known_sync_version)
+        source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
+        return source_table_info
 
-    @staticmethod
-    def connection_string_prefix():
-        return 'mssql+pyodbc://'
+    @prevent_senstive_data_logging
+    def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
+                             full_refresh, change_tracking_info):
+        sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
+                                            full_refresh, change_tracking_info)
+
+        self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
+        data_frame = pandas.read_sql_query(sql, self.database_engine)
+        self.logger.debug("Completed read")
+
+        batch_tracker.extract_completed_successfully(len(data_frame))
+
+        return data_frame
 
     @staticmethod
-    def prefix_column(column_name, full_refresh, primary_key_column_names):
-        if not isinstance(primary_key_column_names, (list, tuple)):
-            raise TypeError(f"Argument 'primary_key_column_names' must be a list or tuple")
-        if column_name in primary_key_column_names and not full_refresh:
-            return f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.{column_name}"
-        else:
-            return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
+    def __connection_string_regex_match(connection_string):
+        return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)
 
     def __create_connection_with_failover(self):
         conn_string_data = MsSqlDataSource.__connection_string_regex_match(self.connection_string)
@@ -83,41 +90,6 @@ def __create_connection_with_failover(self):
                 return pyodbc.connect(dsn, server=failover)
             raise e
 
-    def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
-                                 change_tracking_info):
-        column_array = list(
-            map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_config['primary_keys']),
-                columns))
-        column_names = ", ".join(column_array)
-
-        if full_refresh:
-            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
-            from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
-            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
-            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
-        else:
-            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
-                         f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
-                         f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
-                         f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
-                         f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
-            from_sql = f"FROM CHANGETABLE(CHANGES" \
-                       f" {table_config['schema']}.{table_config['name']}," \
-                       f" {change_tracking_info.last_sync_version})" \
-                       f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
-                       f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
-                       f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
-            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
-            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])
-
-        return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"
-
-    def get_table_info(self, table_config, last_known_sync_version):
-        columns_in_database = self.__get_table_columns(table_config)
-        change_tracking_info = self.__get_change_tracking_info(table_config, last_known_sync_version)
-        source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
-        return source_table_info
-
     def __get_table_columns(self, table_config):
         metadata = MetaData()
         self.logger.debug(f"Reading definition for source table "
@@ -126,20 +98,6 @@ def __get_table_columns(self, table_config):
                       autoload_with=self.database_engine)
         return list(map(lambda column: column.name, table.columns))
 
-    @prevent_senstive_data_logging
-    def get_next_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
-                            full_refresh, change_tracking_info):
-        sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
-                                            full_refresh, change_tracking_info)
-
-        self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
-        data_frame = pandas.read_sql_query(sql, self.database_engine)
-        self.logger.debug("Completed read")
-
-        batch_tracker.extract_completed_successfully(len(data_frame))
-
-        return data_frame
-
     def __get_change_tracking_info(self, table_config, last_known_sync_version):
 
         if last_known_sync_version is None:
@@ -216,6 +174,43 @@ def __get_change_tracking_info(self, table_config, last_known_sync_version):
         return ChangeTrackingInfo(row["last_sync_version"], row["sync_version"], row["force_full_load"],
                                   row["data_changed_since_last_sync"])
 
+    def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
+                                 change_tracking_info):
+        column_array = list(map(lambda cfg: MsSqlDataSource.prefix_column(
+            cfg['source_name'], full_refresh, table_config['primary_keys']), columns))
+        column_names = ", ".join(column_array)
+
+        if full_refresh:
+            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
+            from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
+            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
+            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
+        else:
+            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
+                         f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
+                         f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
+                         f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
+                         f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
+            from_sql = f"FROM CHANGETABLE(CHANGES" \
+                       f" {table_config['schema']}.{table_config['name']}," \
+                       f" {change_tracking_info.last_sync_version})" \
+                       f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
+                       f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
+                       f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
+            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
+            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])
+
+        return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"
+
+    @staticmethod
+    def prefix_column(column_name, full_refresh, primary_key_column_names):
+        if not isinstance(primary_key_column_names, (list, tuple)):
+            raise TypeError(f"Argument 'primary_key_column_names' must be a list or tuple")
+        if column_name in primary_key_column_names and not full_refresh:
+            return f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.{column_name}"
+        else:
+            return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
+
     @staticmethod
     def __build_where_clause(batch_key_tracker, table_alias):
         has_value = False