diff --git a/gridmap/job.py b/gridmap/job.py index 5e0d01e..9581d03 100644 --- a/gridmap/job.py +++ b/gridmap/job.py @@ -63,8 +63,9 @@ from gridmap.runner import _heart_beat if DRMAA_PRESENT: - from drmaa import (InvalidJobException, JobControlAction, - JOB_IDS_SESSION_ALL, Session) + from drmaa import (ExitTimeoutException, InvalidJobException, + JobControlAction, JOB_IDS_SESSION_ALL, Session, + TIMEOUT_NO_WAIT) # Python 2.x backward compatibility if sys.version_info < (3, 0): @@ -274,7 +275,7 @@ def __init__(self, temp_dir='/scratch'): # uninitialized field (set in check method) self.jobs = [] self.ids = [] - self.session_id = -1 + self.session_id = None self.id_to_job = {} def __enter__(self): @@ -291,18 +292,27 @@ def __exit__(self, exc_type, exc_value, exc_tb): # Always close socket self.socket.close() - # If we encounter an exception, try to kill all jobs - if exc_type is not None: - self.logger.info('Encountered %s, so killing all jobs.', - exc_type.__name__) + # Clean up if we have a valid session + if self.session_id is not None: with Session(self.session_id) as session: - # try to kill off all old jobs + # If we encounter an exception, kill all jobs + if exc_type is not None: + self.logger.info('Encountered %s, so killing all jobs.', + exc_type.__name__) + # try to kill off all old jobs + try: + session.control(JOB_IDS_SESSION_ALL, + JobControlAction.TERMINATE) + except InvalidJobException: + self.logger.debug("Could not kill all jobs for " + + "session.", exc_info=True) + + # Get rid of job info to prevent memory leak try: - session.control(JOB_IDS_SESSION_ALL, - JobControlAction.TERMINATE) - except InvalidJobException: - self.logger.debug("Could not kill all jobs for session.", - exc_info=True) + session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT, + dispose=True) + except ExitTimeoutException: + pass def check(self, session_id, jobs): """ @@ -316,9 +326,10 @@ def check(self, session_id, jobs): self.session_id = session_id # determines in which interval to check if jobs are alive + self.logger.debug('Starting local hearbeat') local_heart = multiprocessing.Process(target=_heart_beat, args=(-1, self.home_address, -1, - "", CHECK_FREQUENCY)) + "", CHECK_FREQUENCY)) local_heart.start() try: self.logger.debug("Starting ZMQ event loop") @@ -432,6 +443,8 @@ def check_if_alive(self): # could have been an exception, we check right away elif isinstance(job.ret, Exception): + job.cause_of_death = 'exception' + # Send error email, in addition to raising and logging exception send_error_mail(job) diff --git a/gridmap/version.py b/gridmap/version.py index fc20c68..de9b760 100644 --- a/gridmap/version.py +++ b/gridmap/version.py @@ -28,5 +28,5 @@ :organization: ETS ''' -__version__ = '0.12.1' +__version__ = '0.12.2' VERSION = tuple(int(x) for x in __version__.split('.'))