Bug-prone background task creation suggestion in docs #647

Closed
MatthewScholefield opened this issue Mar 2, 2021 · 6 comments


@MatthewScholefield

The docs give the following example for asyncio:

async def my_background_task(my_argument):
    # do some background work here!
    pass

sio.start_background_task(my_background_task, 123)

This causes a number of subtle bugs where the coroutine running my_background_task gets garbage collected because nothing holds a reference to it. Instead, perhaps the docs should be updated to something that warns of this, like:

async def my_background_task(my_argument):
    # do some background work here!
    pass

task = sio.start_background_task(my_background_task, 123)

Make sure to keep a hard reference to task to prevent subtle garbage collection exceptions.
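For illustration, here is a minimal runnable sketch of the suggested fix, with plain asyncio standing in for sio (the function names mirror the docs example; this is a sketch, not the python-socketio API):

```python
import asyncio

async def my_background_task(my_argument):
    # do some background work here!
    await asyncio.sleep(0)
    return my_argument

async def main():
    # Keeping a hard reference in a local variable prevents the task
    # from being garbage collected while it is still pending.
    task = asyncio.create_task(my_background_task(123))
    return await task

print(asyncio.run(main()))  # 123
```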

If this looks alright, I'd be happy to do a PR.

@miguelgrinberg
Owner

miguelgrinberg commented Mar 2, 2021

Can you explain better what the problem is? If the coroutine is running, then there are references to it. At least one reference is held by asyncio itself, since it obviously knows about this coroutine and is able to schedule the CPU for it.

If the error is still the "Task was destroyed but it is pending", that implies someone destroyed the task. I have asked you to provide the full error message including the stack trace back when you reported this, but you haven't. Typically this error occurs when the loop is destroyed before the task finished or was able to cancel.

@MatthewScholefield
Author

MatthewScholefield commented Mar 2, 2021

Hi, first, I forgot to highlight that with asyncio, sio.start_background_task is essentially an alias for asyncio.create_task(foo()).

At least one reference is held by asyncio itself, since it obviously knows about this coroutine and is able to schedule the CPU for it.

So this is actually a misleading part of asyncio. Asyncio maintains only weak references to the tasks it is currently executing, to prevent harder-to-debug memory leaks from reference cycles. This is why, without a hard reference, Python's garbage collector can collect a running task at will. You can read this thread from Python's bug tracker, which covers asyncio's unusual behavior.

The full error is generally as follows:

Task was destroyed but it is pending!
task: <Task pending name='Task-3' coro=<RedisConnection._read_data() running at .../site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fb4af031f70>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at .../site-packages/aioredis/connection.py:168]>

As you can see, there is no stack trace at all, as this is simply a message emitted by the __del__ method of asyncio's internal Task class. While the actual task getting garbage collected seems unrelated, this happens because, at the top level, the coroutine running everything has been garbage collected.

A simple example of this is as follows:

import asyncio
import weakref

future_refs = []

async def something():
    future = asyncio.get_event_loop().create_future()
    future_refs.append(weakref.ref(future))
    await future

async def go():
    asyncio.create_task(something())
    await asyncio.sleep(0.5)
    future_refs[0]().set_result(None)

asyncio.run(go())

Here, while it is a little strange that we only store weakrefs to futures, we would expect the event loop to still maintain a reference to the running coroutine something(). However, if we force garbage collection at every step of the loop, we get the following error:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<something() running at example.py:9> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fca0fef2610>()]>>

Note, we can force garbage collection at the end of every step in the event loop via the following:

  • Edit /usr/lib/python3.*/asyncio/base_events.py and, within def _run_once near the bottom, immediately inside the for i in range(ntodo): loop, add import gc; gc.collect().

Anyways, if we change that code to assign the created task to a variable (even an unused one), the code runs without problems, even with the garbage-collection line enabled.
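For completeness, a sketch of that fixed variant, with the task assigned and awaited so the example also exits cleanly (the use of a list of weakrefs is kept from the example above purely to mirror it):

```python
import asyncio
import weakref

future_refs = []

async def something():
    future = asyncio.get_event_loop().create_future()
    future_refs.append(weakref.ref(future))
    return await future

async def go():
    # Assigning the task to a variable keeps a hard reference,
    # so the garbage collector cannot destroy it mid-flight.
    task = asyncio.create_task(something())
    await asyncio.sleep(0.5)
    future_refs[-1]().set_result('done')
    return await task

print(asyncio.run(go()))  # done
```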

@miguelgrinberg
Owner

I'll look at this in more detail, but I don't understand the example that you posted. You created a future and only stored a weak reference to it. Then you said "if we force garbage collection at every step of the loop", which would get rid of the future. How is this supposed to work?

Running your example and forcing a GC I get AttributeError: 'NoneType' object has no attribute 'set_result', which is expected, given the conditions you set in the example. Replacing the weak reference to the future with a strong reference makes the example work without error, even with GC running.

@MatthewScholefield
Author

MatthewScholefield commented Mar 3, 2021

Hmm, are you sure that the modified base_events.py is run by the Python interpreter you ran the script with? When I run it, I do get the AttributeError, which is expected, but I also get the Task was destroyed but it is pending! error, which is the interesting part.

Technically, I realize we could simplify the example to:

import asyncio

async def something():
    await asyncio.get_event_loop().create_future()

async def go():
    asyncio.create_task(something())
    await asyncio.sleep(0.5)

asyncio.run(go())

Despite the admittedly weird example, according to your assumption that "at least one reference is held by asyncio itself", we should not see "Task was destroyed but it is pending!" because asyncio is still executing the something() coroutine and would still have a reference to it.

Just to make sure I explained it properly, near the bottom of the _run_once method, it should look roughly like this:

        ntodo = len(self._ready)
        for i in range(ntodo):
            import gc; gc.collect()  # <<<<<<<<<<<<<<<<<<<<<<<<<
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs

Anyways, I encourage you to read the ticket linked above if you get a chance to look into this in more detail. Specifically, Guido mentions:

Most likely your [person with "Task was destroyed but it is pending!" bug] program is simply relying on undefined behavior and the right way to fix it is to keep strong references to all tasks until they self-destruct.
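A minimal sketch of that advice is the task-set pattern: hold strong references in a collection and drop each one when its task completes (spawn and worker are illustrative names, not part of any library):

```python
import asyncio

running_tasks = set()  # strong references, held until each task self-destructs

def spawn(coro):
    task = asyncio.create_task(coro)
    running_tasks.add(task)                        # keep the task alive
    task.add_done_callback(running_tasks.discard)  # release it once finished
    return task

async def worker(n):
    await asyncio.sleep(0)
    return n * 2

async def main():
    tasks = [spawn(worker(i)) for i in range(3)]
    return [await t for t in tasks]

print(asyncio.run(main()))  # [0, 2, 4]
```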

@miguelgrinberg
Owner

@MatthewScholefield Your simplified example is also contrived and not applicable to any real usage. Once again you are waiting on a future that is subject to garbage collection due to not having any references. If I replace the wait on this future with a sleep, then the task is also pending, but it isn't destroyed. Do you understand this problem well enough to explain why the sleep is different from the weak future?

@MatthewScholefield
Author

MatthewScholefield commented Mar 3, 2021

Apologies, here's a real example:

import asyncio


async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/1.0\r\nHost: google.com\r\n\r\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()


async def something():
    data = await request_google()
    print('Data:', data)


async def go():
    asyncio.create_task(something())
    await asyncio.sleep(10.0)

asyncio.run(go())

The reason asyncio.sleep works fine is that asyncio happens to store a strong reference to the timer handle that asyncio.sleep uses (but, as can be seen in that bug report, there is no general guarantee of this). It works as follows:

async def sleep(delay, result=None, *, loop=None):
    ...
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

Now, if we look at loop.call_later we see that asyncio happens to store a strong reference to the arguments of call_later (in this case the local future variable of the sleep coroutine):

    def call_at(self, when, callback, *args, context=None):  # Note: call_later just calls call_at
        ...
        timer = events.TimerHandle(when, callback, args, self, context)
        ...
        heapq.heappush(self._scheduled, timer)

So, this internal hard reference to a variable within the sleep coroutine keeps our original coroutine from being garbage collected.
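That hard reference held by the TimerHandle can be observed directly. A small sketch, where Payload is an illustrative stand-in for the future that sleep passes to call_later:

```python
import asyncio
import gc
import weakref

class Payload:
    """Stand-in for the future that sleep() hands to call_later."""

async def main():
    loop = asyncio.get_running_loop()
    obj = Payload()
    ref = weakref.ref(obj)
    # The TimerHandle stores its callback arguments, so it holds a
    # strong reference to obj while the callback is scheduled.
    handle = loop.call_later(60, lambda *args: None, obj)
    del obj
    gc.collect()
    alive_while_scheduled = ref() is not None
    handle.cancel()  # cancelling clears the stored callback and args
    gc.collect()
    collected_after_cancel = ref() is None
    return alive_while_scheduled, collected_after_cancel

print(asyncio.run(main()))  # (True, True)
```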

Let me know if this makes sense.
