Permalink
Browse files

Merge branch 'release/0.12.2'

  • Loading branch information...
dan-blanchard committed Jan 10, 2014
2 parents 3c554a5 + 9e0e651 commit 49d611adda9d0aea578c744f93868fd8b0cf03fd
Showing with 28 additions and 15 deletions.
  1. +27 −14 gridmap/job.py
  2. +1 −1 gridmap/version.py
View
@@ -63,8 +63,9 @@
from gridmap.runner import _heart_beat
if DRMAA_PRESENT:
from drmaa import (InvalidJobException, JobControlAction,
JOB_IDS_SESSION_ALL, Session)
from drmaa import (ExitTimeoutException, InvalidJobException,
JobControlAction, JOB_IDS_SESSION_ALL, Session,
TIMEOUT_NO_WAIT)
# Python 2.x backward compatibility
if sys.version_info < (3, 0):
@@ -274,7 +275,7 @@ def __init__(self, temp_dir='/scratch'):
# uninitialized field (set in check method)
self.jobs = []
self.ids = []
self.session_id = -1
self.session_id = None
self.id_to_job = {}
def __enter__(self):
@@ -291,18 +292,27 @@ def __exit__(self, exc_type, exc_value, exc_tb):
# Always close socket
self.socket.close()
# If we encounter an exception, try to kill all jobs
if exc_type is not None:
self.logger.info('Encountered %s, so killing all jobs.',
exc_type.__name__)
# Clean up if we have a valid session
if self.session_id is not None:
with Session(self.session_id) as session:
# try to kill off all old jobs
# If we encounter an exception, kill all jobs
if exc_type is not None:
self.logger.info('Encountered %s, so killing all jobs.',
exc_type.__name__)
# try to kill off all old jobs
try:
session.control(JOB_IDS_SESSION_ALL,
JobControlAction.TERMINATE)
except InvalidJobException:
self.logger.debug("Could not kill all jobs for " +
"session.", exc_info=True)
# Get rid of job info to prevent memory leak
try:
session.control(JOB_IDS_SESSION_ALL,
JobControlAction.TERMINATE)
except InvalidJobException:
self.logger.debug("Could not kill all jobs for session.",
exc_info=True)
session.synchronize([JOB_IDS_SESSION_ALL], TIMEOUT_NO_WAIT,
dispose=True)
except ExitTimeoutException:
pass
def check(self, session_id, jobs):
"""
@@ -316,9 +326,10 @@ def check(self, session_id, jobs):
self.session_id = session_id
# determines in which interval to check if jobs are alive
self.logger.debug('Starting local hearbeat')
local_heart = multiprocessing.Process(target=_heart_beat,
args=(-1, self.home_address, -1,
"", CHECK_FREQUENCY))
"", CHECK_FREQUENCY))
local_heart.start()
try:
self.logger.debug("Starting ZMQ event loop")
@@ -432,6 +443,8 @@ def check_if_alive(self):
# could have been an exception, we check right away
elif isinstance(job.ret, Exception):
job.cause_of_death = 'exception'
# Send error email, in addition to raising and logging exception
send_error_mail(job)
View
@@ -28,5 +28,5 @@
:organization: ETS
'''
__version__ = '0.12.1'
__version__ = '0.12.2'
VERSION = tuple(int(x) for x in __version__.split('.'))

0 comments on commit 49d611a

Please sign in to comment.