diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index c5955c7eb6..c2495a831f 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -2,6 +2,7 @@ """ import os +import pwd import re import subprocess import time @@ -30,68 +31,68 @@ class QJobInfo: :author Hans J. Johnson """ - def __init__(self, jobNum, jobQueueState, jobTime, jobQueueName, jobSlots, qsub_command_line): + def __init__(self, job_num, job_queue_state, job_time, job_queue_name, job_slots, qsub_command_line): # self._jobName = None # Ascii text name of job not unique - self._jobNum = int( - jobNum) # The primary unique identifier for this job, must be an integer! + self._job_num = int( + job_num) # The primary unique identifier for this job, must be an integer! # self._jobOwn = None # Who owns this job - self._jobQueueState = str( - jobQueueState) # ["running","zombie",...??] + self._job_queue_state = str( + job_queue_state) # ["running","zombie",...??] # self._jobActionState = str(jobActionState) # ['r','qw','S',...??] - self._jobTime = jobTime # The job start time - self._jobInfoCreationTime = time.time( + self._job_time = job_time # The job start time + self._job_info_creation_time = time.time( ) # When this job was created (for comparing against initalization) - self._jobQueueName = jobQueueName # Where the job is running - self._jobSlots = jobSlots # How many slots are being used + self._job_queue_name = job_queue_name # Where the job is running + self._job_slots = job_slots # How many slots are being used self._qsub_command_line = qsub_command_line def __repr__(self): - return str(self._jobNum).ljust(8) \ - + str(self._jobQueueState).ljust(12) \ - + str(self._jobSlots).ljust(3) \ - + time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self._jobTime)).ljust(20) \ - + str(self._jobQueueName).ljust(8) \ + return str(self._job_num).ljust(8) \ + + str(self._job_queue_state).ljust(12) \ + + str(self._job_slots).ljust(3) \ + + time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self._job_time)).ljust(20) \ + + str(self._job_queue_name).ljust(8) \ + str(self._qsub_command_line) def is_initializing(self): - return self._jobQueueState == "initializing" + return self._job_queue_state == "initializing" def is_zombie(self): - return self._jobQueueState == "zombie" + return self._job_queue_state == "zombie" def is_running(self): - return self._jobQueueState == "running" + return self._job_queue_state == "running" def is_pending(self): - return self._jobQueueState == "pending" + return self._job_queue_state == "pending" def is_job_state_pending(self): """ Return True, unless job is in the "zombie" status """ - time_diff = (time.time() - self._jobInfoCreationTime) + time_diff = (time.time() - self._job_info_creation_time) if self.is_zombie(): sge_debug_print( "DONE! QJobInfo.IsPending found in 'zombie' list, returning False so claiming done!\n{0}".format(self)) - isPendingStatus = False # Job explicitly found as being completed! + is_pending_status = False # Job explicitly found as being completed! elif self.is_initializing() and (time_diff > 600): # if initializing for more than 5 minute, failure due to # initialization and completion before registration sge_debug_print( "FAILURE! QJobInfo.IsPending found long running at {1} seconds" "'initializing' returning False for to break loop!\n{0}".format(self, time_diff)) - isPendingStatus = True # Job initialization took too long, so report! + is_pending_status = True # Job initialization took too long, so report! else: # self.is_running() || self.is_pending(): - isPendingStatus = True # Job cache last listed as running - return isPendingStatus # The job is in one of the hold states + is_pending_status = True # Job cache last listed as running + return is_pending_status # The job is in one of the hold states - def update_info(self, jobQueueState, jobTime, jobQueueName, jobSlots): - self._jobQueueState = jobQueueState - self._jobTime = jobTime - self._jobQueueName = jobQueueName - self._jobSlots = jobSlots + def update_info(self, job_queue_state, job_time, job_queue_name, job_slots): + self._job_queue_state = job_queue_state + self._job_time = job_time + self._job_queue_name = job_queue_name + self._job_slots = job_slots def set_state(self, new_state): - self._jobQueueState = new_state + self._job_queue_state = new_state class QstatSubstitute: @@ -99,14 +100,14 @@ class QstatSubstitute: """A wrapper for Qstat to avoid overloading the SGE/OGS server with rapid continuous qstat requests""" - def __init__(self, qstatInstantExecutable='qstat', qstatCachedExecutable='qstat'): + def __init__(self, qstat_instant_executable='qstat', qstat_cached_executable='qstat'): """ - :param qstatInstantExecutable: - :param qstatCachedExecutable: + :param qstat_instant_executable: + :param qstat_cached_executable: """ - self._qstatInstantExecutable = qstatInstantExecutable - self._qstatCachedExecutable = qstatCachedExecutable - self._OutOfScopeJobs = list() # Initialize first + self._qstat_instant_executable = qstat_instant_executable + self._qstat_cached_executable = qstat_cached_executable + self._out_of_scope_jobs = list() # Initialize first self._task_dictionary = dict( ) # {'taskid': QJobInfo(), .... } The dictionaryObject self._remove_old_jobs() @@ -117,39 +118,41 @@ def _remove_old_jobs(self): are jobs that existed prior to starting a new jobs, so they are irrelevant. """ self._run_qstat("QstatInitialization", True) - # If qstat does not exist on this system, then quietly - # fail during init + # If qstat does not exist on this system, then quietly + # fail during init def add_startup_job(self, taskid, qsub_command_line): """ :param taskid: The job id - :param scriptFile: When initializing, re-use the jobQueue name + :param qsub_command_line: When initializing, re-use the job_queue_name :return: NONE """ taskid = int(taskid) # Ensure that it is an integer - self._task_dictionary[taskid] = QJobInfo( - taskid, "initializing", time.time(), "noQueue", 1, qsub_command_line) + self._task_dictionary[taskid] = QJobInfo(taskid, "initializing", time.time(), + "noQueue", 1, qsub_command_line) - def _qacct_verified_complete(self, taskid): + @staticmethod + def _qacct_verified_complete(taskid): """ request definitive job completion information for the current job from the qacct report """ - sge_debug_print( - "WARNING: CONTACTING qacct for finished jobs, {0}: {1}".format(time.time(), "Verifying Completion")) + sge_debug_print("WARNING: " + "CONTACTING qacct for finished jobs, " + "{0}: {1}".format(time.time(), "Verifying Completion")) - thisCommand = 'qacct' + this_command = 'qacct' qacct_retries = 10 - isComplete = False + is_complete = False while qacct_retries > 0: qacct_retries -= 1 try: proc = subprocess.Popen( - [thisCommand, '-o', os.getlogin(), '-j', str(taskid)], + [this_command, '-o', pwd.getpwuid(os.getuid())[0], '-j', str(taskid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) qacct_result, _ = proc.communicate() if qacct_result.find(str(taskid)): - isComplete = True + is_complete = True sge_debug_print( "NOTE: qacct for jobs\n{0}".format(qacct_result)) break @@ -157,7 +160,7 @@ def _qacct_verified_complete(self, taskid): sge_debug_print("NOTE: qacct call failed") time.sleep(5) pass - return isComplete + return is_complete def _parse_qstat_job_list(self, xml_job_list): current_jobs_parsed = list() @@ -166,38 +169,39 @@ def _parse_qstat_job_list(self, xml_job_list): # jobown = # current_job_element.getElementsByTagName('JB_owner')[0].childNodes[0].data try: - jobQueueName = current_job_element.getElementsByTagName( + job_queue_name = current_job_element.getElementsByTagName( 'queue_name')[0].childNodes[0].data except: - jobQueueName = "unknown" + job_queue_name = "unknown" try: - jobSlots = current_job_element.getElementsByTagName( + job_slots = current_job_element.getElementsByTagName( 'slots')[0].childNodes[0].data except: - jobSlots = "uknown" - jobQueueState = current_job_element.getAttribute('state') - jobNum = int(current_job_element.getElementsByTagName( + job_slots = "unknown" + job_queue_state = current_job_element.getAttribute('state') + job_num = int(current_job_element.getElementsByTagName( 'JB_job_number')[0].childNodes[0].data) try: - jobtimeText = current_job_element.getElementsByTagName( + job_time_text = current_job_element.getElementsByTagName( 'JAT_start_time')[0].childNodes[0].data - jobTime = float(time.mktime(time.strptime( - jobtimeText, "%Y-%m-%dT%H:%M:%S"))) + job_time = float(time.mktime(time.strptime( + job_time_text, "%Y-%m-%dT%H:%M:%S"))) except: - jobTime = float(0.0) + job_time = float(0.0) # Make job entry - taskId = int(jobNum) - if taskId in self._task_dictionary: - self._task_dictionary[taskId].update_info( - jobQueueState, jobTime, jobQueueName, jobSlots) + task_id = int(job_num) + if task_id in self._task_dictionary: + self._task_dictionary[task_id].update_info( + job_queue_state, job_time, job_queue_name, job_slots) sge_debug_print("Updating job: {0}".format( - self._task_dictionary[taskId])) - current_jobs_parsed.append(jobNum) + self._task_dictionary[task_id])) + current_jobs_parsed.append(task_id) + # Changed from job_num as "in" is used to check which does not cast else: # Any Job that was not explicitly added with qsub command is # out of scope - self._OutOfScopeJobs.append(int(taskId)) + self._out_of_scope_jobs.append(task_id) # To ensure that every job is in the dictionary has a state reported # by the SGE environment, it is necessary to explicitly check jobs @@ -211,21 +215,21 @@ def _parse_qstat_job_list(self, xml_job_list): if is_completed: self._task_dictionary[dictionary_job].set_state("zombie") else: - sge_debug_print( - "ERROR: Job not in current parselist, and not in done list {0}: {1}".format( - dictionary_job, self._task_dictionary[dictionary_job])) + sge_debug_print("ERROR: Job not in current parselist, " + "and not in done list {0}: {1}".format(dictionary_job, + self._task_dictionary[dictionary_job])) pass if self._task_dictionary[dictionary_job].is_initializing(): is_completed = self._qacct_verified_complete(dictionary_job) if is_completed: self._task_dictionary[dictionary_job].set_state("zombie") else: - sge_debug_print( - "ERROR: Job not in still in intializing mode, and not in done list {0}: {1}".format( - dictionary_job, self._task_dictionary[dictionary_job])) + sge_debug_print("ERROR: Job not in still in intializing mode, " + "and not in done list {0}: {1}".format(dictionary_job, + self._task_dictionary[dictionary_job])) pass - def _run_qstat(self, reasonForQstat, forceInstant=True): + def _run_qstat(self, reason_for_qstat, force_instant=True): """ request all job information for the current user in xmlformat. See documentation from java documentation: http://arc.liv.ac.uk/SGE/javadocs/jgdi/com/sun/grid/jgdi/monitoring/filter/JobStateFilter.html @@ -233,19 +237,19 @@ def _run_qstat(self, reasonForQstat, forceInstant=True): -s z gives recently completed jobs (**recently** is very ambiguous) -s s suspended jobs """ - sge_debug_print( - "WARNING: CONTACTING qmaster for jobs, {0}: {1}".format(time.time(), reasonForQstat)) - if forceInstant: - thisCommand = self._qstatInstantExecutable + sge_debug_print("WARNING: CONTACTING qmaster for jobs, " + "{0}: {1}".format(time.time(), reason_for_qstat)) + if force_instant: + this_command = self._qstat_instant_executable else: - thisCommand = self._qstatCachedExecutable + this_command = self._qstat_cached_executable qstat_retries = 10 while qstat_retries > 0: qstat_retries -= 1 try: proc = subprocess.Popen( - [thisCommand, '-u', os.getlogin(), '-xml', '-s', 'psrz'], + [this_command, '-u', pwd.getpwuid(os.getuid())[0], '-xml', '-s', 'psrz'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) qstat_xml_result, _ = proc.communicate() @@ -256,12 +260,12 @@ def _run_qstat(self, reasonForQstat, forceInstant=True): self._parse_qstat_job_list(runjobs) break except Exception as inst: - exceptionMessage = "QstatParsingError:\n\t{0}\n\t{1}\n".format( + exception_message = "QstatParsingError:\n\t{0}\n\t{1}\n".format( type( inst), # the exception instance inst # __str__ allows args to printed directly ) - sge_debug_print(exceptionMessage) + sge_debug_print(exception_message) time.sleep(5) pass @@ -270,36 +274,35 @@ def print_dictionary(self): for vv in self._task_dictionary.values(): sge_debug_print(str(vv)) - def is_job_pending(self, taskId, recursionNumber=12): - taskId = int(taskId) # Ensure that it is an integer - self._run_qstat( - "checking job pending status {0}".format(taskId), False) - if taskId in self._task_dictionary: + def is_job_pending(self, task_id): + task_id = int(task_id) # Ensure that it is an integer + # Check if the task is in the dictionary first (before running qstat) + if task_id in self._task_dictionary: # Trust the cache, only False if state='zombie' - jobIsPending = self._task_dictionary[taskId].is_job_state_pending() + job_is_pending = self._task_dictionary[task_id].is_job_state_pending() + # Double check pending jobs in case of change (since we don't check at the beginning) + if job_is_pending: + self._run_qstat("checking job pending status {0}".format(task_id), False) + job_is_pending = self._task_dictionary[task_id].is_job_state_pending() else: - self._run_qstat( - "checking job pending status {0}".format(taskId), True) - if taskId in self._task_dictionary: + self._run_qstat("checking job pending status {0}".format(task_id), True) + if task_id in self._task_dictionary: # Trust the cache, only False if state='zombie' - jobIsPending = self._task_dictionary[ - taskId].is_job_state_pending() + job_is_pending = self._task_dictionary[task_id].is_job_state_pending() else: - sge_debug_print( - "ERROR: Job {0} not in task list, even after forced qstat!".format(taskId)) - jobIsPending = False - if not jobIsPending: - sge_debug_print( - "DONE! Returning for {0} claiming done!".format(taskId)) - if taskId in self._task_dictionary: - sge_debug_print( - "NOTE: Adding {0} to OutOfScopeJobs list!".format(taskId)) - self._OutOfScopeJobs.append(int(taskId)) - self._task_dictionary.pop(taskId) + sge_debug_print("ERROR: Job {0} not in task list, " + "even after forced qstat!".format(task_id)) + job_is_pending = False + if not job_is_pending: + sge_debug_print("DONE! Returning for {0} claiming done!".format(task_id)) + if task_id in self._task_dictionary: + sge_debug_print("NOTE: Adding {0} to OutOfScopeJobs list!".format(task_id)) + self._out_of_scope_jobs.append(int(task_id)) + self._task_dictionary.pop(task_id) else: - sge_debug_print( - "ERROR: Job {0} not in task list, but attempted to be removed!".format(taskId)) - return jobIsPending + sge_debug_print("ERROR: Job {0} not in task list, " + "but attempted to be removed!".format(task_id)) + return job_is_pending def qsub_sanitize_job_name(testjobname): @@ -338,8 +341,8 @@ def __init__(self, **kwargs): """ self._retry_timeout = 2 self._max_tries = 2 - instantQstat = 'qstat' - cachedQstat = 'qstat' + instant_qstat = 'qstat' + cached_qstat = 'qstat' if 'plugin_args' in kwargs and kwargs['plugin_args']: if 'retry_timeout' in kwargs['plugin_args']: @@ -347,10 +350,10 @@ def __init__(self, **kwargs): if 'max_tries' in kwargs['plugin_args']: self._max_tries = kwargs['plugin_args']['max_tries'] if 'qstatProgramPath' in kwargs['plugin_args']: - instantQstat = kwargs['plugin_args']['qstatProgramPath'] + instant_qstat = kwargs['plugin_args']['qstatProgramPath'] if 'qstatCachedProgramPath' in kwargs['plugin_args']: - cachedQstat = kwargs['plugin_args']['qstatCachedProgramPath'] - self._refQstatSubstitute = QstatSubstitute(instantQstat, cachedQstat) + cached_qstat = kwargs['plugin_args']['qstatCachedProgramPath'] + self._refQstatSubstitute = QstatSubstitute(instant_qstat, cached_qstat) super(SGEPlugin, self).__init__(template, **kwargs)