Skip to content

Commit

Permalink
Change to UUIDv7 as conversation_id.
Browse files Browse the repository at this point in the history
  • Loading branch information
Benedikt Moneke authored and Benedikt Moneke committed Jul 1, 2023
1 parent 0661f3d commit dff2fb5
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 51 deletions.
21 changes: 15 additions & 6 deletions pyleco/core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#

from json import JSONDecodeError
from typing import Any, List, Optional, Self
from typing import Any, List, Optional, Self, Tuple


from . import VERSION_B
Expand Down Expand Up @@ -53,12 +53,15 @@ class Message:
def __init__(self, receiver: bytes | str, sender: bytes | str = b"",
data: Optional[bytes | str | Any] = None,
header: Optional[bytes] = None,
conversation_id: bytes = b"",
conversation_id: Optional[bytes] = None,
message_id: Optional[bytes] = None,
message_type: Optional[bytes] = None,
**kwargs) -> None:
self._setup_caches()
self.receiver = receiver if isinstance(receiver, bytes) else receiver.encode()
self.sender = sender if isinstance(sender, bytes) else sender.encode()
self.header = (create_header_frame(conversation_id=conversation_id, **kwargs)
self.header = (create_header_frame(conversation_id=conversation_id, message_id=message_id,
message_type=message_type)
if header is None else header)
if isinstance(data, (bytes)):
self.payload = [data]
Expand All @@ -69,9 +72,9 @@ def __init__(self, receiver: bytes | str, sender: bytes | str = b"",

def _setup_caches(self) -> None:
self._payload: List[bytes] = []
self._header_elements = None
self._sender_elements = None
self._receiver_elements = None
self._header_elements: Optional[Tuple[bytes, bytes, bytes]] = None
self._sender_elements: Optional[Tuple[bytes, bytes]] = None
self._receiver_elements: Optional[Tuple[bytes, bytes]] = None
self._data = None

@classmethod
Expand Down Expand Up @@ -148,6 +151,12 @@ def message_id(self) -> bytes:
self._header_elements = interpret_header(self.header)
return self._header_elements[1]

@property
def message_type(self) -> bytes:
if self._header_elements is None:
self._header_elements = interpret_header(self.header)
return self._header_elements[2]

@property
def receiver_node(self) -> bytes:
if self._receiver_elements is None:
Expand Down
7 changes: 3 additions & 4 deletions pyleco/core/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ def set_properties(self, properties: Dict[str, Any]) -> None: ...

def call_method(self, method: str, _args: Optional[list | tuple] = None, **kwargs) -> Any: ...


class PollingActor(Actor, Protocol):
"""An Actor which allows regular polling."""

polling_interval: float

def start_polling(self, polling_interval: Optional[float]) -> None: ...

def set_polling_interval(self, polling_interval: float) -> None: ...
Expand Down Expand Up @@ -121,7 +120,7 @@ def sign_out(self) -> None: ...

def send(self,
receiver: str | bytes,
conversation_id: bytes = b"",
conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
**kwargs) -> None:
"""Send a message based on kwargs."""
Expand All @@ -131,7 +130,7 @@ def send(self,

def send_message(self, message: Message) -> None: ...

def ask(self, receiver: bytes | str, conversation_id: bytes = b"",
def ask(self, receiver: bytes | str, conversation_id: Optional[bytes] = None,
data: Optional[Any] = None,
**kwargs) -> Message:
"""Send a message based on kwargs and retrieve the response."""
Expand Down
45 changes: 26 additions & 19 deletions pyleco/core/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,34 @@
#

import json
import random
from time import time
import struct
from typing import Tuple
from typing import Optional, Tuple

from uuid_extensions import uuid7 # as long as uuid does not yet support UUIDv7
from pydantic import BaseModel


def create_header_frame(conversation_id: bytes = b"", message_id: bytes = b"") -> bytes:
def create_header_frame(conversation_id: Optional[bytes] = None,
message_id: Optional[bytes] = None,
message_type: Optional[bytes] = None) -> bytes:
"""Create the header frame.
:param bytes conversation_id: ID of the conversation.
:param bytes message_id: Message ID of this message, must not contain ";".
:param bytes message_id: Message ID of this message, must not contain.
:return: header frame.
"""
return b";".join((conversation_id, message_id))
if conversation_id is None:
conversation_id = generate_conversation_id()
elif len(conversation_id) != 16:
raise ValueError("Length of 'conversation_id' is not 16 bytes.")
if message_id is None:
message_id = b"\x00" * 3
elif len(message_id) != 3:
raise ValueError("Length of 'message_id' is not 3 bytes.")
if message_type is None:
message_type = b"\x00"
elif len(message_type) != 1:
raise ValueError("Length of 'message_type' is not 1 bytes.")
return b"".join((conversation_id, message_id, message_type))


def split_name(name: bytes, node: bytes = b"") -> Tuple[bytes, bytes]:
Expand All @@ -55,17 +67,15 @@ def split_name_str(name: str, node: str = "") -> Tuple[str, str]:
return (s.pop() if s else node), n


def interpret_header(header: bytes) -> Tuple[bytes, bytes]:
def interpret_header(header: bytes) -> Tuple[bytes, bytes, bytes]:
"""Interpret the header frame.
:return: conversation_id, message_id
:return: conversation_id, message_id, message_type
"""
try:
conversation_id, message_id = header.rsplit(b";", maxsplit=1)
except (IndexError, ValueError):
conversation_id = b""
message_id = b""
return conversation_id, message_id
conversation_id = header[:16]
message_id = header[16:19]
message_type = header[19:20]
return conversation_id, message_id, message_type


def serialize_data(data: object) -> bytes:
Expand All @@ -86,7 +96,4 @@ def deserialize_data(content: bytes) -> object:

def generate_conversation_id() -> bytes:
"""Generate a conversation_id."""
# struct.pack uses 8 bytes and takes 0.1 seconds for 1 Million repetitions.
# str().encode() uses 14 bytes and takes 0.5 seconds for 1 Million repetitions.
# !d is a double (8 bytes) in network byte order (big-endian)
return struct.pack("!d", time()) + random.randbytes(2)
return uuid7(as_type="bytes") # type: ignore
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"pyzmq",
"openrpc",
"jsonrpc2-pyclient",
"uuid7",
]

[project.optional-dependencies]
Expand Down
47 changes: 29 additions & 18 deletions tests/core/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
from pyleco.core.message import Message


cid = b"conversation_id;"


class Test_Message_from_frames:
@pytest.fixture
def message(self) -> Message:
return Message.from_frames(b"\xffo", b"rec", b"send", b"x;y",
return Message.from_frames(b"\xffo", b"rec", b"send", b"conversation_id;mid0",
b'[["GET", [1, 2]], ["GET", 3]]')

def test_payload(self, message: Message):
Expand All @@ -48,7 +51,13 @@ def test_sender(self, message: Message):
assert message.sender == b"send"

def test_header(self, message: Message):
assert message.header == b"x;y"
assert message.header == b"conversation_id;mid0"

def test_conversation_id(self, message: Message):
assert message.conversation_id == b"conversation_id;"

def test_message_id(self, message: Message):
assert message.message_id == b"mid"

def test_multiple_payload_frames(self):
message = Message.from_frames(
Expand All @@ -61,8 +70,8 @@ def test_no_payload(self):
assert message.payload == []

def test_get_frames_list(self, message: Message):
assert message.to_frames() == [b"\xffo", b"rec", b"send", b"x;y",
b'[["GET", [1, 2]], ["GET", 3]]']
assert message.to_frames() == [b"\xffo", b"rec", b"send", b"conversation_id;mid0",
b'[["GET", [1, 2]], ["GET", 3]]']


class Test_Message_create_message:
Expand All @@ -72,8 +81,8 @@ def message(self) -> Message:
receiver=b"rec",
sender=b"send",
data=[["GET", [1, 2]], ["GET", 3]],
conversation_id=b"x",
message_id=b"y",
conversation_id=b"conversation_id;",
message_id=b"mid",
)

def test_payload(self, message: Message):
Expand All @@ -89,25 +98,26 @@ def test_sender(self, message: Message):
assert message.sender == b"send"

def test_header(self, message: Message):
assert message.header == b"x;y"
assert message.header == b"conversation_id;mid\x00"

def test_get_frames_list(self, message: Message):
def test_to_frames(self, message: Message):
assert message.to_frames() == [
VERSION_B,
b"rec",
b"send",
b"x;y",
b"conversation_id;mid\x00",
b'[["GET", [1, 2]], ["GET", 3]]',
]

def test_get_frames_list_without_payload(self, message: Message):
def test_to_frames_without_payload(self, message: Message):
message.payload = []
assert message.to_frames() == [VERSION_B, b"rec", b"send", b"x;y"]
assert message.to_frames() == [VERSION_B, b"rec", b"send", b"conversation_id;mid\x00"]

def test_message_without_data_does_not_have_payload_frame(self):
message = Message(b"rec", "send")
message = Message(b"rec", "send", conversation_id=b"conversation_id;")
assert message.payload == []
assert message.to_frames() == [VERSION_B, b"rec", b"send", b";"]
assert message.to_frames() == [VERSION_B, b"rec", b"send",
b"conversation_id;\x00\x00\x00\x00"]

def test_message_binary_data(self):
message = Message(b"rec", data=b"binary data")
Expand All @@ -117,6 +127,7 @@ def test_message_data_str_to_binary_data(self):
message = Message(b"rec", data="some string")
assert message.payload[0] == b"some string"


class Test_Message_with_strings:
@pytest.fixture
def str_message(self) -> Message:
Expand Down Expand Up @@ -180,13 +191,13 @@ def test_get_frames_list_raises_error_on_empty_sender():

class TestComparison:
def test_dictionary_order_is_irrelevant(self):
assert Message(b"r", data={"a": 1, "b": 2}) == Message(b"r", data={"b": 2, "a": 1})
m1 = Message(b"r", conversation_id=cid, data={"a": 1, "b": 2})
m2 = Message(b"r", conversation_id=cid, data={"b": 2, "a": 1})
assert m1 == m2

def test_distinguish_empty_payload_frame(self):
m1 = Message("r")
m1 = Message("r", conversation_id=b"conversation_id;")
m1.payload = [b""]
m2 = Message("r")
m2 = Message("r", conversation_id=b"conversation_id;")
assert m2.payload == [] # verify that it does not have a payload
assert m1 != m2


44 changes: 40 additions & 4 deletions tests/core/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,29 @@
from pyleco.core import serialization


@pytest.mark.parametrize("kwargs, header", (
({}, b";"),
class Test_create_header_frame:
@pytest.mark.parametrize("kwargs, header", (
({"conversation_id": b"\x00" * 16, }, bytes([0] * 20)),
))
def test_create_header_frame(self, kwargs, header):
assert serialization.create_header_frame(**kwargs) == header

@pytest.mark.parametrize("cid_l", (0, 1, 2, 4, 6, 7, 10, 15, 17, 20))
def test_wrong_cid_length_raises_errors(self, cid_l):
with pytest.raises(ValueError, match="'conversation_id'"):
serialization.create_header_frame(conversation_id=bytes([0] * cid_l))

@pytest.mark.parametrize("mid_l", (0, 1, 2, 4, 6, 7))
def test_wrong_mid_length_raises_errors(self, mid_l):
with pytest.raises(ValueError, match="'message_id'"):
serialization.create_header_frame(message_id=bytes([0] * mid_l))


@pytest.mark.parametrize("header, conversation_id, message_id, message_type", (
(bytes(range(20)), bytes(range(16)), b"\x10\x11\x12", b"\x13"),
))
def test_create_header_frame(kwargs, header):
assert serialization.create_header_frame(**kwargs) == header
def test_interpret_header(header, conversation_id, message_id, message_type):
assert serialization.interpret_header(header) == (conversation_id, message_id, message_type)


@pytest.mark.parametrize("full_name, node, name", (
Expand All @@ -53,3 +71,21 @@ def test_dict(self):
raw = {"some": "item", "key": "value", 5: [7, 3.1]}
expected = b'{"some": "item", "key": "value", "5": [7, 3.1]}'
assert serialization.serialize_data(raw) == expected


class Test_generate_conversation_id_is_UUIDv7:
@pytest.fixture
def conversation_id(self):
return serialization.generate_conversation_id()

def test_type_is_bytes(self, conversation_id):
assert isinstance(conversation_id, bytes)

def test_length(self, conversation_id):
assert len(conversation_id) == 16

def test_UUID_version(self, conversation_id):
assert conversation_id[6] >> 4 == 7

def test_variant(self, conversation_id):
assert conversation_id[8] >> 6 == 0b10

0 comments on commit dff2fb5

Please sign in to comment.