Skip to content

Commit

Permalink
remove multi_get for now
Browse files Browse the repository at this point in the history
  • Loading branch information
sorcio committed Apr 12, 2018
1 parent a1ac052 commit 77158e4
Showing 1 changed file with 32 additions and 48 deletions.
80 changes: 32 additions & 48 deletions trio/_sync.py
Expand Up @@ -890,7 +890,7 @@ def put_nowait(self, obj):
assert not self._data
task, abort_fn = self._get_wait.popitem(last=False)
abort_fn(None)
_core.reschedule(task, _core.Value((self, obj)))
_core.reschedule(task, _core.Value(obj))
elif len(self._data) < self.capacity:
self._data.append(obj)
else:
Expand Down Expand Up @@ -933,8 +933,16 @@ def get_nowait(self):
WouldBlock: if the queue is empty.
"""
_, value = multi_get_nowait([self])
return value
if self._put_wait:
task, value = self._put_wait.popitem(last=False)
# No need to check max_size, b/c we'll pop an item off again right
# below.
self._data.append(value)
_core.reschedule(task)
if self._data:
value = self._data.popleft()
return value
raise _core.WouldBlock()

@_core.enable_ki_protection
async def get(self):
Expand All @@ -944,7 +952,27 @@ async def get(self):
object: The dequeued object.
"""
_, value = await multi_get([self])
await _core.checkpoint_if_cancelled()
try:
value = self.get_nowait()
except _core.WouldBlock:
pass
else:
await _core.cancel_shielded_checkpoint()
return value

# Queue doesn't have anything, we must wait.
task = _core.current_task()

def abort_fn(_):
try:
del self._get_wait[task]
except KeyError:
pass
return _core.Abort.SUCCEEDED

self._get_wait[task] = abort_fn
value = await _core.wait_task_rescheduled(abort_fn)
return value

@aiter_compat
Expand Down Expand Up @@ -973,47 +1001,3 @@ def statistics(self):
tasks_waiting_put=len(self._put_wait),
tasks_waiting_get=len(self._get_wait),
)


@_core.enable_ki_protection
def multi_get_nowait(queues):
for queue in queues:
if queue._put_wait:
task, value = queue._put_wait.popitem(last=False)
# No need to check max_size, b/c we'll pop an item off again right
# below.
queue._data.append(value)
_core.reschedule(task)
if queue._data:
value = queue._data.popleft()
return queue, value
raise _core.WouldBlock()


@_core.enable_ki_protection
async def multi_get(queues):
# Returns (queue object, value gotten)
await _core.checkpoint_if_cancelled()
try:
queue, value = multi_get_nowait(queues)
except _core.WouldBlock:
pass
else:
await _core.cancel_shielded_checkpoint()
return queue, value
# No queue had anything.
task = _core.current_task()

def abort_fn(_):
for queue in queues:
try:
del queue._get_wait[task]
except KeyError:
# If we just pushed to this queue, we already popped.
# But is it alright to... always pass?
pass
return _core.Abort.SUCCEEDED

for queue in queues:
queue._get_wait[task] = abort_fn
return await _core.wait_task_rescheduled(abort_fn)

0 comments on commit 77158e4

Please sign in to comment.