Task was destroyed but it is pending! with Redis pub/sub #573

Closed
MatthewScholefield opened this issue Nov 30, 2020 · 2 comments

MatthewScholefield commented Nov 30, 2020

I'm trying to find a way to send events to each client based on a Redis pub/sub channel: not sharing all events the way AsyncRedisManager does, but emitting specific events with custom per-client behavior. In my attempts I have encountered strange behavior that only occurs when aioredis and python-socketio are combined.

This is the method I'm looking to run for each client:

async def handle_progress(sid, data):
    i = 0
    async for _ in subscribe_redis('abc'):  # subscribe_redis adds the '__keyspace@0__:' prefix itself
        i += 1
        await sio.emit('percent_complete', str(i))
    await sio.emit('status', '{"status": "complete", "result": "/abc.dat"}')

I've attempted a number of methods, each of which errors in some way:

Within handler:

@sio.on('connect')
async def handle_connect(sid, data):
    await handle_progress(sid, data)

This seems to prevent new connections entirely.

Within task created by handler:

@sio.on('connect')
async def handle_connect(sid, data):
    asyncio.create_task(handle_progress(sid, data))

This causes strange Task was destroyed but it is pending! errors when overlapping requests come in (even with async_handlers=False). My guess was that ASGI was cancelling the event loop, but the last method below shows that isn't the case.
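
One generic asyncio caveat that can produce this exact warning, independent of python-socketio: the event loop only keeps weak references to tasks created with asyncio.create_task(), so a task that nothing else references can be garbage-collected while still pending. A variant that holds strong references would look roughly like this (the background_tasks name is just illustrative):

import asyncio

background_tasks = set()  # strong references keep pending tasks alive

@sio.on('connect')
async def handle_connect(sid, data):
    task = asyncio.create_task(handle_progress(sid, data))
    background_tasks.add(task)
    # drop the reference once the task finishes so the set doesn't grow forever
    task.add_done_callback(background_tasks.discard)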

Using start_background_task:

@sio.on('connect')
async def handle_connect(sid, data):
    sio.start_background_task(handle_progress, sid, data)

As mentioned here, this seems to create a new thread, and emitting events from multiple threads isn't supported. I also encountered some strange behavior with this, possibly for that reason.

Using queue with single async consumer:

connections: janus.Queue = None  # Thread-safe queue with both sync and async interfaces

async def process_connections():
    global connections
    connections = janus.Queue()  # must be created while the event loop is running
    while True:
        sid, data = await connections.async_q.get()
        asyncio.create_task(handle_progress(sid, data))

def main():
    sio.start_background_task(process_connections)
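
For completeness, the producer side of this approach is just a put from the connect handler, roughly along these lines (a sketch, assuming process_connections has already created the queue):

@sio.on('connect')
async def handle_connect(sid, data):
    # hand the connection off to the single consumer task
    await connections.async_q.put((sid, data))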

However, surprisingly, this still causes Task was destroyed but it is pending! shortly after the sio.emit(...) inside handle_progress.

If I swap out subscribe_redis with fake_subscribe_redis that simply yields every N milliseconds, everything seems to behave normally.

So, I have two questions:

  • Which of these is the "correct" method from python-socketio's perspective?
  • Are there any areas to investigate that might be causing this?

For reference, here are subscribe_redis and fake_subscribe_redis:

import aioredis
from aioredis.pubsub import Receiver

async def subscribe_redis(key):
    redis = await aioredis.create_redis('redis://localhost')
    mpsc = Receiver()
    chan_name = '__keyspace@0__:' + key
    await redis.subscribe(mpsc.channel(chan_name))
    yield  # signal that the subscription is active
    try:
        async for channel, msg in mpsc.iter():
            yield
    finally:
        await redis.unsubscribe(chan_name)
        redis.close()
        await redis.wait_closed()

async def fake_subscribe_redis(key):
    while True:
        await asyncio.sleep(0.1)
        yield

Notes:

  • aioredis mentions that it only supports a single subscriber consumer, but the above code fails with the Task was destroyed but it is pending! error even when only a single subscriber is instantiated. Additionally, I've tried snippets that provide multi-consumer support, but the behavior was the same.
  • As mentioned briefly at the start, running this same code without any references to sio works without errors (a minimal standalone driver is sketched below).
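
A minimal standalone driver for that check looks something like this (a sketch, with no python-socketio involved):

import asyncio

async def main():
    async for _ in subscribe_redis('abc'):
        # keyspace notification received; no socketio calls involved
        print('got keyspace event')

asyncio.run(main())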

Sorry for such a long issue description xD.

miguelgrinberg (Owner) commented:

You omitted the most important thing: the error that you are getting. Can you please include the complete text of the error (including the stack trace)? Also, if you have alternative code that works, can you show me that as well?

MatthewScholefield (Author) commented:

Figured out the underlying issue in #647
