Skip to content

Commit ecc570c

Browse files
committed
select: make Select.add() handle multiple buffered items.
Previously given something like: l = mitogen.core.Latch() l.put(1) l.put(2) s = mitogen.select.Select([l], oneshot=False) assert 1 == s.get(block=False) assert 2 == s.get(block=False) The second call would throw TimeoutError, because Select.add() only queued the receiver/latch once if it was non-empty, rather than once for each item as should happen.
1 parent 49a6446 commit ecc570c

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

mitogen/select.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,15 @@ def add(self, recv):
224224
raise Error(self.owned_msg)
225225

226226
recv.notify = self._put
227-
# Avoid race by polling once after installation.
228-
if not recv.empty():
227+
# After installing the notify function, _put() will potentially begin
228+
# receiving calls from other threads immediately, but not for items
229+
# they already had buffered. For those we call _put(), possibly
230+
# duplicating the effect of other _put() being made concurrently, such
231+
# that the Select ends up with more items in its buffer than exist in
232+
# the underlying receivers. We handle the possibility of receivers
233+
# marked notified yet empty inside Select.get(), so this should be
234+
# robust.
235+
for _ in range(recv.size()):
229236
self._put(recv)
230237

231238
not_present_msg = 'Instance is not a member of this Select'
@@ -335,5 +342,6 @@ def get_event(self, timeout=None, block=True):
335342
# A receiver may have been queued with no result if another
336343
# thread drained it before we woke up, or because another
337344
# thread drained it between add() calling recv.empty() and
338-
# self._put(). In this case just sleep again.
345+
# self._put(), or because Select.add() caused duplicate _put()
346+
# calls. In this case simply retry.
339347
continue

tests/select_test.py

+22
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,18 @@ def test_nonempty_before_add(self):
358358
msg = select.get()
359359
self.assertEquals('123', msg.unpickle())
360360

361+
def test_nonempty_multiple_items_before_add(self):
362+
recv = mitogen.core.Receiver(self.router)
363+
recv._on_receive(mitogen.core.Message.pickled('123'))
364+
recv._on_receive(mitogen.core.Message.pickled('234'))
365+
select = self.klass([recv], oneshot=False)
366+
msg = select.get()
367+
self.assertEquals('123', msg.unpickle())
368+
msg = select.get()
369+
self.assertEquals('234', msg.unpickle())
370+
self.assertRaises(mitogen.core.TimeoutError,
371+
lambda: select.get(block=False))
372+
361373
def test_nonempty_after_add(self):
362374
recv = mitogen.core.Receiver(self.router)
363375
select = self.klass([recv])
@@ -415,6 +427,16 @@ def test_nonempty_before_add(self):
415427
select = self.klass([latch])
416428
self.assertEquals(123, select.get())
417429

430+
def test_nonempty_multiple_items_before_add(self):
431+
latch = mitogen.core.Latch()
432+
latch.put(123)
433+
latch.put(234)
434+
select = self.klass([latch], oneshot=False)
435+
self.assertEquals(123, select.get())
436+
self.assertEquals(234, select.get())
437+
self.assertRaises(mitogen.core.TimeoutError,
438+
lambda: select.get(block=False))
439+
418440
def test_nonempty_after_add(self):
419441
latch = mitogen.core.Latch()
420442
select = self.klass([latch])

0 commit comments

Comments
 (0)