Skip to content

Commit

Permalink
Async changes driven by ClientStorage integration
Browse files Browse the repository at this point in the history
- Fixed tpc_finish:

  - Use tid from server to update cache.

  - Accept and call callback function.

- Implemented flow control

- Added connection/disconnection notification (to client storage).

- implemented get_peername.

- implemented is_read_only

- renamed callAsync to async (death to Camels!)
  • Loading branch information
Jim Fulton committed May 18, 2016
1 parent 4cfe366 commit 7e5b78f
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 25 deletions.
95 changes: 85 additions & 10 deletions src/ZEO/asyncio/client.py
Expand Up @@ -51,7 +51,9 @@ def __init__(self, loop,
self.client = client
self.connect_poll = connect_poll
self.futures = {} # { message_id -> future }
self.input = []
self.input = [] # Buffer when assembling messages
self.output = [] # Buffer when paused
self.paused = [] # Paused indicator, mutable to avoid attr lookup

# Handle the first message, the protocol handshake, differently
self.message_received = self.first_message_received
Expand Down Expand Up @@ -98,15 +100,57 @@ def done_connecting(future):
def connection_made(self, transport):
logger.info("Connected %s", self)
self.transport = transport

paused = self.paused
output = self.output
append = output.append
writelines = transport.writelines
from struct import pack

def write(message):
writelines((pack(">I", len(message)), message))
if paused:
append(message)
else:
writelines((pack(">I", len(message)), message))

self._write = write

def writeit(data):
# Note, don't worry about combining messages. Iters
# will be used with blobs, in which case, the individual
# messages will be big to begin with.
data = iter(data)
for message in data:
writelines((pack(">I", len(message)), message))
if paused:
append(data)
break

self._writeit = writeit

def pause_writing(self):
self.paused.append(1)

def resume_writing(self):
paused = self.paused
del paused[:]
output = self.output
writelines = self.transport.writelines
from struct import pack
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
break

def get_peername(self):
return self.transport.get_extra_info('peername')

def connection_lost(self, exc):
if exc is None:
# we were closed
Expand Down Expand Up @@ -232,6 +276,9 @@ def message_received(self, data):
def call_async(self, method, args):
self._write(dumps((0, True, method, args), 3))

def call_async_iter(self, it):
self._writeit(dumps((0, True, method, args), 3) for method, args in it)

message_id = 0
def call(self, future, method, args):
self.message_id += 1
Expand Down Expand Up @@ -304,6 +351,8 @@ def _clear_protocols(self, protocol=None):

def disconnected(self, protocol=None):
if protocol is None or protocol is self.protocol:
if protocol is self.protocol:
self.client.notify_disconnected()
self.ready = False
self.connected = concurrent.futures.Future()
self.protocol = None
Expand Down Expand Up @@ -403,6 +452,10 @@ def finished_verify(self, server_tid):
self.cache.setLastTid(server_tid)
self.ready = True
self.connected.set_result(None)
self.client.notify_connected(self)

def get_peername(self):
return self.protocol.get_peername()

def call_async_threadsafe(self, future, method, args):
if self.ready:
Expand All @@ -411,6 +464,13 @@ def call_async_threadsafe(self, future, method, args):
else:
future.set_exception(ZEO.Exceptions.ClientDisconnected())

def call_async_iter_threadsafe(self, future, it):
if self.ready:
self.protocol.call_async_iter(it)
future.set_result(None)
else:
future.set_exception(ZEO.Exceptions.ClientDisconnected())

def _when_ready(self, func, result_future, *args):

@self.connected.add_done_callback
Expand Down Expand Up @@ -463,16 +523,17 @@ def load_before(data):
else:
self._when_ready(self.load_before_threadsafe, future, oid, tid)

def tpc_finish_threadsafe(self, future, tid, updates):
def tpc_finish_threadsafe(self, future, tid, updates, f):
if self.ready:
@self.protocol.promise('tpc_finish', tid)
def committed(_):
def committed(tid):
cache = self.cache
for oid, s, data in updates:
for oid, data, resolved in updates:
cache.invalidate(oid, tid)
if data and s != ResolvedSerial:
if data and not resolved:
cache.store(oid, tid, None, data)
cache.setLastTid(tid)
f(tid)
future.set_result(None)

committed.catch(future.set_exception)
Expand Down Expand Up @@ -536,21 +597,35 @@ def wait_for_result(self, future, timeout):
def call(self, method, *args, timeout=None):
return self.__call(self.client.call_threadsafe, method, args)

def callAsync(self, method, *args):
def async(self, method, *args):
return self.__call(self.client.call_async_threadsafe, method, args)

def async_iter(self, it):
return self.__call(self.client.call_async_iter_threadsafe, it)

def load(self, oid):
return self.__call(self.client.load_threadsafe, oid)

def load_before(self, oid, tid):
return self.__call(self.client.load_before_threadsafe, oid, tid)

def tpc_finish(self, tid, updates):
return self.__call(self.client.tpc_finish_threadsafe, tid, updates)
def tpc_finish(self, tid, updates, f):
return self.__call(self.client.tpc_finish_threadsafe, tid, updates, f)

def is_connected(self):
return self.client.ready

def is_read_only(self):
try:
protocol = self.client.protocol
except AttributeError:
return True
else:
if protocol is None:
return True
else:
return protocol.read_only

def close(self):
self.__call(self.client.close_threadsafe)

Expand Down
25 changes: 23 additions & 2 deletions src/ZEO/asyncio/testing.py
Expand Up @@ -17,7 +17,7 @@ def call_soon(self, func, *args):

def _connect(self, future, protocol_factory):
self.protocol = protocol = protocol_factory()
self.transport = transport = Transport()
self.transport = transport = Transport(protocol)
protocol.connection_made(transport)
future.set_result((transport, protocol))

Expand Down Expand Up @@ -60,14 +60,26 @@ def call_exception_handler(self, context):

class Transport:

def __init__(self):
capacity = 1 << 64
paused = False
extra = dict(peername='1.2.3.4')

def __init__(self, protocol):
self.data = []
self.protocol = protocol

def write(self, data):
self.data.append(data)
self.check_pause()

def writelines(self, lines):
self.data.extend(lines)
self.check_pause()

def check_pause(self):
if len(self.data) > self.capacity and not self.paused:
self.paused = True
self.protocol.pause_writing()

def pop(self, count=None):
if count:
Expand All @@ -76,8 +88,17 @@ def pop(self, count=None):
else:
r = self.data[:]
del self.data[:]
self.check_resume()
return r

def check_resume(self):
if len(self.data) < self.capacity and self.paused:
self.paused = False
self.protocol.resume_writing()

closed = False
def close(self):
self.closed = True

def get_extra_info(self, name):
return self.extra[name]

0 comments on commit 7e5b78f

Please sign in to comment.