[17.01] Restrict workflow scheduling within a history to a fixed, random handler.

Let's revisit the problem that scheduling workflows in the background (the default UI behavior as of 16.10) makes it easier for histories to contain datasets interleaved from different workflow invocations under certain reasonable conditions (galaxyproject#3474).

Considering only a four-year-old workflow and tool feature set (no collection operations, no dynamic dataset discovery, only tool and input workflow modules), all workflows can and will fully schedule on the first scheduling iteration. Under those circumstances, this solution is functionally equivalent to history_local_serial_workflow_scheduling introduced in galaxyproject#3520 - but it should be more performant, because all such workflows fully schedule in the first iteration and the double loop introduced here https://github.com/galaxyproject/galaxy/pull/3520/files#diff-d7e80a366f3965777de95cb0f5b13a4e is avoided for each workflow invocation on each iteration. This addresses both concerns I outlined [here](galaxyproject#3816 (comment)).

For workflows that use certain classes of newer tools or newer workflow features, I'd argue this approach does not degrade as harshly as enabling history_local_serial_workflow_scheduling does.

For instance, imagine a workflow with a dynamic dataset collection output step (such as those used by the IUC tools Deseq2, Trinity, Stacks, and various Mothur tools) halfway through that takes 24 hours of queue time to reach. Now imagine a user running 5 such workflows at once.

- Without this and without history_local_serial_workflow_scheduling, the 5 workflows will each run as fast as possible and the UI will show as much of each workflow as can be scheduled, but the order of the datasets may be shuffled. The workflows will be complete for the user in 48 hours.
- With history_local_serial_workflow_scheduling enabled, only one workflow will be scheduled - and only its first half - during the first 24 hours, and the user will be given no visual indication for a full day of why the other workflows are not running. The final workflow outputs will take nearly a week to be complete for the user.
- With this enabled - the new default in this commit - each workflow will be scheduled in two chunks, but those chunks will be contiguous and it should be fairly clear to the user which tool caused the discontinuity of the datasets in the history. So things are still mostly ordered, but the drawbacks of history_local_serial_workflow_scheduling are avoided entirely. Namely, the other four workflows aren't hidden from the user without any UI indication, and the workflows will still take only 48 hours to complete with outputs ready for the user.

The only drawback of this new default behavior is that you could potentially see some performance improvement from scheduling multiple workflow invocations within one history in parallel - but that was never a design goal in my mind when implementing background scheduling, and under typical Galaxy use cases I don't think it would be worth the UI problems. So, the older behavior can be re-enabled by setting parallelize_workflow_scheduling_within_histories to True in galaxy.ini, but it won't be on by default and isn't really recommended if the Galaxy UI is being used.
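
For reference, here is a minimal sketch of what that override could look like in galaxy.ini. Only the option name comes from this commit; the [app:main] section placement is assumed.

```ini
# galaxy.ini - illustrative sketch; the [app:main] section is assumed,
# the option name is taken from this commit.
[app:main]
# Re-enable the older behavior of spreading a single history's workflow
# invocations across randomly chosen handlers.
parallelize_workflow_scheduling_within_histories = True
```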
jmchilton committed Mar 27, 2017
1 parent b33b3c1 commit 3ad0305
Showing 5 changed files with 139 additions and 7 deletions.
1 change: 1 addition & 0 deletions lib/galaxy/config.py
@@ -325,6 +325,7 @@ def __init__( self, **kwargs ):
self.force_beta_workflow_scheduled_for_collections = string_as_bool( kwargs.get( 'force_beta_workflow_scheduled_for_collections', 'False' ) )

self.history_local_serial_workflow_scheduling = string_as_bool( kwargs.get( 'history_local_serial_workflow_scheduling', 'False' ) )
self.parallelize_workflow_scheduling_within_histories = string_as_bool( kwargs.get( 'parallelize_workflow_scheduling_within_histories', 'False' ) )
self.maximum_workflow_invocation_duration = int( kwargs.get( "maximum_workflow_invocation_duration", 2678400 ) )

# Per-user Job concurrency limitations
12 changes: 8 additions & 4 deletions lib/galaxy/jobs/__init__.py
@@ -608,27 +608,31 @@ def get_job_tool_configurations(self, ids):
rval.append(self.default_job_tool_configuration)
return rval

def __get_single_item(self, collection):
def __get_single_item(self, collection, random_index=None):
"""Given a collection of handlers or destinations, return one item from the collection at random.
"""
# Done like this to avoid random under the assumption it's faster to avoid it
if len(collection) == 1:
return collection[0]
else:
elif random_index is None:
return random.choice(collection)
else:
return collection[random_index % len(collection)]

# This is called by Tool.get_job_handler()
def get_handler(self, id_or_tag):
def get_handler(self, id_or_tag, random_index=None):
"""Given a handler ID or tag, return the provided ID or an ID matching the provided tag
:param id_or_tag: A handler ID or tag.
:type id_or_tag: str
:param random_index: Generate "consistent" "random" handlers with this index if specified.
:type random_index: int
:returns: str -- A valid job handler ID.
"""
if id_or_tag is None:
id_or_tag = self.default_handler_id
return self.__get_single_item(self.handlers[id_or_tag])
return self.__get_single_item(self.handlers[id_or_tag], random_index=random_index)

def get_destination(self, id_or_tag):
"""Given a destination ID or tag, return the JobDestination matching the provided ID or tag
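To make the handler-selection logic above concrete, here is a minimal standalone sketch (illustrative names, not Galaxy code): passing the same random_index always yields the same handler, while omitting it falls back to a random choice.

```python
import random


def pick_handler(handlers, random_index=None):
    """Pick one handler; deterministic when random_index is given."""
    if len(handlers) == 1:
        return handlers[0]
    elif random_index is None:
        return random.choice(handlers)  # any handler may be chosen
    else:
        # Same index -> same handler; different indices spread across the pool.
        return handlers[random_index % len(handlers)]


handlers = ["handler%d" % i for i in range(10)]
history_id = 12345  # hypothetical decoded history id
assert pick_handler(handlers, random_index=history_id) == pick_handler(handlers, random_index=history_id)
```
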
9 changes: 6 additions & 3 deletions lib/galaxy/workflow/scheduling_manager.py
@@ -54,8 +54,11 @@ def __init__( self, app ):
def _is_workflow_handler( self ):
return self.app.is_job_handler()

def _get_handler( self ):
return self.__job_config.get_handler( None )
def _get_handler( self, history_id ):
random_index = history_id
if self.app.config.parallelize_workflow_scheduling_within_histories:
random_index = None
return self.__job_config.get_handler( None, random_index=random_index )

def shutdown( self ):
for workflow_scheduler in self.workflow_schedulers.itervalues():
@@ -72,7 +75,7 @@ def shutdown( self ):
def queue( self, workflow_invocation, request_params ):
workflow_invocation.state = model.WorkflowInvocation.states.NEW
scheduler = request_params.get( "scheduler", None ) or self.default_scheduler_id
handler = self._get_handler()
handler = self._get_handler( workflow_invocation.history.id )
log.info("Queueing workflow invocation for handler [%s]" % handler)

workflow_invocation.scheduler = scheduler
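A rough sketch of the net effect of this change (again illustrative, not Galaxy code): with the new default, every invocation queued from one history maps to a single handler, while enabling parallelize_workflow_scheduling_within_histories restores random assignment.

```python
import random


def handler_for_invocation(history_id, handlers, parallelize_within_histories=False):
    # parallelize_within_histories mirrors the new config option; history_id
    # stands in for the decoded database id used as random_index above.
    if parallelize_within_histories:
        return random.choice(handlers)
    return handlers[history_id % len(handlers)]


handlers = ["handler%d" % i for i in range(10)]
# New default: ten invocations from history 42 all land on the same handler.
assert len({handler_for_invocation(42, handlers) for _ in range(10)}) == 1
```
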
97 changes: 97 additions & 0 deletions test/integration/test_workflow_handler_configuration.py
@@ -0,0 +1,97 @@
"""Integration tests for maximum workflow invocation duration configuration option."""

import os

from json import dumps

from base import integration_util
from base.populators import (
DatasetPopulator,
WorkflowPopulator,
)

SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
WORKFLOW_HANDLER_CONFIGURATION_JOB_CONF = os.path.join(SCRIPT_DIRECTORY, "workflow_handler_configuration_job_conf.xml")

PAUSE_WORKFLOW = """
class: GalaxyWorkflow
steps:
- label: test_input
type: input
- label: the_pause
type: pause
connect:
input:
- test_input
"""


class BaseWorkflowHandlerConfigurationTestCase(integration_util.IntegrationTestCase):

framework_tool_and_types = True

def setUp( self ):
super( BaseWorkflowHandlerConfigurationTestCase, self ).setUp()
self.dataset_populator = DatasetPopulator( self.galaxy_interactor )
self.workflow_populator = WorkflowPopulator( self.galaxy_interactor )
self.history_id = self.dataset_populator.new_history()

@classmethod
def handle_galaxy_config_kwds(cls, config):
config["job_config_file"] = WORKFLOW_HANDLER_CONFIGURATION_JOB_CONF

def _invoke_n_workflows(self, n):
workflow_id = self.workflow_populator.upload_yaml_workflow(PAUSE_WORKFLOW)
history_id = self.history_id
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
index_map = {
'0': dict(src="hda", id=hda1["id"])
}
request = {}
request["history"] = "hist_id=%s" % history_id
request["inputs"] = dumps(index_map)
request["inputs_by"] = 'step_index'
url = "workflows/%s/invocations" % (workflow_id)
for i in range(n):
self._post(url, data=request)

def _get_workflow_invocations(self):
# Consider exposing handler via the API to reduce breaking
# into Galaxy's internal state.
app = self._app
history_id = app.security.decode_id(self.history_id)
sa_session = app.model.context.current
history = sa_session.query( app.model.History ).get( history_id )
workflow_invocations = history.workflow_invocations
return workflow_invocations


class HistoryRestrictionConfigurationTestCase( BaseWorkflowHandlerConfigurationTestCase ):

def test_history_to_handler_restriction(self):
self._invoke_n_workflows(10)
workflow_invocations = self._get_workflow_invocations()
assert len( workflow_invocations ) == 10
# Verify all 10 were assigned to the same handler - across 10 handlers there
# would be roughly a 1 in 10^9 chance of this occurring randomly.
for workflow_invocation in workflow_invocations:
assert workflow_invocation.handler == workflow_invocations[0].handler


class HistoryParallelConfigurationTestCase( BaseWorkflowHandlerConfigurationTestCase ):

@classmethod
def handle_galaxy_config_kwds(cls, config):
BaseWorkflowHandlerConfigurationTestCase.handle_galaxy_config_kwds(config)
config["parallelize_workflow_scheduling_within_histories"] = True

def test_workflows_spread_across_multiple_handlers(self):
self._invoke_n_workflows(20)
workflow_invocations = self._get_workflow_invocations()
assert len( workflow_invocations ) == 20
handlers = set()
for workflow_invocation in workflow_invocations:
handlers.add(workflow_invocation.handler)

# Assert at least 2 of the 20 invocations were assigned to different handlers.
assert len(handlers) > 1, handlers
27 changes: 27 additions & 0 deletions test/integration/workflow_handler_configuration_job_conf.xml
@@ -0,0 +1,27 @@
<?xml version="1.0"?>
<!--
job_conf used by test_workflow_handler_configuration.py
-->
<job_conf>
<plugins>
<plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="2"/>
</plugins>

<handlers default="handlers">
<handler id="handler0" tags="handlers"/>
<handler id="handler1" tags="handlers"/>
<handler id="handler2" tags="handlers" />
<handler id="handler3" tags="handlers" />
<handler id="handler4" tags="handlers" />
<handler id="handler5" tags="handlers" />
<handler id="handler6" tags="handlers" />
<handler id="handler7" tags="handlers" />
<handler id="handler8" tags="handlers" />
<handler id="handler9" tags="handlers" />
</handlers>

<destinations default="local">
<destination id="local" runner="local">
</destination>
</destinations>
</job_conf>
