Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uasyncio.queues: Don't sleep! #80

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions uasyncio.queues/uasyncio/queues.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections.deque import deque
from uasyncio.core import sleep

from uasyncio import core


class QueueEmpty(Exception):
Expand All @@ -21,25 +22,32 @@ class Queue:
with qsize(), since your single-threaded uasyncio application won't be
interrupted between calling qsize() and doing an operation on the Queue.
"""
_attempt_delay = 0.1

def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = deque()
self._full = core.TaskQueue()
self._empty = core.TaskQueue()

def _get(self):
return self._queue.popleft()
res = self._queue.popleft()
if self._full.peek():
core._task_queue.push(self._full.pop())
return res

def get(self):
"""Returns generator, which can be used for getting (and removing)
an item from a queue.

Usage::

item = yield from queue.get()
item = await queue.get()
"""
while not self._queue:
yield from sleep(self._attempt_delay)
if not self._queue:
self._empty.push(core.cur_task)
core.cur_task.data = self._empty
yield

return self._get()

def get_nowait(self):
Expand All @@ -52,25 +60,29 @@ def get_nowait(self):
return self._get()

def _put(self, val):
if self._empty.peek():
core._task_queue.push(self._empty.pop())
self._queue.append(val)

def put(self, val):
"""Returns generator which can be used for putting item in a queue.

Usage::

yield from queue.put(item)
await queue.put(item)
"""
while self.qsize() >= self.maxsize and self.maxsize:
yield from sleep(self._attempt_delay)
if self.maxsize and self.qsize() >= self.maxsize:
self._full.push(core.cur_task)
core.cur_task.data = self._full
yield
self._put(val)

def put_nowait(self, val):
"""Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.
"""
if self.qsize() >= self.maxsize and self.maxsize:
if self.maxsize and self.qsize() >= self.maxsize:
raise QueueFull()
self._put(val)

Expand Down