
Commit

[17.01] By default, do not allow workflow invocations to schedule indefinitely.

Give up after a month but allow admins to reduce this amount as well.
jmchilton committed Mar 27, 2017
1 parent 65cbdde commit b33b3c1
Showing 9 changed files with 97 additions and 18 deletions.
4 changes: 4 additions & 0 deletions config/galaxy.ini.sample
@@ -1062,6 +1062,10 @@ use_interactive = True
# collections.
#force_beta_workflow_scheduled_for_collections=False

# This is the maximum amount of time a workflow invocation may stay in an active
# scheduling state in seconds. Set to -1 to disable this maximum and allow any workflow
# invocation to schedule indefinitely. The default corresponds to 1 month.
#maximum_workflow_invocation_duration = 2678400

# Force serial scheduling of workflows within the context of a particular history
#history_local_serial_workflow_scheduling=False
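For illustration only (not part of this commit's diff): an admin who wants Galaxy to give up after one week rather than the default month could uncomment the new option in galaxy.ini and reduce it. The one-week figure (7 × 24 × 3600 = 604800 seconds) is an assumed example; only the option name and the 2678400-second default come from the sample above.

# Fail workflow invocations that are still actively scheduling after one week.
maximum_workflow_invocation_duration = 604800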
1 change: 1 addition & 0 deletions lib/galaxy/config.py
@@ -325,6 +325,7 @@ def __init__( self, **kwargs ):
        self.force_beta_workflow_scheduled_for_collections = string_as_bool( kwargs.get( 'force_beta_workflow_scheduled_for_collections', 'False' ) )

        self.history_local_serial_workflow_scheduling = string_as_bool( kwargs.get( 'history_local_serial_workflow_scheduling', 'False' ) )
        self.maximum_workflow_invocation_duration = int( kwargs.get( "maximum_workflow_invocation_duration", 2678400 ) )

        # Per-user Job concurrency limitations
        self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) )
5 changes: 5 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -4063,6 +4063,11 @@ def has_input_for_step( self, step_id ):
                return True
        return False

    @property
    def seconds_since_created( self ):
        create_time = self.create_time or galaxy.model.orm.now.now()  # In case not flushed yet
        return (galaxy.model.orm.now.now() - create_time).total_seconds()


class WorkflowInvocationToSubworkflowInvocationAssociation( object, Dictifiable ):
    dict_collection_visible_keys = ( 'id', 'workflow_step_id', 'workflow_invocation_id', 'subworkflow_invocation_id' )
12 changes: 12 additions & 0 deletions lib/galaxy/workflow/run.py
@@ -145,6 +145,18 @@ def __init__( self, trans, workflow, workflow_run_config, workflow_invocation=No

    def invoke( self ):
        workflow_invocation = self.workflow_invocation
        maximum_duration = getattr( self.trans.app.config, "maximum_workflow_invocation_duration", -1 )
log.debug("Workflow invocation [%s] exceeded maximum number of seconds allowed for scheduling [%s], failing." % (workflow_invocation.id, maximum_duration))
        if maximum_duration > 0 and workflow_invocation.seconds_since_created > maximum_duration:
            log.debug("Workflow invocation [%s] exceeded maximum number of seconds allowed for scheduling [%s], failing." % (workflow_invocation.id, maximum_duration))
            workflow_invocation.state = model.WorkflowInvocation.states.FAILED
            # All jobs ran successfully, so we can save now
            self.trans.sa_session.add( workflow_invocation )

            # Not flushing in here, because web controller may create multiple
            # invocations.
            return self.progress.outputs

        remaining_steps = self.progress.remaining_steps()
        delayed_steps = False
        for step in remaining_steps:
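To make the new guard easier to follow in isolation, here is a minimal, self-contained sketch of the same elapsed-time logic invoke() applies above; the function name and datetime-based arguments are illustrative stand-ins, not Galaxy APIs.

from datetime import datetime, timedelta


def invocation_expired(create_time, maximum_duration, now=None):
    # Mirrors the invoke() guard: -1 (or any non-positive value) disables the
    # limit, otherwise fail once the invocation has been scheduling for more
    # than maximum_duration seconds.
    if maximum_duration <= 0:
        return False
    now = now or datetime.utcnow()
    return (now - create_time).total_seconds() > maximum_duration


# An invocation created 32 days ago exceeds the 2678400-second (31 day) default...
assert invocation_expired(datetime.utcnow() - timedelta(days=32), 2678400)
# ...a fresh one does not, and -1 turns the check off entirely.
assert not invocation_expired(datetime.utcnow(), 2678400)
assert not invocation_expired(datetime.utcnow() - timedelta(days=365), -1)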
20 changes: 4 additions & 16 deletions test/api/test_workflows.py
@@ -18,10 +18,6 @@
from galaxy.exceptions import error_codes
from galaxy.tools.verify.test_data import TestDataResolver

-from base.workflows_format_2 import (
-    convert_and_import_workflow,
-    ImporterGalaxyInterface,
-)

SIMPLE_NESTED_WORKFLOW_YAML = """
class: GalaxyWorkflow
@@ -69,7 +65,7 @@
"""


-class BaseWorkflowsApiTestCase( api.ApiTestCase, ImporterGalaxyInterface ):
+class BaseWorkflowsApiTestCase( api.ApiTestCase ):
    # TODO: Find a new file for this class.

    def setUp( self ):
@@ -88,20 +84,12 @@ def _workflow_names( self ):
        names = [w[ "name" ] for w in index_response.json()]
        return names

-    # Import importer interface...
    def import_workflow(self, workflow, **kwds):
-        workflow_str = dumps(workflow, indent=4)
-        data = {
-            'workflow': workflow_str,
-        }
-        data.update(**kwds)
-        upload_response = self._post( "workflows", data=data )
-        self._assert_status_code_is( upload_response, 200 )
-        return upload_response.json()
+        upload_response = self.workflow_populator.import_workflow(workflow, **kwds)
+        return upload_response

    def _upload_yaml_workflow(self, has_yaml, **kwds):
-        workflow = convert_and_import_workflow(has_yaml, galaxy_interface=self, **kwds)
-        return workflow[ "id" ]
+        return self.workflow_populator.upload_yaml_workflow(has_yaml, **kwds)

    def _setup_workflow_run( self, workflow, inputs_by='step_id', history_id=None ):
        uploaded_workflow_id = self.workflow_populator.create_workflow( workflow )
1 change: 1 addition & 0 deletions test/api/test_workflows_from_yaml.py
@@ -279,6 +279,7 @@ def test_implicit_connections( self ):

    def _steps_by_label(self, workflow_as_dict):
        by_label = {}
        assert "steps" in workflow_as_dict, workflow_as_dict
        for step in workflow_as_dict["steps"].values():
            by_label[step['label']] = step
        return by_label
2 changes: 1 addition & 1 deletion test/base/data/test_workflow_pause.ga
@@ -107,7 +107,7 @@
},
"post_job_actions": {},
"tool_errors": null,
"tool_id": "cat1",
"tool_id": "cat",
"tool_state": "{\"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"null\", \"queries\": \"[]\"}",
"tool_version": "1.0.0",
"type": "tool",
22 changes: 21 additions & 1 deletion test/base/populators.py
@@ -9,6 +9,10 @@
from six import StringIO

from base import api_asserts
from base.workflows_format_2 import (
    convert_and_import_workflow,
    ImporterGalaxyInterface,
)

# Simple workflow that takes an input and call cat wrapper on it.
workflow_str = resource_string( __name__, "data/test_workflow_1.ga" )
@@ -253,6 +257,10 @@ def create_workflow_response( self, workflow, **create_kwds ):
upload_response = self._post( "workflows/upload", data=data )
return upload_response

    def upload_yaml_workflow(self, has_yaml, **kwds):
        workflow = convert_and_import_workflow(has_yaml, galaxy_interface=self, **kwds)
        return workflow[ "id" ]

    def wait_for_invocation( self, workflow_id, invocation_id, timeout=DEFAULT_TIMEOUT ):
        url = "workflows/%s/usage/%s" % ( workflow_id, invocation_id )
        return wait_on_state( lambda: self._get( url ), timeout=timeout )
@@ -264,7 +272,7 @@ def wait_for_workflow( self, workflow_id, invocation_id, history_id, assert_ok=T
        self.dataset_populator.wait_for_history( history_id, assert_ok=assert_ok, timeout=timeout )


-class WorkflowPopulator( BaseWorkflowPopulator ):
+class WorkflowPopulator( BaseWorkflowPopulator, ImporterGalaxyInterface ):

    def __init__( self, galaxy_interactor ):
        self.galaxy_interactor = galaxy_interactor
@@ -276,6 +284,18 @@ def _post( self, route, data={} ):
    def _get( self, route ):
        return self.galaxy_interactor.get( route )

    # Required for ImporterGalaxyInterface interface - so we can recursively import
    # nested workflows.
    def import_workflow(self, workflow, **kwds):
        workflow_str = json.dumps(workflow, indent=4)
        data = {
            'workflow': workflow_str,
        }
        data.update(**kwds)
        upload_response = self._post( "workflows", data=data )
        assert upload_response.status_code == 200, upload_response
        return upload_response.json()


class LibraryPopulator( object ):

48 changes: 48 additions & 0 deletions test/integration/test_maximum_worklfow_invocation_duration.py
@@ -0,0 +1,48 @@
"""Integration tests for maximum workflow invocation duration configuration option."""

import time

from json import dumps

from base import integration_util
from base.populators import (
    DatasetPopulator,
    WorkflowPopulator,
)


class MaximumWorkflowInvocationDurationTestCase(integration_util.IntegrationTestCase):
"""Start a Pulsar job."""

    framework_tool_and_types = True

    def setUp( self ):
        super( MaximumWorkflowInvocationDurationTestCase, self ).setUp()
        self.dataset_populator = DatasetPopulator( self.galaxy_interactor )
        self.workflow_populator = WorkflowPopulator( self.galaxy_interactor )

    @classmethod
    def handle_galaxy_config_kwds(cls, config):
        config["maximum_workflow_invocation_duration"] = 20

    def do_test(self):
        workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_pause")
        workflow_id = self.workflow_populator.create_workflow(workflow)
        history_id = self.dataset_populator.new_history()
        hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
        index_map = {
            '0': dict(src="hda", id=hda1["id"])
        }
        request = {}
        request["history"] = "hist_id=%s" % history_id
        request[ "inputs" ] = dumps(index_map)
        request[ "inputs_by" ] = 'step_index'
        url = "workflows/%s/invocations" % (workflow_id)
        invocation_response = self._post(url, data=request)
        invocation_url = url + "/" + invocation_response.json()["id"]
        time.sleep(5)
        state = self._get(invocation_url).json()["state"]
        assert state != "failed", state
        time.sleep(35)
        state = self._get(invocation_url).json()["state"]
        assert state == "failed", state

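The timing in do_test lines up with the 20-second limit set in handle_galaxy_config_kwds: the first state check, roughly 5 seconds after invoking the pausing test workflow, still falls inside the window, so the invocation must not have failed yet; the second check, roughly 40 seconds in, is well past the window, so the scheduler is expected to have marked the invocation failed.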