Skip to content

Commit

Permalink
feat(Python): add initial Client and Server implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
nokome committed Dec 3, 2018
1 parent 8ab0915 commit 312e1bc
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 1 deletion.
111 changes: 111 additions & 0 deletions src/comms/Client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import cast, Any, Dict, List, Optional, Union
from asyncio import Future
import json

from .jsonRpc import Request, Response

class Client:

futures: Dict[int, Future] = {}

async def start(self) -> None:
"""
Start this client.
Opens the connection to the server and makes
a `hello` handshake request.
"""
await self.open()
await self.hello()

async def stop(self) -> None:
"""
Stop this client.
Make a `goodbye` request and closes the connection to
the server.
"""
await self.goodbye()
await self.close()

async def hello(self, version: str = "1.0", name: Optional[str] = None,
messages: List[Dict[str, Any]]=[{"contentType": "application/json"}]) -> None:
await self.call("hello", version=version, name=name, messages=messages)

async def goodbye(self) -> None:
await self.call("goodbye")

async def execute(self, thing):
return await self.call("execute", thing=thing)

async def call(self, method: str, **kwargs):
request = Request(method=method)
future = await self.send(request)
return future

async def send(self, request: Request) -> Future:
"""
Send a request to the server.
This method must be overriden by derived client classes to
send the request over the transport protocol used by that class.
:param: request The JSON-RPC request to send
"""
await self.write(self.encode(request))
future: Future = Future()
self.futures[request.id] = future
return future


def recieve(self, response: Response) -> None:
"""
Receive a request from the server.
Uses the `id` of the response to match it to the corresponding
request and resolve it's promise.
:param: response The JSON-RPC response as a string or Response instance
"""
future = self.futures.get(response.id)
if not future:
raise RuntimeError(f'No request found for response with id: {response.id}')
future.set_result(response.result)
del self.futures[response.id]

async def open(self) -> None:
"""
Open the connection to the server.
Should be implemented in derived classes to
open connections to a server before the `hello`
request is made.
"""
raise NotImplementedError()

async def close(self) -> None:
"""
Close the connection to the server.
Should be implemented in derived classes to
close connections to a server after the `goodbye`
request is made.
"""
raise NotImplementedError()

def encode(self, request: Request) -> str:
return json.dumps(request.__dict__)

def decode(self, message: str) -> Response:
# Convert the message into a response
# Currently this only deals with JSON messages but in the furture
# should handle other message formats
return Response(**json.loads(message))

async def read(self, message: str) -> None:
# Recieve the response
print(self.__class__.__name__, 'read', message)
self.recieve(self.decode(message))

async def write(self, message: str) -> None:
raise NotImplementedError()
55 changes: 55 additions & 0 deletions src/comms/Server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Module that defines the `Server` class
"""

import json

from ..Processor import Processor
from .jsonRpc import Request, Response

class Server:
"""
Base class for all servers.
"""

def __init__(self, processor: Processor = Processor(), logging=0):
self.processor = processor
self.logging = logging

async def start(self) -> None:
"""
Start this server.
Starts listening for requests.
"""
await self.open()

async def stop(self) -> None:
"""
Stop this server.
Stops listening for requests.
"""
await self.close()

async def recieve(self, request: Request):
response = Response(id=request.id, result="foo")
await self.write(self.encode(response))

async def open(self) -> None:
raise NotImplementedError()

async def close(self) -> None:
raise NotImplementedError()

def encode(self, response: Response) -> str:
return json.dumps(response.__dict__)

def decode(self, message: str) -> Request:
return Request(**json.loads(message))

async def read(self, message: str) -> None:
await self.recieve(self.decode(message))

async def write(self, message: str) -> None:
raise NotImplementedError()
27 changes: 27 additions & 0 deletions src/comms/StdioClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
import sys

from .Client import Client
from .StdioMixin import StdioMixin

class StdioClient(StdioMixin, Client):

def __init__(self, input=sys.stdin, output=sys.stdout):
StdioMixin.__init__(self, input, output)
Client.__init__(self)

self.subprocess = None

async def spawn(self, cmd):
self.subprocess = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)
self.reader = self.subprocess.stdout
self.writer = self.subprocess.stdin

await self.start()

async def kill(self):
self.subprocess.kill()
37 changes: 37 additions & 0 deletions src/comms/StdioMixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio

class StdioMixin:

def __init__(self, input, output):
self.input = input
self.output = output
self.reader = None
self.writer = None

async def open(self) -> None:
loop = asyncio.get_event_loop()

if self.reader is None:
self.reader = asyncio.StreamReader()
reader_protocol = asyncio.StreamReaderProtocol(self.reader)
await loop.connect_read_pipe(lambda: reader_protocol, self.input)

if self.writer is None:
writer_transport, writer_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, self.output)
self.writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, self.reader, loop)

async def do():
while True:
line = await self.reader.readline()
if line:
message = line.decode('utf8')
await self.read(message)
else:
break
await do()

async def write(self, message: str) -> None:
line = message + '\n'
bites = line.encode('utf8')
self.writer.write(bites)
await self.writer.drain()
10 changes: 10 additions & 0 deletions src/comms/StdioServer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sys

from .Server import Server
from .StdioMixin import StdioMixin

class StdioServer(StdioMixin, Server):

def __init__(self, processor=None, logging=0, input=sys.stdin, output=sys.stdout):
StdioMixin.__init__(self, input, output)
Server.__init__(self, processor, logging)
22 changes: 22 additions & 0 deletions src/comms/jsonRpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, ClassVar, Optional

class Request:

id:int
count:ClassVar[int] = 0

def __init__(self, method, id: Optional[int] = None):
self.method = method
if id is None:
Request.count += 1
id = Request.count
self.id = id

class Response:

def __init__(self, id: int, result: Any = None):
self.id = id
self.result = result

class Error:
pass
16 changes: 16 additions & 0 deletions tests/comms/ClientTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pytest

from stencilaschema.comms.Client import Client
from stencilaschema.comms.jsonRpc import Request, Response

@pytest.mark.asyncio
async def test_recieve():
client = Client()

async def write(message):
pass
client.write = write

future = await client.send(Request(method="compile", id=1))
client.recieve(Response(id=1, result={"type": "Thing"}))
assert await future == {"type": "Thing"}
4 changes: 3 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ envlist = py36,py37

[testenv]
deps = pytest
pytest-asyncio
commands = pytest tests

[testenv:cover]
deps = pytest
coverage
pytest-asyncio
pytest-cov
coverage
commands = pytest --cov {envsitepackagesdir}/stencilaschema --cov-report term --cov-report xml tests

[pytest]
Expand Down

0 comments on commit 312e1bc

Please sign in to comment.