Skip to content

Commit

Permalink
Merge pull request #72 from zopefoundation/uvloop-server
Browse files Browse the repository at this point in the history
Use uvloop in the single-threaded server
  • Loading branch information
jimfulton committed Aug 9, 2016
2 parents cc26ec3 + 28914ad commit 49bc8e4
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 21 deletions.
7 changes: 6 additions & 1 deletion src/ZEO/asyncio/server.py
Expand Up @@ -2,8 +2,13 @@

if PY3:
import asyncio
try:
from uvloop import new_event_loop
except ImportError:
from asyncio import new_event_loop
else:
import trollius as asyncio
from trollius import new_event_loop

import json
import logging
Expand Down Expand Up @@ -223,7 +228,7 @@ def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr
self.ssl_context = ssl
self.event_loop = loop = asyncio.new_event_loop()
self.event_loop = loop = new_event_loop()

if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1],
Expand Down
6 changes: 3 additions & 3 deletions src/ZEO/nagios.rst
Expand Up @@ -10,7 +10,7 @@ ZEO includes a script that provides a nagios monitor plugin:
In it's simplest form, the script just checks if it can get status:

>>> import ZEO
>>> addr, stop = ZEO.server('test.fs')
>>> addr, stop = ZEO.server('test.fs', threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port

>>> nagios([saddr])
Expand Down Expand Up @@ -39,7 +39,7 @@ The monitor will optionally output server metric data. There are 2
kinds of metrics it can output, level and rate metric. If we use the
-m/--output-metrics option, we'll just get rate metrics:

>>> addr, stop = ZEO.server('test.fs')
>>> addr, stop = ZEO.server('test.fs', threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m'])
OK|active_txns=0
Expand Down Expand Up @@ -115,7 +115,7 @@ profixes metrics with a storage id.
... </mappingstorage>
... <mappingstorage second>
... </mappingstorage>
... """)
... """, threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'first'|first:active_txns=0
Expand Down
2 changes: 1 addition & 1 deletion src/ZEO/tests/drop_cache_rather_than_verify.txt
Expand Up @@ -71,7 +71,7 @@ is generated before the cache is dropped or the message is logged.
Now, we'll restart the server on the original address:

>>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1),
... addr=addr, keep=1, threaded=True)
... addr=addr, keep=1)

>>> wait_connected(db.storage)

Expand Down
4 changes: 2 additions & 2 deletions src/ZEO/tests/new_addr.test
Expand Up @@ -4,7 +4,7 @@ We'll start by setting up a server and connecting to it:

>>> import ZEO, transaction

>>> addr, stop = ZEO.server(path='test.fs')
>>> addr, stop = ZEO.server(path='test.fs', threaded=False)
>>> conn = ZEO.connection(addr)
>>> client = conn.db().storage
>>> client.is_connected()
Expand All @@ -24,7 +24,7 @@ And wait for the connectin to notice it's disconnected:

Now, we'll restart the server:

>>> addr, stop = ZEO.server(path='test.fs')
>>> addr, stop = ZEO.server(path='test.fs', threaded=False)

Update with another client:

Expand Down
5 changes: 4 additions & 1 deletion src/ZEO/tests/testConfig.py
Expand Up @@ -19,6 +19,7 @@
from ZODB.config import storageFromString

from .forker import start_zeo_server
from .threaded import threaded_server_tests

class ZEOConfigTestBase(setupstack.TestCase):

Expand Down Expand Up @@ -120,4 +121,6 @@ def test_blob_cache_size_check(self):
blob_cache_size_check=50)

def test_suite():
return unittest.makeSuite(ZEOConfigTest)
suite = unittest.makeSuite(ZEOConfigTest)
suite.layer = threaded_server_tests
return suite
20 changes: 11 additions & 9 deletions src/ZEO/tests/testZEO.py
Expand Up @@ -1609,15 +1609,6 @@ def test_suite():
globs={'print_function': print_function},
),
)
if not forker.ZEO4_SERVER:
zeo.addTest(
doctest.DocFileSuite(
'dynamic_server_ports.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
),
)
zeo.addTest(PackableStorage.IExternalGC_suite(
lambda :
ServerManagingClientStorageForIExternalGCTest(
Expand Down Expand Up @@ -1655,6 +1646,17 @@ def test_suite():
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'ClientStorageSharedBlobs', create_storage_shared))

if not forker.ZEO4_SERVER:
from .threaded import threaded_server_tests
dynamic_server_ports_suite = doctest.DocFileSuite(
'dynamic_server_ports.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
)
dynamic_server_ports_suite.layer = threaded_server_tests
suite.addTest(dynamic_server_ports_suite)

return suite


Expand Down
2 changes: 1 addition & 1 deletion src/ZEO/tests/testZEO2.py
Expand Up @@ -497,7 +497,7 @@ def test_prefetch(self):
>>> count = 999
>>> import ZEO
>>> addr, stop = ZEO.server()
>>> addr, stop = ZEO.server(threaded=False)
>>> conn = ZEO.connection(addr)
>>> root = conn.root()
>>> cls = root.__class__
Expand Down
5 changes: 4 additions & 1 deletion src/ZEO/tests/test_client_credentials.py
Expand Up @@ -9,6 +9,7 @@
import ZEO.StorageServer

from . import forker
from .threaded import threaded_server_tests

@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientAuthTests(setupstack.TestCase):
Expand Down Expand Up @@ -54,5 +55,7 @@ def register(zs, storage_id, read_only, credentials=self):
stop()

def test_suite():
return unittest.makeSuite(ClientAuthTests)
suite = unittest.makeSuite(ClientAuthTests)
suite.layer = threaded_server_tests
return suite

3 changes: 2 additions & 1 deletion src/ZEO/tests/test_client_side_conflict_resolution.py
Expand Up @@ -111,7 +111,7 @@ def test_server_side(self):

def test_client_side(self):
# First, traditional:
addr, stop = ZEO.server('data.fs')
addr, stop = ZEO.server('data.fs', threaded=False)
db = ZEO.DB(addr)
with db.transaction() as conn:
conn.root.l = Length(0)
Expand All @@ -130,6 +130,7 @@ def test_client_side(self):
addr2, stop = ZEO.server(
storage_conf='<mappingstorage>\n</mappingstorage>\n',
zeo_conf=dict(client_conflict_resolution=True),
threaded=False,
)

db = ZEO.DB(addr2)
Expand Down
6 changes: 5 additions & 1 deletion src/ZEO/tests/testssl.py
Expand Up @@ -11,6 +11,7 @@

from .testConfig import ZEOConfigTestBase
from . import forker
from .threaded import threaded_server_tests

here = os.path.dirname(__file__)
server_cert = os.path.join(here, 'server.pem')
Expand Down Expand Up @@ -121,6 +122,7 @@ def test_ssl_pw(self):
@mock.patch(('asyncio' if PY3 else 'trollius') + '.set_event_loop')
@mock.patch(('asyncio' if PY3 else 'trollius') + '.new_event_loop')
@mock.patch('ZEO.asyncio.client.new_event_loop')
@mock.patch('ZEO.asyncio.server.new_event_loop')
class SSLConfigTestMockiavellian(ZEOConfigTestBase):

@mock.patch('ssl.create_default_context')
Expand Down Expand Up @@ -335,10 +337,12 @@ def create_server(**ssl_settings):


def test_suite():
return unittest.TestSuite((
suite = unittest.TestSuite((
unittest.makeSuite(SSLConfigTest),
unittest.makeSuite(SSLConfigTestMockiavellian),
))
suite.layer = threaded_server_tests
return suite

# Helpers for other tests:

Expand Down
12 changes: 12 additions & 0 deletions src/ZEO/tests/threaded.py
@@ -0,0 +1,12 @@
"""Test layer for threaded-server tests
uvloop currently has a bug,
https://github.com/MagicStack/uvloop/issues/39, that causes failure if
multiprocessing and threaded servers are mixed in the same
application, so we isolate the few threaded tests in their own layer.
"""
import ZODB.tests.util

threaded_server_tests = ZODB.tests.util.MininalTestLayer(
'threaded_server_tests')

0 comments on commit 49bc8e4

Please sign in to comment.