Skip to content

Commit

Permalink
Fixed read-only fallback.
Browse files Browse the repository at this point in the history
We were sending lastTransaction requests while register requests were
in-flight.  If register failed, then the lastTransaction request was
invalid, causing the connection to be closed. :(

When we update the server, we'll have register return lastTransaction
and probably info, since the client wants that information on connect.
  • Loading branch information
Jim Fulton committed May 26, 2016
1 parent b79c721 commit 6134baa
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 51 deletions.
100 changes: 61 additions & 39 deletions src/ZEO/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,16 @@ def close(self):
self._connecting.cancel()
if self.transport is not None:
self.transport.close()
for future in self.futures.values():
for future in self.pop_futures():
future.set_exception(ClientDisconnected("Closed"))
self.futures.clear()

def pop_futures(self):
# Remove and return futures from self.futures. The caller
# will finalize them in some way and callbacks may modify
# self.futures.
futures = list(self.futures.values())
self.futures.clear()
return futures

def protocol_factory(self):
return self
Expand Down Expand Up @@ -137,38 +144,15 @@ def writeit(data):

self._writeit = writeit

def pause_writing(self):
self.paused.append(1)

def resume_writing(self):
paused = self.paused
del paused[:]
output = self.output
writelines = self.transport.writelines
from struct import pack
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
break

def get_peername(self):
return self.transport.get_extra_info('peername')

def connection_lost(self, exc):
if self.closed:
for f in self.futures.values():
for f in self.pop_futures():
f.cancel()
else:
logger.info("Disconnected, %s, %r", self, exc)
self.client.disconnected(self)
for f in self.futures.values():
# We have to be careful processing the futures, because
# exception callbacks might modufy them.
for f in self.pop_futures():
f.set_exception(ClientDisconnected(exc or 'connection lost'))

def finish_connect(self, protocol_version):
Expand Down Expand Up @@ -199,24 +183,38 @@ def finish_connect(self, protocol_version):
'register', self.storage_key,
self.read_only if self.read_only is not Fallback else False,
)
# Get lastTransaction in flight right away to make successful
# connection quicker
lastTransaction = self.promise('lastTransaction')
if self.read_only is not Fallback:
# Get lastTransaction in flight right away to make
# successful connection quicker, but only if we're not
# doing read-only fallback. If we might need to retry, we
# can't send lastTransaction because if the registration
# fails, it will be seen as an invalid message and the
# connection will close. :( It would be a lot better of
# registere returned the last transaction (and info while
# it's at it).
lastTransaction = self.promise('lastTransaction')
else:
lastTransaction = None # to make python happy

@register
def registered(_):
if self.read_only is Fallback:
self.read_only = False
self.client.registered(self, lastTransaction)
r_lastTransaction = self.promise('lastTransaction')
else:
r_lastTransaction = lastTransaction
self.client.registered(self, r_lastTransaction)

@register.catch
def register_failed(exc):
if (isinstance(exc, ZODB.POSException.ReadOnlyError) and
self.read_only is Fallback):
# We tried a write connection, degrade to a read-only one
self.read_only = True
register = self.promise(
'register', self.storage_key, self.read_only)
logger.info("%s write connection failed. Trying read-only",
self)
register = self.promise('register', self.storage_key, True)
# get lastTransaction in flight.
lastTransaction = self.promise('lastTransaction')

@register
Expand Down Expand Up @@ -312,6 +310,30 @@ def call(self, future, method, args):
def promise(self, method, *args):
return self.call(Promise(), method, args)

def pause_writing(self):
self.paused.append(1)

def resume_writing(self):
paused = self.paused
del paused[:]
output = self.output
writelines = self.transport.writelines
from struct import pack
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
break

def get_peername(self):
return self.transport.get_extra_info('peername')

# Methods called by the server.
# WARNING WARNING we can't call methods that call back to us
# syncronously, as that would lead to DEADLOCK!
Expand Down Expand Up @@ -759,6 +781,10 @@ def __init__(self, addrs, client, cache,
daemon=True,
)
self.started = threading.Event()
self.thread.start()
self.started.wait()
if self.exception:
raise self.exception

exception = None
def run(self):
Expand All @@ -782,10 +808,6 @@ def run(self):
logger.debug('Stopping client thread')

def start(self, wait=True):
self.thread.start()
self.started.wait()
if self.exception:
raise self.exception
if wait:
self.wait_for_result(self.connected, self.timeout)

Expand Down
25 changes: 13 additions & 12 deletions src/ZEO/asyncio/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,21 +441,19 @@ def test_readonly_fallback(self):
self.assertEqual(self.unsized(transport.pop(2)), b'Z3101')
# We see that the client tried a writable connection:
self.assertEqual(self.parse(transport.pop()),
[(1, False, 'register', ('TEST', False)),
(2, False, 'lastTransaction', ()),
])
(1, False, 'register', ('TEST', False)))
# We respond with a read-only exception:
respond(1, (ReadOnlyError, ReadOnlyError()))
self.assertTrue(self.is_read_only())

# The client tries for a read-only connection:
self.assertEqual(self.parse(transport.pop()),
[(3, False, 'register', ('TEST', True)),
(4, False, 'lastTransaction', ()),
[(2, False, 'register', ('TEST', True)),
(3, False, 'lastTransaction', ()),
])
# We respond with successfully:
respond(3, None)
respond(4, 'b'*8)
respond(2, None)
respond(3, 'b'*8)
self.assertTrue(self.is_read_only())

# At this point, the client is ready and using the protocol,
Expand All @@ -467,8 +465,8 @@ def test_readonly_fallback(self):

# The client asks for info, and we respond:
self.assertEqual(self.parse(transport.pop()),
(5, False, 'get_info', ()))
respond(5, dict(length=42))
(4, False, 'get_info', ()))
respond(4, dict(length=42))

self.assert_(connected.done())

Expand All @@ -477,15 +475,18 @@ def test_readonly_fallback(self):
loop.protocol.data_received(sized(b'Z3101'))
self.assertEqual(self.unsized(loop.transport.pop(2)), b'Z3101')
self.assertEqual(self.parse(loop.transport.pop()),
[(1, False, 'register', ('TEST', False)),
(2, False, 'lastTransaction', ()),
])
(1, False, 'register', ('TEST', False)))
self.assertTrue(self.is_read_only())

# We respond and the writable connection succeeds:
respond(1, None)
self.assertFalse(self.is_read_only())

# at this point, a lastTransaction request is emitted:

self.assertEqual(self.parse(loop.transport.pop()),
(2, False, 'lastTransaction', ()))

# Now, the original protocol is closed, and the client is
# no-longer ready:
self.assertFalse(client.ready)
Expand Down

0 comments on commit 6134baa

Please sign in to comment.