Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Refactored RPC error reply into ReceivedError; fixed code and tests

  • Loading branch information...
commit d70646eb8bc06cb25d76ac035701a85f4151b5b5 1 parent 9be07fe
@mmeisinger mmeisinger authored
View
2  README.txt
@@ -153,6 +153,8 @@ Change log:
- Requires Carrot 0.10.10
- Message headers now contain status code for every message. 'OK is the default
and 'ERROR' is set on error
+- BaseProcess.rpc_send now raises a ReceivedError in case the RPC comes back
+ with status='ERROR'
- Changed reply_ok and reply_err: a dict content value will not be modified
- Fixed imports and tests throughout the code base
View
55 ion/agents/instrumentagents/instrument_agent.py
@@ -12,9 +12,9 @@
from ion.agents.resource_agent import ResourceAgent
from ion.agents.resource_agent import ResourceAgentClient
+from ion.core.exception import ReceivedError
from ion.data.dataobject import ResourceReference
from ion.core.base_process import BaseProcess, BaseProcessClient
-from ion.core.exception import ReceivedError
from ion.resources.ipaa_resource_descriptions import InstrumentAgentResourceInstance
"""
@@ -94,8 +94,6 @@ def fetch_params(self, param_list):
(content, headers, message) = yield self.rpc_send('fetch_params',
param_list)
assert(isinstance(content, dict))
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -110,8 +108,6 @@ def set_params(self, param_dict):
(content, headers, message) = yield self.rpc_send('set_params',
param_dict)
assert(isinstance(content, dict))
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -127,8 +123,6 @@ def execute(self, command):
assert(isinstance(command, (list, tuple)))
(content, headers, message) = yield self.rpc_send('execute',
command)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -139,8 +133,6 @@ def get_status(self, arg):
@retval Result message of some sort
"""
(content, headers, message) = yield self.rpc_send('get_status', arg)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -155,8 +147,6 @@ def configure_driver(self, config_vals):
assert(isinstance(config_vals, dict))
(content, headers, message) = yield self.rpc_send('configure_driver',
config_vals)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -170,8 +160,6 @@ def initialize(self, arg):
log.debug("DHE: in initialize!")
(content, headers, message) = yield self.rpc_send('initialize',
arg)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -185,8 +173,6 @@ def disconnect(self, command):
log.debug("DHE: in disconnect!")
(content, headers, message) = yield self.rpc_send('disconnect',
command)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@@ -372,13 +358,13 @@ def op_execute_instrument(self, content, headers, msg):
indicating code and response message on fail
"""
assert(isinstance(content, (list, tuple)))
- execResult = yield self.driver_client.execute(content)
- assert(isinstance(execResult, dict))
- if (execResult['status'] == 'OK'):
+ try:
+ execResult = yield self.driver_client.execute(content)
+ assert(isinstance(execResult, dict))
yield self.reply_ok(msg, execResult['value'])
- else:
+ except ReceivedError, re:
yield self.reply_err(msg, "Failure, response is: %s"
- % execResult['value'])
+ % re.msg_content['value'])
@defer.inlineCallbacks
def op_get_status(self, content, headers, msg):
"""
@@ -388,11 +374,11 @@ def op_get_status(self, content, headers, msg):
@retval ACK message with response on success, ERR message on failure
"""
assert(isinstance(content, (list, tuple)))
- response = yield self.driver_client.get_status(content)
- if (response['status'] == 'OK'):
+ try:
+ response = yield self.driver_client.get_status(content)
yield self.reply_ok(msg, response['value'])
- else:
- yield self.reply_err(msg, response['value'])
+ except ReceivedError, re:
+ yield self.reply_err(msg, re.msg_content['value'])
class InstrumentAgentClient(ResourceAgentClient):
"""
@@ -411,8 +397,6 @@ def get_from_instrument(self, paramList):
(content, headers, message) = yield self.rpc_send('get_from_instrument',
paramList)
assert(isinstance(content, dict))
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -426,8 +410,6 @@ def get_from_CI(self, paramList):
(content, headers, message) = yield self.rpc_send('get_from_CI',
paramList)
assert(isinstance(content, dict))
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -443,8 +425,6 @@ def set_to_instrument(self, paramDict):
(content, headers, message) = yield self.rpc_send('set_to_instrument',
paramDict)
assert(isinstance(content, dict))
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -460,8 +440,6 @@ def set_to_CI(self, paramDict):
(content, headers, message) = yield self.rpc_send('set_to_CI',
paramDict)
assert(isinstance(content, dict))
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -472,8 +450,6 @@ def disconnect(self, argList):
assert(isinstance(argList, list))
(content, headers, message) = yield self.rpc_send('disconnect',
argList)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
assert(isinstance(content, dict))
defer.returnValue(content)
@@ -495,8 +471,6 @@ def execute_instrument(self, command):
assert(isinstance(command, list))
(content, headers, message) = yield self.rpc_send('execute_instrument',
command)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
def execute_CI(self, command):
@@ -516,8 +490,6 @@ def execute_CI(self, command):
assert(isinstance(command, list))
(content, headers, message) = yield self.rpc_send('execute_CI',
command)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
assert(isinstance(content, dict))
defer.returnValue(content)
@@ -530,8 +502,6 @@ def get_status(self, argList):
assert(isinstance(argList, list))
(content, headers, message) = yield self.rpc_send('get_status',
argList)
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
defer.returnValue(content)
@defer.inlineCallbacks
@@ -544,9 +514,6 @@ def get_capabilities(self):
"""
(content, headers, message) = yield self.rpc_send('get_capabilities',
())
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
-
assert(isinstance(content, dict))
assert(ci_commands in content.keys())
assert(ci_parameters in content.keys())
@@ -575,8 +542,6 @@ def get_translator(self):
and puts it into the repository ready format
"""
(content, headers, message) = yield self.rpc_send('get_translator', ())
- if headers['status'] == 'ERROR':
- raise ReceivedError(headers, content)
#assert(inspect.isroutine(content))
defer.returnValue(content)
View
7 ion/agents/instrumentagents/test/test_instrument.py
@@ -118,6 +118,7 @@ def test_get_set_SBE49_params(self):
try:
response = yield self.IAClient.set_to_instrument({'baudrate': 19200,
'badvalue': 1})
+ self.fail("ReceivedError expected")
except ReceivedError, re:
pass
@@ -183,20 +184,19 @@ def test_execute(self):
['stop']])
print "response ", response
self.assert_(isinstance(response, dict))
- self.assert_('status' in response.keys())
- self.assertEqual(response['status'], 'OK')
self.assert_('start' in response['value'])
self.assert_('stop' in response['value'])
- self.assert_(response['status'] == 'OK')
try:
response = yield self.IAClient.execute_instrument([['badcommand',
'now','1']])
+ self.fail("ReceivedError expected")
except ReceivedError, re:
pass
try:
response = yield self.IAClient.execute_instrument([])
+ self.fail("ReceivedError expected")
except ReceivedError, re:
pass
@@ -227,7 +227,6 @@ def test_status(self):
"""
response = yield self.IAClient.get_status(['some_arg'])
self.assert_(isinstance(response, dict))
- self.assertEqual(response['status'], "OK")
self.assertEqual(response['value'], 'a-ok')
@defer.inlineCallbacks
View
29 ion/agents/resource_agent.py
@@ -206,10 +206,7 @@ def set_lifecycle_state(self, value):
assert(isinstance(value, LCState))
(content, headers, msg) = yield self.rpc_send('set_lifecycle_state',
str(value))
- if content['status'] == 'OK':
- defer.returnValue(value)
- else:
- defer.returnValue(False)
+ defer.returnValue(value)
@defer.inlineCallbacks
def get_lifecycle_state(self):
@@ -219,10 +216,7 @@ def get_lifecycle_state(self):
"""
(content, headers, msg) = yield self.rpc_send('get_lifecycle_state',
'')
- if content['status'] == 'OK':
- defer.returnValue(LCState(content['value']))
- else:
- defer.returnValue(False)
+ defer.returnValue(LCState(content['value']))
@defer.inlineCallbacks
def get_resource_ref(self):
@@ -230,10 +224,7 @@ def get_resource_ref(self):
Obtain the resource ID that the resource is registered with.
"""
(content, headers, msg) = yield self.rpc_send('get_resource_ref', '')
- if content['status'] == 'OK':
- defer.returnValue(AgentInstance.decode(content['value']))
- else:
- defer.returnValue(None)
+ defer.returnValue(AgentInstance.decode(content['value']))
@defer.inlineCallbacks
def get_resource_instance(self):
@@ -243,12 +234,9 @@ def get_resource_instance(self):
"""
(content, headers, msg) = \
yield self.rpc_send('get_resource_instance', '')
- if content['status'] == 'OK':
- content_decode = AgentInstance.decode(content['value'])
- assert(isinstance(content_decode, AgentInstance))
- defer.returnValue(content_decode)
- else:
- defer.returnValue(None)
+ content_decode = AgentInstance.decode(content['value'])
+ assert(isinstance(content_decode, AgentInstance))
+ defer.returnValue(content_decode)
@defer.inlineCallbacks
def register_resource(self, agent_instance=None, descriptor=None):
@@ -267,7 +255,4 @@ def register_resource(self, agent_instance=None, descriptor=None):
(content, headers, msg) = \
yield self.rpc_send('register_resource', agent_instance.encode())
- if (content['status'] == 'OK'):
- defer.returnValue(AgentInstance.decode(content['value']))
- else:
- defer.returnValue(None)
+ defer.returnValue(AgentInstance.decode(content['value']))
View
23 ion/core/base_process.py
@@ -14,6 +14,7 @@
from ion.core.id import Id
from ion.core import ioninit
+from ion.core.exception import ReceivedError
from ion.core.messaging.receiver import ProcessReceiver
from ion.core.process.process import IProcess, ProcessDesc, ProcessFactory
from ion.core.process.process import ProcessInstantiator
@@ -285,22 +286,20 @@ def _receive_rpc(self, payload, msg):
d = self.rpc_conv.pop(payload['conv-id'])
content = payload.get('content', None)
res = (content, payload, msg)
- if type(content) is dict and payload.get('status',None) == 'OK':
- pass
- elif type(content) is dict and payload.get('status',None) == 'ERROR':
- log.warn('RPC reply is an ERROR: '+str(content.get('value',None)))
- else:
+ if not type(content) is dict:
log.error('RPC reply is not well formed. Use reply_ok or reply_err')
# @todo is it OK to ack the response at this point already?
d1 = msg.ack()
- if d1:
- d1.addCallback(lambda res1: d.callback(res))
- d1.addErrback(lambda c: d.errback(c))
- return d1
+ if payload.get('status','OK') == 'ERROR':
+ def _cb(result):
+ log.warn('RPC reply is an ERROR: '+str(content.get('value',None)))
+ raise ReceivedError(payload, content)
+ d1.addCallback(_cb)
else:
- # Support for older carrot version where ack did not return deferred
- d.callback(res)
- return d
+ d1.addCallback(lambda res1: d.callback(res))
+ d1.addErrback(lambda c: d.errback(c))
+ return d1
+
def _receive_msg(self, payload, msg):
"""
View
2  ion/core/process/process.py
@@ -107,8 +107,6 @@ def on_terminate(self, *args, **kwargs):
@retval Deferred
"""
headers = yield self.container.terminate_process(parent=self.sup_process, pid=self.proc_id)
- if not headers.get('status','ERROR') == 'OK':
- self.error('shutdown', headers)
def on_error(self, cause=None, *args, **kwargs):
if cause:
View
9 ion/core/test/test_baseprocess.py
@@ -18,6 +18,7 @@
from ion.core import ioninit
from ion.core.base_process import BaseProcess, ProcessDesc, ProcessFactory
from ion.core.cc.container import Container
+from ion.core.exception import ReceivedError
from ion.core.messaging.receiver import Receiver, WorkerReceiver
from ion.core.id import Id
from ion.test.iontest import IonTestCase, ReceiverProcess
@@ -178,9 +179,11 @@ def test_error_in_op(self):
child1 = ProcessDesc(name='echo', module='ion.core.test.test_baseprocess')
pid1 = yield self.test_sup.spawn_child(child1)
- (cont,hdrs,msg) = yield self.test_sup.rpc_send(pid1,'echofail2','content123')
- self.assertEquals(hdrs['status'], 'ERROR')
- log.info('Process 1 responded to error correctly')
+ try:
+ (cont,hdrs,msg) = yield self.test_sup.rpc_send(pid1,'echofail2','content123')
+ self.fail("ReceivedError expected")
+ except ReceivedError, re:
+ log.info('Process 1 responded to error correctly')
@defer.inlineCallbacks
def test_send_byte_string(self):
View
63 ion/data/datastore/registry.py
@@ -459,8 +459,7 @@ def base_clear_registry(self,op_name):
yield self._check_init()
(content, headers, msg) = yield self.rpc_send(op_name,None)
- if content['status'] == 'OK':
- defer.returnValue(None)
+ defer.returnValue(None)
@defer.inlineCallbacks
@@ -485,14 +484,10 @@ def base_register_resource(self,op_name ,resource):
log.debug(self.__class__.__name__ + ': '+ op_name + '; Result:' + str(headers))
- if content['status']=='OK':
- #resource = dataobject.Resource.decode(content['value'])
- resource = dataobject.serializer.decode(content['value'], headers['encoding'])
- log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
- defer.returnValue(resource)
- else:
- log.info(self.__class__.__name__ + ': '+ op_name + ' Failed!')
- defer.returnValue(None)
+ #resource = dataobject.Resource.decode(content['value'])
+ resource = dataobject.serializer.decode(content['value'], headers['encoding'])
+ log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
+ defer.returnValue(resource)
@defer.inlineCallbacks
def base_get_resource(self,op_name ,resource_reference):
@@ -514,14 +509,10 @@ def base_get_resource(self,op_name ,resource_reference):
log.debug(self.__class__.__name__ + ': '+ op_name + '; Result:' + str(headers))
- if content['status']=='OK':
- #resource = dataobject.Resource.decode(content['value'])
- resource = dataobject.serializer.decode(content['value'], headers['encoding'])
- log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
- defer.returnValue(resource)
- else:
- log.info(self.__class__.__name__ + ': '+ op_name + ' Failed!')
- defer.returnValue(None)
+ #resource = dataobject.Resource.decode(content['value'])
+ resource = dataobject.serializer.decode(content['value'], headers['encoding'])
+ log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
+ defer.returnValue(resource)
@defer.inlineCallbacks
def base_get_resource_by_id(self, op_name, id):
@@ -532,14 +523,10 @@ def base_get_resource_by_id(self, op_name, id):
(content, headers, msg) = yield self.rpc_send(op_name, id, headers)
log.debug(self.__class__.__name__ + ': '+ op_name + '; Result:' + str(headers))
- if content['status']=='OK':
- #resource = dataobject.Resource.decode(content['value'])
- resource = dataobject.serializer.decode(content['value'], headers['encoding'])
- log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
- defer.returnValue(resource)
- else:
- log.info(self.__class__.__name__ + ': '+ op_name + ' Failed!')
- defer.returnValue(None)
+ #resource = dataobject.Resource.decode(content['value'])
+ resource = dataobject.serializer.decode(content['value'], headers['encoding'])
+ log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
+ defer.returnValue(resource)
@defer.inlineCallbacks
@@ -570,14 +557,10 @@ def base_set_resource_lcstate(self, op_name, resource_reference, lcstate):
log.debug(self.__class__.__name__ + ': '+ op_name + '; Result:' + str(headers))
- if content['status'] == 'OK':
- #resource_reference = dataobject.ResourceReference.decode(content['value'])
- resource_reference = dataobject.serializer.decode(content['value'], headers['encoding'])
- log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
- defer.returnValue(resource_reference)
- else:
- log.info(self.__class__.__name__ + ': '+ op_name + ' Failed!')
- defer.returnValue(None)
+ #resource_reference = dataobject.ResourceReference.decode(content['value'])
+ resource_reference = dataobject.serializer.decode(content['value'], headers['encoding'])
+ log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
+ defer.returnValue(resource_reference)
@defer.inlineCallbacks
@@ -616,14 +599,10 @@ def base_find_resource(self, op_name, description, regex=True,ignore_defaults=Tr
log.debug(self.__class__.__name__ + ': '+ op_name + '; Result:' + str(headers))
# Return a list of resources
- if content['status'] == 'OK':
- #results = dataobject.DataObject.decode(content['value'])
- results = dataobject.serializer.decode(content['value'], headers['encoding'])
- log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
- defer.returnValue(results.resources)
- else:
- log.info(self.__class__.__name__ + ': '+ op_name + ' Failed!')
- defer.returnValue([])
+ #results = dataobject.DataObject.decode(content['value'])
+ results = dataobject.serializer.decode(content['value'], headers['encoding'])
+ log.info(self.__class__.__name__ + ': '+ op_name + ' Success!')
+ defer.returnValue(results.resources)
class RegistryClient(BaseRegistryClient,IRegistry,LCStateMixin):
View
17 ion/services/cei/dtrs.py
@@ -15,6 +15,7 @@
from twisted.internet import defer
from ion.core.base_process import ProcessFactory
+from ion.core.exception import ReceivedError
from ion.services.base_service import BaseService, BaseServiceClient
from ion.core import ioninit
@@ -114,14 +115,14 @@ def lookup(self, dt, nodes=None, vars=None):
"""
yield self._check_init()
log.debug("Sending DTRS lookup request")
- (content, headers, msg) = yield self.rpc_send('lookup', {
- 'deployable_type' : dt,
- 'nodes' : nodes,
- 'vars' : vars
- })
-
- if headers.get('status') == 'ERROR':
- raise DeployableTypeLookupError(content.get('value'))
+ try:
+ (content, headers, msg) = yield self.rpc_send('lookup', {
+ 'deployable_type' : dt,
+ 'nodes' : nodes,
+ 'vars' : vars
+ })
+ except ReceivedError, re:
+ raise DeployableTypeLookupError(re.msg_content.get('value'))
defer.returnValue({
'document' : content.get('document'),
View
62 ion/services/coi/objstore.py
@@ -10,8 +10,9 @@
from twisted.internet import defer
-from ion.services import base_service
+from ion.services import base_service
from ion.core import base_process
+from ion.core.exception import ReceivedError
from ion.data import store
from ion.data.datastore import cas
from ion.data.datastore import objstore
@@ -28,7 +29,7 @@ def decode(self, bytes, decoder=None):
"""
@brief Operates on encoded sendable objects
"""
- return
+ return
class ObjectChassis(object):
"""
@@ -39,17 +40,17 @@ class ObjectChassis(object):
The chassis presents the revision control functional interface.
The chassis provides access to the data of any storable data object.
- This interface relies solely on object/cas store interface methods.
+ This interface relies solely on object/cas store interface methods.
This means that all information stored/retrieved from the backend
IStore should be apart of the standard DataObject, or be part of
another meta model.
-
+
The object chassis is the working version of any data object. Data
object content is always extracted from the data store via it's commit
object. In this way, the context of a data object (wrt
- history/ancestry change) is always determinable.
+ history/ancestry change) is always determinable.
"""
@@ -70,12 +71,12 @@ class ObjectStoreService(base_service.BaseService):
"""
- declare = base_service.BaseService.service_declare(name='objstore', version='0.1.0', dependencies=[])
+ declare = base_service.BaseService.service_declare(name='objstore', version='0.1.0', dependencies=[])
@defer.inlineCallbacks
def slc_init(self):
"""
- @brief setup creation of ObjectStore instance
+ @brief setup creation of ObjectStore instance
decide on which backend to use based on option
It would be nice to separate the existence/creation of the
@@ -86,7 +87,7 @@ def slc_init(self):
question, how is that connection managed?
"""
# Need to make this an option:
- # IStore interface to local persistent store.
+ # IStore interface to local persistent store.
backend = yield store.Store.create_store()
self.objstore = yield objstore.ObjectStore(backend)
@@ -115,7 +116,7 @@ def op_update(self, content, headers, msg):
@note A data object id is a topic that can be subscribed
to/conversation. There needs to be a way to enforce an affinity
(with in some messaging domain) between object operations and
- IObjectStore service process instances.
+ IObjectStore service process instances.
"""
@defer.inlineCallbacks
@@ -123,7 +124,7 @@ def op_get(self, content, headers, msg):
"""
@brief The content should be a 'cas content' id
@note The backend decodes right before we re-encode. Is this
- necessary?
+ necessary?
Ans: cas.BaseObject has a cache for this encoding step
"""
id = content
@@ -146,7 +147,7 @@ def op_put(self, content, headers, msg):
id_local = yield self.objstore.put(obj)
if not id == id_local:
yield self.reply_err(msg, 'Content inconsistency')
- yield self.reply_ok(msg, id)
+ yield self.reply_ok(msg, id)
except cas.CAStoreError, e:
yield self.reply_err(msg, e)
@@ -192,11 +193,8 @@ def create(self, name, baseClass):
encoded_baseClass = dataobject.DEncoder().encode(baseClass())
(content, headers, msg) = yield self.rpc_send('create', [name, encoded_baseClass])
- if content['status'] == 'OK':
- obj = content['value']
- defer.returnValue(obj)
- else:
- defer.returnValue(None) #what to return?
+ obj = content['value']
+ defer.returnValue(obj)
@defer.inlineCallbacks
@@ -209,14 +207,9 @@ def clone(self, name):
@todo Change name to id.
"""
(content, headers, msg) = yield self.rpc_send('clone', name)
- if content['status'] == 'OK':
- obj = content['value']
- defer.returnValue(obj)
- else:
- defer.returnValue(None) #what to return?
+ obj = content['value']
+ defer.returnValue(obj)
-
-
def get(self, id):
"""
@@ -234,19 +227,18 @@ def _get_callback(self, (content, headers, msg), id):
"""
"""
data = content['value'] # This should not be content['value']
- if content['status'] == 'OK':
- """@todo make 'ok' a bool instead
- """
- obj = self.decode(data)
- if not id == cas.sha1(obj, bin=False):
- raise cas.CAStoreError("Object Integrity Error!")
- return obj
- else:
- """@todo should check for not found in store error
- """
- raise cas.CAStoreError("Client Error")
+ """@todo make 'ok' a bool instead
+ """
+ obj = self.decode(data)
+ if not id == cas.sha1(obj, bin=False):
+ raise cas.CAStoreError("Object Integrity Error!")
+ return obj
def _get_errback(self, reason):
+ try:
+ reason.raiseException()
+ except ReceivedError, re:
+ raise cas.CAStoreError("not found")
return reason
def put(self, obj):
@@ -270,5 +262,3 @@ def _put_callback(self, (content, headers, msg)):
return id
factory = base_process.ProcessFactory(ObjectStoreService)
-
-
View
6 ion/services/coi/service_registry.py
@@ -131,6 +131,8 @@ def register_service_definition(self,service):
def describe_service(self,service_class):
+ if type(service_class) is str:
+ service_class = pu.get_class(service_class)
assert issubclass(service_class, BaseService)
@@ -204,7 +206,7 @@ def describe_instance(self,service_instance):
# is also used to look for an existing description
service_resource = coi_resource_descriptions.ServiceInstance()
- service_class = getattr(service_instance.proc_mod_obj,service_instance.proc_class)
+ service_class = service_instance.proc_class
sd = yield self.register_service_definition(service_class)
service_resource.description = sd.reference(head=True)
@@ -212,7 +214,7 @@ def describe_instance(self,service_instance):
if service_instance.proc_node:
service_resource.proc_node = service_instance.proc_node
- service_resource.proc_id = str(service_instance.id)
+ service_resource.proc_id = service_instance.proc_id
service_resource.proc_name = service_instance.proc_name
if service_instance.spawn_args:
service_resource.spawn_args = service_instance.spawn_args
View
8 ion/services/coi/test/test_identity_registry.py
@@ -11,6 +11,7 @@
from twisted.internet import defer
from twisted.trial import unittest
+from ion.core.exception import ReceivedError
from ion.test.iontest import IonTestCase
from ion.services.coi.identity_registry import IdentityRegistryClient
@@ -137,8 +138,11 @@ def test_register_user(self):
# Test for user not found handled properly.
ooi_id.RegistryIdentity = "bogus-ooi_id"
- result = yield self.identity_registry_client.get_user(ooi_id)
- self.assertEqual(result, None)
+ try:
+ result = yield self.identity_registry_client.get_user(ooi_id)
+ self.fail("ReceivedError expected")
+ except ReceivedError, re:
+ pass
# Test if we can find the user we have stuffed in.
user_description = coi_resource_descriptions.IdentityResource()
View
21 ion/services/coi/test/test_service_registry.py
@@ -11,7 +11,7 @@
from twisted.internet import defer
from twisted.trial import unittest
-from ion.services.coi.service_registry import ServiceRegistryClient
+from ion.services.coi.service_registry import ServiceRegistryClient
from ion.test.iontest import IonTestCase
from ion.play import hello_service
@@ -38,33 +38,33 @@ def tearDown(self):
@defer.inlineCallbacks
def test_service_reg(self):
-
+
# Register a service class object
play_desc = yield self.src.register_service_definition(hello_service.HelloService)
-
+
# create a reference to the description in the registry
ref = play_desc.reference()
-
+
# get the description back from its reference
svc_desc = yield self.src.get_service_definition(ref)
-
+
#print svc_desc
@defer.inlineCallbacks
def test_service_instance_reg(self):
"""
- """
+ """
services = [
- {'name':'hello1','module':'ion.play.hello_service','class':'HelloService'},
+ {'name':'hello1','module':'ion.play.hello_service','class':'ion.play.hello_service.HelloService'},
]
# Create a process supervisor class
sup = yield self._spawn_processes(services, sup=self.sup)
-
+
# Register the service instance from the supervisors child_procs - Agument is a ProcessDesc object.
# See base process!
play_desc = yield self.src.register_service_instance(sup.child_procs[1])
#print play_desc
-
+
class ServiceRegistryCoreServiceTest(IonTestCase):
@@ -86,6 +86,3 @@ def test_service_reg(self):
raise unittest.SkipTest('Not implimented yet!')
# Not sure what the point is here?
# Lets get it integrated with base process!
-
-
-
View
48 ion/services/dm/distribution/base_consumer.py
@@ -387,12 +387,7 @@ def __init__(self, **kwargs):
def attach(self,queues):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'attach', {'queues':queues})
- if headers.get('status','ERROR') == 'OK':
- #self.proc_attached = queue
- defer.returnValue('OK')
- else:
- #self.proc_attached = None
- defer.returnValue('ERROR')
+ defer.returnValue('OK')
'''
Magnet does not yet support Deattach
@@ -400,60 +395,39 @@ def attach(self,queues):
def deattach(self,queues):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'deattach', {'queues':queues})
- if headers.get('status','ERROR') == 'OK':
- #self.proc_attached = queue
- defer.returnValue('OK')
- else:
- #self.proc_attached = None
- defer.returnValue('ERROR')
+ #self.proc_attached = queue
+ defer.returnValue('OK')
'''
@defer.inlineCallbacks
def set_process_parameters(self,params):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'set_process_parameters', params)
- if headers.get('status','ERROR') == 'OK':
- #self.proc_params = params
- defer.returnValue('OK')
- else:
- #self.proc_params = None
- defer.returnValue('ERROR')
+ #self.proc_params = params
+ defer.returnValue('OK')
@defer.inlineCallbacks
def get_process_parameters(self):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'get_process_parameters', {})
- if headers.get('status','ERROR') == 'OK':
- defer.returnValue(content)
- else:
- defer.returnValue('ERROR')
+ defer.returnValue(content)
@defer.inlineCallbacks
def set_delivery_queues(self,params):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'set_delivery_queues', params)
- if headers.get('status','ERROR') == 'OK':
- #self.proc_params = params
- defer.returnValue('OK')
- else:
- #self.proc_params = None
- defer.returnValue('ERROR')
+ #self.proc_params = params
+ defer.returnValue('OK')
@defer.inlineCallbacks
def get_delivery_queues(self):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'get_delivery_queues', {})
- if headers.get('status','ERROR') == 'OK':
- defer.returnValue(content)
- else:
- defer.returnValue('ERROR')
+ defer.returnValue(content)
@defer.inlineCallbacks
def get_msg_count(self):
(content, headers, msg) = yield self.sup_process.rpc_send(self.proc_id,
'get_msg_count', {})
- if headers.get('status','ERROR') == 'OK':
- #defer.returnValue(content.get('count','ERROR'))
- defer.returnValue(content)
- else:
- defer.returnValue('ERROR')
+ #defer.returnValue(content.get('count','ERROR'))
+ defer.returnValue(content)
View
50 ion/services/dm/distribution/pubsub_service.py
@@ -451,13 +451,12 @@ def define_topic(self, topic):
topic.encode())
log.debug(self.__class__.__name__ + ': define_topic; Result:' + str(headers))
- if content['status']=='OK':
- topic = dataobject.Resource.decode(content['value'])
- log.info(self.__class__.__name__ + '; define_topic: Success!')
- defer.returnValue(topic)
- else:
- log.info(self.__class__.__name__ + '; define_topic: Failed!')
- defer.returnValue(None)
+ topic = dataobject.Resource.decode(content['value'])
+ log.info(self.__class__.__name__ + '; define_topic: Success!')
+ defer.returnValue(topic)
+ #else:
+ # log.info(self.__class__.__name__ + '; define_topic: Failed!')
+ # defer.returnValue(None)
@defer.inlineCallbacks
@@ -475,13 +474,12 @@ def define_publisher(self, publisher):
publisher.encode())
log.debug(self.__class__.__name__ + ': define_publisher; Result:' + str(headers))
- if content['status']=='OK':
- log.info(self.__class__.__name__ + '; define_publisher: Success!')
- publisher = dataobject.Resource.decode(content['value'])
- defer.returnValue(publisher)
- else:
- log.info(self.__class__.__name__ + '; define_publisher: Failed!')
- defer.returnValue(None)
+ log.info(self.__class__.__name__ + '; define_publisher: Success!')
+ publisher = dataobject.Resource.decode(content['value'])
+ defer.returnValue(publisher)
+ #else:
+ # log.info(self.__class__.__name__ + '; define_publisher: Failed!')
+ # defer.returnValue(None)
@defer.inlineCallbacks
@@ -524,12 +522,11 @@ def publish(self, publisher_proc, topic_ref, data):
publication.encode())
log.debug(self.__class__.__name__ + ': publish; Result:' + str(headers))
- if content['status']=='OK':
- log.info(self.__class__.__name__ + '; publish: Success!')
- defer.returnValue('sent')
- else:
- log.info(self.__class__.__name__ + '; publish: Failed!')
- defer.returnValue('error sending message!')
+ log.info(self.__class__.__name__ + '; publish: Success!')
+ defer.returnValue('sent')
+ #else:
+ # log.info(self.__class__.__name__ + '; publish: Failed!')
+ # defer.returnValue('error sending message!')
@defer.inlineCallbacks
@@ -547,13 +544,12 @@ def define_subscription(self, subscription):
subscription.encode())
log.debug(self.__class__.__name__ + ': define_subscription; Result:' + str(headers))
- if content['status']=='OK':
- log.info(self.__class__.__name__ + '; define_subscription: Success!')
- subscription = dataobject.Resource.decode(content['value'])
- defer.returnValue(subscription)
- else:
- log.info(self.__class__.__name__ + '; define_subscription: Failed!')
- defer.returnValue(None)
+ log.info(self.__class__.__name__ + '; define_subscription: Success!')
+ subscription = dataobject.Resource.decode(content['value'])
+ defer.returnValue(subscription)
+ #else:
+ # log.info(self.__class__.__name__ + '; define_subscription: Failed!')
+ # defer.returnValue(None)
# Spawn off the process using the module name
factory = ProcessFactory(DataPubsubService)
View
8 ion/services/dm/distribution/test/test_baseconsumer.py
@@ -15,6 +15,7 @@
from ion.core.base_process import ProcessFactory
from ion.core import bootstrap
+from ion.core.exception import ReceivedError
#from ion.core.base_process import BaseProcess, ProcessDesc
from ion.test.iontest import IonTestCase
import ion.util.procutils as pu
@@ -94,8 +95,11 @@ def test_spawn_attach_msg(self):
#self.assertEqual(child.proc_attached,self.queue1)
- res = yield child1.attach(None)
- self.assertEqual(res,'ERROR')
+ try:
+ res = yield child1.attach(None)
+ self.fail("ReceivedError expected")
+ except ReceivedError, re:
+ pass
#self.assertEqual(child.proc_attached,None)
yield child1.shutdown()
View
8 ion/services/dm/distribution/test/test_pubsub.py
@@ -16,6 +16,7 @@
from ion.core import bootstrap
+from ion.core.exception import ReceivedError
from ion.services.dm.distribution.pubsub_service import DataPubsubClient
from ion.test.iontest import IonTestCase
import ion.util.procutils as pu
@@ -78,8 +79,11 @@ def test_publisher_fail(self):
self.topic2 = yield self.pubsub_client.define_topic(self.topic2)
self.pub1 = PublisherResource.create("Grids", self.sup, [self.topic1], "content")
yield self.pubsub_client.define_publisher(self.pub1)
- res = yield self.pubsub_client.publish(self.sup, self.topic2, "Point data")
- self.assertEqual(res,'error sending message!')
+ try:
+ yield self.pubsub_client.publish(self.sup, self.topic2, "Point data")
+ self.fail("ReceivedError expected")
+ except ReceivedError, re:
+ pass
class PubSubServiceMethodTest(IonTestCase):
View
22 ion/services/sa/fetcher.py
@@ -23,6 +23,7 @@
import simplejson as json
from ion.core.base_process import ProcessFactory
+from ion.core.exception import ReceivedError
from ion.services.base_service import BaseService, BaseServiceClient
from ion.services.dm.util.url_manipulation import base_dap_url
@@ -222,9 +223,10 @@ def get_head(self, requested_url):
yield self._check_init()
log.info('Sending HEAD request to fetcher...')
- (content, headers, msg) = yield self.rpc_send('get_head', requested_url)
- if 'ERROR' in headers:
- raise ValueError('Error on URL: ' + content['failure'])
+ try:
+ (content, headers, msg) = yield self.rpc_send('get_head', requested_url)
+ except ReceivedError, re:
+ raise ValueError('Error on URL: ' + re.msg_content['value'])
defer.returnValue(content)
@@ -238,9 +240,10 @@ def get_url(self, requested_url):
yield self._check_init()
log.info('Sending request')
- (content, headers, msg) = yield self.rpc_send('get_url', requested_url)
- if 'ERROR' in headers:
- raise ValueError('Error on URL: ' + content['failure'])
+ try:
+ (content, headers, msg) = yield self.rpc_send('get_url', requested_url)
+ except ReceivedError, re:
+ raise ValueError('Error on URL: ' + re.msg_content['value'])
defer.returnValue(content)
@defer.inlineCallbacks
@@ -249,9 +252,10 @@ def get_dap_dataset(self, requested_url):
Pull an entire dataset.
"""
yield self._check_init()
- (content, headers, msg) = yield self.rpc_send('get_dap_dataset', requested_url)
- if 'ERROR' in headers:
- raise ValueError('Error on URL: ' + content['failure'])
+ try:
+ (content, headers, msg) = yield self.rpc_send('get_dap_dataset', requested_url)
+ except ReceivedError, re:
+ raise ValueError('Error on URL: ' + re.msg_content['value'])
defer.returnValue(content)
def _rewrite_headers(self, old_headers):
View
15 ion/services/sa/instrument_management.py
@@ -323,10 +323,7 @@ def create_new_instrument(self, userInput):
reqcont['userInput'] = userInput
(cont, hdrs, msg) = yield self.rpc_send('create_new_instrument', reqcont)
- if cont.get('status') == 'OK':
- defer.returnValue(DataObject.decode(cont['value']))
- else:
- defer.returnValue(None)
+ defer.returnValue(DataObject.decode(cont['value']))
@defer.inlineCallbacks
def create_new_data_product(self, dataProductInput):
@@ -334,10 +331,7 @@ def create_new_data_product(self, dataProductInput):
reqcont['dataProductInput'] = dataProductInput
(cont, hdrs, msg) = yield self.rpc_send('create_new_data_product', reqcont)
- if cont.get('status') == 'OK':
- defer.returnValue(DataObject.decode(cont['value']))
- else:
- defer.returnValue(None)
+ defer.returnValue(DataObject.decode(cont['value']))
@defer.inlineCallbacks
def start_instrument_agent(self, instrumentID, model):
@@ -384,10 +378,7 @@ def execute_command(self, instrumentID, command, arglist):
@defer.inlineCallbacks
def _base_command(self, op, content):
(cont, hdrs, msg) = yield self.rpc_send(op, content)
- if cont.get('status') == 'OK':
- defer.returnValue(cont)
- else:
- defer.returnValue(None)
+ defer.returnValue(cont)
# Spawn of the process using the module name
factory = ProcessFactory(InstrumentManagementService)
View
3  ion/test/iontest.py
@@ -16,7 +16,7 @@
from ion.core import ioninit
from ion.core.base_process import BaseProcess
from ion.core.cc import container
-from ion.core.cc.container import Id
+from ion.core.cc.container import Id, Container
from ion.core.process.process import IProcess
from ion.data.store import Store
import ion.util.procutils as pu
@@ -59,6 +59,7 @@ def _start_container(self):
log.error("PROBLEM: Previous test did not stop container. Fixing...")
yield self._stop_container()
+ self.container = container.create_new_container()
yield self.container.initialize(mopt)
yield self.container.activate()
Please sign in to comment.
Something went wrong with that request. Please try again.