Skip to content

Commit

Permalink
feat: add handling of alternative message encodings
Browse files Browse the repository at this point in the history
  • Loading branch information
nokome committed Dec 11, 2018
1 parent f49bd82 commit d7e4339
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 48 deletions.
8 changes: 7 additions & 1 deletion src/Processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import json
from typing import Union, Optional
from typing import Any, Dict, Union, List, Optional

from .types.Thing import Thing
from .types.utils import cast, hydrate, dehydrate
Expand All @@ -17,6 +17,12 @@ class Processor:
`Thing`. They merely serve as an example of how to implement these
methods in derived classes.
"""

async def hello(self, version: str) -> object:
return {}

async def goodbye(self) -> None:
pass

async def import_(self, thing: Union[str, dict, Thing],
format: str = 'application/json', type: Optional[str] = None) -> Thing:
Expand Down
9 changes: 3 additions & 6 deletions src/comms/AsyncioConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ async def listen():
while True:
line = await self.reader.readline()
if line:
message = line.decode('utf8')
await callback(message)
await callback(line)
else:
break
except asyncio.CancelledError:
Expand All @@ -52,13 +51,11 @@ async def finish(self) -> None:
assert self.task.cancelled()
self.task = None

async def write(self, message: str) -> None:
async def write(self, message: bytes) -> None:
"""
Write a message to the connection.
"""
line = message + '\n'
bites = line.encode('utf8')
self.writer.write(bites)
self.writer.write(message + b'\n')
await self.writer.drain()

async def close(self) -> None:
Expand Down
41 changes: 25 additions & 16 deletions src/comms/Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
import json

from .jsonRpc import Request, Response
from .JsonEncoder import JSON_ENCODING, JsonEncoder
from .JsonGzipEncoder import JSON_GZIP_ENCODING, JsonGzipEncoder
from .Logger import Logger

class Client(Logger):

encoding: Dict = JSON_ENCODING

futures: Dict[int, Future] = {}

async def start(self) -> None:
Expand All @@ -31,9 +35,11 @@ async def stop(self) -> None:
await self.close()
self.log(stopped=True)

async def hello(self, version: str = "1.0", name: Optional[str] = None,
messages: List[Dict[str, Any]] = [{"contentType": "application/json"}]) -> None:
await self.call("hello", version=version, name=name, messages=messages)
async def hello(self, version: str = "1.0", encodings: List[Dict] = [JSON_ENCODING, JSON_GZIP_ENCODING]) -> None:
result = await self.call("hello", version=version, encodings=encodings)
encoding = result.get('encoding')
if encoding:
self.encoding = encoding

async def goodbye(self) -> None:
await self.call("goodbye")
Expand Down Expand Up @@ -100,20 +106,23 @@ async def close(self) -> None:
"""
raise NotImplementedError()

def encode(self, request: Request) -> str:
return json.dumps(request.__dict__)

def decode(self, message: str) -> Response:
# Convert the message into a response
# Currently this only deals with JSON messages but in the furture
# should handle other message formats
response = Response()
response.__dict__.update(json.loads(message))
return response

async def read(self, message: str) -> None:
def decode(self, message: bytes) -> Response:
if self.encoding == JSON_ENCODING:
return JsonEncoder.decode(message, Response)
elif self.encoding == JSON_GZIP_ENCODING:
return JsonGzipEncoder.decode(message, Response)
raise RuntimeError(f'Unhandled encoding: {self.encoding}')

def encode(self, request: Request) -> bytes:
if self.encoding == JSON_ENCODING:
return JsonEncoder.encode(request)
elif self.encoding == JSON_GZIP_ENCODING:
return JsonGzipEncoder.encode(request)
raise RuntimeError(f'Unhandled encoding: {self.encoding}')

async def read(self, message: bytes) -> None:
# Recieve a response message
self.receive(self.decode(message))

async def write(self, message: str) -> None:
async def write(self, message: bytes) -> None:
raise NotImplementedError()
2 changes: 1 addition & 1 deletion src/comms/CloneClientServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class CloneMixin:
def __init__(self, connection):
self.connection = connection

async def write(self, message: str) -> None:
async def write(self, message: bytes) -> None:
assert self.connection
await self.connection.write(message)

Expand Down
30 changes: 30 additions & 0 deletions src/comms/JsonEncoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Type

import json

JSON_ENCODING = {
'contentType': 'application/json'
}

class JsonEncoder:

@staticmethod
def decode(message: bytes, cls: Type):
request = cls()
dic = json.loads(message)
request.__dict__.update(dic)
return request

@staticmethod
def encode(obj) -> bytes:
return json.dumps(obj, cls=JSONEncoderExtension).encode('utf8')


class JSONEncoderExtension(json.JSONEncoder):
"""
Extension of the builtin JSON encoder to handle objects
"""

def default(self, obj):
return obj.__dict__

19 changes: 19 additions & 0 deletions src/comms/JsonGzipEncoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Type
import gzip

from .JsonEncoder import JsonEncoder

JSON_GZIP_ENCODING = {
'contentType': 'application/json',
'contentEncoding': 'gzip'
}

class JsonGzipEncoder(JsonEncoder):

@staticmethod
def decode(message: bytes, cls: Type):
return JsonEncoder.decode(gzip.decompress(message), cls)

@staticmethod
def encode(obj) -> bytes:
return gzip.compress(JsonEncoder.encode(obj))
130 changes: 114 additions & 16 deletions src/comms/Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
Module that defines the `Server` class
"""

from typing import Any, Dict
import asyncio
import json
import signal
import traceback

from ..Processor import Processor
from .jsonRpc import Request, Response
from .jsonRpc import Request, Response, Error
from .JsonEncoder import JSON_ENCODING, JsonEncoder
from .JsonGzipEncoder import JSON_GZIP_ENCODING, JsonGzipEncoder
from .Logger import Logger

class Server(Logger):
Expand All @@ -16,6 +20,9 @@ class Server(Logger):
"""

processor: Processor
"""
The procecessor that this server dispatches requests to.
"""

def __init__(self, processor: Processor):
self.processor = processor
Expand Down Expand Up @@ -65,27 +72,118 @@ def stop():
loop.run_until_complete(run())
loop.close()

async def receive(self, message: str):
async def receive(self, message: bytes, format: Dict = JSON_ENCODING):
assert self.processor

response = Response()

try:
request = self.decode(message)
request = self.decode(message, format)
except Exception as exc:
response.error = Error.parse_error(str(exc))
return response

response.id = request.id

result = None
if self.processor:
if request.method == 'execute':
result = await self.processor.execute(request.params['thing'])
result = result.__dict__
if not request.method:
response.error = Error.invalid_request('missing "method" property')

try:
result: Any = None
if request.method == 'hello':
result = await self.handle_hello(request)
elif request.method == 'goodbye':
result = await self.handle_goodbye(request)
elif request.method == 'import':
result = await self.processor.import_(
request.param(0, 'thing'),
request.param(1, 'format', False)
)
elif request.method == 'export':
result = await self.processor.export(
request.param(0, 'thing'),
request.param(1, 'format', False)
)
elif request.method == 'compile':
result = await self.processor.compile(
request.param(0, 'thing'),
request.param(1, 'format', False)
)
elif request.method == 'build':
result = await self.processor.build(
request.param(0, 'thing'),
request.param(1, 'format', False)
)
elif request.method == 'execute':
result = await self.processor.execute(
request.param(0, 'thing'),
request.param(1, 'format', False)
)
else:
raise Error.method_not_found(request.method, { 'method': request.method })

response = Response(id=request.id, result=result)
self.log(request=request, response=response)
except Exception as exc:
response.error = str(exc)
return self.encode(response)
#raise exc
if isinstance(exc, Error):
error = exc
else:
error = Error.application_error(str(exc), { 'trace': traceback.format_exc() })
response.error = error
return self.encode(response, format)

def supports(self, encoding: Dict) -> bool:
"""
Does this server support the given encoding?
This method may be overriden by a derived class to expand or restrict
the supported encodings
"""
return encoding == JSON_ENCODING or encoding == JSON_GZIP_ENCODING

def encode(self, response: Response) -> str:
return json.dumps(response.__dict__)
def decode(self, message: bytes, encoding: Dict = JSON_ENCODING) -> Request:
"""
Decode a request message.
Currently, this assumes that the message is a JSON string.
In the future, alternative message encodings will be available.
"""

if encoding == JSON_ENCODING:
return JsonEncoder.decode(message, Request)
elif encoding == JSON_GZIP_ENCODING:
return JsonGzipEncoder.decode(message, Request)
raise RuntimeError(f'Unhandled encoding: {encoding}')

def encode(self, response: Response, encoding: Dict = JSON_ENCODING) -> bytes:
"""
Encode a response message
Currently, this simply encodes the response as a JSON string.
"""

def decode(self, message: str) -> Request:
request = Request()
request.__dict__.update(json.loads(message))
return request
if encoding == JSON_ENCODING:
return JsonEncoder.encode(response)
elif encoding == JSON_GZIP_ENCODING:
return JsonGzipEncoder.encode(response)
raise RuntimeError(f'Unhandled encoding: {encoding}')

async def handle_hello(self, request):
# Intercept the call to hello to get the declared list of encodings
version = request.param(0, 'version')
result = await self.processor.hello(version)

encoding_to_use = JSON_ENCODING
encodings = request.param(1, 'encodings', False)
if encodings:
for encoding in encodings:
if self.supports(encoding):
encoding_to_use = encoding
break
result['encoding'] = encoding_to_use

return result

async def handle_goodbye(self, request):
return await self.processor.goodbye()
2 changes: 1 addition & 1 deletion src/comms/StdioClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def open(self) -> None:
# Wait on the subprocess
asyncio.ensure_future(self.subprocess.wait())

async def write(self, message: str) -> None:
async def write(self, message: bytes) -> None:
assert self.connection
await self.connection.write(message)

Expand Down
2 changes: 1 addition & 1 deletion src/comms/UnixSocketClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def open(self) -> None:
self.connection = AsyncioConnection(reader, writer)
self.connection.listen(self.read)

async def write(self, message: str) -> None:
async def write(self, message: bytes) -> None:
assert self.connection
await self.connection.write(message)

Expand Down
Loading

0 comments on commit d7e4339

Please sign in to comment.