JSON-RPC protocol implementation for aiohttp.web.
import asyncio

from aiohttp import web
from aiohttp_jrpc import Service, jrpc_errorhandler_middleware
# Schema handed to Service.valid() for MyJRPC.hello: the payload is an
# object whose "data" property is a string.
SCH = dict(
    type="object",
    properties=dict(
        data=dict(type="string"),
    ),
)
class MyJRPC(Service):
    """Example service with three demo methods.

    Each method receives ``(self, ctx, data)``; the exact contents of ``ctx``
    and ``data`` are determined by ``Service`` and are not visible here.
    """

    @Service.valid(SCH)
    def hello(self, ctx, data):
        """Greet the caller, or echo the payload back.

        Returns ``{"status": "hi"}`` when ``data["data"]`` equals ``"hello"``;
        otherwise returns ``{"status": data}`` carrying the whole payload.
        The payload is presumably checked against ``SCH`` by the
        ``Service.valid`` decorator before this body runs.
        """
        is_greeting = data["data"] == "hello"
        return {"status": "hi" if is_greeting else data}

    def error(self, ctx, data):
        """Always raise, to demonstrate a method that fails."""
        message = "Error which will catch middleware"
        raise Exception(message)

    def no_valid(self, ctx, data):
        """Method without validation of incoming data."""
        return dict(status="ok")
@asyncio.coroutine
def init(loop, host="127.0.0.1", port=8080):
    """Build the web application and start serving it on ``host:port``.

    Args:
        loop: event loop used for both the application and the listening
            server.
        host: interface to bind. Defaults to "127.0.0.1", the value that was
            previously hard-coded.
        port: TCP port to bind. Defaults to 8080, the value that was
            previously hard-coded.

    Returns:
        The server object produced by ``loop.create_server``; the caller
        should keep it so the listening socket can be closed on shutdown.
    """
    # NOTE(review): the middleware presumably converts exceptions raised by
    # service methods into error replies -- confirm against aiohttp_jrpc.
    app = web.Application(loop=loop, middlewares=[jrpc_errorhandler_middleware])
    # All calls are POSTed to a single endpoint handled by MyJRPC.
    app.router.add_route('POST', "/api", MyJRPC)
    srv = yield from loop.create_server(app.make_handler(), host, port)
    # Build the message from the same values used to bind, so the printed
    # address can never disagree with the real one.
    print("Server started at http://{}:{}".format(host, port))
    return srv
# Start the server, then block until interrupted.
loop = asyncio.get_event_loop()
# Keep the server returned by init() so it can be shut down cleanly below.
srv = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; not an error.
    pass
finally:
    # Stop accepting connections, wait for the listening socket to close,
    # then release the event loop's resources.
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.close()
aiohttp_jrpc is distributed under the BSD license.