Skip to content

Commit

Permalink
Fixed more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Fulton committed May 26, 2016
1 parent 1cc3ceb commit 01d7417
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 63 deletions.
18 changes: 16 additions & 2 deletions src/ZEO/ClientStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
wait=True,
drop_cache_rather_verify=True,
username=None, password=None, realm=None,
# For tests:
_client_factory=ZEO.asyncio.client.ClientThread,
):
"""ClientStorage constructor.
Expand Down Expand Up @@ -256,18 +258,24 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
blob_cache_size * blob_cache_size_check // 100)
self._check_blob_size()

self._server = ZEO.asyncio.client.ClientThread(
self._server = _client_factory(
addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30,
)
self._server.start()
self._call = self._server.call
self._async = self._server.async
self._async_iter = self._server.async_iter

self._commit_lock = threading.Lock()

try:
self._server.start(wait=wait)
except Exception:
# No point in keeping the server going of the storage creation fails
self._server.close()
raise

def new_addr(self, addr):
self._addr = addr
self._server.new_addrs(addr)
Expand Down Expand Up @@ -763,6 +771,12 @@ def tpc_abort(self, txn):
# all, yet you want to be sure that other abort logic is
# executed regardless.
try:
# It's tempting to make an asynchronous call here, but
# it's useful for it to be synchronous because, if we
# failed due to a disconnect, synchronous calls will
# wait a little while in hopes of reconnecting. If
# we're able to reconnect and retry the transaction,
# ten it might succeed!
self._call('tpc_abort', id(txn))
except ClientDisconnected:
logger.debug("%s ClientDisconnected in tpc_abort() ignored",
Expand Down
41 changes: 28 additions & 13 deletions src/ZEO/tests/ConnectionTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import concurrent.futures
import os
import time
import socket
Expand Down Expand Up @@ -174,6 +174,7 @@ def openClientStorage(self, cache=None, cache_size=200000, wait=1,
var='.',
cache_size=cache_size,
wait=wait,
wait_timeout=1,
min_disconnect_poll=0.1,
read_only=read_only,
read_only_fallback=read_only_fallback,
Expand Down Expand Up @@ -454,7 +455,7 @@ def checkVerificationInvalidationPersists(self):

def checkBadMessage1(self):
# not even close to a real message
self._bad_message("salty")
self._bad_message(b"salty")

def checkBadMessage2(self):
# just like a real message, but with an unpicklable argument
Expand All @@ -474,22 +475,36 @@ def _bad_message(self, msg):
self._storage = self.openClientStorage()
self._dostore()

# break into the internals to send a bogus message
zrpc_conn = self._storage._server.rpc
zrpc_conn.message_output(msg)
generation = self._storage._connection_generation

future = concurrent.futures.Future()

def write():
try:
self._storage._server.client.protocol._write(msg)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(None)

# break into the internals to send a bogus message
self._storage._server.loop.call_soon_threadsafe(write)
future.result()

# If we manage to call _dostore before the server disconnects
# us, we'll get a ClientDisconnected error. When we retry, it
# will succeed. It will succeed because:
# - _dostore calls tpc_abort
# - tpc_abort makes a synchronous call to the server to abort
# the transaction
# - when disconnected, synchronous calls are blocked for a little
# while while reconnecting (or they timeout of it takes too long).
try:
self._dostore()
except ClientDisconnected:
pass
else:
self._storage.close()
self.fail("Server did not disconnect after bogus message")
self._storage.close()
self._dostore()

self._storage = self.openClientStorage()
self._dostore()
self._storage.close()
self.assertTrue(self._storage._connection_generation > generation)

# Test case for multiple storages participating in a single
# transaction. This is not really a connection test, but it needs
Expand Down
71 changes: 23 additions & 48 deletions src/ZEO/tests/testConversionSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import doctest
import unittest

import ZEO.asyncio.testing

class FakeStorageBase:

def __getattr__(self, name):
Expand Down Expand Up @@ -52,7 +54,7 @@ def register_connection(*args):

def test_server_record_iternext():
"""
On the server, record_iternext calls are simply delegated to the
underlying storage.
Expand All @@ -71,7 +73,7 @@ def test_server_record_iternext():
2
3
4
The storage info also reflects the fact that record_iternext is supported.
>>> zeo.get_info()['supports_record_iternext']
Expand All @@ -85,9 +87,8 @@ def test_server_record_iternext():
"""


def test_client_record_iternext():
"""\
"""Test client storage delegation to the network client
The client simply delegates record_iternext calls to it's server stub.
Expand All @@ -96,21 +97,25 @@ def test_client_record_iternext():
First, fake out the connection manager so we can make a connection:
>>> import ZEO.ClientStorage
>>> from ZEO.ClientStorage import ClientStorage
>>> oldConnectionManagerClass = ClientStorage.ConnectionManagerClass
>>> class FauxConnectionManagerClass:
... def __init__(*a, **k):
... pass
... def attempt_connect(self):
... return True
>>> ClientStorage.ConnectionManagerClass = FauxConnectionManagerClass
>>> client = ClientStorage('', wait=False)
>>> ClientStorage.ConnectionManagerClass = oldConnectionManagerClass
>>> import ZEO
>>> class Client(ZEO.asyncio.testing.ClientRunner):
...
... def record_iternext(self, next=None):
... if next == None:
... next = '0'
... next = str(int(next) + 1)
... oid = next
... if next == '4':
... next = None
...
... return oid, oid*8, 'data ' + oid, next
>>> client = ZEO.client(
... '', wait=False, _client_factory=Client)
Now we'll have our way with it's private _server attr:
>>> client._server = FakeStorage()
>>> next = None
>>> while 1:
... oid, serial, data, next = client.record_iternext(next)
Expand All @@ -124,35 +129,6 @@ def test_client_record_iternext():
"""

def test_server_stub_record_iternext():
"""\
The server stub simply delegates record_iternext calls to it's rpc.
There's really no decent way to test ZEO without running to much crazy
stuff. I'd rather do a lame test than a really lame test, so here goes.
>>> class FauxRPC:
... storage = FakeStorage()
... def call(self, meth, *args):
... return getattr(self.storage, meth)(*args)
... peer_protocol_version = 1
>>> import ZEO.ServerStub
>>> stub = ZEO.ServerStub.StorageServer(FauxRPC())
>>> next = None
>>> while 1:
... oid, serial, data, next = stub.record_iternext(next)
... print(oid)
... if next is None:
... break
1
2
3
4
"""

def history_to_version_compatible_storage():
"""
Some storages work under ZODB <= 3.8 and ZODB >= 3.9.
Expand All @@ -163,7 +139,7 @@ def history_to_version_compatible_storage():
... return oid,version,size
A ZEOStorage such as the following should support this type of storage:
>>> class OurFakeServer(FakeServer):
... storages = {'1':VersionCompatibleStorage()}
>>> import ZEO.StorageServer
Expand All @@ -181,7 +157,7 @@ def history_to_version_compatible_storage():
>>> from ZEO.StorageServer import ZEOStorage308Adapter
>>> zeo = ZEOStorage308Adapter(VersionCompatibleStorage())
The history method should still return the parameters it was called with:
>>> zeo.history('oid','',99)
Expand All @@ -193,4 +169,3 @@ def test_suite():

if __name__ == '__main__':
unittest.main(defaultTest='test_suite')

0 comments on commit 01d7417

Please sign in to comment.