Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
added BaseProcessClient and changed all BaseServiceClient subclasses …
Browse files Browse the repository at this point in the history
…and tests
  • Loading branch information
Michael Meisinger committed May 21, 2010
1 parent 342829f commit 70573b4
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 109 deletions.
4 changes: 3 additions & 1 deletion README.txt
Expand Up @@ -124,7 +124,9 @@ Change log:
- The system now looks for a local config file ionlocal.config and if exists,
overrides entries in ion.config.
- Test cases use the config file to determine the broker host to use. If local
config override exists, a different broker (e.g. localhast) can be given.
config override exists, a different broker (e.g. localhost) can be given.
- Added BaseProcessClient and changed BaseServiceClient and all clients and
all test cases (again).
2010-05-16:
- Removed support for BaseProcess.send_message and reply_message. Always use
send, reply and rpc_send now.
Expand Down
53 changes: 53 additions & 0 deletions ion/core/base_process.py
Expand Up @@ -366,3 +366,56 @@ def build(self, spawnArgs=None):
instance = self.processClass(receiver, spawnArgs)
receiver.procinst = instance
return receiver

class BaseProcessClient(object):
"""
This is the base class for a process client. A process client is code that
executes in the process space of a calling process. If no calling process
is given, a local one is created on the fly. This client adds some
glue to interact with a specific targer process
"""
def __init__(self, proc=None, target=None, targetname=None, **kwargs):
"""
Initializes a process client
@param proc a BaseProcess instance as originator of messages
@param target global scoped (process id or name) to send to
@param targetname system scoped exchange name to send messages to
"""
if not proc:
proc = BaseProcess()
self.proc = proc
assert target or targetname, "Need either target or targetname"
self.target = target
if not self.target:
self.target = self.proc.get_scoped_name('system', targetname)

@defer.inlineCallbacks
def _check_init(self):
"""
Called in client methods to ensure that there exists a spawned process
to send messages from
"""
if not self.proc.is_spawned():
yield self.proc.spawn()

@defer.inlineCallbacks
def attach(self):
self._check_init()

def rpc_send(self, *args):
"""
Sends an RPC message to the specified target via originator process
"""
return self.proc.rpc_send(self.target, *args)

def send(self, *args):
"""
Sends a message to the specified target via originator process
"""
return self.proc.send(self.target, *args)

def reply(self, *args):
"""
Replies to a message via the originator process
"""
return self.proc.reply(*args)
9 changes: 5 additions & 4 deletions ion/data/fetcher.py
Expand Up @@ -82,8 +82,10 @@ class FetcherClient(BaseServiceClient):
Client class for the fetcher.
@note RPC style interactions
"""
def __init__(self, proc=None, pid=None):
BaseServiceClient.__init__(self, "fetcher", proc, pid)
def __init__(self, proc=None, **kwargs):
if not 'targetname' in kwargs:
kwargs['targetname'] = "fetcher"
BaseServiceClient.__init__(self, proc, **kwargs)

@defer.inlineCallbacks
def get_url(self, requested_url):
Expand All @@ -95,8 +97,7 @@ def get_url(self, requested_url):
yield self._check_init()

logging.info('Sending request')
(content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get_url',
requested_url)
(content, headers, msg) = yield self.rpc_send('get_url', requested_url)
if 'failure' in content:
raise ValueError('Error on URL: ' + content['failure'])
defer.returnValue(content)
Expand Down
2 changes: 0 additions & 2 deletions ion/data/objstore.py
Expand Up @@ -17,9 +17,7 @@
from twisted.internet import defer

from ion.data.dataobject import DataObject
from ion.data.cassandrads import CassandraStore
from ion.data.store import Store
from ion.services.base_service import BaseService, BaseServiceClient
import ion.util.procutils as pu


Expand Down
2 changes: 1 addition & 1 deletion ion/data/test/test_fetcher.py
Expand Up @@ -20,7 +20,7 @@ def setUp(self):
'class': 'FetcherService'},]
sup = yield self._spawn_processes(services)

self.fc = FetcherClient(sup)
self.fc = FetcherClient(proc=sup)

@defer.inlineCallbacks
def tearDown(self):
Expand Down
12 changes: 7 additions & 5 deletions ion/play/hello_service.py
Expand Up @@ -17,10 +17,10 @@
class HelloService(BaseService):
"""Example service implementation
"""

# Declaration of service
declare = BaseService.service_declare(name='hello', version='0.1.0', dependencies=[])

def __init__(self, receiver, spawnArgs=None):
BaseService.__init__(self, receiver, spawnArgs)
logging.info('HelloService.__init__()')
Expand All @@ -41,13 +41,15 @@ class HelloServiceClient(BaseServiceClient):
This is an exemplar service class that calls the hello service. It
applies the RPC pattern.
"""
def __init__(self, proc=None, pid=None):
BaseServiceClient.__init__(self, "hello", proc, pid)
def __init__(self, proc=None, **kwargs):
if not 'targetname' in kwargs:
kwargs['targetname'] = "hello"
BaseServiceClient.__init__(self, proc, **kwargs)

@defer.inlineCallbacks
def hello(self, text='Hi there'):
yield self._check_init()
(content, headers, msg) = yield self.proc.rpc_send(self.svc, 'hello', text, {})
(content, headers, msg) = yield self.rpc_send('hello', text)
logging.info('Friends reply: '+str(content))
defer.returnValue(str(content))

Expand Down
2 changes: 1 addition & 1 deletion ion/play/test/test_hello.py
Expand Up @@ -34,5 +34,5 @@ def test_hello(self):

sup = yield self._spawn_processes(services)

hc = HelloServiceClient(sup)
hc = HelloServiceClient(proc=sup)
res = yield hc.hello("Hi there, hello1")
35 changes: 2 additions & 33 deletions ion/services/base_service.py
Expand Up @@ -13,7 +13,7 @@
from magnet.spawnable import spawn

from ion.core import base_process
from ion.core.base_process import BaseProcess
from ion.core.base_process import BaseProcess, BaseProcessClient
import ion.util.procutils as pu

class BaseService(BaseProcess):
Expand Down Expand Up @@ -86,42 +86,11 @@ def service_declare(cls, **kwargs):
decl.update(kwargs)
return decl

class BaseServiceClient(object):
class BaseServiceClient(BaseProcessClient):
"""
This is the base class for service client libraries. Service client libraries
can be used from any process or standalone (in which case they spawn their
own client process). A service client makes accessing the service easier and
can perform client side optimizations (such as caching and transformation
of certain service results).
"""
def __init__(self, svcname=None, proc=None, svcpid=None):
"""
Initializes a service client
@param svc service name (globally known name)
@param proc a BaseProcess instance as originator of requests
@param svcpid target exchange name (service process id or name)
"""
assert svcname or svcpid, "Need either service name or process-id"
self.svcname = svcname
self.svc = svcpid
if not proc:
proc = BaseProcess()
self.proc = proc

@defer.inlineCallbacks
def _check_init(self):
"""
Called in client methods to ensure that there exists a spawned process
to send messages from
"""
if not self.proc.is_spawned():
yield self.proc.spawn()
if not self.svc:
assert self.svcname, 'Must hace svcname to access service'
svcid = self.proc.get_scoped_name('system', self.svcname)
#svcid = yield base_process.procRegistry.get(self.svcname)
self.svc = str(svcid)

@defer.inlineCallbacks
def attach(self):
yield self.proc.spawn()
11 changes: 6 additions & 5 deletions ion/services/coi/attributestore.py
Expand Up @@ -80,21 +80,22 @@ class AttributeStoreClient(BaseServiceClient):
"""
Class for the client accessing the attribute store via Exchange
"""
def __init__(self, proc=None, pid=None, svcname=None):
svcname = svcname if svcname else "attributestore"
BaseServiceClient.__init__(self, svcname, proc, pid)
def __init__(self, proc=None, **kwargs):
if not 'targetname' in kwargs:
kwargs['targetname'] = "attributestore"
BaseServiceClient.__init__(self, proc, **kwargs)

@defer.inlineCallbacks
def put(self, key, value):
yield self._check_init()
(content, headers, msg) = yield self.proc.rpc_send(self.svc, 'put', {'key':str(key), 'value':value})
(content, headers, msg) = yield self.rpc_send('put', {'key':str(key), 'value':value})
logging.info('Service reply: '+str(content))
defer.returnValue(str(content))

@defer.inlineCallbacks
def get(self, key):
yield self._check_init()
(content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get', {'key':str(key)})
(content, headers, msg) = yield self.rpc_send('get', {'key':str(key)})
logging.info('Service reply: '+str(content))
defer.returnValue(content['value'])

Expand Down
10 changes: 6 additions & 4 deletions ion/services/coi/datastore.py
Expand Up @@ -96,8 +96,10 @@ class DatastoreClient(BaseServiceClient):
"""
Class for the client accessing the object store service via ION Exchange
"""
def __init__(self, proc=None, pid=None):
BaseServiceClient.__init__(self, "datastore", proc, pid)
def __init__(self, proc=None, **kwargs):
if not 'targetname' in kwargs:
kwargs['targetname'] = "datastore"
BaseServiceClient.__init__(self, proc, **kwargs)

@defer.inlineCallbacks
def put(self, key, value, parents=None):
Expand All @@ -110,14 +112,14 @@ def put(self, key, value, parents=None):
elif parents:
cont['parents'] = [parents]

(content, headers, msg) = yield self.proc.rpc_send(self.svc, 'put', cont)
(content, headers, msg) = yield self.rpc_send('put', cont)
logging.info('Service reply: '+str(content))
defer.returnValue(str(content))

@defer.inlineCallbacks
def get(self, key):
yield self._check_init()
(content, headers, msg) = yield self.proc.rpc_send(self.svc, 'get', {'key':str(key)})
(content, headers, msg) = yield self.rpc_send('get', {'key':str(key)})
logging.info('Service reply: '+str(content))
defer.returnValue(str(content['value']))

Expand Down
38 changes: 20 additions & 18 deletions ion/services/coi/resource_registry.py
Expand Up @@ -28,7 +28,7 @@ class ResourceRegistryService(BaseService):
# For now, keep registration in local memory store.
def slc_init(self):
self.datastore = Store()

@defer.inlineCallbacks
def op_register_resource(self, content, headers, msg):
"""
Expand All @@ -39,7 +39,7 @@ def op_register_resource(self, content, headers, msg):
resdesc['lifecycle_state'] = ResourceLCState.RESLCS_NEW
resid = pu.create_unique_id('R:')
yield self.datastore.put(resid, resdesc)
yield self.reply(msg, 'result', {'res_id':str(resid)},)
yield self.reply(msg, 'result', {'res_id':str(resid)},)

def op_define_resource_type(self, content, headers, msg):
"""
Expand All @@ -55,25 +55,27 @@ def op_get_resource_desc(self, content, headers, msg):
logging.info('op_get_resource_desc: '+str(resid))

res_desc = yield self.datastore.get(resid)
yield self.reply(msg, 'result', {'res_desc':res_desc})
yield self.reply(msg, 'result', {'res_desc':res_desc})

def op_set_resource_lcstate(self, content, headers, msg):
"""
Service operation: set the life cycle state of resource
"""

def op_find_resources(self, content, headers, msg):
"""
Service operation: find resources by criteria
"""


class ResourceRegistryClient(BaseServiceClient):
"""
Class for the client accessing the resource registry.
"""
def __init__(self, proc=None, pid=None):
BaseServiceClient.__init__(self, "resource_registry", proc, pid)
def __init__(self, proc=None, **kwargs):
if not 'targetname' in kwargs:
kwargs['targetname'] = "resource_registry"
BaseServiceClient.__init__(self, proc, **kwargs)

def registerResourceType(self, rt_desc):
pass
Expand All @@ -82,17 +84,17 @@ def registerResourceType(self, rt_desc):
def register_resource(self, res_desc):
yield self._check_init()

(content, headers, msg) = yield self.proc.rpc_send(self.svc,
'register_resource', {'res_desc':res_desc.encode()})
(content, headers, msg) = yield self.rpc_send('register_resource',
{'res_desc':res_desc.encode()})
logging.info('Service reply: '+str(headers))
defer.returnValue(str(content['res_id']))

@defer.inlineCallbacks
def get_resource_desc(self, res_id):
yield self._check_init()

(content, headers, msg) = yield self.proc.rpc_send(self.svc,
'get_resource_desc', {'res_id':res_id})
(content, headers, msg) = yield self.rpc_send('get_resource_desc',
{'res_id':res_id})
logging.info('Service reply: '+str(content))
rd = ResourceDesc()
rdd = content['res_desc']
Expand All @@ -101,15 +103,15 @@ def get_resource_desc(self, res_id):
defer.returnValue(rd)
else:
defer.returnValue(None)

class ResourceTypes(object):
"""Static class with constant definitions for resource types.
Do not instantiate
"""
RESTYPE_GENERIC = 'rt_generic'
RESTYPE_SERVICE = 'rt_service'
RESTYPE_UNASSIGNED = 'rt_unassigned'

def __init__(self):
raise RuntimeError('Do not instantiate '+self.__class__.__name__)

Expand All @@ -124,7 +126,7 @@ class ResourceLCState(object):
RESLCS_RETIRED = 'rlcs_retired'
RESLCS_DEVELOPED = 'rlcs_developed'
RESLCS_COMMISSIONED = 'rlcs_commissioned'

def __init__(self):
raise RuntimeError('Do not instantiate '+self.__class__.__name__)

Expand All @@ -145,13 +147,13 @@ def setResourceDesc(self, **kwargs):
self.set_attr('res_type',kwargs['res_type'])
else:
raise RuntimeError("Resource type missing")

if 'name' in kwargs:
self.set_attr('res_name',kwargs['name'])

class ResourceTypeDesc(DataObject):
"""Structured object for a resource type description.
Attributes:
.res_name name of the resource type
.res_type identifier of this resource type
Expand All @@ -162,7 +164,7 @@ def __init__(self, **kwargs):
DataObject.__init__(self)
if len(kwargs) != 0:
self.setResourceTypeDesc(**kwargs)

def setResourceTypeDesc(self, **kwargs):
if 'name' in kwargs:
self.set_attr('name',kwargs['name'])
Expand All @@ -178,7 +180,7 @@ def setResourceTypeDesc(self, **kwargs):
self.set_attr('res_type',kwargs['res_type'])
else:
self.res_type = ResourceTypes.RESTYPE_UNASSIGNED

if 'desc' in kwargs:
self.set_attr('desc',kwargs['desc'])

Expand Down

0 comments on commit 70573b4

Please sign in to comment.