Skip to content
Browse files

Process RPC refactoring and timeout management

  • Loading branch information...
1 parent 41d48fa commit 90805ffd879d5699d3ce94f3c49a0f24febdb8af Michael Meisinger committed Oct 28, 2010
View
4 README.txt
@@ -134,6 +134,10 @@ To compile all code to see if there are Python compile errors anywhere:
Change log:
===========
+2010-10-28:
+- Set RPC default timeout to 15 secs (see ion.config).
+ Use a different secs value in in rpc_send(..., ..., timeout=5)
+
2010-10-05:
- REFACTORING OF BASE CLASSES CONTINUED
- Changed ion.core.base_process.BaseProcess to ion.core.process.process.Process
View
13 ion/core/process/process.py
@@ -28,6 +28,7 @@
CONF = ioninit.config(__name__)
CF_conversation_log = CONF['conversation_log']
CF_fail_fast = CONF['fail_fast']
+CF_rpc_timeout = CONF['rpc_timeout']
# @todo CHANGE: Dict of "name" to process (service) declaration
processes = {}
@@ -297,14 +298,17 @@ def receive(self, payload, msg):
def _receive_rpc(self, payload, msg):
"""
Handling of RPC reply messages.
- @todo: Handle the error case
"""
fromname = payload['sender']
if 'sender-name' in payload:
fromname = payload['sender-name']
log.info('>>> [%s] receive(): RPC reply from [%s] <<<' % (self.proc_name, fromname))
rpc_deferred = self.rpc_conv.pop(payload['conv-id'])
content = payload.get('content', None)
+ if type(rpc_deferred) is str:
+ log.error("Message received after process %s RPC conv-id=%s timed out=%s: %s" % (
+ self.proc_name, payload['conv-id'], rpc_deferred, payload))
+ return
res = (content, payload, msg)
if not type(content) is dict:
log.error('RPC reply is not well formed. Use reply_ok or reply_err')
@@ -390,7 +394,7 @@ def op_none(self, content, headers, msg):
"""
The method called if operation callback operation is not defined
"""
- log.info('Catch message op=%s' % headers.get('op',None))
+ log.error('Process does not define op=%s' % headers.get('op',None))
# --- Outgoing message handling
@@ -404,11 +408,12 @@ def rpc_send(self, recv, operation, content, headers=None, **kwargs):
# Create a new deferred that the caller can yield on to wait for RPC
rpc_deferred = defer.Deferred()
# Timeout handling
- timeout = float(kwargs.get('timeout',0))
+ timeout = float(kwargs.get('timeout', CF_rpc_timeout))
def _timeoutf(d, convid, *args, **kwargs):
- log.info("RPC on conversation %s timed out! "%(convid))
+ log.warn("Process %s RPC conv-id=%s timed out! " % (self.proc_name,convid))
# Remove RPC. Delayed result will go to catch operation
d = self.rpc_conv.pop(convid)
+ self.rpc_conv[convid] = "TIMEOUT:%s" % pu.currenttime_ms()
d.errback(defer.TimeoutError())
if timeout:
rpc_deferred.setTimeout(timeout, _timeoutf, convid)
View
11 ion/core/test/test_baseprocess.py → ion/core/process/test/test_process.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""
-@file ion/core/test/test_baseprocess.py
+@file ion/core/process/test/test_baseprocess.py
@author Michael Meisinger
@brief test case for process base class
"""
@@ -223,6 +223,15 @@ def test_shutdown(self):
yield self._shutdown_processes()
+ @defer.inlineCallbacks
+ def test_rpc_timeout(self):
+ sup = self.test_sup
+ try:
+ yield sup.rpc_send('big_void', 'noop', 'arbitrary', timeout=1)
+ self.fail("TimeoutError expected")
+ except defer.TimeoutError, te:
+ log.info('Timeout received')
+
class EchoProcess(Process):
View
2 ion/test/iontest.py
@@ -100,7 +100,7 @@ def _stop_container(self):
bootstrap.reset_container()
# Temporary delay between tests with messaging to allow for complete
# close of connection (BUG: deferred/amqp client/missing yield?)
- yield pu.asleep(0.1)
+ #yield pu.asleep(0.1)
log.info("============ION container closed============")
def _shutdown_processes(self, proc=None):
View
1 res/config/ion.config
@@ -48,6 +48,7 @@
'ion.core.process.process':{
'conversation_log':False,
'fail_fast':True,
+ 'rpc_timeout':15,
},
'ion.data.backends.cassandra':{

0 comments on commit 90805ff

Please sign in to comment.
Something went wrong with that request. Please try again.