Skip to content

Commit

Permalink
Merge branch 'main' into zhiwei/asgi-header-decode
Browse files Browse the repository at this point in the history
  • Loading branch information
ChihweiLHBird committed Mar 19, 2023
2 parents bcd1788 + 5ee36fd commit c1d9e47
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 139 deletions.
2 changes: 1 addition & 1 deletion examples/http_redirect.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def proxy(request, path):
path=path,
_server=https.config.SERVER_NAME,
_external=True,
_scheme="http",
_scheme="https",
)
return response.redirect(url)

Expand Down
15 changes: 10 additions & 5 deletions sanic/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

from sanic.application.ext import setup_ext
from sanic.application.state import ApplicationState, ServerStage
from sanic.asgi import ASGIApp
from sanic.asgi import ASGIApp, Lifespan
from sanic.base.root import BaseSanic
from sanic.blueprint_group import BlueprintGroup
from sanic.blueprints import Blueprint
Expand Down Expand Up @@ -119,6 +119,7 @@ class Sanic(StaticHandleMixin, BaseSanic, StartupMixin, metaclass=TouchUpMeta):
)
__slots__ = (
"_asgi_app",
"_asgi_lifespan",
"_asgi_client",
"_blueprint_order",
"_delayed_tasks",
Expand Down Expand Up @@ -198,6 +199,8 @@ def __init__(
self.config.INSPECTOR = inspector

# Then we can do the rest
self._asgi_app: Optional[ASGIApp] = None
self._asgi_lifespan: Optional[Lifespan] = None
self._asgi_client: Any = None
self._blueprint_order: List[Blueprint] = []
self._delayed_tasks: List[str] = []
Expand Down Expand Up @@ -1349,12 +1352,14 @@ async def __call__(self, scope, receive, send):
three arguments: scope, receive, send. See the ASGI reference for more
details: https://asgi.readthedocs.io/en/latest
"""
self.asgi = True
if scope["type"] == "lifespan":
self.asgi = True
self.motd("")
self._asgi_app = await ASGIApp.create(self, scope, receive, send)
asgi_app = self._asgi_app
await asgi_app()
self._asgi_lifespan = Lifespan(self, scope, receive, send)
await self._asgi_lifespan()
else:
self._asgi_app = await ASGIApp.create(self, scope, receive, send)
await self._asgi_app()

_asgi_single_callable = True # We conform to ASGI 3.0 single-callable

Expand Down
200 changes: 95 additions & 105 deletions sanic/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,23 @@


class Lifespan:
def __init__(self, asgi_app: ASGIApp) -> None:
self.asgi_app = asgi_app
def __init__(
self, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
) -> None:
self.sanic_app = sanic_app
self.scope = scope
self.receive = receive
self.send = send

if (
"server.init.before"
in self.asgi_app.sanic_app.signal_router.name_index
):
if "server.init.before" in self.sanic_app.signal_router.name_index:
logger.debug(
'You have set a listener for "before_server_start" '
"in ASGI mode. "
"It will be executed as early as possible, but not before "
"the ASGI server is started.",
extra={"verbosity": 1},
)
if (
"server.shutdown.after"
in self.asgi_app.sanic_app.signal_router.name_index
):
if "server.shutdown.after" in self.sanic_app.signal_router.name_index:
logger.debug(
'You have set a listener for "after_server_stop" '
"in ASGI mode. "
Expand All @@ -57,11 +56,11 @@ async def startup(self) -> None:
in sequence since the ASGI lifespan protocol only supports a single
startup event.
"""
await self.asgi_app.sanic_app._startup()
await self.asgi_app.sanic_app._server_event("init", "before")
await self.asgi_app.sanic_app._server_event("init", "after")
await self.sanic_app._startup()
await self.sanic_app._server_event("init", "before")
await self.sanic_app._server_event("init", "after")

if not isinstance(self.asgi_app.sanic_app.config.USE_UVLOOP, Default):
if not isinstance(self.sanic_app.config.USE_UVLOOP, Default):
warnings.warn(
"You have set the USE_UVLOOP configuration option, but Sanic "
"cannot control the event loop when running in ASGI mode."
Expand All @@ -77,35 +76,33 @@ async def shutdown(self) -> None:
in sequence since the ASGI lifespan protocol only supports a single
shutdown event.
"""
await self.asgi_app.sanic_app._server_event("shutdown", "before")
await self.asgi_app.sanic_app._server_event("shutdown", "after")
await self.sanic_app._server_event("shutdown", "before")
await self.sanic_app._server_event("shutdown", "after")

async def __call__(
self, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
) -> None:
message = await receive()
if message["type"] == "lifespan.startup":
try:
await self.startup()
except Exception as e:
error_logger.exception(e)
await send(
{"type": "lifespan.startup.failed", "message": str(e)}
)
else:
await send({"type": "lifespan.startup.complete"})

message = await receive()
if message["type"] == "lifespan.shutdown":
try:
await self.shutdown()
except Exception as e:
error_logger.exception(e)
await send(
{"type": "lifespan.shutdown.failed", "message": str(e)}
)
else:
await send({"type": "lifespan.shutdown.complete"})
async def __call__(self) -> None:
while True:
message = await self.receive()
if message["type"] == "lifespan.startup":
try:
await self.startup()
except Exception as e:
error_logger.exception(e)
await self.send(
{"type": "lifespan.startup.failed", "message": str(e)}
)
else:
await self.send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
try:
await self.shutdown()
except Exception as e:
error_logger.exception(e)
await self.send(
{"type": "lifespan.shutdown.failed", "message": str(e)}
)
else:
await self.send({"type": "lifespan.shutdown.complete"})
return


class ASGIApp:
Expand All @@ -117,81 +114,74 @@ class ASGIApp:
stage: Stage
response: Optional[BaseHTTPResponse]

def __init__(self) -> None:
self.ws = None

@classmethod
async def create(
cls, sanic_app, scope: ASGIScope, receive: ASGIReceive, send: ASGISend
) -> "ASGIApp":
cls,
sanic_app: Sanic,
scope: ASGIScope,
receive: ASGIReceive,
send: ASGISend,
) -> ASGIApp:
instance = cls()
instance.ws = None
instance.sanic_app = sanic_app
instance.transport = MockTransport(scope, receive, send)
instance.transport.loop = sanic_app.loop
instance.stage = Stage.IDLE
instance.response = None
instance.sanic_app.state.is_started = True
setattr(instance.transport, "add_task", sanic_app.loop.create_task)

try:
headers = Header(
[
(
key.decode("ASCII"),
value.decode(errors="surrogateescape"),
)
for key, value in scope.get("headers", [])
]
)
except UnicodeDecodeError:
raise BadRequest(
"Header names can only contain US-ASCII characters"
)
instance.lifespan = Lifespan(instance)

if scope["type"] == "lifespan":
await instance.lifespan(scope, receive, send)
else:
path = (
scope["path"][1:]
if scope["path"].startswith("/")
else scope["path"]
)
url = "/".join([scope.get("root_path", ""), quote(path)])
url_bytes = url.encode("latin-1")
url_bytes += b"?" + scope["query_string"]

if scope["type"] == "http":
version = scope["http_version"]
method = scope["method"]
elif scope["type"] == "websocket":
version = "1.1"
method = "GET"

instance.ws = instance.transport.create_websocket_connection(
send, receive
headers = Header(
[
(
key.decode("ASCII"),
value.decode(errors="surrogateescape"),
)
else:
raise ServerError("Received unknown ASGI scope")

request_class = sanic_app.request_class or Request
instance.request = request_class(
url_bytes,
headers,
version,
method,
instance.transport,
sanic_app,
)
instance.request.stream = instance
instance.request_body = True
instance.request.conn_info = ConnInfo(instance.transport)

await sanic_app.dispatch(
"http.lifecycle.request",
inline=True,
context={"request": instance.request},
fail_not_found=False,
for key, value in scope.get("headers", [])
]
)
path = (
scope["path"][1:]
if scope["path"].startswith("/")
else scope["path"]
)
url = "/".join([scope.get("root_path", ""), quote(path)])
url_bytes = url.encode("latin-1")
url_bytes += b"?" + scope["query_string"]

if scope["type"] == "http":
version = scope["http_version"]
method = scope["method"]
elif scope["type"] == "websocket":
version = "1.1"
method = "GET"

instance.ws = instance.transport.create_websocket_connection(
send, receive
)
else:
raise ServerError("Received unknown ASGI scope")

request_class = sanic_app.request_class or Request
instance.request = request_class(
url_bytes,
headers,
version,
method,
instance.transport,
sanic_app,
)
instance.request.stream = instance # type: ignore
instance.request_body = True
instance.request.conn_info = ConnInfo(instance.transport)

await instance.sanic_app.dispatch(
"http.lifecycle.request",
inline=True,
context={"request": instance.request},
fail_not_found=False,
)

return instance

Expand Down
2 changes: 1 addition & 1 deletion sanic/mixins/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def delete(
strict_slashes: Optional[bool] = None,
version: Optional[Union[int, str, float]] = None,
name: Optional[str] = None,
ignore_body: bool = True,
ignore_body: bool = False,
version_prefix: str = "/v",
error_format: Optional[str] = None,
**ctx_kwargs: Any,
Expand Down
33 changes: 30 additions & 3 deletions sanic/worker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from random import choice
from signal import SIGINT, SIGTERM, Signals
from signal import signal as signal_func
from typing import Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional

from sanic.compat import OS_IS_WINDOWS
from sanic.exceptions import ServerKilled
Expand Down Expand Up @@ -54,9 +54,36 @@ def __init__(
signal_func(SIGINT, self.shutdown_signal)
signal_func(SIGTERM, self.shutdown_signal)

def manage(self, ident, func, kwargs, transient=False) -> Worker:
def manage(
self,
ident: str,
func: Callable[..., Any],
kwargs: Dict[str, Any],
transient: bool = False,
workers: int = 1,
) -> Worker:
"""
Instruct Sanic to manage a custom process.
:param ident: A name for the worker process
:type ident: str
:param func: The function to call in the background process
:type func: Callable[..., Any]
:param kwargs: Arguments to pass to the function
:type kwargs: Dict[str, Any]
:param transient: Whether to mark the process as transient. If True
then the Worker Manager will restart the process along
with any global restart (ex: auto-reload), defaults to False
:type transient: bool, optional
:param workers: The number of worker processes to run, defaults to 1
:type workers: int, optional
:return: The Worker instance
:rtype: Worker
"""
container = self.transient if transient else self.durable
worker = Worker(ident, func, kwargs, self.context, self.worker_state)
worker = Worker(
ident, func, kwargs, self.context, self.worker_state, workers
)
container[worker.ident] = worker
return worker

Expand Down
5 changes: 4 additions & 1 deletion sanic/worker/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,17 @@ def __init__(
server_settings,
context: BaseContext,
worker_state: Dict[str, Any],
num: int = 1,
):
self.ident = ident
self.num = num
self.context = context
self.serve = serve
self.server_settings = server_settings
self.worker_state = worker_state
self.processes: Set[WorkerProcess] = set()
self.create_process()
for _ in range(num):
self.create_process()

def create_process(self) -> WorkerProcess:
process = WorkerProcess(
Expand Down

0 comments on commit c1d9e47

Please sign in to comment.