This repository was archived by the owner on Mar 13, 2020. It is now read-only.
Merged
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,4 +1,6 @@
# IDE-specific settings
.idea/
.vscode/

# Byte-compiled / optimized / DLL files
__pycache__/
9 changes: 1 addition & 8 deletions README.md
@@ -18,15 +18,8 @@ In the above example, dwsource is a 64bit ODBC system dsn


### Examples
#### CSV Source

`py rdl.py csv://.\test_data\full_refresh postgresql+psycopg2://postgres:xxxx@localhost/dest_dw .\configuration\ --log-level INFO --full-refresh yes`
`py rdl.py csv://.\test_data\incremental_refresh postgresql+psycopg2://postgres:xxxx@localhost/dest_dw .\configuration\ --log-level INFO --full-refresh no`


#### MSSQL Source


See `test_*.cmd` scripts for usage samples.

### Troubleshooting
Run with `--log-level DEBUG` on the command line.
4 changes: 2 additions & 2 deletions appveyor.yml
@@ -39,15 +39,15 @@ build_script:
- psql -c "SELECT VERSION()"
- createdb %DBNAME%
- psql -d %DBNAME% -c "CREATE EXTENSION IF NOT EXISTS citext"

- C:\projects\relational-data-loader\venv\Scripts\activate.bat
#Install the dependencies for rdl.
- pip install -r requirements.txt


test_script:
- test_full_refresh_from_csv.cmd
- test_incremental_refresh_from_csv.cmd
- test_full_refresh_from_mssql.cmd
- test_incremental_refresh_from_mssql.cmd

on_finish:
#Enable this line to make the build pause after completion for RDP troubleshooting.
1 change: 0 additions & 1 deletion integration_tests/mssql_source/config/LargeTableTest.json
@@ -1,5 +1,4 @@
{

"source_table": {
"name": "LargeTable",
"schema": "dbo",
3 changes: 3 additions & 0 deletions integration_tests/mssql_source/dockerfile
@@ -0,0 +1,3 @@
FROM microsoft/mssql-server-linux:2017-latest

ENV ACCEPT_EULA=Y
@@ -1,2 +1,5 @@
IF NOT EXISTS (SELECT * FROM sys.databases WHERE Name = 'RelationalDataLoaderIntegrationTestSource')
CREATE DATABASE RelationalDataLoaderIntegrationTestSource
CREATE DATABASE RelationalDataLoaderIntegrationTestSource

ALTER DATABASE RelationalDataLoaderIntegrationTestSource
SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
15 changes: 11 additions & 4 deletions modules/BatchDataLoader.py
@@ -6,7 +6,7 @@

class BatchDataLoader(object):
def __init__(self, data_source, source_table_configuration, target_schema, target_table, columns, data_load_tracker,
batch_configuration, target_engine, logger=None):
batch_configuration, target_engine, full_refresh, change_tracking_info, logger=None):
self.logger = logger or logging.getLogger(__name__)
self.source_table_configuration = source_table_configuration
self.columns = columns
@@ -16,15 +16,18 @@ def __init__(self, data_source, source_table_configuration, target_schema, targe
self.data_load_tracker = data_load_tracker
self.batch_configuration = batch_configuration
self.target_engine = target_engine
self.full_refresh = full_refresh
self.change_tracking_info = change_tracking_info

# Imports a batch of rows; returns -1 when there are no rows left to import
def load_batch(self, previous_batch_key):
batch_tracker = self.data_load_tracker.start_batch()

self.logger.debug("ImportBatch Starting from previous_batch_key: {0}".format(previous_batch_key))
self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(previous_batch_key, self.full_refresh, self.change_tracking_info.this_sync_version))

data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns,
self.batch_configuration, batch_tracker, previous_batch_key)
self.batch_configuration, batch_tracker, previous_batch_key,
self.full_refresh, self.change_tracking_info)

if data_frame is None or len(data_frame) == 0:
self.logger.debug("There are no rows to import, returning -1")
@@ -44,7 +47,6 @@ def write_data_frame_to_table(self, data_frame):
qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table)
self.logger.debug("Starting write to table {0}".format(qualified_target_table))
data = StringIO()

data_frame.to_csv(data, header=False, index=False, na_rep='', float_format='%.16g')
# Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation

@@ -58,6 +60,7 @@

sql = "COPY {0}({1}) FROM STDIN with csv".format(qualified_target_table, column_list)
self.logger.debug("Writing to table using command {0}".format(sql))

curs.copy_expert(sql=sql, file=data)

self.logger.debug("Completed write to table {0}".format(qualified_target_table))
@@ -70,6 +73,10 @@ def get_destination_column_name(self, source_column_name):
if column['source_name'] == source_column_name:
return column['destination']['name']

# Internal columns - map them straight through
if source_column_name.startswith("data_pipeline_"):
return source_column_name

message = 'A source column with name {0} was not found in the column configuration'.format(source_column_name)
raise ValueError(message)

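For context on the write path these changes feed into: `write_data_frame_to_table` serialises the batch DataFrame to an in-memory CSV and streams it into Postgres with `COPY ... FROM STDIN`. Below is a minimal, self-contained sketch of that pattern, assuming a raw psycopg2 connection rather than the module's SQLAlchemy engine; the table and column names are illustrative.

```python
from io import StringIO

import pandas as pd


def copy_frame_to_postgres(raw_connection, data_frame: pd.DataFrame,
                           qualified_target_table: str, column_list: str) -> None:
    """Sketch of the COPY-based write used by write_data_frame_to_table."""
    buffer = StringIO()
    # float_format truncates insignificant digits, matching the module's comment above.
    data_frame.to_csv(buffer, header=False, index=False, na_rep='', float_format='%.16g')
    buffer.seek(0)

    sql = "COPY {0}({1}) FROM STDIN with csv".format(qualified_target_table, column_list)
    with raw_connection.cursor() as cursor:
        cursor.copy_expert(sql=sql, file=buffer)
    raw_connection.commit()
```

Streaming the buffer through `copy_expert` avoids per-row INSERTs, which is why the batch loader builds a CSV buffer instead of writing rows one at a time.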
64 changes: 50 additions & 14 deletions modules/DataLoadManager.py
@@ -1,48 +1,81 @@
import os
import json
import uuid
import logging
import hashlib

from modules.BatchDataLoader import BatchDataLoader
from modules.DestinationTableManager import DestinationTableManager
from modules.DataLoadTracker import DataLoadTracker

from modules.data_load_tracking.DataLoadTracker import DataLoadTracker


class DataLoadManager(object):
def __init__(self, configuration_path, data_source, logger=None):
def __init__(self, configuration_path, data_source, data_load_tracker_repository, logger=None):
self.logger = logger or logging.getLogger(__name__)
self.configuration_path = configuration_path
self.data_source = data_source

self.data_load_tracker_repository = data_load_tracker_repository
self.correlation_id = uuid.uuid4()
def start_imports(self, target_engine, full_refresh):
for file in os.listdir(self.configuration_path):
self.start_single_import(target_engine, file, full_refresh)

self.logger.info("Execution completed.")

def start_single_import(self, target_engine, configuration_name, requested_full_refresh):
self.logger.debug("Using configuration file : {0}".format(configuration_name))
def start_single_import(self, target_engine, model_name, requested_full_refresh):
self.logger.debug("Using configuration file : {0}".format(model_name))

config_file = os.path.abspath(self.configuration_path + configuration_name)
config_file = os.path.abspath(self.configuration_path + model_name)
self.logger.debug("Using configuration file : {0}".format(config_file))
with open(config_file) as json_data:
pipeline_configuration = json.load(json_data)
with open(config_file) as json_file:
model_checksum = hashlib.md5(json_file.read().encode('utf-8')).hexdigest()
json_file.seek(0)
pipeline_configuration = json.load(json_file)

self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(configuration_name, requested_full_refresh))
self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(model_name, requested_full_refresh))

destination_table_manager = DestinationTableManager(target_engine)

full_refresh_reason = "Command Line Argument" if requested_full_refresh else "N/A"
full_refresh = requested_full_refresh
if not requested_full_refresh and not destination_table_manager.table_exists(pipeline_configuration['target_schema'],
pipeline_configuration['load_table']):
self.logger.warning("The load table {0}.{1} does not exist. Swapping to full-refresh mode".format(pipeline_configuration['target_schema'],
pipeline_configuration['load_table']))
full_refresh = True

data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh)
full_refresh_reason = "Destination table does not exist"
full_refresh = True

self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
pipeline_configuration['columns'])

last_successful_data_load_execution = self.data_load_tracker_repository.get_last_successful_data_load_execution(model_name)

if last_successful_data_load_execution is None:
last_sync_version = 0
full_refresh_reason = "First Execution"
full_refresh = True
else:
self.logger.debug("Previous Checksum {0}. Current Checksum {1}".format(last_successful_data_load_execution.model_checksum, model_checksum))
last_sync_version = last_successful_data_load_execution.next_sync_version
if not full_refresh and last_successful_data_load_execution.model_checksum != model_checksum:
self.logger.info("A model checksum change has forced this to be a full load")
full_refresh = True
full_refresh_reason = "Model Change"

change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'],
last_sync_version)



if not full_refresh and change_tracking_info.force_full_load:
self.logger.info("Change tracking has forced this to be a full load")
full_refresh = True
full_refresh_reason = "Change Tracking Invalid"

data_load_tracker = DataLoadTracker(model_name, model_checksum, json_file, full_refresh, change_tracking_info,
self.correlation_id, full_refresh_reason)

columns = pipeline_configuration['columns']
destination_table_manager.create_schema(pipeline_configuration['target_schema'])

@@ -60,7 +93,9 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
columns,
data_load_tracker,
pipeline_configuration['batch'],
target_engine)
target_engine,
full_refresh,
change_tracking_info)

previous_unique_column_value = 0
while previous_unique_column_value > -1:
@@ -82,5 +117,6 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
destination_table_manager.drop_table(pipeline_configuration['target_schema'],
pipeline_configuration['stage_table'])
data_load_tracker.completed_successfully()
self.logger.info("Import for configuration: {0} Complete. {1}".format(configuration_name, data_load_tracker.get_statistics()))
self.data_load_tracker_repository.save(data_load_tracker)
self.logger.info("Import Complete for: {0}. {1}".format(model_name, data_load_tracker.get_statistics()))

30 changes: 18 additions & 12 deletions modules/DestinationTableManager.py
@@ -3,14 +3,15 @@
import logging
from modules.ColumnTypeResolver import ColumnTypeResolver

from sqlalchemy import MetaData, DateTime, Boolean
from sqlalchemy import MetaData, DateTime, Boolean, BigInteger
from sqlalchemy.schema import Column, Table
from sqlalchemy.sql import func


class DestinationTableManager(object):
TIMESTAMP_COLUMN_NAME = "data_pipeline_timestamp"
IS_DELETED_COLUMN_NAME = "data_pipeline_is_deleted"
CHANGE_VERSION_COLUMN_NAME = "data_pipeline_change_version"

def __init__(self, target_engine, logger=None):
self.logger = logger or logging.getLogger(__name__)
@@ -23,7 +24,6 @@ def create_schema(self, schema_name):
def table_exists(self, schema_name, table_name):
return self.target_engine.dialect.has_table(self.target_engine, table_name, schema_name)


def drop_table(self, schema_name, table_name):
metadata = MetaData()
self.logger.debug(
@@ -35,9 +35,6 @@
self.logger.debug(
"Dropped table {0}.{1}".format(schema_name, table_name))




def create_table(self, schema_name, table_name, columns_configuration, drop_first):
metadata = MetaData()

@@ -52,9 +49,8 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
table.append_column(
Column(self.IS_DELETED_COLUMN_NAME, Boolean, server_default='f', default=False))




table.append_column(
Column(self.CHANGE_VERSION_COLUMN_NAME, BigInteger))

if drop_first:
self.logger.debug(
@@ -66,6 +62,7 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
self.logger.debug("Creating table {0}.{1}".format(schema_name, table_name))
table.create(self.target_engine, checkfirst=False)
self.logger.debug("Created table {0}.{1}".format(schema_name, table_name))

return

def create_column(self, configuration):
@@ -116,8 +113,12 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
column_array = list(map(lambda column: column['destination']['name'], columns_configuration))
column_list = ','.join(map(str, column_array))
column_list = column_list + ",{0}".format(self.TIMESTAMP_COLUMN_NAME)
column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME)
column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME)

primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']]
primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in
columns_configuration if 'primary_key' in column_configuration['destination'] and
column_configuration['destination']['primary_key']]

primary_key_column_list = ','.join(map(str, primary_key_column_array))

@@ -128,11 +129,16 @@
sql_builder.write(os.linesep)
sql_builder.write(" ON CONFLICT({0}) DO UPDATE SET ".format(primary_key_column_list))

for column_configuratiomn in columns_configuration:
sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuratiomn['destination']['name']))
for column_configuration in columns_configuration:
sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuration['destination']['name']))
sql_builder.write(os.linesep)

sql_builder.write("{0} = EXCLUDED.{0}".format(self.TIMESTAMP_COLUMN_NAME))
sql_builder.write("{0} = EXCLUDED.{0},".format(self.TIMESTAMP_COLUMN_NAME))
sql_builder.write(os.linesep)
sql_builder.write("{0} = EXCLUDED.{0},".format(self.IS_DELETED_COLUMN_NAME))
sql_builder.write(os.linesep)
sql_builder.write("{0} = EXCLUDED.{0}".format(self.CHANGE_VERSION_COLUMN_NAME))
sql_builder.write(os.linesep)

self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue()))
self.target_engine.execute(sql_builder.getvalue())
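The upsert builder now updates the two new pipeline columns on conflict as well as the timestamp. A sketch of how the visible `ON CONFLICT ... DO UPDATE SET` portion is assembled is shown below; the `INSERT ... SELECT` prefix is collapsed in this diff, so it is omitted here, and the sample column configuration is hypothetical.

```python
import os
from io import StringIO

PIPELINE_COLUMNS = ("data_pipeline_timestamp",
                    "data_pipeline_is_deleted",
                    "data_pipeline_change_version")


def build_conflict_update_clause(columns_configuration):
    """Sketch of the conflict-update clause written by upsert_table."""
    destination_names = [c['destination']['name'] for c in columns_configuration]
    primary_key_list = ','.join(c['destination']['name'] for c in columns_configuration
                                if c['destination'].get('primary_key'))

    sql_builder = StringIO()
    sql_builder.write(" ON CONFLICT({0}) DO UPDATE SET ".format(primary_key_list))
    sql_builder.write(os.linesep)

    # Every configured column, plus the first two pipeline columns, ends with a comma;
    # the change-version column closes the list.
    for name in destination_names + list(PIPELINE_COLUMNS[:2]):
        sql_builder.write("{0} = EXCLUDED.{0},".format(name))
        sql_builder.write(os.linesep)
    sql_builder.write("{0} = EXCLUDED.{0}".format(PIPELINE_COLUMNS[2]))
    return sql_builder.getvalue()


# Hypothetical configuration with one primary-key column and one plain column.
example_clause = build_conflict_update_clause([
    {'destination': {'name': 'id', 'primary_key': True}},
    {'destination': {'name': 'name'}},
])
```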
7 changes: 6 additions & 1 deletion modules/RelationalDataLoader.py
@@ -2,7 +2,9 @@
import argparse
from sqlalchemy import create_engine
from modules.DataLoadManager import DataLoadManager
from modules.data_load_tracking.DataLoadTrackerRepository import DataLoadTrackerRepository
from modules.data_sources.DataSourceFactory import DataSourceFactory
from sqlalchemy.orm import sessionmaker

_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']

@@ -20,7 +22,10 @@ def main(self):

destination_engine = create_engine(args['destination-engine'])

data_load_manager = DataLoadManager(args['configuration-folder'], data_source)
session_maker = sessionmaker(bind=destination_engine)
repository = DataLoadTrackerRepository(session_maker)
repository.create_tables(destination_engine)
data_load_manager = DataLoadManager(args['configuration-folder'], data_source, repository)
data_load_manager.start_imports(destination_engine, args['full_refresh'])

def configure_logging(self, log_level):
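The entry point now wires a session factory and the tracking repository in front of the import run. A stripped-down sketch of that wiring with hard-coded values in place of argparse follows; the connection string echoes the README examples, and the data-source construction is left as a placeholder since the factory's method name is not shown in this diff.

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from modules.DataLoadManager import DataLoadManager
from modules.data_load_tracking.DataLoadTrackerRepository import DataLoadTrackerRepository

# Illustrative values; main() normally reads these from argparse.
destination_engine = create_engine("postgresql+psycopg2://postgres:xxxx@localhost/dest_dw")
configuration_folder = "./configuration/"
data_source = ...  # obtained from DataSourceFactory using the source connection string

session_maker = sessionmaker(bind=destination_engine)
repository = DataLoadTrackerRepository(session_maker)
repository.create_tables(destination_engine)  # ensures the data_pipeline tracking table exists

data_load_manager = DataLoadManager(configuration_folder, data_source, repository)
data_load_manager.start_imports(destination_engine, full_refresh=False)
```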
21 changes: 21 additions & 0 deletions modules/data_load_tracking/DataLoadExecution.py
@@ -0,0 +1,21 @@
from sqlalchemy import Column, DateTime, Integer, String, Boolean, BigInteger
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class DataLoadExecution(Base):
__tablename__ = 'data_load_execution'
__table_args__ = {'schema': 'data_pipeline'}
id = Column(Integer, primary_key=True)
model_name = Column(String(250), nullable=False)
is_full_refresh = Column(Boolean, nullable=False)
this_sync_version = Column(BigInteger, nullable=False)
next_sync_version = Column(BigInteger, nullable=False)
completed_on = Column(DateTime(timezone=True), server_default=func.now())
execution_time_ms = Column(Integer, nullable=False)
rows_processed = Column(Integer, nullable=False)
correlation_id = Column(String(250), nullable=True)
status = Column(String(25), nullable=False)
model_checksum = Column(String(100), nullable=False)
full_refresh_reason = Column(String(100), nullable=False)
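DataLoadManager asks the repository for the previous successful run of a model to obtain `next_sync_version` and the stored checksum. The repository itself is not part of this diff, so the query below (including the 'Successful' status literal) is only an assumed shape of that lookup against the model above.

```python
from sqlalchemy.orm import Session

from modules.data_load_tracking.DataLoadExecution import DataLoadExecution


def get_last_successful_data_load_execution(session: Session, model_name: str):
    """Most recent completed execution for a model, or None on the first run."""
    return (session.query(DataLoadExecution)
            .filter(DataLoadExecution.model_name == model_name)
            .filter(DataLoadExecution.status == 'Successful')  # assumed status value
            .order_by(DataLoadExecution.completed_on.desc())
            .first())
```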