Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaner process management #2811

Merged
merged 13 commits into from
Dec 7, 2023
14 changes: 14 additions & 0 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,20 @@
if hasattr(self, "multiplexer"):
self.multiplexer.ack()

def set_serving(self, serving: bool) -> None:
"""Set the serving state of the application.

This method is used to set the serving state of the application.
It is used internally by Sanic and should not typically be called
manually.

Args:
serving (bool): Whether the application is serving.
"""
self.state.is_running = serving
if hasattr(self, "multiplexer"):
self.multiplexer.set_serving(serving)

Check warning on line 2398 in sanic/app.py

View check run for this annotation

Codecov / codecov/patch

sanic/app.py#L2398

Added line #L2398 was not covered by tests

async def _server_event(
self,
concern: str,
Expand Down
22 changes: 21 additions & 1 deletion sanic/mixins/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pathlib import Path
from socket import SHUT_RDWR, socket
from ssl import SSLContext
from time import sleep
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -60,6 +61,7 @@
from sanic.server import try_use_uvloop
from sanic.server.async_server import AsyncioServer
from sanic.server.events import trigger_events
from sanic.server.goodbye import get_goodbye
from sanic.server.loop import try_windows_loop
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.protocols.websocket_protocol import WebSocketProtocol
Expand Down Expand Up @@ -1146,7 +1148,6 @@
app.router.reset()
app.signal_router.reset()

sync_manager.shutdown()
for sock in socks:
try:
sock.shutdown(SHUT_RDWR)
Expand All @@ -1158,12 +1159,31 @@
loop.close()
cls._cleanup_env_vars()
cls._cleanup_apps()

limit = 100
while cls._get_process_states(worker_state):
sleep(0.1)
limit -= 1

Check warning on line 1166 in sanic/mixins/startup.py

View check run for this annotation

Codecov / codecov/patch

sanic/mixins/startup.py#L1165-L1166

Added lines #L1165 - L1166 were not covered by tests
if limit <= 0:
error_logger.warning(

Check warning on line 1168 in sanic/mixins/startup.py

View check run for this annotation

Codecov / codecov/patch

sanic/mixins/startup.py#L1168

Added line #L1168 was not covered by tests
"Worker shutdown timed out. "
"Some processes may still be running."
)
break

Check warning on line 1172 in sanic/mixins/startup.py

View check run for this annotation

Codecov / codecov/patch

sanic/mixins/startup.py#L1172

Added line #L1172 was not covered by tests
sync_manager.shutdown()
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
logger.debug(get_goodbye())
if exit_code:
os._exit(exit_code)

@staticmethod
def _get_process_states(worker_state) -> List[str]:
return [
state for s in worker_state.values() if (state := s.get("state"))
]

@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
"""Serve a single process of a Sanic application.
Expand Down
31 changes: 31 additions & 0 deletions sanic/server/goodbye.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# flake8: noqa: E501

import random
import sys


# fmt: off
ascii_phrases = {
'Farewell', 'Bye', 'See you later', 'Take care', 'So long', 'Adieu', 'Cheerio',
'Goodbye', 'Adios', 'Au revoir', 'Arrivederci', 'Sayonara', 'Auf Wiedersehen',
'Do svidaniya', 'Annyeong', 'Tot ziens', 'Ha det', 'Selamat tinggal',
'Hasta luego', 'Nos vemos', 'Salut', 'Ciao', 'A presto',
'Dag', 'Tot later', 'Vi ses', 'Sampai jumpa',
}

non_ascii_phrases = {
'Tschüss', 'Zài jiàn', 'Bāi bāi', 'Míngtiān jiàn', 'Adeus', 'Tchau', 'Até logo',
'Hejdå', 'À bientôt', 'Bis später', 'Adjø',
'じゃね', 'またね', '안녕히 계세요', '잘 가', 'שלום',
'להתראות', 'مع السلامة', 'إلى اللقاء', 'وداعاً', 'अलविदा',
'फिर मिलेंगे',
}

all_phrases = ascii_phrases | non_ascii_phrases
# fmt: on


def get_goodbye() -> str: # pragma: no cover
is_utf8 = sys.stdout.encoding.lower() == "utf-8"
phrases = all_phrases if is_utf8 else ascii_phrases
return random.choice(list(phrases)) # nosec: B311
16 changes: 8 additions & 8 deletions sanic/server/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,15 @@ def _setup_system_signals(
register_sys_signals: bool,
loop: asyncio.AbstractEventLoop,
) -> None: # no cov
# Ignore SIGINT when run_multiple
if run_multiple:
signal_func(SIGINT, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"

signal_func(SIGINT, SIG_IGN)
signal_func(SIGTERM, SIG_IGN)
os.environ["SANIC_WORKER_PROCESS"] = "true"
# Register signals for graceful termination
if register_sys_signals:
if OS_IS_WINDOWS:
ctrlc_workaround_for_windows(app)
else:
for _signal in [SIGTERM] if run_multiple else [SIGINT, SIGTERM]:
for _signal in [SIGINT, SIGTERM]:
loop.add_signal_handler(
_signal, partial(app.stop, terminate=False)
)
Expand All @@ -180,8 +178,6 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
try:
server_logger.info("Starting worker [%s]", pid)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server_logger.info("Stopping worker [%s]", pid)

Expand All @@ -193,6 +189,7 @@ def _run_server_forever(loop, before_stop, after_stop, cleanup, unix):
loop.run_until_complete(after_stop())
remove_unix_socket(unix)
loop.close()
server_logger.info("Worker complete [%s]", pid)


def _serve_http_1(
Expand Down Expand Up @@ -296,8 +293,11 @@ def _cleanup():
else:
conn.abort()

app.set_serving(False)

_setup_system_signals(app, run_multiple, register_sys_signals, loop)
loop.run_until_complete(app._server_event("init", "after"))
app.set_serving(True)
_run_server_forever(
loop,
partial(app._server_event, "shutdown", "before"),
Expand Down
6 changes: 6 additions & 0 deletions sanic/worker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def run(self):
self.monitor()
self.join()
self.terminate()
self.cleanup()

def start(self):
"""Start the worker processes."""
Expand Down Expand Up @@ -182,6 +183,11 @@ def terminate(self):
for process in self.processes:
process.terminate()

def cleanup(self):
"""Cleanup the worker processes."""
for process in self.processes:
process.exit()

def restart(
self,
process_names: Optional[List[str]] = None,
Expand Down
18 changes: 18 additions & 0 deletions sanic/worker/multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@
"state": ProcessState.ACKED.name,
}

def set_serving(self, serving: bool) -> None:
"""Set the worker to serving.

Args:
serving (bool): Whether the worker is serving.
"""
self._state._state[self.name] = {

Check warning on line 48 in sanic/worker/multiplexer.py

View check run for this annotation

Codecov / codecov/patch

sanic/worker/multiplexer.py#L48

Added line #L48 was not covered by tests
**self._state._state[self.name],
"serving": serving,
}

def exit(self):
"""Run cleanup at worker exit."""
try:
del self._state._state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")

Check warning on line 58 in sanic/worker/multiplexer.py

View check run for this annotation

Codecov / codecov/patch

sanic/worker/multiplexer.py#L55-L58

Added lines #L55 - L58 were not covered by tests

def restart(
self,
name: str = "",
Expand Down
15 changes: 14 additions & 1 deletion sanic/worker/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@
self.set_state(ProcessState.JOINED)
self._current_process.join()

def exit(self):
limit = 100
while self.is_alive() and limit > 0:
sleep(0.1)
limit -= 1

if not self.is_alive():
try:
del self.worker_state[self.name]
except ConnectionRefusedError:
logger.debug("Monitor process has already exited.")
except KeyError:
logger.debug("Could not find worker state to delete.")

Check warning on line 82 in sanic/worker/process.py

View check run for this annotation

Codecov / codecov/patch

sanic/worker/process.py#L80-L82

Added lines #L80 - L82 were not covered by tests

def terminate(self):
if self.state is not ProcessState.TERMINATED:
logger.debug(
Expand All @@ -79,7 +93,6 @@
self.set_state(ProcessState.TERMINATED, force=True)
try:
os.kill(self.pid, SIGINT)
del self.worker_state[self.name]
except (KeyError, AttributeError, ProcessLookupError):
...

Expand Down
2 changes: 1 addition & 1 deletion tests/test_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def raised_ceiling():
# Chrome, Firefox:
# Content-Disposition: form-data; name="foo%22;bar\"; filename="😀"
'form-data; name="foo%22;bar\\"; filename="😀"',
("form-data", {"name": 'foo";bar\\', "filename": "😀"})
("form-data", {"name": 'foo";bar\\', "filename": "😀"}),
# cgi: ('form-data', {'name': 'foo%22;bar"; filename="😀'})
# werkzeug (pre 2.3.0): ('form-data', {'name': 'foo%22;bar"; filename='})
),
Expand Down
4 changes: 2 additions & 2 deletions tests/test_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ def test_stack_trace_on_not_found(app, static_file_directory, caplog):
assert counter[("sanic.root", logging.INFO)] == 10
assert counter[("sanic.root", logging.ERROR)] == 0
assert counter[("sanic.error", logging.ERROR)] == 0
assert counter[("sanic.server", logging.INFO)] == 2
assert counter[("sanic.server", logging.INFO)] == 3


def test_no_stack_trace_on_not_found(app, static_file_directory, caplog):
Expand All @@ -539,7 +539,7 @@ async def file_not_found(request, exception):
assert counter[("sanic.root", logging.INFO)] == 10
assert counter[("sanic.root", logging.ERROR)] == 0
assert counter[("sanic.error", logging.ERROR)] == 0
assert counter[("sanic.server", logging.INFO)] == 2
assert counter[("sanic.server", logging.INFO)] == 3
assert response.text == "No file: /static/non_existing_file.file"


Expand Down
2 changes: 1 addition & 1 deletion tests/typing/samples/request_custom_sanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class CustomConfig(Config):

@app.get("/")
async def handler(
request: Request[Sanic[CustomConfig, SimpleNamespace], SimpleNamespace]
request: Request[Sanic[CustomConfig, SimpleNamespace], SimpleNamespace],
):
reveal_type(request.ctx)
reveal_type(request.app)
4 changes: 3 additions & 1 deletion tests/worker/test_reloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ def start(self):
monkeypatch.setattr(threading.Thread, "start", orig)


def test_reloader_triggers_start_stop_listeners(app: Sanic, app_loader: AppLoader):
def test_reloader_triggers_start_stop_listeners(
app: Sanic, app_loader: AppLoader
):
results = []

@app.reload_process_start
Expand Down
7 changes: 4 additions & 3 deletions tests/worker/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def test_run_server_forever(remove_unix_socket: Mock, do_cleanup: bool):
after_stop.return_value = Mock()
unix = Mock()

_run_server_forever(
loop, before_stop, after_stop, cleanup if do_cleanup else None, unix
)
with pytest.raises(KeyboardInterrupt):
_run_server_forever(
loop, before_stop, after_stop, cleanup if do_cleanup else None, unix
)

loop.run_forever.assert_called_once_with()
loop.run_until_complete.assert_has_calls(
Expand Down
Loading