Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferring a job from FastAPI route: The future belongs to a different loop than the one specified as the loop argument #852

Open
duc00 opened this issue Oct 19, 2023 · 6 comments
Labels
Issue contains: Exploration & Design decisions 🤯 We don't know how this will be implemented yet Issue contains: Some Python 🐍 This issue involves writing some Python code Issue status: Blocked ⛔️ Issue cannot be processed for now Issue type: Bug 🐞 Something isn't working

Comments

@duc00
Copy link

duc00 commented Oct 19, 2023

Hi,

I am trying to defer a job from a FastAPI route, and ends up with asyncio issues. FastAPI also uses asyncio at its core.

My code looks roughly like this:

FastAPI route:

@router.post("")
async def new_job_alert(request: NewJobAlertRequest) -> dict[str, Any]:
     # Schedule job
     await job_alert_iteration.defer_async(job_alert_id=1)

    return {"message": "OK"}

My FastAPI worker is Uvicorn.

Procrastinate task:

app = App(
    connector=AiopgConnector({... my params}),
)
app.open()

@task_queue_app.task(name="job_alert_iteration")
def job_alert_iteration(job_alert_id: EntityId) -> None:
    do_something()

I end up with the following Exception:

Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/fastapi/applications.py", line 289, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 169, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 167, in __call__
    await self.app(scope, receive, send_wrapper)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 273, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 190, in run_endpoint_function
    return await dependant.call(**values)
  File "/app/src/presentation/api/api_v1/endpoints/job_alert.py", line 36, in new_job_alert
    await job_alert_iteration.defer_async(job_alert_id=new_job_alert_id)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/tasks.py", line 121, in defer_async
    return await self.configure().defer_async(**task_kwargs)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/jobs.py", line 153, in defer_async
    job = await self.job_manager.defer_job_async(job=job)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/manager.py", line 39, in defer_job_async
    result = await self.connector.execute_query_one_async(
  File "/usr/local/lib/python3.10/site-packages/procrastinate/aiopg_connector.py", line 31, in wrapped
    return await coro(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/aiopg_connector.py", line 67, in wrapped
    return await coro(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/aiopg_connector.py", line 269, in execute_query_one_async
    await cursor.execute(query, self._wrap_json(arguments))
  File "/usr/local/lib/python3.10/site-packages/aiopg/connection.py", line 426, in execute
    await self._conn._poll(waiter, timeout)
  File "/usr/local/lib/python3.10/site-packages/aiopg/connection.py", line 881, in _poll
    await asyncio.wait_for(self._waiter, timeout)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 426, in wait_for
    fut = ensure_future(fut, loop=loop)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 615, in ensure_future
    return _ensure_future(coro_or_future, loop=loop)
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 621, in _ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument

I tried using the sync version defer without await, but no chance, I also end up with another asyncio exception:

Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/fastapi/applications.py", line 289, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 169, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 167, in __call__
    await self.app(scope, receive, send_wrapper)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 273, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 190, in run_endpoint_function
    return await dependant.call(**values)
  File "/app/src/presentation/api/api_v1/endpoints/job_alert.py", line 36, in new_job_alert
    job_alert_iteration.defer(job_alert_id=new_job_alert_id)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/tasks.py", line 129, in defer
    return self.configure().defer(**task_kwargs)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/jobs.py", line 162, in defer
    job = self.job_manager.defer_job(job=job)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/manager.py", line 52, in defer_job
    result = self.connector.execute_query_one(
  File "/usr/local/lib/python3.10/site-packages/procrastinate/utils.py", line 149, in wrapper
    return sync_await(awaitable=awaitable)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/utils.py", line 200, in sync_await
    return loop.run_until_complete(awaitable)
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

Please note that I do see the job created in the procrastinate_jobs table.

Thanks for your help!

@ewjoachim
Copy link
Member

ewjoachim commented Oct 19, 2023

  File "/app/src/presentation/api/api_v1/endpoints/job_alert.py", line 36, in new_job_alert
    job_alert_iteration.defer(job_alert_id=new_job_alert_id)

You're in an async project, use await defer_async() instead of defer and you should not experience this kind of issues. Let me know if it solves your problem.

EDIT Damn sorry, read too fast, missed the sentence between the 2 blocks. I'll re-read and re-reply

@ewjoachim
Copy link
Member

I'm seeing you're using part of the sync api though, by calling app.open() instead of await app.open_async() and defining you task a a function instead of a coroutine. Would you mind trying to use only the async API all the way through and report if that fixes the issue ?

@duc00
Copy link
Author

duc00 commented Oct 19, 2023

Hey @ewjoachim, thanks for the reply, will do!

@duc00
Copy link
Author

duc00 commented Oct 20, 2023

So I thought a bit about the problem. My goal would be to use the sync API only. All my logic is sync. So I also modified the FastAPI route to be a sync def endpoint. I ran the endpoint and got the following exception:

Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/fastapi/applications.py", line 289, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 169, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/prometheus_fastapi_instrumentator/middleware.py", line 167, in __call__
    await self.app(scope, receive, send_wrapper)
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 273, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.10/site-packages/fastapi/routing.py", line 192, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/local/lib/python3.10/site-packages/starlette/concurrency.py", line 41, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2106, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 833, in run
    result = context.run(func, *args)
  File "/app/src/presentation/api/api_v1/endpoints/job_alert.py", line 36, in new_job_alert
    job_alert_iteration.defer(job_alert_id=new_job_alert_id)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/tasks.py", line 129, in defer
    return self.configure().defer(**task_kwargs)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/jobs.py", line 162, in defer
    job = self.job_manager.defer_job(job=job)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/manager.py", line 52, in defer_job
    result = self.connector.execute_query_one(
  File "/usr/local/lib/python3.10/site-packages/procrastinate/utils.py", line 149, in wrapper
    return sync_await(awaitable=awaitable)
  File "/usr/local/lib/python3.10/site-packages/procrastinate/utils.py", line 196, in sync_await
    loop = asyncio.get_event_loop()
  File "/usr/local/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'AnyIO worker thread'.

The thing is that FastAPI processes the request to a sync def endpoint in a separate thread, which appears by default not to have an event loop. One would have to be created if not exist, for procrastinate to be able to work. Am I right seeing things like this @ewjoachim?

@duc00
Copy link
Author

duc00 commented Oct 20, 2023

For the sake of giving the full picture, I tried the async only route too, using open_async, defer_async and a coroutine as task. It does work!

However, this does not suite my needs as I want to define my FastAPI endpoint as a sync def for performance and basically correctly representing what happens within it (no asyncio whatsoever). I will probably move to a different queue project. I love the simplicity of this one, thanks for opening it to the world! 😄

@ewjoachim
Copy link
Member

I believe your issue will likely be solved by #753 but I can't make promises on when it will be merged. Mixing sync and async really is a pain :( Sorry.

@ewjoachim ewjoachim added Issue type: Bug 🐞 Something isn't working Issue contains: Exploration & Design decisions 🤯 We don't know how this will be implemented yet Issue contains: Some Python 🐍 This issue involves writing some Python code Issue status: Blocked ⛔️ Issue cannot be processed for now labels Jan 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Issue contains: Exploration & Design decisions 🤯 We don't know how this will be implemented yet Issue contains: Some Python 🐍 This issue involves writing some Python code Issue status: Blocked ⛔️ Issue cannot be processed for now Issue type: Bug 🐞 Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants