Skip to content

Commit

Permalink
Utilize Executor API with run_flow and run_flow_selective
Browse files Browse the repository at this point in the history
  • Loading branch information
fridex committed Dec 10, 2017
1 parent bea77df commit 91dc60e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 43 deletions.
33 changes: 17 additions & 16 deletions selinon-cli
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ class Command(object):
help='path to file where should be generated config.py placed')
parser.add_argument('--keep-config-py', action='store_true', dest='keep_config_py',
help='do not remove config.py file after run')
parser.add_argument('--hide-progressbar', action='store_true', dest='hide_progressbar', default=False,
help='hide progressbar during execution')

parser.add_argument('--selective-task-names', action='store', dest='selective_task_names', type=str, nargs='+',
help='a list of tasks that should be run when run a selective flow')
Expand Down Expand Up @@ -319,27 +321,26 @@ class Command(object):
if args.concurrency != 1:
raise NotImplementedError("Concurrency is currently not implemented")

opts = {
'concurrency': args.concurrency,
'sleep_time': args.sleep_time,
'config_py': args.config_py,
'keep_config_py': args.keep_config_py,
}

if args.selective_task_names:
opts['selective'] = {
'task_names': args.selective_task_names,
'follow_subflows': args.selective_follow_subflows,
'run_subsequent': args.selective_run_subsequent
}
else:
executor = Executor(args.nodes_definition, args.flow_definitions,
concurrency=args.concurrency, sleep_time=args.sleep_time,
config_py=args.config_py, keep_config_py=args.keep_config_py,
show_progressbar=not args.hide_progressbar)

if not args.selective_task_names:
if args.selective_follow_subflows:
raise RequestError("Option --selective-follow-subflows requires --selective-task-names set")
if args.selective_run_subsequent:
raise RequestError("Option --selective-run-subsequent requires --selective-task-names set")

executor = Executor(args.nodes_definition, args.flow_definitions, **opts)
executor.run(args.flow_name, node_args)
executor.run_flow_selective(
args.flow_name,
args.selective_task_names,
node_args,
follow_subflows=args.follow_subflows,
run_subsequent=args.selective_run_subsequent
)
else:
executor.run(args.flow_name, node_args)

return 0

Expand Down
88 changes: 61 additions & 27 deletions selinon/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,48 +56,82 @@ class Executor(object):
executor_queues = QueuePool()
_logger = logging.getLogger(__name__)

def __init__(self, nodes_definition, flow_definitions, **opts):
DEFAULT_SLEEP_TIME = 1
DEFAULT_CONCURRENCY = 1

def __init__(self, nodes_definition, flow_definitions,
concurrency=DEFAULT_CONCURRENCY, sleep_time=DEFAULT_SLEEP_TIME,
config_py=None, keep_config_py=False, show_progressbar=True):
"""Instantiate execute.
:param nodes_definition: path to nodes.yaml file
:type nodes_definition: str
:param flow_definitions: a list of YAML files describing flows
:param opts: additional executor options, supported: concurrency, config_py path, sleep_time, keep_config_py
:type flow_definitions: list
:param concurrency: executor concurrency
:type concurrency: int
:param sleep_time: number of seconds to wait before querying queue
:type sleep_time: float
:param config_py: a path to file where Python code configuration should be generated
:type config_py: str
:param keep_config_py: if true, do not delete generated config.py
:type keep_config_py: bool
:param show_progressbar: show progressbar on executor run
:type show_progressbar: bool
"""
Config.set_config_yaml(nodes_definition, flow_definitions,
config_py=opts.pop('config_py', None),
keep_config_py=opts.pop('keep_config_py', False))

self.concurrency = opts.pop('concurrency', 1)
self.sleep_time = opts.pop('sleep_time', 1)
self.selective = opts.pop('selective', None)
config_py=config_py,
keep_config_py=keep_config_py)

if opts:
raise UnknownError("Unknown options supplied: %s" % opts)
self.concurrency = concurrency
self.sleep_time = sleep_time
self.show_progressbar = show_progressbar

def run(self, flow_name, node_args=None):
"""Run executor.
if concurrency != 1:
raise NotImplementedError("Concurrency is now unsupported")

:param flow_name: a flow name that should be run
:param node_args: arguments for the flow
"""
@staticmethod
def _prepare():
"""Prepare Selinon for executor run."""
# We need to assign a custom async result as we are not running Celery but our mocks instead
flexmock(SystemState, _get_async_result=SimulateAsyncResult)
# Overwrite used Celery functions so we do not rely on Celery logic at all
CeleryTask.apply_async = simulate_apply_async
CeleryTask.retry = simulate_retry

# Let's schedule the flow - our first starting task - task will be placed onto queue - see simulate_apply_async
if self.selective:
run_flow_selective(
flow_name,
self.selective['task_names'],
node_args,
self.selective['follow_subflows'],
self.selective['run_subsequent']
)
else:
run_flow(flow_name, node_args)
def run(self, flow_name, node_args=None):
"""Run executor.
:param flow_name: a flow name that should be run
:param node_args: arguments for the flow
"""
self._prepare()
run_flow(flow_name, node_args)
self._executor_run()

def run_flow_selective(self, flow_name, task_names, node_args, follow_subflows=False, run_subsequent=False):
"""Run only desired tasks in a flow.
:param flow_name: name of the flow that should be run
:param task_names: name of the tasks that should be run
:param node_args: arguments that should be supplied to flow
:param follow_subflows: if True, subflows will be followed and checked for nodes to be run
:param run_subsequent: trigger run of all tasks that depend on the desired task
:return: dispatcher id that is scheduled to run desired selective task flow
:raises selinon.errors.SelectiveNoPathError: there was no way found to the desired task in the flow
"""
self._prepare()
run_flow_selective(
flow_name,
task_names,
node_args,
follow_subflows,
run_subsequent
)
self._executor_run()

def _executor_run(self):
"""Perform task execution based on published message on queue."""
while not self.executor_queues.is_empty():
# TODO: concurrency
self._logger.debug("new executor run")
Expand All @@ -111,7 +145,7 @@ def run(self, flow_name, node_args=None):
Progress.sleep(wait_time=wait_time,
sleep_time=self.sleep_time,
info_text='Waiting for next task to process (%s seconds)... ' % round(wait_time, 3),
show_progressbar=self.concurrency == 1)
show_progressbar=self.show_progressbar and self.concurrency == 1)
try:
kwargs = celery_kwargs.get('kwargs')
# remove additional metadata placed by Selinon when doing tracing
Expand Down

0 comments on commit 91dc60e

Please sign in to comment.