Permalink
Browse files

Changed BaseServiceClient __init__ args and all client impls; takes n…

…ow global name of service
  • Loading branch information...
1 parent e710e44 commit bc50a3872631c670795a50ac6e8f4361535cb589 Michael Meisinger committed May 17, 2010
View
@@ -104,6 +104,7 @@ Change log:
global names, because they will clash.
- Less verbose trace output for process init messages and changes to other
trace output as well.
+- Changed BaseServiceClient and tests. Initializer arguments different.
2010-05-10:
- Based on entries in config files, service process modules are sought and
loaded in order to collect the service process declarations. This enables
View
@@ -93,7 +93,7 @@ def _set_container_args(contargs=None):
logging.error('Invalid argument format: ', e)
elif contargs.find('=') > 0:
# Key=value arguments separated by comma
- print "Parsing KV"
+ logging.info("Parsing KV")
args = contargs.split(',')
for a in args:
k,s,v = a.partition('=')
View
@@ -86,18 +86,22 @@ def op_identify(self, content, headers, msg):
logging.info("op_identify(). Send announcement")
yield self._send_announcement('identify')
+ @defer.inlineCallbacks
+ def op_spawn(self, content, headers, msg):
+ """
+ Service operation: spawns a local module
+ """
+ procMod = str(content['module'])
+ child = ProcessDesc(procMod.rpartition('.')[2], procMod)
+ pid = yield self.spawn_child(child)
+ yield self.reply(msg, 'result', {'status':'OK', 'process-id':str(pid)})
+
def op_start_node(self, content, headers, msg):
pass
def op_terminate_node(self, content, headers, msg):
pass
- @defer.inlineCallbacks
- def op_spawn(self, content, headers, msg):
- procMod = str(content['module'])
- child = ProcessDesc(procMod, procMod)
- yield self.spawn_child(child)
-
def op_get_node_id(self, content, headers, msg):
pass
@@ -113,4 +117,5 @@ def op_get_config(self, content, headers, msg):
twistd -n --pidfile t2.pid magnet -h amoeba.ucsd.edu -a sysname=mm res/scripts/newcc.py
send (2, {'op':'identify','content':''})
+send (2, {'op':'spawn','content':{'module':'ion.play.hello_service'}})
"""
View
@@ -11,4 +11,6 @@
ION_CONF_FILENAME = 'res/config/ion.config'
# @todo use magnet version system
-VERSION = "ion 0.2.0"
+VERSION = "ion 0.2.1"
+
+MIN_MAGNET = "0.3.4"
View
@@ -7,6 +7,8 @@
"""
import logging.config
+import magnet
+import re
from ion.core import ionconst as ic
from ion.util.config import Config
@@ -38,3 +40,12 @@ def get_config(confname, conf=None):
if conf == None:
conf = ion_config
return Config(conf.getValue(confname)).getObject()
+
+def check_magnet_version():
+ minmv = ic.MIN_MAGNET.split('.')
+ mv = magnet.__version__.split('.')
+ mv[2] = mv[2].partition('+')[0]
+ if mv[0]<minmv[0] or mv[1]<minmv[1] or mv[2]<minmv[2]:
+ logging.error("*********** ATTENTION! Magnet %s required. Is %s ***********" % (ic.MIN_MAGNET, magnet.__version__))
+
+check_magnet_version()
View
@@ -82,9 +82,8 @@ class FetcherClient(BaseServiceClient):
Client class for the fetcher.
@note RPC style interactions
"""
- def __init__(self, *args):
- BaseServiceClient.__init__(self, *args)
- self.svcname = "fetcher"
+ def __init__(self, proc=None, pid=None):
+ BaseServiceClient.__init__(self, "fetcher", proc, pid)
@defer.inlineCallbacks
def get_url(self, requested_url):
@@ -18,10 +18,9 @@ def setUp(self):
yield self._start_container()
services = [{'name':'fetcher', 'module':'ion.data.fetcher',
'class': 'FetcherService'},]
- yield self._spawn_processes(services)
+ sup = yield self._spawn_processes(services)
- self.dest = yield self._get_procid('fetcher')
- self.fc = FetcherClient(self.dest)
+ self.fc = FetcherClient(sup)
@defer.inlineCallbacks
def tearDown(self):
View
@@ -37,14 +37,12 @@ def op_hello(self, content, headers, msg):
class HelloServiceClient(BaseServiceClient):
- """This is an exemplar service class that calls the hello service. It
+ """
+ This is an exemplar service class that calls the hello service. It
applies the RPC pattern.
"""
-
- def __init__(self, *args):
- BaseServiceClient.__init__(self, *args)
- self.svcname = "hello_service"
-
+ def __init__(self, proc=None, pid=None):
+ BaseServiceClient.__init__(self, "hello", proc, pid)
@defer.inlineCallbacks
def hello(self, text='Hi there'):
@@ -33,10 +33,6 @@ def test_hello(self):
]
sup = yield self._spawn_processes(services)
- logging.info("Supervisor: "+repr(sup))
- hsid = yield self._get_procid("hello1")
- logging.info("Hello service process 1: "+repr(hsid))
-
- hc = HelloServiceClient(hsid, sup)
+ hc = HelloServiceClient(sup)
res = yield hc.hello("Hi there, hello1")
@@ -37,7 +37,8 @@ def __init__(self, receiver=None, spawnArgs=None):
msgName = self.get_scoped_name('system', svcname)
svcReceiver = Receiver(svcname+'.'+self.receiver.label, msgName)
- svcReceiver.group = self.receiver.group
+ if hasattr(self.receiver, 'group'):
+ svcReceiver.group = self.receiver.group
self.svc_receiver = svcReceiver
self.svc_receiver.handle(self.receive)
self.add_receiver(self.svc_receiver)
@@ -92,13 +93,16 @@ class BaseServiceClient(object):
can perform client side optimizations (such as caching and transformation
of certain service results).
"""
- def __init__(self, svc=None, proc=None):
+ def __init__(self, svcname=None, proc=None, svcpid=None):
"""
Initializes a service client
- @param svc target exchange name (service process id)
+ @param svc service name (globally known name)
@param proc a BaseProcess instance as originator of requests
+ @param svcpid target exchange name (service process id or name)
"""
- self.svc = svc
+ assert svcname or svcpid, "Need either service name or process-id"
+ self.svcname = svcname
+ self.svc = svcpid
if not proc:
proc = BaseProcess()
self.proc = proc
@@ -109,12 +113,13 @@ def _check_init(self):
Called in client methods to ensure that there exists a spawned process
to send messages from
"""
+ if not self.proc.is_spawned():
+ yield self.proc.spawn()
if not self.svc:
assert self.svcname, 'Must hace svcname to access service'
- svcid = yield base_process.procRegistry.get(self.svcname)
+ svcid = self.proc.get_scoped_name('system', self.svcname)
+ #svcid = yield base_process.procRegistry.get(self.svcname)
self.svc = str(svcid)
- if not self.proc.is_spawned():
- yield self.proc.spawn()
@defer.inlineCallbacks
def attach(self):
@@ -80,9 +80,8 @@ class DatastoreClient(BaseServiceClient):
"""
Class for the client accessing the object store service via ION Exchange
"""
- def __init__(self, *args):
- BaseServiceClient.__init__(self, *args)
- self.svcname = "objstore"
+ def __init__(self, proc=None, pid=None):
+ BaseServiceClient.__init__(self, "datastore", proc, pid)
@defer.inlineCallbacks
def put(self, key, value, parents=None):
@@ -72,9 +72,8 @@ class ResourceRegistryClient(BaseServiceClient):
"""
Class for the client accessing the resource registry.
"""
- def __init__(self, *args):
- BaseServiceClient.__init__(self, *args)
- self.svcname = "resource_registry"
+ def __init__(self, proc=None, pid=None):
+ BaseServiceClient.__init__(self, "resource_registry", proc, pid)
def registerResourceType(self, rt_desc):
pass
@@ -78,9 +78,8 @@ class ServiceRegistryClient(BaseServiceClient):
finding and accessing any other services. This client knows how to find the
service registry
"""
- def __init__(self, *args):
- BaseServiceClient.__init__(self, *args)
- self.svcname = "service_registry"
+ def __init__(self, proc=None, pid=None):
+ BaseServiceClient.__init__(self, "service_registry", proc, pid)
@defer.inlineCallbacks
def register_service(self, svc):
@@ -106,8 +106,7 @@ def test_put(self):
sup = yield self._spawn_processes(services)
- oss1 = yield base_process.procRegistry.get("datastore1")
- osc = DatastoreClient(oss1, sup)
+ osc = DatastoreClient(sup)
res1 = yield osc.put('key1','value1')
logging.info('Result1 put: '+str(res1))
@@ -31,7 +31,7 @@ def tearDown(self):
def test_service_reg(self):
sd1 = ServiceDesc(name='svc1')
- c = ServiceRegistryClient(None, self.sup)
+ c = ServiceRegistryClient(self.sup)
res1 = yield c.register_service(sd1)
si1 = ServiceInstanceDesc(xname=self.sup.id.full, svc_name='svc1')
@@ -80,9 +80,8 @@ def find_topic(self, content, headers, msg):
class DataPubsubClient(BaseServiceClient):
"""Client class for accessing the data pubsub service.
"""
- def __init__(self, *args):
- BaseServiceClient.__init__(self, *args)
- self.svcname = "data_pubsub"
+ def __init__(self, proc=None, pid=None):
+ BaseServiceClient.__init__(self, "data_pubsub", proc, pid)
@defer.inlineCallbacks
def define_topic(self, topic_name):
@@ -60,10 +60,7 @@ def test_pubsub(self):
sup = yield self._spawn_processes(services)
- dps = yield self._get_procid("data_pubsub")
- logging.info("DataPubsubservice: "+repr(dps))
-
- dpsc = DataPubsubClient(dps, sup)
+ dpsc = DataPubsubClient(sup)
topic_name = yield dpsc.define_topic("topic1")
logging.info('Service reply: '+str(topic_name))
@@ -106,10 +103,7 @@ def test_chainprocess(self):
sup = yield self._spawn_processes(services)
- dps = yield self._get_procid("data_pubsub")
- logging.info("DataPubsubservice: "+repr(dps))
-
- dpsc = DataPubsubClient(dps, sup)
+ dpsc = DataPubsubClient(sup)
topic_raw = yield dpsc.define_topic("topic_raw")
topic_qc = yield dpsc.define_topic("topic_qc")
topic_evt = yield dpsc.define_topic("topic_qcevent")
View
@@ -259,6 +259,7 @@ class FakeReceiver(object):
def __init__(self, id=None):
self.payload = None
self.spawned = FakeSpawnable()
+ self.group = 'fake'
@defer.inlineCallbacks
def send(self, to, msg):

0 comments on commit bc50a38

Please sign in to comment.