1 change: 1 addition & 0 deletions appveyor.yml
@@ -34,6 +34,7 @@ build_script:
#Setup the source MSSQL database
- sqlcmd -b -E -S "(local)\SQL2016" -i .\integration_tests\mssql_source\source_database_setup\create_database.sql
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\integration_tests\mssql_source\source_database_setup\create_large_table.sql
- sqlcmd -b -E -f 65001 -S "(local)\SQL2016" -d RelationalDataLoaderIntegrationTestSource -i .\integration_tests\mssql_source\source_database_setup\create_compound_pk.sql

#Setup the target PostgreSQL database
- psql -c "SELECT VERSION()"
2 changes: 1 addition & 1 deletion integration_tests/csv_source/config/ColumnTest.json
@@ -3,7 +3,7 @@
"source_table": {
"name": "ColumnTest",
"schema": "dbo",
"primary_key": "id"
"primary_keys": ["id"]
},
"target_schema": "rdl_integration_tests",
"stage_table": "stage_source_data",
35 changes: 35 additions & 0 deletions integration_tests/mssql_source/config/CompoundPkTest.json
@@ -0,0 +1,35 @@
{
    "source_table": {
        "name": "CompoundPk",
        "schema": "dbo",
        "primary_keys": ["Id1", "Id2"]
    },
    "target_schema": "rdl_integration_tests",
    "stage_table": "stage_compound_pk",
    "load_table": "load_compound_pk",

    "batch": {
        "size": 100000
    },
    "columns": [
        {
            "source_name": "Id1",
            "destination": {
                "name": "id_1",
                "type": "int",
                "nullable": false,
                "primary_key": true
            }
        },
        {
            "source_name": "Id2",
            "destination": {
                "name": "id_2",
                "type": "int",
                "nullable": false,
                "primary_key": true
            }
        }
    ]
}
2 changes: 1 addition & 1 deletion integration_tests/mssql_source/config/LargeTableTest.json
@@ -2,7 +2,7 @@
"source_table": {
"name": "LargeTable",
"schema": "dbo",
"primary_key": "Id"
"primary_keys": ["Id"]
},
"target_schema": "rdl_integration_tests",
"stage_table": "stage_large_data",
23 changes: 23 additions & 0 deletions integration_tests/mssql_source/source_database_setup/create_compound_pk.sql
@@ -0,0 +1,23 @@
IF object_id('CompoundPk') IS NULL
    CREATE TABLE CompoundPk (
        Id1 INT,
        Id2 INT,
        CONSTRAINT PK_CompoundPK PRIMARY KEY (Id1, Id2))
ELSE
    TRUNCATE TABLE CompoundPk

INSERT CompoundPk
(
    Id1,
    Id2
)
SELECT 1, 1
UNION ALL
SELECT 1, 2
UNION ALL
SELECT 2, 2
UNION ALL
SELECT 2, 1



17 changes: 9 additions & 8 deletions modules/BatchDataLoader.py
@@ -20,28 +20,29 @@ def __init__(self, data_source, source_table_configuration, target_schema, targe
self.change_tracking_info = change_tracking_info

# Imports a batch of rows; sets batch_key_tracker.has_more_data to False when no rows remain
def load_batch(self, previous_batch_key):
def load_batch(self, batch_key_tracker):
batch_tracker = self.data_load_tracker.start_batch()

self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(previous_batch_key, self.full_refresh, self.change_tracking_info.this_sync_version))
self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(batch_key_tracker.bookmarks, self.full_refresh, self.change_tracking_info.this_sync_version))

data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns,
self.batch_configuration, batch_tracker, previous_batch_key,
self.batch_configuration, batch_tracker, batch_key_tracker,
self.full_refresh, self.change_tracking_info)

if data_frame is None or len(data_frame) == 0:
self.logger.debug("There are no rows to import, returning -1")
self.logger.debug("There are no more rows to import.")
batch_tracker.load_skipped_due_to_zero_rows()
return -1
batch_key_tracker.has_more_data = False
return

data_frame = self.attach_column_transformers(data_frame)
self.write_data_frame_to_table(data_frame)
batch_tracker.load_completed_successfully()

last_key_returned = data_frame.iloc[-1][self.source_table_configuration['primary_key']]
for primary_key in batch_key_tracker.primary_keys:
batch_key_tracker.set_bookmark(primary_key, data_frame.iloc[-1][primary_key])

self.logger.info("Batch key {0} Completed. {1}".format(last_key_returned, batch_tracker.get_statistics()))
return last_key_returned
self.logger.info("Batch keys {0} Completed. {1}".format(batch_key_tracker.bookmarks, batch_tracker.get_statistics()))

def write_data_frame_to_table(self, data_frame):
qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table)
15 changes: 15 additions & 0 deletions modules/BatchKeyTracker.py
@@ -0,0 +1,15 @@

class BatchKeyTracker(object):

    def __init__(self, primary_keys):
        self.primary_keys = primary_keys
        self.has_more_data = True
        self.bookmarks = {}

        for primary_key in primary_keys:
            self.bookmarks[primary_key] = 0

    def set_bookmark(self, key, value):
        self.bookmarks[key] = value
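For context, here is a minimal sketch (not part of this diff; the sample key values are hypothetical) of how the tracker's bookmarks are intended to advance as BatchDataLoader completes batches and DataLoadManager drives the loop:

from modules.BatchKeyTracker import BatchKeyTracker

# A tracker for a compound key starts with every bookmark at zero.
tracker = BatchKeyTracker(["Id1", "Id2"])
print(tracker.bookmarks)  # {'Id1': 0, 'Id2': 0}

# After each batch, the loader records the last row's key values as bookmarks,
# so the next SELECT can resume from that point.
last_row_of_batch = {"Id1": 1, "Id2": 2}
for primary_key in tracker.primary_keys:
    tracker.set_bookmark(primary_key, last_row_of_batch[primary_key])
print(tracker.bookmarks)  # {'Id1': 1, 'Id2': 2}

# When a batch returns no rows, the loader flips has_more_data to end the loop.
tracker.has_more_data = False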


9 changes: 5 additions & 4 deletions modules/DataLoadManager.py
@@ -7,7 +7,7 @@
from modules.BatchDataLoader import BatchDataLoader
from modules.DestinationTableManager import DestinationTableManager
from modules.data_load_tracking.DataLoadTracker import DataLoadTracker

from modules.BatchKeyTracker import BatchKeyTracker

class DataLoadManager(object):
def __init__(self, configuration_path, data_source, data_load_tracker_repository, logger=None):
@@ -97,9 +97,10 @@ def start_single_import(self, target_engine, model_name, requested_full_refresh)
full_refresh,
change_tracking_info)

previous_unique_column_value = 0
while previous_unique_column_value > -1:
previous_unique_column_value = batch_data_loader.load_batch(previous_unique_column_value)

batch_key_tracker = BatchKeyTracker(pipeline_configuration['source_table']['primary_keys'])
while batch_key_tracker.has_more_data:
batch_data_loader.load_batch(batch_key_tracker)

if full_refresh:
# Rename the stage table to the load table.
6 changes: 4 additions & 2 deletions modules/data_sources/CsvDataSource.py
@@ -43,9 +43,11 @@ def assert_column_exists(self, column_name, data_frame, csv_file):


# For now, the CSV data sources will get all rows in the CSV regardless of batch size, i.e. they don't currently support paging.
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info):
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, batch_key_tracker, full_refresh, change_tracking_info):

if previous_batch_key > 0:
# CSV sources don't support incremental loading, so check whether data has already been loaded in this run;
# if it has, all of the data has been loaded.
if batch_key_tracker.bookmarks[batch_key_tracker.primary_keys[0]] > 0:
return None

csv_file = os.path.abspath(self.source_path / "{0}.csv".format(table_configuration['name']))
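A small sketch (hypothetical, not part of this diff) of the guard above: once the bookmark for the first primary key has been set, the CSV source treats the load as complete and returns no further frames:

from modules.BatchKeyTracker import BatchKeyTracker

def csv_already_loaded(batch_key_tracker):
    # Mirrors the check in CsvDataSource.get_next_data_frame: CSVs are read in a
    # single batch, so any non-zero bookmark means the data has already been loaded.
    return batch_key_tracker.bookmarks[batch_key_tracker.primary_keys[0]] > 0

tracker = BatchKeyTracker(["id"])
print(csv_already_loaded(tracker))  # False - the first call reads the whole CSV
tracker.set_bookmark("id", 42)      # the loader records the last key after the read
print(csv_already_loaded(tracker))  # True - subsequent calls return None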
94 changes: 71 additions & 23 deletions modules/data_sources/MsSqlDataSource.py
@@ -8,6 +8,7 @@
from modules.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
from sqlalchemy.sql import text


class MsSqlDataSource(object):

def __init__(self, connection_string, logger=None):
@@ -31,32 +31,40 @@ def prefix_column(column_name, full_refresh, primary_key_column_name)
else:
return "t.{0}".format(column_name)

def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info):
column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_key']), columns))
def build_select_statement(self, table_configuration, columns, batch_configuration, batch_key_tracker, full_refresh,
change_tracking_info):
column_array = list(
map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_keys']),
columns))
column_names = ", ".join(column_array)

if full_refresh:
return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'],
column_names,
table_configuration[
'schema'],
table_configuration[
'name'],
table_configuration[
'primary_key'],
previous_batch_key)
order_by = ", t.".join(table_configuration['primary_keys'])

return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE {4} ORDER BY {5}".format(batch_configuration['size'],
column_names,
table_configuration[
'schema'],
table_configuration[
'name'],
self.build_where_clause(batch_key_tracker, "t"),
order_by)
else:
order_by = ", chg.".join(table_configuration['primary_keys'])

sql_builder = io.StringIO()
sql_builder.write("SELECT TOP ({0}) {1}, ".format(batch_configuration['size'], column_names))
sql_builder.write("chg.SYS_CHANGE_VERSION as data_pipeline_change_version, CASE chg.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 END as data_pipeline_is_deleted \n")
sql_builder.write(
"chg.SYS_CHANGE_VERSION as data_pipeline_change_version, CASE chg.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 END as data_pipeline_is_deleted \n")
sql_builder.write("FROM CHANGETABLE(CHANGES {0}.{1}, {2}) chg ".format(table_configuration['schema'],
table_configuration['name'],
change_tracking_info.this_sync_version))
sql_builder.write(" LEFT JOIN {0}.{1} t on chg.{2} = t.{2} ".format( table_configuration['schema'],
table_configuration['name'],
table_configuration['primary_key'],))
sql_builder.write(" LEFT JOIN {0}.{1} t on {2} ".format(table_configuration['schema'],
table_configuration['name'],
self.build_change_table_on_clause(batch_key_tracker)))

sql_builder.write("WHERE chg.{0} > {1} ORDER BY chg.{0}".format(table_configuration['primary_key'],
previous_batch_key))
sql_builder.write("WHERE {0}".format(self.build_where_clause(batch_key_tracker, "t")))
sql_builder.write("ORDER BY {0}".format(order_by))

return sql_builder.getvalue()

@@ -65,7 +74,8 @@ def assert_data_source_is_valid(self, table_configuration, configured_columns):
columns_in_database = self.get_table_columns(table_configuration)

for column in configured_columns:
self.assert_column_exists(column['source_name'], columns_in_database, "{0}.{1}".format(table_configuration['schema'], table_configuration['name']))
self.assert_column_exists(column['source_name'], columns_in_database,
"{0}.{1}".format(table_configuration['schema'], table_configuration['name']))

def assert_column_exists(self, column_name, columns_in_database, table_name):
if column_name in columns_in_database:
@@ -82,9 +92,10 @@ def get_table_columns(self, table_configuration):
autoload_with=self.database_engine)
return list(map(lambda column: column.name, table.columns))


def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info):
sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info,)
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, batch_key_tracker,
full_refresh, change_tracking_info):
sql = self.build_select_statement(table_configuration, columns, batch_configuration, batch_key_tracker,
full_refresh, change_tracking_info)
self.logger.debug("Starting read of SQL Statement: {0}".format(sql))
data_frame = pandas.read_sql_query(sql, self.database_engine)

@@ -111,9 +122,11 @@ def init_change_tracking(self, table_configuration, last_sync_version):
sql_builder.write("DECLARE @last_sync_version bigint = {0}; \n".format(last_sync_version))
sql_builder.write("DECLARE @this_sync_version bigint = 0; \n")
sql_builder.write("DECLARE @next_sync_version bigint = CHANGE_TRACKING_CURRENT_VERSION(); \n")
sql_builder.write("IF @last_sync_version >= CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('{0}.{1}'))\n".format(table_configuration['schema'],table_configuration['name']))
sql_builder.write("IF @last_sync_version >= CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('{0}.{1}'))\n".format(
table_configuration['schema'], table_configuration['name']))
sql_builder.write(" SET @this_sync_version = @last_sync_version; \n")
sql_builder.write(" SELECT @next_sync_version as next_sync_version, @this_sync_version as this_sync_version; \n")
sql_builder.write(
" SELECT @next_sync_version as next_sync_version, @this_sync_version as this_sync_version; \n")

self.logger.debug("Getting ChangeTrackingInformation for {0}.{1}. {2}".format(table_configuration['schema'],
table_configuration['name'],
@@ -125,3 +138,38 @@

force_full_load = bool(row["this_sync_version"] == 0 or row["next_sync_version"] == 0)
return ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"], force_full_load)

@staticmethod
def build_where_clause(batch_key_tracker, table_alias):
has_value = False

try:
sql_builder = io.StringIO()
for primary_key in batch_key_tracker.bookmarks:
if has_value:
sql_builder.write(" AND ")

sql_builder.write(
" {0}.{1} > {2}".format(table_alias, primary_key, batch_key_tracker.bookmarks[primary_key]))
has_value = True

return sql_builder.getvalue()
finally:
sql_builder.close()

@staticmethod
def build_change_table_on_clause(batch_key_tracker):
has_value = False

try:
sql_builder = io.StringIO()
for primary_key in batch_key_tracker.bookmarks:
if has_value:
sql_builder.write(" AND ")

sql_builder.write(" chg.{0} = t.{0}".format(primary_key))
has_value = True

return sql_builder.getvalue()
finally:
sql_builder.close()
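For illustration only (not part of this diff; the bookmark values are hypothetical and the output assumes Python 3.7+ insertion-ordered dicts), the two helpers above produce SQL fragments like the following for a two-column key. Both are static methods, so they can be called without a database connection, although importing MsSqlDataSource still requires its dependencies (pandas, sqlalchemy) to be installed:

from modules.BatchKeyTracker import BatchKeyTracker
from modules.data_sources.MsSqlDataSource import MsSqlDataSource

tracker = BatchKeyTracker(["Id1", "Id2"])
tracker.set_bookmark("Id1", 1)
tracker.set_bookmark("Id2", 2)

# Paging predicate used in the full-refresh SELECT and after the CHANGETABLE join.
print(MsSqlDataSource.build_where_clause(tracker, "t"))
# -> " t.Id1 > 1 AND  t.Id2 > 2"

# Join condition between the change-tracking rows and the source table.
print(MsSqlDataSource.build_change_table_on_clause(tracker))
# -> " chg.Id1 = t.Id1 AND  chg.Id2 = t.Id2"

Both fragments AND the key columns together, which is how the compound-key bookmarks replace the previous single previous_batch_key comparison.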