From 174ab9b5cabff93adae55a1ef3c3e22b0b2815e6 Mon Sep 17 00:00:00 2001
From: Sandeep Kalra
Date: Mon, 22 Jun 2015 00:18:21 +1000
Subject: [PATCH] Create a running-window queue to batch job submissions
 instead of dumping all jobs on the cluster queue at once.

JobMonitor now owns the DRMAA session, keeps at most max_processes jobs
in flight, and tops the window back up as jobs finish or exhaust their
resubmission attempts. Two illustrative sketches of the approach are
appended after the diff.
---
 gridmap/job.py | 348 ++++++++++++++++++++++++++-----------------------
 1 file changed, 185 insertions(+), 163 deletions(-)

diff --git a/gridmap/job.py b/gridmap/job.py
index e271026..e755715 100644
--- a/gridmap/job.py
+++ b/gridmap/job.py
@@ -253,7 +253,7 @@ class JobMonitor(object):
     """
     Job monitor that communicates with other nodes via 0MQ.
     """
-    def __init__(self, temp_dir='/scratch'):
+    def __init__(self, session, temp_dir='/scratch', jobs=None, white_list="",
+                 max_processes=1):
         """
         set up socket
         """
@@ -286,9 +286,14 @@ def __init__(self, temp_dir='/scratch'):
         self.logger.info("Setting up JobMonitor on %s", self.home_address)

         # uninitialized field (set in check method)
-        self.jobs = []
+        # treat None as "no jobs" to avoid a shared mutable default
+        self.jobs = jobs if jobs is not None else []
+        self.jobs_iter = iter(self.jobs)
+        self.all_queued = False
+        self.all_processed = False
         self.ids = []
-        self.session_id = None
+        self.session = session
+        self.white_list = white_list
+        self.max_processes = max_processes
         self.id_to_job = {}

     def __enter__(self):
@@ -306,37 +311,29 @@ def __exit__(self, exc_type, exc_value, exc_tb):
         self.socket.close()

         # Clean up if we have a valid session
-        if self.session_id is not None:
-            with Session(self.session_id) as session:
-                # If we encounter an exception, kill all jobs
-                if exc_type is not None:
-                    self.logger.info('Encountered %s, so killing all jobs.',
-                                     exc_type.__name__)
-                    # try to kill off all old jobs
-                    try:
-                        session.control(JOB_IDS_SESSION_ALL,
-                                        JobControlAction.TERMINATE)
-                    except InvalidJobException:
-                        self.logger.debug("Could not kill all jobs for " +
-                                          "session.", exc_info=True)
-
-                # Get rid of job info to prevent memory leak
-                try:
-                    session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT,
-                                        dispose=True)
-                except ExitTimeoutException:
-                    pass
+        if exc_type is not None:
+            self.logger.info('Encountered %s, so killing all jobs.',
+                             exc_type.__name__)
+            # try to kill off all old jobs
+            try:
+                self.session.control(JOB_IDS_SESSION_ALL,
+                                     JobControlAction.TERMINATE)
+            except InvalidJobException:
+                self.logger.debug("Could not kill all jobs for " +
+                                  "session.", exc_info=True)
+
+        # Get rid of job info to prevent memory leak
+        try:
+            self.session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT,
+                                     dispose=True)
+        except ExitTimeoutException:
+            pass

-    def check(self, session_id, jobs):
+    def check(self):
         """
         serves input and output data
         """
-        # save list of jobs
-        self.jobs = jobs
-        self.id_to_job = {job.id: job for job in self.jobs}
-
-        # keep track of DRMAA session_id (for resubmissions)
-        self.session_id = session_id

         # determines in which interval to check if jobs are alive
         self.logger.debug('Starting local hearbeat')
@@ -379,12 +376,12 @@ def check(self, session_id, jobs):
                         job.ret = tmp_job.ret
                         job.traceback = tmp_job.traceback
                         self.logger.info("Received output from %s",
-                                          job_id)
+                                         job_id)
                     # Returned exception instead of job, so store that
                     elif isinstance(msg["data"], tuple):
                         job.ret, job.traceback = msg["data"]
                         self.logger.info("Received exception from %s",
-                                          job_id)
+                                         job_id)
                     else:
                         self.logger.error(("Received message with " +
                                            "invalid data: %s"), msg)
@@ -432,66 +429,78 @@ def check(self, session_id, jobs):
         # Kill child processes that we don't need anymore
         local_heart.terminate()

-    def check_if_alive(self):
+    def check_job_status(self, job):
         """
-        check if jobs are alive and determine cause of death if not
+        check if the given job is alive, determine its cause of death if
+        not, and report whether it should be removed from the queue.
         """
-        self.logger.debug('Checking if jobs are alive')
-        for job in self.jobs:
-
-            # noting was returned yet
-            if job.ret == _JOB_NOT_FINISHED:
-
-                # exclude first-timers
-                if job.timestamp is not None:
-                    # check heart-beats if there was a long delay
-                    current_time = datetime.now()
-                    time_delta = current_time - job.timestamp
-                    if time_delta.seconds > MAX_TIME_BETWEEN_HEARTBEATS:
-                        self.logger.debug("It has been %s seconds since we " +
-                                          "received a message from job %s",
-                                          time_delta.seconds, job.id)
-                        self.logger.error("Job died for unknown reason")
-                        job.cause_of_death = "unknown"
-                    elif (len(job.track_cpu) > MAX_IDLE_HEARTBEATS and
-                          all(cpu_load <= IDLE_THRESHOLD and not running
-                              for cpu_load, running in
-                              job.track_cpu[-MAX_IDLE_HEARTBEATS:])):
-                        self.logger.error('Job stalled for unknown reason.')
-                        job.cause_of_death = 'stalled'
-
-            # could have been an exception, we check right away
-            elif isinstance(job.ret, Exception):
-                job.cause_of_death = 'exception'
-
-                # Send error email, in addition to raising and logging exception
-                if SEND_ERROR_MAIL:
-                    send_error_mail(job)
-
-                # Format traceback much like joblib does
-                self.logger.error("-" * 80)
-                self.logger.error("GridMap job traceback for %s:", job.name)
-                self.logger.error("-" * 80)
-                self.logger.error("Exception: %s", type(job.ret).__name__)
-                self.logger.error("Job ID: %s", job.id)
-                self.logger.error("Host: %s", job.host_name)
-                self.logger.error("." * 80)
-                self.logger.error(job.traceback)
-                raise job.ret
-
-            # attempt to resubmit
-            if job.cause_of_death:
-                self.logger.info("Creating error report")
-
-                # send report
-                if SEND_ERROR_MAIL:
-                    send_error_mail(job)
-
-                # try to resubmit
-                old_id = job.id
-                job.track_cpu = []
-                job.track_mem = []
-                handle_resubmit(self.session_id, job, temp_dir=self.temp_dir)
+        remove_from_queue = False
+        # nothing was returned yet
+        if job.ret == _JOB_NOT_FINISHED:
+
+            # exclude first-timers
+            if job.timestamp is not None:
+                # check heart-beats if there was a long delay
+                current_time = datetime.now()
+                time_delta = current_time - job.timestamp
+                if time_delta.seconds > MAX_TIME_BETWEEN_HEARTBEATS:
+                    self.logger.debug("It has been %s seconds since we " +
+                                      "received a message from job %s",
+                                      time_delta.seconds, job.id)
+                    self.logger.error("Job died for unknown reason")
+                    job.cause_of_death = "unknown"
+                elif (len(job.track_cpu) > MAX_IDLE_HEARTBEATS and
+                      all(cpu_load <= IDLE_THRESHOLD and not running
+                          for cpu_load, running in
+                          job.track_cpu[-MAX_IDLE_HEARTBEATS:])):
+                    self.logger.error('Job stalled for unknown reason.')
+                    job.cause_of_death = 'stalled'
+
+        # could have been an exception, we check right away
+        elif isinstance(job.ret, Exception):
+            job.cause_of_death = 'exception'
+
+            # Send error email, in addition to logging the exception
+            if SEND_ERROR_MAIL:
+                send_error_mail(job)
+
+            # Format traceback much like joblib does
+            self.logger.error("-" * 80)
+            self.logger.error("GridMap job traceback for %s:", job.name)
+            self.logger.error("-" * 80)
+            self.logger.error("Exception: %s", type(job.ret).__name__)
+            self.logger.error("Job ID: %s", job.id)
+            self.logger.error("Host: %s", job.host_name)
+            self.logger.error("." * 80)
+            self.logger.error(job.traceback)
+            # job.ret is no longer raised here; the resubmit logic below
+            # decides whether the job is retried or dropped from the queue
+        else:
+            # Job returned. Remove from queue.
+            remove_from_queue = True
+
+        # attempt to resubmit
+        if job.cause_of_death:
+            self.logger.info("Creating error report")
+
+            # send report
+            if SEND_ERROR_MAIL:
+                send_error_mail(job)
+
+            # try to resubmit
+            old_id = job.id
+            job.track_cpu = []
+            job.track_mem = []
+
+            try:
+                handle_resubmit(self.session, job, temp_dir=self.temp_dir)
+            except JobException:
+                # This job has already hit the maximum number of resubmits,
+                # so remove it from the queue.
+                remove_from_queue = True
+            else:
                 # Update job ID if successfully resubmitted
                 self.logger.info('Resubmitted job %s; it now has ID %s',
                                  old_id,
@@ -499,25 +508,35 @@ def check_if_alive(self):
                                  job.id)
                 del self.id_to_job[old_id]
                 self.id_to_job[job.id] = job

-            # break out of loop to avoid too long delay
-            break
+        return remove_from_queue

-    def all_jobs_done(self):
-        """
-        checks for all jobs if they are done
-        """
-        if self.logger.getEffectiveLevel() == logging.DEBUG:
-            num_jobs = len(self.jobs)
-            num_completed = sum((job.ret != _JOB_NOT_FINISHED and
-                                 not isinstance(job.ret, Exception))
-                                for job in self.jobs)
-            self.logger.debug('%i out of %i jobs completed', num_completed,
-                              num_jobs)
+    def check_if_alive(self):
+        """
+        poll jobs in the current window and top it back up from the iterator
+        """
+        self.logger.debug('Checking if jobs are alive')
+        n = len(self.id_to_job)
+        # iterate over a copy, since entries may be deleted as we go
+        for jid, job in list(self.id_to_job.items()):
+            remove_from_queue = self.check_job_status(job)
+            if remove_from_queue:
+                del self.id_to_job[jid]
+                n -= 1
+
+        if self.all_queued and n == 0:
+            self.all_processed = True
+        elif not self.all_queued:
+            while n < self.max_processes:
+                try:
+                    job = next(self.jobs_iter)
+                except StopIteration:
+                    self.all_queued = True
+                    break
+                else:
+                    job.white_list = self.white_list
+                    job.home_address = self.home_address
+                    _append_job_to_session(self.session, job)
+                    self.id_to_job[job.id] = job
+                    n += 1

-        # exceptions will be handled in check_if_alive
-        return all((job.ret != _JOB_NOT_FINISHED and not isinstance(job.ret,
-                                                                    Exception))
-                   for job in self.jobs)
+    def all_jobs_done(self):
+        """
+        checks whether all jobs have been queued and fully processed
+        """
+        return self.all_processed


 def send_error_mail(job):
@@ -627,7 +646,7 @@ def send_error_mail(job):
         s.quit()


-def handle_resubmit(session_id, job, temp_dir='/scratch/'):
+def handle_resubmit(session, job, temp_dir='/scratch/'):
     """
     heuristic to determine if the job should be resubmitted

@@ -654,7 +673,7 @@ def handle_resubmit(session_id, job, temp_dir='/scratch/'):
         job.num_resubmits += 1
         job.cause_of_death = ""

-        _resubmit(session_id, job, temp_dir)
+        _resubmit(session, job, temp_dir)
     else:
         raise JobException(("Job {0} ({1}) failed after {2} " +
                             "resubmissions").format(job.name, job.id,
@@ -737,7 +756,7 @@ def _submit_jobs(jobs, home_address, temp_dir='/scratch', white_list=None,
     return sid


-def _append_job_to_session(session, job, temp_dir='/scratch/', quiet=True):
+def _append_job_to_session(temp_dir='/scratch/', quiet=True):
     """
     For an active session, append new job based on information stored in job
     object. Also sets job.id to the ID of the job on the grid.
@@ -754,44 +773,47 @@ def _append_job_to_session(session, job, temp_dir='/scratch/', quiet=True):
     :type quiet: bool
     """
-    jt = session.createJobTemplate()
-    logger = logging.getLogger(__name__)
-    # logger.debug('{}'.format(job.environment))
-    jt.jobEnvironment = job.environment
-
-    # Run module using python -m to avoid ImportErrors when unpickling jobs
-    jt.remoteCommand = sys.executable
-    jt.args = ['-m', 'gridmap.runner', '{}'.format(job.home_address), job.path]
-    jt.nativeSpecification = job.native_specification
-    jt.jobName = job.name
-    jt.workingDirectory = job.working_dir
-    jt.outputPath = ":{}".format(temp_dir)
-    jt.errorPath = ":{}".format(temp_dir)
-
-    # Create temp directory if necessary
-    if not os.path.exists(temp_dir):
-        try:
-            os.makedirs(temp_dir)
-        except OSError:
-            logger.warning(("Failed to create temporary directory " +
-                            "{}. Your jobs may not start " +
-                            "correctly.").format(temp_dir))
+    # rebind the module-level name to a closure that remembers the
+    # configured temp_dir and quiet settings
+    global _append_job_to_session
+
+    # temp_dir can still be overridden per call (e.g. by _resubmit)
+    def _append_job_to_session(session, job, temp_dir=temp_dir):
+        jt = session.createJobTemplate()
+        logger = logging.getLogger(__name__)
+        # logger.debug('{}'.format(job.environment))
+        jt.jobEnvironment = job.environment
+
+        # Run module using python -m to avoid ImportErrors when unpickling jobs
+        jt.remoteCommand = sys.executable
+        jt.args = ['-m', 'gridmap.runner', '{}'.format(job.home_address),
+                   job.path]
+        jt.nativeSpecification = job.native_specification
+        jt.jobName = job.name
+        jt.workingDirectory = job.working_dir
+        jt.outputPath = ":{}".format(temp_dir)
+        jt.errorPath = ":{}".format(temp_dir)
+
+        # Create temp directory if necessary
+        if not os.path.exists(temp_dir):
+            try:
+                os.makedirs(temp_dir)
+            except OSError:
+                logger.warning(("Failed to create temporary directory " +
+                                "{}. Your jobs may not start " +
+                                "correctly.").format(temp_dir))

-    job_id = session.runJob(jt)
+        job_id = session.runJob(jt)

-    # set job fields that depend on the job_id assigned by grid engine
-    job.id = job_id
-    job.log_stdout_fn = os.path.join(temp_dir, '{}.o{}'.format(job.name,
-                                                               job_id))
-    job.log_stderr_fn = os.path.join(temp_dir, '{}.e{}'.format(job.name,
-                                                               job_id))
+        # set job fields that depend on the job_id assigned by grid engine
+        job.id = job_id
+        job.log_stdout_fn = os.path.join(temp_dir, '{}.o{}'.format(job.name,
+                                                                   job_id))
+        job.log_stderr_fn = os.path.join(temp_dir, '{}.e{}'.format(job.name,
+                                                                   job_id))

-    if not quiet:
-        print('Your job {} has been submitted with id {}'.format(job.name,
-                                                                 job_id),
-              file=sys.stderr)
+        if not quiet:
+            print('Your job {} has been submitted with id {}'.format(job.name,
+                                                                     job_id),
+                  file=sys.stderr)

-    session.deleteJobTemplate(jt)
+        session.deleteJobTemplate(jt)


 def process_jobs(jobs, temp_dir='/scratch/', white_list=None, quiet=True,
@@ -824,24 +846,26 @@ def process_jobs(jobs, temp_dir='/scratch/', white_list=None, quiet=True,
         local = True

     if not local:
-        # initialize monitor to get port number
-        with JobMonitor(temp_dir=temp_dir) as monitor:
-            # get interface and port
-            home_address = monitor.home_address
-            # job_id field is attached to each job object
-            sid = _submit_jobs(jobs, home_address, temp_dir=temp_dir,
-                               white_list=white_list, quiet=quiet)
+        # configure the module-level _append_job_to_session function
+        _append_job_to_session(temp_dir=temp_dir, quiet=quiet)
+
+        # initialize session and monitor to get port number
+        with Session() as session, JobMonitor(session=session,
+                                              temp_dir=temp_dir,
+                                              jobs=jobs,
+                                              white_list=white_list,
+                                              max_processes=max_processes) as monitor:
             # handling of inputs, outputs and heartbeats
-            monitor.check(sid, jobs)
+            monitor.check()
     else:
         _process_jobs_locally(jobs, max_processes=max_processes)

     return [job.ret for job in jobs]


-def _resubmit(session_id, job, temp_dir):
+def _resubmit(session, job, temp_dir):
     """
     Resubmit a failed job.

@@ -852,16 +876,15 @@ def _resubmit(session_id, job, temp_dir):

     if DRMAA_PRESENT:
         # append to session
-        with Session(session_id) as session:
-            # try to kill off old job
-            try:
-                session.control(job.id, JobControlAction.TERMINATE)
-                logger.info("zombie job killed")
-            except Exception:
-                logger.error("Could not kill job with SGE id %s", job.id,
-                             exc_info=True)
-            # create new job
-            _append_job_to_session(session, job, temp_dir=temp_dir)
+        # try to kill off old job
+        try:
+            session.control(job.id, JobControlAction.TERMINATE)
+            logger.info("zombie job killed")
+        except Exception:
+            logger.error("Could not kill job with SGE id %s", job.id,
+                         exc_info=True)
+        # create new job
+        _append_job_to_session(session, job, temp_dir=temp_dir)
     else:
         logger.error("Could not restart job because we're in local mode.")

@@ -929,4 +952,3 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
                                max_processes=max_processes)

     return job_results
-
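
Note for reviewers: below is a minimal, self-contained sketch of the
running-window loop that check_if_alive now implements. The helper names
submit and poll_finished are hypothetical stand-ins for
_append_job_to_session and check_job_status; this illustrates the
technique, not gridmap's actual API.

from typing import Callable, Iterable

def run_windowed(jobs: Iterable, max_processes: int,
                 submit: Callable, poll_finished: Callable):
    """Keep at most max_processes jobs in flight, topping the window
    back up from the jobs iterator as running jobs leave the queue."""
    jobs_iter = iter(jobs)
    in_flight = {}    # job id -> job, like JobMonitor.id_to_job
    all_queued = False
    while not (all_queued and not in_flight):
        # reap jobs that finished or exhausted their resubmissions;
        # build the id list first so deletion is safe
        for jid in [j for j, job in in_flight.items() if poll_finished(job)]:
            del in_flight[jid]
        # top the window back up to max_processes
        while not all_queued and len(in_flight) < max_processes:
            try:
                job = next(jobs_iter)
            except StopIteration:
                all_queued = True
            else:
                in_flight[submit(job)] = job

# toy usage: ids come from a counter and every poll reports "finished"
ids = iter(range(100))
run_windowed(['a', 'b', 'c'], max_processes=2,
             submit=lambda job: next(ids),
             poll_finished=lambda job: True)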
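A second sketch, of the configuration trick used for
_append_job_to_session: the first call captures temp_dir and quiet and
rebinds the module-level name to a per-job closure, so check_if_alive
and _resubmit do not have to thread those settings through. The names
here (submit_job, session, job) are hypothetical, not gridmap code.

def submit_job(temp_dir='/scratch/', quiet=True):
    """First call swaps this function for the real per-job submitter."""
    global submit_job

    def submit_job(session, job, temp_dir=temp_dir):
        # temp_dir defaults to the captured value but may be overridden,
        # just as _resubmit overrides it in the patch
        if not quiet:
            print('submitting', job, 'in', session, 'logs under', temp_dir)

submit_job(temp_dir='/tmp', quiet=False)   # configure once
submit_job('session-0', 'job-A')           # then call per job

One caveat of this pattern is that the function's public signature
changes after the first call, which deserves a prominent note in the
docstring in job.py.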