Skip to content

Commit

Permalink
Merge branch 'main' of github.com:sanic-org/sanic into prry/json-resp…
Browse files Browse the repository at this point in the history
…onse
  • Loading branch information
prryplatypus committed Dec 8, 2022
2 parents 869d57a + d404116 commit 13562d8
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 94 deletions.
4 changes: 4 additions & 0 deletions sanic/exceptions.py
Expand Up @@ -8,6 +8,10 @@ class RequestCancelled(CancelledError):
quiet = True


class ServerKilled(Exception):
...


class SanicException(Exception):
message: str = ""

Expand Down
6 changes: 5 additions & 1 deletion sanic/http/http1.py
Expand Up @@ -16,6 +16,7 @@
PayloadTooLarge,
RequestCancelled,
ServerError,
ServiceUnavailable,
)
from sanic.headers import format_http1_response
from sanic.helpers import has_message_body
Expand Down Expand Up @@ -428,8 +429,11 @@ async def error_response(self, exception: Exception) -> None:
if self.request is None:
self.create_empty_request()

request_middleware = not isinstance(exception, ServiceUnavailable)
try:
await app.handle_exception(self.request, exception)
await app.handle_exception(
self.request, exception, request_middleware
)
except Exception as e:
await app.handle_exception(self.request, e, False)

Expand Down
6 changes: 6 additions & 0 deletions sanic/mixins/startup.py
Expand Up @@ -41,6 +41,7 @@
from sanic.application.state import ApplicationServerInfo, Mode, ServerStage
from sanic.base.meta import SanicMeta
from sanic.compat import OS_IS_WINDOWS, is_atty
from sanic.exceptions import ServerKilled
from sanic.helpers import Default
from sanic.http.constants import HTTP
from sanic.http.tls import get_ssl_context, process_to_context
Expand Down Expand Up @@ -740,6 +741,7 @@ def serve(
socks = []
sync_manager = Manager()
setup_ext(primary)
exit_code = 0
try:
primary_server_info.settings.pop("main_start", None)
primary_server_info.settings.pop("main_stop", None)
Expand Down Expand Up @@ -849,6 +851,8 @@ def serve(
trigger_events(ready, loop, primary)

manager.run()
except ServerKilled:
exit_code = 1
except BaseException:
kwargs = primary_server_info.settings
error_logger.exception(
Expand All @@ -874,6 +878,8 @@ def serve(
unix = kwargs.get("unix")
if unix:
remove_unix_socket(unix)
if exit_code:
os._exit(exit_code)

@classmethod
def serve_single(cls, primary: Optional[Sanic] = None) -> None:
Expand Down
5 changes: 4 additions & 1 deletion sanic/request.py
Expand Up @@ -104,6 +104,7 @@ class Request:
"_protocol",
"_remote_addr",
"_request_middleware_started",
"_response_middleware_started",
"_scheme",
"_socket",
"_stream_id",
Expand Down Expand Up @@ -179,6 +180,7 @@ def __init__(
Tuple[bool, bool, str, str], List[Tuple[str, str]]
] = defaultdict(list)
self._request_middleware_started = False
self._response_middleware_started = False
self.responded: bool = False
self.route: Optional[Route] = None
self.stream: Optional[Stream] = None
Expand Down Expand Up @@ -337,7 +339,8 @@ async def add_header(_, response: HTTPResponse):
middleware = (
self.route and self.route.extra.response_middleware
) or self.app.response_middleware
if middleware:
if middleware and not self._response_middleware_started:
self._response_middleware_started = True
response = await self.app._run_response_middleware(
self, response, middleware
)
Expand Down
36 changes: 30 additions & 6 deletions sanic/worker/manager.py
@@ -1,12 +1,11 @@
import os
import sys

from signal import SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from time import sleep
from typing import List, Optional

from sanic.compat import OS_IS_WINDOWS
from sanic.exceptions import ServerKilled
from sanic.log import error_logger, logger
from sanic.worker.process import ProcessState, Worker, WorkerProcess

Expand All @@ -18,7 +17,7 @@


class WorkerManager:
THRESHOLD = 50
THRESHOLD = 300 # == 30 seconds

def __init__(
self,
Expand Down Expand Up @@ -130,13 +129,36 @@ def monitor(self):

def wait_for_ack(self): # no cov
misses = 0
message = (
"It seems that one or more of your workers failed to come "
"online in the allowed time. Sanic is shutting down to avoid a "
f"deadlock. The current threshold is {self.THRESHOLD / 10}s. "
"If this problem persists, please check out the documentation "
"___."
)
while not self._all_workers_ack():
sleep(0.1)
if self.monitor_subscriber.poll(0.1):
monitor_msg = self.monitor_subscriber.recv()
if monitor_msg != "__TERMINATE_EARLY__":
self.monitor_publisher.send(monitor_msg)
continue
misses = self.THRESHOLD
message = (
"One of your worker processes terminated before startup "
"was completed. Please solve any errors experienced "
"during startup. If you do not see an exception traceback "
"in your error logs, try running Sanic in in a single "
"process using --single-process or single_process=True. "
"Once you are confident that the server is able to start "
"without errors you can switch back to multiprocess mode."
)
misses += 1
if misses > self.THRESHOLD:
error_logger.error("Not all workers are ack. Shutting down.")
error_logger.error(
"Not all workers acknowledged a successful startup. "
"Shutting down.\n\n" + message
)
self.kill()
sys.exit(1)

@property
def workers(self):
Expand All @@ -156,7 +178,9 @@ def transient_processes(self):

def kill(self):
for process in self.processes:
logger.info("Killing %s [%s]", process.name, process.pid)
os.kill(process.pid, SIGKILL)
raise ServerKilled

def shutdown_signal(self, signal, frame):
logger.info("Received signal %s. Shutting down.", Signals(signal).name)
Expand Down
5 changes: 3 additions & 2 deletions sanic/worker/multiplexer.py
Expand Up @@ -28,8 +28,9 @@ def restart(self, name: str = ""):

reload = restart # no cov

def terminate(self):
self._monitor_publisher.send("__TERMINATE__")
def terminate(self, early: bool = False):
message = "__TERMINATE_EARLY__" if early else "__TERMINATE__"
self._monitor_publisher.send(message)

@property
def pid(self) -> int:
Expand Down
169 changes: 92 additions & 77 deletions sanic/worker/serve.py
@@ -1,6 +1,7 @@
import asyncio
import os
import socket
import warnings

from functools import partial
from multiprocessing.connection import Connection
Expand All @@ -10,6 +11,7 @@
from sanic.application.constants import ServerStage
from sanic.application.state import ApplicationServerInfo
from sanic.http.constants import HTTP
from sanic.log import error_logger
from sanic.models.server_types import Signal
from sanic.server.protocols.http_protocol import HttpProtocol
from sanic.server.runners import _serve_http_1, _serve_http_3
Expand Down Expand Up @@ -45,80 +47,93 @@ def worker_serve(
config=None,
passthru: Optional[Dict[str, Any]] = None,
):
from sanic import Sanic

if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)

app.refresh(passthru)
app.setup_loop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)

if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl

# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info

if monitor_publisher is None:
raise RuntimeError("No restart publisher found in worker process")
if worker_state is None:
raise RuntimeError("No worker state found in worker process")

# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(monitor_publisher, worker_state)

if app.debug:
loop.set_debug(app.debug)

app.asgi = False

if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)

if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
try:
from sanic import Sanic

if app_loader:
app = app_loader.load()
else:
app = Sanic.get_app(app_name)

app.refresh(passthru)
app.setup_loop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Hydrate server info if needed
if server_info:
for app_name, server_info_objects in server_info.items():
a = Sanic.get_app(app_name)
if not a.state.server_info:
a.state.server_info = []
for info in server_info_objects:
if not info.settings.get("app"):
info.settings["app"] = a
a.state.server_info.append(info)

if isinstance(ssl, dict):
cert_loader = CertLoader(ssl)
ssl = cert_loader.load(app)
for info in app.state.server_info:
info.settings["ssl"] = ssl

# When in a worker process, do some init
if os.environ.get("SANIC_WORKER_NAME"):
# Hydrate apps with any passed server info

if monitor_publisher is None:
raise RuntimeError(
"No restart publisher found in worker process"
)
if worker_state is None:
raise RuntimeError("No worker state found in worker process")

# Run secondary servers
apps = list(Sanic._app_registry.values())
app.before_server_start(partial(app._start_servers, apps=apps))
for a in apps:
a.multiplexer = WorkerMultiplexer(
monitor_publisher, worker_state
)

if app.debug:
loop.set_debug(app.debug)

app.asgi = False

if app.state.server_info:
primary_server_info = app.state.server_info[0]
primary_server_info.stage = ServerStage.SERVING
if config:
app.update_config(config)

if version is HTTP.VERSION_3:
return _serve_http_3(host, port, app, loop, ssl)
return _serve_http_1(
host,
port,
app,
ssl,
sock,
unix,
reuse_port,
loop,
protocol,
backlog,
register_sys_signals,
run_multiple,
run_async,
connections,
signal,
state,
asyncio_server_kwargs,
)
except Exception as e:
warnings.simplefilter("ignore", category=RuntimeWarning)
if monitor_publisher:
error_logger.exception(e)
multiplexer = WorkerMultiplexer(monitor_publisher, {})
multiplexer.terminate(True)
else:
raise e

0 comments on commit 13562d8

Please sign in to comment.