From 7b5c9b5b1276b783ac04bcd9c8ed696ad4bfbc78 Mon Sep 17 00:00:00 2001 From: David Ames Date: Mon, 2 Jul 2018 15:16:45 +1000 Subject: [PATCH 01/11] early hacks --- modules/DataLoadManager.py | 4 ++++ modules/batch_executions/BatchExecution.py | 14 ++++++++++++++ modules/batch_executions/BatchExecutionManager.py | 14 ++++++++++++++ 3 files changed, 32 insertions(+) create mode 100644 modules/batch_executions/BatchExecution.py create mode 100644 modules/batch_executions/BatchExecutionManager.py diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index 0724d85..9c8fbeb 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -38,6 +38,10 @@ def start_single_import(self, target_engine, configuration_name, requested_full_ pipeline_configuration['load_table'])) full_refresh = True + #Ask the source to check that ChangeTrackingIsEnabled and valid + #Maybe this returns a flag? + + data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh) self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'], diff --git a/modules/batch_executions/BatchExecution.py b/modules/batch_executions/BatchExecution.py new file mode 100644 index 0000000..f8c4a0f --- /dev/null +++ b/modules/batch_executions/BatchExecution.py @@ -0,0 +1,14 @@ +from sqlalchemy import Column, DateTime, Integer, String, Boolean, BigInteger +from sqlalchemy.sql import func +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + +class BatchExecution(Base): + __tablename__ = 'batch_execution' + id = Column(Integer, primary_key=True) + model_name = Column(String(250), nullable=False) + is_full_refresh = Column(Boolean, nullable=False) + start_synchronization_version = Column(BigInteger, nullable=False) + next_bookmark_synchronization_version = Column(BigInteger, nullable=False) + started_on = Column(DateTime(timezone=True), server_default=func.now()) \ No newline at end of file diff --git a/modules/batch_executions/BatchExecutionManager.py b/modules/batch_executions/BatchExecutionManager.py new file mode 100644 index 0000000..cf7234c --- /dev/null +++ b/modules/batch_executions/BatchExecutionManager.py @@ -0,0 +1,14 @@ +import os +import json +import logging +from modules.BatchDataLoader import BatchDataLoader +from modules.DestinationTableManager import DestinationTableManager +from modules.DataLoadTracker import DataLoadTracker + + + +class BatchExecutionManager(object): + def __init__(self, configuration_path, data_source, logger=None): + self.logger = logger or logging.getLogger(__name__) + self.configuration_path = configuration_path + self.data_source = data_source From 3a1ac54e445b0b221b485cb353b44a943351fdba Mon Sep 17 00:00:00 2001 From: David Ames Date: Fri, 6 Jul 2018 18:37:10 +1000 Subject: [PATCH 02/11] basic incremental loads now working --- modules/BatchDataLoader.py | 16 +++- modules/DataLoadManager.py | 25 ++++-- modules/DataLoadTracker.py | 11 ++- modules/DestinationTableManager.py | 36 ++++++++- modules/data_sources/ChangeTrackingInfo.py | 13 ++++ modules/data_sources/MsSqlDataSource.py | 90 ++++++++++++++++++---- test_full_refresh_from_mssql.cmd | 12 +-- 7 files changed, 165 insertions(+), 38 deletions(-) create mode 100644 modules/data_sources/ChangeTrackingInfo.py diff --git a/modules/BatchDataLoader.py b/modules/BatchDataLoader.py index 6837117..41b9e1c 100644 --- a/modules/BatchDataLoader.py +++ b/modules/BatchDataLoader.py @@ -6,7 +6,7 @@ class BatchDataLoader(object): def __init__(self, data_source, 
source_table_configuration, target_schema, target_table, columns, data_load_tracker, - batch_configuration, target_engine, logger=None): + batch_configuration, target_engine, full_refresh, change_tracking_info, logger=None): self.logger = logger or logging.getLogger(__name__) self.source_table_configuration = source_table_configuration self.columns = columns @@ -16,15 +16,18 @@ def __init__(self, data_source, source_table_configuration, target_schema, targe self.data_load_tracker = data_load_tracker self.batch_configuration = batch_configuration self.target_engine = target_engine + self.full_refresh = full_refresh + self.change_tracking_info = change_tracking_info # Imports rows, returns True if >0 rows were found def load_batch(self, previous_batch_key): batch_tracker = self.data_load_tracker.start_batch() - self.logger.debug("ImportBatch Starting from previous_batch_key: {0}".format(previous_batch_key)) + self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(previous_batch_key, self.full_refresh, self.change_tracking_info.this_sync_version)) data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns, - self.batch_configuration, batch_tracker, previous_batch_key) + self.batch_configuration, batch_tracker, previous_batch_key, + self.full_refresh, self.change_tracking_info) if data_frame is None or len(data_frame) == 0: self.logger.debug("There are no rows to import, returning -1") @@ -44,7 +47,7 @@ def write_data_frame_to_table(self, data_frame): qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table) self.logger.debug("Starting write to table {0}".format(qualified_target_table)) data = StringIO() - + print(data_frame) data_frame.to_csv(data, header=False, index=False, na_rep='', float_format='%.16g') # Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation @@ -58,6 +61,7 @@ def write_data_frame_to_table(self, data_frame): sql = "COPY {0}({1}) FROM STDIN with csv".format(qualified_target_table, column_list) self.logger.debug("Writing to table using command {0}".format(sql)) + curs.copy_expert(sql=sql, file=data) self.logger.debug("Completed write to table {0}".format(qualified_target_table)) @@ -70,6 +74,10 @@ def get_destination_column_name(self, source_column_name): if column['source_name'] == source_column_name: return column['destination']['name'] + # Internal columns - map them straight through + if source_column_name.startswith("data_pipeline_"): + return source_column_name; + message = 'A source column with name {0} was not found in the column configuration'.format(source_column_name) raise ValueError(message) diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index 9c8fbeb..48a40e6 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -38,14 +38,23 @@ def start_single_import(self, target_engine, configuration_name, requested_full_ pipeline_configuration['load_table'])) full_refresh = True - #Ask the source to check that ChangeTrackingIsEnabled and valid - #Maybe this returns a flag? 
+ self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'], + pipeline_configuration['columns']) + last_sync_version = destination_table_manager.get_last_sync_version(pipeline_configuration['target_schema'], + pipeline_configuration['load_table']) - data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh) + change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'], + last_sync_version) - self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'], - pipeline_configuration['columns']) + + data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh, change_tracking_info) + + + self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load())) + if not full_refresh and change_tracking_info.force_full_load(): + self.logger.info("Change tracking has forced this to be a full load") + full_refresh = True columns = pipeline_configuration['columns'] destination_table_manager.create_schema(pipeline_configuration['target_schema']) @@ -64,7 +73,9 @@ def start_single_import(self, target_engine, configuration_name, requested_full_ columns, data_load_tracker, pipeline_configuration['batch'], - target_engine) + target_engine, + full_refresh, + change_tracking_info) previous_unique_column_value = 0 while previous_unique_column_value > -1: @@ -86,5 +97,5 @@ def start_single_import(self, target_engine, configuration_name, requested_full_ destination_table_manager.drop_table(pipeline_configuration['target_schema'], pipeline_configuration['stage_table']) data_load_tracker.completed_successfully() - self.logger.info("Import for configuration: {0} Complete. {1}".format(configuration_name, data_load_tracker.get_statistics())) + self.logger.info("Import Complete for: {0}. {1}".format(configuration_name, data_load_tracker.get_statistics())) diff --git a/modules/DataLoadTracker.py b/modules/DataLoadTracker.py index 7a69974..6f4b5fb 100644 --- a/modules/DataLoadTracker.py +++ b/modules/DataLoadTracker.py @@ -14,12 +14,13 @@ class DataLoadTracker: total_row_count = 0 rows_per_second = 0 - def __init__(self, configuration_name, configuration, is_full_load): + def __init__(self, configuration_name, configuration, is_full_load, change_tracking_info): self.configuration_name = configuration_name self.configuration = configuration self.is_full_load = is_full_load self.started = datetime.now() self.status = "Not Started" + self.change_tracking_info = change_tracking_info def start_batch(self): batch = self.Batch() @@ -36,9 +37,11 @@ def completed_successfully(self): self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds() def get_statistics(self): - return "Rows: {0}, Total Execution Time: {1}. ({2:.2f} rows per second)".format(self.total_row_count, - self.total_execution_time, - self.rows_per_second) + load_type = 'full' if self.is_full_load else "incremental from version {0} ".format(self.change_tracking_info.this_sync_version) + return "Rows: {0} ({1}), Total Execution Time: {2}. 
({3:.2f} rows per second) ".format(self.total_row_count, + load_type, + self.total_execution_time, + self.rows_per_second) class Batch: row_count = 0 diff --git a/modules/DestinationTableManager.py b/modules/DestinationTableManager.py index 27653e8..4bd73db 100644 --- a/modules/DestinationTableManager.py +++ b/modules/DestinationTableManager.py @@ -3,7 +3,7 @@ import logging from modules.ColumnTypeResolver import ColumnTypeResolver -from sqlalchemy import MetaData, DateTime, Boolean +from sqlalchemy import MetaData, DateTime, Boolean, BigInteger from sqlalchemy.schema import Column, Table from sqlalchemy.sql import func @@ -11,6 +11,8 @@ class DestinationTableManager(object): TIMESTAMP_COLUMN_NAME = "data_pipeline_timestamp" IS_DELETED_COLUMN_NAME = "data_pipeline_is_deleted" + CHANGE_VERSION_COLUMN_NAME = "data_pipeline_change_version" + NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME = "data_pipeline_next_change_minimum_version" def __init__(self, target_engine, logger=None): self.logger = logger or logging.getLogger(__name__) @@ -52,9 +54,11 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs table.append_column( Column(self.IS_DELETED_COLUMN_NAME, Boolean, server_default='f', default=False)) + table.append_column( + Column(self.CHANGE_VERSION_COLUMN_NAME, BigInteger)) - - + table.append_column( + Column(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, BigInteger)) if drop_first: self.logger.debug( @@ -63,9 +67,11 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs self.logger.debug( "Dropped table {0}.{1}".format(schema_name, table_name)) + self.logger.debug("Creating table {0}.{1}".format(schema_name, table_name)) table.create(self.target_engine, checkfirst=False) self.logger.debug("Created table {0}.{1}".format(schema_name, table_name)) + return def create_column(self, configuration): @@ -73,6 +79,17 @@ def create_column(self, configuration): primary_key=configuration.get("primary_key", False), nullable=configuration['nullable']) + # TODO: this should come from a different log table which is only written to at the end of a successful load load. + def get_last_sync_version(self, schema_name, table_name): + sql = "SELECT max({0}) as version FROM {1}.{2}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, schema_name, table_name) + + result = self.target_engine.execute(sql) + row = result.fetchone() + if row["version"] is None: + return 0 + return row["version"] + + def rename_table(self, schema_name, source_table_name, target_table_name): # Steps to efficiently rename a table. 
@@ -116,6 +133,10 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column column_array = list(map(lambda column: column['destination']['name'], columns_configuration)) column_list = ','.join(map(str, column_array)) column_list = column_list + ",{0}".format(self.TIMESTAMP_COLUMN_NAME) + column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME) + column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME) + column_list = column_list + ",{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME) + primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']] @@ -132,7 +153,14 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuratiomn['destination']['name'])) sql_builder.write(os.linesep) - sql_builder.write("{0} = EXCLUDED.{0}".format(self.TIMESTAMP_COLUMN_NAME)) + sql_builder.write("{0} = EXCLUDED.{0},".format(self.TIMESTAMP_COLUMN_NAME)) + sql_builder.write(os.linesep) + sql_builder.write("{0} = EXCLUDED.{0},".format(self.IS_DELETED_COLUMN_NAME)) + sql_builder.write(os.linesep) + sql_builder.write("{0} = EXCLUDED.{0},".format(self.CHANGE_VERSION_COLUMN_NAME)) + sql_builder.write(os.linesep) + sql_builder.write("{0} = EXCLUDED.{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME)) + sql_builder.write(os.linesep) self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue())) self.target_engine.execute(sql_builder.getvalue()) diff --git a/modules/data_sources/ChangeTrackingInfo.py b/modules/data_sources/ChangeTrackingInfo.py new file mode 100644 index 0000000..11fc5e8 --- /dev/null +++ b/modules/data_sources/ChangeTrackingInfo.py @@ -0,0 +1,13 @@ + +class ChangeTrackingInfo: + this_sync_version = 0 + next_sync_version = 0 + + def __init__(self, this_sync_version, next_sync_version): + self.this_sync_version = this_sync_version + self.next_sync_version = next_sync_version + + def force_full_load(self): + return bool(self.next_sync_version == 0) + + diff --git a/modules/data_sources/MsSqlDataSource.py b/modules/data_sources/MsSqlDataSource.py index 2684d22..53b3b89 100644 --- a/modules/data_sources/MsSqlDataSource.py +++ b/modules/data_sources/MsSqlDataSource.py @@ -1,9 +1,12 @@ +import io import logging import pandas from sqlalchemy import create_engine from sqlalchemy import MetaData from sqlalchemy.schema import Table from modules.ColumnTypeResolver import ColumnTypeResolver +from modules.data_sources.ChangeTrackingInfo import ChangeTrackingInfo +from sqlalchemy.sql import text class MsSqlDataSource(object): @@ -21,20 +24,44 @@ def can_handle_connection_string(connection_string): def connection_string_prefix(): return 'mssql+pyodbc://' - - def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key): - column_array = list(map(lambda cfg: cfg['source_name'], columns)) + @staticmethod + def prefix_column(column_name, full_refresh, primary_key_column_name): + if column_name == primary_key_column_name and not full_refresh: + return "chg.{0}".format(column_name) + else: + return "t.{0}".format(column_name) + + def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info): + column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], 
full_refresh, table_configuration['primary_key']), columns)) column_names = ", ".join(column_array) - return "SELECT TOP ({0}) {1} FROM {2}.{3} WHERE {4} > {5} ORDER BY {4}".format(batch_configuration['size'], - column_names, - table_configuration[ - 'schema'], - table_configuration[ - 'name'], - table_configuration[ - 'primary_key'], - previous_batch_key) + #This line below is temp until we have a proper storage log of what ran - then data_pipeline_next_change_minimum_version will be stored there. + column_names = "{0}, {1} as data_pipeline_next_change_minimum_version".format(column_names, change_tracking_info.next_sync_version) + if full_refresh: + return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'], + column_names, + table_configuration[ + 'schema'], + table_configuration[ + 'name'], + table_configuration[ + 'primary_key'], + previous_batch_key) + else: + sql_builder = io.StringIO() + sql_builder.write("SELECT TOP ({0}) {1}, ".format(batch_configuration['size'], column_names)) + sql_builder.write("chg.SYS_CHANGE_VERSION as data_pipeline_change_version, CASE chg.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 END as data_pipeline_is_deleted \n") + sql_builder.write("FROM CHANGETABLE(CHANGES {0}.{1}, {2}) chg ".format(table_configuration['schema'], + table_configuration['name'], + change_tracking_info.this_sync_version)) + sql_builder.write(" LEFT JOIN {0}.{1} t on chg.{2} = t.{2} ".format( table_configuration['schema'], + table_configuration['name'], + table_configuration['primary_key'],)) + + sql_builder.write("WHERE chg.{0} > {1} ORDER BY chg.{0}".format(table_configuration['primary_key'], + previous_batch_key)) + + return sql_builder.getvalue() # Returns an array of configured_columns containing only columns that this data source supports. Logs invalid ones. 
def assert_data_source_is_valid(self, table_configuration, configured_columns): @@ -59,8 +86,8 @@ def get_table_columns(self, table_configuration): return list(map(lambda column: column.name, table.columns)) - def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key): - sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key) + def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info,): + sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info,) self.logger.debug("Starting read of SQL Statement: {0}".format(sql)) data_frame = pandas.read_sql_query(sql, self.database_engine) @@ -69,3 +96,38 @@ def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker.extract_completed_successfully(len(data_frame)) return data_frame + + def init_change_tracking(self, table_configuration, last_sync_version): + + sql_builder = io.StringIO() + sql_builder.write( + "IF NOT EXISTS(SELECT 1 FROM sys.change_tracking_tables WHERE object_id = OBJECT_ID('{0}.{1}'))\n".format( + table_configuration['schema'], table_configuration['name'])) + sql_builder.write("BEGIN\n") + sql_builder.write("ALTER TABLE {0}.{1} ENABLE CHANGE_TRACKING WITH(TRACK_COLUMNS_UPDATED=OFF);\n".format( + table_configuration['schema'], table_configuration['name'])) + sql_builder.write("END\n") + + self.database_engine.execute(text(sql_builder.getvalue()).execution_options(autocommit=True)) + + + sql_builder = io.StringIO() + sql_builder.write("DECLARE @last_sync_version bigint = {0}; \n".format(last_sync_version)) + sql_builder.write("DECLARE @this_sync_version bigint = 0; \n") + sql_builder.write("DECLARE @next_sync_version bigint = CHANGE_TRACKING_CURRENT_VERSION(); \n") + sql_builder.write("IF @last_sync_version >= CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('{0}.{1}'))\n".format(table_configuration['schema'],table_configuration['name'])) + sql_builder.write(" SET @this_sync_version = @last_sync_version; \n") + sql_builder.write(" SELECT @next_sync_version as next_sync_version, @this_sync_version as this_sync_version; \n") + + self.logger.debug("Getting ChangeTrackingInformation for {0}.{1}. 
{2}".format(table_configuration['schema'], + table_configuration['name'], + sql_builder.getvalue())) + + result = self.database_engine.execute(sql_builder.getvalue()) + row = result.fetchone() + return_value = ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"]) + + sql_builder.close() + + return return_value + diff --git a/test_full_refresh_from_mssql.cmd b/test_full_refresh_from_mssql.cmd index b996db6..6a2b83f 100644 --- a/test_full_refresh_from_mssql.cmd +++ b/test_full_refresh_from_mssql.cmd @@ -1,8 +1,10 @@ -IF /I "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes -IF /I NOT "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes -if %errorlevel% neq 0 exit /b %errorlevel% -psql -U postgres -d relational_data_loader_integration_tests -a -v ON_ERROR_STOP=1 -f ./integration_tests/mssql_source/assertions/large_table_test_full_refresh_assertions.sql -if %errorlevel% neq 0 exit /b %errorlevel% +@echo off +REM IF /I "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes +IF /I NOT "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh no +REM if %errorlevel% neq 0 exit /b %errorlevel% +REM psql -U postgres -d relational_data_loader_integration_tests -a -v ON_ERROR_STOP=1 -f ./integration_tests/mssql_source/assertions/large_table_test_full_refresh_assertions.sql +REM if %errorlevel% neq 0 exit /b %errorlevel% + From 0388fca27c380de6cb0b4eaf1ade24fd8caa79ba Mon Sep 17 00:00:00 2001 From: David Ames Date: Fri, 6 Jul 2018 18:57:38 +1000 Subject: [PATCH 03/11] Remove debug --- modules/BatchDataLoader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/BatchDataLoader.py b/modules/BatchDataLoader.py index 41b9e1c..1ccc5b8 100644 --- a/modules/BatchDataLoader.py +++ b/modules/BatchDataLoader.py @@ -47,7 +47,6 @@ def write_data_frame_to_table(self, data_frame): qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table) self.logger.debug("Starting write to table {0}".format(qualified_target_table)) data = StringIO() - print(data_frame) data_frame.to_csv(data, header=False, index=False, na_rep='', float_format='%.16g') # Float_format is used to truncate any insignificant digits. 
Unfortunately it gives us an artificial limitation From 1c4554cba3a0e49de8850861d50d970cabf1582f Mon Sep 17 00:00:00 2001 From: David Ames Date: Fri, 6 Jul 2018 19:48:21 +1000 Subject: [PATCH 04/11] Fixed CSV regressions --- modules/data_sources/CsvDataSource.py | 8 ++++++-- modules/data_sources/MsSqlDataSource.py | 10 ++-------- test_full_refresh_from_mssql.cmd | 8 ++++---- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/modules/data_sources/CsvDataSource.py b/modules/data_sources/CsvDataSource.py index 8d72497..974c20e 100644 --- a/modules/data_sources/CsvDataSource.py +++ b/modules/data_sources/CsvDataSource.py @@ -3,6 +3,7 @@ import os.path from modules.ColumnTypeResolver import ColumnTypeResolver from pathlib import Path +from modules.data_sources.ChangeTrackingInfo import ChangeTrackingInfo class CsvDataSource(object): @@ -10,6 +11,7 @@ def __init__(self, connection_string, logger=None): self.logger = logger or logging.getLogger(__name__) self.source_path = Path(connection_string[len(self.connection_string_prefix()):]) self.column_type_resolver = ColumnTypeResolver() + @staticmethod def can_handle_connection_string(connection_string): return connection_string.startswith(CsvDataSource.connection_string_prefix()) @@ -41,7 +43,7 @@ def assert_column_exists(self, column_name, data_frame, csv_file): # For now, the CSV data sources will get all rows in the CSV regardless of batch size. - Ie, they don't currently support paging. - def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key): + def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info): if previous_batch_key > 0: return None @@ -55,9 +57,11 @@ def get_next_data_frame(self, table_configuration, columns, batch_configuration, self.logger.debug("Starting read of file: {0}".format(csv_file)) - data_frame = pandas.read_csv(csv_file) self.logger.debug("Completed read") batch_tracker.extract_completed_successfully(len(data_frame)) return data_frame + + def init_change_tracking(self, table_configuration, last_sync_version): + return ChangeTrackingInfo(0,0) diff --git a/modules/data_sources/MsSqlDataSource.py b/modules/data_sources/MsSqlDataSource.py index 53b3b89..fabafd7 100644 --- a/modules/data_sources/MsSqlDataSource.py +++ b/modules/data_sources/MsSqlDataSource.py @@ -34,8 +34,6 @@ def prefix_column(column_name, full_refresh, primary_key_column_name): def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info): column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_key']), columns)) column_names = ", ".join(column_array) - - #This line below is temp until we have a proper storage log of what ran - then data_pipeline_next_change_minimum_version will be stored there. 
column_names = "{0}, {1} as data_pipeline_next_change_minimum_version".format(column_names, change_tracking_info.next_sync_version) if full_refresh: return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'], @@ -86,7 +84,7 @@ def get_table_columns(self, table_configuration): return list(map(lambda column: column.name, table.columns)) - def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info,): + def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info): sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info,) self.logger.debug("Starting read of SQL Statement: {0}".format(sql)) data_frame = pandas.read_sql_query(sql, self.database_engine) @@ -110,7 +108,6 @@ def init_change_tracking(self, table_configuration, last_sync_version): self.database_engine.execute(text(sql_builder.getvalue()).execution_options(autocommit=True)) - sql_builder = io.StringIO() sql_builder.write("DECLARE @last_sync_version bigint = {0}; \n".format(last_sync_version)) sql_builder.write("DECLARE @this_sync_version bigint = 0; \n") @@ -125,9 +122,6 @@ def init_change_tracking(self, table_configuration, last_sync_version): result = self.database_engine.execute(sql_builder.getvalue()) row = result.fetchone() - return_value = ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"]) - sql_builder.close() - return return_value - + return ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"]) diff --git a/test_full_refresh_from_mssql.cmd b/test_full_refresh_from_mssql.cmd index 6a2b83f..0f7138b 100644 --- a/test_full_refresh_from_mssql.cmd +++ b/test_full_refresh_from_mssql.cmd @@ -1,9 +1,9 @@ @echo off -REM IF /I "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes +IF /I "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes IF /I NOT "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh no -REM if %errorlevel% neq 0 exit /b %errorlevel% -REM psql -U postgres -d relational_data_loader_integration_tests -a -v ON_ERROR_STOP=1 -f ./integration_tests/mssql_source/assertions/large_table_test_full_refresh_assertions.sql -REM if %errorlevel% neq 0 exit /b %errorlevel% +if %errorlevel% neq 0 exit /b %errorlevel% +psql -U postgres -d relational_data_loader_integration_tests -a -v ON_ERROR_STOP=1 -f ./integration_tests/mssql_source/assertions/large_table_test_full_refresh_assertions.sql +if %errorlevel% neq 0 exit /b %errorlevel% From 
9e8e5e575f3113582943b1c03926533089f56215 Mon Sep 17 00:00:00 2001 From: David Ames Date: Sun, 8 Jul 2018 05:16:06 +1000 Subject: [PATCH 05/11] Implemented logging to DataLoadExecution table --- integration_tests/mssql_source/dockerfile | 3 ++ modules/DataLoadManager.py | 25 ++++++------ modules/DestinationTableManager.py | 17 --------- modules/RelationalDataLoader.py | 7 +++- modules/batch_executions/BatchExecution.py | 14 ------- .../batch_executions/BatchExecutionManager.py | 14 ------- .../data_load_tracking/DataLoadExecution.py | 20 ++++++++++ .../DataLoadTracker.py | 15 +++++--- .../DataLoadTrackerRepository.py | 38 +++++++++++++++++++ modules/data_sources/MsSqlDataSource.py | 1 - 10 files changed, 89 insertions(+), 65 deletions(-) create mode 100644 integration_tests/mssql_source/dockerfile delete mode 100644 modules/batch_executions/BatchExecution.py delete mode 100644 modules/batch_executions/BatchExecutionManager.py create mode 100644 modules/data_load_tracking/DataLoadExecution.py rename modules/{ => data_load_tracking}/DataLoadTracker.py (90%) create mode 100644 modules/data_load_tracking/DataLoadTrackerRepository.py diff --git a/integration_tests/mssql_source/dockerfile b/integration_tests/mssql_source/dockerfile new file mode 100644 index 0000000..0bdb54e --- /dev/null +++ b/integration_tests/mssql_source/dockerfile @@ -0,0 +1,3 @@ +FROM microsoft/mssql-server-linux:2017-latest + +ENV ACCEPT_EULA=Y diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index 48a40e6..8e72d27 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -1,33 +1,34 @@ import os import json +import uuid import logging from modules.BatchDataLoader import BatchDataLoader from modules.DestinationTableManager import DestinationTableManager -from modules.DataLoadTracker import DataLoadTracker - +from modules.data_load_tracking.DataLoadTracker import DataLoadTracker class DataLoadManager(object): - def __init__(self, configuration_path, data_source, logger=None): + def __init__(self, configuration_path, data_source, data_load_tracker_repository, logger=None): self.logger = logger or logging.getLogger(__name__) self.configuration_path = configuration_path self.data_source = data_source - + self.data_load_tracker_repository = data_load_tracker_repository + self.correlation_id = uuid.uuid4() def start_imports(self, target_engine, full_refresh): for file in os.listdir(self.configuration_path): self.start_single_import(target_engine, file, full_refresh) self.logger.info("Execution completed.") - def start_single_import(self, target_engine, configuration_name, requested_full_refresh): - self.logger.debug("Using configuration file : {0}".format(configuration_name)) + def start_single_import(self, target_engine, model_name, requested_full_refresh): + self.logger.debug("Using configuration file : {0}".format(model_name)) - config_file = os.path.abspath(self.configuration_path + configuration_name) + config_file = os.path.abspath(self.configuration_path + model_name) self.logger.debug("Using configuration file : {0}".format(config_file)) with open(config_file) as json_data: pipeline_configuration = json.load(json_data) - self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(configuration_name, requested_full_refresh)) + self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(model_name, requested_full_refresh)) destination_table_manager = DestinationTableManager(target_engine) @@ -41,14 +42,13 @@ def start_single_import(self,
target_engine, configuration_name, requested_full_ self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'], pipeline_configuration['columns']) - last_sync_version = destination_table_manager.get_last_sync_version(pipeline_configuration['target_schema'], - pipeline_configuration['load_table']) + last_sync_version = self.data_load_tracker_repository.get_last_sync_version(model_name) change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'], last_sync_version) - data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh, change_tracking_info) + data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id) self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load())) @@ -97,5 +97,6 @@ def start_single_import(self, target_engine, configuration_name, requested_full_ destination_table_manager.drop_table(pipeline_configuration['target_schema'], pipeline_configuration['stage_table']) data_load_tracker.completed_successfully() - self.logger.info("Import Complete for: {0}. {1}".format(configuration_name, data_load_tracker.get_statistics())) + self.data_load_tracker_repository.save(data_load_tracker) + self.logger.info("Import Complete for: {0}. {1}".format(model_name, data_load_tracker.get_statistics())) diff --git a/modules/DestinationTableManager.py b/modules/DestinationTableManager.py index 4bd73db..f948419 100644 --- a/modules/DestinationTableManager.py +++ b/modules/DestinationTableManager.py @@ -12,7 +12,6 @@ class DestinationTableManager(object): TIMESTAMP_COLUMN_NAME = "data_pipeline_timestamp" IS_DELETED_COLUMN_NAME = "data_pipeline_is_deleted" CHANGE_VERSION_COLUMN_NAME = "data_pipeline_change_version" - NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME = "data_pipeline_next_change_minimum_version" def __init__(self, target_engine, logger=None): self.logger = logger or logging.getLogger(__name__) @@ -57,9 +56,6 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs table.append_column( Column(self.CHANGE_VERSION_COLUMN_NAME, BigInteger)) - table.append_column( - Column(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, BigInteger)) - if drop_first: self.logger.debug( "Dropping table {0}.{1}".format(schema_name, table_name)) @@ -79,16 +75,6 @@ def create_column(self, configuration): primary_key=configuration.get("primary_key", False), nullable=configuration['nullable']) - # TODO: this should come from a different log table which is only written to at the end of a successful load load. 
- def get_last_sync_version(self, schema_name, table_name): - sql = "SELECT max({0}) as version FROM {1}.{2}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, schema_name, table_name) - - result = self.target_engine.execute(sql) - row = result.fetchone() - if row["version"] is None: - return 0 - return row["version"] - def rename_table(self, schema_name, source_table_name, target_table_name): @@ -135,7 +121,6 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column column_list = column_list + ",{0}".format(self.TIMESTAMP_COLUMN_NAME) column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME) column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME) - column_list = column_list + ",{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME) primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']] @@ -159,8 +144,6 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column sql_builder.write(os.linesep) sql_builder.write("{0} = EXCLUDED.{0},".format(self.CHANGE_VERSION_COLUMN_NAME)) sql_builder.write(os.linesep) - sql_builder.write("{0} = EXCLUDED.{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME)) - sql_builder.write(os.linesep) self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue())) self.target_engine.execute(sql_builder.getvalue()) diff --git a/modules/RelationalDataLoader.py b/modules/RelationalDataLoader.py index 154f006..437f59e 100644 --- a/modules/RelationalDataLoader.py +++ b/modules/RelationalDataLoader.py @@ -2,7 +2,9 @@ import argparse from sqlalchemy import create_engine from modules.DataLoadManager import DataLoadManager +from modules.data_load_tracking.DataLoadTrackerRepository import DataLoadTrackerRepository from modules.data_sources.DataSourceFactory import DataSourceFactory +from sqlalchemy.orm import sessionmaker _LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'] @@ -20,7 +22,10 @@ def main(self): destination_engine = create_engine(args['destination-engine']) - data_load_manager = DataLoadManager(args['configuration-folder'], data_source) + session_maker = sessionmaker(bind=destination_engine) + repository = DataLoadTrackerRepository(session_maker) + repository.create_tables(destination_engine) + data_load_manager = DataLoadManager(args['configuration-folder'], data_source, repository) data_load_manager.start_imports(destination_engine, args['full_refresh']) def configure_logging(self, log_level): diff --git a/modules/batch_executions/BatchExecution.py b/modules/batch_executions/BatchExecution.py deleted file mode 100644 index f8c4a0f..0000000 --- a/modules/batch_executions/BatchExecution.py +++ /dev/null @@ -1,14 +0,0 @@ -from sqlalchemy import Column, DateTime, Integer, String, Boolean, BigInteger -from sqlalchemy.sql import func -from sqlalchemy.ext.declarative import declarative_base - -Base = declarative_base() - -class BatchExecution(Base): - __tablename__ = 'batch_execution' - id = Column(Integer, primary_key=True) - model_name = Column(String(250), nullable=False) - is_full_refresh = Column(Boolean, nullable=False) - start_synchronization_version = Column(BigInteger, nullable=False) - next_bookmark_synchronization_version = Column(BigInteger, nullable=False) - started_on = Column(DateTime(timezone=True), server_default=func.now()) \ No newline at end of file diff --git 
a/modules/batch_executions/BatchExecutionManager.py b/modules/batch_executions/BatchExecutionManager.py deleted file mode 100644 index cf7234c..0000000 --- a/modules/batch_executions/BatchExecutionManager.py +++ /dev/null @@ -1,14 +0,0 @@ -import os -import json -import logging -from modules.BatchDataLoader import BatchDataLoader -from modules.DestinationTableManager import DestinationTableManager -from modules.DataLoadTracker import DataLoadTracker - - - -class BatchExecutionManager(object): - def __init__(self, configuration_path, data_source, logger=None): - self.logger = logger or logging.getLogger(__name__) - self.configuration_path = configuration_path - self.data_source = data_source diff --git a/modules/data_load_tracking/DataLoadExecution.py b/modules/data_load_tracking/DataLoadExecution.py new file mode 100644 index 0000000..061dc31 --- /dev/null +++ b/modules/data_load_tracking/DataLoadExecution.py @@ -0,0 +1,20 @@ +from sqlalchemy import Column, DateTime, Integer, String, Boolean, BigInteger +from sqlalchemy.sql import func +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + +class DataLoadExecution(Base): + __tablename__ = 'data_load_execution' + __table_args__ = {'schema': 'data_pipeline'} + id = Column(Integer, primary_key=True) + model_name = Column(String(250), nullable=False) + is_full_refresh = Column(Boolean, nullable=False) + this_sync_version = Column(BigInteger, nullable=False) + next_sync_version = Column(BigInteger, nullable=False) + completed_on = Column(DateTime(timezone=True), server_default=func.now()) + execution_time_ms = Column(Integer, nullable=False) + rows_processed = Column(Integer, nullable=False) + correlation_id = Column(String(250), nullable=True) + status = Column(String(25), nullable=False) + diff --git a/modules/DataLoadTracker.py b/modules/data_load_tracking/DataLoadTracker.py similarity index 90% rename from modules/DataLoadTracker.py rename to modules/data_load_tracking/DataLoadTracker.py index 6f4b5fb..b5440e0 100644 --- a/modules/DataLoadTracker.py +++ b/modules/data_load_tracking/DataLoadTracker.py @@ -7,20 +7,22 @@ class DataLoadTracker: status = "Not Started" total_row_count = 0 batches = [] - configuration_name = None + model_name = None configuration = None - is_full_load = False + is_full_refresh = False total_execution_time = None total_row_count = 0 rows_per_second = 0 + correlation_id = None, - def __init__(self, configuration_name, configuration, is_full_load, change_tracking_info): - self.configuration_name = configuration_name + def __init__(self, model_name, configuration, is_full_refresh, change_tracking_info, correlation_id): + self.model_name = model_name self.configuration = configuration - self.is_full_load = is_full_load + self.is_full_refresh = is_full_refresh self.started = datetime.now() self.status = "Not Started" self.change_tracking_info = change_tracking_info + self.correlation_id = correlation_id def start_batch(self): batch = self.Batch() @@ -37,7 +39,7 @@ def completed_successfully(self): self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds() def get_statistics(self): - load_type = 'full' if self.is_full_load else "incremental from version {0} ".format(self.change_tracking_info.this_sync_version) + load_type = 'full' if self.is_full_refresh else "incremental from version {0} ".format(self.change_tracking_info.this_sync_version) return "Rows: {0} ({1}), Total Execution Time: {2}. 
({3:.2f} rows per second) ".format(self.total_row_count, load_type, self.total_execution_time, @@ -89,6 +91,7 @@ def load_completed_successfully(self): self.load_rows_per_second = self.row_count / self.load_execution_time.total_seconds() + # TODO: remove def load_skipped_due_to_zero_rows(self): self.status = "Skipped - Zero Rows" self.load_completed = datetime.now() diff --git a/modules/data_load_tracking/DataLoadTrackerRepository.py b/modules/data_load_tracking/DataLoadTrackerRepository.py new file mode 100644 index 0000000..ffa55de --- /dev/null +++ b/modules/data_load_tracking/DataLoadTrackerRepository.py @@ -0,0 +1,38 @@ +import logging +from modules.data_load_tracking.DataLoadExecution import DataLoadExecution, Base + + +class DataLoadTrackerRepository(object): + def __init__(self, session_maker, logger=None): + self.logger = logger or logging.getLogger(__name__) + self.session_maker = session_maker + + def create_tables(self, engine): + engine.execute("CREATE SCHEMA IF NOT EXISTS {0}".format("data_pipeline")) + Base.metadata.create_all(engine) + + def get_last_sync_version(self, model_name): + session = self.session_maker() + result = session.query(DataLoadExecution).filter_by(model_name=model_name, status="Load Completed Successfully").order_by(DataLoadExecution.completed_on).first() + + if result is None: + return 0 + return result.next_sync_version + + + def save(self, data_load_tracker): + + data_load_execution = DataLoadExecution(model_name=data_load_tracker.model_name, + correlation_id=data_load_tracker.correlation_id, + is_full_refresh=data_load_tracker.is_full_refresh, + this_sync_version=data_load_tracker.change_tracking_info.this_sync_version, + next_sync_version=data_load_tracker.change_tracking_info.next_sync_version, + execution_time_ms=int(data_load_tracker.total_execution_time.total_seconds() * 1000), + rows_processed=data_load_tracker.total_row_count, + status=data_load_tracker.status) + + + + session = self.session_maker() + session.add(data_load_execution) + session.commit() diff --git a/modules/data_sources/MsSqlDataSource.py b/modules/data_sources/MsSqlDataSource.py index fabafd7..bc81c14 100644 --- a/modules/data_sources/MsSqlDataSource.py +++ b/modules/data_sources/MsSqlDataSource.py @@ -34,7 +34,6 @@ def prefix_column(column_name, full_refresh, primary_key_column_name): def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info): column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_key']), columns)) column_names = ", ".join(column_array) - column_names = "{0}, {1} as data_pipeline_next_change_minimum_version".format(column_names, change_tracking_info.next_sync_version) if full_refresh: return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'], column_names, From ccc6e99e6c4509305d50d722072b994c037ab4fb Mon Sep 17 00:00:00 2001 From: David Ames Date: Sun, 8 Jul 2018 06:38:27 +1000 Subject: [PATCH 06/11] Fixed issue with change tracking order --- .vscode/settings.json | 3 +++ modules/DataLoadManager.py | 1 - modules/DestinationTableManager.py | 17 ++++++----------- modules/data_load_tracking/DataLoadTracker.py | 3 +-- .../DataLoadTrackerRepository.py | 3 ++- modules/data_sources/ChangeTrackingInfo.py | 2 +- ...d => test_incremental_refresh_from_mssql.cmd | 0 7 files changed, 13 insertions(+), 16 deletions(-) create mode 100644 .vscode/settings.json rename 
test_full_refresh_from_mssql.cmd => test_incremental_refresh_from_mssql.cmd (100%) diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..1337f53 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.pythonPath": "${workspaceFolder}\\venv\\Scripts\\python.exe" +} \ No newline at end of file diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index 8e72d27..ac72604 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -50,7 +50,6 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh) data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id) - self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load())) if not full_refresh and change_tracking_info.force_full_load(): self.logger.info("Change tracking has forced this to be a full load") diff --git a/modules/DestinationTableManager.py b/modules/DestinationTableManager.py index f948419..4ca7d1e 100644 --- a/modules/DestinationTableManager.py +++ b/modules/DestinationTableManager.py @@ -24,7 +24,6 @@ def create_schema(self, schema_name): def table_exists(self, schema_name, table_name): return self.target_engine.dialect.has_table(self.target_engine, table_name, schema_name) - def drop_table(self, schema_name, table_name): metadata = MetaData() self.logger.debug( @@ -36,9 +35,6 @@ def drop_table(self, schema_name, table_name): self.logger.debug( "Dropped table {0}.{1}".format(schema_name, table_name)) - - - def create_table(self, schema_name, table_name, columns_configuration, drop_first): metadata = MetaData() @@ -63,7 +59,6 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs self.logger.debug( "Dropped table {0}.{1}".format(schema_name, table_name)) - self.logger.debug("Creating table {0}.{1}".format(schema_name, table_name)) table.create(self.target_engine, checkfirst=False) self.logger.debug("Created table {0}.{1}".format(schema_name, table_name)) @@ -75,7 +70,6 @@ def create_column(self, configuration): primary_key=configuration.get("primary_key", False), nullable=configuration['nullable']) - def rename_table(self, schema_name, source_table_name, target_table_name): # Steps to efficiently rename a table. 
@@ -122,8 +116,9 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME) column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME) - - primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']] + primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in + columns_configuration if 'primary_key' in column_configuration['destination'] and + column_configuration['destination']['primary_key']] primary_key_column_list = ','.join(map(str, primary_key_column_array)) @@ -134,15 +129,15 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column sql_builder.write(os.linesep) sql_builder.write(" ON CONFLICT({0}) DO UPDATE SET ".format(primary_key_column_list)) - for column_configuratiomn in columns_configuration: - sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuratiomn['destination']['name'])) + for column_configuration in columns_configuration: + sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuration['destination']['name'])) sql_builder.write(os.linesep) sql_builder.write("{0} = EXCLUDED.{0},".format(self.TIMESTAMP_COLUMN_NAME)) sql_builder.write(os.linesep) sql_builder.write("{0} = EXCLUDED.{0},".format(self.IS_DELETED_COLUMN_NAME)) sql_builder.write(os.linesep) - sql_builder.write("{0} = EXCLUDED.{0},".format(self.CHANGE_VERSION_COLUMN_NAME)) + sql_builder.write("{0} = EXCLUDED.{0}".format(self.CHANGE_VERSION_COLUMN_NAME)) sql_builder.write(os.linesep) self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue())) diff --git a/modules/data_load_tracking/DataLoadTracker.py b/modules/data_load_tracking/DataLoadTracker.py index b5440e0..242ab27 100644 --- a/modules/data_load_tracking/DataLoadTracker.py +++ b/modules/data_load_tracking/DataLoadTracker.py @@ -32,7 +32,7 @@ def start_batch(self): def completed_successfully(self): self.completed = datetime.now() self.total_execution_time = self.completed - self.started - + self.status = "Completed Successfully" for batch in self.batches: self.total_row_count = self.total_row_count + batch.row_count @@ -91,7 +91,6 @@ def load_completed_successfully(self): self.load_rows_per_second = self.row_count / self.load_execution_time.total_seconds() - # TODO: remove def load_skipped_due_to_zero_rows(self): self.status = "Skipped - Zero Rows" self.load_completed = datetime.now() diff --git a/modules/data_load_tracking/DataLoadTrackerRepository.py b/modules/data_load_tracking/DataLoadTrackerRepository.py index ffa55de..83c6891 100644 --- a/modules/data_load_tracking/DataLoadTrackerRepository.py +++ b/modules/data_load_tracking/DataLoadTrackerRepository.py @@ -1,4 +1,5 @@ import logging +from sqlalchemy import desc from modules.data_load_tracking.DataLoadExecution import DataLoadExecution, Base @@ -13,7 +14,7 @@ def create_tables(self, engine): def get_last_sync_version(self, model_name): session = self.session_maker() - result = session.query(DataLoadExecution).filter_by(model_name=model_name, status="Load Completed Successfully").order_by(DataLoadExecution.completed_on).first() + result = session.query(DataLoadExecution).filter_by(model_name=model_name, status="Completed Successfully").order_by(desc(DataLoadExecution.completed_on)).first() if result is None: return 0 diff --git 
a/modules/data_sources/ChangeTrackingInfo.py b/modules/data_sources/ChangeTrackingInfo.py index 11fc5e8..524c7db 100644 --- a/modules/data_sources/ChangeTrackingInfo.py +++ b/modules/data_sources/ChangeTrackingInfo.py @@ -8,6 +8,6 @@ def __init__(self, this_sync_version, next_sync_version): self.next_sync_version = next_sync_version def force_full_load(self): - return bool(self.next_sync_version == 0) + return bool(self.this_sync_version == 0 or self.next_sync_version == 0) diff --git a/test_full_refresh_from_mssql.cmd b/test_incremental_refresh_from_mssql.cmd similarity index 100% rename from test_full_refresh_from_mssql.cmd rename to test_incremental_refresh_from_mssql.cmd From ce09d686d51af1472643b3b051e4fbf22b49832a Mon Sep 17 00:00:00 2001 From: David Ames Date: Sun, 8 Jul 2018 07:30:17 +1000 Subject: [PATCH 07/11] Fixing issues with incremental csv loading --- appveyor.yml | 2 +- .../source_database_setup/create_database.sql | 6 +++++- modules/DataLoadManager.py | 4 ++-- modules/data_sources/ChangeTrackingInfo.py | 9 +++------ modules/data_sources/CsvDataSource.py | 2 +- modules/data_sources/MsSqlDataSource.py | 3 ++- 6 files changed, 14 insertions(+), 12 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 66b2648..dcfd785 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -47,7 +47,7 @@ build_script: test_script: - test_full_refresh_from_csv.cmd - test_incremental_refresh_from_csv.cmd - - test_full_refresh_from_mssql.cmd + - test_incremental_refresh_from_mssql.cmd on_finish: #Enable this line to make the build pause after completion for RDP troubleshooting. diff --git a/integration_tests/mssql_source/source_database_setup/create_database.sql b/integration_tests/mssql_source/source_database_setup/create_database.sql index c97e59f..b715696 100644 --- a/integration_tests/mssql_source/source_database_setup/create_database.sql +++ b/integration_tests/mssql_source/source_database_setup/create_database.sql @@ -1,2 +1,6 @@ IF NOT EXISTS (SELECT * FROM sys.databases WHERE Name = 'RelationalDataLoaderIntegrationTestSource') - CREATE DATABASE RelationalDataLoaderIntegrationTestSource \ No newline at end of file + CREATE DATABASE RelationalDataLoaderIntegrationTestSource + +ALTER DATABASE RelationalDataLoaderIntegrationTestSource +SET CHANGE_TRACKING = ON +(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) \ No newline at end of file diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index ac72604..5968a9f 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -50,8 +50,8 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh) data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id) - self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load())) - if not full_refresh and change_tracking_info.force_full_load(): + self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load)) + if not full_refresh and change_tracking_info.force_full_load: self.logger.info("Change tracking has forced this to be a full load") full_refresh = True diff --git a/modules/data_sources/ChangeTrackingInfo.py b/modules/data_sources/ChangeTrackingInfo.py index 
524c7db..ef2a984 100644 --- a/modules/data_sources/ChangeTrackingInfo.py +++ b/modules/data_sources/ChangeTrackingInfo.py @@ -2,12 +2,9 @@ class ChangeTrackingInfo: this_sync_version = 0 next_sync_version = 0 - - def __init__(self, this_sync_version, next_sync_version): + force_full_load = 0 + def __init__(self, this_sync_version, next_sync_version, force_full_load): self.this_sync_version = this_sync_version self.next_sync_version = next_sync_version - - def force_full_load(self): - return bool(self.this_sync_version == 0 or self.next_sync_version == 0) - + self.force_full_load = force_full_load diff --git a/modules/data_sources/CsvDataSource.py b/modules/data_sources/CsvDataSource.py index 974c20e..ce7b3c8 100644 --- a/modules/data_sources/CsvDataSource.py +++ b/modules/data_sources/CsvDataSource.py @@ -64,4 +64,4 @@ def get_next_data_frame(self, table_configuration, columns, batch_configuration, return data_frame def init_change_tracking(self, table_configuration, last_sync_version): - return ChangeTrackingInfo(0,0) + return ChangeTrackingInfo(0, 0, False) diff --git a/modules/data_sources/MsSqlDataSource.py b/modules/data_sources/MsSqlDataSource.py index bc81c14..9732a56 100644 --- a/modules/data_sources/MsSqlDataSource.py +++ b/modules/data_sources/MsSqlDataSource.py @@ -123,4 +123,5 @@ def init_change_tracking(self, table_configuration, last_sync_version): row = result.fetchone() sql_builder.close() - return ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"]) + force_full_load = bool(row["this_sync_version"] == 0 or row["next_sync_version"] == 0) + return ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"], force_full_load) From 5fc95b6906adf7607c95a23774e8630aa935931e Mon Sep 17 00:00:00 2001 From: David Ames Date: Wed, 11 Jul 2018 20:32:19 +1000 Subject: [PATCH 08/11] Do a full refresh on model change --- .../mssql_source/config/LargeTableTest.json | 1 - .../source_database_setup/create_database.sql | 3 +- modules/DataLoadManager.py | 31 ++++++++++++++++--- .../data_load_tracking/DataLoadExecution.py | 3 +- modules/data_load_tracking/DataLoadTracker.py | 5 ++- .../DataLoadTrackerRepository.py | 14 +++------ 6 files changed, 38 insertions(+), 19 deletions(-) diff --git a/integration_tests/mssql_source/config/LargeTableTest.json b/integration_tests/mssql_source/config/LargeTableTest.json index 28133cb..96bc387 100644 --- a/integration_tests/mssql_source/config/LargeTableTest.json +++ b/integration_tests/mssql_source/config/LargeTableTest.json @@ -1,5 +1,4 @@ { - "source_table": { "name": "LargeTable", "schema": "dbo", diff --git a/integration_tests/mssql_source/source_database_setup/create_database.sql b/integration_tests/mssql_source/source_database_setup/create_database.sql index b715696..ee2fde9 100644 --- a/integration_tests/mssql_source/source_database_setup/create_database.sql +++ b/integration_tests/mssql_source/source_database_setup/create_database.sql @@ -2,5 +2,4 @@ IF NOT EXISTS (SELECT * FROM sys.databases WHERE Name = 'RelationalDataLoaderInt CREATE DATABASE RelationalDataLoaderIntegrationTestSource ALTER DATABASE RelationalDataLoaderIntegrationTestSource -SET CHANGE_TRACKING = ON -(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) \ No newline at end of file +SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) \ No newline at end of file diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index 5968a9f..7831f32 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -2,6 +2,8 @@ 
import json import uuid import logging +import hashlib + from modules.BatchDataLoader import BatchDataLoader from modules.DestinationTableManager import DestinationTableManager from modules.data_load_tracking.DataLoadTracker import DataLoadTracker @@ -25,35 +27,54 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh) config_file = os.path.abspath(self.configuration_path + model_name) self.logger.debug("Using configuration file : {0}".format(config_file)) - with open(config_file) as json_data: - pipeline_configuration = json.load(json_data) + with open(config_file) as json_file: + model_checksum = hashlib.md5(json_file.read().encode('utf-8')).hexdigest() + json_file.seek(0) + pipeline_configuration = json.load(json_file) self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(model_name, requested_full_refresh)) destination_table_manager = DestinationTableManager(target_engine) + full_refresh_reason = "Command Line Argument" if requested_full_refresh else "N/A" full_refresh = requested_full_refresh if not requested_full_refresh and not destination_table_manager.table_exists(pipeline_configuration['target_schema'], pipeline_configuration['load_table']): self.logger.warning("The load table {0}.{1} does not exist. Swapping to full-refresh mode".format(pipeline_configuration['target_schema'], pipeline_configuration['load_table'])) + + full_refresh_reason = "Destination table does not exist" full_refresh = True self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'], pipeline_configuration['columns']) - last_sync_version = self.data_load_tracker_repository.get_last_sync_version(model_name) + last_successful_data_load_execution = self.data_load_tracker_repository.get_last_successful_data_load_execution(model_name) + + if last_successful_data_load_execution is None: + last_sync_version = 0 + full_refresh_reason = "First Execution" + full_refresh = True, + else: + self.logger.debug("Previous Checksum {0}. 
Current Checksum {1}".format(last_successful_data_load_execution.model_checksum, model_checksum)) + last_sync_version = last_successful_data_load_execution.next_sync_version + if not full_refresh and last_successful_data_load_execution.model_checksum != model_checksum: + self.logger.info("A model checksum change has forced this to be a full load") + full_refresh = True + full_refresh_reason = "Model Change" change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'], last_sync_version) - data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id) - self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load)) if not full_refresh and change_tracking_info.force_full_load: self.logger.info("Change tracking has forced this to be a full load") full_refresh = True + full_refresh_reason = "Change Tracking Invalid" + + data_load_tracker = DataLoadTracker(model_name, model_checksum, json_file, full_refresh, change_tracking_info, + self.correlation_id, full_refresh_reason) columns = pipeline_configuration['columns'] destination_table_manager.create_schema(pipeline_configuration['target_schema']) diff --git a/modules/data_load_tracking/DataLoadExecution.py b/modules/data_load_tracking/DataLoadExecution.py index 061dc31..66c93a7 100644 --- a/modules/data_load_tracking/DataLoadExecution.py +++ b/modules/data_load_tracking/DataLoadExecution.py @@ -17,4 +17,5 @@ class DataLoadExecution(Base): rows_processed = Column(Integer, nullable=False) correlation_id = Column(String(250), nullable=True) status = Column(String(25), nullable=False) - + model_checksum = Column(String(100), nullable=False) + full_refresh_reason = Column(String(100), nullable=False) diff --git a/modules/data_load_tracking/DataLoadTracker.py b/modules/data_load_tracking/DataLoadTracker.py index 242ab27..31e101a 100644 --- a/modules/data_load_tracking/DataLoadTracker.py +++ b/modules/data_load_tracking/DataLoadTracker.py @@ -14,15 +14,18 @@ class DataLoadTracker: total_row_count = 0 rows_per_second = 0 correlation_id = None, + full_refresh_reason = "N/A" - def __init__(self, model_name, configuration, is_full_refresh, change_tracking_info, correlation_id): + def __init__(self, model_name, model_checksum, configuration, is_full_refresh, change_tracking_info, correlation_id, full_refresh_reason): self.model_name = model_name + self.model_checksum = model_checksum self.configuration = configuration self.is_full_refresh = is_full_refresh self.started = datetime.now() self.status = "Not Started" self.change_tracking_info = change_tracking_info self.correlation_id = correlation_id + self.full_refresh_reason = full_refresh_reason def start_batch(self): batch = self.Batch() diff --git a/modules/data_load_tracking/DataLoadTrackerRepository.py b/modules/data_load_tracking/DataLoadTrackerRepository.py index 83c6891..e05ef11 100644 --- a/modules/data_load_tracking/DataLoadTrackerRepository.py +++ b/modules/data_load_tracking/DataLoadTrackerRepository.py @@ -12,13 +12,9 @@ def create_tables(self, engine): engine.execute("CREATE SCHEMA IF NOT EXISTS {0}".format("data_pipeline")) Base.metadata.create_all(engine) - def get_last_sync_version(self, model_name): + def get_last_successful_data_load_execution(self, model_name): session = self.session_maker() - result = 
session.query(DataLoadExecution).filter_by(model_name=model_name, status="Completed Successfully").order_by(desc(DataLoadExecution.completed_on)).first() - - if result is None: - return 0 - return result.next_sync_version + return session.query(DataLoadExecution).filter_by(model_name=model_name, status="Completed Successfully").order_by(desc(DataLoadExecution.completed_on)).first() def save(self, data_load_tracker): @@ -30,9 +26,9 @@ def save(self, data_load_tracker): next_sync_version=data_load_tracker.change_tracking_info.next_sync_version, execution_time_ms=int(data_load_tracker.total_execution_time.total_seconds() * 1000), rows_processed=data_load_tracker.total_row_count, - status=data_load_tracker.status) - - + status=data_load_tracker.status, + model_checksum=data_load_tracker.model_checksum, + full_refresh_reason = data_load_tracker.full_refresh_reason) session = self.session_maker() session.add(data_load_execution) From ad89f7073e3283612a90ca61e4d53955ccd72101 Mon Sep 17 00:00:00 2001 From: David Ames Date: Sat, 14 Jul 2018 07:00:20 +1000 Subject: [PATCH 09/11] fix typo --- modules/DataLoadManager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/DataLoadManager.py b/modules/DataLoadManager.py index 7831f32..7e11f51 100644 --- a/modules/DataLoadManager.py +++ b/modules/DataLoadManager.py @@ -54,7 +54,7 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh) if last_successful_data_load_execution is None: last_sync_version = 0 full_refresh_reason = "First Execution" - full_refresh = True, + full_refresh = True else: self.logger.debug("Previous Checksum {0}. Current Checksum {1}".format(last_successful_data_load_execution.model_checksum, model_checksum)) last_sync_version = last_successful_data_load_execution.next_sync_version From 578ea17e8097eb6e8b39cc2a8c9511b8523e4cc9 Mon Sep 17 00:00:00 2001 From: Chintan Raval Date: Thu, 2 Aug 2018 11:13:25 +1000 Subject: [PATCH 10/11] (a) ignore IDE settings (b) update README to point to test scripts for usage samples --- .gitignore | 2 ++ .vscode/settings.json | 3 --- README.md | 9 +-------- 3 files changed, 3 insertions(+), 11 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.gitignore b/.gitignore index 4a45b25..b0c833d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ +# IDE-specific settings .idea/ +.vscode/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 1337f53..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "python.pythonPath": "${workspaceFolder}\\venv\\Scripts\\python.exe" -} \ No newline at end of file diff --git a/README.md b/README.md index 8b3ac06..317a596 100644 --- a/README.md +++ b/README.md @@ -18,15 +18,8 @@ In the above example, dwsource is a 64bit ODBC system dsn ### Examples -#### CSV Source - -`py rdl.py csv://.\test_data\full_refresh postgresql+psycopg2://postgres:xxxx@localhost/dest_dw .\configuration\ --log-level INFO --full-refresh yes` -`py rdl.py csv://.\test_data\incremental_refresh postgresql+psycopg2://postgres:xxxx@localhost/dest_dw .\configuration\ --log-level INFO --full-refresh no` - - -#### MSSQL Source - +See `test_*.cmd` scripts for usage samples. ### Troubleshooting Run with `--log-level DEBUG` on the command line. 
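Note on the change-tracking plumbing introduced above: after patch 07, every data source hands back a ChangeTrackingInfo value object and the decision to force a full load is made by the source rather than by the value object itself. Below is a minimal sketch of that contract, using only the constructor arguments shown in the diffs; the example row values are made up for illustration.

    class ChangeTrackingInfo:
        def __init__(self, this_sync_version, next_sync_version, force_full_load):
            self.this_sync_version = this_sync_version
            self.next_sync_version = next_sync_version
            self.force_full_load = force_full_load

    # A CSV source has no change tracking, so it never forces a full load itself:
    csv_info = ChangeTrackingInfo(0, 0, False)

    # A SQL Server source derives the flag from the versions it reads back,
    # mirroring MsSqlDataSource.init_change_tracking after patch 07:
    row = {"this_sync_version": 0, "next_sync_version": 42}   # illustrative values only
    force_full_load = bool(row["this_sync_version"] == 0 or row["next_sync_version"] == 0)
    mssql_info = ChangeTrackingInfo(row["this_sync_version"],
                                    row["next_sync_version"],
                                    force_full_load)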
From fd27b9bcf7be42616a122dda5d486104957f9a96 Mon Sep 17 00:00:00 2001 From: dames Date: Tue, 7 Aug 2018 07:41:01 +1000 Subject: [PATCH 11/11] Activate the venv for testing --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index dcfd785..82b15fe 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -39,7 +39,7 @@ build_script: - psql -c "SELECT VERSION()" - createdb %DBNAME% - psql -d %DBNAME% -c "CREATE EXTENSION IF NOT EXISTS citext" - + - C:\projects\relational-data-loader\venv\Scripts\activate.bat #Install the dependencies for rdl. - pip install -r requirements.txt
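Taken together, patches 07 through 09 leave DataLoadManager.start_single_import with a layered full-refresh decision. The sketch below condenses that logic as a minimal, assumption-laden summary: the standalone function name decide_full_refresh and its argument list are illustrative only; in the real code the checks live inline in start_single_import and the inputs come from DestinationTableManager, DataLoadTrackerRepository and the data source.

    import hashlib

    def decide_full_refresh(requested_full_refresh, table_exists,
                            last_execution, config_text, change_tracking_info):
        # Hash the raw model configuration so that any edit to the JSON model
        # forces a full load ("Model Change").
        model_checksum = hashlib.md5(config_text.encode('utf-8')).hexdigest()

        if requested_full_refresh:
            return True, "Command Line Argument"
        if not table_exists:
            return True, "Destination table does not exist"
        if last_execution is None:
            return True, "First Execution"
        if last_execution.model_checksum != model_checksum:
            return True, "Model Change"
        if change_tracking_info.force_full_load:
            return True, "Change Tracking Invalid"
        # Otherwise load incrementally from last_execution.next_sync_version.
        return False, "N/A"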