Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Implement __aiter__ protocol on a Queue #445

Closed
wants to merge 3 commits into from

Conversation

mattrasband
Copy link

@mattrasband mattrasband commented Oct 10, 2016

Many times when using asycio.Queue a task is created to process the queue continually, resulting in code such as this:

while True:
    val = await q.get()
    # do stuff with val
    q.task_done()

Which feels less than ideal for a few reasons:

  • There is not a good wait to determine when the queue is done, unless the producer/consumer decide on a sentinel ahead of time or the task is explicitly cancelled.
  • It doesn't read as cleanly as most python code :)

Instead in many cases it would feel more pythonic to iterate the results of a queue in a task:

async for val in q:
    # do stuff with val
    q.task_done()

It doesn't substantially change the LOC, but it does, in my opinion, read a little more fluidly.

This does open up a few questions on how, and when, to raise StopAsyncIteration:

  1. There is no queue.close/quit/etc, so we cannot set an event on that
  2. Relying on __del__ didn't feel right, while it works with scope - using it manually feels wrong and it obviously kills the queue you may otherwise plan to continue using - among other questions in an async env.
  3. join could be used to synchronize processing before continuing to be used again - and thus cannot be reliably used to suggest the iterator should discontinue consuming.
  4. The intended usage of _finished appears to be, and I may be misunderstanding, primarily around whether or not there is data at that moment, not if the queue is fully closed (I picture a case in which a consumer consumes faster than a producer produces)

That leaves me with a few options that seem reasonable (with various implementations):

  • Use a sentinel (as this PR shows): I am not 100% sure I like this as it leaks the implementation out more than I think I like (but wanted your opinions).
  • Add a method on the queue to use the sentinel idea internally (I haven't thought of a good name yet...)

Outstanding questions:

  • I don't believe that join should set this value, since a producer could call q.join(); q.put(queues.END_QUEUE) (basically saying we wouldn't need to make this decision for them)
  • Should this instead be a method and a specific sentinel per queue?
  • Naming may be suspect, perhaps there is a better name for the sentinel value (or maybe we don't expose it and instead use the method approach).
  • In the case of multiple producers, there is possibly question as they may now be dumping to a queue that is no longer being consumed.

This may not be the best implementation, however I wanted to start a conversation around it as it has, thus far, been nice in a side project and others were encouraging me to. Granted our case is limited and instead I could put this in an external lib.

If you like the method idea instead and have a good name idea - I am at a loss at the moment...

Many times when using an asyncio.Queue a `while True: await q.get()` block is required, which feels like a little awkward flow. It feels more fluid to be able to asynchronously iterate the items in the queue and mark them as done.
@sethmlarson
Copy link

sethmlarson commented Oct 10, 2016

I'm -1 on this idea in general, I don't really have a problem with the prior syntax. The idea of a Queue is just FIFO, not manage things such as whether it's "open" or whether one of the ends is done reading entries. Applications should create their own "pipe" type and handle it themselves if they need this sort of behavior IMO.

The idea is useful in some cases so I'm not discounting that. This would definitely be useful as an external library if it's not merged here.

@mattrasband
Copy link
Author

That's certainly fair, it may indeed not belong in the stdlib and be a per-project consideration 👍 I am all in favor of whatever is best for the community.

@sethmlarson
Copy link

sethmlarson commented Oct 10, 2016

@mrasband I'm not a representative voice here, just giving my thoughts.
Hopefully others will weigh in as well.

@mattrasband
Copy link
Author

@SethMichaelLarson All feedback is good feedback, that's why I put it out here!

@mattrasband
Copy link
Author

I'm actually going to close this and make it a lib, I think it serves only one clear usage of queues (more pipelining, 1:1 style) and could cause a bit of confusion for the numerous other use cases.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants