Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions receptor/messages/envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def serialize(self):

class InnerEnvelope:
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
raw_payload, directive=None, in_response_to=None, ttl=None,
serial=1, expire_time_delta=300):
raw_payload, directive=None, in_response_to=None, ttl=None, serial=1,
code=0, expire_time_delta=300):
self.receptor = receptor
self.message_id = message_id
self.sender = sender
Expand All @@ -53,6 +53,7 @@ def __init__(self, receptor, message_id, sender, recipient, message_type, timest
self.expire_time = None
self.expire_time = time.time() + expire_time_delta
self.serial = serial # serial index of responses
self.code = code # optional code indicating an error

@classmethod
async def deserialize(cls, receptor, msg):
Expand All @@ -62,7 +63,7 @@ async def deserialize(cls, receptor, msg):
return cls(receptor=receptor, **json.loads(payload))

@classmethod
def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl=None):
def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl=None, code=0):
if isinstance(payload, bytes):
encoded_payload = base64.encodebytes(payload)
else:
Expand All @@ -78,7 +79,8 @@ def make_response(cls, receptor, recipient, payload, in_response_to, serial, ttl
directive=None,
in_response_to=in_response_to,
ttl=ttl,
serial=serial
serial=serial,
code=code,
)

def sign_and_serialize(self):
Expand Down
5 changes: 3 additions & 2 deletions receptor/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def connection_lost(self, exc):
def emit_response(self, response):
self.transport.write(json.dumps(dict(timestamp=response.timestamp,
in_response_to=response.in_response_to,
payload=response.raw_payload)).encode())
payload=response.raw_payload,
code=response.code)).encode())

def data_received(self, data):
recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2)
Expand All @@ -184,7 +185,7 @@ def data_received(self, data):
message_type='directive',
timestamp=sent_timestamp.isoformat(),
raw_payload=payload,
directive=directive
directive=directive,
)
# TODO: Persistent registry?
self.loop.create_task(self.receptor.router.send(inner_env,
Expand Down
2 changes: 1 addition & 1 deletion receptor/security/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ async def sign_response(self, inner_envelope):
{attr: getattr(inner_envelope, attr)
for attr in ['message_id', 'sender', 'recipient', 'message_type',
'timestamp', 'raw_payload', 'directive',
'in_response_to', 'ttl', 'serial']}
'in_response_to', 'ttl', 'serial', 'code']}
)

39 changes: 26 additions & 13 deletions receptor/work.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pkg_resources
import logging
import pkg_resources

from . import exceptions
from .messages import envelope
Expand All @@ -20,24 +20,37 @@ def load_receptor_worker(self, name):
async def handle(self, inner_env):
logger.info(f'Handling work for {inner_env.message_id} as {inner_env.directive}')
namespace, action = inner_env.directive.split(':', 1)
worker_module = self.load_receptor_worker(namespace)
try:
action_method = getattr(worker_module, f'{action}')
except AttributeError:
logger.exception(f'Could not load action {action} from {namespace}')
raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}')
responses = action_method(inner_env)
serial = 0
async for response in responses:
try:
worker_module = self.load_receptor_worker(namespace)
try:
action_method = getattr(worker_module, f'{action}')
except AttributeError:
logger.exception(f'Could not load action {action} from {namespace}')
raise exceptions.InvalidDirectiveAction(f'Invalid action {action} for {namespace}')

responses = action_method(inner_env)
async for response in responses:
serial += 1
logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}')
enveloped_response = envelope.InnerEnvelope.make_response(
receptor=self.receptor,
recipient=inner_env.sender,
payload=response,
in_response_to=inner_env.message_id,
serial=serial
)
await self.receptor.router.send(enveloped_response)
except Exception as e:
serial += 1
logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}')
logger.error(f'Error encountered while handling the response, replying with an error message ({e})')
enveloped_response = envelope.InnerEnvelope.make_response(
receptor=self.receptor,
recipient=inner_env.sender,
payload=response,
payload=str(e),
in_response_to=inner_env.message_id,
serial=serial
serial=serial,
code=1,
)
await self.receptor.router.send(enveloped_response)