-
Notifications
You must be signed in to change notification settings - Fork 8
/
asgi.py
151 lines (134 loc) · 5.99 KB
/
asgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from functools import partial
from typing import AnyStr, cast, Optional, TYPE_CHECKING
import trio
from hypercorn.typing import (
ASGIReceiveCallable,
ASGISendCallable,
LifespanScope,
LifespanShutdownCompleteEvent,
LifespanShutdownFailedEvent,
LifespanStartupCompleteEvent,
LifespanStartupFailedEvent,
WebsocketScope,
)
from quart.asgi import ASGIHTTPConnection, ASGIWebsocketConnection
from quart.signals import websocket_received
from quart.wrappers import Request, Response, Websocket # noqa: F401
from werkzeug.datastructures import Headers
if TYPE_CHECKING:
from quart_trio import QuartTrio # noqa: F401
class TrioASGIHTTPConnection(ASGIHTTPConnection):
    """HTTP connection handler that runs its work inside a Trio nursery.

    Two tasks are started per connection: one that consumes incoming ASGI
    messages and one that produces the response. Each task cancels the
    shared nursery when it finishes, which stops the other task as well.
    """

    async def __call__(self, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
        """Create the request from the scope and run both tasks concurrently."""
        request = self._create_request_from_scope(send)
        async with trio.open_nursery() as task_group:
            # The nursery itself is handed to each task so that it can
            # cancel its sibling once it is done.
            task_group.start_soon(self.handle_messages, task_group, request, receive)
            task_group.start_soon(self.handle_request, task_group, request, send)

    async def handle_messages(  # type: ignore
        self, nursery: trio.Nursery, request: Request, receive: ASGIReceiveCallable
    ) -> None:
        """Run the parent's message handling, then cancel the nursery."""
        await super().handle_messages(request, receive)
        nursery.cancel_scope.cancel()

    async def handle_request(  # type: ignore
        self, nursery: trio.Nursery, request: Request, send: ASGISendCallable
    ) -> None:
        """Obtain the response from the app, send it, then cancel the nursery.

        The send is bounded by the response's own ``timeout`` when it is a
        ``Response`` whose timeout is not ``Ellipsis``; otherwise by the
        app's ``RESPONSE_TIMEOUT`` config value. A timeout of ``None``
        means the send is not time-limited.
        """
        response = await self.app.handle_request(request)

        use_app_default = not isinstance(response, Response) or response.timeout == Ellipsis
        if use_app_default:
            timeout = self.app.config["RESPONSE_TIMEOUT"]
        else:
            timeout = cast(Optional[float], response.timeout)

        if timeout is None:
            await self._send_response(send, response)
        else:
            # move_on_after abandons the send once the deadline passes
            # instead of raising.
            with trio.move_on_after(timeout):
                await self._send_response(send, response)

        nursery.cancel_scope.cancel()
class TrioASGIWebsocketConnection(ASGIWebsocketConnection):
    """WebSocket connection handler that runs its work inside a Trio nursery.

    Incoming messages are passed from the ASGI ``receive`` callable to the
    application's websocket object through a bounded Trio memory channel.
    The receiving task and the websocket-handling task share one nursery;
    whichever finishes first cancels it, stopping the other.
    """

    def __init__(self, app: "QuartTrio", scope: WebsocketScope) -> None:
        """Store the app and scope and create the message channel.

        NOTE(review): ``super().__init__`` is not called here; presumably
        the parent sets up state that is replaced by the Trio channel
        below -- confirm against the parent class.
        """
        self.app = app
        self.scope = scope
        self._accepted = False
        self._closed = False
        # Bounded channel (buffer of 10 messages) linking handle_messages
        # (producer) to the websocket object's receive callable (consumer).
        self.send_channel, self.receive_channel = trio.open_memory_channel[AnyStr](10)

    async def __call__(self, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
        """Create the websocket from the scope and run both tasks concurrently."""
        websocket = self._create_websocket_from_scope(send)
        async with trio.open_nursery() as nursery:
            nursery.start_soon(self.handle_messages, nursery, receive)
            nursery.start_soon(self.handle_websocket, nursery, websocket, send)

    async def handle_messages(  # type: ignore
        self, nursery: trio.Nursery, receive: ASGIReceiveCallable
    ) -> None:
        """Forward received websocket messages until the client disconnects.

        Each ``websocket.receive`` event triggers the ``websocket_received``
        signal and is then pushed onto the memory channel. A
        ``websocket.disconnect`` event ends the loop, after which the
        nursery is cancelled.
        """
        while True:
            event = await receive()
            if event["type"] == "websocket.receive":
                # Choose the payload by presence, not truthiness: an empty
                # binary frame (b"") is falsy and would otherwise be
                # replaced by the (absent or None) text value.
                message = event.get("bytes")
                if message is None:
                    message = event["text"]
                await websocket_received.send(message)
                await self.send_channel.send(message)
            elif event["type"] == "websocket.disconnect":
                break
        nursery.cancel_scope.cancel()

    def _create_websocket_from_scope(self, send: ASGISendCallable) -> Websocket:
        """Build the app's websocket object from the ASGI scope.

        Headers are decoded from the scope's byte pairs with title-cased
        names, and a ``Remote-Addr`` header is set from the scope's client
        address (``"<local>"`` when no client is given).
        """
        headers = Headers()
        headers["Remote-Addr"] = (self.scope.get("client") or ["<local>"])[0]
        for name, value in self.scope["headers"]:
            headers.add(name.decode().title(), value.decode())

        return self.app.websocket_class(
            self.scope["path"],
            self.scope["query_string"],
            self.scope["scheme"],
            headers,
            self.scope.get("root_path", ""),
            self.scope.get("http_version", "1.1"),
            list(self.scope.get("subprotocols", [])),
            # The websocket reads messages from the channel's receiving end.
            self.receive_channel.receive,
            partial(self.send_data, send),
            partial(self.accept_connection, send),
            partial(self.close_connection, send),
            self.scope,
        )

    async def handle_websocket(  # type: ignore
        self, nursery: trio.Nursery, websocket: Websocket, send: ASGISendCallable
    ) -> None:
        """Run the parent's websocket handling, then cancel the nursery."""
        await super().handle_websocket(websocket, send)
        nursery.cancel_scope.cancel()
class TrioASGILifespan:
    """Handle the ASGI lifespan protocol for a QuartTrio app.

    A nursery is opened for the whole lifespan and attached to the app as
    ``app.nursery``; startup and shutdown events are then processed until
    a shutdown event has been handled.
    """

    def __init__(self, app: "QuartTrio", scope: LifespanScope) -> None:
        """Keep a reference to the app; ``scope`` is accepted but not stored."""
        self.app = app

    async def __call__(self, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
        """Process lifespan events, reporting completion or failure of each.

        For each startup or shutdown event, the matching app hook is run
        and either a ``*.complete`` event or a ``*.failed`` event (carrying
        the error text) is sent. The loop ends after a shutdown event.
        """
        async with trio.open_nursery() as nursery:
            self.app.nursery = nursery

            finished = False
            while not finished:
                message = await receive()
                kind = message["type"]

                if kind == "lifespan.startup":
                    try:
                        await self.app.startup()
                    except (Exception, trio.MultiError) as error:
                        failure = {"type": "lifespan.startup.failed", "message": str(error)}
                        await send(cast(LifespanStartupFailedEvent, failure))
                    else:
                        success = {"type": "lifespan.startup.complete"}
                        await send(cast(LifespanStartupCompleteEvent, success))
                elif kind == "lifespan.shutdown":
                    try:
                        await self.app.shutdown()
                    except (Exception, trio.MultiError) as error:
                        failure = {"type": "lifespan.shutdown.failed", "message": str(error)}
                        await send(cast(LifespanShutdownFailedEvent, failure))
                    else:
                        success = {"type": "lifespan.shutdown.complete"}
                        await send(cast(LifespanShutdownCompleteEvent, success))
                    # Shutdown ends the loop whether or not it succeeded.
                    finished = True