Skip to content

Commit

Permalink
small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-schick committed May 16, 2022
1 parent 9547cb3 commit cf686d3
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 4 deletions.
3 changes: 2 additions & 1 deletion mara_pipelines/incremental_processing/processed_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,5 @@ def already_processed_file(node_path: str, file_name: str) -> datetime:
FROM data_integration_processed_file WHERE node_path = {'%s'}
AND file_name = {'%s'}
""", (node_path, file_name,))
return cursor.fetchone()[0]
row = cursor.fetchone()
return row[0] if row else None
9 changes: 8 additions & 1 deletion mara_pipelines/parallel_tasks/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read
self._db_alias = db_alias
self.timezone = timezone

self.use_workers = self.read_mode == [ReadMode.ALL, ReadMode.ONLY_NEW, ReadMode.ONLY_CHANGED]
self.use_workers = self.read_mode in [ReadMode.ALL] # NOTE: It should be possible to implement here ReadMode.ONLY_NEW
# and ReadMode.ONLY_CHANGED as well. I tried it but ran into
# issues with the lambda function used in `process_commands`:
#
# The commands passed via `feed_workers` are sent through a
# multiprocessing.Queue, which pickles every object put on it.
# Lambdas cannot be pickled, so probably all objects/functions
# passed over there need to be defined at module level. See also:
# https://stackoverflow.com/a/8805244

@property
def db_alias(self):
Expand Down
1 change: 1 addition & 0 deletions mara_pipelines/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def run(self):
raise ValueError(f"Unexecpted type passed to command_queue: {value}")

for command in commands:
command.parent = self
if not command.run():
return False

Expand Down
5 changes: 3 additions & 2 deletions tests/test_parallel_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def empty_files():
root_path = EMPTY_FILES_BASE_PATH
root_path.mkdir(parents=True, exist_ok=True)

file_list = [str((root_path / str(file)).absolute()) for file in range(1, 8*3)]
file_list = [str((root_path / str(file)).absolute()) for file in range(25)]

# create empty files
for file in file_list:
Expand Down Expand Up @@ -64,6 +64,7 @@ def test_read_mode_all(empty_files):
description="Runs a test pipeline which checks if a file exist",
file_pattern='*',
read_mode=ReadMode.ALL,
target_table=None))
target_table=None,
max_number_of_parallel_tasks=4))

run_pipeline(pipeline)

0 comments on commit cf686d3

Please sign in to comment.