Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RuntimeError during unit tests: Future attached to a different loop #68

Closed
hinzundcode opened this issue Aug 9, 2023 · 4 comments
Closed

Comments

@hinzundcode
Copy link

I tried to write pytest testcases for a FastAPI route that returns an event source/stream. The first test always succeedes, but the second test, despite identical content, consistently runs into an error: RuntimeError: Task ... got Future ... attached to a different loop. I'm not sure if the error is related to sse_starlette, fastapi, httpx or pytest-asyncio. I'll try it here first.

Here is a minimal example that can be used to reproduce the issue:

from httpx import AsyncClient
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()

@app.post("/foo")
async def http_foo() -> EventSourceResponse:
    async def event_generator():
        yield {"data": "1"}
        yield {"data": "2"}
        yield {"data": "3"}
    return EventSourceResponse(event_generator())

def parse_event_stream(text):
    events = []
    for line in text.strip().split("\r\n\r\n"):
        events.append(line[len("data:"):].strip())
    return events

async def test_first():
    client = AsyncClient(app=app, base_url="http://test")
    async with client:
        response = await client.post("/foo")
    events = parse_event_stream(response.text)
    assert events == ["1", "2", "3"]

async def test_second():
    client = AsyncClient(app=app, base_url="http://test")
    async with client:
        response = await client.post("/foo")
    events = parse_event_stream(response.text)
    assert events == ["1", "2", "3"]

Versions used:

sse-starlette 1.6.1
fastapi 0.101.0
httpx 0.24.1
pytest 7.4.0
pytest-asyncio 0.21.1

My pytest config looks like this:

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

And here is the full error log:

$ poetry run pytest -vv -s -k sse
===================================================================== test session starts ======================================================================
platform darwin -- Python 3.8.15, pytest-7.4.0, pluggy-1.2.0 -- /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/bin/python
cachedir: .pytest_cache
rootdir: /Users/chris/test-project
configfile: pyproject.toml
testpaths: tests
plugins: timeout-2.1.0, asyncio-0.21.1, mock-3.11.1, anyio-3.7.1
asyncio: mode=auto
collected 54 items / 52 deselected / 2 selected                                                                                                                

tests/test_sse.py::test_first PASSED
tests/test_sse.py::test_second FAILED

=========================================================================== FAILURES ===========================================================================
_________________________________________________________________________ test_second __________________________________________________________________________

    async def test_second():
        client = prepare_test()
        async with client:
>           response = await client.post("/foo")

tests/test_sse.py:33: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1848: in post
    return await self.request(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1530: in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1617: in send
    response = await self._send_handling_auth(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1645: in _send_handling_auth
    response = await self._send_handling_redirects(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1682: in _send_handling_redirects
    response = await self._send_single_request(request)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1719: in _send_single_request
    response = await transport.handle_async_request(request)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_transports/asgi.py:162: in handle_async_request
    await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/applications.py:289: in __call__
    await super().__call__(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/applications.py:122: in __call__
    await self.middleware_stack(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/errors.py:184: in __call__
    raise exc
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/errors.py:162: in __call__
    await self.app(scope, receive, _send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/exceptions.py:79: in __call__
    raise exc
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/exceptions.py:68: in __call__
    await self.app(scope, receive, sender)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py:20: in __call__
    raise e
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py:17: in __call__
    await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:718: in __call__
    await route.handle(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:276: in handle
    await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:69: in app
    await response(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:251: in __call__
    await wrap(partial(self.listen_for_disconnect, receive))
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:597: in __aexit__
    raise exceptions[0]
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:240: in wrap
    await func()
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:215: in listen_for_exit_signal
    await AppStatus.should_exit_event.wait()
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:1778: in wait
    if await self._event.wait():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asyncio.locks.Event object at 0x1346e4730 [unset]>

    async def wait(self):
        """Block until the internal flag is true.
    
        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True
    
        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
>           await fut
E           RuntimeError: Task <Task pending name='sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap' coro=<EventSourceResponse.__call__.<locals>.wrap() running at /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:240> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:661]> got Future <Future pending> attached to a different loop

../.pyenv/versions/3.8.15/lib/python3.8/asyncio/locks.py:309: RuntimeError
=================================================================== short test summary info ====================================================================
FAILED tests/test_sse.py::test_second - RuntimeError: Task <Task pending name='sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap' coro=<EventSourceResponse.__call__.<locals>.wrap() run...
========================================================== 1 failed, 1 passed, 52 deselected in 1.39s ==========================================================
@sysid
Copy link
Owner

sysid commented Aug 9, 2023

I don't think this is sse-starlette related. It is sometime a bit tricky to have correct event-loop handling in combination with pytest.

If you face the issue outside pytest in regular app, please reopen.

@sysid sysid closed this as completed Aug 9, 2023
@dleen
Copy link

dleen commented Aug 15, 2023

I had the same issue, only with SSE, so what I had to was for any tests involving SSEs I launched the webapp in a subprocess and made calls using a real http client (not the test client).

@prashantgupta24
Copy link

@hinzundcode did you ever get to the bottom of this? I seem to be facing the same issue, thanks!

@prashantgupta24
Copy link

Started a discussion in FastAPI in case anyone's interested!
tiangolo/fastapi#10518

eyurtsev added a commit to langchain-ai/langserve that referenced this issue Nov 3, 2023
- Fix issue in stream/astream endpoint associated with addable types

- Have not been able to figure how to run sync unit tests yet with
pytest:
- fastapi app is async so there's an event loop that's created somewhere
  - within fastapi sse_starlette also deals with an event loop for the
    streaming endpoint
- This results in a failure when running all unit tests together
(sysid/sse-starlette#68)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants