Skip to content
Permalink
Browse files
core: split out & extend Broker.sync_call()
  • Loading branch information
dw committed Oct 26, 2018
1 parent 592d6fc commit 9ec360c26db00669ccfe77e8eaefc735add52abc
Showing with 60 additions and 6 deletions.
  1. +8 −0 docs/api.rst
  2. +20 −6 mitogen/core.py
  3. +32 −0 tests/broker_test.py
@@ -1227,6 +1227,14 @@ Broker Class
thread, or immediately if the current thread is the broker thread. Safe
to call from any thread.

.. method:: defer_sync (func)

Arrange for `func()` to execute on the broker thread, blocking the
current thread until a result or exception is available.

:returns:
Call result.

.. method:: start_receive (stream)

Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
@@ -1940,6 +1940,25 @@ def keep_alive(self):
it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0)

def defer_sync(self, func):
"""
Block the calling thread while `func` runs on a broker thread.
:returns:
Return value of `func()`.
"""
latch = Latch()
def wrapper():
try:
latch.put(func())
except Exception:
latch.put(sys.exc_info()[1])
self.defer(wrapper)
res = latch.get()
if isinstance(res, Exception):
raise res
return res

def _call(self, stream, func):
try:
func(self)
@@ -2100,11 +2119,6 @@ def _on_parent_disconnect(self):
_v and LOG.debug('%r: parent stream is gone, dying.', self)
self.broker.shutdown()

def _sync(self, func):
latch = Latch()
self.broker.defer(lambda: latch.put(func()))
return latch.get()

def detach(self):
self.detached = True
stream = self.router.stream_by_id(mitogen.parent_id)
@@ -2113,7 +2127,7 @@ def detach(self):
self.parent.send_await(Message(handle=DETACHING))
LOG.info('Detaching from %r; parent is %s', stream, self.parent)
for x in range(20):
pending = self._sync(lambda: stream.pending_bytes())
pending = self.broker.defer_sync(lambda: stream.pending_bytes())
if not pending:
break
time.sleep(0.05)
@@ -0,0 +1,32 @@

import threading

import unittest2

import testlib

import mitogen.core


class DeferSyncTest(testlib.TestCase):
klass = mitogen.core.Broker

def test_okay(self):
broker = self.klass()
try:
th = broker.defer_sync(lambda: threading.currentThread())
self.assertEquals(th, broker._thread)
finally:
broker.shutdown()

def test_exception(self):
broker = self.klass()
try:
self.assertRaises(ValueError,
broker.defer_sync, lambda: int('dave'))
finally:
broker.shutdown()


if __name__ == '__main__':
unittest2.main()

0 comments on commit 9ec360c

Please sign in to comment.