From 72cd320dcac2bb34f1258470d5d294ad3e8ee8dd Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Tue, 19 Nov 2019 14:15:44 -0600 Subject: [PATCH 1/9] Adding framed buffer FramedBuffer encapsulates handling of the new message format. Essentially there are 1 or 2 frames that constitute a message. [Header Frame][Header Body][Payload Frame][Payload] or [Command Frame][Command Body] Signed-off-by: Jesse Jaggars --- receptor/messages/envelope.py | 119 +++++++++++++++++++++++++++++++- receptor/tests/test_protocol.py | 43 ++++++++++++ 2 files changed, 161 insertions(+), 1 deletion(-) diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index 8dabbf55..09776db3 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -1,12 +1,129 @@ +import asyncio import base64 import datetime +import itertools import json import logging -import uuid import time +import uuid +from struct import pack, unpack logger = logging.getLogger(__name__) +MAX_INT64 = (2 ** 64 - 1) + + +class FramedBuffer: + """ + A buffer that accumulates frames and bytes to produce a header and a + payload. + + This buffer assumes that an entire message (denoted by msg_id) will be + sent before another message is sent. + """ + def __init__(self, loop=None): + self.q = asyncio.Queue(loop=loop) + self.header = None + self.bb = bytearray() + self.current_frame = None + self.to_read = 0 + + async def put(self, data): + if not self.to_read: + return await self.handle_frame(data) + await self.consume(data) + + async def handle_frame(self, data): + self.current_frame, rest = Frame.from_data(data) + if self.current_frame.type in (Frame.START_MSG, Frame.PAYLOAD): + self.to_read = self.current_frame.length + await self.consume(rest) + else: + raise Exception("Unknown Frame Type") + + async def consume(self, data): + self.to_read -= len(data) + self.bb += data + if self.to_read == 0: + await self.finish() + + async def finish(self): + if self.current_frame.type == Frame.START_MSG: + self.header = Header(**json.loads(self.bb)) + elif self.current_frame.type == Frame.PAYLOAD: + await self.q.put((self.header, self.bb)) + self.header = None + self.to_read = 0 + self.bb = bytearray() + + async def get(self): + return await self.q.get() + + +class Frame: + START_MSG = 0 + PAYLOAD = 1 + FINISH = 2 + + def __init__(self, type_, version, length, msg_id, id_): + self.type = type_ + self.version = version + self.length = length + self.msg_id = msg_id + self.id = id_ + + def serialize(self): + high, low = ((self.msg_id >> 64) & MAX_INT64, self.msg_id & MAX_INT64) + return b''.join([ + pack("ccIi", chr(self.type).encode("ascii"), chr(self.version).encode("ascii"), self.id, self.length), + pack(">QQ", high, low), + ]) + + @classmethod + def deserialize(cls, buf): + t, v, i, length = unpack("ccIi", buf[0:12]) + hi, lo = unpack(">QQ", buf[12:]) + msg_id = (hi << 64) | lo + return cls(ord(t), ord(v), length, msg_id, i) + + @classmethod + def from_data(cls, data): + return cls.deserialize(data[:28]), data[28:] + + +class Header: + def __init__(self, sender, recipient, route_list): + self.sender = sender + self.recipient = recipient + self.route_list = route_list + + def serialize(self): + return json.dumps({"sender": self.sender, "recipient": self.recipient, "route_list": self.route_list}).encode("utf-8") + + def __repr__(self): + return f"Header: {self.sender}, {self.recipient}, {self.route_list}" + + def __eq__(self, other): + return (self.sender, self.recipient, self.route_list) == (other.sender, other.recipient, other.route_list) + + +def gen_chunks(buffer, header, msg_id=None, chunksize=2 ** 8): + if msg_id is None: + msg_id = uuid.uuid4().int + seq = itertools.count() + buf = bytearray(chunksize) + bv = memoryview(buf) + header = header.serialize() + yield Frame(Frame.START_MSG, 1, len(header), msg_id, next(seq)).serialize() + header + bytes_read = buffer.readinto(buf) + while bytes_read: + f = Frame(Frame.PAYLOAD, 1, bytes_read, msg_id, next(seq)).serialize() + if bytes_read == chunksize: + yield f + bv.tobytes() + else: + yield f + bv[:bytes_read].tobytes() + bytes_read = buffer.readinto(buf) + class OuterEnvelope: def __init__(self, frame_id, sender, recipient, route_list, inner): diff --git a/receptor/tests/test_protocol.py b/receptor/tests/test_protocol.py index ec293813..145fa2f4 100644 --- a/receptor/tests/test_protocol.py +++ b/receptor/tests/test_protocol.py @@ -1,6 +1,10 @@ +import io +import uuid + import pytest from receptor import protocol +from receptor.messages.envelope import Frame, FramedBuffer, Header, gen_chunks def deser(x): @@ -32,3 +36,42 @@ async def test_databuffer_many_msgs(): assert msg[0] == await b.get() assert msg[1] == await b.get() assert b.q.empty() + + +@pytest.mark.asyncio +async def test_framedbuffer(): + b = FramedBuffer() + + msg_id = uuid.uuid4().int + header = Header("node1", "node2", []) + header_bytes = header.serialize() + f1 = Frame(Frame.START_MSG, 1, len(header_bytes), msg_id, 1) + + await b.put(f1.serialize() + header_bytes) + + payload = b"tina loves butts" + payload2 = b"yep yep yep" + f2 = Frame(Frame.PAYLOAD, 1, len(payload) + len(payload2), msg_id, 2) + + await b.put(f2.serialize() + payload) + await b.put(payload2) + + h, p = await b.get() + + assert h == header + assert p == payload + payload2 + + +@pytest.mark.asyncio +async def test_gen_chunks(): + + b = FramedBuffer() + + header = Header("node1", "node2", []) + payload = b"this is a test with a buffer" + for chunk in gen_chunks(io.BytesIO(payload), header): + await b.put(chunk) + + h, p = await b.get() + assert h == header + assert p == payload From 9fdbdb9c0f7e96752323d0065c3ea2f6a6f2f74f Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Wed, 20 Nov 2019 10:15:24 -0600 Subject: [PATCH 2/9] explicitly setting byte order Signed-off-by: Jesse Jaggars --- receptor/messages/envelope.py | 32 ++++++++++++++++++-------------- receptor/tests/test_protocol.py | 3 +-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index 09776db3..58f18a78 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -1,6 +1,7 @@ import asyncio import base64 import datetime +import io import itertools import json import logging @@ -63,7 +64,6 @@ async def get(self): class Frame: START_MSG = 0 PAYLOAD = 1 - FINISH = 2 def __init__(self, type_, version, length, msg_id, id_): self.type = type_ @@ -73,22 +73,25 @@ def __init__(self, type_, version, length, msg_id, id_): self.id = id_ def serialize(self): - high, low = ((self.msg_id >> 64) & MAX_INT64, self.msg_id & MAX_INT64) - return b''.join([ - pack("ccIi", chr(self.type).encode("ascii"), chr(self.version).encode("ascii"), self.id, self.length), - pack(">QQ", high, low), - ]) + return pack(">ccIIQQ", bytes([self.type]), bytes([self.version]), self.id, self.length, *split_uuid(self.msg_id)) @classmethod def deserialize(cls, buf): - t, v, i, length = unpack("ccIi", buf[0:12]) - hi, lo = unpack(">QQ", buf[12:]) - msg_id = (hi << 64) | lo + t, v, i, length, hi, lo = unpack(">ccIIQQ", buf) + msg_id = join_uuid(hi, lo) return cls(ord(t), ord(v), length, msg_id, i) @classmethod def from_data(cls, data): - return cls.deserialize(data[:28]), data[28:] + return cls.deserialize(data[:26]), data[26:] + + +def split_uuid(data): + return ((data >> 64) & MAX_INT64, data & MAX_INT64) + + +def join_uuid(hi, lo): + return (hi << 64) | lo class Header: @@ -107,7 +110,7 @@ def __eq__(self, other): return (self.sender, self.recipient, self.route_list) == (other.sender, other.recipient, other.route_list) -def gen_chunks(buffer, header, msg_id=None, chunksize=2 ** 8): +def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8): if msg_id is None: msg_id = uuid.uuid4().int seq = itertools.count() @@ -115,13 +118,14 @@ def gen_chunks(buffer, header, msg_id=None, chunksize=2 ** 8): bv = memoryview(buf) header = header.serialize() yield Frame(Frame.START_MSG, 1, len(header), msg_id, next(seq)).serialize() + header + yield Frame(Frame.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize() + buffer = io.BytesIO(data) bytes_read = buffer.readinto(buf) while bytes_read: - f = Frame(Frame.PAYLOAD, 1, bytes_read, msg_id, next(seq)).serialize() if bytes_read == chunksize: - yield f + bv.tobytes() + yield bv.tobytes() else: - yield f + bv[:bytes_read].tobytes() + yield bv[:bytes_read].tobytes() bytes_read = buffer.readinto(buf) diff --git a/receptor/tests/test_protocol.py b/receptor/tests/test_protocol.py index 145fa2f4..909af8b9 100644 --- a/receptor/tests/test_protocol.py +++ b/receptor/tests/test_protocol.py @@ -1,4 +1,3 @@ -import io import uuid import pytest @@ -69,7 +68,7 @@ async def test_gen_chunks(): header = Header("node1", "node2", []) payload = b"this is a test with a buffer" - for chunk in gen_chunks(io.BytesIO(payload), header): + for chunk in gen_chunks(payload, header): await b.put(chunk) h, p = await b.get() From 7fb416f71549dd0f049fe2b43cbab1a28579c3b5 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Wed, 20 Nov 2019 14:10:44 -0600 Subject: [PATCH 3/9] moving tests to a more reasonable place Signed-off-by: Jesse Jaggars --- Pipfile | 1 + Pipfile.lock | 38 ++++++++----- receptor/messages/envelope.py | 25 +++++---- receptor/tests/test_framedbuffer.py | 83 +++++++++++++++++++++++++++++ receptor/tests/test_protocol.py | 42 --------------- 5 files changed, 124 insertions(+), 65 deletions(-) create mode 100644 receptor/tests/test_framedbuffer.py diff --git a/Pipfile b/Pipfile index c1a9dc79..761641b3 100644 --- a/Pipfile +++ b/Pipfile @@ -9,6 +9,7 @@ ipython = "*" pytest = "*" pytest-asyncio = "*" flake8 = "*" +rope = "*" [packages] python-dateutil = "*" diff --git a/Pipfile.lock b/Pipfile.lock index 308377c7..ee8b4c56 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "9cf1be5dc097163566804935386a811de7fbeb747cb173a20a129aef6d9c9660" + "sha256": "774283c0d95761538eddbd14bba1765b759f5993969cef4e953c2c645d3a867e" }, "pipfile-spec": 6, "requires": { @@ -40,6 +40,14 @@ } }, "develop": { + "appnope": { + "hashes": [ + "sha256:5b26757dc6f79a3b7dc9fab95359328d5747fcb2409d331ea66d0272b90ab2a0", + "sha256:8b995ffe925347a2138d7ac0fe77155e4311a0ea6d6da4f5128fe4b3cbe5ed71" + ], + "markers": "sys_platform == 'darwin'", + "version": "==0.1.0" + }, "astroid": { "hashes": [ "sha256:71ea07f44df9568a75d0f354c49143a4575d90645e9fead6dfb52c26a85ed13a", @@ -47,13 +55,6 @@ ], "version": "==2.3.3" }, - "atomicwrites": { - "hashes": [ - "sha256:03472c30eb2c5d1ba9227e4c2ca66ab8287fbfbbda3888aa93dc2e28fc6811b4", - "sha256:75a9445bac02d8d058d5e1fe689654ba5a6556a1dfd8ce6ec55a0ed79866cfa6" - ], - "version": "==1.3.0" - }, "attrs": { "hashes": [ "sha256:08a96c641c3a74e44eb59afb61a24f2cb9f4d7188748e76ba4bb5edfa3cb7d1c", @@ -248,11 +249,11 @@ }, "pylint": { "hashes": [ - "sha256:7b76045426c650d2b0f02fc47c14d7934d17898779da95288a74c2a7ec440702", - "sha256:856476331f3e26598017290fd65bebe81c960e806776f324093a46b76fb2d1c0" + "sha256:3db5468ad013380e987410a8d6956226963aed94ecb5f9d3a28acca6d9ac36cd", + "sha256:886e6afc935ea2590b462664b161ca9a5e40168ea99e5300935f6591ad467df4" ], "index": "pypi", - "version": "==2.4.3" + "version": "==2.4.4" }, "pyparsing": { "hashes": [ @@ -263,11 +264,11 @@ }, "pytest": { "hashes": [ - "sha256:27abc3fef618a01bebb1f0d6d303d2816a99aa87a5968ebc32fe971be91eb1e6", - "sha256:58cee9e09242937e136dbb3dab466116ba20d6b7828c7620f23947f37eb4dae4" + "sha256:1897d74f60a5d8be02e06d708b41bf2445da2ee777066bd68edf14474fc201eb", + "sha256:f6a567e20c04259d41adce9a360bd8991e6aa29dd9695c5e6bd25a9779272673" ], "index": "pypi", - "version": "==5.2.2" + "version": "==5.3.0" }, "pytest-asyncio": { "hashes": [ @@ -277,6 +278,15 @@ "index": "pypi", "version": "==0.10.0" }, + "rope": { + "hashes": [ + "sha256:6b728fdc3e98a83446c27a91fc5d56808a004f8beab7a31ab1d7224cecc7d969", + "sha256:c5c5a6a87f7b1a2095fb311135e2a3d1f194f5ecb96900fdd0a9100881f48aaf", + "sha256:f0dcf719b63200d492b85535ebe5ea9b29e0d0b8aebeb87fe03fc1a65924fdaf" + ], + "index": "pypi", + "version": "==0.14.0" + }, "six": { "hashes": [ "sha256:1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index 58f18a78..8e6ba95d 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -36,12 +36,13 @@ async def put(self, data): async def handle_frame(self, data): self.current_frame, rest = Frame.from_data(data) - if self.current_frame.type in (Frame.START_MSG, Frame.PAYLOAD): - self.to_read = self.current_frame.length - await self.consume(rest) - else: + + if self.current_frame.type not in (Frame.HEADER, Frame.PAYLOAD): raise Exception("Unknown Frame Type") + self.to_read = self.current_frame.length + await self.consume(rest) + async def consume(self, data): self.to_read -= len(data) self.bb += data @@ -49,8 +50,8 @@ async def consume(self, data): await self.finish() async def finish(self): - if self.current_frame.type == Frame.START_MSG: - self.header = Header(**json.loads(self.bb)) + if self.current_frame.type == Frame.HEADER: + self.header = Header.from_bytes(self.bb) elif self.current_frame.type == Frame.PAYLOAD: await self.q.put((self.header, self.bb)) self.header = None @@ -62,9 +63,11 @@ async def get(self): class Frame: - START_MSG = 0 + HEADER = 0 PAYLOAD = 1 + __slots__ = ('type', 'version', 'length', 'msg_id', 'id') + def __init__(self, type_, version, length, msg_id, id_): self.type = type_ self.version = version @@ -103,8 +106,12 @@ def __init__(self, sender, recipient, route_list): def serialize(self): return json.dumps({"sender": self.sender, "recipient": self.recipient, "route_list": self.route_list}).encode("utf-8") + @classmethod + def from_bytes(cls, data): + return cls(**json.loads(data)) + def __repr__(self): - return f"Header: {self.sender}, {self.recipient}, {self.route_list}" + return f"Header({self.sender}, {self.recipient}, {self.route_list})" def __eq__(self, other): return (self.sender, self.recipient, self.route_list) == (other.sender, other.recipient, other.route_list) @@ -117,7 +124,7 @@ def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8): buf = bytearray(chunksize) bv = memoryview(buf) header = header.serialize() - yield Frame(Frame.START_MSG, 1, len(header), msg_id, next(seq)).serialize() + header + yield Frame(Frame.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header yield Frame(Frame.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize() buffer = io.BytesIO(data) bytes_read = buffer.readinto(buf) diff --git a/receptor/tests/test_framedbuffer.py b/receptor/tests/test_framedbuffer.py new file mode 100644 index 00000000..b282b348 --- /dev/null +++ b/receptor/tests/test_framedbuffer.py @@ -0,0 +1,83 @@ +import json +import uuid + +import pytest + +from receptor.messages.envelope import Frame, FramedBuffer, Header, gen_chunks + + +@pytest.yield_fixture +def msg_id(): + return uuid.uuid4().int + + +@pytest.yield_fixture +def framed_buffer(): + return FramedBuffer() + + +@pytest.mark.asyncio +async def test_framedbuffer(framed_buffer, msg_id): + header = Header("node1", "node2", []) + header_bytes = header.serialize() + f1 = Frame(Frame.HEADER, 1, len(header_bytes), msg_id, 1) + + await framed_buffer.put(f1.serialize() + header_bytes) + + payload = b"tina loves butts" + payload2 = b"yep yep yep" + f2 = Frame(Frame.PAYLOAD, 1, len(payload) + len(payload2), msg_id, 2) + + await framed_buffer.put(f2.serialize() + payload) + await framed_buffer.put(payload2) + + h, p = await framed_buffer.get() + + assert h == header + assert p == payload + payload2 + + +@pytest.mark.asyncio +async def test_gen_chunks(): + + b = FramedBuffer() + + header = Header("node1", "node2", []) + payload = b"this is a test with a buffer" + for chunk in gen_chunks(payload, header): + await b.put(chunk) + + h, p = await b.get() + assert h == header + assert p == payload + + +@pytest.mark.asyncio +async def test_hi(msg_id, framed_buffer): + hi = json.dumps({"cmd": "hi"}).encode("utf-8") + f1 = Frame(Frame.PAYLOAD, 1, len(hi), msg_id, 1) + + await framed_buffer.put(f1.serialize()) + await framed_buffer.put(hi) + + h, p = await framed_buffer.get() + + assert h is None + assert p == hi + + +@pytest.mark.asyncio +async def test_extra_header(framed_buffer, msg_id): + h1 = Header("node1", "node2", []) + payload = h1.serialize() + f1 = Frame(Frame.HEADER, 1, len(payload), msg_id, 1) + await framed_buffer.put(f1.serialize()) + await framed_buffer.put(payload) + + h2 = Header("node3", "node4", []) + payload = h2.serialize() + f2 = Frame(Frame.HEADER, 1, len(payload), msg_id, 2) + await framed_buffer.put(f2.serialize()) + await framed_buffer.put(payload) + + assert framed_buffer.header == h2 diff --git a/receptor/tests/test_protocol.py b/receptor/tests/test_protocol.py index 909af8b9..ec293813 100644 --- a/receptor/tests/test_protocol.py +++ b/receptor/tests/test_protocol.py @@ -1,9 +1,6 @@ -import uuid - import pytest from receptor import protocol -from receptor.messages.envelope import Frame, FramedBuffer, Header, gen_chunks def deser(x): @@ -35,42 +32,3 @@ async def test_databuffer_many_msgs(): assert msg[0] == await b.get() assert msg[1] == await b.get() assert b.q.empty() - - -@pytest.mark.asyncio -async def test_framedbuffer(): - b = FramedBuffer() - - msg_id = uuid.uuid4().int - header = Header("node1", "node2", []) - header_bytes = header.serialize() - f1 = Frame(Frame.START_MSG, 1, len(header_bytes), msg_id, 1) - - await b.put(f1.serialize() + header_bytes) - - payload = b"tina loves butts" - payload2 = b"yep yep yep" - f2 = Frame(Frame.PAYLOAD, 1, len(payload) + len(payload2), msg_id, 2) - - await b.put(f2.serialize() + payload) - await b.put(payload2) - - h, p = await b.get() - - assert h == header - assert p == payload + payload2 - - -@pytest.mark.asyncio -async def test_gen_chunks(): - - b = FramedBuffer() - - header = Header("node1", "node2", []) - payload = b"this is a test with a buffer" - for chunk in gen_chunks(payload, header): - await b.put(chunk) - - h, p = await b.get() - assert h == header - assert p == payload From 41e7159472578387b2e8940c256c73c1f99edb0c Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 21 Nov 2019 08:56:18 -0600 Subject: [PATCH 4/9] using framed messages Signed-off-by: Jesse Jaggars --- receptor/messages/envelope.py | 121 +++++++++++++++++++--------- receptor/protocol.py | 16 ++-- receptor/receptor.py | 58 ++++++------- receptor/router.py | 16 ++-- receptor/tests/test_framedbuffer.py | 57 ++++++++----- 5 files changed, 164 insertions(+), 104 deletions(-) diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index 8e6ba95d..5d2d0b10 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -5,15 +5,51 @@ import itertools import json import logging +import struct import time import uuid -from struct import pack, unpack +from enum import IntEnum logger = logging.getLogger(__name__) MAX_INT64 = (2 ** 64 - 1) +class FramedMessage: + """ + A complete message constructed from one or more Frames. + """ + + __slots__ = ("msg_id", "header", "payload") + + def __init__(self, msg_id=None, header=None, payload=None): + if msg_id is None: + msg_id = uuid.uuid4().int + self.msg_id = msg_id + self.header = header + self.payload = payload + + + def serialize(self): + h = json.dumps(self.header).encode("utf-8") + return b''.join([ + Frame.wrap(h, type_=Frame.Types.HEADER, msg_id=self.msg_id).serialize(), + h, + Frame.wrap(self.payload, msg_id=self.msg_id).serialize(), + self.payload + ]) + + +class CommandMessage(FramedMessage): + + def serialize(self): + h = json.dumps(self.header).encode("utf-8") + return b''.join([ + Frame.wrap(h, type_=Frame.Types.COMMAND, msg_id=self.msg_id).serialize(), + h, + ]) + + class FramedBuffer: """ A buffer that accumulates frames and bytes to produce a header and a @@ -33,11 +69,11 @@ async def put(self, data): if not self.to_read: return await self.handle_frame(data) await self.consume(data) - + async def handle_frame(self, data): self.current_frame, rest = Frame.from_data(data) - if self.current_frame.type not in (Frame.HEADER, Frame.PAYLOAD): + if self.current_frame.type not in Frame.Types: raise Exception("Unknown Frame Type") self.to_read = self.current_frame.length @@ -50,11 +86,18 @@ async def consume(self, data): await self.finish() async def finish(self): - if self.current_frame.type == Frame.HEADER: - self.header = Header.from_bytes(self.bb) - elif self.current_frame.type == Frame.PAYLOAD: - await self.q.put((self.header, self.bb)) + if self.current_frame.type == Frame.Types.HEADER: + self.header = json.loads(self.bb) + elif self.current_frame.type == Frame.Types.PAYLOAD: + await self.q.put(FramedMessage( + self.current_frame.msg_id, header=self.header, + payload=self.bb)) self.header = None + elif self.current_frame.type == Frame.Types.COMMAND: + await self.q.put(FramedMessage( + self.current_frame.msg_id, header=json.loads(self.bb))) + else: + raise Exception("Unknown Frame Type") self.to_read = 0 self.bb = bytearray() @@ -63,11 +106,17 @@ async def get(self): class Frame: - HEADER = 0 - PAYLOAD = 1 + + class Types(IntEnum): + HEADER = 0 + PAYLOAD = 1 + COMMAND = 2 + + fmt = struct.Struct(">ccIIQQ") __slots__ = ('type', 'version', 'length', 'msg_id', 'id') + def __init__(self, type_, version, length, msg_id, id_): self.type = type_ self.version = version @@ -76,17 +125,29 @@ def __init__(self, type_, version, length, msg_id, id_): self.id = id_ def serialize(self): - return pack(">ccIIQQ", bytes([self.type]), bytes([self.version]), self.id, self.length, *split_uuid(self.msg_id)) + return self.fmt.pack( + bytes([self.type]), bytes([self.version]), + self.id, self.length, *split_uuid(self.msg_id)) @classmethod def deserialize(cls, buf): - t, v, i, length, hi, lo = unpack(">ccIIQQ", buf) + t, v, i, length, hi, lo = Frame.fmt.unpack(buf) msg_id = join_uuid(hi, lo) - return cls(ord(t), ord(v), length, msg_id, i) + return cls(Frame.Types(ord(t)), ord(v), length, msg_id, i) @classmethod def from_data(cls, data): - return cls.deserialize(data[:26]), data[26:] + return cls.deserialize(data[:Frame.fmt.size]), data[Frame.fmt.size:] + + @classmethod + def wrap(cls, data, type_=Types.PAYLOAD, msg_id=None): + """ + Returns a frame for the passed data. + """ + if not msg_id: + msg_id = uuid.uuid4().int + + return cls(type_, 1, len(data), msg_id, 1) def split_uuid(data): @@ -97,24 +158,10 @@ def join_uuid(hi, lo): return (hi << 64) | lo -class Header: - def __init__(self, sender, recipient, route_list): - self.sender = sender - self.recipient = recipient - self.route_list = route_list - - def serialize(self): - return json.dumps({"sender": self.sender, "recipient": self.recipient, "route_list": self.route_list}).encode("utf-8") - - @classmethod - def from_bytes(cls, data): - return cls(**json.loads(data)) - - def __repr__(self): - return f"Header({self.sender}, {self.recipient}, {self.route_list})" - - def __eq__(self, other): - return (self.sender, self.recipient, self.route_list) == (other.sender, other.recipient, other.route_list) +def glue(header, payload): + hf = Frame.wrap(header, type_=Frame.Types.HEADER) + pf = Frame.wrap(payload, msg_id=hf.msg_id) + return hf.serialize() + header + pf.serialize() + payload def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8): @@ -123,9 +170,9 @@ def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8): seq = itertools.count() buf = bytearray(chunksize) bv = memoryview(buf) - header = header.serialize() - yield Frame(Frame.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header - yield Frame(Frame.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize() + header = json.dumps(header).encode("utf-8") + yield Frame(Frame.Types.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header + yield Frame(Frame.Types.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize() buffer = io.BytesIO(data) bytes_read = buffer.readinto(buf) while bytes_read: @@ -146,13 +193,13 @@ def __init__(self, frame_id, sender, recipient, route_list, inner): self.inner_obj = None async def deserialize_inner(self, receptor): - self.inner_obj = await InnerEnvelope.deserialize(receptor, self.inner) + self.inner_obj = await Inner.deserialize(receptor, self.inner) @classmethod def from_raw(cls, raw): doc = json.loads(raw) return cls(**doc) - + def serialize(self): return json.dumps(dict( frame_id=self.frame_id, @@ -163,7 +210,7 @@ def serialize(self): )) -class InnerEnvelope: +class Inner: def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp, raw_payload, directive=None, in_response_to=None, ttl=None, serial=1, code=0, expire_time_delta=300): diff --git a/receptor/protocol.py b/receptor/protocol.py index 71be5915..24f93ca6 100644 --- a/receptor/protocol.py +++ b/receptor/protocol.py @@ -68,7 +68,7 @@ def connection_made(self, transport): self.peername = transport.get_extra_info('peername') self.transport = transport connected_peers_gauge.inc() - self.incoming_buffer = DataBuffer(loop=self.loop) + self.incoming_buffer = envelope.FramedBuffer(loop=self.loop) self.loop.create_task(self.wait_greeting()) def connection_lost(self, exc): @@ -77,7 +77,7 @@ def connection_lost(self, exc): def data_received(self, data): logger.debug(data) - self.incoming_buffer.add(data) + self.loop.create_task(self.incoming_buffer.put(data)) async def wait_greeting(self): ''' @@ -86,9 +86,9 @@ async def wait_greeting(self): ''' logger.debug('Looking for handshake...') data = await self.incoming_buffer.get() - logger.debug(data) - if data["cmd"] == "HI": - self.handle_handshake(data) + logger.debug(data.header) + if data.header["cmd"] == "HI": + self.handle_handshake(data.header) logger.debug("handshake complete, starting normal handle loop") else: logger.error("Handshake failed!") @@ -102,15 +102,15 @@ def handle_handshake(self, data): self.loop.create_task(self.receptor.message_handler(self.incoming_buffer)) def send_handshake(self): - msg = json.dumps({ + msg = envelope.CommandMessage(header={ "cmd": "HI", "id": self.receptor.node_id, "expire_time": time.time() + 10, "meta": dict(capabilities=self.receptor.work_manager.get_capabilities(), groups=self.receptor.config.node_groups, work=self.receptor.work_manager.get_work()) - }).encode("utf-8") - self.transport.write(msg + DELIM) + }) + self.transport.write(msg.serialize()) class BasicProtocol(BaseProtocol): diff --git a/receptor/receptor.py b/receptor/receptor.py index 0281ac0a..67b14edf 100644 --- a/receptor/receptor.py +++ b/receptor/receptor.py @@ -94,11 +94,11 @@ def update_connections(self, protocol_obj): async def message_handler(self, buf): while True: - data = await buf.get() + header, data = await buf.get() if "cmd" in data and data["cmd"] == "ROUTE": await self.handle_route_advertisement(data) else: - await self.handle_message(data) + await self.handle_message(header, data) def add_connection(self, protocol_obj): self.update_connections(protocol_obj) @@ -137,74 +137,74 @@ async def send_route_advertisement(self, edges=None, seen=[]): for target in destinations: buf = self.buffer_mgr.get_buffer_for_node(target, self) try: - await buf.put(json.dumps({ + msg = envelope.CommandMessage(header={ "cmd": "ROUTE", "id": self.node_id, "capabilities": self.work_manager.get_capabilities(), "groups": self.config.node_groups, "edges": edges, "seen": seens - }).encode("utf-8")) + }) + await buf.put(msg.serialize()) except Exception as e: logger.exception("Error trying to broadcast routes and capabilities: {}".format(e)) - async def handle_directive(self, outer_env): + async def handle_directive(self, inner): try: - namespace, _ = outer_env.inner_obj.directive.split(':', 1) + namespace, _ = inner.directive.split(':', 1) if namespace == RECEPTOR_DIRECTIVE_NAMESPACE: - await directive.control(self.router, outer_env.inner_obj) + await directive.control(self.router, inner) else: # other namespace/work directives - await self.work_manager.handle(outer_env.inner_obj) + await self.work_manager.handle(inner) except ValueError: - logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (outer_env.inner_obj.directive,)) - err_resp = outer_env.inner_obj.make_response( + logger.error("error in handle_message: Invalid directive -> '%s'. Sending failure response back." % (inner.directive,)) + err_resp = inner.make_response( receptor=self, - recipient=outer_env.inner_obj.sender, - payload="An invalid directive ('%s') was specified." % (outer_env.inner_obj.directive,), - in_response_to=outer_env.inner_obj.message_id, - serial=outer_env.inner_obj.serial + 1, + recipient=inner.sender, + payload="An invalid directive ('%s') was specified." % (inner.directive,), + in_response_to=inner.message_id, + serial=inner.serial + 1, ttl=15, code=1, ) await self.router.send(err_resp) except Exception as e: logger.error("error in handle_message: '%s'. Sending failure response back." % (str(e),)) - err_resp = outer_env.inner_obj.make_response( + err_resp = inner.make_response( receptor=self, - recipient=outer_env.inner_obj.sender, + recipient=inner.sender, payload=str(e), - in_response_to=outer_env.inner_obj.message_id, - serial=outer_env.inner_obj.serial + 1, + in_response_to=inner.message_id, + serial=inner.serial + 1, ttl=15, code=1, ) await self.router.send(err_resp) - async def handle_response(self, outer_env): - in_response_to = outer_env.inner_obj.in_response_to + async def handle_response(self, inner): + in_response_to = inner.in_response_to if in_response_to in self.router.response_registry: logger.info(f'Handling response to {in_response_to} with callback.') for connection in self.controller_connections: - connection.emit_response(outer_env.inner_obj) + connection.emit_response(inner) else: logger.warning(f'Received response to {in_response_to} but no record of sent message.') - async def handle_message(self, msg): + async def handle_message(self, header, msg): handlers = dict( directive=self.handle_directive, response=self.handle_response, ) messages_received_counter.inc() - outer_env = envelope.OuterEnvelope(**msg) - next_hop = self.router.next_hop(outer_env.recipient) + next_hop = self.router.next_hop(header.recipient) if next_hop: - return await self.router.forward(outer_env, next_hop) + return await self.router.forward(header, msg, next_hop) - await outer_env.deserialize_inner(self) + inner = await envelope.Inner.deserialize(self, msg) - if outer_env.inner_obj.message_type not in handlers: + if inner.message_type not in handlers: raise exceptions.UnknownMessageType( - f'Unknown message type: {outer_env.inner_obj.message_type}') + f'Unknown message type: {inner.message_type}') - await handlers[outer_env.inner_obj.message_type](outer_env) + await handlers[inner.message_type](inner) diff --git a/receptor/router.py b/receptor/router.py index b3f0ffbf..317af6ff 100644 --- a/receptor/router.py +++ b/receptor/router.py @@ -177,14 +177,14 @@ async def send(self, inner_envelope, expected_response=False): #TODO: This probably needs to emit an error response raise UnrouteableError(f'No route found to {inner_envelope.recipient}') signed = await inner_envelope.sign_and_serialize() - outer_envelope = envelope.OuterEnvelope( - frame_id=str(uuid.uuid4()), - sender=self.node_id, - recipient=inner_envelope.recipient, - route_list=[self.node_id], - inner=signed - ) + + header = { + "sender": self.node_id, + "recipient": inner_envelope.recipient, + "route_list": [self.node_id] + } + msg = envelope.FramedMessage(uuid.uuid4().int, header, signed) logger.debug(f'Sending {inner_envelope.message_id} to {inner_envelope.recipient} via {next_node_id}') if expected_response and inner_envelope.message_type == 'directive': self.response_registry[inner_envelope.message_id] = dict(message_sent_time=inner_envelope.timestamp) - await self.forward(outer_envelope, next_node_id) + await self.forward(msg, next_node_id) diff --git a/receptor/tests/test_framedbuffer.py b/receptor/tests/test_framedbuffer.py index b282b348..3a97e11f 100644 --- a/receptor/tests/test_framedbuffer.py +++ b/receptor/tests/test_framedbuffer.py @@ -3,7 +3,7 @@ import pytest -from receptor.messages.envelope import Frame, FramedBuffer, Header, gen_chunks +from receptor.messages.envelope import Frame, FramedBuffer, gen_chunks @pytest.yield_fixture @@ -18,23 +18,23 @@ def framed_buffer(): @pytest.mark.asyncio async def test_framedbuffer(framed_buffer, msg_id): - header = Header("node1", "node2", []) - header_bytes = header.serialize() - f1 = Frame(Frame.HEADER, 1, len(header_bytes), msg_id, 1) + header = {"sender": "node1", "recipient": "node2", "route_list": []} + header_bytes = json.dumps(header).encode("utf-8") + f1 = Frame(Frame.Types.HEADER, 1, len(header_bytes), msg_id, 1) await framed_buffer.put(f1.serialize() + header_bytes) payload = b"tina loves butts" payload2 = b"yep yep yep" - f2 = Frame(Frame.PAYLOAD, 1, len(payload) + len(payload2), msg_id, 2) + f2 = Frame(Frame.Types.PAYLOAD, 1, len(payload) + len(payload2), msg_id, 2) await framed_buffer.put(f2.serialize() + payload) await framed_buffer.put(payload2) - h, p = await framed_buffer.get() + m = await framed_buffer.get() - assert h == header - assert p == payload + payload2 + assert m.header == header + assert m.payload == payload + payload2 @pytest.mark.asyncio @@ -42,42 +42,55 @@ async def test_gen_chunks(): b = FramedBuffer() - header = Header("node1", "node2", []) + header = {"sender": "node1", "recipient": "node2", "route_list": []} payload = b"this is a test with a buffer" for chunk in gen_chunks(payload, header): await b.put(chunk) - h, p = await b.get() - assert h == header - assert p == payload + m = await b.get() + assert m.header == header + assert m.payload == payload @pytest.mark.asyncio async def test_hi(msg_id, framed_buffer): hi = json.dumps({"cmd": "hi"}).encode("utf-8") - f1 = Frame(Frame.PAYLOAD, 1, len(hi), msg_id, 1) + f1 = Frame(Frame.Types.PAYLOAD, 1, len(hi), msg_id, 1) await framed_buffer.put(f1.serialize()) await framed_buffer.put(hi) - h, p = await framed_buffer.get() + m = await framed_buffer.get() - assert h is None - assert p == hi + assert m.header is None + assert m.payload == hi @pytest.mark.asyncio async def test_extra_header(framed_buffer, msg_id): - h1 = Header("node1", "node2", []) - payload = h1.serialize() - f1 = Frame(Frame.HEADER, 1, len(payload), msg_id, 1) + h1 = {"sender": "node1", "recipient": "node2", "route_list": []} + payload = json.dumps(h1).encode("utf-8") + f1 = Frame(Frame.Types.HEADER, 1, len(payload), msg_id, 1) await framed_buffer.put(f1.serialize()) await framed_buffer.put(payload) - h2 = Header("node3", "node4", []) - payload = h2.serialize() - f2 = Frame(Frame.HEADER, 1, len(payload), msg_id, 2) + h2 = {"sender": "node3", "recipient": "node4", "route_list": []} + payload = json.dumps(h2).encode("utf-8") + f2 = Frame(Frame.Types.HEADER, 1, len(payload), msg_id, 2) await framed_buffer.put(f2.serialize()) await framed_buffer.put(payload) assert framed_buffer.header == h2 + + +@pytest.mark.asyncio +async def test_command(framed_buffer, msg_id): + cmd = {"cmd": "hi"} + payload = json.dumps(cmd).encode("utf-8") + f1 = Frame(Frame.Types.COMMAND, 1, len(payload), msg_id, 1) + await framed_buffer.put(f1.serialize()) + await framed_buffer.put(payload) + + m = await framed_buffer.get() + assert m.header == cmd + assert m.payload is None From 77a6faf4e6073b6aaf4f9bdcd77aa6cf730bc875 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 21 Nov 2019 11:47:56 -0600 Subject: [PATCH 5/9] adding logging to manifest loading Signed-off-by: Jesse Jaggars --- receptor/buffers/file.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/receptor/buffers/file.py b/receptor/buffers/file.py index e300a9d9..72f94fe9 100644 --- a/receptor/buffers/file.py +++ b/receptor/buffers/file.py @@ -57,6 +57,10 @@ def _read_manifest(self): return json.load(fp) except FileNotFoundError: return [] + except json.decoder.JSONDecodeError: + with open(self._manifest_path, "r") as fp: + logger.error("failed to decode manifest: %s", fp.read()) + raise def _path_for_ident(self, ident): return os.path.join(self._message_path, ident) From 7fcd70638d2387e914dbe4b15f23c54252e0133a Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 21 Nov 2019 14:28:11 -0600 Subject: [PATCH 6/9] handling edge cases in consume 1. too much data 2. not enough data For #1 we make sure we read no more than `to_read` in any given consume operation. For #2 we make sure we have enough bytes on hand to unpack into a Frame, buffering if we don't. Signed-off-by: Jesse Jaggars --- receptor/messages/directive.py | 4 +- receptor/messages/envelope.py | 74 +++++++---------------------- receptor/protocol.py | 15 ++++-- receptor/receptor.py | 22 +++++---- receptor/router.py | 12 ++--- receptor/security/__init__.py | 4 +- receptor/tests/test_framedbuffer.py | 51 +++++++++++++------- receptor/work.py | 9 ++-- 8 files changed, 89 insertions(+), 102 deletions(-) diff --git a/receptor/messages/directive.py b/receptor/messages/directive.py index c2b5c7fa..175571fa 100644 --- a/receptor/messages/directive.py +++ b/receptor/messages/directive.py @@ -1,6 +1,6 @@ import datetime -import logging import json +import logging from ..exceptions import UnknownDirective from . import envelope @@ -26,7 +26,7 @@ async def __call__(self, router, inner_env): serial = 0 async for response in responses: serial += 1 - enveloped_response = envelope.InnerEnvelope.make_response( + enveloped_response = envelope.Inner.make_response( receptor=router.receptor, recipient=inner_env.sender, payload=response, diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index 5d2d0b10..79621015 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -1,8 +1,6 @@ import asyncio import base64 import datetime -import io -import itertools import json import logging import struct @@ -61,29 +59,42 @@ class FramedBuffer: def __init__(self, loop=None): self.q = asyncio.Queue(loop=loop) self.header = None + self.framebuffer = bytearray() self.bb = bytearray() self.current_frame = None self.to_read = 0 async def put(self, data): + logger.debug("put: %s ... %s", data[:16], data[-16:]) if not self.to_read: return await self.handle_frame(data) await self.consume(data) async def handle_frame(self, data): - self.current_frame, rest = Frame.from_data(data) + try: + self.framebuffer += data + frame, rest = Frame.from_data(self.framebuffer) + except struct.error: + return # We don't have enough data yet + else: + self.framebuffer = bytearray() - if self.current_frame.type not in Frame.Types: + if frame.type not in Frame.Types: raise Exception("Unknown Frame Type") + self.current_frame = frame self.to_read = self.current_frame.length await self.consume(rest) async def consume(self, data): + logger.debug("consuming %d bytes; to_read = %d bytes", len(data), self.to_read) + data, rest = data[:self.to_read], data[self.to_read:] self.to_read -= len(data) self.bb += data if self.to_read == 0: await self.finish() + if rest: + await self.handle_frame(rest) async def finish(self): if self.current_frame.type == Frame.Types.HEADER: @@ -124,6 +135,9 @@ def __init__(self, type_, version, length, msg_id, id_): self.msg_id = msg_id self.id = id_ + def __repr__(self): + return f"Frame({self.type}, {self.version}, {self.length}, {self.msg_id}, {self.id})" + def serialize(self): return self.fmt.pack( bytes([self.type]), bytes([self.version]), @@ -158,58 +172,6 @@ def join_uuid(hi, lo): return (hi << 64) | lo -def glue(header, payload): - hf = Frame.wrap(header, type_=Frame.Types.HEADER) - pf = Frame.wrap(payload, msg_id=hf.msg_id) - return hf.serialize() + header + pf.serialize() + payload - - -def gen_chunks(data, header, msg_id=None, chunksize=2 ** 8): - if msg_id is None: - msg_id = uuid.uuid4().int - seq = itertools.count() - buf = bytearray(chunksize) - bv = memoryview(buf) - header = json.dumps(header).encode("utf-8") - yield Frame(Frame.Types.HEADER, 1, len(header), msg_id, next(seq)).serialize() + header - yield Frame(Frame.Types.PAYLOAD, 1, len(data), msg_id, next(seq)).serialize() - buffer = io.BytesIO(data) - bytes_read = buffer.readinto(buf) - while bytes_read: - if bytes_read == chunksize: - yield bv.tobytes() - else: - yield bv[:bytes_read].tobytes() - bytes_read = buffer.readinto(buf) - - -class OuterEnvelope: - def __init__(self, frame_id, sender, recipient, route_list, inner): - self.frame_id = frame_id - self.sender = sender - self.recipient = recipient - self.route_list = route_list - self.inner = inner - self.inner_obj = None - - async def deserialize_inner(self, receptor): - self.inner_obj = await Inner.deserialize(receptor, self.inner) - - @classmethod - def from_raw(cls, raw): - doc = json.loads(raw) - return cls(**doc) - - def serialize(self): - return json.dumps(dict( - frame_id=self.frame_id, - sender=self.sender, - recipient=self.recipient, - route_list=self.route_list, - inner=self.inner - )) - - class Inner: def __init__(self, receptor, message_id, sender, recipient, message_type, timestamp, raw_payload, directive=None, in_response_to=None, ttl=None, serial=1, diff --git a/receptor/protocol.py b/receptor/protocol.py index 24f93ca6..b95d5083 100644 --- a/receptor/protocol.py +++ b/receptor/protocol.py @@ -57,7 +57,9 @@ async def watch_queue(self): continue try: - self.transport.write(msg + DELIM) + logger.debug("about to write %s ... %s", msg[:8], msg[-8:]) + self.transport.write(msg) + logger.debug("written successfully") except Exception: logger.exception("Error received trying to write to %s", self.id) await buffer_obj.put(msg) @@ -76,7 +78,7 @@ def connection_lost(self, exc): self.receptor.remove_connection(self) def data_received(self, data): - logger.debug(data) + logger.debug("recv: %s ... %s", data[:16], data[-16:]) self.loop.create_task(self.incoming_buffer.put(data)) async def wait_greeting(self): @@ -95,10 +97,12 @@ async def wait_greeting(self): self.transport.close() def handle_handshake(self, data): + logger.debug("handle_handshake: %s", data) self.id = data["id"] self.meta = data.get("meta", {}) self.receptor.add_connection(self) self.loop.create_task(self.watch_queue()) + logger.debug("starting message_handler: %s", self.incoming_buffer) self.loop.create_task(self.receptor.message_handler(self.incoming_buffer)) def send_handshake(self): @@ -180,14 +184,15 @@ def emit_response(self, response): def _do_emit_callback(self, fut): res = fut.result() - self.transport.write(res.encode() + DELIM) + logger.debug("_do_emit_callback: %s", res) + self.transport.write(res + DELIM) def data_received(self, data): recipient, directive, payload = data.rstrip(DELIM).decode('utf8').split('\n', 2) message_id = str(uuid.uuid4()) logger.info(f'{message_id}: Sending {directive} to {recipient}') sent_timestamp = datetime.datetime.utcnow() - inner_env = envelope.InnerEnvelope( + inner_env = envelope.Inner( receptor=self.receptor, message_id=message_id, sender=self.receptor.node_id, @@ -212,7 +217,7 @@ def _data_received_callback(self, inner_env, fut): try: fut.result() except Exception as e: - err_resp = envelope.InnerEnvelope.make_response( + err_resp = envelope.Inner.make_response( receptor=self.receptor, recipient=inner_env.sender, payload=str(e), diff --git a/receptor/receptor.py b/receptor/receptor.py index 67b14edf..9c63da4b 100644 --- a/receptor/receptor.py +++ b/receptor/receptor.py @@ -93,12 +93,18 @@ def update_connections(self, protocol_obj): self.update_connection_manifest(protocol_obj.id) async def message_handler(self, buf): + logger.debug("spawning message_handler") while True: - header, data = await buf.get() - if "cmd" in data and data["cmd"] == "ROUTE": - await self.handle_route_advertisement(data) + try: + data = await buf.get() + except Exception: + logger.exception("message_handler") else: - await self.handle_message(header, data) + logger.debug("message_handler: %s", data) + if "cmd" in data.header and data.header["cmd"] == "ROUTE": + await self.handle_route_advertisement(data.header) + else: + await self.handle_message(data) def add_connection(self, protocol_obj): self.update_connections(protocol_obj) @@ -191,17 +197,17 @@ async def handle_response(self, inner): else: logger.warning(f'Received response to {in_response_to} but no record of sent message.') - async def handle_message(self, header, msg): + async def handle_message(self, msg): handlers = dict( directive=self.handle_directive, response=self.handle_response, ) messages_received_counter.inc() - next_hop = self.router.next_hop(header.recipient) + next_hop = self.router.next_hop(msg.header["recipient"]) if next_hop: - return await self.router.forward(header, msg, next_hop) + return await self.router.forward(msg, next_hop) - inner = await envelope.Inner.deserialize(self, msg) + inner = await envelope.Inner.deserialize(self, msg.payload) if inner.message_type not in handlers: raise exceptions.UnknownMessageType( diff --git a/receptor/router.py b/receptor/router.py index 317af6ff..18e03b53 100644 --- a/receptor/router.py +++ b/receptor/router.py @@ -97,7 +97,7 @@ def get_nodes(self): async def ping_node(self, node_id, callback=log_ping): logger.info(f'Sending ping to node {node_id}') now = datetime.datetime.utcnow().isoformat() - ping_envelope = envelope.InnerEnvelope( + ping_envelope = envelope.Inner( receptor=self.receptor, message_id=str(uuid.uuid4()), sender=self.node_id, @@ -137,17 +137,17 @@ def find_shortest_path(self, to_node_id): mins[next_vertex] = next_total_cost heapq.heappush(heap, (next_total_cost, next_vertex, path)) - async def forward(self, outer_envelope, next_hop): + async def forward(self, msg, next_hop): """ Forward a message on to the next hop closer to its destination """ buffer_mgr = self.receptor.config.components_buffer_manager buffer_obj = buffer_mgr.get_buffer_for_node(next_hop, self.receptor) - outer_envelope.route_list.append(self.node_id) - logger.debug(f'Forwarding frame {outer_envelope.frame_id} to {next_hop}') + msg.header["route_list"].append(self.node_id) + logger.debug(f'Forwarding frame {msg.msg_id} to {next_hop}') try: route_counter.inc() - await buffer_obj.put(outer_envelope.serialize().encode("utf-8")) + await buffer_obj.put(msg.serialize()) except ReceptorBufferError as e: logger.exception("Receptor Buffer Write Error forwarding message to {}: {}".format(next_hop, e)) # TODO: Possible to find another route? This might be a hard failure @@ -183,7 +183,7 @@ async def send(self, inner_envelope, expected_response=False): "recipient": inner_envelope.recipient, "route_list": [self.node_id] } - msg = envelope.FramedMessage(uuid.uuid4().int, header, signed) + msg = envelope.FramedMessage(msg_id=uuid.uuid4().int, header=header, payload=signed) logger.debug(f'Sending {inner_envelope.message_id} to {inner_envelope.recipient} via {next_node_id}') if expected_response and inner_envelope.message_type == 'directive': self.response_registry[inner_envelope.message_id] = dict(message_sent_time=inner_envelope.timestamp) diff --git a/receptor/security/__init__.py b/receptor/security/__init__.py index 1d0ce0db..8137511e 100644 --- a/receptor/security/__init__.py +++ b/receptor/security/__init__.py @@ -1,5 +1,6 @@ import json import logging + logger = logging.getLogger(__name__) @@ -26,5 +27,4 @@ async def sign_response(self, inner_envelope): for attr in ['message_id', 'sender', 'recipient', 'message_type', 'timestamp', 'raw_payload', 'directive', 'in_response_to', 'ttl', 'serial', 'code']} - ) - + ).encode("utf-8") diff --git a/receptor/tests/test_framedbuffer.py b/receptor/tests/test_framedbuffer.py index 3a97e11f..192b4671 100644 --- a/receptor/tests/test_framedbuffer.py +++ b/receptor/tests/test_framedbuffer.py @@ -3,7 +3,7 @@ import pytest -from receptor.messages.envelope import Frame, FramedBuffer, gen_chunks +from receptor.messages.envelope import Frame, FramedBuffer, FramedMessage @pytest.yield_fixture @@ -12,8 +12,8 @@ def msg_id(): @pytest.yield_fixture -def framed_buffer(): - return FramedBuffer() +def framed_buffer(event_loop): + return FramedBuffer(loop=event_loop) @pytest.mark.asyncio @@ -37,21 +37,6 @@ async def test_framedbuffer(framed_buffer, msg_id): assert m.payload == payload + payload2 -@pytest.mark.asyncio -async def test_gen_chunks(): - - b = FramedBuffer() - - header = {"sender": "node1", "recipient": "node2", "route_list": []} - payload = b"this is a test with a buffer" - for chunk in gen_chunks(payload, header): - await b.put(chunk) - - m = await b.get() - assert m.header == header - assert m.payload == payload - - @pytest.mark.asyncio async def test_hi(msg_id, framed_buffer): hi = json.dumps({"cmd": "hi"}).encode("utf-8") @@ -94,3 +79,33 @@ async def test_command(framed_buffer, msg_id): m = await framed_buffer.get() assert m.header == cmd assert m.payload is None + + +@pytest.mark.asyncio +async def test_overfull(framed_buffer, msg_id): + header = {"foo": "bar"} + payload = b'this is a test' + msg = FramedMessage(header=header, payload=payload) + + await framed_buffer.put(msg.serialize()) + + m = await framed_buffer.get() + + assert m.header == header + assert m.payload == payload + + +@pytest.mark.asyncio +async def test_underfull(framed_buffer, msg_id): + header = {"foo": "bar"} + payload = b'this is a test' + msg = FramedMessage(header=header, payload=payload) + b = msg.serialize() + + await framed_buffer.put(b[:10]) + await framed_buffer.put(b[10:]) + + m = await framed_buffer.get() + + assert m.header == header + assert m.payload == payload diff --git a/receptor/work.py b/receptor/work.py index fd5f12e8..b747e3c8 100644 --- a/receptor/work.py +++ b/receptor/work.py @@ -1,11 +1,11 @@ import logging import traceback + import pkg_resources from . import exceptions from .messages import envelope -from .stats import work_counter, active_work_gauge - +from .stats import active_work_gauge, work_counter logger = logging.getLogger(__name__) @@ -58,7 +58,7 @@ async def handle(self, inner_env): async for response in responses: serial += 1 logger.debug(f'Response emitted for {inner_env.message_id}, serial {serial}') - enveloped_response = envelope.InnerEnvelope.make_response( + enveloped_response = envelope.Inner.make_response( receptor=self.receptor, recipient=inner_env.sender, payload=response, @@ -70,7 +70,7 @@ async def handle(self, inner_env): serial += 1 logger.error(f'Error encountered while handling the response, replying with an error message ({e})') logger.error(traceback.format_tb(e.__traceback__)) - enveloped_response = envelope.InnerEnvelope.make_response( + enveloped_response = envelope.Inner.make_response( receptor=self.receptor, recipient=inner_env.sender, payload=str(e), @@ -80,4 +80,3 @@ async def handle(self, inner_env): ) self.remove_work(inner_env) await self.receptor.router.send(enveloped_response) - From 741056589063cefea4cfb1547c2fd91cb452fa3a Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 21 Nov 2019 16:16:29 -0600 Subject: [PATCH 7/9] removing some dead code Signed-off-by: Jesse Jaggars --- Pipfile | 1 - Pipfile.lock | 17 ++++------------- receptor/messages/envelope.py | 1 - receptor/protocol.py | 25 ------------------------ receptor/tests/test_protocol.py | 34 --------------------------------- 5 files changed, 4 insertions(+), 74 deletions(-) delete mode 100644 receptor/tests/test_protocol.py diff --git a/Pipfile b/Pipfile index 761641b3..c1a9dc79 100644 --- a/Pipfile +++ b/Pipfile @@ -9,7 +9,6 @@ ipython = "*" pytest = "*" pytest-asyncio = "*" flake8 = "*" -rope = "*" [packages] python-dateutil = "*" diff --git a/Pipfile.lock b/Pipfile.lock index ee8b4c56..9632f48e 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "774283c0d95761538eddbd14bba1765b759f5993969cef4e953c2c645d3a867e" + "sha256": "9cf1be5dc097163566804935386a811de7fbeb747cb173a20a129aef6d9c9660" }, "pipfile-spec": 6, "requires": { @@ -199,10 +199,10 @@ }, "pluggy": { "hashes": [ - "sha256:0db4b7601aae1d35b4a033282da476845aa19185c1e6964b25cf324b5e4ec3e6", - "sha256:fa5fa1622fa6dd5c030e9cad086fa19ef6a0cf6d7a2d12318e10cb49d6d68f34" + "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", + "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d" ], - "version": "==0.13.0" + "version": "==0.13.1" }, "prompt-toolkit": { "hashes": [ @@ -278,15 +278,6 @@ "index": "pypi", "version": "==0.10.0" }, - "rope": { - "hashes": [ - "sha256:6b728fdc3e98a83446c27a91fc5d56808a004f8beab7a31ab1d7224cecc7d969", - "sha256:c5c5a6a87f7b1a2095fb311135e2a3d1f194f5ecb96900fdd0a9100881f48aaf", - "sha256:f0dcf719b63200d492b85535ebe5ea9b29e0d0b8aebeb87fe03fc1a65924fdaf" - ], - "index": "pypi", - "version": "==0.14.0" - }, "six": { "hashes": [ "sha256:1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index 79621015..e138ea54 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -65,7 +65,6 @@ def __init__(self, loop=None): self.to_read = 0 async def put(self, data): - logger.debug("put: %s ... %s", data[:16], data[-16:]) if not self.to_read: return await self.handle_frame(data) await self.consume(data) diff --git a/receptor/protocol.py b/receptor/protocol.py index b95d5083..d8fff3c3 100644 --- a/receptor/protocol.py +++ b/receptor/protocol.py @@ -1,7 +1,6 @@ import asyncio import datetime import functools -import json import logging import time import uuid @@ -12,23 +11,6 @@ logger = logging.getLogger(__name__) DELIM = b"\x1b[K" -SIZEB = b"\x1b[%dD" - - -class DataBuffer: - def __init__(self, loop=None, deserializer=json.loads): - self.q = asyncio.Queue(loop=loop) - self.data_buffer = b"" - self.deserializer = deserializer - - def add(self, data): - self.data_buffer = self.data_buffer + data - *ready, self.data_buffer = self.data_buffer.rsplit(DELIM) - for chunk in ready: - self.q.put_nowait(chunk) - - async def get(self): - return self.deserializer(await self.q.get()) class BaseProtocol(asyncio.Protocol): @@ -57,9 +39,7 @@ async def watch_queue(self): continue try: - logger.debug("about to write %s ... %s", msg[:8], msg[-8:]) self.transport.write(msg) - logger.debug("written successfully") except Exception: logger.exception("Error received trying to write to %s", self.id) await buffer_obj.put(msg) @@ -78,7 +58,6 @@ def connection_lost(self, exc): self.receptor.remove_connection(self) def data_received(self, data): - logger.debug("recv: %s ... %s", data[:16], data[-16:]) self.loop.create_task(self.incoming_buffer.put(data)) async def wait_greeting(self): @@ -88,10 +67,8 @@ async def wait_greeting(self): ''' logger.debug('Looking for handshake...') data = await self.incoming_buffer.get() - logger.debug(data.header) if data.header["cmd"] == "HI": self.handle_handshake(data.header) - logger.debug("handshake complete, starting normal handle loop") else: logger.error("Handshake failed!") self.transport.close() @@ -102,7 +79,6 @@ def handle_handshake(self, data): self.meta = data.get("meta", {}) self.receptor.add_connection(self) self.loop.create_task(self.watch_queue()) - logger.debug("starting message_handler: %s", self.incoming_buffer) self.loop.create_task(self.receptor.message_handler(self.incoming_buffer)) def send_handshake(self): @@ -184,7 +160,6 @@ def emit_response(self, response): def _do_emit_callback(self, fut): res = fut.result() - logger.debug("_do_emit_callback: %s", res) self.transport.write(res + DELIM) def data_received(self, data): diff --git a/receptor/tests/test_protocol.py b/receptor/tests/test_protocol.py deleted file mode 100644 index ec293813..00000000 --- a/receptor/tests/test_protocol.py +++ /dev/null @@ -1,34 +0,0 @@ -import pytest - -from receptor import protocol - - -def deser(x): - return x - - -@pytest.mark.asyncio -async def test_databuffer(): - b = protocol.DataBuffer(deserializer=deser) - msg = b"this is a test" - s = protocol.DELIM + msg + protocol.DELIM - b.add(s) - assert await b.get() == b"" - assert await b.get() == msg - - -def test_databuffer_no_delim(): - b = protocol.DataBuffer(deserializer=deser) - msg = b"this is a test" - b.add(msg) - assert b.q.empty() - - -@pytest.mark.asyncio -async def test_databuffer_many_msgs(): - b = protocol.DataBuffer(deserializer=deser) - msg = [b"first bit", b"second bit", b"third bit unfinished"] - b.add(protocol.DELIM.join(msg)) - assert msg[0] == await b.get() - assert msg[1] == await b.get() - assert b.q.empty() From 73f82e4815bade06896c47c77cf5c6c9f653e8a4 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Thu, 21 Nov 2019 22:17:10 -0600 Subject: [PATCH 8/9] adding some documentation Signed-off-by: Jesse Jaggars --- receptor/buffers/file.py | 10 +++++----- receptor/messages/envelope.py | 16 +++++++++++++--- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/receptor/buffers/file.py b/receptor/buffers/file.py index 72f94fe9..0e22a6f7 100644 --- a/receptor/buffers/file.py +++ b/receptor/buffers/file.py @@ -27,13 +27,13 @@ def __init__(self, dir_, key, loop): pass for item in self._read_manifest(): self.q.put_nowait(item) - + async def put(self, data): ident = str(uuid.uuid4()) await self._loop.run_in_executor(pool, self._write_file, data, ident) await self.q.put(ident) await self._save_manifest() - + async def get(self, handle_only=False, delete=True): while True: msg = await self.q.get() @@ -42,15 +42,15 @@ async def get(self, handle_only=False, delete=True): return await self._get_file(msg, handle_only=handle_only, delete=delete) except FileNotFoundError: pass - + async def _save_manifest(self): async with self._manifest_lock: await self._loop.run_in_executor(pool, self._write_manifest) - + def _write_manifest(self): with open(self._manifest_path, "w") as fp: json.dump(list(self.q._queue), fp) - + def _read_manifest(self): try: with open(self._manifest_path, "r") as fp: diff --git a/receptor/messages/envelope.py b/receptor/messages/envelope.py index e138ea54..34683ad6 100644 --- a/receptor/messages/envelope.py +++ b/receptor/messages/envelope.py @@ -15,7 +15,7 @@ class FramedMessage: """ - A complete message constructed from one or more Frames. + A complete, two-part message. """ __slots__ = ("msg_id", "header", "payload") @@ -27,7 +27,6 @@ def __init__(self, msg_id=None, header=None, payload=None): self.header = header self.payload = payload - def serialize(self): h = json.dumps(self.header).encode("utf-8") return b''.join([ @@ -39,6 +38,10 @@ def serialize(self): class CommandMessage(FramedMessage): + """ + A complete, single part message, meant to encapsulate point to point + commands or naive broadcasts. + """ def serialize(self): h = json.dumps(self.header).encode("utf-8") @@ -116,6 +119,12 @@ async def get(self): class Frame: + """ + A Frame represents the minimal metadata about a transmission. + + Usually you should not create one directly, but rather use the + FramedMessage or CommandMessage classes. + """ class Types(IntEnum): HEADER = 0 @@ -126,7 +135,6 @@ class Types(IntEnum): __slots__ = ('type', 'version', 'length', 'msg_id', 'id') - def __init__(self, type_, version, length, msg_id, id_): self.type = type_ self.version = version @@ -164,10 +172,12 @@ def wrap(cls, data, type_=Types.PAYLOAD, msg_id=None): def split_uuid(data): + "Splits a 128 bit int into two 64 bit words for binary encoding" return ((data >> 64) & MAX_INT64, data & MAX_INT64) def join_uuid(hi, lo): + "Joins two 64 bit words into a 128bit int from binary encoding" return (hi << 64) | lo From 3e52ec7a2c0575f2f9cf49dd9c5faa8a4951a496 Mon Sep 17 00:00:00 2001 From: Jesse Jaggars Date: Fri, 22 Nov 2019 07:49:37 -0600 Subject: [PATCH 9/9] adding tests for more edge cases Signed-off-by: Jesse Jaggars --- receptor/protocol.py | 2 ++ receptor/tests/test_framedbuffer.py | 33 +++++++++++++++++++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/receptor/protocol.py b/receptor/protocol.py index d8fff3c3..3ab68520 100644 --- a/receptor/protocol.py +++ b/receptor/protocol.py @@ -58,6 +58,8 @@ def connection_lost(self, exc): self.receptor.remove_connection(self) def data_received(self, data): + # TODO: The put() call can raise an exception which should trigger a + # transport failure. self.loop.create_task(self.incoming_buffer.put(data)) async def wait_greeting(self): diff --git a/receptor/tests/test_framedbuffer.py b/receptor/tests/test_framedbuffer.py index 192b4671..7b38426d 100644 --- a/receptor/tests/test_framedbuffer.py +++ b/receptor/tests/test_framedbuffer.py @@ -24,8 +24,8 @@ async def test_framedbuffer(framed_buffer, msg_id): await framed_buffer.put(f1.serialize() + header_bytes) - payload = b"tina loves butts" - payload2 = b"yep yep yep" + payload = b"payload one is very boring" + payload2 = b"payload two is also very boring" f2 = Frame(Frame.Types.PAYLOAD, 1, len(payload) + len(payload2), msg_id, 2) await framed_buffer.put(f2.serialize() + payload) @@ -84,7 +84,7 @@ async def test_command(framed_buffer, msg_id): @pytest.mark.asyncio async def test_overfull(framed_buffer, msg_id): header = {"foo": "bar"} - payload = b'this is a test' + payload = b"this is a test" msg = FramedMessage(header=header, payload=payload) await framed_buffer.put(msg.serialize()) @@ -98,7 +98,7 @@ async def test_overfull(framed_buffer, msg_id): @pytest.mark.asyncio async def test_underfull(framed_buffer, msg_id): header = {"foo": "bar"} - payload = b'this is a test' + payload = b"this is a test" msg = FramedMessage(header=header, payload=payload) b = msg.serialize() @@ -109,3 +109,28 @@ async def test_underfull(framed_buffer, msg_id): assert m.header == header assert m.payload == payload + + +@pytest.mark.asyncio +async def test_malformed_frame(framed_buffer, msg_id): + with pytest.raises(ValueError): + await framed_buffer.put( + b"this is total garbage and should break things very nicely" + ) + + +@pytest.mark.skip( + reason=""" + This test illustrates that sending an incomplete stream corrupts the transport""" +) +@pytest.mark.asyncio +async def test_too_short(framed_buffer, msg_id): + f1 = Frame(Frame.Types.HEADER, 1, 100, 1, 1) + too_short_header = b"this is not long enough" + f2 = Frame(Frame.Types.PAYLOAD, 1, 100, 1, 2) + too_short_payload = b"this is also not long enough" + + await framed_buffer.put(f1.serialize() + too_short_header) + await framed_buffer.put(f2.serialize() + too_short_payload) + + await framed_buffer.get()