From 70573b43e161292cdbdd9f28f3d2cc868878313b Mon Sep 17 00:00:00 2001 From: Michael Meisinger Date: Thu, 20 May 2010 21:10:40 -0700 Subject: [PATCH] added BaseProcessClient and changed all BaseServiceClient subclasses and tests --- README.txt | 4 +- ion/core/base_process.py | 53 +++++++++++++++++++ ion/data/fetcher.py | 9 ++-- ion/data/objstore.py | 2 - ion/data/test/test_fetcher.py | 2 +- ion/play/hello_service.py | 12 +++-- ion/play/test/test_hello.py | 2 +- ion/services/base_service.py | 35 +----------- ion/services/coi/attributestore.py | 11 ++-- ion/services/coi/datastore.py | 10 ++-- ion/services/coi/resource_registry.py | 38 ++++++------- ion/services/coi/service_registry.py | 16 +++--- ion/services/coi/test/test_attributestore.py | 4 +- ion/services/coi/test/test_datastore.py | 11 ++-- .../coi/test/test_resource_registry.py | 19 +++---- .../coi/test/test_service_registry.py | 9 ++-- ion/services/dm/datapubsub.py | 9 ++-- 17 files changed, 137 insertions(+), 109 deletions(-) diff --git a/README.txt b/README.txt index 4b7cddd3..aa87cd53 100644 --- a/README.txt +++ b/README.txt @@ -124,7 +124,9 @@ Change log: - The system now looks for a local config file ionlocal.config and if exists, overrides entries in ion.config. - Test cases use the config file to determine the broker host to use. If local - config override exists, a different broker (e.g. localhast) can be given. + config override exists, a different broker (e.g. localhost) can be given. +- Added BaseProcessClient and changed BaseServiceClient and all clients and + all test cases (again). 2010-05-16: - Removed support for BaseProcess.send_message and reply_message. Always use send, reply and rpc_send now. diff --git a/ion/core/base_process.py b/ion/core/base_process.py index c5ec5c09..754f63c5 100644 --- a/ion/core/base_process.py +++ b/ion/core/base_process.py @@ -366,3 +366,56 @@ def build(self, spawnArgs=None): instance = self.processClass(receiver, spawnArgs) receiver.procinst = instance return receiver + +class BaseProcessClient(object): + """ + This is the base class for a process client. A process client is code that + executes in the process space of a calling process. If no calling process + is given, a local one is created on the fly. This client adds some + glue to interact with a specific targer process + """ + def __init__(self, proc=None, target=None, targetname=None, **kwargs): + """ + Initializes a process client + @param proc a BaseProcess instance as originator of messages + @param target global scoped (process id or name) to send to + @param targetname system scoped exchange name to send messages to + """ + if not proc: + proc = BaseProcess() + self.proc = proc + assert target or targetname, "Need either target or targetname" + self.target = target + if not self.target: + self.target = self.proc.get_scoped_name('system', targetname) + + @defer.inlineCallbacks + def _check_init(self): + """ + Called in client methods to ensure that there exists a spawned process + to send messages from + """ + if not self.proc.is_spawned(): + yield self.proc.spawn() + + @defer.inlineCallbacks + def attach(self): + self._check_init() + + def rpc_send(self, *args): + """ + Sends an RPC message to the specified target via originator process + """ + return self.proc.rpc_send(self.target, *args) + + def send(self, *args): + """ + Sends a message to the specified target via originator process + """ + return self.proc.send(self.target, *args) + + def reply(self, *args): + """ + Replies to a message via the originator process + """ + return self.proc.reply(*args) diff --git a/ion/data/fetcher.py b/ion/data/fetcher.py index 9860562e..a69d19de 100644 --- a/ion/data/fetcher.py +++ b/ion/data/fetcher.py @@ -82,8 +82,10 @@ class FetcherClient(BaseServiceClient): Client class for the fetcher. @note RPC style interactions """ - def __init__(self, proc=None, pid=None): - BaseServiceClient.__init__(self, "fetcher", proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "fetcher" + BaseServiceClient.__init__(self, proc, **kwargs) @defer.inlineCallbacks def get_url(self, requested_url): @@ -95,8 +97,7 @@ def get_url(self, requested_url): yield self._check_init() logging.info('Sending request') - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get_url', - requested_url) + (content, headers, msg) = yield self.rpc_send('get_url', requested_url) if 'failure' in content: raise ValueError('Error on URL: ' + content['failure']) defer.returnValue(content) diff --git a/ion/data/objstore.py b/ion/data/objstore.py index 6756a509..ffdbeb3d 100644 --- a/ion/data/objstore.py +++ b/ion/data/objstore.py @@ -17,9 +17,7 @@ from twisted.internet import defer from ion.data.dataobject import DataObject -from ion.data.cassandrads import CassandraStore from ion.data.store import Store -from ion.services.base_service import BaseService, BaseServiceClient import ion.util.procutils as pu diff --git a/ion/data/test/test_fetcher.py b/ion/data/test/test_fetcher.py index e5fd9f64..92a9bdb9 100644 --- a/ion/data/test/test_fetcher.py +++ b/ion/data/test/test_fetcher.py @@ -20,7 +20,7 @@ def setUp(self): 'class': 'FetcherService'},] sup = yield self._spawn_processes(services) - self.fc = FetcherClient(sup) + self.fc = FetcherClient(proc=sup) @defer.inlineCallbacks def tearDown(self): diff --git a/ion/play/hello_service.py b/ion/play/hello_service.py index 4b1fd53c..4186c396 100644 --- a/ion/play/hello_service.py +++ b/ion/play/hello_service.py @@ -17,10 +17,10 @@ class HelloService(BaseService): """Example service implementation """ - + # Declaration of service declare = BaseService.service_declare(name='hello', version='0.1.0', dependencies=[]) - + def __init__(self, receiver, spawnArgs=None): BaseService.__init__(self, receiver, spawnArgs) logging.info('HelloService.__init__()') @@ -41,13 +41,15 @@ class HelloServiceClient(BaseServiceClient): This is an exemplar service class that calls the hello service. It applies the RPC pattern. """ - def __init__(self, proc=None, pid=None): - BaseServiceClient.__init__(self, "hello", proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "hello" + BaseServiceClient.__init__(self, proc, **kwargs) @defer.inlineCallbacks def hello(self, text='Hi there'): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'hello', text, {}) + (content, headers, msg) = yield self.rpc_send('hello', text) logging.info('Friends reply: '+str(content)) defer.returnValue(str(content)) diff --git a/ion/play/test/test_hello.py b/ion/play/test/test_hello.py index c64cf36a..a171dafc 100644 --- a/ion/play/test/test_hello.py +++ b/ion/play/test/test_hello.py @@ -34,5 +34,5 @@ def test_hello(self): sup = yield self._spawn_processes(services) - hc = HelloServiceClient(sup) + hc = HelloServiceClient(proc=sup) res = yield hc.hello("Hi there, hello1") diff --git a/ion/services/base_service.py b/ion/services/base_service.py index c208f028..af1c2335 100644 --- a/ion/services/base_service.py +++ b/ion/services/base_service.py @@ -13,7 +13,7 @@ from magnet.spawnable import spawn from ion.core import base_process -from ion.core.base_process import BaseProcess +from ion.core.base_process import BaseProcess, BaseProcessClient import ion.util.procutils as pu class BaseService(BaseProcess): @@ -86,7 +86,7 @@ def service_declare(cls, **kwargs): decl.update(kwargs) return decl -class BaseServiceClient(object): +class BaseServiceClient(BaseProcessClient): """ This is the base class for service client libraries. Service client libraries can be used from any process or standalone (in which case they spawn their @@ -94,34 +94,3 @@ class BaseServiceClient(object): can perform client side optimizations (such as caching and transformation of certain service results). """ - def __init__(self, svcname=None, proc=None, svcpid=None): - """ - Initializes a service client - @param svc service name (globally known name) - @param proc a BaseProcess instance as originator of requests - @param svcpid target exchange name (service process id or name) - """ - assert svcname or svcpid, "Need either service name or process-id" - self.svcname = svcname - self.svc = svcpid - if not proc: - proc = BaseProcess() - self.proc = proc - - @defer.inlineCallbacks - def _check_init(self): - """ - Called in client methods to ensure that there exists a spawned process - to send messages from - """ - if not self.proc.is_spawned(): - yield self.proc.spawn() - if not self.svc: - assert self.svcname, 'Must hace svcname to access service' - svcid = self.proc.get_scoped_name('system', self.svcname) - #svcid = yield base_process.procRegistry.get(self.svcname) - self.svc = str(svcid) - - @defer.inlineCallbacks - def attach(self): - yield self.proc.spawn() diff --git a/ion/services/coi/attributestore.py b/ion/services/coi/attributestore.py index 98fe05b2..263e07c3 100644 --- a/ion/services/coi/attributestore.py +++ b/ion/services/coi/attributestore.py @@ -80,21 +80,22 @@ class AttributeStoreClient(BaseServiceClient): """ Class for the client accessing the attribute store via Exchange """ - def __init__(self, proc=None, pid=None, svcname=None): - svcname = svcname if svcname else "attributestore" - BaseServiceClient.__init__(self, svcname, proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "attributestore" + BaseServiceClient.__init__(self, proc, **kwargs) @defer.inlineCallbacks def put(self, key, value): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'put', {'key':str(key), 'value':value}) + (content, headers, msg) = yield self.rpc_send('put', {'key':str(key), 'value':value}) logging.info('Service reply: '+str(content)) defer.returnValue(str(content)) @defer.inlineCallbacks def get(self, key): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get', {'key':str(key)}) + (content, headers, msg) = yield self.rpc_send('get', {'key':str(key)}) logging.info('Service reply: '+str(content)) defer.returnValue(content['value']) diff --git a/ion/services/coi/datastore.py b/ion/services/coi/datastore.py index 557b81d3..e8cade3d 100644 --- a/ion/services/coi/datastore.py +++ b/ion/services/coi/datastore.py @@ -96,8 +96,10 @@ class DatastoreClient(BaseServiceClient): """ Class for the client accessing the object store service via ION Exchange """ - def __init__(self, proc=None, pid=None): - BaseServiceClient.__init__(self, "datastore", proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "datastore" + BaseServiceClient.__init__(self, proc, **kwargs) @defer.inlineCallbacks def put(self, key, value, parents=None): @@ -110,14 +112,14 @@ def put(self, key, value, parents=None): elif parents: cont['parents'] = [parents] - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'put', cont) + (content, headers, msg) = yield self.rpc_send('put', cont) logging.info('Service reply: '+str(content)) defer.returnValue(str(content)) @defer.inlineCallbacks def get(self, key): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get', {'key':str(key)}) + (content, headers, msg) = yield self.rpc_send('get', {'key':str(key)}) logging.info('Service reply: '+str(content)) defer.returnValue(str(content['value'])) diff --git a/ion/services/coi/resource_registry.py b/ion/services/coi/resource_registry.py index 690a7744..0c979ed6 100644 --- a/ion/services/coi/resource_registry.py +++ b/ion/services/coi/resource_registry.py @@ -28,7 +28,7 @@ class ResourceRegistryService(BaseService): # For now, keep registration in local memory store. def slc_init(self): self.datastore = Store() - + @defer.inlineCallbacks def op_register_resource(self, content, headers, msg): """ @@ -39,7 +39,7 @@ def op_register_resource(self, content, headers, msg): resdesc['lifecycle_state'] = ResourceLCState.RESLCS_NEW resid = pu.create_unique_id('R:') yield self.datastore.put(resid, resdesc) - yield self.reply(msg, 'result', {'res_id':str(resid)},) + yield self.reply(msg, 'result', {'res_id':str(resid)},) def op_define_resource_type(self, content, headers, msg): """ @@ -55,25 +55,27 @@ def op_get_resource_desc(self, content, headers, msg): logging.info('op_get_resource_desc: '+str(resid)) res_desc = yield self.datastore.get(resid) - yield self.reply(msg, 'result', {'res_desc':res_desc}) + yield self.reply(msg, 'result', {'res_desc':res_desc}) def op_set_resource_lcstate(self, content, headers, msg): """ Service operation: set the life cycle state of resource """ - + def op_find_resources(self, content, headers, msg): """ Service operation: find resources by criteria """ - + class ResourceRegistryClient(BaseServiceClient): """ Class for the client accessing the resource registry. """ - def __init__(self, proc=None, pid=None): - BaseServiceClient.__init__(self, "resource_registry", proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "resource_registry" + BaseServiceClient.__init__(self, proc, **kwargs) def registerResourceType(self, rt_desc): pass @@ -82,8 +84,8 @@ def registerResourceType(self, rt_desc): def register_resource(self, res_desc): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, - 'register_resource', {'res_desc':res_desc.encode()}) + (content, headers, msg) = yield self.rpc_send('register_resource', + {'res_desc':res_desc.encode()}) logging.info('Service reply: '+str(headers)) defer.returnValue(str(content['res_id'])) @@ -91,8 +93,8 @@ def register_resource(self, res_desc): def get_resource_desc(self, res_id): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, - 'get_resource_desc', {'res_id':res_id}) + (content, headers, msg) = yield self.rpc_send('get_resource_desc', + {'res_id':res_id}) logging.info('Service reply: '+str(content)) rd = ResourceDesc() rdd = content['res_desc'] @@ -101,7 +103,7 @@ def get_resource_desc(self, res_id): defer.returnValue(rd) else: defer.returnValue(None) - + class ResourceTypes(object): """Static class with constant definitions for resource types. Do not instantiate @@ -109,7 +111,7 @@ class ResourceTypes(object): RESTYPE_GENERIC = 'rt_generic' RESTYPE_SERVICE = 'rt_service' RESTYPE_UNASSIGNED = 'rt_unassigned' - + def __init__(self): raise RuntimeError('Do not instantiate '+self.__class__.__name__) @@ -124,7 +126,7 @@ class ResourceLCState(object): RESLCS_RETIRED = 'rlcs_retired' RESLCS_DEVELOPED = 'rlcs_developed' RESLCS_COMMISSIONED = 'rlcs_commissioned' - + def __init__(self): raise RuntimeError('Do not instantiate '+self.__class__.__name__) @@ -145,13 +147,13 @@ def setResourceDesc(self, **kwargs): self.set_attr('res_type',kwargs['res_type']) else: raise RuntimeError("Resource type missing") - + if 'name' in kwargs: self.set_attr('res_name',kwargs['name']) class ResourceTypeDesc(DataObject): """Structured object for a resource type description. - + Attributes: .res_name name of the resource type .res_type identifier of this resource type @@ -162,7 +164,7 @@ def __init__(self, **kwargs): DataObject.__init__(self) if len(kwargs) != 0: self.setResourceTypeDesc(**kwargs) - + def setResourceTypeDesc(self, **kwargs): if 'name' in kwargs: self.set_attr('name',kwargs['name']) @@ -178,7 +180,7 @@ def setResourceTypeDesc(self, **kwargs): self.set_attr('res_type',kwargs['res_type']) else: self.res_type = ResourceTypes.RESTYPE_UNASSIGNED - + if 'desc' in kwargs: self.set_attr('desc',kwargs['desc']) diff --git a/ion/services/coi/service_registry.py b/ion/services/coi/service_registry.py index b60583f8..67f4e17f 100644 --- a/ion/services/coi/service_registry.py +++ b/ion/services/coi/service_registry.py @@ -38,7 +38,7 @@ def op_register_service(self, content, headers, msg): logging.info('op_register_service: '+str(svcdesc)) yield self.datastore.put(svcdesc['name'],svcdesc) - yield self.reply(msg, 'result', {'status':'ok'}) + yield self.reply(msg, 'result', {'status':'ok'}) def op_get_service_desc(self, content, headers, msg): """ @@ -49,7 +49,7 @@ def op_get_service_desc(self, content, headers, msg): @defer.inlineCallbacks def op_register_instance(self, content, headers, msg): """ - Service operation: + Service operation: """ svcinstdesc = content['svcinst_desc'].copy() logging.info('op_register_instance: '+str(svcinstdesc)) @@ -78,25 +78,27 @@ class ServiceRegistryClient(BaseServiceClient): finding and accessing any other services. This client knows how to find the service registry """ - def __init__(self, proc=None, pid=None): - BaseServiceClient.__init__(self, "service_registry", proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "service_registry" + BaseServiceClient.__init__(self, proc, **kwargs) @defer.inlineCallbacks def register_service(self, svc): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'register_service', + (content, headers, msg) = yield self.rpc_send('register_service', {'svc_desc':svc.encode()}) @defer.inlineCallbacks def register_service_instance(self, svc_inst): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'register_instance', + (content, headers, msg) = yield self.rpc_send('register_instance', {'svcinst_desc':svc_inst.encode()}) @defer.inlineCallbacks def get_service_instance(self, service_name): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get_instance', + (content, headers, msg) = yield self.rpc_send('get_instance', {'svc_name': service_name}) defer.returnValue(content['svcinst_desc']) diff --git a/ion/services/coi/test/test_attributestore.py b/ion/services/coi/test/test_attributestore.py index 76c7f4c7..8924135c 100644 --- a/ion/services/coi/test/test_attributestore.py +++ b/ion/services/coi/test/test_attributestore.py @@ -38,7 +38,7 @@ def test_put(self): sup = yield self._spawn_processes(services) - asc1 = AttributeStoreClient(sup, svcname='as1') + asc1 = AttributeStoreClient(proc=sup, targetname='as1') res1 = yield asc1.put('key1','value1') logging.info('Result1 put: '+str(res1)) @@ -55,7 +55,7 @@ def test_put(self): res5 = yield asc1.get('non_existing') self.assertEqual(res5, None) - asc2 = AttributeStoreClient(sup, svcname='as2') + asc2 = AttributeStoreClient(proc=sup, targetname='as2') resx1 = yield asc2.get('key1') self.assertEqual(resx1, None) diff --git a/ion/services/coi/test/test_datastore.py b/ion/services/coi/test/test_datastore.py index ac2a6c34..2d6095ed 100644 --- a/ion/services/coi/test/test_datastore.py +++ b/ion/services/coi/test/test_datastore.py @@ -74,12 +74,12 @@ def _opfix(op, cont): self.assertEqual(vo2.identity,rv1[1]['identity']) self.assertEqual(vo3.identity,rv1[2]['identity']) self.assertEqual(vo4.identity,rv1[3]['identity']) - + # Check get_ancestors ra1 = _opfix('get_ancestors',_cont('key1')) print "ra1=", ra1 self.assertEqual(len(ra1),0) - + ra15 = _opfix('get_ancestors',_cont('key15')) print "ra15=", ra15 self.assertEqual(len(ra15),5) @@ -106,7 +106,7 @@ def test_put(self): sup = yield self._spawn_processes(services) - osc = DatastoreClient(sup) + osc = DatastoreClient(proc=sup) res1 = yield osc.put('key1','value1') logging.info('Result1 put: '+str(res1)) @@ -116,7 +116,7 @@ def test_put(self): res3 = yield osc.put('key1','value2') logging.info('Result3 put: '+str(res3)) - + res4 = yield osc.put('key1','value3',res3) logging.info('Result4 put: '+str(res4)) @@ -135,6 +135,3 @@ def test_put(self): res8 = yield osc.put('key1','value10',[res6,res7]) logging.info('Result8 put: '+str(res8)) - - - diff --git a/ion/services/coi/test/test_resource_registry.py b/ion/services/coi/test/test_resource_registry.py index 07293c64..58253670 100644 --- a/ion/services/coi/test/test_resource_registry.py +++ b/ion/services/coi/test/test_resource_registry.py @@ -19,7 +19,7 @@ class ResourceRegistryClientTest(unittest.TestCase): """ Testing client classes of resource registry """ - + def test_ResourceDesc(self): # Instantiate without args and then set rd1 = ResourceDesc() @@ -30,13 +30,13 @@ def test_ResourceDesc(self): # Instantiate with name only rd3 = ResourceDesc(res_type=ResourceTypes.RESTYPE_GENERIC) - + def test_ResourceTypeDesc(self): # Instantiate without args rtd1 = ResourceTypeDesc() rtd1.setResourceTypeDesc(name='gen',res_type=ResourceTypes.RESTYPE_GENERIC) print "Object identity "+str(rtd1.identity) - + self.assertEqual(rtd1.name,'gen') self.assertEqual(rtd1.res_type,ResourceTypes.RESTYPE_GENERIC) @@ -48,7 +48,7 @@ def test_ResourceTypeDesc(self): self.assertEqual(rtd3.name,'new') self.assertEqual(rtd3.based_on,ResourceTypes.RESTYPE_GENERIC) self.assertEqual(rtd3.res_type,ResourceTypes.RESTYPE_UNASSIGNED) - + class ResourceRegistryTest(IonTestCase): """ Testing service classes of resource registry @@ -57,17 +57,17 @@ class ResourceRegistryTest(IonTestCase): @defer.inlineCallbacks def setUp(self): yield self._start_container() - yield self._start_core_services() + self.sup = yield self._start_core_services() @defer.inlineCallbacks def tearDown(self): yield self._stop_container() - + @defer.inlineCallbacks def test_resource_reg(self): - + rd2 = ResourceDesc(name='res2',res_type=ResourceTypes.RESTYPE_GENERIC) - c = ResourceRegistryClient() + c = ResourceRegistryClient(proc=self.sup) rid = yield c.register_resource(rd2) logging.info('Resource registered with id '+str(rid)) @@ -78,6 +78,3 @@ def test_resource_reg(self): rd4 = yield c.get_resource_desc('NONE') self.assertFalse(rd4,'resource desc not None') - - - \ No newline at end of file diff --git a/ion/services/coi/test/test_service_registry.py b/ion/services/coi/test/test_service_registry.py index d9203861..b84995da 100644 --- a/ion/services/coi/test/test_service_registry.py +++ b/ion/services/coi/test/test_service_registry.py @@ -17,7 +17,7 @@ class ServiceRegistryClientTest(IonTestCase): """ Testing client classes of service registry """ - + @defer.inlineCallbacks def setUp(self): yield self._start_container() @@ -26,12 +26,12 @@ def setUp(self): @defer.inlineCallbacks def tearDown(self): yield self._stop_container() - + @defer.inlineCallbacks def test_service_reg(self): sd1 = ServiceDesc(name='svc1') - - c = ServiceRegistryClient(self.sup) + + c = ServiceRegistryClient(proc=self.sup) res1 = yield c.register_service(sd1) si1 = ServiceInstanceDesc(xname=self.sup.id.full, svc_name='svc1') @@ -39,4 +39,3 @@ def test_service_reg(self): ri2 = yield c.get_service_instance_name('svc1') self.assertEqual(ri2, self.sup.id.full) - \ No newline at end of file diff --git a/ion/services/dm/datapubsub.py b/ion/services/dm/datapubsub.py index 7e37b8ae..cba9a2dd 100644 --- a/ion/services/dm/datapubsub.py +++ b/ion/services/dm/datapubsub.py @@ -80,13 +80,16 @@ def find_topic(self, content, headers, msg): class DataPubsubClient(BaseServiceClient): """Client class for accessing the data pubsub service. """ - def __init__(self, proc=None, pid=None): - BaseServiceClient.__init__(self, "data_pubsub", proc, pid) + def __init__(self, proc=None, **kwargs): + if not 'targetname' in kwargs: + kwargs['targetname'] = "data_pubsub" + BaseServiceClient.__init__(self, proc, **kwargs) @defer.inlineCallbacks def define_topic(self, topic_name): yield self._check_init() - (content, headers, msg) = yield self.proc.rpc_send(self.svc, 'define_topic', {'topic_name':topic_name}, {}) + (content, headers, msg) = yield self.rpc_send('define_topic', + {'topic_name':topic_name}, {}) defer.returnValue(str(content['topic_name'])) @defer.inlineCallbacks