Skip to content
Browse files

Added process shutdown support to BaseProcess; refactored BaseProcess…

… members and dependent classes
  • Loading branch information...
1 parent 0dc971d commit f92eeba6948f0186392d9ce23f1261470d10d877 Michael Meisinger committed Jun 2, 2010
View
3 README.txt
@@ -120,6 +120,9 @@ again (see above). Please review the branch logs for any hints.
Change log:
===========
+2010-06-02:
+- BaseProcess self members renamed to conform to PEP8
+- Added process shutdown to BaseProcess
2010-05-25:
- Made Cassandra backend parameterizable with keyspace/colfamily and added
SuperColumn support.
View
226 ion/core/base_process.py
@@ -42,37 +42,36 @@ class BaseProcess(object):
This is the base class for all processes. Processes can be spawned and
have a unique identifier. Each process has one main process receiver and can
define additional receivers as needed. This base class provides a lot of
- mechanics for developing processes, such as sending and receiving messages.
-
- @todo tighter integration with Spawnable
+ mechanics for processes, such as sending and receiving messages, RPC style
+ calls, spawning and terminating child processes. Subclasses may use the
+ plc-* process life cycle events.
"""
# Conversation ID counter
convIdCnt = 0
- def __init__(self, receiver=None, spawnArgs=None):
+ def __init__(self, receiver=None, spawnArgs=None, **kwargs):
"""
Initialize process using an optional receiver and optional spawn args
@param receiver instance of a Receiver for process control
@param spawnArgs standard and additional spawn arguments
"""
- #logging.debug('BaseProcess.__init__()')
- self.procState = "UNINITIALIZED"
+ self.proc_state = "UNINITIALIZED"
spawnArgs = spawnArgs.copy() if spawnArgs else {}
- self.spawnArgs = spawnArgs
- self.init_time = pu.currenttime_ms()
+ self.spawn_args = spawnArgs
+ self.proc_init_time = pu.currenttime_ms()
# Name (human readable label) of this process.
- self.procName = self.spawnArgs.get('proc-name', __name__)
+ self.proc_name = self.spawn_args.get('proc-name', __name__)
# The system unique ID; propagates from root supv to all child procs
sysname = ioninit.cont_args.get('sysname', Container.id)
- self.sysName = self.spawnArgs.get('sys-name', sysname)
+ self.sys_name = self.spawn_args.get('sys-name', sysname)
# The process ID of the supervisor process
- self.procSupId = pu.get_process_id(self.spawnArgs.get('sup-id', None))
+ self.proc_supid = pu.get_process_id(self.spawn_args.get('sup-id', None))
if not receiver:
- receiver = Receiver(self.procName)
+ receiver = Receiver(self.proc_name)
self.receiver = receiver
receiver.handle(self.receive)
@@ -90,7 +89,7 @@ def __init__(self, receiver=None, spawnArgs=None):
self.child_procs = []
logging.info("Process init'd: proc-name=%s, sup-id=%s, sys-name=%s" % (
- self.procName, self.procSupId, self.sysName))
+ self.proc_name, self.proc_supid, self.sys_name))
def add_receiver(self, receiver):
key = receiver.name
@@ -116,48 +115,105 @@ def op_init(self, content, headers, msg):
"""
Init operation, on receive of the init message
"""
- if self.procState == "UNINITIALIZED":
+ if self.proc_state == "UNINITIALIZED":
yield defer.maybeDeferred(self.plc_init)
- logging.info('----- Process %s INITIALIZED -----' % (self.procName))
+ logging.info('----- Process %s INITIALIZED -----' % (self.proc_name))
- yield self.reply_ok(msg)
- self.procState = "INITIALIZED"
+ if msg != None:
+ # msg is None only if called from local process self.init()
+ yield self.reply_ok(msg)
+ self.proc_state = "INITIALIZED"
def plc_init(self):
"""
Process life cycle event: on initialization of process (once)
"""
logging.info('BaseProcess.plc_init()')
+ @defer.inlineCallbacks
+ def op_shutdown(self, content, headers, msg):
+ """
+ Shutdown operation, on receive of the shutdown message
+ """
+ assert self.proc_state == "INITIALIZED", "Process must be initialized"
+
+ if len(self.child_procs) > 0:
+ logging.info("Shutting down child processes")
+ while len(self.child_procs) > 0:
+ child = self.child_procs.pop()
+ res = yield self.shutdown_child(child)
+
+ yield defer.maybeDeferred(self.plc_shutdown)
+ logging.info('----- Process %s TERMINATED -----' % (self.proc_name))
+
+ if msg != None:
+ # msg is None only if called from local process self.shutdown()
+ yield self.reply_ok(msg)
+ self.proc_state = "TERMINATED"
+
+ def plc_shutdown(self):
+ """
+ Process life cycle event: on shutdown of process (once)
+ """
+ logging.info('BaseProcess.plc_shutdown()')
+
def receive(self, payload, msg):
"""
This is the main entry point for received messages. Messages are
- dispatched to operation handling methods. RPC is caught and completed.
+ distinguished into RPC replies (by conversation ID) and other received
+ messages.
"""
# Check if this response is in reply to an RPC call
if 'conv-id' in payload and payload['conv-id'] in self.rpc_conv:
- logging.info('BaseProcess: Received RPC reply. content=' +str(payload['content']))
- d = self.rpc_conv.pop(payload['conv-id'])
- content = payload.get('content', None)
- res = (content, payload, msg)
- # @todo is it OK to ack the response at this point already?
- d1 = msg.ack()
- if d1:
- d1.addCallback(lambda res1: d.callback(res))
- d1.addErrback(lambda c: d.errback(c))
- else:
- # Support for older carrot version where ack did not return deferred
- d.callback(res)
+ self._receive_rpc(payload, msg)
+ else:
+ self._receive_msg(payload, msg)
+
+ def _receive_rpc(self, payload, msg):
+ """
+ Handling of RPC replies.
+ """
+ logging.info('BaseProcess: Received RPC reply. content=' +str(payload['content']))
+ d = self.rpc_conv.pop(payload['conv-id'])
+ content = payload.get('content', None)
+ res = (content, payload, msg)
+ # @todo is it OK to ack the response at this point already?
+ d1 = msg.ack()
+ if d1:
+ d1.addCallback(lambda res1: d.callback(res))
+ d1.addErrback(lambda c: d.errback(c))
+ return d1
else:
- logging.info('BaseProcess: Message received, dispatching...')
- convid = payload.get('conv-id', None)
- conv = self.conversations.get(convid, None) if convid else None
- # Perform a dispatch of message by operation
- d = pu.dispatch_message(payload, msg, self, conv)
- def _cb(res):
- logging.info("ACK msg")
- d1 = msg.ack()
- d.addCallbacks(_cb, logging.error)
+ # Support for older carrot version where ack did not return deferred
+ d.callback(res)
+ return d
+
+ def _receive_msg(self, payload, msg):
+ """
+ Handling of non-RPC messages. Messages are dispatched according to
+ message attributes.
+ """
+ logging.info('BaseProcess: Message received, dispatching...')
+ convid = payload.get('conv-id', None)
+ conv = self.conversations.get(convid, None) if convid else None
+ # Perform a dispatch of message by operation
+ d = self._dispatch_message(payload, msg, self, conv)
+ def _cb(res):
+ logging.info("ACK msg")
+ d1 = msg.ack()
+ d.addCallbacks(_cb, logging.error)
+ return d
+
+ def _dispatch_message(self, payload, msg, target, conv):
+ """
+ Dispatch of messages to operations within this process instance. The
+ default behavior is to dispatch to 'op_*' functions, where * is the
+ 'op' message attribute.
+ @retval deferred
+ """
+ assert payload['op'] == 'init' or self.proc_state == "INITIALIZED"
+ d = pu.dispatch_message(payload, msg, target, conv)
+ return d
def op_none(self, content, headers, msg):
"""
@@ -277,7 +333,7 @@ def get_scoped_name(self, scope, name):
if scope == 'local':
scoped_name = str(Container.id) + "." + name
elif scope == 'system':
- scoped_name = self.sysName + "." + name
+ scoped_name = self.sys_name + "." + name
elif scope == 'global':
pass
else:
@@ -299,7 +355,7 @@ def spawn_child(self, childproc, init=True):
assert not childproc in self.child_procs
self.child_procs.append(childproc)
child_id = yield childproc.spawn(self)
- yield procRegistry.put(str(childproc.procName), str(child_id))
+ yield procRegistry.put(str(childproc.proc_name), str(child_id))
if init:
yield childproc.init()
defer.returnValue(child_id)
@@ -310,20 +366,40 @@ def link_child(self, supervisor):
def spawn_link(self, childproc, supervisor):
pass
+ def shutdown_child(self, childproc):
+ return childproc.shutdown()
+
def get_child_def(self, name):
"""
@retval the ProcessDesc instance of a child process by name
"""
for child in self.child_procs:
- if child.procName == name:
+ if child.proc_name == name:
return child
def get_child_id(self, name):
"""
@retval the process id a child process by name
"""
child = self.get_child_def(name)
- return child.procId if child else None
+ return child.proc_id if child else None
+
+ def init(self):
+ """
+ Initializes this process instance. Typically a call should not be
+ necessary because the init message is received from the supervisor
+ process. It may be necessary for the root supervisor and for test
+ processes.
+ @retval Deferred
+ """
+ return self.op_init(None, None, None)
+
+ def shutdown(self):
+ """
+ Recursively terminates all child processes and then itself.
+ @retval Deferred
+ """
+ return self.op_shutdown(None, None, None)
class ProcessDesc(object):
"""
@@ -339,55 +415,65 @@ def __init__(self, **kwargs):
@param node ID of container to spawn process on (optional)
@param spawnargs dict of additional spawn arguments (optional)
"""
- self.procName = kwargs.get('name', None)
- self.procModule = kwargs.get('module', None)
- self.procClass = kwargs.get('class', kwargs.get('procclass', None))
- self.procNode = kwargs.get('node', None)
- self.spawnArgs = kwargs.get('spawnargs', None)
- self.procId = None
- self.procState = 'DEFINED'
+ self.proc_name = kwargs.get('name', None)
+ self.proc_module = kwargs.get('module', None)
+ self.proc_class = kwargs.get('class', kwargs.get('procclass', None))
+ self.proc_node = kwargs.get('node', None)
+ self.spawn_args = kwargs.get('spawnargs', None)
+ self.proc_id = None
+ self.proc_state = 'DEFINED'
@defer.inlineCallbacks
def spawn(self, supProc=None):
"""
Spawns this process description with the initialized attributes.
@param supProc the process instance that should be set as supervisor
"""
- assert self.procState == 'DEFINED', "Cannot spawn process twice"
- self.supProcess = supProc
- if self.procNode == None:
- logging.info('Spawning name=%s node=%s' % (self.procName, self.procNode))
+ assert self.proc_state == 'DEFINED', "Cannot spawn process twice"
+ self.sup_process = supProc
+ if self.proc_node == None:
+ logging.info('Spawning name=%s node=%s' %
+ (self.proc_name, self.proc_node))
# Importing service module
- proc_mod = pu.get_module(self.procModule)
- self.procModObj = proc_mod
+ proc_mod = pu.get_module(self.proc_module)
+ self.proc_mod_obj = proc_mod
# Spawn instance of a process
# During spawn, the supervisor process id, system name and proc name
# get provided as spawn args, in addition to any give spawn args.
- spawnargs = {'proc-name':self.procName,
- 'sup-id':self.supProcess.receiver.spawned.id.full,
- 'sys-name':self.supProcess.sysName}
- if self.spawnArgs:
- spawnargs.update(self.spawnArgs)
- #logging.debug("spawn(%s, args=%s)" % (self.procModule, spawnargs))
+ spawnargs = {'proc-name':self.proc_name,
+ 'sup-id':self.sup_process.receiver.spawned.id.full,
+ 'sys-name':self.sup_process.sys_name}
+ if self.spawn_args:
+ spawnargs.update(self.spawn_args)
+ #logging.debug("spawn(%s, args=%s)" % (self.proc_module, spawnargs))
proc_id = yield spawn(proc_mod, None, spawnargs)
- self.procId = proc_id
- self.procState = 'SPAWNED'
+ self.proc_id = proc_id
+ self.proc_state = 'SPAWNED'
- #logging.info("Process "+self.procClass+" ID: "+str(proc_id))
+ #logging.info("Process "+self.proc_class+" ID: "+str(proc_id))
else:
- logging.error('Cannot spawn '+self.procClass+' on node='+str(self.procNode))
- defer.returnValue(self.procId)
+ logging.error('Cannot spawn '+self.proc_class+' on node='+str(self.proc_node))
+ defer.returnValue(self.proc_id)
@defer.inlineCallbacks
def init(self):
- (content, headers, msg) = yield self.supProcess.rpc_send(self.procId,
+ (content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'init', {}, {'quiet':True})
if content.get('status','ERROR') == 'OK':
- self.procState = 'INIT_OK'
+ self.proc_state = 'INIT_OK'
+ else:
+ self.proc_state = 'INIT_ERROR'
+
+ @defer.inlineCallbacks
+ def shutdown(self):
+ (content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
+ 'shutdown', {}, {'quiet':True})
+ if content.get('status','ERROR') == 'OK':
+ self.proc_state = 'TERMINATED'
else:
- self.procState = 'INIT_ERROR'
+ self.proc_state = 'SHUTDOWN_ERROR'
class ProtocolFactory(ProtocolFactory):
View
41 ion/core/bootstrap.py
@@ -127,27 +127,15 @@ def spawn_processes(procs, sup=None):
Spawns a set of processes.
@param procs list of processes (as description dict) to start up
@param sup spawned BaseProcess instance acting as supervisor
- @retval supervisor BaseProcess instance
+ @retval Deferred, for supervisor BaseProcess instance
"""
- global sup_seq
children = []
for procDef in procs:
child = ProcessDesc(**procDef)
children.append(child)
- if not sup:
- # Makes the boostrap a process
- logging.info("Spawning supervisor")
- if sup_seq == 0:
- supname = "bootstrap"
- else:
- supname = "supervisor."+str(sup_seq)
- suprec = base_process.factory.build({'proc-name':supname})
- sup = suprec.procinst
- sup.receiver.group = supname
- supId = yield sup.spawn()
- yield base_process.procRegistry.put(supname, str(supId))
- sup_seq += 1
+ if sup == None:
+ sup = yield create_supervisor()
logging.info("Spawning child processes")
for child in children:
@@ -158,6 +146,29 @@ def spawn_processes(procs, sup=None):
defer.returnValue(sup)
@defer.inlineCallbacks
+def create_supervisor():
+ """
+ Creates a supervisor process
+ @retval Deferred, for supervisor BaseProcess instance
+ """
+ global sup_seq
+ # Makes the bootstrap a process
+ logging.info("Spawning supervisor")
+ if sup_seq == 0:
+ supname = "bootstrap"
+ else:
+ supname = "supervisor."+str(sup_seq)
+ suprec = base_process.factory.build({'proc-name':supname})
+ sup = suprec.procinst
+ sup.receiver.group = supname
+ supId = yield sup.spawn()
+ yield sup.init()
+ yield base_process.procRegistry.put(supname, str(supId))
+ sup_seq += 1
+ defer.returnValue(sup)
+
+
+@defer.inlineCallbacks
def bs_register_services():
"""
Register all the declared processes.
View
95 ion/core/test/test_baseprocess.py
@@ -17,9 +17,9 @@
from magnet.spawnable import spawn
from ion.core import ioninit
-from ion.core.base_process import BaseProcess
+from ion.core.base_process import BaseProcess, ProcessDesc, ProtocolFactory
-from ion.test.iontest import IonTestCase
+from ion.test.iontest import IonTestCase, ReceiverProcess
import ion.util.procutils as pu
class BaseProcessTest(IonTestCase):
@@ -34,32 +34,91 @@ def setUp(self):
@defer.inlineCallbacks
def tearDown(self):
yield self._stop_container()
-
+
@defer.inlineCallbacks
- def test_process(self):
+ def test_process_basics(self):
p1 = BaseProcess()
self.assertTrue(p1.receiver)
-
+
rec = Receiver("myname")
p2 = BaseProcess(rec)
-
+
args = {'arg1':'value1','arg2':{}}
p3 = BaseProcess(None, args)
- self.assertEquals(p3.spawnArgs, args)
+ self.assertEquals(p3.spawn_args, args)
pid1 = yield p1.spawn()
-
- tp = EchoProcess()
- pid2 = yield tp.spawn()
-
- yield p1.rpc_send(pid2,'test','content')
-
+
+
@defer.inlineCallbacks
- def _test_rpc(self):
- pass
-
+ def test_child_processes(self):
+ p1 = BaseProcess()
+ pid1 = yield p1.spawn()
+ yield p1.init()
+
+ child = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
+ pid2 = yield p1.spawn_child(child)
+
+ (cont,hdrs,msg) = yield p1.rpc_send(pid2,'echo','content123')
+ self.assertEquals(cont['value'], 'content123')
+
+ yield p1.shutdown()
+
+ @defer.inlineCallbacks
+ def test_process(self):
+ # Also test the ReceiverProcess helper class
+ p1 = ReceiverProcess()
+ pid1 = yield p1.spawn()
+ yield p1.init()
+
+ processes = [
+ {'name':'echo','module':'ion.core.test.test_baseprocess','class':'EchoProcess'},
+ ]
+ sup = yield self._spawn_processes(processes, sup=p1)
+ assert sup == p1
+
+ pid2 = p1.get_child_id('echo')
+
+ yield p1.send(pid2, 'echo','content123')
+ logging.info('Sent echo message')
+
+ msg = yield p1.await_message()
+ logging.info('Received echo message')
+
+ self.assertEquals(msg.payload['op'], 'result')
+ self.assertEquals(msg.payload['content']['value'], 'content123')
+
+ yield sup.shutdown()
+
+ @defer.inlineCallbacks
+ def test_shutdown(self):
+ processes = [
+ {'name':'echo1','module':'ion.core.test.test_baseprocess','class':'EchoProcess'},
+ {'name':'echo2','module':'ion.core.test.test_baseprocess','class':'EchoProcess'},
+ {'name':'echo3','module':'ion.core.test.test_baseprocess','class':'EchoProcess'},
+ ]
+ sup = yield self._spawn_processes(processes)
+
+ yield self._shutdown_processes()
+
+
class EchoProcess(BaseProcess):
@defer.inlineCallbacks
- def op_test(self, content, headers, msg):
+ def op_echo(self, content, headers, msg):
+ logging.info("Message received: "+str(content))
+ yield self.reply_ok(msg, content)
+
+ @defer.inlineCallbacks
+ def op_echofail1(self, content, headers, msg):
logging.info("Message received: "+str(content))
- yield self.reply(msg,'result',content,{})
+ ex = RuntimeError("I'm supposed to fail")
+ yield self.reply_err(msg, ex)
+
+ @defer.inlineCallbacks
+ def op_echofail2(self, content, headers, msg):
+ logging.info("Message received: "+str(content))
+ raise RuntimeError("I'm supposed to fail")
+ yield self.reply_ok(msg, content)
+
+# Spawn of the process using the module name
+factory = ProtocolFactory(EchoProcess)
View
6 ion/core/worker.py
@@ -23,8 +23,8 @@ class WorkerProcess(BaseService):
@defer.inlineCallbacks
def slc_init(self):
- msg_name = self.spawnArgs['service-name']
- scope = self.spawnArgs['scope']
+ msg_name = self.spawn_args['service-name']
+ scope = self.spawn_args['scope']
logging.info("slc_init name received:"+msg_name)
msg_name1 = self.get_scoped_name(scope, msg_name)
logging.info("slc_init name used:"+msg_name1)
@@ -43,7 +43,7 @@ def op_work(self, content, headers, msg):
@defer.inlineCallbacks
def _work(self,content):
- myid = self.procName + ":" + self.receiver.spawned.id.local
+ myid = self.proc_name + ":" + self.receiver.spawned.id.local
workid = str(content['work-id'])
waittime = float(content['work'])
logging.info("worker="+myid+" job="+workid+" work="+str(waittime))
View
83 ion/services/base_service.py
@@ -3,7 +3,7 @@
"""
@file ion/services/base_service.py
@author Michael Meisinger
-@brief base classes for all service interfaces, and clients.
+@brief base classes for all service processes and clients.
"""
import logging
@@ -19,25 +19,36 @@
class BaseService(BaseProcess):
"""
This is the superclass for all service processes. A service process is a
- Capability Container process that can be spawned anywhere in the network
- and that provides a service under a defined service name. The service
- subclass must have declaration with service name and dependencies.
+ Capability Container process that can be spawned anywhere in the network
+ and that provides a service under a defined service name (message queue).
+ The service subclass must have declaration with default service name,
+ version identifier and dependencies.
"""
+ # Service declaration, to be set by the subclass
declare = {}
def __init__(self, receiver=None, spawnArgs=None):
"""
- Initializes base service. The service name is taken from the service
- declaration
+ Initializes base service. The default service name is taken from the
+ service declaration, a different service name can be provided in the
+ spawnargs using the 'servicename' attribute. The service name, in its
+ qualified form prefixed by the system name is the public name of the
+ service inbound queue that is shared among all service processes with
+ the same name
"""
BaseProcess.__init__(self, receiver, spawnArgs)
- # Determine service known messging name either from spawn args or
- # if not given from service declaration
- self.svc_name = self.spawnArgs.get('servicename', self.declare['name'])
+ # Determine public service messaging name either from spawn args or
+ # use default name from service declaration
+ #default_svcname = self.declare['name'] + '_' + self.declare['version']
+ default_svcname = self.declare['name']
+ self.svc_name = self.spawn_args.get('servicename', default_svcname)
assert self.svc_name, "Service must have a declare with a valid name"
+ # Scope (prefix) the service name with the system name
msgName = self.get_scoped_name('system', self.svc_name)
+
+ # Create a receiver (inbound queue consumer) for service name
svcReceiver = Receiver(self.svc_name+'.'+self.receiver.label, msgName)
if hasattr(self.receiver, 'group'):
svcReceiver.group = self.receiver.group
@@ -46,11 +57,28 @@ def __init__(self, receiver=None, spawnArgs=None):
self.add_receiver(self.svc_receiver)
@defer.inlineCallbacks
+ def op_start(self, content, headers, msg):
+ """
+ Start service operation, on receive of a start message
+ """
+ yield defer.maybeDeferred(self.slc_start)
+ yield self.reply_ok(msg)
+
+ @defer.inlineCallbacks
+ def op_stop(self, content, headers, msg):
+ """
+ Stop service operation, on receive of a stop message
+ """
+ yield defer.maybeDeferred(self.slc_stop)
+ yield self.reply_ok(msg)
+
+ @defer.inlineCallbacks
def plc_init(self):
yield self._declare_service_name()
svcid = yield spawn(self.svc_receiver)
logging.info('Service process bound to name=%s as pid=%s' % (self.svc_receiver.name, svcid))
yield defer.maybeDeferred(self.slc_init)
+ yield defer.maybeDeferred(self.slc_start)
@defer.inlineCallbacks
def _declare_service_name(self):
@@ -59,20 +87,43 @@ def _declare_service_name(self):
messaging = {'name_type':'worker', 'args':{'scope':'system'}}
yield Container.configure_messaging(msgName, messaging)
+ @defer.inlineCallbacks
+ def plc_shutdown(self):
+ yield defer.maybeDeferred(self.slc_stop)
+ yield defer.maybeDeferred(self.slc_shutdown)
+
def slc_init(self):
"""
Service life cycle event: initialization of service process. This is
- called once after the receipt of the process init message.
+ called once after the receipt of the process init message. Use this to
+ perform complex, potentially deferred initializations.
"""
logging.debug('slc_init()')
- @classmethod
- def _add_messages(cls):
- pass
+ def slc_start(self):
+ """
+ Service life cycle event: start of service process. Will be called once
+ or many times after the slc_init of a service process. At this point,
+ all service dependencies must be present.
+ """
+ logging.info('slc_start()')
- @classmethod
- def _add_conv_type(cls):
- pass
+ def slc_stop(self):
+ """
+ Service life cycle event: stop of service process. Will be called to
+ stop a started service process, and before shutdown. A stopped service
+ can be restarted again.
+ """
+ logging.info('slc_stop()')
+
+ def slc_shutdown(self):
+ """
+ Service life cycle event: final shutdown of service process. Will be
+ called after a slc_stop before the actual termination of the process.
+ No further asynchronous activities are allowed by the process after
+ reply from this function.
+ """
+ logging.info('slc_shutdown()')
@classmethod
def service_declare(cls, **kwargs):
View
4 ion/services/coi/attributestore.py
@@ -29,8 +29,8 @@ class AttributeStoreService(BaseService):
@defer.inlineCallbacks
def slc_init(self):
# use spawn args to determine backend class, second config file
- backendcls = self.spawnArgs.get('backend_class', CONF.getValue('backend_class', None))
- backendargs = self.spawnArgs.get('backend_args', CONF.getValue('backend_args', {}))
+ backendcls = self.spawn_args.get('backend_class', CONF.getValue('backend_class', None))
+ backendargs = self.spawn_args.get('backend_args', CONF.getValue('backend_args', {}))
if backendcls:
self.backend = pu.get_class(backendcls)
else:
View
4 ion/services/coi/datastore.py
@@ -31,8 +31,8 @@ class DatastoreService(BaseService):
@defer.inlineCallbacks
def slc_init(self):
# use spawn args to determine backend class, second config file
- backendcls = self.spawnArgs.get('backend_class', CONF.getValue('backend_class', None))
- backendargs = self.spawnArgs.get('backend_args', CONF.getValue('backend_args', {}))
+ backendcls = self.spawn_args.get('backend_class', CONF.getValue('backend_class', None))
+ backendargs = self.spawn_args.get('backend_args', CONF.getValue('backend_args', {}))
if backendcls:
self.backend = pu.get_class(backendcls)
else:
View
47 ion/test/iontest.py
@@ -15,6 +15,7 @@
from ion.core import base_process, bootstrap, ioninit
from ion.core import ioninit
+from ion.core.base_process import BaseProcess
from ion.data.store import Store
import ion.util.procutils as pu
@@ -47,11 +48,13 @@ def _start_container(self):
self.cont_conn = yield container.startContainer(mopt)
bootstrap.init_container()
self.procRegistry = base_process.procRegistry
+ self.test_sup = yield bootstrap.create_supervisor()
logging.info("============Magnet container started, "+repr(self.cont_conn))
@defer.inlineCallbacks
def _start_core_services(self):
- sup = yield bootstrap.bootstrap(None, bootstrap.ion_core_services)
+ sup = yield bootstrap.spawn_processes(bootstrap.ion_core_services,
+ self.test_sup)
logging.info("============Core ION services started============")
defer.returnValue(sup)
@@ -61,6 +64,7 @@ def _stop_container(self):
reinitialization.
"""
logging.info("Closing ION container")
+ self.test_sup = None
dcs = reactor.getDelayedCalls()
logging.info("Cancelling %s delayed reactor calls!" % len(dcs))
for dc in dcs:
@@ -71,16 +75,53 @@ def _stop_container(self):
bootstrap.reset_container()
logging.info("============ION container closed============")
+ def _shutdown_processes(self, proc=None):
+ """
+ Shuts down spawned test processes.
+ """
+ if proc:
+ return proc.shutdown()
+ else:
+ return self.test_sup.shutdown()
def _declare_messaging(self, messaging):
return bootstrap.declare_messaging(messaging)
- def _spawn_processes(self, procs):
- return bootstrap.spawn_processes(procs)
+ def _spawn_processes(self, procs, sup=None):
+ sup = sup if sup else self.test_sup
+ return bootstrap.spawn_processes(procs, sup)
def _get_procid(self, name):
"""
@param name process instance label given when spawning
@retval process id of the process (locally) identified by name
"""
return self.procRegistry.get(name)
+
+
+class ReceiverProcess(BaseProcess):
+ """
+ A simple process that can send messages and tracks all received
+ messages
+ """
+ def __init__(self, *args, **kwargs):
+ BaseProcess.__init__(self, *args, **kwargs)
+ self.inbox = defer.DeferredQueue()
+
+ def _dispatch_message(self, payload, msg, target, conv):
+ """
+ Dispatch of messages to operations within this process instance. The
+ default behavior is to dispatch to 'op_*' functions, where * is the
+ 'op' message attribute.
+ @retval deferred
+ """
+ logging.info('ReceiverProcess: Received message op=%s from sender=%s' %
+ (msg.payload['op'], msg.payload['sender']))
+ self.inbox.put(msg)
+ return defer.succeed(True)
+
+ def await_message(self):
+ """
+ @retval Deferred for arriving message
+ """
+ return self.inbox.get()

0 comments on commit f92eeba

Please sign in to comment.
Something went wrong with that request. Please try again.