Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Process control flow must signal calling gl on reply-by short-circuit #215

Merged
merged 1 commit into from

1 participant

@daf
daf commented

Was causing a deadlock when reply-by already exceeded. Fixes OOIION-648

@daf daf Process control flow must signal calling gl on reply-by short-circuit
Was causing a deadlock when reply-by already exceeded.
a64602b
@daf daf merged commit 8c94b41 into from
@daf daf deleted the branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 4, 2013
  1. @daf

    Process control flow must signal calling gl on reply-by short-circuit

    daf authored
    Was causing a deadlock when reply-by already exceeded.
This page is out of date. Refresh to see the latest.
Showing with 17 additions and 3 deletions.
  1. +6 −0 pyon/ion/process.py
  2. +11 −3 pyon/ion/test/test_process.py
View
6 pyon/ion/process.py
@@ -11,6 +11,7 @@
from gevent import greenlet, Timeout
from pyon.util.async import spawn
from pyon.core.exception import IonException, ContainerError
+from pyon.core.exception import Timeout as IonTimeout
from pyon.util.containers import get_ion_ts
from pyon.core.bootstrap import CFG
import threading
@@ -306,6 +307,11 @@ def _control_flow(self):
if context is not None and 'reply-by' in context:
if start_proc_time >= int(context['reply-by']):
log.info("control_flow: attempting to process message already exceeding reply-by, ignore")
+
+ # raise a timeout in the calling thread to allow endpoints to continue processing
+ e = IonTimeout("Reply-by time has already occurred (reply-by: %s, op start time: %s)" % (context['reply-by'], start_proc_time))
+ calling_gl.kill(exception=e, block=False)
+
continue
# also check ar if it is set, if it is, that means it is cancelled
View
14 pyon/ion/test/test_process.py
@@ -322,12 +322,20 @@ def test__control_flow_expired_call(self):
p.get_ready_event().wait(timeout=5)
self.addCleanup(p.stop)
+ def make_call(call, ctx, val):
+ ar = p._routing_call(call, ctx, val)
+ return ar.get(timeout=10)
+
ctx = { 'reply-by' : 0 } # no need for real time, as it compares by CURRENT >= this value
futurear = AsyncResult()
- ar = p._routing_call(futurear.set, ctx, sentinel.val)
+ gl = spawn(make_call, futurear.set, ctx, sentinel.val)
+ gl.join(timeout=10)
+
+ # reply-by raises an IonTimeout in the calling greenlet
+ self.assertTrue(hasattr(gl, "exception"))
+ self.assertIsInstance(gl.exception, IonTimeout)
- # ar won't be set, nor will futurear, but we'll be unblocked, so prove we can
- self.assertRaises(Timeout, ar.get, timeout=2)
+ # futurear will never get set
self.assertFalse(futurear.ready())
# put a new call through
Something went wrong with that request. Please try again.