Rework outgoing batch request handling
Tests are very comprehensive, and verify that unwanted events are logged
Neil Booth committed Feb 28, 2018
1 parent b4921da commit 1a2e549
Showing 3 changed files with 314 additions and 77 deletions.
87 changes: 69 additions & 18 deletions aiorpcx/rpc.py
@@ -77,16 +77,21 @@ class RPCRequestOut(RPCRequest, Future):

_next_id = 0

@classmethod
def next_id(cls):
result = cls._next_id
cls._next_id += 1
return result

def __init__(self, method, args, on_done, *, loop=None):
'''Initialize a request using the next unique request ID.
on_done - can be None
'''
RPCRequest.__init__(self, method, args, self._next_id)
RPCRequest.__init__(self, method, args, self.next_id())
Future.__init__(self, loop=loop)
if on_done:
self.add_done_callback(on_done)
RPCRequestOut._next_id += 1


class RPCResponse(object):
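
A rough usage sketch (not part of the diff; the method name and callback are illustrative): the reworked RPCRequestOut draws its ID from the new next_id() classmethod rather than mutating the counter in __init__, and since it is a Future the on_done callback fires once it completes.

    def on_done(request):
        # The request is also a Future; this runs once it completes.
        if not request.cancelled() and request.exception() is None:
            print(request.request_id, request.result())

    # IDs are allocated from the class-wide counter: 0, 1, 2, ...
    request = RPCRequestOut('server.version', ['1.0'], on_done)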
@@ -149,12 +154,19 @@ def __init__(self, items):
assert (all(isinstance(item, RPCRequest) for item in items)
or all(isinstance(item, RPCResponse) for item in items))

def requests(self):
'''An iterable of the batch items that are not notifications.
For a response batch simply returns everything.'''
for item in self.items:
if item.request_id is not None:
yield item

def request_ids(self):
'''Return a frozenset of all request IDs in the batch, ignoring
notifications.
'''
return frozenset(item.request_id for item in self
if item.request_id is not None)
return frozenset(item.request_id for item in self.requests())

def is_request_batch(self):
return isinstance(self.items[0], RPCRequest)
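
A hand-built example of the new helpers (method names and IDs are invented): requests() skips notifications, whose request_id is None, so request_ids() only covers items that expect a response.

    batch = RPCBatch([
        RPCRequest('ping', [], 0),
        RPCRequest('notify_me', [], None),   # a notification - no ID
        RPCRequest('ping', [], 1),
    ])
    assert batch.request_ids() == frozenset({0, 1})
    assert batch.is_request_batch()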
@@ -169,19 +181,45 @@ def __repr__(self):
return f'RPCBatch({self.items!r})'


class RPCBatchOut(RPCBatch):
class RPCBatchOut(RPCBatch, Future):
'''Represents an outgoing RPC batch request.
You can specify a callback for each individual request in the batch,
and/or a callback for the batch as a whole (via its future).
The results can be
def __init__(self):
# We don't call the base class constructor
The batch request can be await-ed.
'''
def __init__(self, on_done=None, *, loop=None):
'''Create an outgoing batch request. on_done can be None.'''
# We don't call RPCBatch.__init__()
self.items = []
Future.__init__(self, loop=loop)
if on_done:
self.add_done_callback(on_done)

def add_request(self, handler, method, args=None):
request = RPCRequestOut(method, args, handler)
self.items.append(request)
def _cancel_requests(self):
for request in self.requests():
request.cancel()

def cancel(self):
super().cancel()
self._cancel_requests()

def set_result(self, result):
super().set_result(result)
self._cancel_requests()

def set_exception(self, exception):
super().set_exception(exception)
self._cancel_requests()

def add_request(self, method, args=None, on_done=None):
self.items.append(RPCRequestOut(method, args, on_done))

def add_notification(self, method, args=None):
request = RPCRequest(method, args, None)
self.items.append(request)
self.items.append(RPCRequest(method, args, None))


class RPCHelperBase(object):
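
A sketch of driving the reworked RPCBatchOut (callbacks and method names are invented for illustration): each request added to the batch may carry its own on_done callback, the batch as a whole is a Future with an optional batch-level callback, and cancelling or completing the batch cancels its member requests.

    def on_item(request):
        print('request done:', request.request_id)

    def on_batch(batch):
        print('batch done; every member request has completed')

    batch = RPCBatchOut(on_batch)
    batch.add_request('blockchain.estimatefee', [6], on_item)
    batch.add_request('server.banner', [])
    batch.add_notification('server.ping', [])
    # A processor would now send the batch; the batch, like each of its
    # member requests, can also simply be await-ed.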
@@ -441,11 +479,6 @@ def _process_response_batch(self, batch):
self.logger.debug('response to unsent batch request: %s',
repr(batch))

def _remove(self, request):
'''If it completed any way other than via a response, it will still
be in our requests set.'''
self.requests.pop(request.request_id, None)

# External API - methods for use by a session layer
def message_received(self, message):
'''Analyse an incoming message and queue it for processing.
@@ -472,8 +505,11 @@ def send_request(self, request):
incoming response can be handled.
'''
if isinstance(request, RPCRequestOut):
def request_done(request):
self.requests.pop(request.request_id, None)

self.requests[request.request_id] = request
request.add_done_callback(self._remove)
request.add_done_callback(request_done)
self.helper.send_message(self.protocol.request_message(request))

def send_batch(self, batch):
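
Illustrative use of send_request under the new scheme (the processor object is an assumption, standing in for whatever owns send_request): the locally-defined request_done callback replaces the old _remove method, so a request is dropped from the pending map however it completes.

    request = RPCRequestOut('echo', ['hello'], None)
    processor.send_request(request)
    # The request now sits in processor.requests keyed by its ID.
    request.cancel()
    # Once the event loop runs the done callbacks, request_done pops it
    # from processor.requests - the same path a normal response or an
    # exception would take.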
@@ -487,7 +523,22 @@ def send_batch(self, batch):
raise RuntimeError('request batch cannot be empty')
request_ids = batch.request_ids()
if request_ids:
def request_done(request):
nonlocal remaining
remaining -= 1
if not remaining and not batch.done():
batch.set_result(False)

def batch_done(batch):
self.requests.pop(request_ids, None)

remaining = len(request_ids)
self.requests[request_ids] = batch
for request in batch.requests():
request.add_done_callback(request_done)
batch.add_done_callback(batch_done)
else:
batch.set_result(False)
self.helper.send_message(self.protocol.batch_message(batch))
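
A hedged walk-through of the new bookkeeping (processor as above; method names invented): the batch is stored in self.requests under the frozenset of its request IDs, each member request's request_done callback decrements remaining, and when the last one finishes the batch future resolves to False; batch_done then removes the entry.

    batch = RPCBatchOut()
    batch.add_request('method.a', [])
    batch.add_request('method.b', [])
    batch.add_notification('method.c', [])   # no ID, so not tracked for completion
    processor.send_batch(batch)
    # Once responses have arrived for method.a and method.b:
    #   batch.done() is True and batch.result() is False
    #   processor.requests no longer holds the batch entry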

def all_requests(self):