Skip to content

Commit

Permalink
issue #535: activate Corker on 2.4 in master too.
Browse files Browse the repository at this point in the history
  • Loading branch information
dw committed Feb 17, 2019
1 parent 5095905 commit 18b984a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 20 deletions.
31 changes: 17 additions & 14 deletions mitogen/core.py
Expand Up @@ -2840,7 +2840,7 @@ class Broker(object):
#: before force-disconnecting them during :meth:`shutdown`.
shutdown_timeout = 3.0

def __init__(self, poller_class=None):
def __init__(self, poller_class=None, activate_compat=True):
self._alive = True
self._exitted = False
self._waker = Waker(self)
Expand All @@ -2858,6 +2858,19 @@ def __init__(self, poller_class=None):
name='mitogen.broker'
)
self._thread.start()
if activate_compat:
self._py24_25_compat()

def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self)

def start_receive(self, stream):
"""
Expand Down Expand Up @@ -3003,6 +3016,7 @@ def _do_broker_main(self):
except Exception:
LOG.exception('_broker_main() crashed')

self._alive = False # Ensure _alive is consistent on crash.
self._exitted = True
self._broker_exit()

Expand Down Expand Up @@ -3206,7 +3220,7 @@ def _setup_master(self):
Router.max_message_size = self.config['max_message_size']
if self.config['profiling']:
enable_profiling()
self.broker = Broker()
self.broker = Broker(activate_compat=False)
self.router = Router(self.broker)
self.router.debug = self.config.get('debug', False)
self.router.undirectional = self.config['unidirectional']
Expand Down Expand Up @@ -3348,17 +3362,6 @@ def _setup_stdio(self):
# Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1)

def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self.broker)

def main(self):
self._setup_master()
try:
Expand Down Expand Up @@ -3386,7 +3389,7 @@ def main(self):
socket.gethostname())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)

self._py24_25_compat()
self.broker._py24_25_compat()
self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt:
Expand Down
12 changes: 6 additions & 6 deletions mitogen/os_fork.py
Expand Up @@ -48,21 +48,21 @@

# List of weakrefs. On Python 2.4, mitogen.core registers its Broker on this
# list and mitogen.service registers its Pool too.
_brokers = weakref.WeakValueDictionary()
_pools = weakref.WeakValueDictionary()
_brokers = weakref.WeakKeyDictionary()
_pools = weakref.WeakKeyDictionary()


def _notice_broker_or_pool(obj):
if isinstance(obj, mitogen.core.Broker):
_brokers[id(obj)] = obj
_brokers[obj] = True
else:
_pools[id(obj)] = obj
_pools[obj] = True


def wrap_os__fork():
corker = Corker(
brokers=list(_brokers.values()),
pools=list(_pools.values()),
brokers=list(_brokers),
pools=list(_pools),
)
try:
corker.cork()
Expand Down
2 changes: 2 additions & 0 deletions tests/router_test.py
Expand Up @@ -174,6 +174,8 @@ def test_shutdown(self):
expect = '_broker_main() crashed'
self.assertTrue(expect in log.stop())

self.broker.join()


class AddHandlerTest(testlib.TestCase):
klass = mitogen.master.Router
Expand Down
5 changes: 5 additions & 0 deletions tests/testlib.py
Expand Up @@ -432,6 +432,7 @@ def tearDown(self):
if not self.broker_shutdown:
self.broker.shutdown()
self.broker.join()
del self.broker
super(BrokerMixin, self).tearDown()

def sync_with_broker(self):
Expand All @@ -445,6 +446,10 @@ def setUp(self):
super(RouterMixin, self).setUp()
self.router = self.router_class(self.broker)

def tearDown(self):
del self.router
super(RouterMixin, self).tearDown()


class DockerMixin(RouterMixin):
@classmethod
Expand Down

0 comments on commit 18b984a

Please sign in to comment.