Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
Core refactoring; bug fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Meisinger committed Oct 4, 2010
1 parent c54a999 commit cba59dc
Show file tree
Hide file tree
Showing 19 changed files with 178 additions and 327 deletions.
2 changes: 1 addition & 1 deletion ion/agents/instrumentagents/SBE49_IA.py
Expand Up @@ -47,7 +47,7 @@ def plc_init(self):
target=driver_id)

#@defer.inlineCallbacks
#def plc_shutdown(self):
#def plc_terminate(self):
# yield self.pd.shutdown()


Expand Down
2 changes: 1 addition & 1 deletion ion/agents/instrumentagents/SBE49_driver.py
Expand Up @@ -148,7 +148,7 @@ def plc_init(self):
log.debug("Instrument driver initialized")

@defer.inlineCallbacks
def plc_shutdown(self):
def plc_terminate(self):
yield self.op_disconnect(None, None, None)

def isConnected(self):
Expand Down
26 changes: 6 additions & 20 deletions ion/core/base_process.py
Expand Up @@ -227,12 +227,12 @@ def on_terminate(self, msg=None, *args, **kwargs):
except Exception, ex:
log.exception("Error terminating child %s" % child.proc_id)

yield defer.maybeDeferred(self.plc_shutdown)
yield defer.maybeDeferred(self.plc_terminate)
log.info('----- Process %s TERMINATED -----' % (self.proc_name))

def plc_shutdown(self):
def plc_terminate(self):
"""
Process life cycle event: on shutdown of process (once)
Process life cycle event: on termination of process (once)
"""

def on_error(self, cause= None, *args, **kwargs):
Expand Down Expand Up @@ -475,23 +475,7 @@ def get_conversation(self, headers):
# --- Process and child process management

def get_scoped_name(self, scope, name):
"""
Returns a name that is scoped. Local=Name prefixed by container id.
System=Name prefixed by system name, ie id of root process's container.
Global=Name unchanged.
@param scope one of "local", "system" or "global"
@param name name to be scoped
"""
scoped_name = name
if scope == 'local':
scoped_name = str(Id.default_container_id) + "." + name
elif scope == 'system':
scoped_name = self.sys_name + "." + name
elif scope == 'global':
pass
else:
assert 0, "Unknown scope: "+scope
return scoped_name
return pu.get_scoped_name(name, scope)

# OTP style functions for working with processes and modules/apps

Expand Down Expand Up @@ -535,6 +519,8 @@ def get_child_id(self, name):
child = self.get_child_def(name)
return child.proc_id if child else None

def __str__(self):
return "BaseProcess(id=%s,name=%s)" % (self.id, self.proc_name)

# Spawn of the process using the module name
factory = ProcessFactory(BaseProcess)
Expand Down
4 changes: 3 additions & 1 deletion ion/core/bootstrap.py
Expand Up @@ -112,7 +112,9 @@ def _set_container_args(contargs=None):
Container.id = ioninit.cont_args['contid']
if 'sysname' in ioninit.cont_args:
ioninit.sys_name = ioninit.cont_args['sysname']

else:
ioninit.sys_name = ioninit.container_instance.id

def declare_messaging(messagingCfg, cgroup=None):
return ioninit.container_instance.declare_messaging(messagingCfg, cgroup)

Expand Down
10 changes: 4 additions & 6 deletions ion/core/cc/cc_agent.py
Expand Up @@ -19,15 +19,13 @@
from ion.core.base_process import BaseProcess, ProcessFactory, ProcessDesc
from ion.core.cc.container import Container
from ion.core.messaging.receiver import Receiver
from ion.core.supervisor import Supervisor
import ion.util.procutils as pu


class CCAgent(ResourceAgent):
"""
Capability Container agent process interface
"""
@defer.inlineCallbacks
def plc_init(self):
# Init self and container
annName = 'cc_announce'
Expand All @@ -37,7 +35,10 @@ def plc_init(self):
self.contalive = {}
self.last_identify = 0

@defer.inlineCallbacks
def plc_activate(self):
# Declare CC announcement name
annName = 'cc_announce'
messaging = {'name_type':'fanout', 'args':{'scope':'system'}}
yield ioninit.container_instance.configure_messaging(self.ann_name, messaging)
log.info("Declared CC anounce name: "+str(self.ann_name))
Expand All @@ -48,16 +49,13 @@ def plc_init(self):
self.ann_receiver = annReceiver
self.ann_receiver.handle(self.receive)
self.add_receiver(self.ann_receiver)
annid = yield self.ann_receiver.activate()
annid = yield self.ann_receiver.attach()
log.info("Listening to CC anouncements: "+str(annid))

# Start with an identify request. Will lead to an announce by myself
#@todo - Can not send a message to a base process which is not initialized!
yield self.send(self.ann_name, 'identify', 'started', {'quiet':True})

# Convenience HACK: Add a few functions to container shell
self._augment_shell()

@defer.inlineCallbacks
def _send_announcement(self, event):
"""
Expand Down
4 changes: 3 additions & 1 deletion ion/core/ioninit.py
Expand Up @@ -9,6 +9,7 @@
import logging
import logging.config
import re
import os

from ion.core import ionconst as ic
from ion.util.config import Config
Expand Down Expand Up @@ -68,7 +69,8 @@ def adjust_dir(filename):
"""
if not filename:
return None
if testing:
#if testing:
if os.getcwd().endswith("_trial_temp"):
return "../" + filename
else:
return filename
Expand Down
61 changes: 40 additions & 21 deletions ion/core/messaging/receiver.py
Expand Up @@ -20,6 +20,7 @@
from ion.core.id import Id
from ion.core.messaging import messaging
from ion.util.state_object import BasicLifecycleObject
import ion.util.procutils as pu

class IReceiver(Interface):
"""
Expand All @@ -39,15 +40,27 @@ class Receiver(BasicLifecycleObject):
"""
implements(IReceiver)

def __init__(self, label, name, process=None, group=None, handler=None):
SCOPE_GLOBAL = 'global'
SCOPE_SYSTEM = 'system'
SCOPE_LOCAL = 'local'

def __init__(self, label, name, scope='global', xspace=None, process=None, group=None, handler=None):
"""
@param label descriptive label
@param name the actual name in the exchange. Used for routing
@param label descriptive label for the receiver
@param name the actual exchange name. Used for routing
@param xspace the name of the exchange space. None for default
@param scope name scope. One of 'global', 'system' or 'local'
@param process IProcess instance that the receiver belongs to
@param group a string grouping multiple receivers
@param handler a callable for the message handler, shorthand for add_handler
"""
BasicLifecycleObject.__init__(self)

self.label = label
self.name = name
# @todo scope and xspace are overlapping. Use xspace and map internally?
self.scope = scope
self.xspace = xspace
self.process = process
self.group = group

Expand All @@ -57,37 +70,39 @@ def __init__(self, label, name, process=None, group=None, handler=None):
if handler:
self.add_handler(handler)

self.xname = pu.get_scoped_name(self.name, self.scope)

@defer.inlineCallbacks
def attach(self, *args, **kwargs):
"""
@brief Boilderplate method that calls initialize and activate
"""
yield self.initialize(*args, **kwargs)
yield self.activate(*args, **kwargs)
defer.returnValue(self.name)
defer.returnValue(self.xname)

@defer.inlineCallbacks
def on_initialize(self, *args, **kwargs):
"""
@brief Declare the queue and binding only.
@retval Deferred
"""
assert self.name, "Receiver must have a name"
assert self.xname, "Receiver must have a name"
container = ioninit.container_instance
xnamestore = container.exchange_manager.exchange_space.store
name_config = yield xnamestore.get(self.name)
name_config = yield xnamestore.get(self.xname)
if not name_config:
raise RuntimeError("Messaging name undefined: "+self.name)
raise RuntimeError("Messaging name undefined: "+self.xname)

yield self._init_receiver(name_config)
#log.debug("Receiver %s initialized (queue attached) cfg=%s" % (self.name,name_config))
#log.debug("Receiver %s initialized (queue attached) cfg=%s" % (self.xname,name_config))

@defer.inlineCallbacks
def _init_receiver(self, receiver_config, store_config=False):
container = ioninit.container_instance
if store_config:
xnamestore = container.exchange_manager.exchange_space.store
yield xnamestore.put(self.name, receiver_config)
yield xnamestore.put(self.xname, receiver_config)

self.consumer = yield container.new_consumer(receiver_config)

Expand All @@ -99,7 +114,7 @@ def on_activate(self, *args, **kwargs):
"""
self.consumer.register_callback(self.receive)
yield self.consumer.iterconsume()
#log.debug("Receiver %s activated (consumer enabled)" % self.name)
#log.debug("Receiver %s activated (consumer enabled)" % self.xname)

@defer.inlineCallbacks
def on_deactivate(self, *args, **kwargs):
Expand All @@ -117,8 +132,12 @@ def on_terminate(self, *args, **kwargs):
"""
yield self.consumer.close()

def on_error(self, *args, **kwargs):
raise RuntimeError("Illegal state change")
def on_error(self, cause= None, *args, **kwargs):
if cause:
log.error("Receiver error: %s" % cause)
pass
else:
raise RuntimeError("Illegal state change")

def add_handler(self, callback):
self.handlers.append(callback)
Expand All @@ -138,8 +157,8 @@ def receive(self, msg):
d = defer.maybeDeferred(handler, data, msg)

def __str__(self):
return "Receiver(label=%s,name=%s,group=%s)" % (
self.label, self.name, self.group)
return "Receiver(label=%s,xname=%s,group=%s)" % (
self.label, self.xname, self.group)

class ProcessReceiver(Receiver):
"""
Expand All @@ -151,9 +170,9 @@ def on_initialize(self, *args, **kwargs):
"""
@retval Deferred
"""
assert self.name, "Receiver must have a name"
assert self.xname, "Receiver must have a name"

name_config = messaging.process(self.name)
name_config = messaging.process(self.xname)
name_config.update({'name_type':'process'})

yield self._init_receiver(name_config, store_config=True)
Expand All @@ -168,9 +187,9 @@ def on_initialize(self, *args, **kwargs):
"""
@retval Deferred
"""
assert self.name, "Receiver must have a name"
assert self.xname, "Receiver must have a name"

name_config = messaging.worker(self.name)
name_config = messaging.worker(self.xname)
name_config.update({'name_type':'worker'})

yield self._init_receiver(name_config, store_config=True)
Expand All @@ -185,15 +204,15 @@ def on_initialize(self, *args, **kwargs):
"""
@retval Deferred
"""
assert self.name, "Receiver must have a name"
assert self.xname, "Receiver must have a name"

name_config = messaging.fanout(self.name)
name_config = messaging.fanout(self.xname)
name_config.update({'name_type':'fanout'})

yield self._init_receiver(name_config, store_config=True)

class NameReceiver(Receiver):
pass

class ServiceReceiver(Receiver):
class ServiceWorkerReceiver(WorkerReceiver):
pass
6 changes: 3 additions & 3 deletions ion/core/process/process.py
Expand Up @@ -61,8 +61,8 @@ def spawn(self, parent=None, container=None, activate=True):
Boilerplate for initialize()
@param parent the process instance that should be set as supervisor
"""
log.info('Spawning name=%s on node=%s' %
(self.proc_name, self.proc_node))
#log.info('Spawning name=%s on node=%s' %
# (self.proc_name, self.proc_node))
self.sup_process = parent
self.container = container or ioninit.container_instance
pid = yield self.initialize(activate)
Expand Down Expand Up @@ -112,7 +112,7 @@ def on_terminate(self, *args, **kwargs):

def on_error(self, cause=None, *args, **kwargs):
if cause:
#log.error("ProcessDesc error: %s" % cause)
log.error("ProcessDesc error: %s" % cause)
pass
else:
raise RuntimeError("Illegal state change for ProcessDesc")
Expand Down
33 changes: 0 additions & 33 deletions ion/core/supervisor.py

This file was deleted.

0 comments on commit cba59dc

Please sign in to comment.