diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index f20f473c..568c6f94 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -37,8 +37,8 @@ def serialize(self): class InnerEnvelope: def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp, - raw_payload, directive=None, in_response_to=None, ttl=None, - serial=1, expire_time_delta=300): + raw_payload, directive=None, in_response_to=None, ttl=None, serial=1, + code=0, expire_time_delta=300): self.receptor = receptor self.message_id = message_id self.sender = sender @@ -53,6 +53,7 @@ def __init__(self, receptor, message_id, sender, recipient, message_type, timest self.expire_time = None self.expire_time = time.time() + expire_time_delta self.serial = serial # serial index of responses + self.code = code # optional code indicating an error @classmethod async def deserialize(cls, receptor, msg): @@ -62,7 +63,7 @@ async def deserialize(cls, receptor, msg): return cls(receptor=receptor, **json.loads(payload)) @classmethod - def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl=None): + def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl=None, code=0): if isinstance(payload, bytes): encoded_payload = base64.encodebytes(payload) else: @@ -78,7 +79,8 @@ def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl directive=None, in_response_to=in_response_to, ttl=ttl, - serial=serial + serial=serial, + code=code, ) def sign_and_serialize(self): diff --git a/receptor/protocol.py b/receptor/protocol.py index b56e2517..bca38a83 100644 --- a/receptor/protocol.py +++ b/receptor/protocol.py @@ -169,7 +169,8 @@ def connection_lost(self, exc): def emit_response(self, response): self.transport.write(json.dumps(dict(timestamp=response.timestamp, in_response_to=response.in_response_to, - payload=response.raw_payload)).encode()) + payload=response.raw_payload, + code=response.code)).encode()) def data_received(self, data): recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2) @@ -184,7 +185,7 @@ def data_received(self, data): message_type='directive', timestamp=sent_timestamp.isoformat(), raw_payload=payload, - directive=directive + directive=directive, ) # TODO: Persistent registry? self.loop.create_task(self.receptor.router.send(inner_env, diff --git a/receptor/security/__init__.py b/receptor/security/__init__.py index 2c6a87ff..1d0ce0db 100644 --- a/receptor/security/__init__.py +++ b/receptor/security/__init__.py @@ -25,6 +25,6 @@ async def sign_response(self, inner_envelope): {attr: getattr(inner_envelope, attr) for attr in ['message_id', 'sender', 'recipient', 'message_type', 'timestamp', 'raw_payload', 'directive', - 'in_response_to', 'ttl', 'serial']} + 'in_response_to', 'ttl', 'serial', 'code']} ) diff --git a/receptor/work.py b/receptor/work.py index da1a6df6..4d11819f 100644 --- a/receptor/work.py +++ b/receptor/work.py @@ -1,5 +1,5 @@ -import pkg_resources import logging +import pkg_resources from . import exceptions from .messages import envelope @@ -20,24 +20,37 @@ def load_receptor_worker(self, name): async def handle(self, inner_env): logger.info(f'Handling work for {inner_env.message_id} as {inner_env.directive}') namespace, action = inner_env.directive.split(':', 1) - worker_module = self.load_receptor_worker(namespace) - try: - action_method = getattr(worker_module, f'{action}') - except AttributeError: - logger.exception(f'Could not load action {action} from {namespace}') - raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}') - responses = action_method(inner_env) serial = 0 - async for response in responses: + try: + worker_module = self.load_receptor_worker(namespace) + try: + action_method = getattr(worker_module, f'{action}') + except AttributeError: + logger.exception(f'Could not load action {action} from {namespace}') + raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}') + + responses = action_method(inner_env) + async for response in responses: + serial += 1 + logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}') + enveloped_response = envelope.InnerEnvelope.make_response( + receptor=self.receptor, + recipient=inner_env.sender, + payload=response, + in_response_to=inner_env.message_id, + serial=serial + ) + await self.receptor.router.send(enveloped_response) + except Exception as e: serial += 1 - logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}') + logger.error(f'Error encountered while handling the response, replying with an error message ({e})') enveloped_response = envelope.InnerEnvelope.make_response( receptor=self.receptor, recipient=inner_env.sender, - payload=response, + payload=str(e), in_response_to=inner_env.message_id, - serial=serial + serial=serial, + code=1, ) await self.receptor.router.send(enveloped_response) -