-
-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathgunicorn_utils.py
98 lines (72 loc) · 3.27 KB
/
gunicorn_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import concurrent.futures
import contextlib
import multiprocessing
import time
from typing import Iterator
from gunicorn.app.wsgiapp import WSGIApplication
class _StandaloneApplication(WSGIApplication):
    """
    Gunicorn WSGI application that is configured from Python instead of the command line.

    The config file path and any individual setting overrides are handed to the
    constructor and injected at the points where Gunicorn would normally read them
    from the parsed command-line arguments.
    """

    def __init__(self, config_path: str = None, **kwargs):
        # Both attributes must exist before the base constructor runs, since the
        # hooks below read them.
        self.options = kwargs
        self.config_path = config_path
        super().__init__()

    def init(self, parser, opts, args):
        """Run the regular initialisation, then point `opts.config` at the stored config path."""
        super().init(parser, opts, args)
        # Equivalent to having passed the config path as a command-line argument.
        opts.config = self.config_path

    def load_config(self):
        """Apply the keyword overrides to the config, then load the config as usual."""
        for name, setting in self.options.items():
            # Skip overrides that were left unset or that are not known settings.
            if setting is None or name not in self.cfg.settings:
                continue
            self.cfg.set(name.lower(), setting)

        super().load_config()
def _proc_target(config_path: str, event: multiprocessing.Event, **kwargs) -> None:
    """
    Entry point for the child process: run Gunicorn and signal readiness through `event`.

    `config_path` is forwarded to the application as the Gunicorn config file, and
    any extra kwargs are forwarded as Gunicorn settings. `event` is set from
    Gunicorn's `when_ready` hook.
    """
    import logging
    import sys

    def _signal_ready(_):
        # Gunicorn passes one argument to the hook; it is not needed here.
        event.set()

    # Gunicorn parses sys.argv itself. Reset it so that the arguments used to launch
    # the test run are not interpreted as its own arguments.
    sys.argv = [""]

    application = _StandaloneApplication(config_path, when_ready=_signal_ready, **kwargs)

    # Silence log records at INFO level and below for the rest of this process.
    logging.disable(logging.INFO)

    application.run()
@contextlib.contextmanager
def run_gunicorn(config_path: str = "config/gunicorn.conf.py", **kwargs) -> Iterator[None]:
    """
    Run the Snekbox app through a separate Gunicorn process. Use as a context manager.

    `config_path` is the path to the Gunicorn config to use.
    Additional kwargs are interpreted as Gunicorn settings.

    The Gunicorn process is terminated when the context exits, whether or not
    the body raised.

    Raise RuntimeError if Gunicorn terminates before it is ready.
    Raise TimeoutError if Gunicorn isn't ready after 60 seconds.
    """
    # Seconds to wait for Gunicorn to report that it is ready.
    startup_timeout = 60

    event = multiprocessing.Event()
    proc = multiprocessing.Process(target=_proc_target, args=(config_path, event), kwargs=kwargs)

    # Start outside the `try`: if starting fails there is no process to clean up, and
    # calling terminate()/join()/kill() on an unstarted process would raise and mask
    # the original error.
    proc.start()

    try:
        # Wait for Gunicorn to be ready, but exit early if Gunicorn fails.
        # Can't use the executor as a context manager because wait=False needs to be set.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
        try:
            concurrent.futures.wait(
                [
                    # Finishes once the process ends, which the cleanup below guarantees.
                    executor.submit(proc.join),
                    # Bounded wait: a future that is already running cannot be cancelled,
                    # so an unbounded `event.wait()` would leave this worker thread
                    # blocked forever (and stall interpreter shutdown) whenever
                    # Gunicorn never becomes ready.
                    executor.submit(event.wait, startup_timeout),
                ],
                timeout=startup_timeout,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )
        finally:
            executor.shutdown(wait=False, cancel_futures=True)

        if not proc.is_alive():
            raise RuntimeError(f"Gunicorn terminated unexpectedly with code {proc.exitcode}.")
        if not event.is_set():
            raise TimeoutError("Timed out waiting for Gunicorn to be ready.")

        yield
    finally:
        # See https://github.com/python-discord/snekbox/issues/177
        # Sleeping before terminating the process avoids a case where
        # terminating the process can take >30 seconds.
        time.sleep(0.2)

        proc.terminate()

        # Actually wait for the process to finish. There doesn't seem to be a
        # reliable way of checking if process ended or the timeout was reached,
        # so kill the process afterwards to be sure.
        proc.join(timeout=10)
        proc.kill()