Skip to content

Commit

Permalink
Refactored the asyncio.Server-based server to be more careful about c…
Browse files Browse the repository at this point in the history
…losing down

It appeared that some tests were failing because servers weren't
closing down.  Modified the test forker module and the server to be
more paranoid about closing the server.

This seems to have helped test stability. (Or maybe I accidentally
fixed something while flailing :)).
  • Loading branch information
Jim Fulton committed Jun 27, 2016
1 parent fddb6f8 commit 31d4112
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 20 deletions.
31 changes: 24 additions & 7 deletions src/ZEO/asyncio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def __init__(self, storage_server, addr, ssl):
self.event_loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1], ssl=ssl)
cr = loop.create_server(self.factory, addr[0], addr[1],
reuse_address=True, ssl=ssl)
else:
cr = loop.create_unix_server(self.factory, addr, ssl=ssl)

Expand Down Expand Up @@ -245,12 +246,28 @@ def factory(self):

def loop(self, timeout=None):
self.event_loop.run_forever()
self.event_loop.run_until_complete(self.server.wait_closed())
self.event_loop.close()

__closed = False
closed = False
def close(self):
if not self.__closed:
self.__closed = True
self.server.close()
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
if not self.closed:
self.closed = True
self.event_loop.call_soon_threadsafe(self._close)

def _close(self):
loop = self.event_loop

self.server.close()

f = asyncio.async(self.server.wait_closed(), loop=loop)
@f.add_done_callback
def server_closed(f):
# stop the loop when the server closes:
loop.call_soon(loop.stop)

def timeout():
logger.warning("Timed out closing asyncio.Server")
loop.call_soon(loop.stop)

# But if the server doesn't close in a second, stop the loop anyway.
loop.call_later(1, timeout)
39 changes: 26 additions & 13 deletions src/ZEO/tests/forker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
##############################################################################
"""Library for forking storage server and connecting client storage"""
from __future__ import print_function
import gc
import os
import random
import sys
Expand Down Expand Up @@ -83,7 +84,7 @@ def encode_format(fmt):
return fmt

def runner(config, qin, qout, timeout=None,
join_timeout=9, debug=False, name=None,
debug=False, name=None,
keep=False, protocol=None):

if debug:
Expand Down Expand Up @@ -120,11 +121,7 @@ def runner(config, qin, qout, timeout=None,
except Empty:
pass
server.server.close()
thread.join(join_timeout)
if thread.is_alive():
logger.warning("server thread didn't stop")
else:
logger.debug('server thread stopped')
thread.join(3)

if not keep:
# Try to cleanup storage files
Expand All @@ -134,10 +131,11 @@ def runner(config, qin, qout, timeout=None,
except AttributeError:
pass

qout.put('stopped')
qout.put(thread.is_alive())
qin.get(timeout=11) # ack
if hasattr(qout, 'close'):
qout.close()
qout.join_thread()
qout.cancel_join_thread()

except Exception:
logger.exception("In server thread")
Expand All @@ -149,12 +147,25 @@ def runner(config, qin, qout, timeout=None,

def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop')
if hasattr(qin, 'close'):
qin.close()
qin.join_thread()
qout.get(timeout=stop_timeout)
dirty = qout.get(timeout=stop_timeout)
qin.put('ack')
if dirty:
print("WARNING SERVER DIDN'T STOP CLEANLY", file=sys.stderr)

# The runner thread didn't stop. If it was a process,
# give it some time to exit
if hasattr(thread, 'pid') and thread.pid:
os.waitpid(thread.pid)
else:
# Gaaaa, force gc in hopes of maybe getting the unclosed
# sockets to get GCed
gc.collect()

thread.join(stop_timeout)
os.remove(config)
if hasattr(qin, 'close'):
qin.close()
qin.cancel_join_thread()

def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs', protocol=None, blob_dir=None,
Expand All @@ -170,6 +181,8 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
to the config file.
"""

import logging; logging.basicConfig(level='DEBUG')

if not storage_conf:
storage_conf = '<filestorage>\npath %s\n</filestorage>' % path

Expand Down Expand Up @@ -217,7 +230,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
thread.start()
addr = qout.get(timeout=start_timeout)

def stop(stop_timeout=9):
def stop(stop_timeout=99):
stop_runner(thread, tmpfile, qin, qout, stop_timeout)

return addr, stop
Expand Down

0 comments on commit 31d4112

Please sign in to comment.