From 3ad03050ee9e0f7204c75f3f3c3075c80c674ee2 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Mon, 27 Mar 2017 00:57:34 -0400
Subject: [PATCH] [17.01] Restrict workflow scheduling within a history to a
 fixed, random handler.

Let's revisit the problem that background scheduling of workflows (the
default UI behavior as of 16.10) makes it easier for histories to contain
datasets interleaved from different workflow invocations under certain
reasonable conditions (https://github.com/galaxyproject/galaxy/issues/3474).

Considering only a four-year-old workflow and tool feature set (no collection
operations, no dynamic dataset discovery, only tool and input workflow
modules), all workflows can and will fully schedule on the first scheduling
iteration. Under those circumstances, this solution is functionally
equivalent to history_local_serial_workflow_scheduling introduced in #3520 -
but it should be more performant, because all such workflows fully schedule
in the first iteration and the double loop introduced here
https://github.com/galaxyproject/galaxy/pull/3520/files#diff-d7e80a366f3965777de95cb0f5b13a4e
is avoided for each workflow invocation on each iteration. This addresses
both concerns I outlined
[here](https://github.com/galaxyproject/galaxy/issues/3816#issuecomment-289323288).

For workflows that use certain classes of newer tools or newer workflow
features, I'd argue this approach will not degrade as harshly as enabling
history_local_serial_workflow_scheduling. For instance, imagine a workflow
with a dynamic dataset collection output step (such as used by the IUC tools
DESeq2, Trinity, Stacks, and various Mothur tools) halfway through that takes
24 hours of queue time to reach. Now imagine a user running 5 such workflows
at once.

- Without this and without history_local_serial_workflow_scheduling, the 5
  workflows each run as fast as possible and the UI shows as much of each
  workflow as can be scheduled, but the order of the datasets may be
  shuffled. The workflows complete for the user in 48 hours.

- With history_local_serial_workflow_scheduling enabled, only 1 workflow is
  scheduled (and only halfway) for the first 24 hours, and the user is given
  no visual indication of why the other workflows are not running for a day.
  The final workflow output takes nearly a week to complete for the user.

- With this enabled - the new default in this commit - each workflow is
  scheduled in two chunks, but those chunks are contiguous and it should be
  fairly clear to the user which tool caused the discontinuity of the
  datasets in the history. So things are still mostly ordered, and the
  drawbacks of history_local_serial_workflow_scheduling are avoided entirely:
  the other four workflows aren't hidden from the user without a UI
  indication, and the workflows still take only 48 hours to complete with
  outputs ready for the user.

The only drawback of this new default behavior is that scheduling multiple
workflow invocations within one history in parallel could potentially yield
some performance improvement - but that was never a design goal in my mind
when implementing background scheduling, and under typical Galaxy use cases I
don't think it would be worth the UI problems. The older behavior can be
re-enabled by setting parallelize_workflow_scheduling_within_histories to
True in galaxy.ini, but it won't be on by default and isn't really
recommended if the Galaxy UI is being used.
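In pseudocode, the selection scheme this patch implements boils down to the
following (a minimal standalone sketch - pick_handler, the handlers list, and
the example history IDs are illustrative stand-ins, not Galaxy's actual
JobConfiguration internals):

```python
import random


def pick_handler(handlers, history_id=None):
    """Pick a job handler ID from a pool.

    With no history_id the choice is uniformly random (the old behavior);
    with a history_id, the same history always maps to the same handler, so
    all invocations in one history are scheduled by a single process.
    """
    if len(handlers) == 1:
        return handlers[0]
    if history_id is None:
        return random.choice(handlers)
    # Deterministic "random" pick - stable across calls for a given history.
    return handlers[history_id % len(handlers)]


handlers = ["handler0", "handler1", "handler2"]
# The same history ID always yields the same handler.
assert pick_handler(handlers, history_id=42) == pick_handler(handlers, history_id=42)
```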
---
 lib/galaxy/config.py                               |  1 +
 lib/galaxy/jobs/__init__.py                        | 12 ++-
 lib/galaxy/workflow/scheduling_manager.py          |  9 +-
 .../test_workflow_handler_configuration.py         | 97 +++++++++++++++++++
 ...orkflow_handler_configuration_job_conf.xml      | 27 ++++++
 5 files changed, 139 insertions(+), 7 deletions(-)
 create mode 100644 test/integration/test_workflow_handler_configuration.py
 create mode 100644 test/integration/workflow_handler_configuration_job_conf.xml

diff --git a/lib/galaxy/config.py b/lib/galaxy/config.py
index 8c9fa513f082..d3af12b4305e 100644
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -325,6 +325,7 @@ def __init__( self, **kwargs ):
         self.force_beta_workflow_scheduled_for_collections = string_as_bool( kwargs.get( 'force_beta_workflow_scheduled_for_collections', 'False' ) )
         self.history_local_serial_workflow_scheduling = string_as_bool( kwargs.get( 'history_local_serial_workflow_scheduling', 'False' ) )
+        self.parallelize_workflow_scheduling_within_histories = string_as_bool( kwargs.get( 'parallelize_workflow_scheduling_within_histories', 'False' ) )
         self.maximum_workflow_invocation_duration = int( kwargs.get( "maximum_workflow_invocation_duration", 2678400 ) )

         # Per-user Job concurrency limitations

diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index 471b70d9d74f..98ce93a1755d 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -608,27 +608,31 @@ def get_job_tool_configurations(self, ids):
             rval.append(self.default_job_tool_configuration)
         return rval

-    def __get_single_item(self, collection):
+    def __get_single_item(self, collection, random_index=None):
         """Given a collection of handlers or destinations, return one item
         from the collection at random.
         """
         # Done like this to avoid random under the assumption it's faster to avoid it
         if len(collection) == 1:
             return collection[0]
-        else:
+        elif random_index is None:
             return random.choice(collection)
+        else:
+            return collection[random_index % len(collection)]

     # This is called by Tool.get_job_handler()
-    def get_handler(self, id_or_tag):
+    def get_handler(self, id_or_tag, random_index=None):
         """Given a handler ID or tag, return the provided ID or an ID matching
         the provided tag

         :param id_or_tag: A handler ID or tag.
         :type id_or_tag: str
+        :param random_index: Generate "consistent" "random" handlers with this index if specified.
+        :type random_index: int

         :returns: str -- A valid job handler ID.
""" if id_or_tag is None: id_or_tag = self.default_handler_id - return self.__get_single_item(self.handlers[id_or_tag]) + return self.__get_single_item(self.handlers[id_or_tag], random_index=random_index) def get_destination(self, id_or_tag): """Given a destination ID or tag, return the JobDestination matching the provided ID or tag diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index 030ca0e86a88..023be25b7b85 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -54,8 +54,11 @@ def __init__( self, app ): def _is_workflow_handler( self ): return self.app.is_job_handler() - def _get_handler( self ): - return self.__job_config.get_handler( None ) + def _get_handler( self, history_id ): + random_index = history_id + if self.app.config.parallelize_workflow_scheduling_within_histories: + random_index = None + return self.__job_config.get_handler( None, random_index=random_index ) def shutdown( self ): for workflow_scheduler in self.workflow_schedulers.itervalues(): @@ -72,7 +75,7 @@ def shutdown( self ): def queue( self, workflow_invocation, request_params ): workflow_invocation.state = model.WorkflowInvocation.states.NEW scheduler = request_params.get( "scheduler", None ) or self.default_scheduler_id - handler = self._get_handler() + handler = self._get_handler( workflow_invocation.history.id ) log.info("Queueing workflow invocation for handler [%s]" % handler) workflow_invocation.scheduler = scheduler diff --git a/test/integration/test_workflow_handler_configuration.py b/test/integration/test_workflow_handler_configuration.py new file mode 100644 index 000000000000..ddf100f57c96 --- /dev/null +++ b/test/integration/test_workflow_handler_configuration.py @@ -0,0 +1,97 @@ +"""Integration tests for maximum workflow invocation duration configuration option.""" + +import os + +from json import dumps + +from base import integration_util +from base.populators import ( + DatasetPopulator, + WorkflowPopulator, +) + +SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__)) +WORKFLOW_HANDLER_CONFIGURATION_JOB_CONF = os.path.join(SCRIPT_DIRECTORY, "workflow_handler_configuration_job_conf.xml") + +PAUSE_WORKFLOW = """ +class: GalaxyWorkflow +steps: +- label: test_input + type: input +- label: the_pause + type: pause + connect: + input: + - test_input +""" + + +class BaseWorkflowHandlerConfigurationTestCase(integration_util.IntegrationTestCase): + + framework_tool_and_types = True + + def setUp( self ): + super( BaseWorkflowHandlerConfigurationTestCase, self ).setUp() + self.dataset_populator = DatasetPopulator( self.galaxy_interactor ) + self.workflow_populator = WorkflowPopulator( self.galaxy_interactor ) + self.history_id = self.dataset_populator.new_history() + + @classmethod + def handle_galaxy_config_kwds(cls, config): + config["job_config_file"] = WORKFLOW_HANDLER_CONFIGURATION_JOB_CONF + + def _invoke_n_workflows(self, n): + workflow_id = self.workflow_populator.upload_yaml_workflow(PAUSE_WORKFLOW) + history_id = self.history_id + hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3") + index_map = { + '0': dict(src="hda", id=hda1["id"]) + } + request = {} + request["history"] = "hist_id=%s" % history_id + request["inputs"] = dumps(index_map) + request["inputs_by"] = 'step_index' + url = "workflows/%s/invocations" % (workflow_id) + for i in range(n): + self._post(url, data=request) + + def _get_workflow_invocations(self): + # Consider exposing handler via the API to reduce 
+        # into Galaxy's internal state.
+        app = self._app
+        history_id = app.security.decode_id(self.history_id)
+        sa_session = app.model.context.current
+        history = sa_session.query( app.model.History ).get( history_id )
+        workflow_invocations = history.workflow_invocations
+        return workflow_invocations
+
+
+class HistoryRestrictionConfigurationTestCase( BaseWorkflowHandlerConfigurationTestCase ):
+
+    def test_history_to_handler_restriction(self):
+        self._invoke_n_workflows(10)
+        workflow_invocations = self._get_workflow_invocations()
+        assert len( workflow_invocations ) == 10
+        # Verify all 10 were assigned the same handler - with ten handlers to
+        # choose from there would be roughly a 1 in 10^9 chance of this
+        # occurring under random assignment.
+        for workflow_invocation in workflow_invocations:
+            assert workflow_invocation.handler == workflow_invocations[0].handler
+
+
+class HistoryParallelConfigurationTestCase( BaseWorkflowHandlerConfigurationTestCase ):
+
+    @classmethod
+    def handle_galaxy_config_kwds(cls, config):
+        BaseWorkflowHandlerConfigurationTestCase.handle_galaxy_config_kwds(config)
+        config["parallelize_workflow_scheduling_within_histories"] = True
+
+    def test_workflows_spread_across_multiple_handlers(self):
+        self._invoke_n_workflows(20)
+        workflow_invocations = self._get_workflow_invocations()
+        assert len( workflow_invocations ) == 20
+        handlers = set()
+        for workflow_invocation in workflow_invocations:
+            handlers.add(workflow_invocation.handler)
+
+        # Assert at least 2 of the 20 invocations were assigned different handlers.
+        assert len(handlers) >= 2, handlers

diff --git a/test/integration/workflow_handler_configuration_job_conf.xml b/test/integration/workflow_handler_configuration_job_conf.xml
new file mode 100644
index 000000000000..7a5f45169139
--- /dev/null
+++ b/test/integration/workflow_handler_configuration_job_conf.xml
@@ -0,0 +1,27 @@
[The 27-line job conf XML was stripped in extraction; it defines the pool of
job handlers - ten, judging by the probability noted in the test above - that
these integration tests exercise.]
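To make the two behaviors the integration tests assert on concrete, here is a
small self-contained demonstration of the assignment logic (a sketch assuming
a pool of ten handlers; pick_handler is the same illustrative stand-in used in
the commit message, not Galaxy's actual API):

```python
import random
from collections import Counter


def pick_handler(handlers, history_id=None):
    # Same illustrative sketch as in the commit message above.
    if history_id is None:
        return random.choice(handlers)
    return handlers[history_id % len(handlers)]


handlers = ["handler%d" % i for i in range(10)]

# Default behavior: all 10 invocations in one history land on one handler.
assert len({pick_handler(handlers, history_id=7) for _ in range(10)}) == 1

# With parallelize_workflow_scheduling_within_histories enabled the pick is
# random, so 20 invocations almost certainly spread over several handlers
# (failure probability ~10^-19) - mirroring the integration test's assertion.
spread = Counter(pick_handler(handlers) for _ in range(20))
assert len(spread) >= 2
```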