Skip to content

Commit

Permalink
Merge pull request #90 from blodow/feat/add_send_timeout
Browse files Browse the repository at this point in the history
[#89] Add SSE Send Timeout
  • Loading branch information
sysid committed Jan 25, 2024
2 parents 8c1e090 + 0ef461e commit e3d46d7
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 365 deletions.
608 changes: 251 additions & 357 deletions pdm.lock

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion sse_starlette/sse.py
Expand Up @@ -24,6 +24,10 @@
_log = logging.getLogger(__name__)


class SendTimeoutError(TimeoutError):
pass


# https://stackoverflow.com/questions/58133694/graceful-shutdown-of-uvicorn-starlette-app-with-websockets
class AppStatus:
"""helper for monkey-patching the signal-handler of uvicorn"""
Expand Down Expand Up @@ -171,6 +175,7 @@ def __init__(
data_sender_callable: Optional[
Callable[[], Coroutine[None, None, None]]
] = None,
send_timeout: Optional[float] = None,
) -> None:
if sep is not None and sep not in ["\r\n", "\r", "\n"]:
raise ValueError(f"sep must be one of: \\r\\n, \\r, \\n, got: {sep}")
Expand All @@ -187,6 +192,7 @@ def __init__(
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.data_sender_callable = data_sender_callable
self.send_timeout = send_timeout

_headers: dict[str, str] = {}
if headers is not None: # pragma: no cover
Expand Down Expand Up @@ -245,7 +251,13 @@ async def stream_response(self, send: Send) -> None:
async for data in self.body_iterator:
chunk = ensure_bytes(data, self.sep)
_log.debug(f"chunk: {chunk.decode()}")
await send({"type": "http.response.body", "body": chunk, "more_body": True})
with anyio.move_on_after(self.send_timeout) as timeout:
await send(
{"type": "http.response.body", "body": chunk, "more_body": True}
)
if timeout.cancel_called:
await self.body_iterator.aclose()
raise SendTimeoutError()

async with self._send_lock:
self.active = False
Expand Down
32 changes: 32 additions & 0 deletions tests/anyio_compat.py
@@ -0,0 +1,32 @@
import sys
from contextlib import contextmanager
from typing import Generator

# AnyIO v4 introduces a breaking change that groups all exceptions in a task
# group into an exception group.
# This file allows to be compatible with AnyIO <4 and >=4 by unwrapping groups
# if they only contain a single exception. It also supports python <3.11 (before
# exception groups support) and >=3.11.
# Solution as proposed in https://anyio.readthedocs.io/en/stable/migration.html

has_exceptiongroups = True
if sys.version_info < (3, 11):
try:
from exceptiongroup import BaseExceptionGroup
except ImportError:
has_exceptiongroups = False


@contextmanager
def collapse_excgroups() -> Generator[None, None, None]:
try:
yield
except BaseException as exc:
if has_exceptiongroups:
while isinstance(exc, BaseExceptionGroup) and len(exc.exceptions) == 1:
exc = exc.exceptions[0]

raise exc


__all__ = ["collapse_excgroups"]
44 changes: 44 additions & 0 deletions tests/integration/frozen_client.py
@@ -0,0 +1,44 @@
"""
https://github.com/sysid/sse-starlette/issues/89
Server Simulation:
Run with: uvicorn tests.integration.frozen_client:app
Client Simulation:
% curl -s -N localhost:8000/events > /dev/null
^Z (suspend process -> no consumption of messages but connection alive)
Measure resource consumption:
connections: lsof -i :8000
buffers: netstat -m
"""
import anyio
from starlette.applications import Starlette
from starlette.routing import Route

from sse_starlette import EventSourceResponse
import uvicorn


async def events(request):
async def _event_generator():
try:
i = 0
while True:
i += 1
if i % 100 == 0:
print(i)
yield dict(data={i: " " * 4096})
await anyio.sleep(0.001)
finally:
print("disconnected")
return EventSourceResponse(_event_generator(), send_timeout=10)

app = Starlette(
debug=True,
routes=[
Route("/events", events),
],
)

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="trace", log_config=None) # type: ignore
45 changes: 39 additions & 6 deletions tests/test_event_source_response.py
Expand Up @@ -6,9 +6,12 @@
import anyio
import anyio.lowlevel
import pytest
from sse_starlette import EventSourceResponse
from starlette.testclient import TestClient

from sse_starlette import EventSourceResponse
from sse_starlette.sse import SendTimeoutError
from tests.anyio_compat import collapse_excgroups

_log = logging.getLogger(__name__)


Expand Down Expand Up @@ -124,19 +127,49 @@ async def receive():
await anyio.lowlevel.checkpoint()
return {"type": "something"}

response = EventSourceResponse(event_publisher(), ping=1)
with pytest.raises(anyio.WouldBlock) as e:
response = EventSourceResponse(event_publisher(), ping=1)
with collapse_excgroups():
await response({}, receive, send)

await response({}, receive, send)


def test_header_charset():
def test_header_charset(reset_appstatus_event):
async def numbers(minimum, maximum):
for i in range(minimum, maximum + 1):
await asyncio.sleep(0.1)
await anyio.sleep(0.1)
yield i

generator = numbers(1, 5)
response = EventSourceResponse(generator, ping=0.2) # type: ignore
content_type = [h for h in response.raw_headers if h[0].decode() == "content-type"]
assert content_type == [(b"content-type", b"text/event-stream; charset=utf-8")]


@pytest.mark.anyio
async def test_send_timeout(reset_appstatus_event):
# Timeout is set to 0.5s, but `send` will take 1s. Expect SendTimeoutError.
cleanup = False

async def event_publisher():
try:
yield {"event": "some", "data": "any"}
assert False # never reached
finally:
nonlocal cleanup
cleanup = True

async def send(*args, **kwargs):
await anyio.sleep(1.0)

async def receive():
await anyio.lowlevel.checkpoint()
return {"type": "something"}

response = EventSourceResponse(event_publisher(), send_timeout=0.5)
with pytest.raises(SendTimeoutError):
with collapse_excgroups():
await response({}, receive, send)

assert cleanup


2 changes: 1 addition & 1 deletion tox.ini
Expand Up @@ -7,7 +7,7 @@
[tox]
minversion = 3.8.0
isolated_build = True
envlist = py38,py39,py310,py311
envlist = py38,py39,py310,py311,py312

[gh-actions]
python =
Expand Down

0 comments on commit e3d46d7

Please sign in to comment.