Merge pull request #324 from mattbennett/better-thread-mgmt
remove "protected" managed threads
mattbennett committed Jul 6, 2016
2 parents ea49513 + aab3bb2 commit 46867e8
Showing 13 changed files with 220 additions and 181 deletions.
2 changes: 2 additions & 0 deletions CHANGES
@@ -21,6 +21,8 @@ Released 2016-05-11

* Deprecate ``MethodProxy.async`` in favour of ``MethodProxy.call_async`` in preparation for async becoming a keyword
* Add support for loading logging configuration from ``config.yaml``
* Refactor `ServiceContainer` internals for better separation between "managed"
and "worker" threads. Improved logging when threads are killed.

Version 2.3.0
-------------
164 changes: 83 additions & 81 deletions nameko/containers.py
@@ -1,32 +1,30 @@
from __future__ import absolute_import, unicode_literals

from abc import ABCMeta, abstractproperty
import inspect
from logging import getLogger
import sys
import uuid
import warnings
from abc import ABCMeta, abstractproperty
from logging import getLogger

import eventlet
import six
from eventlet.event import Event
from eventlet.greenpool import GreenPool
from greenlet import GreenletExit # pylint: disable=E0611
import six

from nameko.constants import (
PARENT_CALLS_CONFIG_KEY, DEFAULT_PARENT_CALLS_TRACKED,
MAX_WORKERS_CONFIG_KEY, DEFAULT_MAX_WORKERS,
SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER,
CALL_ID_STACK_CONTEXT_KEY, NAMEKO_CONTEXT_KEYS)

CALL_ID_STACK_CONTEXT_KEY, DEFAULT_MAX_WORKERS,
DEFAULT_PARENT_CALLS_TRACKED, DEFAULT_SERIALIZER, MAX_WORKERS_CONFIG_KEY,
NAMEKO_CONTEXT_KEYS, PARENT_CALLS_CONFIG_KEY, SERIALIZER_CONFIG_KEY)
from nameko.exceptions import ConfigurationError, ContainerBeingKilled
from nameko.extensions import (
is_dependency, ENTRYPOINT_EXTENSIONS_ATTR, iter_extensions)
from nameko.exceptions import ContainerBeingKilled, ConfigurationError
ENTRYPOINT_EXTENSIONS_ATTR, is_dependency, iter_extensions)
from nameko.log_helpers import make_timing_logger
from nameko.utils import SpawningSet

_log = getLogger(__name__)
_log_time = make_timing_logger(_log)
MANAGED_THREAD = object()

if six.PY2: # pragma: no cover
is_method = inspect.ismethod
@@ -171,8 +169,8 @@ def __init__(self, service_cls, config, worker_ctx_cls=None):
self.started = False
self._worker_pool = GreenPool(size=self.max_workers)

self._active_threads = {}
self._protected_threads = set()
self._worker_threads = {}
self._managed_threads = {}
self._being_killed = False
self._died = Event()

@@ -251,10 +249,8 @@ def stop(self):
# finally, stop remaining extensions
self.subextensions.all.stop()

# just in case there was an entrypoint not taking care of its
# workers, or an extension not taking care of its managed threads
self._kill_active_threads()
self._kill_protected_threads()
# and any managed threads they spawned
self._kill_managed_threads()

self.started = False

@@ -265,9 +261,8 @@ def stop(self):
def kill(self, exc_info=None):
""" Kill the container in a semi-graceful way.
All non-protected managed threads are killed first. This includes
all active workers generated by :meth:`ServiceContainer.spawn_worker`.
Next, dependencies are killed. Finally, any remaining protected threads
Entrypoints are killed, followed by any active worker threads.
Next, dependencies are killed. Finally, any remaining managed threads
are killed.
If ``exc_info`` is provided, the exception will be raised by
@@ -304,9 +299,9 @@ def safely_kill_extensions(ext_set):
_log.warning('Extension raised `%s` during kill', exc)

safely_kill_extensions(self.entrypoints.all)
self._kill_active_threads()
self._kill_worker_threads()
safely_kill_extensions(self.extensions.all)
self._kill_protected_threads()
self._kill_managed_threads()

self.started = False

@@ -330,10 +325,9 @@ def wait(self):
def spawn_worker(self, entrypoint, args, kwargs,
context_data=None, handle_result=None):
""" Spawn a worker thread for running the service method decorated
with by `entrypoint`.
by `entrypoint`.
``args`` and ``kwargs`` are used as arguments for the service
method.
``args`` and ``kwargs`` are used as parameters for the service method.
``context_data`` is used to initialize a ``WorkerContext``.
@@ -353,39 +347,41 @@ def spawn_worker(self, entrypoint, args, kwargs,
self, service, entrypoint, args, kwargs, data=context_data)

_log.debug('spawning %s', worker_ctx)
gt = self._worker_pool.spawn(self._run_worker, worker_ctx,
handle_result)
self._active_threads[gt] = entrypoint
gt.link(self._handle_thread_exited)
return worker_ctx
gt = self._worker_pool.spawn(
self._run_worker, worker_ctx, handle_result
)
gt.link(self._handle_worker_thread_exited, worker_ctx)

def spawn_managed_thread(self, run_method, protected=False):
""" Spawn a managed thread to run ``run_method``.
self._worker_threads[worker_ctx] = gt
return worker_ctx

Threads can be marked as ``protected``, which means the container will
not forcibly kill them until after all extensions have been killed.
Extensions that require a managed thread to complete their kill
procedure should ensure to mark them as ``protected``. For example,
:class:`nameko.messaging.QueueConsumer` cleanly closes connections
to the AMQP broker even when killed; the thread that holds that
connection is ``protected`` so it isn't stopped until after the
QueueConsumer returns from its kill.
def spawn_managed_thread(self, fn, protected=None, identifier=None):
""" Spawn a managed thread to run ``fn`` on behalf of an extension.
The passed `identifier` will be included in logs related to this
thread, and otherwise defaults to `fn.__name__`, if it is set.
Any uncaught errors inside ``run_method`` cause the container to be
killed.
Any uncaught errors inside ``fn`` cause the container to be killed.
It is the caller's responsibility to terminate their spawned threads.
Threads are killed automatically if they are still running after
all extensions are stopped during :meth:`ServiceContainer.stop`.
Extensions should delegate all thread spawning to the container.
"""
gt = eventlet.spawn(run_method)
if not protected:
self._active_threads[gt] = MANAGED_THREAD
else:
self._protected_threads.add(gt)
gt.link(self._handle_thread_exited)
if protected is not None:
warnings.warn(
"The signature of `spawn_managed_thread` has changed. "
"The `protected` kwarg is now deprecated, and extensions "
"can pass an idenifier as a keyword argument for better "
"logging. See :meth:`nameko.containers.ServiceContainer."
"spawn_managed_thread`.", DeprecationWarning
)
if identifier is None:
identifier = getattr(fn, '__name__', "<unknown>")

gt = eventlet.spawn(fn)
self._managed_threads[gt] = identifier
gt.link(self._handle_managed_thread_exited, identifier)
return gt

def _run_worker(self, worker_ctx, handle_result):
@@ -398,10 +394,7 @@ def _run_worker(self, worker_ctx, handle_result):

with _log_time('ran worker %s', worker_ctx):

# when we have better parallelization than ``spawningset``,
# do this injection inline
self.dependencies.all.inject(worker_ctx)
self.dependencies.all.worker_setup(worker_ctx)
self._worker_setup(worker_ctx)

result = exc_info = None
method_name = worker_ctx.entrypoint.method_name
@@ -426,54 +419,63 @@

with _log_time('tore down worker %s', worker_ctx):

_log.debug('signalling result for %s', worker_ctx)
self.dependencies.all.worker_result(
worker_ctx, result, exc_info)
self._worker_result(worker_ctx, result, exc_info)

# we don't need this any more, and breaking the cycle means
# this can be reclaimed immediately, rather than waiting for a
# gc sweep
del exc_info

self.dependencies.all.worker_teardown(worker_ctx)
# release?
self._worker_teardown(worker_ctx)

def _kill_active_threads(self):
""" Kill all managed threads that were not marked as "protected" when
they were spawned.
def _worker_setup(self, worker_ctx):
# TODO: when we have better parallelization than ``spawningset``,
# do this injection inline
self.dependencies.all.inject(worker_ctx)
self.dependencies.all.worker_setup(worker_ctx)

This set will include all worker threads generated by
:meth:`ServiceContainer.spawn_worker`.
def _worker_result(self, worker_ctx, result, exc_info):
_log.debug('signalling result for %s', worker_ctx)
self.dependencies.all.worker_result(worker_ctx, result, exc_info)

See :meth:`ServiceContainer.spawn_managed_thread`
def _worker_teardown(self, worker_ctx):
self.dependencies.all.worker_teardown(worker_ctx)

def _kill_worker_threads(self):
""" Kill any currently executing worker threads.
See :meth:`ServiceContainer.spawn_worker`
"""
num_active_threads = len(self._active_threads)
num_workers = len(self._worker_threads)

if num_active_threads:
_log.warning('killing %s active thread(s)', num_active_threads)
for gt, extension in list(self._active_threads.items()):
if extension is not MANAGED_THREAD:
_log.warning('killing active thread for %s', extension)
if num_workers:
_log.warning('killing %s active worker(s)', num_workers)
for worker_ctx, gt in list(self._worker_threads.items()):
_log.warning('killing active worker for %s', worker_ctx)
gt.kill()

def _kill_protected_threads(self):
""" Kill any managed threads marked as protected when they were
spawned.
def _kill_managed_threads(self):
""" Kill any currently executing managed threads.
See :meth:`ServiceContainer.spawn_managed_thread`
"""
num_protected_threads = len(self._protected_threads)
num_threads = len(self._managed_threads)

if num_protected_threads:
_log.warning('killing %s protected thread(s)',
num_protected_threads)
for gt in list(self._protected_threads):
if num_threads:
_log.warning('killing %s managed thread(s)', num_threads)
for gt, identifier in list(self._managed_threads.items()):
_log.warning('killing managed thread `%s`', identifier)
gt.kill()

def _handle_thread_exited(self, gt):
self._active_threads.pop(gt, None)
self._protected_threads.discard(gt)
def _handle_worker_thread_exited(self, gt, worker_ctx):
self._worker_threads.pop(worker_ctx, None)
self._handle_thread_exited(gt)

def _handle_managed_thread_exited(self, gt, extension):
self._managed_threads.pop(gt, None)
self._handle_thread_exited(gt)

def _handle_thread_exited(self, gt):
try:
gt.wait()

@@ -485,7 +487,7 @@ def _handle_thread_exited(self, gt):

except Exception:
_log.error('%s thread exited with error', self, exc_info=True)
# any error raised inside an active thread is unexpected behavior
# any uncaught error in a thread is unexpected behavior
# and probably a bug in the extension or container.
# to be safe we call self.kill() to kill our dependencies and
# provide the exception info to be raised in self.wait().
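The containers.py changes above amount to a new calling convention for extensions that spawn background threads. A minimal sketch of the updated usage, assuming a hypothetical Heartbeat extension (the class, its _run method and the "heartbeat.run" identifier are illustrative, not part of this commit):

import eventlet

from nameko.extensions import Extension


class Heartbeat(Extension):
    """ Hypothetical extension showing the updated thread-spawning call. """

    def start(self):
        # Old style, now deprecated (emits a DeprecationWarning):
        #     self.container.spawn_managed_thread(self._run, protected=True)
        #
        # New style: no `protected` flag; the optional `identifier` appears
        # in the container's logs if the thread has to be killed.
        self._gt = self.container.spawn_managed_thread(
            self._run, identifier="heartbeat.run"
        )

    def stop(self):
        # callers remain responsible for terminating the threads they spawn
        self._gt.kill()

    def _run(self):
        while True:
            eventlet.sleep(5)  # periodic work would go here

Passing the old `protected` kwarg still works, but it now only triggers the DeprecationWarning shown above; the container keeps a single `_managed_threads` registry and no longer treats such threads differently during kill.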
20 changes: 10 additions & 10 deletions nameko/messaging.py
@@ -2,26 +2,27 @@
Provides core messaging decorators and dependency providers.
'''
from __future__ import absolute_import
from itertools import count

import socket
from functools import partial
from itertools import count
from logging import getLogger
import socket

import eventlet
import six
from eventlet.event import Event
from kombu.common import maybe_declare
from kombu.pools import producers, connections
from kombu import Connection
from kombu.common import maybe_declare
from kombu.mixins import ConsumerMixin
import six
from kombu.pools import connections, producers

from nameko.amqp import verify_amqp_uri
from nameko.constants import (
DEFAULT_RETRY_POLICY, AMQP_URI_CONFIG_KEY,
SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER)
AMQP_URI_CONFIG_KEY, DEFAULT_RETRY_POLICY, DEFAULT_SERIALIZER,
SERIALIZER_CONFIG_KEY)
from nameko.exceptions import ContainerBeingKilled
from nameko.extensions import (
DependencyProvider, Entrypoint, SharedExtension, ProviderCollector)
DependencyProvider, Entrypoint, ProviderCollector, SharedExtension)

_log = getLogger(__name__)

@@ -195,8 +196,7 @@ def start(self):
self._starting = True

_log.debug('starting %s', self)
self._gt = self.container.spawn_managed_thread(self.run,
protected=True)
self._gt = self.container.spawn_managed_thread(self.run)
self._gt.link(self._handle_thread_exited)
try:
_log.debug('waiting for consumer ready %s', self)
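The only behavioural change here is that QueueConsumer no longer asks for a protected thread. Extensions that did the same migrate by dropping the flag and keeping their existing link callback to watch their own thread, since the container no longer gives any managed thread special treatment during kill. A sketch of that pattern with hypothetical names (Listener, "listener.run"):

from nameko.extensions import SharedExtension


class Listener(SharedExtension):
    """ Hypothetical extension mirroring the QueueConsumer's thread handling. """

    def start(self):
        # `protected=True` is gone; the container kills any managed thread
        # still running after all extensions have stopped.
        self._gt = self.container.spawn_managed_thread(
            self.run, identifier="listener.run"
        )
        self._gt.link(self._handle_thread_exited)

    def run(self):
        pass  # the consumption loop would go here

    def _handle_thread_exited(self, gt):
        # `link` callbacks receive the finished greenthread; waiting on it
        # re-raises whatever the thread died with
        try:
            gt.wait()
        except Exception:
            pass  # react to an unexpected exit before the container does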
6 changes: 6 additions & 0 deletions nameko/testing/pytest.py
@@ -53,6 +53,12 @@ def pytest_configure(config):
logging.basicConfig(level=log_level, stream=sys.stderr)


@pytest.fixture(autouse=True)
def always_warn_for_deprecation():
import warnings
warnings.simplefilter('always', DeprecationWarning)


@pytest.fixture
def empty_config():
from nameko.constants import AMQP_URI_CONFIG_KEY
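Because the fixture above forces DeprecationWarning to always be emitted, tests can assert on the new deprecation directly. A hedged sketch of such a test, assuming the plugin's container_factory and empty_config fixtures and an illustrative no-op Service class:

import pytest


class Service(object):
    name = "service"


def test_protected_kwarg_is_deprecated(container_factory, empty_config):
    container = container_factory(Service, empty_config)

    # the `protected` kwarg is still accepted, but now only emits a warning
    with pytest.warns(DeprecationWarning):
        gt = container.spawn_managed_thread(lambda: None, protected=True)
    gt.kill()  # the no-op thread exits immediately; kill is just tidy-up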
3 changes: 2 additions & 1 deletion nameko/timer.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
from logging import getLogger

import time
from logging import getLogger

from eventlet import Timeout
from eventlet.event import Event
