Skip to content

Commit

Permalink
Add check for readyness of process
Browse files Browse the repository at this point in the history
  • Loading branch information
yuvipanda committed Dec 27, 2018
1 parent fa08479 commit 2ca085c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 1 deletion.
55 changes: 54 additions & 1 deletion simpervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import signal
import asyncio
import time
import logging
from simpervisor import atexitasync

Expand All @@ -15,11 +16,13 @@ class KilledProcessError(Exception):
pass

class SupervisedProcess:
def __init__(self, name, *args, always_restart=False, **kwargs):
def __init__(self, name, *args, always_restart=False, ready_func=None, ready_timeout=5, **kwargs):
self.always_restart = always_restart
self.name = name
self._proc_args = args
self._proc_kwargs = kwargs
self.ready_func = ready_func
self.ready_timeout = ready_timeout
self.proc: asyncio.Process = None

# asyncio.Process has no 'poll', so we keep that state internally
Expand Down Expand Up @@ -104,6 +107,7 @@ async def _restart_process_if_needed(self):
exits. If we restart the process, `start()` sets this up again.
"""
retcode = await self.proc.wait()
# FIXME: Do we need to aquire a lock somewhere in this method?
atexitasync.remove_handler(self._handle_signal)
self._debug_log(
'exited', f'{self.name} exited with code {retcode}',
Expand Down Expand Up @@ -158,6 +162,55 @@ async def kill(self):
raise KilledProcessError(f"Process {self.name} has already been explicitly killed")
return await self._signal_and_wait(signal.SIGKILL)


async def ready(self):
"""
Wait for process to become 'ready'
"""
# FIXME: Should this be internal and part of 'start'?
# FIXME: Do we need some locks here?
# Repeatedly run ready_func with a timeout until it returns true
# FIXME, parameterize these numbers
start_time = time.time()
wait_time = 0.01

while True:
if time.time() - start_time > self.ready_timeout:
# We have exceeded our timeout, so return
return False

# Make sure we haven't been killed yet since the last loop
# We explicitly do *not* check if we are running, since we might be
# restarting in a loop while the readyness check is happening
if self._killed or not self.proc:
return False

# FIXME: What's the timeout for each readyness check handler?
# FIXME: We should probably check again if our process is still running
# FIXME: Should we be locking something here?
is_ready = await asyncio.wait_for(self.ready_func(self), 1)
cur_time = time.time() - start_time
self._debug_log(
'ready-wait',
f'Readyness: {is_ready} after {cur_time} seconds, next check in {wait_time}s',
{'wait_time': wait_time, 'ready': is_ready, 'elapsed_time': cur_time}
)
if is_ready:
return True
await asyncio.sleep(wait_time)

# FIXME: Be more sophisticated here with backoff & jitter
wait_time = 2 * wait_time
if (time.time() + wait_time) > (start_time + self.ready_timeout):
# If we wait for wait_time, we'll be over the ready_timeout
# So let's clamp wait_time so that wait_time is just enough
# to get us to ready_timeout seconds since start_time
# FIXME: This means wait_time can be negative...
wait_time = (start_time + self.ready_timeout) - time.time() - 0.01

return False


# Pass through methods specific methods from proc
# We don't pass through everything, just a subset we know is safe
# and would work.
Expand Down
23 changes: 23 additions & 0 deletions tests/child_scripts/simplehttpserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
Simple echo http server
"""
import time
import os
import sys
from aiohttp import web

wait_time = float(sys.argv[1])
print(f'waiting {wait_time}')
time.sleep(wait_time)

PORT = os.environ['PORT']

routes = web.RouteTableDef()

@routes.get('/')
async def hello(request):
return web.Response(text="Hello, world")

app = web.Application()
app.add_routes(routes)
web.run_app(app, port=PORT)
49 changes: 49 additions & 0 deletions tests/test_ready.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import sys
import time
import pytest
import os
from simpervisor import SupervisedProcess
import aiohttp
import logging

@pytest.mark.asyncio
async def test_ready():
"""
Test web app's readyness
"""
httpserver_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'child_scripts',
'simplehttpserver.py'
)

port = '9005'
# We tell our server to wait this many seconds before it starts serving
ready_time = 3.0

async def _ready_func(p):
url = f'http://localhost:{port}'
async with aiohttp.ClientSession() as session:
try:
async with session.get(url) as resp:
logging.debug(f'Got code {resp.status} back from {url}')
return resp.status == 200
except aiohttp.ClientConnectionError:
logging.debug(f'Connection to {url} refused')
return False

proc = SupervisedProcess(
'socketserver',
sys.executable, httpserver_file, str(ready_time),
ready_func=_ready_func,
env={'PORT': port}
)

try:
await proc.start()
start_time = time.time()
assert (await proc.ready())
assert time.time() - start_time > ready_time
finally:
# Clean up our process after ourselves
await proc.kill()

0 comments on commit 2ca085c

Please sign in to comment.