Skip to content

Commit

Permalink
Fixed a bug in low-level data input handling
Browse files Browse the repository at this point in the history
In ZEO.asyncio.client.Protocol.data_received, the logic was broken if
there we unhandled errors, for example in calling client storage methods.
  • Loading branch information
Jim Fulton committed May 30, 2016
1 parent 9e0f98f commit 4833183
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 21 deletions.
55 changes: 34 additions & 21 deletions src/ZEO/asyncio/client.py
Expand Up @@ -235,30 +235,43 @@ def data_received(self, data):

# Low-level input handler collects data into sized messages.

# Note that the logic below assume that when new data pushes
# us over what we want, we process it in one call until we
# need more, because we assume that excess data is all in the
# last item of self.input. This is why the exception handling
# in the while loop is critical. Without it, an exception
# might cause us to exit before processing all of the data we
# should, when then causes the logic to be broken in
# subsequent calls.

self.got += len(data)
self.input.append(data)
while self.got >= self.want:
extra = self.got - self.want
if extra == 0:
collected = b''.join(self.input)
self.input = []
else:
input = self.input
self.input = [data[-extra:]]
input[-1] = input[-1][:-extra]
collected = b''.join(input)

self.got = extra

if self.getting_size:
# we were recieving the message size
assert self.want == 4
self.want = unpack(">I", collected)[0]
self.getting_size = False
else:
self.want = 4
self.getting_size = True
self.message_received(collected)
try:
extra = self.got - self.want
if extra == 0:
collected = b''.join(self.input)
self.input = []
else:
input = self.input
self.input = [input[-1][-extra:]]
input[-1] = input[-1][:-extra]
collected = b''.join(input)

self.got = extra

if self.getting_size:
# we were recieving the message size
assert self.want == 4
self.want = unpack(">I", collected)[0]
self.getting_size = False
else:
self.want = 4
self.getting_size = True
self.message_received(collected)
except Exception:
logger.exception("data_received %s %s %s",
self.want, self.got, self.getting_size)

def first_message_received(self, data):
# Handler for first/handshake message, set up in __init__
Expand Down
28 changes: 28 additions & 0 deletions src/ZEO/asyncio/tests.py
Expand Up @@ -613,6 +613,34 @@ def test_ClientDisconnected_on_call_timeout(self):
client.ready = False
self.assertRaises(ClientDisconnected, self.call, 'foo')

def test_errors_in_data_received(self):
# There was a bug in ZEO.async.client.Protocol.data_recieved
# that caused it to fail badly if errors were raised while
# handling data.

wrapper, cache, loop, client, protocol, transport, send, respond = (
self.start(finish_start=True))

wrapper.receiveBlobStart.side_effect = ValueError('test')

chunk = 'x' * 99999
try:
loop.protocol.data_received(b''.join((
sized(pickle.dumps(
(0, True, 'receiveBlobStart', ('oid', 'serial')), 3)),
sized(pickle.dumps(
(0, True, 'receiveBlobChunk',
('oid', 'serial', chunk)), 3)),
)))
except ValueError:
pass
loop.protocol.data_received(
sized(pickle.dumps(
(0, True, 'receiveBlobStop', ('oid', 'serial')), 3)),
)
wrapper.receiveBlobChunk.assert_called_with('oid', 'serial', chunk)
wrapper.receiveBlobStop.assert_called_with('oid', 'serial')

def unsized(self, data, unpickle=False):
result = []
while data:
Expand Down

0 comments on commit 4833183

Please sign in to comment.