Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions rdl/data_sources/MsSqlDataSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ class MsSqlDataSource(object):
def __init__(self, connection_string, logger=None):
self.logger = logger or logging.getLogger(__name__)
self.connection_string = connection_string
self.database_engine = create_engine(connection_string, creator=self.create_connection_with_failover)
self.database_engine = create_engine(connection_string, creator=self.__create_connection_with_failover)
self.column_type_resolver = ColumnTypeResolver()

@staticmethod
def can_handle_connection_string(connection_string):
return MsSqlDataSource.connection_string_regex_match(connection_string) is not None
return MsSqlDataSource.__connection_string_regex_match(connection_string) is not None

@staticmethod
def connection_string_regex_match(connection_string):
def __connection_string_regex_match(connection_string):
return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)

@staticmethod
Expand All @@ -52,8 +52,8 @@ def prefix_column(column_name, full_refresh, primary_key_column_names):
else:
return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"

def create_connection_with_failover(self):
conn_string_data = MsSqlDataSource.connection_string_regex_match(self.connection_string)
def __create_connection_with_failover(self):
conn_string_data = MsSqlDataSource.__connection_string_regex_match(self.connection_string)
server = conn_string_data.group('server')
failover = conn_string_data.group('failover')
database = conn_string_data.group('database')
Expand Down Expand Up @@ -82,7 +82,7 @@ def create_connection_with_failover(self):
return pyodbc.connect(dsn, server=failover)
raise e

def build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
change_tracking_info):
column_array = list(
map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_config['primary_keys']),
Expand All @@ -92,7 +92,7 @@ def build_select_statement(self, table_config, columns, batch_config, batch_key_
if full_refresh:
select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
where_sql = f"WHERE {self.build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
else:
select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
Expand All @@ -105,29 +105,29 @@ def build_select_statement(self, table_config, columns, batch_config, batch_key_
f" {change_tracking_info.last_sync_version})" \
f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
f" ON {self.build_change_table_on_clause(batch_key_tracker)}"
where_sql = f"WHERE {self.build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])

return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"

# Verifies that every configured column exists in the source table; raises ValueError on the first missing column.
def assert_data_source_is_valid(self, table_config, configured_columns):
columns_in_database = self.get_table_columns(table_config)
columns_in_database = self.__get_table_columns(table_config)

for column in configured_columns:
self.assert_column_exists(column['source_name'],
columns_in_database,
f"{table_config['schema']}.{table_config['name']}")
self.__assert_column_exists(column['source_name'],
columns_in_database,
f"{table_config['schema']}.{table_config['name']}")

def assert_column_exists(self, column_name, columns_in_database, table_name):
def __assert_column_exists(self, column_name, columns_in_database, table_name):
if column_name in columns_in_database:
return True

message = f'Column {column_name} does not exist in source table {table_name}'
raise ValueError(message)

def get_table_columns(self, table_config):
def __get_table_columns(self, table_config):
metadata = MetaData()
self.logger.debug(f"Reading definition for source table "
f"{table_config['schema']}.{table_config['name']}")
Expand All @@ -138,7 +138,7 @@ def get_table_columns(self, table_config):
@prevent_senstive_data_logging
def get_next_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
full_refresh, change_tracking_info):
sql = self.build_select_statement(table_config, columns, batch_config, batch_key_tracker,
sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
full_refresh, change_tracking_info)

self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
Expand Down Expand Up @@ -226,7 +226,7 @@ def get_change_tracking_info(self, table_config, last_known_sync_version):
row["force_full_load"], row["data_changed_since_last_sync"])

@staticmethod
def build_where_clause(batch_key_tracker, table_alias):
def __build_where_clause(batch_key_tracker, table_alias):
has_value = False

try:
Expand All @@ -244,7 +244,7 @@ def build_where_clause(batch_key_tracker, table_alias):
sql_builder.close()

@staticmethod
def build_change_table_on_clause(batch_key_tracker):
def __build_change_table_on_clause(batch_key_tracker):
has_value = False

try:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages

setup(name='rdl',
version='0.1.5',
version='0.1.12-beta',
packages=find_packages(),
install_requires=[
'numpy==1.16.2',
Expand Down