Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions receptor/buffers/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ def __init__(self, dir_, key, loop):
pass
for item in self._read_manifest():
self.q.put_nowait(item)

async def put(self, data):
ident = str(uuid.uuid4())
await self._loop.run_in_executor(pool, self._write_file, data, ident)
await self.q.put(ident)
await self._save_manifest()

async def get(self, handle_only=False, delete=True):
while True:
msg = await self.q.get()
Expand All @@ -42,21 +42,25 @@ async def get(self, handle_only=False, delete=True):
return await self._get_file(msg, handle_only=handle_only, delete=delete)
except FileNotFoundError:
pass

async def _save_manifest(self):
async with self._manifest_lock:
await self._loop.run_in_executor(pool, self._write_manifest)

def _write_manifest(self):
with open(self._manifest_path, "w") as fp:
json.dump(list(self.q._queue), fp)

def _read_manifest(self):
try:
with open(self._manifest_path, "r") as fp:
return json.load(fp)
except FileNotFoundError:
return []
except json.decoder.JSONDecodeError:
with open(self._manifest_path, "r") as fp:
logger.error("failed to decode manifest: %s", fp.read())
raise

def _path_for_ident(self, ident):
return os.path.join(self._message_path, ident)
Expand Down
4 changes: 2 additions & 2 deletions receptor/messages/directive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import logging
import json
import logging

from ..exceptions import UnknownDirective
from . import envelope
Expand All @@ -26,7 +26,7 @@ async def __call__(self, router, inner_env):
serial = 0
async for response in responses:
serial += 1
enveloped_response = envelope.InnerEnvelope.make_response(
enveloped_response = envelope.Inner.make_response(
receptor=router.receptor,
recipient=inner_env.sender,
payload=response,
Expand Down
194 changes: 170 additions & 24 deletions receptor/messages/envelope.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,187 @@
import asyncio
import base64
import datetime
import json
import logging
import uuid
import struct
import time
import uuid
from enum import IntEnum

logger = logging.getLogger(__name__)

MAX_INT64 = (2 ** 64 - 1)

class OuterEnvelope:
def __init__(self, frame_id, sender, recipient, route_list, inner):
self.frame_id = frame_id
self.sender = sender
self.recipient = recipient
self.route_list = route_list
self.inner = inner
self.inner_obj = None

async def deserialize_inner(self, receptor):
self.inner_obj = await InnerEnvelope.deserialize(receptor, self.inner)
class FramedMessage:
"""
A complete, two-part message.
"""

__slots__ = ("msg_id", "header", "payload")

def __init__(self, msg_id=None, header=None, payload=None):
if msg_id is None:
msg_id = uuid.uuid4().int
self.msg_id = msg_id
self.header = header
self.payload = payload

def serialize(self):
h = json.dumps(self.header).encode("utf-8")
return b''.join([
Frame.wrap(h, type_=Frame.Types.HEADER, msg_id=self.msg_id).serialize(),
h,
Frame.wrap(self.payload, msg_id=self.msg_id).serialize(),
self.payload
])


class CommandMessage(FramedMessage):
"""
A complete, single part message, meant to encapsulate point to point
commands or naive broadcasts.
"""

def serialize(self):
h = json.dumps(self.header).encode("utf-8")
return b''.join([
Frame.wrap(h, type_=Frame.Types.COMMAND, msg_id=self.msg_id).serialize(),
h,
])


class FramedBuffer:
"""
A buffer that accumulates frames and bytes to produce a header and a
payload.

This buffer assumes that an entire message (denoted by msg_id) will be
sent before another message is sent.
"""
def __init__(self, loop=None):
self.q = asyncio.Queue(loop=loop)
self.header = None
self.framebuffer = bytearray()
self.bb = bytearray()
self.current_frame = None
self.to_read = 0

async def put(self, data):
if not self.to_read:
return await self.handle_frame(data)
await self.consume(data)

async def handle_frame(self, data):
try:
self.framebuffer += data
frame, rest = Frame.from_data(self.framebuffer)
except struct.error:
return # We don't have enough data yet
else:
self.framebuffer = bytearray()

if frame.type not in Frame.Types:
raise Exception("Unknown Frame Type")

self.current_frame = frame
self.to_read = self.current_frame.length
await self.consume(rest)

async def consume(self, data):
logger.debug("consuming %d bytes; to_read = %d bytes", len(data), self.to_read)
data, rest = data[:self.to_read], data[self.to_read:]
self.to_read -= len(data)
self.bb += data
if self.to_read == 0:
await self.finish()
if rest:
await self.handle_frame(rest)

async def finish(self):
if self.current_frame.type == Frame.Types.HEADER:
self.header = json.loads(self.bb)
elif self.current_frame.type == Frame.Types.PAYLOAD:
await self.q.put(FramedMessage(
self.current_frame.msg_id, header=self.header,
payload=self.bb))
self.header = None
elif self.current_frame.type == Frame.Types.COMMAND:
await self.q.put(FramedMessage(
self.current_frame.msg_id, header=json.loads(self.bb)))
else:
raise Exception("Unknown Frame Type")
self.to_read = 0
self.bb = bytearray()

async def get(self):
return await self.q.get()


class Frame:
"""
A Frame represents the minimal metadata about a transmission.

Usually you should not create one directly, but rather use the
FramedMessage or CommandMessage classes.
"""

class Types(IntEnum):
HEADER = 0
PAYLOAD = 1
COMMAND = 2

fmt = struct.Struct(">ccIIQQ")

__slots__ = ('type', 'version', 'length', 'msg_id', 'id')

def __init__(self, type_, version, length, msg_id, id_):
self.type = type_
self.version = version
self.length = length
self.msg_id = msg_id
self.id = id_

def __repr__(self):
return f"Frame({self.type}, {self.version}, {self.length}, {self.msg_id}, {self.id})"

@classmethod
def from_raw(cls, raw):
doc = json.loads(raw)
return cls(**doc)

def serialize(self):
return json.dumps(dict(
frame_id=self.frame_id,
sender=self.sender,
recipient=self.recipient,
route_list=self.route_list,
inner=self.inner
))
return self.fmt.pack(
bytes([self.type]), bytes([self.version]),
self.id, self.length, *split_uuid(self.msg_id))

@classmethod
def deserialize(cls, buf):
t, v, i, length, hi, lo = Frame.fmt.unpack(buf)
msg_id = join_uuid(hi, lo)
return cls(Frame.Types(ord(t)), ord(v), length, msg_id, i)

@classmethod
def from_data(cls, data):
return cls.deserialize(data[:Frame.fmt.size]), data[Frame.fmt.size:]

@classmethod
def wrap(cls, data, type_=Types.PAYLOAD, msg_id=None):
"""
Returns a frame for the passed data.
"""
if not msg_id:
msg_id = uuid.uuid4().int

return cls(type_, 1, len(data), msg_id, 1)


def split_uuid(data):
"Splits a 128 bit int into two 64 bit words for binary encoding"
return ((data >> 64) & MAX_INT64, data & MAX_INT64)


def join_uuid(hi, lo):
"Joins two 64 bit words into a 128bit int from binary encoding"
return (hi << 64) | lo


class InnerEnvelope:
class Inner:
def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp,
raw_payload, directive=None, in_response_to=None, ttl=None, serial=1,
code=0, expire_time_delta=300):
Expand Down
Loading