Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixed BaseProcess: states, message dispatching, error handling

- Changed BaseProcess state labels
- Allowed messages to be received in INIT state, with warning
- spawn() now calls init(). Removed this init from all other modules
- Significantly enhanced test_baseprocess
- Exceptions in message dispatching now lead to a reply with status=ERROR
  • Loading branch information...
commit 911af587f0c31acdac9ac411724ff2eadcd7a007 1 parent ea45e76
@mmeisinger mmeisinger authored
View
8 README.txt
@@ -111,6 +111,14 @@ Other Dependencies
Change log:
===========
+2010-08-06:
+- BaseProcess.spawn() now calls init() automatically. No need to call init()
+ on a process anymore manually. For testing only. Note: The correct way to
+ spawn a process is through a parent process with spawn_child()
+- Modified and fixed the BaseProcess states when receiving messages
+- MAJOR update to BaseProcess message dispatching and subsequent error handling.
+ On error, reply_err messages are sent back if the reply-to header is set.
+
2010-08-03:
- Added ant build.xml file to LCAarch root dir. Start with ant.
Supports ant clean, which removes all *.pyc from ion path.
View
16 ion/agents/instrumentagents/test/test_SBE49.py
@@ -14,23 +14,22 @@
from ion.core import bootstrap
class TestSBE49(IonTestCase):
-
-
+
+
@defer.inlineCallbacks
def setUp(self):
yield self._start_container()
-
+
self.sup = yield bootstrap.create_supervisor()
self.driver = SBE49InstrumentDriver()
self.driver_pid = yield self.driver.spawn()
- yield self.driver.init()
self.driver_client = SBE49InstrumentDriverClient(proc=self.sup,
target=self.driver_pid)
-
+
@defer.inlineCallbacks
def tearDown(self):
yield self._stop_container()
-
+
@defer.inlineCallbacks
def test_driver_load(self):
config_vals = {'addr':'127.0.0.1', 'port':'9000'}
@@ -39,7 +38,7 @@ def test_driver_load(self):
self.assertEqual(result['addr'], config_vals['addr'])
self.assertEqual(result['port'], config_vals['port'])
-
+
@defer.inlineCallbacks
def test_fetch_set(self):
params = {'baudrate':'19200', 'outputsal':'N'}
@@ -56,7 +55,7 @@ def test_fetch_set(self):
self.assertEqual(result['status'], 'OK')
self.assertEqual(result['baudrate'], params['baudrate'])
self.assertEqual(result['outputsal'], params['outputsal'])
-
+
@defer.inlineCallbacks
def test_execute(self):
"""
@@ -68,4 +67,3 @@ def test_execute(self):
self.assertEqual(result['status'], 'OK')
result = yield self.driver_client.execute(cmd2)
self.assertEqual(result['status'], 'OK')
-
View
148 ion/core/base_process.py
@@ -56,7 +56,8 @@ def __init__(self, receiver=None, spawnArgs=None, **kwargs):
@param receiver instance of a Receiver for process control
@param spawnArgs standard and additional spawn arguments
"""
- self.proc_state = "UNINITIALIZED"
+ self.proc_state = "NEW"
+ self.id = None
spawnArgs = spawnArgs.copy() if spawnArgs else {}
self.spawn_args = spawnArgs
self.proc_init_time = pu.currenttime_ms()
@@ -99,8 +100,8 @@ def add_receiver(self, receiver):
@defer.inlineCallbacks
def spawn(self):
"""
- Spawns this process using the process' receiver. Self spawn can
- only be called once per instance.
+ Spawns this process using the process' receiver and initializes it in
+ the same call. Self spawn can only be called once per instance.
@note this method is not called when spawned through magnet. This makes
it tricky to do consistent initialization on spawn.
"""
@@ -108,8 +109,27 @@ def spawn(self):
self.id = yield spawn(self.receiver)
logging.debug('spawn()=' + str(self.id))
yield defer.maybeDeferred(self.plc_spawn)
+
+ # Call init right away. This is what you would expect anyways in a
+ # container executed spawn
+ yield self.init()
+
defer.returnValue(self.id)
+ def init(self):
+ """
+ DO NOT CALL. Automatically called by spawn().
+ Initializes this process instance. Typically a call should not be
+ necessary because the init message is received from the supervisor
+ process. It may be necessary for the root supervisor and for test
+ processes.
+ @retval Deferred
+ """
+ if self.proc_state == "NEW":
+ return self.op_init(None, None, None)
+ else:
+ return defer.succeed(None)
+
def plc_spawn(self):
"""
Process life cycle event: on spawn of process (once)
@@ -123,14 +143,31 @@ def op_init(self, content, headers, msg):
"""
Init operation, on receive of the init message
"""
- if self.proc_state == "UNINITIALIZED":
- yield defer.maybeDeferred(self.plc_init)
- logging.info('----- Process %s INITIALIZED -----' % (self.proc_name))
-
- if msg != None:
- # msg is None only if called from local process self.init()
- yield self.reply_ok(msg)
- self.proc_state = "INITIALIZED"
+ if self.proc_state == "NEW":
+ # @TODO: Right after giving control to the process specific init,
+ # the process can enable message consumption and messages can be
+ # received. How to deal with the situation that the process is not
+ # fully initialized yet???? Stop message floodgate until init'd?
+
+ # Change state from NEW early, to prevent consistency problems.
+ self.proc_state = "INIT"
+
+ try:
+ yield defer.maybeDeferred(self.plc_init)
+ self.proc_state = "ACTIVE"
+ logging.info('----- Process %s INIT OK -----' % (self.proc_name))
+ if msg != None:
+ # msg is None only if called from local process self.init()
+ yield self.reply_ok(msg)
+ except Exception, ex:
+ self.proc_state = "ERROR"
+ logging.exception('----- Process %s INIT ERROR -----' % (self.proc_name))
+ if msg != None:
+ # msg is None only if called from local process self.init()
+ yield self.reply_err(msg, "Process %s INIT ERROR" % (self.proc_name) + str(ex))
+ else:
+ self.proc_state = "ERROR"
+ logging.error('Process %s in wrong state %s for op_init' % (self.proc_name, self.proc_state))
def plc_init(self):
"""
@@ -143,7 +180,7 @@ def op_shutdown(self, content, headers, msg):
"""
Init operation, on receive of the init message
"""
- assert self.proc_state == "INITIALIZED", "Process must be initalized"
+ assert self.proc_state == "ACTIVE", "Process not initalized"
if len(self.child_procs) > 0:
logging.info("Shutting down child processes")
@@ -167,24 +204,39 @@ def plc_shutdown(self):
def receive(self, payload, msg):
"""
- This is the main entry point for received messages. Messages are
- distinguished into RPC replies (by conversation ID) and other received
+ This is the first and MAIN entry point for received messages. Messages are
+ separated into RPC replies (by conversation ID) and other received
messages.
"""
- # Check if this response is in reply to an RPC call
- if 'conv-id' in payload and payload['conv-id'] in self.rpc_conv:
- self._receive_rpc(payload, msg)
- else:
- self._receive_msg(payload, msg)
+ try:
+ # Check if this response is in reply to an outstanding RPC call
+ if 'conv-id' in payload and payload['conv-id'] in self.rpc_conv:
+ d = self._receive_rpc(payload, msg)
+ else:
+ d = self._receive_msg(payload, msg)
+ except Exception, ex:
+ # Unexpected error condition in message processing (only before
+ # any callback is called)
+ logging.exception('Error in process %s receive ' % self.proc_name)
+ # @TODO: There was an error and now what??
+ if msg and msg.payload['reply-to']:
+ d = self.reply_err(msg, 'ERROR in process receive(): '+str(ex))
def _receive_rpc(self, payload, msg):
"""
- Handling of RPC replies.
+ Handling of RPC reply messages.
+ @TODO: Handle the error case
"""
- logging.info('BaseProcess: Received RPC reply.')
+ logging.info('>>> BaseProcess.receive(): Message received, RPC reply. <<<')
d = self.rpc_conv.pop(payload['conv-id'])
content = payload.get('content', None)
res = (content, payload, msg)
+ if type(content) is dict and content.get('status',None) == 'OK':
+ pass
+ elif type(content) is dict and content.get('status',None) == 'ERROR':
+ logging.warn('RPC reply is an ERROR')
+ else:
+ logging.error('RPC reply is not well formed. Use reply_ok or reply_err')
# @todo is it OK to ack the response at this point already?
d1 = msg.ack()
if d1:
@@ -201,15 +253,27 @@ def _receive_msg(self, payload, msg):
Handling of non-RPC messages. Messages are dispatched according to
message attributes.
"""
- logging.info('BaseProcess: Message received, dispatching...')
+ logging.info('>>> BaseProcess.receive(): Message received, dispatching... >>>')
convid = payload.get('conv-id', None)
conv = self.conversations.get(convid, None) if convid else None
# Perform a dispatch of message by operation
+ # @todo: Handle failure case. Message reject?
d = self._dispatch_message(payload, msg, self, conv)
def _cb(res):
- logging.info("ACK msg")
- d1 = msg.ack()
- d.addCallbacks(_cb, logging.error)
+ if msg._state == "RECEIVED":
+ # Only if msg has not been ack/reject/requeued before
+ logging.debug("<<< ACK msg")
+ d1 = msg.ack()
+ def _err(res):
+ logging.error("Error in message processing: "+str(res))
+ if msg._state == "RECEIVED":
+ # Only if msg has not been ack/reject/requeued before
+ logging.debug("<<< ACK msg")
+ d1 = msg.ack()
+ # @todo Should we send an err or rather reject the msg?
+ if msg and msg.payload['reply-to']:
+ d2 = self.reply_err(msg, 'ERROR in process receive(): '+str(res))
+ d.addCallbacks(_cb, _err)
return d
def _dispatch_message(self, payload, msg, target, conv):
@@ -220,10 +284,26 @@ def _dispatch_message(self, payload, msg, target, conv):
@retval deferred
"""
#@BUG Added hack to handle messages from plc_init in cc_agent!
- #assert payload['op'] == 'init' or self.proc_state == "INITIALIZED"
- assert payload['op'] == 'init' or self.proc_state == "INITIALIZED" or (payload['op'] == 'identify' and payload['content']=='started')
- d = pu.dispatch_message(payload, msg, target, conv)
- return d
+ if payload['op'] == 'init' or \
+ self.proc_state == "INIT" or self.proc_state == "ACTIVE" or \
+ (payload['op'] == 'identify' and payload['content']=='started'):
+ # Regular message handling in expected state
+ if payload['op'] != 'init' and self.proc_state == "INIT":
+ logging.warn('Process %s received message before completed init' % (self.proc_name))
+
+ d = pu.dispatch_message(payload, msg, target, conv)
+ return d
+ else:
+ text = "Process %s in invalid state %s." % (self.proc_name, self.proc_state)
+ logging.error(text)
+
+ # @todo: Requeue would be ok, but does not work (Rabbit limitation)
+ #d = msg.requeue()
+ if msg and msg.payload['reply-to']:
+ d = self.reply_err(msg, text)
+ if not d:
+ d = defer.succeed(None)
+ return d
def op_none(self, content, headers, msg):
"""
@@ -394,16 +474,6 @@ def get_child_id(self, name):
child = self.get_child_def(name)
return child.proc_id if child else None
- def init(self):
- """
- Initializes this process instance. Typically a call should not be
- necessary because the init message is received from the supervisor
- process. It may be necessary for the root supervisor and for test
- processes.
- @retval Deferred
- """
- return self.op_init(None, None, None)
-
def shutdown(self):
"""
Recursivey terminates all child processes and then itself.
View
1  ion/core/bootstrap.py
@@ -166,7 +166,6 @@ def create_supervisor():
sup = suprec.procinst
sup.receiver.group = supname
supId = yield sup.spawn()
- yield sup.init()
yield base_process.procRegistry.put(supname, str(supId))
sup_seq += 1
defer.returnValue(sup)
View
6 ion/core/cc/modloader.py
@@ -49,8 +49,7 @@ def _load_module(self, mod):
#logging.info('Loading Module %s' % (mod))
try:
modo = pu.get_module(mod)
- except StandardError, ie:
- #logging.exception(ie)
+ except Exception, ie:
logging.error("Error importing module: "+str(mod))
def _load_package(self, pack, recurse=False):
@@ -65,6 +64,5 @@ def _load_package(self, pack, recurse=False):
self._load_module(pack+'.'+fname[:len(fname)-3])
elif os.path.isdir(os.path.join(path1,fname)) and recurse:
self._load_package(pack+'.'+fname)
- except StandardError, ie:
- #logging.exception(ie)
+ except Exception, ie:
logging.error("Error importing package: "+str(pack))
View
121 ion/core/test/test_baseprocess.py
@@ -14,6 +14,7 @@
from twisted.trial import unittest
from twisted.internet import defer
+from magnet.container import Container
from magnet.spawnable import Receiver
from magnet.spawnable import send
from magnet.spawnable import spawn
@@ -26,7 +27,7 @@
class BaseProcessTest(IonTestCase):
"""
- Tests the process base classe.
+ Tests the process base class, the root class of all message based interaction.
"""
@defer.inlineCallbacks
@@ -40,7 +41,19 @@ def tearDown(self):
@defer.inlineCallbacks
def test_process_basics(self):
p1 = BaseProcess()
+ self.assertEquals(p1.id, None)
self.assertTrue(p1.receiver)
+ self.assertEquals(p1.receiver.spawned, None)
+ self.assertEquals(p1.proc_state, "NEW")
+
+ pid1 = yield p1.spawn()
+ self.assertEquals(pid1, p1.receiver.spawned.id)
+
+ # Note: this tests init without actually sending a message.
+ self.assertEquals(p1.proc_state, "ACTIVE")
+
+ yield p1.op_init(None, None, None)
+ self.assertEquals(p1.proc_state, "ERROR")
rec = Receiver("myname")
p2 = BaseProcess(rec)
@@ -49,14 +62,10 @@ def test_process_basics(self):
p3 = BaseProcess(None, args)
self.assertEquals(p3.spawn_args, args)
- pid1 = yield p1.spawn()
-
-
@defer.inlineCallbacks
def test_child_processes(self):
p1 = BaseProcess()
pid1 = yield p1.spawn()
- yield p1.init()
child = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
pid2 = yield p1.spawn_child(child)
@@ -67,11 +76,40 @@ def test_child_processes(self):
yield p1.shutdown()
@defer.inlineCallbacks
+ def test_spawn_child(self):
+ child1 = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
+ self.assertEquals(child1.proc_state,'DEFINED')
+
+ pid1 = yield self.test_sup.spawn_child(child1)
+ self.assertEquals(child1.proc_state,'INIT_OK')
+ proc = self._get_procinstance(pid1)
+ self.assertEquals(str(proc.__class__),"<class 'ion.core.test.test_baseprocess.EchoProcess'>")
+ self.assertEquals(pid1, proc.receiver.spawned.id)
+ logging.info('Process 1 spawned and initd correctly')
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(pid1,'echo','content123')
+ self.assertEquals(cont['value'], 'content123')
+ logging.info('Process 1 responsive correctly')
+
+ # The following tests the process attaching a second receiver
+ msgName = self.test_sup.get_scoped_name('global', pu.create_guid())
+ messaging = {'name_type':'worker', 'args':{'scope':'global'}}
+ yield Container.configure_messaging(msgName, messaging)
+ extraRec = Receiver(proc.proc_name, msgName)
+ extraRec.handle(proc.receive)
+ extraid = yield spawn(extraRec)
+ logging.info('Created new receiver %s with pid %s' % (msgName, extraid))
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(msgName,'echo','content456')
+ self.assertEquals(cont['value'], 'content456')
+ logging.info('Process 1 responsive correctly on second receiver')
+
+
+ @defer.inlineCallbacks
def test_process(self):
# Also test the ReceiverProcess helper class
p1 = ReceiverProcess()
pid1 = yield p1.spawn()
- yield p1.init()
processes = [
{'name':'echo','module':'ion.core.test.test_baseprocess','class':'EchoProcess'},
@@ -93,6 +131,67 @@ def test_process(self):
yield sup.shutdown()
@defer.inlineCallbacks
+ def test_message_before_init(self):
+ child2 = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
+ pid2 = yield self.test_sup.spawn_child(child2, init=False)
+ self.assertEquals(child2.proc_state, 'SPAWNED')
+ proc2 = self._get_procinstance(pid2)
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(pid2,'echo','content123')
+ self.assertEquals(cont['status'], 'ERROR')
+ logging.info('Process 1 rejected first message correctly')
+
+ yield child2.init()
+ self.assertEquals(child2.proc_state, 'INIT_OK')
+ logging.info('Process 1 rejected initialized OK')
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(pid2,'echo','content123')
+ self.assertEquals(cont['value'], 'content123')
+ logging.info('Process 1 responsive correctly after init')
+
+ @defer.inlineCallbacks
+ def test_message_during_init(self):
+ child2 = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
+ pid2 = yield self.test_sup.spawn_child(child2, init=False)
+ proc2 = self._get_procinstance(pid2)
+ proc2.plc_init = proc2.plc_noinit
+ self.assertEquals(proc2.proc_state, 'NEW')
+
+ msgName = self.test_sup.get_scoped_name('global', pu.create_guid())
+ messaging = {'name_type':'worker', 'args':{'scope':'global'}}
+ yield Container.configure_messaging(msgName, messaging)
+ extraRec = Receiver(proc2.proc_name, msgName)
+ extraRec.handle(proc2.receive)
+ extraid = yield spawn(extraRec)
+ logging.info('Created new receiver %s with pid %s' % (msgName, extraid))
+
+ yield self.test_sup.send(pid2, 'init',{},{'quiet':True})
+ logging.info('Sent init to process 1')
+
+ yield pu.asleep(0.5)
+ self.assertEquals(proc2.proc_state, 'INIT')
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(msgName,'echo','content123')
+ self.assertEquals(cont['value'], 'content123')
+ logging.info('Process 1 responsive correctly after init')
+
+ yield pu.asleep(2)
+ self.assertEquals(proc2.proc_state, 'ACTIVE')
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(msgName,'echo','content123')
+ self.assertEquals(cont['value'], 'content123')
+ logging.info('Process 1 responsive correctly after init')
+
+ @defer.inlineCallbacks
+ def test_error_in_op(self):
+ child1 = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
+ pid1 = yield self.test_sup.spawn_child(child1)
+
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(pid1,'echofail2','content123')
+ self.assertEquals(cont['status'], 'ERROR')
+ logging.info('Process 1 responded to error correctly')
+
+ @defer.inlineCallbacks
def test_send_byte_string(self):
"""
@brief Test that any arbitrary byte string can be sent through the
@@ -100,7 +199,6 @@ def test_send_byte_string(self):
"""
p1 = ReceiverProcess()
pid1 = yield p1.spawn()
- yield p1.init()
processes = [
{'name':'echo','module':'ion.core.test.test_baseprocess','class':'EchoProcess'},
@@ -120,8 +218,6 @@ def test_send_byte_string(self):
yield sup.shutdown()
-
-
@defer.inlineCallbacks
def test_shutdown(self):
processes = [
@@ -135,6 +231,13 @@ def test_shutdown(self):
class EchoProcess(BaseProcess):
+
+ @defer.inlineCallbacks
+ def plc_noinit(self):
+ logging.info("In init: "+self.proc_state)
+ yield pu.asleep(1)
+ logging.info("Leaving init: "+self.proc_state)
+
@defer.inlineCallbacks
def op_echo(self, content, headers, msg):
logging.info("Message received: "+str(content))
View
2  ion/core/test/test_worker.py
@@ -45,7 +45,6 @@ def test_worker_queue(self):
wc = WorkerClient()
wcId = yield wc.spawn()
- yield wc.init()
wq_name = Container.id + ".worker1"
for i in range(1,11):
@@ -77,7 +76,6 @@ def test_fanout(self):
wc = WorkerClient()
wcId = yield wc.spawn()
- yield wc.init()
wq_name = Container.id + ".fanout1"
for i in range(1,6):
View
24 ion/data/datastore/datastore_service.py
@@ -2,7 +2,7 @@
"""
@file ion/play/rdf_store/rdf_service.py
-@package ion.play.rdf_store.rdf_service
+@package ion.play.rdf_store.rdf_service
@author David Stuebe
@brief A service that provides git semantics for push, pull, commit and diff to
the rdf workspace composed of associations and objects.
@@ -29,7 +29,7 @@ class DataStoreService(BaseService):
def __init__(self, receiver, spawnArgs=None):
"""
- @brief Init method for the DataStore Frontend service
+ @brief Init method for the DataStore Frontend service
@param frontend - an instance of a CAStore Frontend
"""
# Service class initializer. Basic config, but no yields allowed.
@@ -45,7 +45,7 @@ def slc_init(self):
def op_push(self, content, headers, msg):
logging.info('op_push: '+str(content)+ ', Remote Frontend:'+self.frontend)
-
+
# The following line shows how to reply to a message
yield self.reply_ok(msg)
@@ -53,7 +53,7 @@ def op_push(self, content, headers, msg):
@defer.inlineCallbacks
def op_pull(self, content, headers, msg):
logging.info('op_pull: '+str(content) + ', Remote Frontend:'+self.frontend)
-
+
# The following line shows how to reply to a message
yield self.reply_ok(msg)
@@ -74,7 +74,7 @@ class DataStoreServiceClient(BaseServiceClient):
"""
def __init__(self, frontend, proc=None, **kwargs):
"""
- @brief Init method for the DataStore Frontend client
+ @brief Init method for the DataStore Frontend client
@param frontend - an instance of a CAStore Frontend
"""
if not 'targetname' in kwargs:
@@ -99,7 +99,7 @@ def push(self, repository_name,permit_bracnch=False):
logging.info('pushing: '+repository_name+ ', Local Frontend:'+self.frontend)
(content, headers, msg) = yield self.rpc_send('push', repository_name)
- """
+ """
Steps:
Send the commit DAG to the service
Receive the Commits which need to be pushed
@@ -118,12 +118,12 @@ def pull(self, repository_name):
"""
yield self._check_init()
logging.info('pulling: '+repository_name+ ', Local Frontend:'+self.frontend)
- """
+ """
Steps:
Send the current commit DAG to the service
Receive the Commits which need to be pulled
"""
-
+
(content, headers, msg) = yield self.rpc_send('pull', repository_name)
logging.info('Service reply: '+str(content))
defer.returnValue(str(content))
@@ -132,16 +132,16 @@ def pull(self, repository_name):
@defer.inlineCallbacks
def clone(self, repository_name):
"""
- @brief Clone a repository to the local object store from the remote datastore
+ @brief Clone a repository to the local object store from the remote datastore
@param repository_name - the name (key) of a repository in the remote datastore
"""
yield self._check_init()
logging.info('cloning: '+repository_name+ ', Local Frontend:'+self.frontend)
- """
+ """
Steps:
Send the repo name
Receive the Commits which need to be pulled
- """
+ """
(content, headers, msg) = yield self.rpc_send('clone', repository_name)
logging.info('Service reply: '+str(content))
defer.returnValue(str(content))
@@ -151,5 +151,3 @@ def clone(self, repository_name):
# Spawn of the process using the module name
factory = ProtocolFactory(DataStoreService)
-
-
View
2  ion/services/dm/ingest.py
@@ -46,7 +46,7 @@ def _do_ingest(self, content):
dds = json.loads(str(content['dds']))
das = json.loads(str(content['das']))
except KeyError, ke:
- logging.exception(ke)
+ logging.exception('Unable to find headers in DAP message!')
logging.error('Unable to find headers in DAP message!')
raise ke
View
44 ion/services/dm/test/test_pubsub.py
@@ -57,13 +57,13 @@ def tearDown(self):
def test_pubsub(self):
dpsc = DataPubsubClient(self.sup)
-
+
# Create and Register a topic
- topic = PubSubTopicResource.create('Davids Topic',"oceans, oil spill, fun things to do")
+ topic = PubSubTopicResource.create('Davids Topic',"oceans, oil spill, fun things to do")
topic = yield dpsc.define_topic(topic)
logging.info('Defined Topic: '+str(topic))
-
+
#Create and register self.sup as a publisher
publisher = PublisherResource.create('Test Publisher', self.sup, topic, 'DataObject')
publisher = yield dpsc.define_publisher(publisher)
@@ -102,9 +102,9 @@ def test_pubsub(self):
{'time':(111,112,123,114,115,116,117,118,119,120), \
'height':(8,6,4,-2,-1,5,3,1,4,5)})
result = yield dpsc.publish(self.sup, topic.reference(), dmsg)
-
-
-
+
+
+
# Need to await the delivery of data messages into the (separate) consumers
yield pu.asleep(1)
@@ -138,14 +138,14 @@ def test_chainprocess(self):
dpsc = DataPubsubClient(self.sup)
-
+
#Create and register 3 topics!
- topic_raw = PubSubTopicResource.create("topic_raw","oceans, oil spill, fun things to do")
+ topic_raw = PubSubTopicResource.create("topic_raw","oceans, oil spill, fun things to do")
topic_raw = yield dpsc.define_topic(topic_raw)
- topic_qc = PubSubTopicResource.create("topic_qc","oceans_qc, oil spill")
+ topic_qc = PubSubTopicResource.create("topic_qc","oceans_qc, oil spill")
topic_qc = yield dpsc.define_topic(topic_qc)
-
+
topic_evt = PubSubTopicResource.create("topic_evt", "spill events")
topic_evt = yield dpsc.define_topic(topic_evt)
@@ -160,9 +160,9 @@ def test_chainprocess(self):
dc1_id = yield dc1.spawn()
# Use subscribe - does not exist yet
yield dc1.attach(topic_raw)
-
+
dc1.set_ondata(e_process,topic_qc,topic_evt)
-
+
dc2 = DataConsumer()
dc2_id = yield dc2.spawn()
@@ -181,7 +181,7 @@ def test_chainprocess(self):
'height':{'long_name':'person height','units':'meters'}}}, \
{'time':(101,102,103,104,105,106,107,108,109,110), \
'height':(5,2,4,5,-1,9,3,888,3,4)})
-
+
result = yield dpsc.publish(self.sup, topic_raw.reference(), dmsg)
if result:
logging.info('Published Message')
@@ -204,7 +204,7 @@ def test_chainprocess(self):
'height':{'long_name':'person height','units':'meters'}}}, \
{'time':(111,112,123,114,115,116,117,118,119,120), \
'height':(8,6,4,-2,-1,5,3,1,4,5)})
-
+
result = yield dpsc.publish(self.sup, topic_raw.reference(), dmsg)
# Need to await the delivery of data messages into the consumers
@@ -219,11 +219,11 @@ def test_chainprocess(self):
def e_process(content,headers,topic1,topic2):
- # This is a data processing function that takes a sample message, filters for
+ # This is a data processing function that takes a sample message, filters for
# out of range values, adds to an event queue, and computes a new sample packet
# where outlyers are zero'ed out
#print "In data process"
-
+
logging.debug('e_process recieved:' + str(content))
# Convert the message to a DAPMessageObject
@@ -231,12 +231,12 @@ def e_process(content,headers,topic1,topic2):
logging.debug('DAP Data:'+str(dap_msg))
# Read the message object and convert to a pydap object
data = dap_tools.dap_msg2ds(dap_msg)
-
+
resdata = []
messages = []
# Process the array of data
for ind in range(data.height.shape[0]):
-
+
ts = data.time[ind]
samp = data.height[ind]
if samp<0 or samp>100:
@@ -247,10 +247,10 @@ def e_process(content,headers,topic1,topic2):
# Must convert pydap/numpy Int32 to int!
ds = (int(ts),int(newsamp))
resdata.append(ds)
-
- # Messages contains a list of tuples, (topic, data) to send
+
+ # Messages contains a list of tuples, (topic, data) to send
messages.append((topic1,{'metadata':{},'data':resdata}))
-
+
logging.debug("messages" + str(messages))
return messages
@@ -265,7 +265,6 @@ def set_ondata(self, ondata,topic1,topic2):
@defer.inlineCallbacks
def attach(self, topic):
- yield self.init()
self.dataReceiver = Receiver(__name__, topic.queue.name)
self.dataReceiver.handle(self.receive)
self.dr_id = yield spawn(self.dataReceiver)
@@ -330,4 +329,3 @@ def execute_process(self, content, headers):
result = messages
"""
'''
-
View
11 ion/test/iontest.py
@@ -57,7 +57,7 @@ def _start_container(self):
#Load All Resource Descriptions for future decoding
description_utility.load_descriptions()
-
+
logging.info("============Magnet container started, "+repr(self.cont_conn))
@defer.inlineCallbacks
@@ -107,6 +107,15 @@ def _get_procid(self, name):
"""
return self.procRegistry.get(name)
+ def _get_procinstance(self, pid):
+ """
+ @param pid process id
+ @retval BaseProcess instance for process id
+ """
+ for rec in base_process.receivers:
+ if rec.spawned.id.full == str(pid):
+ return rec.procinst
+ return None
class ReceiverProcess(BaseProcess):
"""
View
23 ion/util/procutils.py
@@ -13,6 +13,7 @@
import time
import logging
logging = logging.getLogger(__name__)
+import uuid
from twisted.internet import defer, reactor
from magnet.container import Id
@@ -77,6 +78,12 @@ def log_message(msg):
lstr += "\n============="
logging.info(lstr)
+def create_guid():
+ """
+ @retval Return global unique id string
+ """
+ return str(uuid.uuid4())
+
def get_process_id(long_id):
"""Returns the instance part of a long process id
"""
@@ -100,12 +107,14 @@ def send(receiver, send, recv, operation, content, headers=None):
"""
msg = {}
# The following headers are FIPA ACL Message Format based
- # Exchange name of sender, receiver, reply-to
+ # Exchange name of sender (DO NOT SEND replies here)
msg['sender'] = str(send)
+ # Exchange name of message recipient
msg['receiver'] = str(recv)
+ # Exchange name for message replies
msg['reply-to'] = str(send)
- # Wire form encoding, such as 'json', 'fudge', 'XDR', 'XML', 'custom'
- msg['encoding'] = 'json'
+ # Wire form encoding, such as 'json', 'fudge', 'XDR', 'XML'
+ msg['encoding'] = 'msgpack'
# Language of the format specification
msg['language'] = 'ion1'
# Identifier of a registered format specification (i.e. message schema)
@@ -132,8 +141,8 @@ def send(receiver, send, recv, operation, content, headers=None):
#logging.debug("Send message op="+operation+" to="+str(recv))
try:
yield receiver.send(recv, msg)
- except StandardError, e:
- log_exception("Send error: ", e)
+ except Exception, ex:
+ log_exception("Send error: ", ex)
else:
logging.info("Message sent! to=%s op=%s" % (msg.get('receiver',None), msg.get('op',None)))
@@ -164,8 +173,8 @@ def dispatch_message(payload, msg, dispatchIn, conv=None):
logging.error("Receive() failed. Cannot dispatch to catch")
else:
logging.error("Invalid message. No 'op' in header", payload)
- except StandardError, e:
- log_exception('Exception while dispatching: ',e)
+ except Exception, ex:
+ log_exception('Exception while dispatching: ',ex)
id_seqs = {}
def create_unique_id(ns):
Please sign in to comment.
Something went wrong with that request. Please try again.