Improve parallel read #74

Open
wants to merge 9 commits into base: main
2 changes: 2 additions & 0 deletions .gitignore
@@ -17,3 +17,5 @@ docs/_build/

# Environments
/.venv

/tests/.tmp
225 changes: 195 additions & 30 deletions mara_pipelines/execution.py
@@ -8,6 +8,7 @@
import functools
import multiprocessing
import os
import queue
import sys
import signal
import atexit
@@ -16,6 +17,7 @@
from multiprocessing import queues
from multiprocessing.context import BaseContext
from queue import Empty
from typing import Callable

from . import pipelines, config
from .logging import logger, pipeline_events, system_statistics, run_log, node_cost
@@ -245,6 +247,27 @@ def track_finished_pipelines():
next_node.parent.replace(next_node, sub_pipeline)
queue([sub_pipeline])

if next_node.use_workers:
# run the generation of commands in a subprocess
if next_node.parent in running_pipelines:
running_pipelines[next_node.parent][1] += 1

command_queue = multiprocessing_context.Queue(
# allow max 100 waiting objects per worker to limit memory usage
maxsize=(len(sub_pipeline.nodes) * 100))

for _, node in sub_pipeline.nodes.items():
if isinstance(node, pipelines.Worker):
node.origin_parent = next_node
node.command_queue = command_queue

process = FeedWorkersProcess(next_node, command_queue, event_queue, multiprocessing_context)
process.start()
running_task_processes[next_node] = process

event_queue.put(pipeline_events.NodeStarted(
node_path=next_node.path(), start_time=task_start_time, is_pipeline=True))

except Exception as e:
event_queue.put(pipeline_events.NodeStarted(
node_path=next_node.path(), start_time=task_start_time, is_pipeline=True))
@@ -263,16 +286,20 @@ def track_finished_pipelines():

else:
# run a task in a subprocess
task_start_time = datetime.datetime.now(tz.utc)
if next_node.parent in running_pipelines:
running_pipelines[next_node.parent][1] += 1
event_queue.put(
pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(tz.utc), False))
pipeline_events.NodeStarted(next_node.path(), task_start_time, False))
event_queue.put(pipeline_events.Output(
node_path=next_node.path(), format=logger.Format.ITALICS,
message='★ ' + node_cost.format_duration(
node_durations_and_run_times.get(tuple(next_node.path()), [0, 0])[0])))

process = TaskProcess(next_node, event_queue, multiprocessing_context)
if isinstance(next_node, pipelines.Worker):
process = WorkerProcess(next_node, event_queue, multiprocessing_context)
else:
process = TaskProcess(next_node, event_queue, multiprocessing_context)
process.start()
running_task_processes[next_node] = process

@@ -291,6 +318,20 @@ def track_finished_pipelines():
for parent in task_process.task.parents()[:-1]:
failed_pipelines.add(parent)

if not task_process.succeeded:
# Note: We do not support 'task_process.task.parent.ignore_errors' here, to avoid endless loops:
# it could happen that all worker nodes fail, and for that case there is currently no mechanism
# implemented to kill the feed worker process.

if isinstance(task_process, WorkerProcess) and \
isinstance(task_process.task.origin_parent, pipelines.ParallelTask) and \
task_process.task.origin_parent.use_workers:
# A worker task failed. We check if the 'FeedWorkersProcess' is still running ...
for node, process in running_task_processes.items():
if node == task_process.task.origin_parent and isinstance(process, FeedWorkersProcess):
# Feed worker process found --> terminate it
process.terminate()

end_time = datetime.datetime.now(tz.utc)
event_queue.put(
pipeline_events.Output(task_process.task.path(),
@@ -423,25 +464,77 @@ def initialize_run_logger() -> events.EventHandler:
return run_log.RunLogger()


class TaskProcess:
def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext):
class _Subprocess:
def __init__(self, task: pipelines.Node, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext,
name_prefix: str, target: Callable):
"""
Runs a task in a separate sub process.
Runs a node in a separate sub process (common base class for TaskProcess, WorkerProcess and FeedWorkersProcess).

Args:
task: The task to run
event_queue: The queue for writing events to
multiprocessing_context: The multiprocessing context in which the task process shall run
name_prefix: The name prefix for the process name
target: The function to execute in the sub process
"""
self._process: multiprocessing.Process = multiprocessing_context.Process(
name='task-' + '-'.join(task.path()),
target=TaskProcess.run,
name=name_prefix + '-' + '-'.join(task.path()),
target=target or _Subprocess.run,
args=(self,))
self.task = task
self.event_queue = event_queue
self._status_queue = multiprocessing_context.Queue()
self.start_time = datetime.datetime.now(tz.utc)
self._succeeded: bool = None
self._succeeded = None

def run(self):
raise NotImplementedError('Subprocess method run not implemented.')

def start(self):
self._process.start()

def terimate(self):
self._process.terminate()

def kill(self):
self._process.kill()

def join(self, timeout=None):
self._process.join(timeout=timeout)

def is_alive(self):
return self._process.is_alive()

@property
def succeeded(self):
if self._succeeded is None:
if self.is_alive():
return None

succeeded_from_queue = None
try:
succeeded_from_queue = self._status_queue.get(False)
except Empty:
pass

self._succeeded = succeeded_from_queue == True

return self._succeeded


class TaskProcess(_Subprocess):
def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext,
name_prefix: str = 'task', target: Callable = None):
"""
Runs a task in a separate sub process.

Args:
task: The task to run
event_queue: The queue for writing events to
multiprocessing_context: The multiprocessing context in which the task process shall run
"""
super().__init__(task=task, event_queue=event_queue, multiprocessing_context=multiprocessing_context,
name_prefix=name_prefix, target=target or TaskProcess.run)

def run(self):
# redirect stdout and stderr to queue
@@ -470,33 +563,105 @@ def run(self):

self._status_queue.put(succeeded)

def start(self):
self._process.start()

def terimate(self):
self._process.terminate()
class FeedWorkersProcess(_Subprocess):
def __init__(self, task: pipelines.ParallelTask, command_queue: multiprocessing.Queue, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext) -> None:
super().__init__(task=task, event_queue=event_queue, multiprocessing_context=multiprocessing_context,
name_prefix='feed-workers', target=FeedWorkersProcess.run)
self.command_queue = command_queue

def kill(self):
self._process.kill()
def _command_queue_put(self, commands, max_retries: int = None):
"""Puts a command to the queue. When it is full, try again"""
retry_count: int = 0
while True:
try:
self.command_queue.put(commands, block=False)  # non-blocking so that a full queue raises queue.Full and triggers the retry below
break
except queue.Full:
retry_count += 1
if max_retries and retry_count > max_retries:
break

def join(self, timeout=None):
self._process.join(timeout=timeout)
# if the queue limit is reached, wait for one second and try again
time.sleep(1)

def is_alive(self):
return self._process.is_alive()
def run(self):
# redirect stdout and stderr to queue
logger.redirect_output(self.event_queue, self.task.path())

@property
def succeeded(self):
if self._succeeded is None:
if self.is_alive():
return None
succeeded = True
try:
for commands in self.task.feed_workers():
if not commands:
# skip empty command chains
continue
self._command_queue_put(commands)
except Exception as e:
logger.log(message=traceback.format_exc(), format=logger.Format.VERBATIM,
is_error=True)
succeeded = False
finally:
# send one "DONE" message per worker to signal that all commands have been sent
self.inform_worker_nodes('DONE')

succeeded_from_queue = None
try:
succeeded_from_queue = self._status_queue.get(False)
except Empty:
pass
self.command_queue.close()

self._succeeded = succeeded_from_queue == True
self._status_queue.put(succeeded)

return self._succeeded
def inform_worker_nodes(self, message: str):
"""Sends a message to all worker nodes"""
for _ in range(self.task.max_number_of_parallel_tasks):
# In case all workers crashed, this could otherwise end in an endless loop.
# Therefore we use max_retries here, which keeps trying for about 3 seconds
# to put the message into the queue.
self._command_queue_put(message, max_retries=3)

def terminate(self):
# before terminating, tell all worker nodes to end with failure once they are
# finished
self.inform_worker_nodes('TERMINATE')

super().terimate()


class WorkerProcess(TaskProcess):
def __init__(self, task: pipelines.Worker, event_queue: multiprocessing.Queue, multiprocessing_context: BaseContext):
"""
Runs a worker node in a separate sub process.

Args:
task: The worker to run
event_queue: The queue for writing events to
multiprocessing_context: The multiprocessing context in which the worker process shall run
"""
super().__init__(task=task, event_queue=event_queue, multiprocessing_context=multiprocessing_context,
name_prefix='worker', target=WorkerProcess.run)

def run(self):
# redirect stdout and stderr to queue
logger.redirect_output(self.event_queue, self.task.path())

succeeded = True
attempt = 0
try:
while True:
if not self.task.run():
#max_retries = self.task.max_retries or config.default_task_max_retries()
#if attempt < max_retries:
# attempt += 1
# delay = pow(2, attempt + 2)
# logger.log(message=f'Retry {attempt}/{max_retries} in {delay} seconds',
# is_error=True, format=logger.Format.ITALICS)
# time.sleep(delay)
#else:
# succeeded = False
# break
succeeded = False
break
else:
break
except Exception as e:
logger.log(message=traceback.format_exc(), format=logger.Format.VERBATIM, is_error=True)
succeeded = False

self._status_queue.put(succeeded)
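The diff above only shows the producer side of the new pattern: `FeedWorkersProcess` generates command chains via `ParallelTask.feed_workers()` and pushes them onto a bounded multiprocessing queue, while each `WorkerProcess` runs a `pipelines.Worker` that is expected to drain that queue until it receives a `'DONE'` or `'TERMINATE'` sentinel. The consumer side lives in `pipelines.py` and is not part of this file, so the following is only a minimal sketch of what such a worker loop could look like; the helper name `worker_loop` and the `run_commands` callback are illustrative assumptions, only the sentinel strings and the shared `command_queue` are taken from the diff.

```python
import queue


def worker_loop(command_queue, run_commands, poll_timeout: float = 1.0) -> bool:
    """Hypothetical consumer: drains command chains from the shared queue until a sentinel arrives."""
    while True:
        try:
            item = command_queue.get(timeout=poll_timeout)
        except queue.Empty:
            # the feeder may still be producing; keep polling
            continue
        if item == 'DONE':
            return True   # feeder finished normally and no commands are left for this worker
        if item == 'TERMINATE':
            return False  # feeder was terminated; end with failure
        if not run_commands(item):
            return False  # a command chain failed
```

This also suggests why `inform_worker_nodes` puts one sentinel per `max_number_of_parallel_tasks`: each consumer removes exactly one message from the queue before exiting, so every worker gets its own shutdown signal.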
19 changes: 19 additions & 0 deletions mara_pipelines/incremental_processing/processed_files.py
@@ -57,3 +57,22 @@ def already_processed_files(node_path: str) -> {str: datetime}:
FROM data_integration_processed_file WHERE node_path = {'%s'}
""", (node_path,))
return {row[0]: row[1] for row in cursor.fetchall()}


def already_processed_file(node_path: str, file_name: str) -> datetime:
"""
Returns the last modified timestamp for a single file if it has
already been processed, otherwise None.

Args:
node_path: The path of the node that processed the file
file_name: The file name
"""
with mara_db.postgresql.postgres_cursor_context('mara') as cursor:
cursor.execute(f"""
SELECT last_modified_timestamp
FROM data_integration_processed_file WHERE node_path = {'%s'}
AND file_name = {'%s'}
""", (node_path, file_name,))
row = cursor.fetchone()
return row[0] if row else None
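A possible way to use the new `already_processed_file` lookup from a file-reading parallel task is to skip files whose modification time has not changed since the last run. The sketch below is only an illustration: the helper name `files_to_process`, the glob pattern, and the naive-vs-timezone-aware timestamp handling are assumptions, not part of this pull request.

```python
import datetime
import pathlib

from mara_pipelines.incremental_processing import processed_files


def files_to_process(node_path: str, directory: str, pattern: str = '*.csv'):
    """Hypothetical helper: yields files that are new or changed since they were last processed."""
    for path in pathlib.Path(directory).glob(pattern):
        last_modified = datetime.datetime.fromtimestamp(path.stat().st_mtime)
        processed_at = processed_files.already_processed_file(node_path, path.name)
        # process the file if it was never seen before or has been modified since
        if processed_at is None or last_modified > processed_at:
            yield path
```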