From 0d9efae1fd6e61f741ebf20d289cbb9444826c6f Mon Sep 17 00:00:00 2001
From: Sean Budd
Date: Tue, 14 May 2019 15:45:05 +1000
Subject: [PATCH] remove csv gunk

---
 README.md                         |  3 +-
 rdl/RelationalDataLoader.py       |  3 +-
 rdl/data_sources/CsvDataSource.py | 79 -------------------------------
 3 files changed, 2 insertions(+), 83 deletions(-)
 delete mode 100644 rdl/data_sources/CsvDataSource.py

diff --git a/README.md b/README.md
index f7780ae..650325d 100644
--- a/README.md
+++ b/README.md
@@ -17,8 +17,7 @@ usage: py -m rdl process [-h] [-f [FORCE_FULL_REFRESH_MODELS]]
 positional arguments:
   source-connection-string
                         The source connections string as a 64bit ODBC system
-                        dsn. Eg: mssql+pyodbc://dwsource or
-                        csv://c://some//Path//To//Csv//Files//
+                        dsn. Eg: mssql+pyodbc://dwsource
   destination-connection-string
                         The destination database connection string. Provide
                         in PostgreSQL + Psycopg format. Eg: 'postgresql+psycopg2:
diff --git a/rdl/RelationalDataLoader.py b/rdl/RelationalDataLoader.py
index 5c335ce..6937d85 100644
--- a/rdl/RelationalDataLoader.py
+++ b/rdl/RelationalDataLoader.py
@@ -97,8 +97,7 @@ def get_arguments(self):
             'source_connection_string',
             metavar='source-connection-string',
             type=self.raw_connection_string_to_valid_source_connection_string,
-            help='The source connections string as a 64bit ODBC system dsn. Eg: mssql+pyodbc://dwsource or '
-                 'csv://c://some//Path//To//Csv//Files//')
+            help='The source connections string as a 64bit ODBC system dsn. Eg: mssql+pyodbc://dwsource')
 
         process_command_parser.add_argument(
             'destination_connection_string',
diff --git a/rdl/data_sources/CsvDataSource.py b/rdl/data_sources/CsvDataSource.py
deleted file mode 100644
index b2748b2..0000000
--- a/rdl/data_sources/CsvDataSource.py
+++ /dev/null
@@ -1,79 +0,0 @@
-import logging
-import os.path
-from pathlib import Path
-import pandas
-
-from rdl.ColumnTypeResolver import ColumnTypeResolver
-from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
-from rdl.shared.Utils import prevent_senstive_data_logging
-
-
-class CsvDataSource(object):
-    def __init__(self, connection_string, logger=None):
-        self.logger = logger or logging.getLogger(__name__)
-        self.source_path = Path(connection_string[len(self.connection_string_prefix()):])
-        self.column_type_resolver = ColumnTypeResolver()
-
-    @staticmethod
-    def can_handle_connection_string(connection_string):
-        return connection_string.startswith(CsvDataSource.connection_string_prefix())
-
-    @staticmethod
-    def connection_string_prefix():
-        return 'csv://'
-
-    def assert_data_source_is_valid(self, table_config, configured_columns):
-        csv_file = os.path.abspath(self.source_path / f"{table_config['name']}.csv")
-        self.logger.debug(f"Path to CSV file: '{csv_file}'")
-
-        if not os.path.exists(csv_file):
-            message = f"'{csv_file}' does not exist and will not be processed."
-            raise ValueError(message)
-
-        data_frame = pandas.read_csv(csv_file, nrows=1)
-
-        for column in configured_columns:
-            self.assert_column_exists(column['source_name'], data_frame, csv_file)
-
-    def assert_column_exists(self, column_name, data_frame, csv_file):
-        if column_name in data_frame.columns:
-            return True
-
-        message = f"Column '{column_name}' does not exist in source '{csv_file}'"
-        raise ValueError(message)
-
-    # For now, the CSV data sources will get all rows in the CSV regardless of batch size.
-    # - Ie, they don't currently support paging.
-    @prevent_senstive_data_logging
-    def get_next_data_frame(
-            self,
-            table_config,
-            columns,
-            batch_config,
-            batch_tracker,
-            batch_key_tracker,
-            full_refresh,
-            change_tracking_info):
-
-        # There is no incremental loading in CSV - therefore, we will check if we have loaded data before in that run
-        # if we have, we have loaded all the data.
-        if batch_key_tracker.bookmarks[batch_key_tracker.primary_keys[0]] > 0:
-            return None
-
-        csv_file = os.path.abspath(self.source_path / f"{table_config['name']}.csv")
-        self.logger.debug(f"Path to CSV file: '{csv_file}'")
-
-        if not os.path.exists(csv_file):
-            self.logger.warning(f"'{csv_file}' does not exist. Returning a None dataframe.")
-            return None
-
-        self.logger.debug(f"Starting read of file: '{csv_file}'")
-
-        data_frame = pandas.read_csv(csv_file)
-        self.logger.debug("Completed read")
-
-        batch_tracker.extract_completed_successfully(len(data_frame))
-        return data_frame
-
-    def get_change_tracking_info(self, table_config, last_sync_version):
-        return ChangeTrackingInfo(0, 0, False, True)