merge causes anext(): asynchronous generator is already running Exception #114

Closed
sebastianjanisch opened this issue Jun 16, 2024 · 2 comments

Comments

@sebastianjanisch

sebastianjanisch commented Jun 16, 2024

Hi,

I am trying to iterate over several async generators using the merge function. I have tried reproducing the behavior I see in a minimal example but so far haven't managed, so I'll try to describe what I observe as best as I can.

I am using aiostream together with the reflex.dev UI library, which allows the launching of background tasks in an async setting (see here: https://reflex.dev/docs/events/background-events/#background-tasks)

The code below is conceptually what I do in my real example, yet it doesn't produce the error I'm seeing.

The setup is that I have several nested async generators which each at some point acquire a lock using async with self. In reality these tasks make async httpx calls so it makes sense to run them asynchronously.

The error I do observe seems to happen at the async with self stage:

    async with self:
  File "/Users/sjanisch/Work/projects/test/bayesline/code/app/reflexgui/.venv/lib/python3.11/site-packages/reflex/state.py", line 2028, in __aexit__
    await self._self_actx.__aexit__(*exc_info)
  File "/Users/sjanisch/.pyenv/versions/3.11.8/lib/python3.11/contextlib.py", line 217, in __aexit__
    await anext(self.gen)
RuntimeError: anext(): asynchronous generator is already running

Unfortunately I can't reliably reproduce this, but maybe you have thoughts on what the cause might be.

import asyncio

import reflex as rx
from aiostream import stream


class State(rx.State):

    task1_text: str = "Task 1"
    task2_text: str = "Task 2"

    async def on_load(self):
        pass

    async def task1(self):
        async with self:
            self.task1_text = "Task 1 Running"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task1_text = "Task 1 50%"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task1_text = "Task 1 Done"
            yield

    async def task2(self):
        async with self:
            self.task2_text = "Task 2 Running"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task2_text = "Task 2 50%"
            yield

        await asyncio.sleep(2)

        async with self:
            self.task2_text = "Task 2 Done"
            yield

    @rx.background
    async def start_tasks(self):
        tasks = stream.merge(self.task1(), self.task2())
        async with tasks.stream() as streamer:
            async for result in streamer:
                yield result


@rx.page(route="/test", on_load=State.on_load)
def page() -> rx.Component:
    return rx.vstack(
        rx.text(State.task1_text),
        rx.text(State.task2_text),
        rx.button("Start Tasks", on_click=State.start_tasks),
    )
@vxgmichel
Owner

vxgmichel commented Jun 17, 2024

Hi @sebastianjanisch,

My guess is that an async generator somehow gets shared between task1 and task2, most probably because both tasks use the same async with self.

So if both tasks try to enter or exit at the same time, the second task won't be able to access the async generator as it's already running in the first one.

You could easily check that by protecting self with an async lock and seeing whether the problem remains. However, the sharing of the async generator is very suspicious, so there's probably another problem somewhere else.

I'm not familiar with reflex but I don't think the problem comes from aiostream. My guess is that you would observe the same issue using asyncio.gather(self.task1(), self.task2()).
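
One way to try this, sketched under assumptions: async generator objects are not awaitable, so they cannot be passed to asyncio.gather directly. Each one has to be wrapped in a coroutine that drains it. The task and drain helpers below are hypothetical stand-ins for self.task1() and self.task2(), with no reflex state involved.

```python
import asyncio


async def task(name, delay):
    # Stand-in for task1/task2: yields three progress updates.
    for step in ("Running", "50%", "Done"):
        await asyncio.sleep(delay)
        yield f"{name} {step}"


async def drain(agen):
    # Consume an async generator to completion and collect its items.
    return [item async for item in agen]


async def main():
    # Both generators advance concurrently, without aiostream.
    return await asyncio.gather(
        drain(task("Task 1", 0.01)),
        drain(task("Task 2", 0.01)),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the same RuntimeError appears with this setup in the real application, that would point away from aiostream.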

Hope this helps, and feel free to re-open the issue if you have more elements pointing towards aiostream.

@sebastianjanisch
Author

Hi @vxgmichel, thanks for responding so quickly. You are right: I slapped an asyncio lock around the async with self block (i.e. async with _lock, self:), which did the trick. I'll relay this to the guys from Reflex to see if they want to investigate on their side.
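
A rough sketch of that workaround, for anyone landing here. SharedState is a hypothetical stand-in for an object whose async with keeps a generator-based context manager on the instance; it is not Reflex's actual implementation. The lock is listed first in the async with statement, so it is acquired before the state is entered and released only after the state has exited.

```python
import asyncio
from contextlib import asynccontextmanager


class SharedState:
    """Hypothetical object that stores its active context manager on self."""

    def __init__(self):
        self._actx = None
        self.entries = []

    @asynccontextmanager
    async def _modify(self):
        await asyncio.sleep(0.01)  # simulated work on enter
        yield
        await asyncio.sleep(0.01)  # simulated work on exit

    async def __aenter__(self):
        self._actx = self._modify()
        await self._actx.__aenter__()
        return self

    async def __aexit__(self, *exc_info):
        return await self._actx.__aexit__(*exc_info)


async def worker(state, lock, name):
    # The lock serializes enter, body, and exit across tasks.
    async with lock, state:
        state.entries.append(name)


async def main():
    state = SharedState()
    lock = asyncio.Lock()
    await asyncio.gather(
        worker(state, lock, "task1"),
        worker(state, lock, "task2"),
    )
    return state.entries


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The cost is that the tasks can no longer overlap inside the guarded block, so the block should stay as short as possible.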
