Skip to content

Commit

Permalink
Support batch JSON-RPC in python client
Browse files Browse the repository at this point in the history
This is used to batch request all schemas to reduce the number of roundtrips
which boosts the performance of the initial connect (closes BlueBrain#502).
  • Loading branch information
tribal-tec committed Aug 23, 2018
1 parent a94fe34 commit 78cfa1e
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 118 deletions.
2 changes: 2 additions & 0 deletions plugins/RocketsPlugin/RocketsPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,8 @@ class RocketsPlugin::Impl
(std::function<brayns::Version()>)[] {
return brayns::Version();
});

_handleSchema(ENDPOINT_VERSION, version.getSchema());
}

void _handleVolumeParams()
Expand Down
80 changes: 23 additions & 57 deletions python/brayns/api_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,53 @@
import python_jsonschema_objects as pjs
import inflection

from .utils import HTTP_METHOD_GET, HTTP_METHOD_PUT, HTTP_STATUS_OK
from .utils import HTTP_METHOD_GET, HTTP_METHOD_PUT, HTTP_STATUS_OK, SCHEMA_ENDPOINT

from . import utils


def build_api(target_object, url):
def build_api(target_object, registry, schemas):
"""
Add the API (types, properties, methods) to the given object from the information found at url.
Fetches the registry from the remote running Brayns instance and adds all found properties,
RPCs and types to the given target_object.
Add the API (types, properties, methods) to the given object from registry and schemas.
:param object target_object: The target object where to add all generated properties, RPCs and
types to
:param str url: The address of the remote running Brayns instance.
:param dict registry: The registry of all endpoints which are properties and methods
:param dict schemas: The schemas matching the endpoints from the registry
"""
registry = _obtain_registry(url)
for registry_entry in registry.keys():
if _try_add_method(target_object, url, registry_entry):
if registry_entry in schemas:
schema = schemas[registry_entry]
elif registry_entry.endswith(SCHEMA_ENDPOINT):
method_name = registry_entry[:-len(SCHEMA_ENDPOINT)]
schema = schemas[method_name]

if 'type' not in schema:
continue

if 'method' in schema['type']:
_add_method(target_object, schema)
continue
if registry_entry.endswith(SCHEMA_ENDPOINT):
continue

writeable = HTTP_METHOD_PUT in registry[registry_entry]
_try_add_property(target_object, url, registry_entry, writeable)
_try_add_property(target_object, registry_entry, schema, writeable)


def _try_add_property(target_object, url, registry_entry, writeable):
def _try_add_property(target_object, registry_entry, schema, writeable):
"""
Try to add registry_entry as a property.
This will add a property with the name registry_entry which initializes itself with current
value from Brayns on first access. Furthermore, it add potential enum values as string constants
and adds a commit() function if the property is writeable.
value from Brayns on first access. Furthermore, it adds potential enum values as string
constants and adds a commit() function if the property is writeable.
:param object target_object: The target object where to add the property to
:param str url: The address of the remote running Brayns instance.
:param str registry_entry: registry endpoint, e.g. foo[/schema]
:param dict schema: The schema of the object behind the endpoint.
:param bool writeable: if the property is writeable or not
"""
schema, ret_code = _obtain_schema(url, registry_entry)
if ret_code != HTTP_STATUS_OK:
return

classes = pjs.ObjectBuilder(schema).build_classes()
class_names = dir(classes)

Expand All @@ -101,28 +106,6 @@ def _try_add_property(target_object, url, registry_entry, writeable):
_add_property(target_object, member, registry_entry, schema['type'])


def _try_add_method(target_object, url, registry_entry):
"""
Try to add registry_entry as a method to target_object if it is an RPC.
:param object target_object: The target object where to add the method to
:param str url: The address of the remote running Brayns instance.
:param str registry_entry: registry endpoint, e.g. foo[/schema]
:return: True if registry_entry was an RPC, False otherwise
:rtype: bool
"""
if not registry_entry.endswith('/schema'):
return False

method = registry_entry[:-len('/schema')]
status = utils.http_request(HTTP_METHOD_GET, url, method)
schema, ret_code = _obtain_schema(url, method)
if status.code != HTTP_STATUS_OK and ret_code == HTTP_STATUS_OK:
_add_method(target_object, schema)
return True
return False


def _create_method_with_object_parameter(param, method, description):
"""
Create code for a method where each property of the param object is a key-value argument.
Expand Down Expand Up @@ -261,9 +244,6 @@ def _add_method(target_object, schema):
:param dict schema: schema containing name, description, params of the RPC
:raises Exception: if the param type of the RPC does not match oneOf, object or array
"""
if schema['type'] != 'method':
return

method = schema['title']
func_name = str(utils.underscorize(os.path.basename(method)))

Expand Down Expand Up @@ -347,17 +327,3 @@ def function(self):
setattr(type(target_object), snake_case_name,
property(fget=getter_builder(member, property_name),
doc='Access to the {0} property'.format(endpoint_name)))


def _obtain_registry(url):
"""Obtain the registry of exposed objects and RPCs from Brayns."""
status = utils.http_request(HTTP_METHOD_GET, url, 'registry')
if status.code != HTTP_STATUS_OK:
raise Exception('Failed to obtain registry from Brayns')
return status.contents


def _obtain_schema(url, object_name):
"""Returns the JSON schema for the given object."""
status = utils.http_request(HTTP_METHOD_GET, url, object_name + '/schema')
return status.contents, status.code
29 changes: 27 additions & 2 deletions python/brayns/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@
from PIL import Image
from .api_generator import build_api
from .rpcclient import RpcClient
from .utils import in_notebook, HTTP_METHOD_GET, HTTP_STATUS_OK
from .utils import in_notebook, HTTP_METHOD_GET, HTTP_STATUS_OK, SCHEMA_ENDPOINT
from .version import MINIMAL_VERSION
from . import utils


def _obtain_registry(url):
"""Obtain the registry of exposed objects and RPCs from Brayns."""
status = utils.http_request(HTTP_METHOD_GET, url, 'registry')
if status.code != HTTP_STATUS_OK:
raise Exception('Failed to obtain registry from Brayns')
return status.contents


class Client(RpcClient):
"""Client that connects to a remote running Brayns instance which provides the supported API."""

Expand All @@ -46,7 +54,7 @@ def __init__(self, url):
"""
super(Client, self).__init__(url)
self._check_version()
build_api(self, self.url())
self._build_api()

if in_notebook():
self._add_widgets() # pragma: no cover
Expand Down Expand Up @@ -140,6 +148,23 @@ def _check_version(self):
raise Exception('Brayns does not satisfy minimal required version; '
'needed {0}, got {1}'.format(MINIMAL_VERSION, version))

def _build_api(self):
"""Fetch the registry and all schemas from the remote running Brayns to build the API."""
registry = _obtain_registry(self.url())
endpoints = {x.replace(SCHEMA_ENDPOINT, '') for x in registry}

# batch request all schemas from all endpoints
params = list()
for endpoint in endpoints:
params.append({'endpoint': endpoint})
methods = ['schema']*len(params)
schemas = self.batch_request(methods, params)

schemas_dict = dict()
for param, schema in zip(params, schemas):
schemas_dict[param['endpoint']] = schema
build_api(self, registry, schemas_dict)

def _add_widgets(self): # pragma: no cover
"""Add functions to the Brayns object to provide widgets for appropriate properties."""
self._add_show_function()
Expand Down
114 changes: 93 additions & 21 deletions python/brayns/rpcclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,53 @@ def request(self, method, params=None, response_timeout=5): # pragma: no cover
:rtype: dict
:raises Exception: if request was not answered within given response_timeout
"""
data = dict()
data['jsonrpc'] = JSON_RPC_VERSION
data['id'] = self._request_id
data['method'] = method
if params:
data['params'] = params
request, result = self._make_request(method, params)

result = {'done': False, 'result': None}
self._setup_websocket()
self._ws.send(json.dumps(request, cls=Encoder))

def callback(payload):
"""
The callback for the response.
if response_timeout:
timeout = response_timeout * 10

:param dict payload: the actual response data
"""
result['result'] = payload
result['done'] = True
while not result['done'] and timeout:
time.sleep(0.1)
timeout -= 1

self._ws_requests[self._request_id] = callback
self._request_id += 1
if 'done' not in result:
raise Exception('Request was not answered within {0} seconds'
.format(response_timeout))
else:
while not result['done']:
time.sleep(0.0001)

return result['result']

def batch_request(self, methods, params, response_timeout=5): # pragma: no cover
"""
Invoke a batch RPC on the remote running Brayns instance and wait for its reponse.
:param list methods: name of the methods to invoke
:param list params: params for the methods
:param int response_timeout: number of seconds to wait for the response
:return: list of responses and/or errors of RPC
:rtype: list
:raises Exception: if methods and/or params are not a list
:raises Exception: if request was not answered within given response_timeout
"""
if not isinstance(methods, list) and not isinstance(params, list):
raise Exception('Not a list of methods')

requests = list()
responses = list()
for method, param in zip(methods, params):
request, response = self._make_request(method, param)
requests.append(request)
responses.append(response)

self._setup_websocket()
self._ws.send(json.dumps(data, cls=Encoder))
self._ws.send(json.dumps(requests, cls=Encoder))

result = responses[0]
if response_timeout:
timeout = response_timeout * 10

Expand All @@ -139,7 +162,11 @@ def callback(payload):
while not result['done']:
time.sleep(0.0001)

return result['result']
results = list()
for response in responses:
results.append(response['result'])

return results

def notify(self, method, params=None): # pragma: no cover
"""
Expand All @@ -157,6 +184,38 @@ def notify(self, method, params=None): # pragma: no cover
self._setup_websocket()
self._ws.send(json.dumps(data, cls=Encoder))

def _make_request(self, method, params=None): # pragma: no cover
"""
Create a request object with given method and params and setup the response callback.
:param str method: name of the method to invoke
:param str params: params for the method
:return: request and response dict
:rtype: dict
"""
request = dict()
request['jsonrpc'] = JSON_RPC_VERSION
request['id'] = self._request_id
request['method'] = method
if params:
request['params'] = params

response = {'done': False, 'result': None}

def callback(payload):
"""
The callback for the response.
:param dict payload: the actual response data
"""
response['result'] = payload
response['done'] = True

self._ws_requests[self._request_id] = callback
self._request_id += 1

return request, response

def _setup_websocket(self): # pragma: no cover
"""
Setups websocket to handle binary (image) and text (all properties) messages.
Expand Down Expand Up @@ -227,13 +286,27 @@ def _handle_response(self, data): # pragma: no cover
"""
Handle a potential JSON-RPC response message.
:param dict data: data of the reply
:param dict data: data of the reply, either a dict or a list (batch request)
:return: True if a request was handled, False otherwise
:rtype: bool
"""
if 'id' not in data:
if 'id' not in data and not isinstance(data, list):
return False

if isinstance(data, list):
for response in data:
self._finish_request(response)
else:
self._finish_request(data)

return True

def _finish_request(self, data): # pragma: no cover
"""
Extract payload from data which can be result or error and invoke the response callback.
:param dict data: data of the reply
"""
payload = None
if 'result' in data:
payload = None if data['result'] == '' or data['result'] == 'OK' else data['result']
Expand All @@ -245,4 +318,3 @@ def _handle_response(self, data): # pragma: no cover
self._ws_requests.pop(data['id'])
else:
print('Invalid reply for request ' + str(data['id']))
return True
2 changes: 2 additions & 0 deletions python/brayns/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

WS_PATH = '/ws'

SCHEMA_ENDPOINT = '/schema'


class Status(object):
"""Holds the execution status of an HTTP request."""
Expand Down
2 changes: 1 addition & 1 deletion python/setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[metadata]
version = 0.7.0.dev0
version = 0.7.0.dev1
name = brayns
summary = Brayns python API
home-page = https://github.com/BlueBrain/Brayns
Expand Down
Loading

0 comments on commit 78cfa1e

Please sign in to comment.