Receiver refactoring; removed/modified Receiver imports

Commit 41492d8cbceba6a9cf6b16df5c0d98e2b221022d (1 parent: cba59dc), committed by Michael Meisinger on Oct 5, 2010
Showing with 552 additions and 396 deletions.
  1. +21 −0 README.txt
  2. +2 −2 ion/agents/instrumentagents/SBE49_driver.py
  3. +2 −4 ion/agents/instrumentagents/test/test_SBE49.py
  4. +3 −2 ion/core/base_process.py
  5. +6 −8 ion/core/cc/cc_agent.py
  6. +0 −3 ion/core/cc/container.py
  7. +0 −3 ion/core/cc/shell_api.py
  8. +1 −1 ion/core/messaging/receiver.py
  9. +8 −3 ion/core/process/process.py
  10. +29 −0 ion/core/process/supervisor.py
  11. 0 ion/core/process/test/__init__.py
  12. +105 −0 ion/core/process/test/test_worker.py
  13. +61 −0 ion/core/process/worker.py
  14. +4 −3 ion/core/test/test_baseprocess.py
  15. +4 −4 ion/data/datastore/datastore_service.py
  16. +31 −38 ion/data/datastore/test/test_registry.py
  17. +107 −108 ion/data/test/test_dataobject.py
  18. +0 −6 ion/demo/lca/javaint_service.py
  19. +0 −1 ion/play/hello_process.py
  20. +2 −3 ion/play/hello_service.py
  21. +1 −1 ion/services/base_service.py
  22. +1 −1 ion/services/cei/dtrs.py
  23. +1 −1 ion/services/cei/epu_controller.py
  24. +0 −1 ion/services/cei/sensor_aggregator.py
  25. +0 −1 ion/services/cei/sleeper/epu_work_producer.py
  26. +8 −7 ion/services/cei/sleeper/epu_worker.py
  27. +11 −12 ion/services/cei/test/mockloop/provisioner.py
  28. +5 −3 ion/services/cei/test/test_sensors.py
  29. +3 −3 ion/services/coi/attributestore.py
  30. +1 −3 ion/services/coi/authorization.py
  31. +45 −46 ion/services/coi/resource_registry.py
  32. +0 −1 ion/services/coi/service_registry.py
  33. +2 −4 ion/services/coi/state_repository.py
  34. +12 −16 ion/services/dm/distribution/base_consumer.py
  35. +11 −12 ion/services/dm/distribution/consumers/test/test_forwarding_consumer.py
  36. +0 −1 ion/services/dm/distribution/pubsub_service.py
  37. +0 −2 ion/services/dm/distribution/test/test_baseconsumer.py
  38. +11 −12 ion/services/dm/distribution/test/test_pubsub.py
  39. +7 −12 ion/services/dm/ingestion/ingestion_registry.py
  40. +7 −12 ion/services/dm/inventory/data_registry.py
  41. +3 −6 ion/services/dm/presentation/presentation_service.py
  42. +7 −10 ion/services/dm/preservation/preservation_registry.py
  43. +14 −16 ion/services/dm/preservation/preservation_service.py
  44. +3 −4 ion/services/dm/transformation/transformation_service.py
  45. +0 −4 ion/services/dm/util/dap_grid_timeseries_producer.py
  46. +0 −4 ion/services/dm/util/data_stream_producer.py
  47. +0 −2 ion/services/dm/util/test/test_dsp.py
  48. +0 −1 ion/services/sa/data_acquisition.py
  49. +0 −1 ion/services/sa/data_processing.py
  50. +0 −1 ion/services/sa/data_product_registry.py
  51. +3 −3 ion/services/sa/fetcher.py
  52. +1 −3 ion/services/sa/instrument_registry.py
  53. +6 −0 ion/services/sa/test/test_fetcher.py
  54. +3 −1 ion/test/iontest.py
@@ -134,6 +134,27 @@ To compile all code to see if there are Python compile errors anywhere:
Change log:
===========
+2010-10-04:
+- MASSIVE REFACTORING IN BASE CLASSES
+- Added OTP style apps and app files as primary way to start up processes
+ in the container
+- Added an FSM-based StateObject. Many manager/controller level objects now make
+ use of states. States and operations: INIT -> initialize() -> READY ->
+ activate() -> ACTIVE -> terminate() -> TERMINATED
+- BaseProcess (and subclasses), Receiver, ProcessDesc, Container etc. are all
+ BasicLifecycleObjects.
+- Massively enhanced the capability container API. Delegated the actual
+ implementation to manager classes: proc, exchange, app manager
+- Refactored the way processes are spawned
+- Refactored Receiver usage. There are now Receiver subclasses that manage and
+ declare the specific types of AMQP resources, such as worker and fanout.
+ declare_messaging is no longer necessary.
+- Refactored the former magnet code into a more object-oriented style.
+- Message headers now contain a status code for every message. 'OK' is the
+ default and 'ERROR' is set on error.
+- Changed reply_ok and reply_err: a dict content value will not be modified
+- Fixed imports and tests throughout the code base
+
2010-09-20:
- Added start scripts in bin/
- Use ant install to install Python dependencies (calls python setup.py install)
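
The lifecycle introduced above can be exercised directly on a BaseProcess, as the updated test_baseprocess.py further down does. A minimal sketch of the state transitions (an illustration only, assuming BaseProcess can be constructed with default spawn args as in that test):

    from twisted.internet import defer
    from ion.core.base_process import BaseProcess

    @defer.inlineCallbacks
    def lifecycle_demo():
        proc = BaseProcess()      # starts in state INIT
        yield proc.initialize()   # INIT -> READY
        yield proc.activate()     # READY -> ACTIVE
        yield proc.terminate()    # -> TERMINATED
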
@@ -72,7 +72,7 @@ class SBE49InstrumentDriver(InstrumentDriver):
controlled vocabulary
"""
- def __init__(self, receiver=None, spawnArgs=None, **kwargs):
+ def __init__(self, *args, **kwargs):
self.connected = False
self.instrument = None
self.command = None
@@ -131,7 +131,7 @@ def __init__(self, receiver=None, spawnArgs=None, **kwargs):
"ptcb2": 0.0
}
- InstrumentDriver.__init__(self, receiver, spawnArgs, **kwargs)
+ InstrumentDriver.__init__(self, *args, **kwargs)
@defer.inlineCallbacks
def plc_init(self):
@@ -216,10 +216,8 @@ def attach(self, topic_name):
"""
Attach to the given topic name
"""
- yield self.init()
- self.dataReceiver = Receiver(__name__, topic_name)
- self.dataReceiver.handle(self.receive)
- self.dr_id = yield self.dataReceiver.activate()
+ self.dataReceiver = Receiver(name=topic_name, handler=self.receive)
+ yield self.dataReceiver.attach()
self.receive_cnt = 0
self.received_msg = []
@@ -57,6 +57,7 @@ def __init__(self, receiver=None, spawnargs=None, **kwargs):
# An Id with the process ID (fully qualified)
procid = self.spawn_args.get('proc-id', ProcessInstantiator.create_process_id())
+ procid = pu.get_process_id(procid)
self.id = procid
assert isinstance(self.id, Id), "Process id must be Id"
@@ -284,9 +285,9 @@ def _receive_rpc(self, payload, msg):
d = self.rpc_conv.pop(payload['conv-id'])
content = payload.get('content', None)
res = (content, payload, msg)
- if type(content) is dict and content.get('status',None) == 'OK':
+ if type(content) is dict and payload.get('status',None) == 'OK':
pass
- elif type(content) is dict and content.get('status',None) == 'ERROR':
+ elif type(content) is dict and payload.get('status',None) == 'ERROR':
log.warn('RPC reply is an ERROR: '+str(content.get('value',None)))
else:
log.error('RPC reply is not well formed. Use reply_ok or reply_err')
@@ -18,7 +18,7 @@
from ion.core.ioninit import ion_config
from ion.core.base_process import BaseProcess, ProcessFactory, ProcessDesc
from ion.core.cc.container import Container
-from ion.core.messaging.receiver import Receiver
+from ion.core.messaging.receiver import Receiver, FanoutReceiver
import ion.util.procutils as pu
@@ -39,16 +39,14 @@ def plc_init(self):
def plc_activate(self):
# Declare CC announcement name
annName = 'cc_announce'
- messaging = {'name_type':'fanout', 'args':{'scope':'system'}}
- yield ioninit.container_instance.configure_messaging(self.ann_name, messaging)
log.info("Declared CC anounce name: "+str(self.ann_name))
# Attach to CC announcement name
- annReceiver = Receiver(annName+'.'+self.receiver.label, self.ann_name)
- annReceiver.group = self.receiver.group
- self.ann_receiver = annReceiver
- self.ann_receiver.handle(self.receive)
- self.add_receiver(self.ann_receiver)
+ self.ann_receiver = FanoutReceiver(name=self.ann_name,
+ label=annName+'.'+self.receiver.label,
+ scope=FanoutReceiver.SCOPE_SYSTEM,
+ group=self.receiver.group,
+ handler=self.receive)
annid = yield self.ann_receiver.attach()
log.info("Listening to CC anouncements: "+str(annid))
@@ -35,9 +35,6 @@ class Container(BasicLifecycleObject):
Represents an instance of the Capability Container. Typically, in one Twisted
process (= one UNIX process), there is only one instance of a CC. In test cases,
however, there might be more.
-
- As a context, Container interfaces the messaging space with the local
- Spawnable and their Receivers...
"""
implements(IContainer)
@@ -10,7 +10,6 @@
log = ion.util.ionlog.getLogger(__name__)
from ion.core import ioninit
-from ion.core.messaging.receiver import Receiver
@defer.inlineCallbacks
def send(to_name, data, exchange_space=None):
@@ -75,8 +74,6 @@ def spawn(m, space=None, spawnArgs=None):
return Spawnable.spawn_m(m, space, spawnArgs)
elif type(m) is types.FunctionType:
return Spawnable.spawn_f(m, space)
- elif isinstance(m, Receiver):
- return Spawnable.spawn_mr(m, space)
@staticmethod
def kill(id):
@@ -44,7 +44,7 @@ class Receiver(BasicLifecycleObject):
SCOPE_SYSTEM = 'system'
SCOPE_LOCAL = 'local'
- def __init__(self, label, name, scope='global', xspace=None, process=None, group=None, handler=None):
+ def __init__(self, name, scope='global', label=None, xspace=None, process=None, group=None, handler=None):
"""
@param label descriptive label for the receiver
@param name the actual exchange name. Used for routing
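
With the reordered constructor, name is now the first argument and label is optional, so callers build receivers with keyword arguments and then attach() them, as the test_SBE49.py and cc_agent.py hunks above show. A minimal sketch (assumptions: `process` is a BaseProcess and its receive method is the message handler):

    from twisted.internet import defer
    from ion.core.messaging.receiver import Receiver

    @defer.inlineCallbacks
    def listen_on(process, topic_name):
        # Keyword-style construction; attach() replaces the old
        # init() / handle() / activate() sequence
        rec = Receiver(name=topic_name, handler=process.receive)
        yield rec.attach()
        defer.returnValue(rec)
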
@@ -160,7 +160,7 @@ def build(self, spawnargs=None, container=None):
spawnargs = spawnargs or {}
container = container or ioninit.container_instance
- #log.debug("ProcessFactory.build(name=%s, args=%s)" % (self.name,spawnArgs))
+ #log.debug("ProcessFactory.build(name=%s, args=%s)" % (self.name,spawnargs))
# Create a process receiver
procname = spawnargs.get('proc-name', self.name)
@@ -184,7 +184,12 @@ class ProcessInstantiator(object):
def create_process_id(cls, container=None):
container = container or ioninit.container_instance
cls.idcount += 1
- return Id(cls.idcount, container.id)
+ if container:
+ containerid = container.id
+ else:
+ # Purely for tests to avoid a _start_container() in setUp()
+ containerid = "TEST-CONTAINER-ID"
+ return Id(cls.idcount, containerid)
@classmethod
@defer.inlineCallbacks
@@ -207,7 +212,7 @@ def spawn_from_module(cls, module, space=None, spawnargs=None, container=None, a
raise RuntimeError("Process model factory must provide IProcessFactory")
procid = ProcessInstantiator.create_process_id(container)
- spawnargs['proc-id'] = procid
+ spawnargs['proc-id'] = procid.full
process = yield defer.maybeDeferred(module.factory.build, spawnargs)
if not IProcess.providedBy(process):
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+
+"""
+@file ion/core/process/supervisor.py
+@author Michael Meisinger
+@brief base class for processes that supervise other processes and compensate
+ failures
+"""
+
+from twisted.internet import defer
+
+import ion.util.ionlog
+log = ion.util.ionlog.getLogger(__name__)
+
+from ion.core.base_process import BaseProcess, ProcessFactory
+import ion.util.procutils as pu
+
+class Supervisor(BaseProcess):
+ """
+ Base class for a supervisor process. A supervisor is a process whose
+ purpose is to monitor child processes and restart them in case of
+ failure. Spawning child processes is a function of the BaseProcess itself.
+ """
+
+ def event_failure(self):
+ return
+
+# Spawn of the process using the module name
+factory = ProcessFactory(Supervisor)
No changes shown (ion/core/process/test/__init__.py is an empty file).
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+"""
+@file ion/core/process/test/test_worker.py
+@author Michael Meisinger
+@brief test worker processes
+"""
+from twisted.internet import defer
+
+import ion.util.ionlog
+log = ion.util.ionlog.getLogger(__name__)
+
+from ion.core import ioninit
+from ion.core.base_process import BaseProcess, ProcessFactory
+from ion.core.cc.container import Container
+from ion.test.iontest import IonTestCase
+import ion.util.procutils as pu
+
+class WorkerTest(IonTestCase):
+ """
+ Testing worker processes
+ """
+
+ @defer.inlineCallbacks
+ def setUp(self):
+ yield self._start_container()
+
+ @defer.inlineCallbacks
+ def tearDown(self):
+ yield self._stop_container()
+
+ @defer.inlineCallbacks
+ def test_worker_queue(self):
+ workers = [
+ {'name':'workerProc1','module':'ion.core.process.worker','spawnargs':{'receiver-name':'worker1','scope':'system','receiver-type':'worker'}},
+ {'name':'workerProc2','module':'ion.core.process.worker','spawnargs':{'receiver-name':'worker1','scope':'system','receiver-type':'worker'}},
+ ]
+ sup = yield self._spawn_processes(workers)
+ log.info("Supervisor: "+repr(sup))
+
+ wc = WorkerClient()
+ wcId = yield self._spawn_process(wc)
+
+ wq_name = ioninit.sys_name + ".worker1"
+ for i in range(1,11):
+ yield wc.submit_work(wq_name, i, 0.5)
+
+ yield pu.asleep(7)
+ log.info("Work results: %s" % (wc.workresult))
+ log.info("Worker results: %s" % (wc.worker))
+
+ sum = 0
+ for w,v in wc.worker.items():
+ sum += v
+ self.assertEqual(sum, 10)
+
+ @defer.inlineCallbacks
+ def test_fanout(self):
+ workers = [
+ {'name':'fanoutProc1','module':'ion.core.process.worker','spawnargs':{'receiver-name':'fanout1','scope':'system','receiver-type':'fanout'}},
+ {'name':'fanoutProc2','module':'ion.core.process.worker','spawnargs':{'receiver-name':'fanout1','scope':'system','receiver-type':'fanout'}},
+ ]
+ sup = yield self._spawn_processes(workers)
+ log.info("Supervisor: "+repr(sup))
+
+ wc = WorkerClient()
+ wcId = yield self._spawn_process(wc)
+
+ wq_name = ioninit.sys_name + ".fanout1"
+ for i in range(1,6):
+ yield wc.submit_work(wq_name, i, 0.5)
+
+ yield pu.asleep(5)
+ log.info("Work results: "+str(wc.workresult))
+ log.info("Worker results: "+str(wc.worker))
+
+ sum = 0
+ for w,v in wc.worker.items():
+ sum += v
+ self.assertEqual(sum, 10)
+
+class WorkerClient(BaseProcess):
+ """
+ Client for worker processes.
+ """
+ def __init__(self, *args, **kwargs):
+ BaseProcess.__init__(self, *args, **kwargs)
+ self.workresult = {}
+ self.worker = {}
+
+ def op_result(self, content, headers, msg):
+ ts = pu.currenttime_ms()
+ log.info("Work result received %s at %s " % (content, ts))
+ workid = content['work-id']
+ worker = headers['sender']
+ self.workresult[workid] = ts
+ if worker in self.worker:
+ wcnt = self.worker[worker] + 1
+ else:
+ wcnt = 1
+ self.worker[worker] = wcnt
+
+ @defer.inlineCallbacks
+ def submit_work(self, to, workid, work):
+ yield self.send(to, 'work', {'work-id':workid,'work':work})
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+"""
+@file ion/core/process/worker.py
+@author Michael Meisinger
+@brief base class for a worker process
+"""
+
+from twisted.internet import defer
+
+import ion.util.ionlog
+log = ion.util.ionlog.getLogger(__name__)
+
+from ion.core.base_process import BaseProcess, ProcessFactory
+from ion.core.messaging.receiver import Receiver, WorkerReceiver, FanoutReceiver
+from ion.services.base_service import BaseService
+import ion.util.procutils as pu
+
+class WorkerProcess(BaseProcess):
+ """
+ Worker process
+ """
+ @defer.inlineCallbacks
+ def plc_init(self):
+ msg_name = str(self.spawn_args['receiver-name'])
+ rec_type = str(self.spawn_args['receiver-type'])
+ scope = str(self.spawn_args['scope'])
+
+ if rec_type == 'worker':
+ self.workReceiver = WorkerReceiver(
+ label=__name__,
+ name=msg_name,
+ scope=scope,
+ handler=self.receive)
+ elif rec_type == 'fanout':
+ self.workReceiver = FanoutReceiver(
+ label=__name__,
+ name=msg_name,
+ scope=scope,
+ handler=self.receive)
+ else:
+ raise RuntimeError("Unknown receiver-type: "+str(rec_type))
+
+ yield self.workReceiver.attach()
+
+ @defer.inlineCallbacks
+ def op_work(self, content, headers, msg):
+ yield self._work(content)
+ yield self.reply_ok(msg, {'work-id':content['work-id']})
+
+ @defer.inlineCallbacks
+ def _work(self,content):
+ myid = self.proc_name + ":" + self.id.local
+ workid = str(content['work-id'])
+ waittime = float(content['work'])
+ log.info("worker="+myid+" job="+workid+" work="+str(waittime))
+ yield pu.asleep(waittime)
+ log.info("worker="+myid+" job="+workid+" done at="+str(pu.currenttime_ms()))
+
+# Spawn of the process using the module name
+factory = ProcessFactory(WorkerProcess)
@@ -61,9 +61,10 @@ def test_process_basics(self):
self.assertEquals(pid1, p1.id)
self.assertEquals(p1._get_state(), "ACTIVE")
- args = {'proc-id':Id('local','container')}
+ procid = Id('local','container')
+ args = {'proc-id':procid.full}
p2 = BaseProcess(spawnargs=args)
- self.assertEquals(p2.id, args['proc-id'])
+ self.assertEquals(p2.id, procid)
yield p2.initialize()
self.assertEquals(p2._get_state(), "READY")
yield p2.activate()
@@ -178,7 +179,7 @@ def test_error_in_op(self):
pid1 = yield self.test_sup.spawn_child(child1)
(cont,hdrs,msg) = yield self.test_sup.rpc_send(pid1,'echofail2','content123')
- self.assertEquals(cont['status'], 'ERROR')
+ self.assertEquals(hdrs['status'], 'ERROR')
log.info('Process 1 responded to error correctly')
@defer.inlineCallbacks
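
Since the status code now travels in the message headers rather than in the content dict, RPC callers check headers['status'] as this test does. A minimal sketch of the new calling pattern (assuming `sup` and `pid1` are set up as in the test above):

    from twisted.internet import defer

    import ion.util.ionlog
    log = ion.util.ionlog.getLogger(__name__)

    @defer.inlineCallbacks
    def call_echo(sup, pid1):
        content, headers, msg = yield sup.rpc_send(pid1, 'echofail2', 'content123')
        if headers.get('status') == 'OK':
            # reply_ok: a dict content value arrives unmodified
            log.info('RPC succeeded: ' + str(content))
        elif headers.get('status') == 'ERROR':
            # reply_err: the ERROR status is set in the headers
            log.warn('RPC reply is an ERROR: ' + str(content))
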