From 82924c69162ef4d975fbeae7ea4f9a21b1852575 Mon Sep 17 00:00:00 2001 From: Blake Dewey Date: Tue, 7 Jul 2015 11:28:26 -0400 Subject: [PATCH 1/6] PEP8 corrections --- nipype/pipeline/plugins/sge.py | 219 ++++++++++++++++----------------- 1 file changed, 109 insertions(+), 110 deletions(-) diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index c5955c7eb6..2611899d0e 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -30,68 +30,68 @@ class QJobInfo: :author Hans J. Johnson """ - def __init__(self, jobNum, jobQueueState, jobTime, jobQueueName, jobSlots, qsub_command_line): + def __init__(self, job_num, job_queue_state, job_time, job_queue_name, job_slots, qsub_command_line): # self._jobName = None # Ascii text name of job not unique - self._jobNum = int( - jobNum) # The primary unique identifier for this job, must be an integer! + self._job_num = int( + job_num) # The primary unique identifier for this job, must be an integer! # self._jobOwn = None # Who owns this job - self._jobQueueState = str( - jobQueueState) # ["running","zombie",...??] + self._job_queue_state = str( + job_queue_state) # ["running","zombie",...??] # self._jobActionState = str(jobActionState) # ['r','qw','S',...??] - self._jobTime = jobTime # The job start time - self._jobInfoCreationTime = time.time( + self._job_time = job_time # The job start time + self._job_info_creation_time = time.time( ) # When this job was created (for comparing against initalization) - self._jobQueueName = jobQueueName # Where the job is running - self._jobSlots = jobSlots # How many slots are being used + self._job_queue_name = job_queue_name # Where the job is running + self._job_slots = job_slots # How many slots are being used self._qsub_command_line = qsub_command_line def __repr__(self): - return str(self._jobNum).ljust(8) \ - + str(self._jobQueueState).ljust(12) \ - + str(self._jobSlots).ljust(3) \ - + time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self._jobTime)).ljust(20) \ - + str(self._jobQueueName).ljust(8) \ + return str(self._job_num).ljust(8) \ + + str(self._job_queue_state).ljust(12) \ + + str(self._job_slots).ljust(3) \ + + time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self._job_time)).ljust(20) \ + + str(self._job_queue_name).ljust(8) \ + str(self._qsub_command_line) def is_initializing(self): - return self._jobQueueState == "initializing" + return self._job_queue_state == "initializing" def is_zombie(self): - return self._jobQueueState == "zombie" + return self._job_queue_state == "zombie" def is_running(self): - return self._jobQueueState == "running" + return self._job_queue_state == "running" def is_pending(self): - return self._jobQueueState == "pending" + return self._job_queue_state == "pending" def is_job_state_pending(self): """ Return True, unless job is in the "zombie" status """ - time_diff = (time.time() - self._jobInfoCreationTime) + time_diff = (time.time() - self._job_info_creation_time) if self.is_zombie(): sge_debug_print( "DONE! QJobInfo.IsPending found in 'zombie' list, returning False so claiming done!\n{0}".format(self)) - isPendingStatus = False # Job explicitly found as being completed! + is_pending_status = False # Job explicitly found as being completed! elif self.is_initializing() and (time_diff > 600): # if initializing for more than 5 minute, failure due to # initialization and completion before registration sge_debug_print( "FAILURE! QJobInfo.IsPending found long running at {1} seconds" "'initializing' returning False for to break loop!\n{0}".format(self, time_diff)) - isPendingStatus = True # Job initialization took too long, so report! + is_pending_status = True # Job initialization took too long, so report! else: # self.is_running() || self.is_pending(): - isPendingStatus = True # Job cache last listed as running - return isPendingStatus # The job is in one of the hold states + is_pending_status = True # Job cache last listed as running + return is_pending_status # The job is in one of the hold states - def update_info(self, jobQueueState, jobTime, jobQueueName, jobSlots): - self._jobQueueState = jobQueueState - self._jobTime = jobTime - self._jobQueueName = jobQueueName - self._jobSlots = jobSlots + def update_info(self, job_queue_state, job_time, job_queue_name, job_slots): + self._job_queue_state = job_queue_state + self._job_time = job_time + self._job_queue_name = job_queue_name + self._job_slots = job_slots def set_state(self, new_state): - self._jobQueueState = new_state + self._job_queue_state = new_state class QstatSubstitute: @@ -99,14 +99,14 @@ class QstatSubstitute: """A wrapper for Qstat to avoid overloading the SGE/OGS server with rapid continuous qstat requests""" - def __init__(self, qstatInstantExecutable='qstat', qstatCachedExecutable='qstat'): + def __init__(self, qstat_instant_executable='qstat', qstat_cached_executable='qstat'): """ - :param qstatInstantExecutable: - :param qstatCachedExecutable: + :param qstat_instant_executable: + :param qstat_cached_executable: """ - self._qstatInstantExecutable = qstatInstantExecutable - self._qstatCachedExecutable = qstatCachedExecutable - self._OutOfScopeJobs = list() # Initialize first + self._qstat_instant_executable = qstat_instant_executable + self._qstat_cached_executable = qstat_cached_executable + self._out_of_scope_jobs = list() # Initialize first self._task_dictionary = dict( ) # {'taskid': QJobInfo(), .... } The dictionaryObject self._remove_old_jobs() @@ -117,39 +117,41 @@ def _remove_old_jobs(self): are jobs that existed prior to starting a new jobs, so they are irrelevant. """ self._run_qstat("QstatInitialization", True) - # If qstat does not exist on this system, then quietly - # fail during init + # If qstat does not exist on this system, then quietly + # fail during init def add_startup_job(self, taskid, qsub_command_line): """ :param taskid: The job id - :param scriptFile: When initializing, re-use the jobQueue name + :param qsub_command_line: When initializing, re-use the job_queue_name :return: NONE """ taskid = int(taskid) # Ensure that it is an integer - self._task_dictionary[taskid] = QJobInfo( - taskid, "initializing", time.time(), "noQueue", 1, qsub_command_line) + self._task_dictionary[taskid] = QJobInfo(taskid, "initializing", time.time(), + "noQueue", 1, qsub_command_line) - def _qacct_verified_complete(self, taskid): + @staticmethod + def _qacct_verified_complete(taskid): """ request definitive job completion information for the current job from the qacct report """ - sge_debug_print( - "WARNING: CONTACTING qacct for finished jobs, {0}: {1}".format(time.time(), "Verifying Completion")) + sge_debug_print("WARNING: " + "CONTACTING qacct for finished jobs, " + "{0}: {1}".format(time.time(), "Verifying Completion")) - thisCommand = 'qacct' + this_command = 'qacct' qacct_retries = 10 - isComplete = False + is_complete = False while qacct_retries > 0: qacct_retries -= 1 try: proc = subprocess.Popen( - [thisCommand, '-o', os.getlogin(), '-j', str(taskid)], + [this_command, '-o', os.getlogin(), '-j', str(taskid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) qacct_result, _ = proc.communicate() if qacct_result.find(str(taskid)): - isComplete = True + is_complete = True sge_debug_print( "NOTE: qacct for jobs\n{0}".format(qacct_result)) break @@ -157,7 +159,7 @@ def _qacct_verified_complete(self, taskid): sge_debug_print("NOTE: qacct call failed") time.sleep(5) pass - return isComplete + return is_complete def _parse_qstat_job_list(self, xml_job_list): current_jobs_parsed = list() @@ -166,38 +168,39 @@ def _parse_qstat_job_list(self, xml_job_list): # jobown = # current_job_element.getElementsByTagName('JB_owner')[0].childNodes[0].data try: - jobQueueName = current_job_element.getElementsByTagName( + job_queue_name = current_job_element.getElementsByTagName( 'queue_name')[0].childNodes[0].data except: - jobQueueName = "unknown" + job_queue_name = "unknown" try: - jobSlots = current_job_element.getElementsByTagName( + job_slots = current_job_element.getElementsByTagName( 'slots')[0].childNodes[0].data except: - jobSlots = "uknown" - jobQueueState = current_job_element.getAttribute('state') - jobNum = int(current_job_element.getElementsByTagName( + job_slots = "uknown" + job_queue_state = current_job_element.getAttribute('state') + job_num = int(current_job_element.getElementsByTagName( 'JB_job_number')[0].childNodes[0].data) try: - jobtimeText = current_job_element.getElementsByTagName( + job_time_text = current_job_element.getElementsByTagName( 'JAT_start_time')[0].childNodes[0].data - jobTime = float(time.mktime(time.strptime( - jobtimeText, "%Y-%m-%dT%H:%M:%S"))) + job_time = float(time.mktime(time.strptime( + job_time_text, "%Y-%m-%dT%H:%M:%S"))) except: - jobTime = float(0.0) + job_time = float(0.0) # Make job entry - taskId = int(jobNum) - if taskId in self._task_dictionary: - self._task_dictionary[taskId].update_info( - jobQueueState, jobTime, jobQueueName, jobSlots) + task_id = int(job_num) + if task_id in self._task_dictionary: + self._task_dictionary[task_id].update_info( + job_queue_state, job_time, job_queue_name, job_slots) sge_debug_print("Updating job: {0}".format( - self._task_dictionary[taskId])) - current_jobs_parsed.append(jobNum) + self._task_dictionary[task_id])) + current_jobs_parsed.append(job_num) + # Changed from job_num as "in" is used to check which does not cast else: # Any Job that was not explicitly added with qsub command is # out of scope - self._OutOfScopeJobs.append(int(taskId)) + self._out_of_scope_jobs.append(int(task_id)) # To ensure that every job is in the dictionary has a state reported # by the SGE environment, it is necessary to explicitly check jobs @@ -211,21 +214,21 @@ def _parse_qstat_job_list(self, xml_job_list): if is_completed: self._task_dictionary[dictionary_job].set_state("zombie") else: - sge_debug_print( - "ERROR: Job not in current parselist, and not in done list {0}: {1}".format( - dictionary_job, self._task_dictionary[dictionary_job])) + sge_debug_print("ERROR: Job not in current parselist, " + "and not in done list {0}: {1}".format(dictionary_job, + self._task_dictionary[dictionary_job])) pass if self._task_dictionary[dictionary_job].is_initializing(): is_completed = self._qacct_verified_complete(dictionary_job) if is_completed: self._task_dictionary[dictionary_job].set_state("zombie") else: - sge_debug_print( - "ERROR: Job not in still in intializing mode, and not in done list {0}: {1}".format( - dictionary_job, self._task_dictionary[dictionary_job])) + sge_debug_print("ERROR: Job not in still in intializing mode, " + "and not in done list {0}: {1}".format(dictionary_job, + self._task_dictionary[dictionary_job])) pass - def _run_qstat(self, reasonForQstat, forceInstant=True): + def _run_qstat(self, reason_for_qstat, force_instant=True): """ request all job information for the current user in xmlformat. See documentation from java documentation: http://arc.liv.ac.uk/SGE/javadocs/jgdi/com/sun/grid/jgdi/monitoring/filter/JobStateFilter.html @@ -233,19 +236,19 @@ def _run_qstat(self, reasonForQstat, forceInstant=True): -s z gives recently completed jobs (**recently** is very ambiguous) -s s suspended jobs """ - sge_debug_print( - "WARNING: CONTACTING qmaster for jobs, {0}: {1}".format(time.time(), reasonForQstat)) - if forceInstant: - thisCommand = self._qstatInstantExecutable + sge_debug_print("WARNING: CONTACTING qmaster for jobs, " + "{0}: {1}".format(time.time(), reason_for_qstat)) + if force_instant: + this_command = self._qstat_instant_executable else: - thisCommand = self._qstatCachedExecutable + this_command = self._qstat_cached_executable qstat_retries = 10 while qstat_retries > 0: qstat_retries -= 1 try: proc = subprocess.Popen( - [thisCommand, '-u', os.getlogin(), '-xml', '-s', 'psrz'], + [this_command, '-u', os.getlogin(), '-xml', '-s', 'psrz'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) qstat_xml_result, _ = proc.communicate() @@ -256,12 +259,12 @@ def _run_qstat(self, reasonForQstat, forceInstant=True): self._parse_qstat_job_list(runjobs) break except Exception as inst: - exceptionMessage = "QstatParsingError:\n\t{0}\n\t{1}\n".format( + exception_message = "QstatParsingError:\n\t{0}\n\t{1}\n".format( type( inst), # the exception instance inst # __str__ allows args to printed directly ) - sge_debug_print(exceptionMessage) + sge_debug_print(exception_message) time.sleep(5) pass @@ -270,36 +273,32 @@ def print_dictionary(self): for vv in self._task_dictionary.values(): sge_debug_print(str(vv)) - def is_job_pending(self, taskId, recursionNumber=12): - taskId = int(taskId) # Ensure that it is an integer - self._run_qstat( - "checking job pending status {0}".format(taskId), False) - if taskId in self._task_dictionary: + def is_job_pending(self, task_id, recursion_number=12): + task_id = int(task_id) # Ensure that it is an integer + self._run_qstat("checking job pending status {0}".format(task_id), False) + # Check if the task is in the dictionary first (before running qstat) + if task_id in self._task_dictionary: # Trust the cache, only False if state='zombie' - jobIsPending = self._task_dictionary[taskId].is_job_state_pending() + job_is_pending = self._task_dictionary[task_id].is_job_state_pending() else: - self._run_qstat( - "checking job pending status {0}".format(taskId), True) - if taskId in self._task_dictionary: + self._run_qstat("checking job pending status {0}".format(task_id), True) + if task_id in self._task_dictionary: # Trust the cache, only False if state='zombie' - jobIsPending = self._task_dictionary[ - taskId].is_job_state_pending() + job_is_pending = self._task_dictionary[task_id].is_job_state_pending() else: - sge_debug_print( - "ERROR: Job {0} not in task list, even after forced qstat!".format(taskId)) - jobIsPending = False - if not jobIsPending: - sge_debug_print( - "DONE! Returning for {0} claiming done!".format(taskId)) - if taskId in self._task_dictionary: - sge_debug_print( - "NOTE: Adding {0} to OutOfScopeJobs list!".format(taskId)) - self._OutOfScopeJobs.append(int(taskId)) - self._task_dictionary.pop(taskId) + sge_debug_print("ERROR: Job {0} not in task list, " + "even after forced qstat!".format(task_id)) + job_is_pending = False + if not job_is_pending: + sge_debug_print("DONE! Returning for {0} claiming done!".format(task_id)) + if task_id in self._task_dictionary: + sge_debug_print("NOTE: Adding {0} to OutOfScopeJobs list!".format(task_id)) + self._out_of_scope_jobs.append(int(task_id)) + self._task_dictionary.pop(task_id) else: - sge_debug_print( - "ERROR: Job {0} not in task list, but attempted to be removed!".format(taskId)) - return jobIsPending + sge_debug_print("ERROR: Job {0} not in task list, " + "but attempted to be removed!".format(task_id)) + return job_is_pending def qsub_sanitize_job_name(testjobname): @@ -338,8 +337,8 @@ def __init__(self, **kwargs): """ self._retry_timeout = 2 self._max_tries = 2 - instantQstat = 'qstat' - cachedQstat = 'qstat' + instant_qstat = 'qstat' + cached_qstat = 'qstat' if 'plugin_args' in kwargs and kwargs['plugin_args']: if 'retry_timeout' in kwargs['plugin_args']: @@ -347,10 +346,10 @@ def __init__(self, **kwargs): if 'max_tries' in kwargs['plugin_args']: self._max_tries = kwargs['plugin_args']['max_tries'] if 'qstatProgramPath' in kwargs['plugin_args']: - instantQstat = kwargs['plugin_args']['qstatProgramPath'] + instant_qstat = kwargs['plugin_args']['qstatProgramPath'] if 'qstatCachedProgramPath' in kwargs['plugin_args']: - cachedQstat = kwargs['plugin_args']['qstatCachedProgramPath'] - self._refQstatSubstitute = QstatSubstitute(instantQstat, cachedQstat) + cached_qstat = kwargs['plugin_args']['qstatCachedProgramPath'] + self._refQstatSubstitute = QstatSubstitute(instant_qstat, cached_qstat) super(SGEPlugin, self).__init__(template, **kwargs) From 80a14bf3f4449962d61348315dc0da367c3dd7e1 Mon Sep 17 00:00:00 2001 From: Blake Dewey Date: Tue, 7 Jul 2015 11:30:06 -0400 Subject: [PATCH 2/6] Typo in setting job_slots to "unknown' --- nipype/pipeline/plugins/sge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index 2611899d0e..f6b4f94268 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -176,7 +176,7 @@ def _parse_qstat_job_list(self, xml_job_list): job_slots = current_job_element.getElementsByTagName( 'slots')[0].childNodes[0].data except: - job_slots = "uknown" + job_slots = "unknown" job_queue_state = current_job_element.getAttribute('state') job_num = int(current_job_element.getElementsByTagName( 'JB_job_number')[0].childNodes[0].data) From 8723e5f711cf590e10fa1339782d05b2f5c67400 Mon Sep 17 00:00:00 2001 From: Blake Dewey Date: Tue, 7 Jul 2015 11:31:27 -0400 Subject: [PATCH 3/6] clean up function definitions --- nipype/pipeline/plugins/sge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index f6b4f94268..baa036191a 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -273,7 +273,7 @@ def print_dictionary(self): for vv in self._task_dictionary.values(): sge_debug_print(str(vv)) - def is_job_pending(self, task_id, recursion_number=12): + def is_job_pending(self, task_id): task_id = int(task_id) # Ensure that it is an integer self._run_qstat("checking job pending status {0}".format(task_id), False) # Check if the task is in the dictionary first (before running qstat) From 62c83c6384ef74601808e6ed5cc56a2db5e145f4 Mon Sep 17 00:00:00 2001 From: Blake Dewey Date: Tue, 7 Jul 2015 11:32:16 -0400 Subject: [PATCH 4/6] replace os.getlogin() with more precise version using pwd --- nipype/pipeline/plugins/sge.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index baa036191a..7e56d7cc63 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -2,6 +2,7 @@ """ import os +import pwd import re import subprocess import time @@ -146,7 +147,7 @@ def _qacct_verified_complete(taskid): qacct_retries -= 1 try: proc = subprocess.Popen( - [this_command, '-o', os.getlogin(), '-j', str(taskid)], + [this_command, '-o', pwd.getpwuid(os.getuid())[0], '-j', str(taskid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) qacct_result, _ = proc.communicate() @@ -248,7 +249,7 @@ def _run_qstat(self, reason_for_qstat, force_instant=True): qstat_retries -= 1 try: proc = subprocess.Popen( - [this_command, '-u', os.getlogin(), '-xml', '-s', 'psrz'], + [this_command, '-u', pwd.getpwuid(os.getuid())[0], '-xml', '-s', 'psrz'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) qstat_xml_result, _ = proc.communicate() From bffaef469806aafe489f87fdd201512d20e9a242 Mon Sep 17 00:00:00 2001 From: Blake Dewey Date: Tue, 7 Jul 2015 11:34:06 -0400 Subject: [PATCH 5/6] changed job_num to task_id in current_jobs_parsed. we use "in" to check and since we are checking an int against a list of strings, we will always get false. --- nipype/pipeline/plugins/sge.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index 7e56d7cc63..dc97e0ead8 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -196,12 +196,12 @@ def _parse_qstat_job_list(self, xml_job_list): job_queue_state, job_time, job_queue_name, job_slots) sge_debug_print("Updating job: {0}".format( self._task_dictionary[task_id])) - current_jobs_parsed.append(job_num) + current_jobs_parsed.append(task_id) # Changed from job_num as "in" is used to check which does not cast else: # Any Job that was not explicitly added with qsub command is # out of scope - self._out_of_scope_jobs.append(int(task_id)) + self._out_of_scope_jobs.append(task_id) # To ensure that every job is in the dictionary has a state reported # by the SGE environment, it is necessary to explicitly check jobs From e9f00e33cac7cb432c2a98d5b91171d304ff2d96 Mon Sep 17 00:00:00 2001 From: Blake Dewey Date: Tue, 7 Jul 2015 11:35:37 -0400 Subject: [PATCH 6/6] check the task_dictionary before calling qstat and check qstat if it comes up pending. this avoids calling qstat (and qacct) over and over again when we have alread populated it into the task dictionary. --- nipype/pipeline/plugins/sge.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/sge.py b/nipype/pipeline/plugins/sge.py index dc97e0ead8..c2495a831f 100644 --- a/nipype/pipeline/plugins/sge.py +++ b/nipype/pipeline/plugins/sge.py @@ -276,11 +276,14 @@ def print_dictionary(self): def is_job_pending(self, task_id): task_id = int(task_id) # Ensure that it is an integer - self._run_qstat("checking job pending status {0}".format(task_id), False) # Check if the task is in the dictionary first (before running qstat) if task_id in self._task_dictionary: # Trust the cache, only False if state='zombie' job_is_pending = self._task_dictionary[task_id].is_job_state_pending() + # Double check pending jobs in case of change (since we don't check at the beginning) + if job_is_pending: + self._run_qstat("checking job pending status {0}".format(task_id), False) + job_is_pending = self._task_dictionary[task_id].is_job_state_pending() else: self._run_qstat("checking job pending status {0}".format(task_id), True) if task_id in self._task_dictionary: