Skip to content

Commit

Permalink
Make more job options configurable at the destination level.
Browse files Browse the repository at this point in the history
Makes a host of job-related parameters configurable at the destination level instead of only at the app level. This solves major problems, like allowing the local job runner and the DRMAA run-as-real-user runner to operate within the same environment, and provides small conveniences, like allowing configuration of cleanup_job at the destination level.
  • Loading branch information
jmchilton committed Feb 22, 2016
1 parent 0613e03 commit 81b5ca5
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 32 deletions.
49 changes: 38 additions & 11 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
# Override with config.default_job_shell.
DEFAULT_JOB_SHELL = '/bin/bash'

DEFAULT_CLEANUP_JOB = "always"


class JobDestination( Bunch ):
"""
Expand Down Expand Up @@ -772,12 +774,19 @@ def __init__( self, job, queue, use_persisted_destination=False ):
self.__galaxy_system_pwent = None

def _job_dataset_path_rewriter( self, working_directory ):
if self.app.config.outputs_to_working_directory:
outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory:
dataset_path_rewriter = OutputsToWorkingDirectoryPathRewriter( working_directory )
else:
dataset_path_rewriter = NullDatasetPathRewriter( )
return dataset_path_rewriter

@property
def cleanup_job(self):
    """ Remove the job after it is complete, should return "always", "onsuccess", or "never".

    Resolved per-destination, falling back to the app-level config value via
    get_destination_configuration, and finally to DEFAULT_CLEANUP_JOB.
    """
    # Bug fix: the original body computed this value but never returned it,
    # so the property always evaluated to None and every cleanup comparison
    # (== 'always', == 'onsuccess') silently failed.
    return self.get_destination_configuration("cleanup_job", DEFAULT_CLEANUP_JOB)

def can_split( self ):
# Should the job handler split this job up?
return self.app.config.use_tasked_jobs and self.tool.parallelism
Expand Down Expand Up @@ -969,7 +978,9 @@ def fail( self, message, exception=False, stdout="", stderr="", exit_code=None )
# Get the exception and let the tool attempt to generate
# a better message
etype, evalue, tb = sys.exc_info()
if self.app.config.outputs_to_working_directory:

outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory:
for dataset_path in self.get_output_fnames():
try:
shutil.move( dataset_path.false_path, dataset_path.real_path )
Expand Down Expand Up @@ -1013,7 +1024,8 @@ def fail( self, message, exception=False, stdout="", stderr="", exit_code=None )
# If the job was deleted, call tool specific fail actions (used for e.g. external metadata) and clean up
if self.tool:
self.tool.job_failed( self, message, exception )
delete_files = self.app.config.cleanup_job == 'always' or (self.app.config.cleanup_job == 'onsuccess' and job.state == job.states.DELETED)
cleanup_job = self.cleanup_job
delete_files = cleanup_job == 'always' or (cleanup_job == 'onsuccess' and job.state == job.states.DELETED)
self.cleanup( delete_files=delete_files )

def pause( self, job=None, message=None ):
Expand Down Expand Up @@ -1093,6 +1105,14 @@ def set_job_destination( self, job_destination, external_id=None, flush=True, jo
if flush:
self.sa_session.flush()

def get_destination_configuration(self, key, default=None):
    """ Get a destination parameter that can be defaulted back
    in app.config if it needs to be applied globally.
    """
    # Delegate resolution (destination params -> app config -> default)
    # to the model-level Job object.
    job = self.get_job()
    return job.get_destination_configuration(self.app.config, key, default)

def finish( self, stdout, stderr, tool_exit_code=None, remote_working_directory=None ):
"""
Called to indicate that the associated command has been run. Updates
Expand Down Expand Up @@ -1138,7 +1158,8 @@ def finish( self, stdout, stderr, tool_exit_code=None, remote_working_directory=
self.version_string = open(version_filename).read()
os.unlink(version_filename)

if self.app.config.outputs_to_working_directory and not self.__link_file_check():
outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory and not self.__link_file_check():
for dataset_path in self.get_output_fnames():
try:
shutil.move( dataset_path.false_path, dataset_path.real_path )
Expand Down Expand Up @@ -1204,7 +1225,8 @@ def finish( self, stdout, stderr, tool_exit_code=None, remote_working_directory=
# either use the metadata from originating output dataset, or call set_meta on the copies
# it would be quicker to just copy the metadata from the originating output dataset,
# but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
if ( self.app.config.retry_metadata_internally and
retry_internally = util.asbool(self.get_destination_configuration("retry_metadata_internally", True))
if ( retry_internally and
not self.external_output_metadata.external_metadata_set_successfully(dataset, self.sa_session ) ):
# If Galaxy was expected to sniff type and didn't - do so.
if dataset.ext == "_sniff_":
Expand Down Expand Up @@ -1360,7 +1382,8 @@ def path_rewriter( path ):
self._collect_metrics( job )
self.sa_session.flush()
log.debug( 'job %d ended (finish() executed in %s)' % (self.job_id, finish_timer) )
delete_files = self.app.config.cleanup_job == 'always' or ( job.state == job.states.OK and self.app.config.cleanup_job == 'onsuccess' )
cleanup_job = self.cleanup_job
delete_files = cleanup_job == 'always' or ( job.state == job.states.OK and cleanup_job == 'onsuccess' )
self.cleanup( delete_files=delete_files )

def check_tool_output( self, stdout, stderr, tool_exit_code, job ):
Expand Down Expand Up @@ -1528,7 +1551,8 @@ def get_output_file_id( self, file ):
if self.output_paths is None:
self.get_output_fnames()
for dp in self.output_paths:
if self.app.config.outputs_to_working_directory and os.path.basename( dp.false_path ) == file:
outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory and os.path.basename( dp.false_path ) == file:
return dp.dataset_id
elif os.path.basename( dp.real_path ) == file:
return dp.dataset_id
Expand Down Expand Up @@ -1649,7 +1673,8 @@ def __link_file_check( self ):
def _change_ownership( self, username, gid ):
job = self.get_job()
# FIXME: hardcoded path
cmd = [ '/usr/bin/sudo', '-E', self.app.config.external_chown_script, self.working_directory, username, str( gid ) ]
external_chown_script = self.get_destination_configuration("external_chown_script", None)
cmd = [ '/usr/bin/sudo', '-E', external_chown_script, self.working_directory, username, str( gid ) ]
log.debug( '(%s) Changing ownership of working directory with: %s' % ( job.id, ' '.join( cmd ) ) )
p = subprocess.Popen( cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
# TODO: log stdout/stderr
Expand All @@ -1658,7 +1683,8 @@ def _change_ownership( self, username, gid ):

def change_ownership_for_run( self ):
job = self.get_job()
if self.app.config.external_chown_script and job.user is not None:
external_chown_script = self.get_destination_configuration("external_chown_script", None)
if external_chown_script and job.user is not None:
try:
self._change_ownership( self.user_system_pwent[0], str( self.user_system_pwent[3] ) )
except:
Expand All @@ -1667,7 +1693,8 @@ def change_ownership_for_run( self ):

def reclaim_ownership( self ):
job = self.get_job()
if self.app.config.external_chown_script and job.user is not None:
external_chown_script = self.get_destination_configuration("external_chown_script", None)
if external_chown_script and job.user is not None:
self._change_ownership( self.galaxy_system_pwent[0], str( self.galaxy_system_pwent[3] ) )

@property
Expand Down Expand Up @@ -1832,7 +1859,7 @@ def finish( self, stdout, stderr, tool_exit_code=None ):
# if the job was deleted, don't finish it
if task.state == task.states.DELETED:
# Job was deleted by an administrator
delete_files = self.app.config.cleanup_job in ( 'always', 'onsuccess' )
delete_files = self.cleanup_job in ( 'always', 'onsuccess' )
self.cleanup( delete_files=delete_files )
return
elif task.state == task.states.ERROR:
Expand Down
5 changes: 3 additions & 2 deletions lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ def finish_job( self, job_state ):
exit_code = 0

# clean up the job files
if self.app.config.cleanup_job == "always" or ( not stderr and self.app.config.cleanup_job == "onsuccess" ):
cleanup_job = job_state.job_wrapper.cleanup_job
if cleanup_job == "always" or ( not stderr and cleanup_job == "onsuccess" ):
job_state.cleanup()

try:
Expand All @@ -600,7 +601,7 @@ def fail_job( self, job_state ):
# something necessary
if not job_state.runner_state_handled:
job_state.job_wrapper.fail( getattr( job_state, 'fail_message', 'Job failed' ) )
if self.app.config.cleanup_job == "always":
if job_state.job_wrapper.cleanup_job == "always":
job_state.cleanup()

def mark_as_finished(self, job_state):
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/runners/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def queue_job( self, job_wrapper ):
# job was deleted while we were preparing it
if job_wrapper.get_state() == model.Job.states.DELETED:
log.info("(%s) Job deleted by user before it entered the queue" % galaxy_id_tag )
if self.app.config.cleanup_job in ("always", "onsuccess"):
if job_wrapper.cleanup_job in ("always", "onsuccess"):
job_wrapper.cleanup()
return

Expand Down
7 changes: 4 additions & 3 deletions lib/galaxy/jobs/runners/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ def queue_job( self, job_wrapper ):
log.exception( "(%s) failure preparing job script" % galaxy_id_tag )
return

cleanup_job = job_wrapper.cleanup_job
try:
open(submit_file, "w").write(submit_file_contents)
except:
if self.app.config.cleanup_job == "always":
except Exception:
if cleanup_job == "always":
cjs.cleanup()
# job_wrapper.fail() calls job_wrapper.cleanup()
job_wrapper.fail( "failure preparing submit file", exception=True )
Expand All @@ -109,7 +110,7 @@ def queue_job( self, job_wrapper ):
# job was deleted while we were preparing it
if job_wrapper.get_state() == model.Job.states.DELETED:
log.debug( "Job %s deleted by user before it entered the queue" % galaxy_id_tag )
if self.app.config.cleanup_job in ( "always", "onsuccess" ):
if cleanup_job in ("always", "onsuccess"):
os.unlink( submit_file )
cjs.cleanup()
job_wrapper.cleanup()
Expand Down
24 changes: 13 additions & 11 deletions lib/galaxy/jobs/runners/drmaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ def __init__( self, app, nworkers, **kwargs ):
self.ds = drmaa.Session()
self.ds.initialize()

# external_runJob_script can be None, in which case it's not used.
self.external_runJob_script = app.config.drmaa_external_runjob_script
self.external_killJob_script = app.config.drmaa_external_killjob_script
self.userid = None

self._init_monitor_thread()
Expand Down Expand Up @@ -115,6 +112,10 @@ def get_native_spec( self, url ):
def queue_job( self, job_wrapper ):
"""Create job script and submit it to the DRM"""
# prepare the job

# external_runJob_script can be None, in which case it's not used.
external_runjob_script = job_wrapper.get_destination_configuration("drmaa_external_runjob_script", None)

include_metadata = asbool( job_wrapper.job_destination.params.get( "embed_metadata_in_job", True) )
if not self.prepare_job( job_wrapper, include_metadata=include_metadata):
return
Expand All @@ -129,7 +130,7 @@ def queue_job( self, job_wrapper ):
job_name = 'g%s' % galaxy_id_tag
if job_wrapper.tool.old_id:
job_name += '_%s' % job_wrapper.tool.old_id
if self.external_runJob_script is None:
if external_runjob_script is None:
job_name += '_%s' % job_wrapper.user
job_name = ''.join( map( lambda x: x if x in ( string.letters + string.digits + '_' ) else '_', job_name ) )
ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_name=job_name )
Expand Down Expand Up @@ -159,7 +160,7 @@ def queue_job( self, job_wrapper ):
# job was deleted while we were preparing it
if job_wrapper.get_state() == model.Job.states.DELETED:
log.debug( "(%s) Job deleted by user before it entered the queue" % galaxy_id_tag )
if self.app.config.cleanup_job in ( "always", "onsuccess" ):
if job_wrapper.cleanup_job in ( "always", "onsuccess" ):
job_wrapper.cleanup()
return

Expand All @@ -168,7 +169,7 @@ def queue_job( self, job_wrapper ):
log.debug( "(%s) native specification is: %s", galaxy_id_tag, native_spec )

# runJob will raise if there's a submit problem
if self.external_runJob_script is None:
if external_runjob_script is None:
# TODO: create a queue for retrying submission indefinitely
# TODO: configurable max tries and sleep
trynum = 0
Expand Down Expand Up @@ -208,7 +209,7 @@ def queue_job( self, job_wrapper ):
log.debug( '(%s) submitting with credentials: %s [uid: %s]' % ( galaxy_id_tag, pwent[0], pwent[2] ) )
filename = self.store_jobtemplate(job_wrapper, jt)
self.userid = pwent[2]
external_job_id = self.external_runjob(filename, pwent[2]).strip()
external_job_id = self.external_runjob(external_runjob_script, filename, pwent[2]).strip()
log.info( "(%s) queued as %s" % ( galaxy_id_tag, external_job_id ) )

# store runner information for tracking if Galaxy restarts
Expand Down Expand Up @@ -310,11 +311,12 @@ def stop_job( self, job ):
try:
ext_id = job.get_job_runner_external_id()
assert ext_id not in ( None, 'None' ), 'External job id is None'
if self.external_killJob_script is None:
kill_script = job.get_destination_configuration(self.app.config, "external_killJob_script", None)
if kill_script is None:
self.ds.control( ext_id, drmaa.JobControlAction.TERMINATE )
else:
# FIXME: hardcoded path
subprocess.Popen( [ '/usr/bin/sudo', '-E', self.external_killJob_script, str( ext_id ), str( self.userid ) ], shell=False )
subprocess.Popen( [ '/usr/bin/sudo', '-E', kill_script, str( ext_id ), str( self.userid ) ], shell=False )
log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.get_id(), ext_id ) )
except drmaa.InvalidJobException:
log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), ext_id ) )
Expand Down Expand Up @@ -361,12 +363,12 @@ def store_jobtemplate(self, job_wrapper, jt):
log.debug( '(%s) Job script for external submission is: %s' % ( job_wrapper.job_id, filename ) )
return filename

def external_runjob(self, jobtemplate_filename, username):
def external_runjob(self, external_runjob_script, jobtemplate_filename, username):
""" runs an external script the will QSUB a new job.
The external script will be run with sudo, and will setuid() to the specified user.
Effectively, will QSUB as a different user (then the one used by Galaxy).
"""
script_parts = self.external_runJob_script.split()
script_parts = external_runjob_script.split()
script = script_parts[0]
command = [ '/usr/bin/sudo', '-E', script]
for script_argument in script_parts[1:]:
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/jobs/runners/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def queue_job( self, job_wrapper ):
if job_wrapper.get_state() == model.Job.states.DELETED:
log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
pbs.pbs_disconnect(c)
if self.app.config.cleanup_job in ( "always", "onsuccess" ):
if job_wrapper.cleanup_job in ( "always", "onsuccess" ):
self.cleanup( ( ofile, efile, ecfile, job_file ) )
job_wrapper.cleanup()
return
Expand Down Expand Up @@ -480,7 +480,7 @@ def fail_job( self, pbs_job_state ):
if pbs_job_state.stop_job:
self.stop_job( self.sa_session.query( self.app.model.Job ).get( pbs_job_state.job_wrapper.job_id ) )
pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
if self.app.config.cleanup_job == "always":
if pbs_job_state.job_wrapper.cleanup_job == "always":
self.cleanup( ( pbs_job_state.output_file, pbs_job_state.error_file, pbs_job_state.exit_code_file, pbs_job_state.job_file ) )

def get_stage_in_out( self, fnames, symlink=False ):
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def finish_job( self, job_state ):
# and cleanup job if needed.
completed_normally = \
job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]
cleanup_job = self.app.config.cleanup_job
cleanup_job = job_wrapper.cleanup_job
client_outputs = self.__client_outputs(client, job_wrapper)
finish_args = dict( client=client,
job_completed_normally=completed_normally,
Expand Down
12 changes: 12 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,18 @@ def set_final_state( self, final_state ):
if self.workflow_invocation_step:
self.workflow_invocation_step.update()

def get_destination_configuration(self, config, key, default=None):
    """ Get a destination parameter that can be defaulted back
    in specified config if it needs to be applied globally.
    """
    # Sentinel lets us distinguish "key absent" from legitimate falsy values.
    _missing = object()
    destination_params = self.destination_params or {}
    value = destination_params.get(key, _missing)
    if value is _missing:
        # Fall back to the global (app-level) configuration attribute.
        value = getattr(config, key, _missing)
    return default if value is _missing else value


class Task( object, JobLike ):
"""
Expand Down
1 change: 0 additions & 1 deletion lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ def __init__( self, config_file, tool_source, app, guid=None, repository_id=None
except Exception, e:
global_tool_errors.add_error(config_file, "Tool Loading", e)
raise e
self.external_runJob_script = app.config.drmaa_external_runjob_script
self.history_manager = histories.HistoryManager( app )

@property
Expand Down

0 comments on commit 81b5ca5

Please sign in to comment.