
Commit

Merge rq and celery branches to master by introducing command line option -e (engine) and RQ_Executor, Celery_Executor, RQ_Step_Executor, Celery_Step_Executor
Bo Peng committed Sep 29, 2016
1 parent d08dd0b commit f69c72f
Showing 11 changed files with 614 additions and 380 deletions.
16 changes: 12 additions & 4 deletions pysos/actions.py
@@ -46,7 +46,7 @@
from .pattern import glob_wildcards
from .sos_eval import interpolate, Undetermined
from .target import FileTarget, fileMD5
from .sos_executor import DAG_Executor
from .sos_executor import Base_Executor, MP_Executor, RQ_Executor, Celery_Executor
from .monitor import ProcessMonitor, summarizeExecution

__all__ = ['SoS_Action', 'execute_script', 'sos_run',
@@ -499,7 +499,7 @@ def sos_run(workflow, **kwargs):
env.sos_dict.set('__step_output__', copy.deepcopy(env.sos_dict['_input']))
if env.run_mode == 'prepare':
env.logger.debug('Preparing nested workflow {}'.format(workflow))
return DAG_Executor(wf, args=env.sos_dict['__args__'], nested=True).prepare()
return Base_Executor(wf, args=env.sos_dict['__args__'], nested=True).prepare()
elif env.run_mode == 'run':
env.logger.info('Executing workflow ``{}`` with input ``{}``'
.format(workflow, short_repr(env.sos_dict['_input'], True)))
@@ -508,8 +508,16 @@ def sos_run(workflow, **kwargs):
# been changed and need to be re-prepared) so it is necessary to prepare
# the workflow in run mode.
#
dag = DAG_Executor(wf, args=env.sos_dict['__args__'], nested=True).prepare()
return DAG_Executor(wf, args=env.sos_dict['__args__'], nested=True).run(dag)
dag = Base_Executor(wf, args=env.sos_dict['__args__'], nested=True).prepare()
if env.__task_engine__ is None:
if env.max_jobs == 1:
return Base_Executor(wf, args=env.sos_dict['__args__'], nested=True).run(dag)
else:
return MP_Executor(wf, args=env.sos_dict['__args__'], nested=True).run(dag)
elif env.__task_engine__ == 'rq':
return RQ_Executor(wf, args=env.sos_dict['__args__'], nested=True).run(dag)
elif env.__task_engine__ == 'celery':
return Celery_Executor(wf, args=env.sos_dict['__args__'], nested=True).run(dag)
elif env.run_mode == 'interactive':
raise RuntimeError('Action sos_run is not supported in interactive mode')

15 changes: 13 additions & 2 deletions pysos/main.py
@@ -135,7 +135,7 @@ def cmd_run(args, workflow_args):
from .utils import env, get_traceback
from .sos_script import SoS_Script
from .target import FileTarget
from .sos_executor import DAG_Executor
from .sos_executor import Base_Executor, MP_Executor, RQ_Executor, Celery_Executor
env.max_jobs = args.__max_jobs__
env.verbosity = args.verbosity
# kill all remaining processes when the master process is killed.
@@ -156,7 +156,7 @@ def cmd_run(args, workflow_args):
try:
script = SoS_Script(filename=args.script)
workflow = script.workflow(args.workflow)
executor = DAG_Executor(workflow, args=workflow_args, config_file=args.__config__)
if args.__engine__ is None:
if args.__max_jobs__ == 1:
# single process executor
executor = Base_Executor(workflow, args=workflow_args, config_file=args.__config__)
else:
executor = MP_Executor(workflow, args=workflow_args, config_file=args.__config__)
elif args.__engine__ == 'rq':
executor = RQ_Executor(workflow, args=workflow_args, config_file=args.__config__)
elif args.__engine__ == 'celery':
executor = Celery_Executor(workflow, args=workflow_args, config_file=args.__config__)
else:
raise ValueError('Only the default multiprocessing, rq, and celery engines are supported')
# even with the -r option, prepare() can be executed if there are
# auxiliary_sections, or if there are targets where a DAG is required
# Issue #213
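For orientation, here is a minimal sketch of how the executor selection above could be driven directly from Python, using only calls visible in this diff (SoS_Script, workflow(), prepare(), run()). The script filename, workflow name, engine value, and job count are hypothetical examples, not part of the commit.

from pysos.sos_script import SoS_Script
from pysos.sos_executor import Base_Executor, MP_Executor, RQ_Executor, Celery_Executor
from pysos.utils import env

env.max_jobs = 4                 # what -j 4 would set
engine = 'rq'                    # what -e rq would set; None keeps the local executors

script = SoS_Script(filename='analysis.sos')   # hypothetical script name
workflow = script.workflow('default')          # hypothetical workflow name

if engine is None:
    executor_class = Base_Executor if env.max_jobs == 1 else MP_Executor
elif engine == 'rq':
    executor_class = RQ_Executor
elif engine == 'celery':
    executor_class = Celery_Executor
else:
    raise ValueError('Unsupported engine {}'.format(engine))

executor = executor_class(workflow, args=[])
dag = executor.prepare()         # build the DAG, as cmd_run does before calling run()
executor.run(dag)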
129 changes: 116 additions & 13 deletions pysos/sos_executor.py
@@ -32,7 +32,7 @@
from queue import Empty

from ._version import __version__
from .sos_step import Prepare_Step_Executor, Run_Step_Executor, Interactive_Step_Executor
from .sos_step import Prepare_Step_Executor, SP_Step_Executor, MP_Step_Executor, RQ_Step_Executor, Celery_Step_Executor, Interactive_Step_Executor
from .utils import env, Error, WorkflowDict, get_traceback, ProgressBar, frozendict, dict_merge
from .sos_eval import Undetermined, SoS_exec
from .sos_script import SoS_Script
@@ -76,13 +76,14 @@ def append(self, line, error):
class Base_Executor:
'''This is the base class of all executors; it provides common
setup and teardown functions for all executors.'''
def __init__(self, workflow=None, args=[], config_file=None, new_dict=True):
def __init__(self, workflow=None, args=[], config_file=None, nested=False):
env.__task_engine__ = None
self.workflow = workflow
self.new_dict = new_dict
self.nested = nested

# if creating a new dictionary, set it up with some basic variables
# and functions
if not new_dict:
if nested:
SoS_exec('import os, sys, glob')
SoS_exec('from pysos.runtime import *')
self._base_symbols = set(dir(__builtins__)) | set(env.sos_dict.keys()) | set(SOS_KEYWORDS) | set(keyword.kwlist)
@@ -187,7 +188,7 @@ def prepare(self, targets=None):

# process steps of the pipeline
#
if self.new_dict:
if not self.nested:
env.sos_dict.set('__step_output__', None)

dag = SoS_DAG()
@@ -325,15 +326,91 @@ def prepare(self, targets=None):

return dag

def run(self, dag):
'''Execute a workflow with the specified command line args. If this executor was
created with nested=True, the workflow is a nested workflow and will be treated slightly differently.
'''
env.run_mode = 'run'
# passing run_mode to SoS dict so that users can execute blocks of
# python statements in different run modes.
env.sos_dict.set('run_mode', env.run_mode)
# process steps of the pipeline
#
prog = ProgressBar(self.workflow.name, dag.num_nodes(), disp=dag.num_nodes() > 1 and env.verbosity == 1)
while True:
# find any step that can be executed and run it, and update the DAG
# with status.
runnable = dag.find_executable()
if runnable is None:
# no runnable
#dag.show_nodes()
break
# find the section from runnable
section = self.workflow.section_by_id(runnable._step_uuid)
#
# this is to keep compatibility of dag run with sequential run because
# in sequential run, we evaluate the global section of each step in
# order to determine the values of options such as skip.
# The consequence is that global definitions are available in the
# SoS namespace.
try:
SoS_exec(section.global_def)
except Exception as e:
if env.verbosity > 2:
sys.stderr.write(get_traceback())
raise RuntimeError('Failed to execute statements\n"{}"\n{}'.format(
section.global_def, e))

class DAG_Executor(Base_Executor):
# clear existing keys, otherwise results left over from a previous step
# might mess with the execution of another step that does not define input
for k in ['__step_input__', '__default_output__', '__step_output__']:
if k in env.sos_dict:
env.sos_dict.pop(k)
# if the step has its own context
env.sos_dict.quick_update(runnable._context)
# execute section with specified input
runnable._status = 'running'
q = mp.Queue()
executor = SP_Step_Executor(section, q)
p = mp.Process(target=executor.run)
p.start()
#
res = q.get()
# once we have the result, join the child process
p.join()
# if the job failed
if isinstance(res, Exception):
raise RuntimeError(res)
#
for k, v in res.items():
env.sos_dict.set(k, v)
#
# set context for the logical next step.
for edge in dag.out_edges(runnable):
node = edge[1]
# if node is the logical next step...
if node._node_index is not None and runnable._node_index is not None \
and node._node_index == runnable._node_index + 1:
node._context.update(env.sos_dict.clone_selected_vars(
node._context['__signature_vars__'] | node._context['__environ_vars__'] \
| {'_input', '__step_output__', '__default_output__'}))
runnable._status = 'completed'
prog.progress(1)
#env.logger.error('completed')
prog.done()
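The loop above hands each step to a child process and reads one result object back over a multiprocessing queue before joining the child. A stripped-down sketch of that handoff pattern follows; the worker function and its result dictionary are placeholders, not SoS code.

import multiprocessing as mp

def run_one_step(q):
    # stand-in for SP_Step_Executor.run: post either a result dict or an exception
    try:
        q.put({'__step_output__': ['a.txt']})
    except Exception as e:
        q.put(e)

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=run_one_step, args=(q,))
    p.start()
    res = q.get()                # blocks until the child posts its result
    p.join()
    if isinstance(res, Exception):
        raise RuntimeError(res)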


class MP_Executor(Base_Executor):
#
# Execute a workflow in batch mode using multiple processes
def __init__(self, workflow, args=[], config_file=None, nested=False):
Base_Executor.__init__(self, workflow, args, config_file, new_dict=not nested)
Base_Executor.__init__(self, workflow, args, config_file, nested=nested)
if hasattr(env, 'accessed_vars'):
delattr(env, 'accessed_vars')

def step_executor(self, section, queue):
return MP_Step_Executor(section, queue)

def run(self, dag):
'''Execute a workflow with the specified command line args. If this executor was
created with nested=True, the workflow is a nested workflow and will be treated slightly differently.
@@ -345,8 +422,7 @@ def run(self, dag):
# process steps of the pipeline
#
procs = [None for x in range(env.max_jobs)]
prog = ProgressBar(self.workflow.name, dag.num_nodes(),
disp=dag.num_nodes() > 1 and env.verbosity == 1)
prog = ProgressBar(self.workflow.name, dag.num_nodes(), disp=dag.num_nodes() > 1 and env.verbosity == 1)
while True:
# step 1: check existing jobs and see if they are completed
for idx, proc in enumerate(procs):
@@ -388,8 +464,7 @@ def run(self, dag):
# if there is empty slot, submit
if proc is not None:
continue
# the strategy (or better termed no strategy) is to find
# any step that can be executed and run it, and update the DAT
# find any step that can be executed and run it, and update the DAG
# with status.
runnable = dag.find_executable()
if runnable is None:
@@ -422,7 +497,7 @@ def run(self, dag):
# execute section with specified input
runnable._status = 'running'
q = mp.Queue()
executor = Run_Step_Executor(section, q)
executor = self.step_executor(section, q)
p = mp.Process(target=executor.run)
procs[idx] = (p, q, runnable._node_uuid)
p.start()
@@ -436,11 +511,39 @@ def run(self, dag):
time.sleep(0.1)
prog.done()


class RQ_Executor(MP_Executor):
def __init__(self, workflow, args=[], config_file=None, nested=False):
MP_Executor.__init__(self, workflow, args, config_file, nested=nested)
env.__task_engine__ = 'rq'

from rq import Queue as rqQueue
from redis import Redis

redis_conn = Redis()
self.redis_queue = rqQueue(connection=redis_conn)

def step_executor(self, section, queue):
return RQ_Step_Executor(section, queue, self.redis_queue)
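RQ_Executor only establishes the Redis connection and queue; the actual enqueueing happens in RQ_Step_Executor further down. Below is a hedged sketch of the underlying rq calls this relies on, with an illustrative function in place of execute_task; it assumes a local redis-server and an rq worker servicing the queue.

from redis import Redis
from rq import Queue

def slow_add(x, y):
    # any importable function can be enqueued; stands in for execute_task
    return x + y

redis_conn = Redis()                 # assumes redis-server on localhost
q = Queue(connection=redis_conn)
job = q.enqueue(slow_add, 1, 2)      # picked up by a worker started with `rq worker`
# job.result stays None until the worker finishes; RQ_Step_Executor.wait_for_results
# polls exactly this attribute.
print(job.result)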

class Celery_Executor(MP_Executor):
def __init__(self, workflow, args=[], config_file=None, nested=False):
MP_Executor.__init__(self, workflow, args, config_file, nested=nested)
env.__task_engine__ = 'celery'

from celery import Celery
celery_app = Celery('pysos.sos_step', broker='redis://localhost', backend='redis://localhost')

def step_executor(self, section, queue):
# pass celery_app if needed
return Celery_Step_Executor(section, queue)
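Similarly, a hedged sketch of the Celery primitives used here: a task registered with shared_task, submitted with delay(), and polled with ready(), mirroring what Celery_Step_Executor does with execute_task. The add function is illustrative; the broker and backend URLs copy the ones hard-coded above, and a celery worker must be running for the task to complete.

import time
from celery import Celery, shared_task

app = Celery('pysos.sos_step', broker='redis://localhost', backend='redis://localhost')

@shared_task
def add(x, y):
    # stand-in for execute_task
    return x + y

result = add.delay(1, 2)         # returns an AsyncResult immediately
while not result.ready():        # Celery_Step_Executor.wait_for_results polls ready() like this
    time.sleep(1)
print(result.get())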

class Interactive_Executor(Base_Executor):
'''Interactive executor called from by iPython Jupyter or Spyder'''
def __init__(self):
# we actually do not have our own workflow, everything is passed from ipython
Base_Executor.__init__(self, new_dict=False)
# nested=True here actually means: do not create a new dictionary
Base_Executor.__init__(self, nested=True)

def parse_command_line(self, command_line):
parser = argparse.ArgumentParser()
87 changes: 83 additions & 4 deletions pysos/sos_step.py
@@ -24,8 +24,6 @@
import copy
import glob
import fnmatch
import multiprocessing as mp
from multiprocessing.pool import AsyncResult

from io import StringIO
from tokenize import generate_tokens
@@ -803,14 +801,14 @@ def log(self, stage=0, msg=None):
if stage == 'start':
env.logger.trace('Preparing ``{}_{}``: {}'.format(self.step.name, self.step.index, self.step.comment.strip()))

class Run_Step_Executor(Queued_Step_Executor):
class SP_Step_Executor(Queued_Step_Executor):
'''Single process step executor'''
def __init__(self, step, queue):
env.run_mode = 'run'
if hasattr(env, 'accessed_vars'):
delattr(env, 'accessed_vars')
Queued_Step_Executor.__init__(self, step, queue)
self.step_tokens = self.get_tokens()
self.pool = None

def get_tokens(self):
'''Get tokens after input statement'''
@@ -906,8 +904,15 @@ def verify_output(self):
raise RuntimeError('Output target {} does not exist after the completion of step {}'
.format(target, env.sos_dict['step_name']))


class MP_Step_Executor(SP_Step_Executor):
def __init__(self, step, queue):
SP_Step_Executor.__init__(self, step, queue)
self.pool = None

def submit_task(self):
# if concurrent is set, create a pool object
import multiprocessing as mp
if self.pool is None and env.max_jobs > 1 and len(self._groups) > 1 and \
'concurrent' in env.sos_dict['_runtime'] and env.sos_dict['_runtime']['concurrent']:
self.pool = mp.Pool(min(env.max_jobs, len(self._groups)))
@@ -935,6 +940,7 @@ def submit_task(self):
))

def wait_for_results(self):
from multiprocessing.pool import AsyncResult
if self.pool is None:
return
try:
@@ -952,6 +958,79 @@ def submit_task(self):
self.pool.close()
self.pool.join()
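MP_Step_Executor fans concurrent substeps out to a multiprocessing.Pool with apply_async and later collects the AsyncResult objects; a condensed sketch of that pattern follows, with a placeholder worker function instead of execute_task.

import multiprocessing as mp

def run_substep(idx):
    # placeholder for execute_task on one input group
    return {'index': idx, 'succ': True}

if __name__ == '__main__':
    pool = mp.Pool(4)            # the real code uses min(env.max_jobs, len(self._groups))
    proc_results = [pool.apply_async(run_substep, (i,)) for i in range(8)]
    results = [res.get() for res in proc_results]   # wait_for_results collects these
    pool.close()
    pool.join()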


class RQ_Step_Executor(SP_Step_Executor):
#
# This is not working yet
def __init__(self, step, queue, redis_queue):
SP_Step_Executor.__init__(self, step, queue)
self.redis_queue = redis_queue

def submit_task(self):
self.proc_results.append( self.redis_queue.enqueue(
execute_task, # function
self.step.task, # task
self.step.global_def, # global process
# if pool, it must not be in prepare mode and have
# __signature_vars__
env.sos_dict.clone_selected_vars(env.sos_dict['__signature_vars__'] \
| {'_input', '_output', '_depends', 'input', 'output', 'depends', '_idx'}),
self.step.sigil
) )

def wait_for_results(self):
while True:
# wait for results
try:
if any(x.result is None for x in self.proc_results):
time.sleep(1)
else:
return
env.logger.error('{}/{} completed'.format(len([x for x in self.proc_results if x.result is not None]), len(self.proc_results)))
except KeyboardInterrupt:
# if keyboard interrupt
raise RuntimeError('KeyboardInterrupt from {} (master)'.format(os.getpid()))
except Exception as e:
# if keyboard interrupt etc
env.logger.error('Caught {}'.format(e))
raise

class Celery_Step_Executor(SP_Step_Executor):
def __init__(self, step, queue):
SP_Step_Executor.__init__(self, step, queue)

# make execute_task a celery task ...
from celery import shared_task
execute_task.celery = shared_task(execute_task)

def submit_task(self):
# submit the task to the celery queue
self.proc_results.append(
execute_task.celery.delay( # function
self.step.task, # task
self.step.global_def, # global process
env.sos_dict.clone_selected_vars(env.sos_dict['__signature_vars__'] \
| {'_input', '_output', '_depends', 'input', 'output', 'depends', '_idx'}),
self.step.sigil
) )

def wait_for_results(self):
while True:
# wait for results
try:
if any(not x.ready() for x in self.proc_results):
time.sleep(1)
else:
break
env.logger.error('{}/{} completed'.format(len([x for x in self.proc_results if x.ready()]), len(self.proc_results)))
except KeyboardInterrupt:
# if keyboard interrupt
raise RuntimeError('KeyboardInterrupt from {} (master)'.format(os.getpid()))
except Exception as e:
# if keyboard interrupt etc
env.logger.error('Caught {}'.format(e))
raise

class Interactive_Step_Executor(Base_Step_Executor):
def __init__(self, step):
env.run_mode = 'interactive'
