Skip to content

Commit

Permalink
Initial stab. Many (especially core) tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Fulton committed May 31, 2016
1 parent f64c9a8 commit fe28e53
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 35 deletions.
30 changes: 18 additions & 12 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,30 @@
"""Setup
"""
version = '5.0.0a0'

from setuptools import setup, find_packages
import os
import sys

install_requires = [
'ZODB >= 4.2.0b1',
'six',
'transaction >= 1.6.0',
'persistent >= 4.1.0',
'zc.lockfile',
'ZConfig',
'zdaemon',
'zope.interface',
]

tests_require = ['zope.testing', 'manuel', 'random2']

if sys.version_info < (2, 6):
print("This version of ZEO requires Python 2.6 or higher")
sys.exit(0)
elif sys.version_info < (3, ):
install_requires.extend(['futures', 'trollius'])
tests_require.append('mock')

classifiers = """\
Intended Audience :: Developers
Expand Down Expand Up @@ -93,8 +110,6 @@ def emit(self, record):
_unittests_only(suite, mod.test_suite())
return suite

tests_require = ['zope.testing', 'manuel', 'random2']

long_description = (
open('README.rst').read()
+ '\n' +
Expand All @@ -115,16 +130,7 @@ def emit(self, record):
test_suite="__main__.alltests", # to support "setup.py test"
tests_require = tests_require,
extras_require = dict(test=tests_require),
install_requires = [
'ZODB >= 4.2.0b1',
'six',
'transaction >= 1.6.0',
'persistent >= 4.1.0',
'zc.lockfile',
'ZConfig',
'zdaemon',
'zope.interface',
],
install_requires = install_requires,
zip_safe = False,
entry_points = """
[console_scripts]
Expand Down
31 changes: 20 additions & 11 deletions src/ZEO/asyncio/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from .._compat import PY3

from pickle import loads, dumps
from ZEO.Exceptions import ClientDisconnected
from ZODB.ConflictResolution import ResolvedSerial
from struct import unpack
import asyncio

if PY3:
import asyncio
else:
import trollius as asyncio

import concurrent.futures
import logging
import random
Expand Down Expand Up @@ -313,16 +320,16 @@ def message_received(self, data):
raise AttributeError(name)

def call_async(self, method, args):
self._write(dumps((0, True, method, args), 3))
self._write(dumps((0, True, method, args), 2))

def call_async_iter(self, it):
self._writeit(dumps((0, True, method, args), 3) for method, args in it)
self._writeit(dumps((0, True, method, args), 2) for method, args in it)

message_id = 0
def call(self, future, method, args):
self.message_id += 1
self.futures[self.message_id] = future
self._write(dumps((self.message_id, False, method, args), 3))
self._write(dumps((self.message_id, False, method, args), 2))
return future

def promise(self, method, *args):
Expand Down Expand Up @@ -369,7 +376,7 @@ def heartbeat(self, write=True):
self.heartbeat_handle = self.loop.call_later(
self.heartbeat_interval, self.heartbeat)

class Client:
class Client(object):
"""asyncio low-level ZEO client interface
"""

Expand Down Expand Up @@ -712,7 +719,7 @@ def is_read_only(self):
else:
return protocol.read_only

class ClientRunner:
class ClientRunner(object):

def set_options(self, addrs, wrapper, cache, storage_key, read_only,
timeout=30, disconnect_poll=1):
Expand All @@ -729,9 +736,10 @@ def setup_delegation(self, loop):
from concurrent.futures import Future
call_soon_threadsafe = loop.call_soon_threadsafe

def call(meth, *args, timeout=None):
def call(meth, *args, **kw):
timeout = kw.pop('timeout', None)
result = Future()
call_soon_threadsafe(meth, result, *args)
call_soon_threadsafe(meth, result, *args, **kw)
return self.wait_for_result(result, timeout)

self.__call = call
Expand All @@ -745,7 +753,8 @@ def wait_for_result(self, future, timeout):
else:
raise

def call(self, method, *args, timeout=None):
def call(self, method, *args, **kw):
timeout = kw.pop('timeout', None)
return self.__call(self.call_threadsafe, method, args, timeout=timeout)

def call_future(self, method, *args):
Expand Down Expand Up @@ -825,8 +834,8 @@ def __init__(self, addrs, client, cache,
self.thread = threading.Thread(
target=self.run,
name="%s zeo client networking thread" % client.__name__,
daemon=True,
)
self.thread.setDaemon(True)
self.started = threading.Event()
self.thread.start()
self.started.wait()
Expand Down Expand Up @@ -858,7 +867,7 @@ def run(self):
def close(self):
if not self.closed:
self.closed = True
super().close()
super(ClientThread, self).close()
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join(9)
if self.exception:
Expand Down
13 changes: 11 additions & 2 deletions src/ZEO/asyncio/testing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
import asyncio

from .._compat import PY3

if PY3:
import asyncio
else:
import trollius as asyncio
class ConnectionRefusedError(Exception):
pass

import pprint

class Loop:
Expand Down Expand Up @@ -138,7 +147,7 @@ def __init__(self, addr, client, cache, storage, read_only, timeout,
def start(self, wait=True):
pass

def call(self, method, *args, timeout=None):
def call(self, method, *args, **kw):
return getattr(self, method)(*args)

async = async_iter = call
Expand Down
32 changes: 22 additions & 10 deletions src/ZEO/asyncio/tests.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@

from .._compat import PY3

from zope.testing import setupstack
from concurrent.futures import Future
from unittest import mock
if PY3:
from unittest import mock
else:
import mock

from ZODB.POSException import ReadOnlyError

import asyncio
if PY3:
import asyncio
else:
import trollius as asyncio

import collections
import logging
import pdb
Expand Down Expand Up @@ -40,11 +51,11 @@ def start(self,

def send(meth, *args):
loop.protocol.data_received(
sized(pickle.dumps((0, True, meth, args), 3)))
sized(pickle.dumps((0, True, meth, args), 2)))

def respond(message_id, result):
loop.protocol.data_received(
sized(pickle.dumps((message_id, False, '.reply', result), 3)))
sized(pickle.dumps((message_id, False, '.reply', result), 2)))

if finish_start:
protocol.data_received(sized(b'Z3101'))
Expand Down Expand Up @@ -609,7 +620,7 @@ def test_call_async_from_same_thread(self):
def test_ClientDisconnected_on_call_timeout(self):
wrapper, cache, loop, client, protocol, transport, send, respond = (
self.start())
self.wait_for_result = super().wait_for_result
self.wait_for_result = super(AsyncTests, self).wait_for_result
self.assertRaises(ClientDisconnected, self.call, 'foo')
client.ready = False
self.assertRaises(ClientDisconnected, self.call, 'foo')
Expand All @@ -628,16 +639,16 @@ def test_errors_in_data_received(self):
try:
loop.protocol.data_received(b''.join((
sized(pickle.dumps(
(0, True, 'receiveBlobStart', ('oid', 'serial')), 3)),
(0, True, 'receiveBlobStart', ('oid', 'serial')), 2)),
sized(pickle.dumps(
(0, True, 'receiveBlobChunk',
('oid', 'serial', chunk)), 3)),
('oid', 'serial', chunk)), 2)),
)))
except ValueError:
pass
loop.protocol.data_received(
sized(pickle.dumps(
(0, True, 'receiveBlobStop', ('oid', 'serial')), 3)),
(0, True, 'receiveBlobStop', ('oid', 'serial')), 2)),
)
wrapper.receiveBlobChunk.assert_called_with('oid', 'serial', chunk)
wrapper.receiveBlobStop.assert_called_with('oid', 'serial')
Expand Down Expand Up @@ -674,7 +685,8 @@ def test_heartbeat(self):
def unsized(self, data, unpickle=False):
result = []
while data:
size, message, *data = data
size = data.pop(0)
message = data.pop(0)
self.assertEqual(struct.unpack(">I", size)[0], len(message))
if unpickle:
message = pickle.loads(message)
Expand All @@ -688,7 +700,7 @@ def parse(self, data):
return self.unsized(data, True)

def response(*data):
return sized(pickle.dumps(data, 3))
return sized(pickle.dumps(data, 2))

def sized(message):
return struct.pack(">I", len(message)) + message
Expand Down

0 comments on commit fe28e53

Please sign in to comment.