Skip to content

Commit

Permalink
added wire format, changes to implement it
Browse files Browse the repository at this point in the history
  • Loading branch information
punit-kulal committed Aug 16, 2023
1 parent f3da249 commit 0200df1
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 35 deletions.
27 changes: 19 additions & 8 deletions clients/python/rest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,51 @@
import requests

from client import Client, Event
from protos.raystack.raccoon.v1beta1.raccoon_pb2 import SendEventRequest, SendEventResponse
from protos.raystack.raccoon.v1beta1.raccoon_pb2 import SendEventRequest, SendEventResponse, Event as EventPb
from rest.option import RestClientConfig
from serde.util import get_serde, CONTENT_TYPE_HEADER_KEY
from serde.serde import Serde
from serde.util import get_serde, CONTENT_TYPE_HEADER_KEY, get_wire_type
from serde.wire import Wire


class RestClient(Client):
session: requests.Session
serde: Serde
wire: Wire

def __init__(self, config: RestClientConfig):
self.session = requests.session()
self.url = config.url
self.serde = get_serde(config.content_type)
self.serde = get_serde(config.serialiser)
self.wire = get_wire_type(config.wire_type)
self.headers = self._set_content_type_header(config.headers)
self.max_retries = config.max_retries

def send(self, events: [Event]):
req = self._get_stub_request()
for e in events:
req.events.append(e)
response = self.session.post(url=self.url, data=self.serde.serialise(req), headers=self.headers)
events_pb = map(lambda x: self._convert_to_event_pb(x), events)
req.events.extend(events_pb)
response = self.session.post(url=self.url, data=self.wire.marshal(req), headers=self.headers)
deserialised_response = self._parse_response(response)
return req.req_guid, deserialised_response, response

def _convert_to_event_pb(self, e: Event):
proto_event = EventPb()
proto_event.event_bytes = self.serde.serialise(e.event)
proto_event.type = e.type
return proto_event

def _get_stub_request(self):
req = SendEventRequest()
req.req_guid = uuid.uuid4()
req.sent_time.FromNanoseconds(time.time_ns())
return req

def _set_content_type_header(self, headers):
headers[CONTENT_TYPE_HEADER_KEY] = self.serde.get_content_type()
headers[CONTENT_TYPE_HEADER_KEY] = self.wire.get_content_type()
return headers

def _parse_response(self, response) -> SendEventResponse:
if len(response.content) != 0:
event_response = self.serde.deserialise(str(response.content), SendEventResponse())
event_response = self.wire.unmarshal(str(response.content), SendEventResponse())
return event_response
19 changes: 13 additions & 6 deletions clients/python/rest/option.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from serde.enum import ContentType
from serde.enum import Serialiser, WireType


class RestClientConfig:
url: str
max_retries: int
content_type: ContentType
serialiser: Serialiser
headers: dict

def __init__(self):
self.headers = {}
self.content_type = ContentType.JSON
self.serialiser = Serialiser.JSON
self.max_retries = 0
self.wire_type = WireType.JSON


class RestClientConfigBuilder:
Expand All @@ -30,14 +31,20 @@ def with_retry_count(self, retry_count):
self.config.max_retries = retry_count
return self

def with_content_type(self, content_type):
if not isinstance(content_type, ContentType):
def with_serialiser(self, content_type):
if not isinstance(content_type, Serialiser):
raise ValueError("invalid serialiser/deserialiser type")
self.config.content_type = content_type
self.config.serialiser = content_type
return self

def with_headers(self, headers):
self.config.headers = headers

def with_wire_type(self, wire_type):
if not isinstance(wire_type, WireType):
raise ValueError("invalid serialiser/deserialiser type")
self.config.wire_type = wire_type
return self

def build(self):
return self.config
6 changes: 4 additions & 2 deletions clients/python/serde/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from serde.json_serde import JsonSerde


class ContentType(Enum):
class Serialiser(Enum):
JSON = JsonSerde
PROTOBUF = 2



class WireType(Enum):
JSON = JsonSerde
PROTOBUF = 2
23 changes: 23 additions & 0 deletions clients/python/serde/json_serde.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import json

from google.protobuf import json_format


from serde.serde import Serde
from serde.wire import Wire


class JsonSerde(Serde, Wire):
def serialise(self, event):
return bytes(json.dumps(event), "utf-8")

def get_content_type(self):
return "application/json"

def marshal(self, event):
return json_format.MessageToJson(event)

def unmarshal(self, data, template):
return json_format.Parse(data, template)


6 changes: 0 additions & 6 deletions clients/python/serde/serde.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
class Serde:
def serialise(self, event):
raise NotImplementedError()

def deserialise(self, data, template):
raise NotImplementedError()

def get_content_type(self):
raise NotImplementedError()
16 changes: 12 additions & 4 deletions clients/python/serde/util.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from serde.enum import ContentType
from serde.enum import Serialiser, WireType
from serde.json_serde import JsonSerde
from serde.serde import Serde
from serde.wire import Wire


def get_serde(content_type):
if content_type == ContentType.JSON:
def get_serde(serialiser) -> Serde:
if serialiser == Serialiser.JSON:
return JsonSerde()
else:
return NotImplementedError()
raise NotImplementedError()

def get_wire_type(wire_type) -> Wire:
if wire_type == WireType.JSON:
return JsonSerde()
else:
raise NotImplementedError()


CONTENT_TYPE_HEADER_KEY = "Content-Type"
10 changes: 10 additions & 0 deletions clients/python/serde/wire.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class Wire:
def marshal(self, obj):
raise NotImplementedError("not implemented")

def unmarshal(self, obj, template):
raise NotImplementedError("not implemented")

def get_content_type(self):
raise NotImplementedError("not implemented")

19 changes: 10 additions & 9 deletions clients/python/tests/unit/rest/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@
from google.protobuf import json_format
from google.protobuf.timestamp_pb2 import Timestamp

from protos.raystack.raccoon.v1beta1.raccoon_pb2 import Event, SendEventRequest, SendEventResponse, Status
import client
from protos.raystack.raccoon.v1beta1.raccoon_pb2 import SendEventRequest, SendEventResponse, Status
from rest.client import RestClient
from rest.option import RestClientConfigBuilder
from serde.enum import ContentType
from serde.enum import Serialiser


class RestClientTest(unittest.TestCase):

sample_url = "http://localhost:8080/api/v1/"
max_retries = 3
content_type = ContentType.JSON
content_type = Serialiser.JSON

def test_client_creation(self):
client_config = RestClientConfigBuilder().\
with_url(self.sample_url).\
with_content_type(self.content_type).\
with_serialiser(self.content_type).\
with_retry_count(self.max_retries).build()
rest_client = RestClient(client_config)
self.assertEqual(rest_client.url, self.sample_url, "sample_urls do not match")
Expand All @@ -50,10 +51,10 @@ def test_client_send(self):
time_in_ns = time.time_ns()
req.sent_time.FromNanoseconds(time_in_ns)
expected_req.sent_time.FromNanoseconds(time_in_ns)
expected_req.events.append(self._get_stub_event_payload())
serialised_data = json_format.MessageToJson(expected_req)
with patch("rest.client.requests.session", return_value=session_mock):
rest_client = self._get_rest_client()
expected_req.events.append(rest_client._convert_to_event_pb(self._get_stub_event_payload()))
serialised_data = json_format.MessageToJson(expected_req)
rest_client._get_stub_request = mock.MagicMock()
rest_client._get_stub_request.return_value = req
rest_client._parse_response = mock.MagicMock()
Expand All @@ -73,7 +74,7 @@ def test_parse_response(self):
def _get_rest_client(self):
client_config = RestClientConfigBuilder().\
with_url(self.sample_url).\
with_content_type(self.content_type).\
with_serialiser(self.content_type).\
with_retry_count(self.max_retries).build()
return RestClient(client_config)

Expand All @@ -86,9 +87,9 @@ def _get_stub_response(self):
return response

def _get_stub_event_payload(self):
e = Event()
e = client.Event()
e.type = "random_topic"
e.event_bytes = bytes("random_bytes", "utf-8")
e.event = {"a":"abc"}
return e

def _get_static_uuid(self):
Expand Down

0 comments on commit 0200df1

Please sign in to comment.