Skip to content

Commit

Permalink
Merge pull request ipython#4270 from minrk/task-timeout
Browse files Browse the repository at this point in the history
adjust Scheduler timeout logic

Timeout starts when task submission is attempted. That is, it starts each time in resubmit / retry cases.

Avoids issues of timeout firing only once while a task is pending, when it might come back and be retried after the timeout has fired.

Also adds a ten second timeout for sync calls in the tests, which should hopefully avoid hangs when things are misbehaving.

closes ipython#3210
  • Loading branch information
minrk committed Sep 27, 2013
2 parents 13d1ca3 + e999eb8 commit 570feaa
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
26 changes: 17 additions & 9 deletions IPython/parallel/controller/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
self.after = after
self.follow = follow
self.timeout = timeout
self.removed = False # used for lazy-delete from sorted queue

self.removed = False # used for lazy-delete from sorted queue
self.timestamp = time.time()
self.timeout_id = 0
self.blacklist = set()

def __lt__(self, other):
Expand All @@ -155,6 +156,7 @@ def __cmp__(self, other):
def dependents(self):
return self.follow.union(self.after)


class TaskScheduler(SessionFactory):
"""Python TaskScheduler object.
Expand Down Expand Up @@ -433,19 +435,14 @@ def dispatch_submission(self, raw_msg):
# location dependencies
follow = Dependency(md.get('follow', []))

# turn timeouts into datetime objects:
timeout = md.get('timeout', None)
if timeout:
timeout = time.time() + float(timeout)
timeout = float(timeout)

job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
header=header, targets=targets, after=after, follow=follow,
timeout=timeout, metadata=md,
)
if timeout:
# schedule timeout callback
self.loop.add_timeout(timeout, lambda : self.job_timeout(job))

# validate and reduce dependencies:
for dep in after,follow:
if not dep: # empty dependency
Expand All @@ -469,11 +466,14 @@ def dispatch_submission(self, raw_msg):
else:
self.save_unmet(job)

def job_timeout(self, job):
def job_timeout(self, job, timeout_id):
"""callback for a job's timeout.
The job may or may not have been run at this point.
"""
if job.timeout_id != timeout_id:
# not the most recent call
return
now = time.time()
if job.timeout >= (now + 1):
self.log.warn("task %s timeout fired prematurely: %s > %s",
Expand Down Expand Up @@ -590,6 +590,14 @@ def save_unmet(self, job):
if dep_id not in self.graph:
self.graph[dep_id] = set()
self.graph[dep_id].add(msg_id)

# schedule timeout callback
if job.timeout:
timeout_id = job.timeout_id = job.timeout_id + 1
self.loop.add_timeout(time.time() + job.timeout,
lambda : self.job_timeout(job, timeout_id)
)


def submit_task(self, job, indices=None):
"""Submit a task to any of a subset of our targets."""
Expand Down Expand Up @@ -633,7 +641,7 @@ def dispatch_result(self, raw_msg):
else:
self.finish_job(idx)
except Exception:
self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
return

md = msg['metadata']
Expand Down
9 changes: 9 additions & 0 deletions IPython/parallel/tests/clienttest.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def skip_without_names(f, *args, **kwargs):


class ClusterTestCase(BaseZMQTestCase):
timeout = 10

def add_engines(self, n=1, block=True):
"""add multiple engines to our cluster"""
Expand All @@ -126,9 +127,17 @@ def wait_on_engines(self, timeout=5):

assert not len(self.client.ids) < n, "waiting for engines timed out"

def client_wait(self, client, jobs=None, timeout=-1):
"""my wait wrapper, sets a default finite timeout to avoid hangs"""
if timeout < 0:
timeout = self.timeout
return Client.wait(client, jobs, timeout)

def connect_client(self):
"""connect a client with my Context, and track its sockets for cleanup"""
c = Client(profile='iptest', context=self.context)
c.wait = lambda *a, **kw: self.client_wait(c, *a, **kw)

for name in filter(lambda n:n.endswith('socket'), dir(c)):
s = getattr(c, name)
s.setsockopt(zmq.LINGER, 0)
Expand Down
14 changes: 12 additions & 2 deletions IPython/parallel/tests/test_lbview.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,27 @@ def test_abort(self):
self.assertRaises(error.TaskAborted, ar3.get)

def test_retries(self):
self.minimum_engines(3)
view = self.view
view.timeout = 1 # prevent hang if this doesn't behave
def fail():
assert False
for r in range(len(self.client)-1):
with view.temp_flags(retries=r):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)

with view.temp_flags(retries=len(self.client), timeout=0.25):
with view.temp_flags(retries=len(self.client), timeout=0.1):
self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)

def test_short_timeout(self):
self.minimum_engines(2)
view = self.view
def fail():
import time
time.sleep(0.25)
assert False
with view.temp_flags(retries=1, timeout=0.01):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)

def test_invalid_dependency(self):
view = self.view
with view.temp_flags(after='12345'):
Expand Down

0 comments on commit 570feaa

Please sign in to comment.