Permalink
Browse files

BaseProcess refactoring; added second receiver for backend interactio…

…ns; logging improvements
  • Loading branch information...
1 parent 77d4d39 commit ba806341b07b85e37f82d952fbc71335c734de6f Michael Meisinger committed Aug 15, 2010
Showing with 42 additions and 24 deletions.
  1. +7 −0 README.txt
  2. +27 −18 ion/core/base_process.py
  3. +3 −1 ion/core/bootstrap.py
  4. +4 −4 ion/services/base_service.py
  5. +1 −1 ion/util/procutils.py
View
@@ -114,6 +114,13 @@ Other Dependencies
Change log:
===========
+2010-08-14:
+- BaseProcess: added backend receiver, used for sending out any messages
+ from self.send and self.rpc_send. This keeps the message queue for the process
+ frontend separate from the process backend, e.g. for RPC during a message
+ processing.
+- Changed BaseProcess logging to make message send and receive easier to spot.
+
2010-08-06:
- BaseProcess.spawn() now calls init() automatically. No need to call init()
on a process anymore manually. For testing only. Note: The correct way to
@@ -3,7 +3,6 @@
"""
@file ion/core/base_process.py
@author Michael Meisinger
-@author Stephen Pasco
@brief base class for all processes within Magnet
"""
@@ -25,11 +24,7 @@
CONF = ioninit.config(__name__)
CF_conversation_log = CONF['conversation_log']
-# Define the exported public names of this module
-__all__ = ['BaseProcess','ProcessDesc','ProtocolFactory','Message','processes','procRegistry']
-
-# Static store (kvs) to register process instances with names
-# @todo CHANGE
+# @todo CHANGE: Static store (kvs) to register process instances with names
procRegistry = Store()
# @todo HACK: Dict of process "alias" to process declaration
@@ -47,7 +42,7 @@ class BaseProcess(object):
calls, spawning and terminating child processes. Subclasses may use the
plc-* process life cycle events.
"""
- # Conversation ID counter
+ # @todo CHANGE: Conversation ID counter
convIdCnt = 0
def __init__(self, receiver=None, spawnArgs=None, **kwargs):
@@ -57,7 +52,6 @@ def __init__(self, receiver=None, spawnArgs=None, **kwargs):
@param spawnArgs standard and additional spawn arguments
"""
self.proc_state = "NEW"
- self.id = None
spawnArgs = spawnArgs.copy() if spawnArgs else {}
self.spawn_args = spawnArgs
self.proc_init_time = pu.currenttime_ms()
@@ -76,6 +70,14 @@ def __init__(self, receiver=None, spawnArgs=None, **kwargs):
receiver = Receiver(self.proc_name)
self.receiver = receiver
receiver.handle(self.receive)
+ self.id = None
+
+ # We need a second receiver (i.e. messaging queue) for backend
+ # interactions, while processing incoming messages. Otherwise deadlock
+ # because only one message can be consumed before ACK.
+ self.backend_receiver = Receiver(self.proc_name + "_back")
+ self.backend_receiver.handle(self.receive)
+ self.backend_id = None
# Dict of all receivers of this process. Key is the name
self.receivers = {}
@@ -90,7 +92,7 @@ def __init__(self, receiver=None, spawnArgs=None, **kwargs):
# List of ProcessDesc instances of defined and spawned child processes
self.child_procs = []
- logging.info("Process init'd: proc-name=%s, sup-id=%s, sys-name=%s" % (
+ logging.info("NEW Process [%s], sup-id=%s, sys-name=%s" % (
self.proc_name, self.proc_supid, self.sys_name))
def add_receiver(self, receiver):
@@ -107,7 +109,7 @@ def spawn(self):
"""
assert not self.receiver.spawned, "Process already spawned"
self.id = yield spawn(self.receiver)
- logging.debug('spawn()=' + str(self.id))
+ logging.debug('Process spawn(): pid=%s' % (self.id))
yield defer.maybeDeferred(self.plc_spawn)
# Call init right away. This is what you would expect anyways in a
@@ -153,6 +155,8 @@ def op_init(self, content, headers, msg):
self.proc_state = "INIT"
try:
+ self.id = self.receiver.spawned.id
+ self.backend_id = yield spawn(self.backend_receiver)
yield defer.maybeDeferred(self.plc_init)
self.proc_state = "ACTIVE"
logging.info('----- Process %s INIT OK -----' % (self.proc_name))
@@ -173,7 +177,6 @@ def plc_init(self):
"""
Process life cycle event: on initialization of process (once)
"""
- logging.info('BaseProcess.plc_init()')
@defer.inlineCallbacks
def op_shutdown(self, content, headers, msg):
@@ -200,7 +203,6 @@ def plc_shutdown(self):
"""
Process life cycle event: on shutdown of process (once)
"""
- logging.info('BaseProcess.plc_shutdown()')
def receive(self, payload, msg):
"""
@@ -227,14 +229,17 @@ def _receive_rpc(self, payload, msg):
Handling of RPC reply messages.
@TODO: Handle the error case
"""
- logging.info('>>> BaseProcess.receive(): Message received, RPC reply. <<<')
+ fromname = payload['sender']
+ if 'sender-name' in payload:
+ fromname = payload['sender-name']
+ logging.info('>>> [%s] receive(): RPC reply from [%s] <<<' % (self.proc_name, fromname))
d = self.rpc_conv.pop(payload['conv-id'])
content = payload.get('content', None)
res = (content, payload, msg)
if type(content) is dict and content.get('status',None) == 'OK':
pass
elif type(content) is dict and content.get('status',None) == 'ERROR':
- logging.warn('RPC reply is an ERROR')
+ logging.warn('RPC reply is an ERROR: '+str(content.get('value',None)))
else:
logging.error('RPC reply is not well formed. Use reply_ok or reply_err')
# @todo is it OK to ack the response at this point already?
@@ -253,7 +258,10 @@ def _receive_msg(self, payload, msg):
Handling of non-RPC messages. Messages are dispatched according to
message attributes.
"""
- logging.info('>>> BaseProcess.receive(): Message received, dispatching... >>>')
+ fromname = payload['sender']
+ if 'sender-name' in payload:
+ fromname = payload['sender-name']
+ logging.info('#####>>> [%s] receive(): Message from [%s], dispatching... >>>' % (self.proc_name, fromname))
convid = payload.get('conv-id', None)
conv = self.conversations.get(convid, None) if convid else None
# Perform a dispatch of message by operation
@@ -265,7 +273,7 @@ def _cb(res):
logging.debug("<<< ACK msg")
d1 = msg.ack()
def _err(res):
- logging.error("Error in message processing: "+str(res))
+ logging.error("*****Error in message processing: "+str(res)+"*****")
if msg._state == "RECEIVED":
# Only if msg has not been ack/reject/requeued before
logging.debug("<<< ACK msg")
@@ -342,12 +350,13 @@ def send(self, recv, operation, content, headers=None):
Starts a new conversation.
@retval Deferred for send of message
"""
- send = self.receiver.spawned.id.full
+ send = self.backend_id
msgheaders = self._prepare_message(headers)
- return pu.send(self.receiver, send, recv, operation, content, msgheaders)
+ return pu.send(self.backend_receiver, send, recv, operation, content, msgheaders)
def _prepare_message(self, headers):
msgheaders = {}
+ msgheaders['sender-name'] = self.proc_name
if headers:
msgheaders.update(headers)
if not 'conv-id' in msgheaders:
@@ -19,7 +19,7 @@
from ion.core import ioninit, base_process
from ion.core.base_process import BaseProcess, ProcessDesc
from ion.core.cc.modloader import ModuleLoader
-from ion.resources import description_utility
+from ion.resources import description_utility
from ion.services.coi import service_registry
from ion.data.datastore import registry
@@ -105,6 +105,8 @@ def _set_container_args(contargs=None):
ioninit.cont_args[k.strip()] = v.strip()
else:
ioninit.cont_args['args'] = contargs
+ if 'contid' in ioninit.cont_args:
+ Container.id = ioninit.cont_args['contid']
@defer.inlineCallbacks
def declare_messaging(messagingCfg, cgroup=None):
@@ -99,23 +99,23 @@ def slc_init(self):
called once after the receipt of the process init message. Use this to
perform complex, potentially deferred initializations.
"""
- logging.debug('slc_init()')
+ #logging.debug('slc_init()')
def slc_start(self):
"""
Service life cycle event: start of service process. Will be called once
or many times after the slc_init of a service process. At this point,
all service dependencies must be present.
"""
- logging.info('slc_start()')
+ #logging.info('slc_start()')
def slc_stop(self):
"""
Service life cycle event: stop of service process. Will be called to
stop a started service process, and before shutdown. A stopped service
can be restarted again.
"""
- logging.info('slc_stop()')
+ #logging.info('slc_stop()')
def slc_shutdown(self):
"""
@@ -124,7 +124,7 @@ def slc_shutdown(self):
No further asyncronous activities are allowed by the process after
reply from this function.
"""
- logging.info('slc_stop()')
+ #logging.info('slc_shutdown()')
@classmethod
def service_declare(cls, **kwargs):
@@ -47,7 +47,7 @@ def log_message(msg):
body = msg.payload
lstr = ""
procname = str(body.get('receiver',None))
- lstr += "===Message=== receiver=%s op=%s" % (procname, body.get('op', None))
+ lstr += "===Message=== receiver=%s op=%s===" % (procname, body.get('op', None))
if body.get('quiet', False):
lstr += " (Q)"
else:

0 comments on commit ba80634

Please sign in to comment.