Skip to content

Commit

Permalink
Alter the process and reloading system
Browse files Browse the repository at this point in the history
Rather than using a single process with the reloader and restarting
the entire program there is now a main process that checks for changes
and restarts the process workers if so. This should resolve the
various bugs trying to restart the entire program.

A downside is that there is now always at least two processes and it
is harder to use a reloader when using the API method to serve an app.

Note the application must be loaded in the main process so that the
observer knows what files to check for changes.
  • Loading branch information
pgjones committed Aug 29, 2022
1 parent eff8fb0 commit 67ff9c9
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 106 deletions.
12 changes: 0 additions & 12 deletions src/hypercorn/asyncio/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
from ..utils import (
check_multiprocess_shutdown_event,
load_application,
MustReloadError,
observe_changes,
raise_shutdown,
repr_socket_addr,
restart,
ShutdownError,
)

Expand Down Expand Up @@ -75,7 +72,6 @@ def _signal_handler(*_: Any) -> None: # noqa: N803
shutdown_trigger = signal_event.wait # type: ignore

lifespan = Lifespan(app, config, loop)
reload_ = False

lifespan_task = loop.create_task(lifespan.handle_lifespan())
await lifespan.wait_for_startup()
Expand Down Expand Up @@ -143,17 +139,12 @@ async def _server_callback(reader: asyncio.StreamReader, writer: asyncio.StreamW

tasks.append(loop.create_task(raise_shutdown(shutdown_trigger)))

if config.use_reloader:
tasks.append(loop.create_task(observe_changes(asyncio.sleep)))

try:
if len(tasks):
gathered_tasks = asyncio.gather(*tasks)
await gathered_tasks
else:
loop.run_forever()
except MustReloadError:
reload_ = True
except (ShutdownError, KeyboardInterrupt):
pass
finally:
Expand Down Expand Up @@ -181,9 +172,6 @@ async def _server_callback(reader: asyncio.StreamReader, writer: asyncio.StreamW
lifespan_task.cancel()
await lifespan_task

if reload_:
restart()


def asyncio_worker(
config: Config, sockets: Optional[Sockets] = None, shutdown_event: Optional[EventType] = None
Expand Down
78 changes: 44 additions & 34 deletions src/hypercorn/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import signal
import time
from multiprocessing import Event, Process
from typing import Any
from multiprocessing.synchronize import Event as EventType
from typing import Any, List

from .config import Config
from .config import Config, Sockets
from .typing import WorkerFunc
from .utils import write_pid_file
from .utils import load_application, wait_for_changes, write_pid_file


def run(config: Config) -> None:
Expand All @@ -31,44 +32,36 @@ def run(config: Config) -> None:
else:
raise ValueError(f"No worker of class {config.worker_class} exists")

if config.workers == 1:
worker_func(config)
else:
run_multiple(config, worker_func)


def run_multiple(config: Config, worker_func: WorkerFunc) -> None:
if config.use_reloader:
raise RuntimeError("Reloader can only be used with a single worker")

sockets = config.create_sockets()

processes = []
active = True
while active:
# Ignore SIGINT before creating the processes, so that they
# inherit the signal handling. This means that the shutdown
# function controls the shutdown.
signal.signal(signal.SIGINT, signal.SIG_IGN)

# Ignore SIGINT before creating the processes, so that they
# inherit the signal handling. This means that the shutdown
# function controls the shutdown.
signal.signal(signal.SIGINT, signal.SIG_IGN)
shutdown_event = Event()
processes = start_processes(config, worker_func, sockets, shutdown_event)

shutdown_event = Event()
def shutdown(*args: Any) -> None:
nonlocal active, shutdown_event
shutdown_event.set()
active = False

for _ in range(config.workers):
process = Process(
target=worker_func,
kwargs={"config": config, "shutdown_event": shutdown_event, "sockets": sockets},
)
process.daemon = True
process.start()
processes.append(process)
if platform.system() == "Windows":
time.sleep(0.1)
for signal_name in {"SIGINT", "SIGTERM", "SIGBREAK"}:
if hasattr(signal, signal_name):
signal.signal(getattr(signal, signal_name), shutdown)

def shutdown(*args: Any) -> None:
shutdown_event.set()
if config.use_reloader:
# Reload the application so that the correct (new) paths
# are checked for changes.
load_application(config.application_path, config.wsgi_max_body_size)

for signal_name in {"SIGINT", "SIGTERM", "SIGBREAK"}:
if hasattr(signal, signal_name):
signal.signal(getattr(signal, signal_name), shutdown)
wait_for_changes(shutdown_event)
shutdown_event.set()
else:
active = False

for process in processes:
process.join()
Expand All @@ -79,3 +72,20 @@ def shutdown(*args: Any) -> None:
sock.close()
for sock in sockets.insecure_sockets:
sock.close()


def start_processes(
config: Config, worker_func: WorkerFunc, sockets: Sockets, shutdown_event: EventType
) -> List[Process]:
processes = []
for _ in range(config.workers):
process = Process(
target=worker_func,
kwargs={"config": config, "shutdown_event": shutdown_event, "sockets": sockets},
)
process.daemon = True
process.start()
processes.append(process)
if platform.system() == "Windows":
time.sleep(0.1)
return processes
14 changes: 0 additions & 14 deletions src/hypercorn/trio/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
from ..utils import (
check_multiprocess_shutdown_event,
load_application,
MustReloadError,
observe_changes,
raise_shutdown,
repr_socket_addr,
restart,
ShutdownError,
)

Expand All @@ -36,7 +33,6 @@ async def worker_serve(
config.set_statsd_logger_class(StatsdLogger)

lifespan = Lifespan(app, config)
reload_ = False
context = WorkerContext()

async with trio.open_nursery() as lifespan_nursery:
Expand Down Expand Up @@ -80,9 +76,6 @@ async def worker_serve(
task_status.started(binds)
try:
async with trio.open_nursery() as nursery:
if config.use_reloader:
nursery.start_soon(observe_changes, trio.sleep)

if shutdown_trigger is not None:
nursery.start_soon(raise_shutdown, shutdown_trigger)

Expand All @@ -96,10 +89,6 @@ async def worker_serve(
)

await trio.sleep_forever()
except trio.MultiError as error:
reload_ = any(isinstance(exc, MustReloadError) for exc in error.exceptions)
except MustReloadError:
reload_ = True
except (ShutdownError, KeyboardInterrupt):
pass
finally:
Expand All @@ -109,9 +98,6 @@ async def worker_serve(
await lifespan.wait_for_shutdown()
lifespan_nursery.cancel_scope.cancel()

if reload_:
restart()


def trio_worker(
config: Config, sockets: Optional[Sockets] = None, shutdown_event: Optional[EventType] = None
Expand Down
53 changes: 7 additions & 46 deletions src/hypercorn/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import inspect
import os
import platform
import socket
import sys
import time
from enum import Enum
from importlib import import_module
from multiprocessing.synchronize import Event as EventType
Expand All @@ -23,10 +23,6 @@ class ShutdownError(Exception):
pass


class MustReloadError(Exception):
pass


class NoAppError(Exception):
pass

Expand Down Expand Up @@ -119,7 +115,7 @@ def load_application(path: str, wsgi_max_body_size: int) -> AppWrapper:
return WSGIWrapper(app, wsgi_max_body_size)


async def observe_changes(sleep: Callable[[float], Awaitable[Any]]) -> None:
def wait_for_changes(shutdown_event: EventType) -> None:
last_updates: Dict[Path, float] = {}
for module in list(sys.modules.values()):
filename = getattr(module, "__file__", None)
Expand All @@ -131,60 +127,25 @@ async def observe_changes(sleep: Callable[[float], Awaitable[Any]]) -> None:
except (FileNotFoundError, NotADirectoryError):
pass

while True:
await sleep(1)
while not shutdown_event.is_set():
time.sleep(1)

for index, (path, last_mtime) in enumerate(last_updates.items()):
if index % 10 == 0:
# Yield to the event loop
await sleep(0)
time.sleep(0)

try:
mtime = path.stat().st_mtime
except FileNotFoundError:
# File deleted
raise MustReloadError()
return
else:
if mtime > last_mtime:
raise MustReloadError()
return
else:
last_updates[path] = mtime


def restart() -> None:
# Restart this process (only safe for dev/debug)
executable = sys.executable
script_path = Path(sys.argv[0]).resolve()
args = sys.argv[1:]
main_package = sys.modules["__main__"].__package__

if main_package is None:
# Executed by filename
if platform.system() == "Windows":
if not script_path.exists() and script_path.with_suffix(".exe").exists():
# quart run
executable = str(script_path.with_suffix(".exe"))
else:
# python run.py
args.append(str(script_path))
else:
if script_path.is_file() and os.access(script_path, os.X_OK):
# hypercorn run:app --reload
executable = str(script_path)
else:
# python run.py
args.append(str(script_path))
else:
# Executed as a module e.g. python -m run
module = script_path.stem
import_name = main_package
if module != "__main__":
import_name = f"{main_package}.{module}"
args[:0] = ["-m", import_name.lstrip(".")]

os.execv(executable, [executable] + args)


async def raise_shutdown(shutdown_event: Callable[..., Awaitable[None]]) -> None:
await shutdown_event()
raise ShutdownError()
Expand Down

0 comments on commit 67ff9c9

Please sign in to comment.