Skip to content
Browse files

Process control flow must signal calling gl on reply-by short-circuit

Was causing a deadlock when reply-by already exceeded.
  • Loading branch information...
1 parent 375f229 commit a64602b8b9a7d0232bf767a1a3d81c01141a1bf5 @daf daf committed
Showing with 17 additions and 3 deletions.
  1. +6 −0 pyon/ion/process.py
  2. +11 −3 pyon/ion/test/test_process.py
View
6 pyon/ion/process.py
@@ -11,6 +11,7 @@
from gevent import greenlet, Timeout
from pyon.util.async import spawn
from pyon.core.exception import IonException, ContainerError
+from pyon.core.exception import Timeout as IonTimeout
from pyon.util.containers import get_ion_ts
from pyon.core.bootstrap import CFG
import threading
@@ -306,6 +307,11 @@ def _control_flow(self):
if context is not None and 'reply-by' in context:
if start_proc_time >= int(context['reply-by']):
log.info("control_flow: attempting to process message already exceeding reply-by, ignore")
+
+ # raise a timeout in the calling thread to allow endpoints to continue processing
+ e = IonTimeout("Reply-by time has already occurred (reply-by: %s, op start time: %s)" % (context['reply-by'], start_proc_time))
+ calling_gl.kill(exception=e, block=False)
+
continue
# also check ar if it is set, if it is, that means it is cancelled
View
14 pyon/ion/test/test_process.py
@@ -322,12 +322,20 @@ def test__control_flow_expired_call(self):
p.get_ready_event().wait(timeout=5)
self.addCleanup(p.stop)
+ def make_call(call, ctx, val):
+ ar = p._routing_call(call, ctx, val)
+ return ar.get(timeout=10)
+
ctx = { 'reply-by' : 0 } # no need for real time, as it compares by CURRENT >= this value
futurear = AsyncResult()
- ar = p._routing_call(futurear.set, ctx, sentinel.val)
+ gl = spawn(make_call, futurear.set, ctx, sentinel.val)
+ gl.join(timeout=10)
+
+ # reply-by raises an IonTimeout in the calling greenlet
+ self.assertTrue(hasattr(gl, "exception"))
+ self.assertIsInstance(gl.exception, IonTimeout)
- # ar won't be set, nor will futurear, but we'll be unblocked, so prove we can
- self.assertRaises(Timeout, ar.get, timeout=2)
+ # futurear will never get set
self.assertFalse(futurear.ready())
# put a new call through

0 comments on commit a64602b

Please sign in to comment.
Something went wrong with that request. Please try again.