diff --git a/rdl/data_sources/MsSqlDataSource.py b/rdl/data_sources/MsSqlDataSource.py
index 9cf7504..d09a4ba 100644
--- a/rdl/data_sources/MsSqlDataSource.py
+++ b/rdl/data_sources/MsSqlDataSource.py
@@ -28,15 +28,15 @@ class MsSqlDataSource(object):
     def __init__(self, connection_string, logger=None):
         self.logger = logger or logging.getLogger(__name__)
         self.connection_string = connection_string
-        self.database_engine = create_engine(connection_string, creator=self.create_connection_with_failover)
+        self.database_engine = create_engine(connection_string, creator=self.__create_connection_with_failover)
         self.column_type_resolver = ColumnTypeResolver()
 
     @staticmethod
     def can_handle_connection_string(connection_string):
-        return MsSqlDataSource.connection_string_regex_match(connection_string) is not None
+        return MsSqlDataSource.__connection_string_regex_match(connection_string) is not None
 
     @staticmethod
-    def connection_string_regex_match(connection_string):
+    def __connection_string_regex_match(connection_string):
         return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)
 
     @staticmethod
@@ -52,8 +52,8 @@ def prefix_column(column_name, full_refresh, primary_key_column_names):
         else:
             return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
 
-    def create_connection_with_failover(self):
-        conn_string_data = MsSqlDataSource.connection_string_regex_match(self.connection_string)
+    def __create_connection_with_failover(self):
+        conn_string_data = MsSqlDataSource.__connection_string_regex_match(self.connection_string)
         server = conn_string_data.group('server')
         failover = conn_string_data.group('failover')
         database = conn_string_data.group('database')
@@ -82,7 +82,7 @@ def create_connection_with_failover(self):
                 return pyodbc.connect(dsn, server=failover)
             raise e
 
-    def build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
+    def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
                                change_tracking_info):
         column_array = list(
             map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_config['primary_keys']),
@@ -92,7 +92,7 @@ def build_select_statement(self, table_config, columns, batch_config, batch_key_
         if full_refresh:
             select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
             from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
-            where_sql = f"WHERE {self.build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
+            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
             order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
         else:
             select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
@@ -105,29 +105,29 @@ def build_select_statement(self, table_config, columns, batch_config, batch_key_
                        f" {change_tracking_info.last_sync_version})" \
                        f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
                        f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
-                       f" ON {self.build_change_table_on_clause(batch_key_tracker)}"
-            where_sql = f"WHERE {self.build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
+                       f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
+            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
             order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])
 
         return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"
 
     # Returns an array of configured_columns containing only columns that this data source supports. Logs invalid ones.
     def assert_data_source_is_valid(self, table_config, configured_columns):
-        columns_in_database = self.get_table_columns(table_config)
+        columns_in_database = self.__get_table_columns(table_config)
 
         for column in configured_columns:
-            self.assert_column_exists(column['source_name'],
-                                      columns_in_database,
-                                      f"{table_config['schema']}.{table_config['name']}")
+            self.__assert_column_exists(column['source_name'],
+                                        columns_in_database,
+                                        f"{table_config['schema']}.{table_config['name']}")
 
-    def assert_column_exists(self, column_name, columns_in_database, table_name):
+    def __assert_column_exists(self, column_name, columns_in_database, table_name):
         if column_name in columns_in_database:
             return True
 
         message = f'Column {column_name} does not exist in source table {table_name}'
         raise ValueError(message)
 
-    def get_table_columns(self, table_config):
+    def __get_table_columns(self, table_config):
         metadata = MetaData()
         self.logger.debug(f"Reading definition for source table "
                           f"{table_config['schema']}.{table_config['name']}")
@@ -138,7 +138,7 @@ def get_table_columns(self, table_config):
     @prevent_senstive_data_logging
     def get_next_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker, full_refresh,
                             change_tracking_info):
-        sql = self.build_select_statement(table_config, columns, batch_config, batch_key_tracker,
+        sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
                                           full_refresh, change_tracking_info)
 
         self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
@@ -226,7 +226,7 @@ def get_change_tracking_info(self, table_config, last_known_sync_version):
                                   row["force_full_load"], row["data_changed_since_last_sync"])
 
     @staticmethod
-    def build_where_clause(batch_key_tracker, table_alias):
+    def __build_where_clause(batch_key_tracker, table_alias):
         has_value = False
 
         try:
@@ -244,7 +244,7 @@ def build_where_clause(batch_key_tracker, table_alias):
             sql_builder.close()
 
     @staticmethod
-    def build_change_table_on_clause(batch_key_tracker):
+    def __build_change_table_on_clause(batch_key_tracker):
         has_value = False
 
         try:
diff --git a/setup.py b/setup.py
index b7f7798..545dd89 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
 from setuptools import setup, find_packages
 
 setup(name='rdl',
-      version='0.1.5',
+      version='0.1.12-beta',
       packages=find_packages(),
       install_requires=[
          'numpy==1.16.2',
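
Aside on the renames above: prefixing the helpers with a double underscore relies on Python's name mangling, so references written inside the class body (for example MsSqlDataSource.__connection_string_regex_match or creator=self.__create_connection_with_failover) are rewritten to the mangled attribute name and keep resolving, while code outside the class can no longer reach the helpers under their original names. A minimal sketch of that behaviour, using a hypothetical Example class that is not part of this repository:

class Example:
    @staticmethod
    def __helper(value):
        # Two leading underscores: stored on the class as _Example__helper.
        return value * 2

    def run(self, value):
        # Inside the class body, Example.__helper (or self.__helper) is
        # compiled to _Example__helper, so the call still resolves after
        # the rename.
        return Example.__helper(value)

print(Example().run(3))   # prints 6
# Example.__helper(3) outside the class raises AttributeError: the class only
# exposes the mangled attribute _Example__helper.

The same mechanism is what keeps creator=self.__create_connection_with_failover working in __init__ while hiding the connection helper from external callers.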