Skip to content

Commit

Permalink
Merge pull request #341 from mattbennett/even-better-entrypoint-waiter
Browse files Browse the repository at this point in the history
Even better entrypoint waiter
  • Loading branch information
mattbennett committed Sep 30, 2016
2 parents cd0232a + 50e2976 commit 37298fe
Show file tree
Hide file tree
Showing 11 changed files with 1,034 additions and 118 deletions.
21 changes: 16 additions & 5 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ Released 2016-09-14
to allow easy usage of WSGI middleware and modifications of the WSGI server.
* Enhanced :func:`~nameko.testing.services.replace_dependencies` to allow
specific replacement values to be provided with named arguments.
* Enhanced :func:`~nameko.testing.services.entrypoint_waiter`. The new
implementation is backwards compatible but additionally:
- Gives you access to the result returned (or exception raised)
- Adds the ability to wait for a specific result
- Doesn't fire until the worker is completely torn down

Version 2.4.0
-------------
Expand All @@ -32,16 +37,19 @@ Released 2016-08-30
`ServiceContainer` class that defines the `worker_ctx_cls` attribute.
* Remove the `context_keys` attribute of the `WorkerContext`, which was
previously used to "whitelist" worker context data passed from call to call.
It was a feature that leaked from a specific implementation into the main framework, and not useful enough in its own right to continue to be
It was a feature that leaked from a specific implementation into the main
framework, and not useful enough in its own right to continue to be
supported.
* Refactor `ServiceContainer` internals for better separation between "managed" and "worker" threads. Improved logging when threads are killed.
* Refactor `ServiceContainer` internals for better separation between "managed"
and "worker" threads. Improved logging when threads are killed.

Version 2.3.1
-------------

Released 2016-05-11

* Deprecate ``MethodProxy.async`` in favour of ``MethodProxy.call_async`` in preparation for async becoming a keyword
* Deprecate ``MethodProxy.async`` in favour of ``MethodProxy.call_async`` in
preparation for async becoming a keyword
* Add support for loading logging configuration from ``config.yaml``


Expand Down Expand Up @@ -113,8 +121,11 @@ Released 2015-03-31
* Introduction of nameko "extensions" and nomenclature clarification
* Removal of ``DependencyFactory`` in favour of prototype pattern
* Complete documentation rewrite
* Spun out ``nameko.contrib.sqlalchemy`` into `nameko-sqlalchemy <https://pypi.python.org/pypi/nameko-sqlalchemy>`_ as a **community extension**.
* Spun out ``nameko.legacy`` package into `nameko-nova-compat <https://pypi.python.org/pypi/nameko-nova-compat>`_
* Spun out ``nameko.contrib.sqlalchemy`` into
`nameko-sqlalchemy <https://pypi.python.org/pypi/nameko-sqlalchemy>`_
as a **community extension**.
* Spun out ``nameko.legacy`` package into
`nameko-nova-compat <https://pypi.python.org/pypi/nameko-nova-compat>`_
* Rename the standalone rpc proxy to
:class:`~nameko.standalone.rpc.ServiceRpcProxy` and add a
:class:`~nameko.standalone.rpc.ClusterRpcProxy`, using a single reply queue
Expand Down
1 change: 0 additions & 1 deletion nameko/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from eventlet.event import Event
from eventlet.greenpool import GreenPool
from greenlet import GreenletExit # pylint: disable=E0611

from nameko.constants import (
CALL_ID_STACK_CONTEXT_KEY, DEFAULT_MAX_WORKERS,
DEFAULT_PARENT_CALLS_TRACKED, DEFAULT_SERIALIZER, MAX_WORKERS_CONFIG_KEY,
Expand Down
215 changes: 144 additions & 71 deletions nameko/testing/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
Utilities for testing nameko services.
"""

import inspect
from collections import OrderedDict
from contextlib import contextmanager
import inspect

import eventlet
from eventlet import event
from eventlet.semaphore import Semaphore
from mock import MagicMock

from nameko.extensions import DependencyProvider, Entrypoint
from nameko.exceptions import ExtensionNotFound
from nameko.testing.utils import get_extension, wait_for_worker_idle
from nameko.extensions import DependencyProvider, Entrypoint
from nameko.testing.utils import get_extension
from nameko.testing.waiting import WaitResult, wait_for_call


@contextmanager
Expand Down Expand Up @@ -48,93 +48,166 @@ def entrypoint_hook(container, method_name, context_data=None):
method_name, container))

def hook(*args, **kwargs):
result = event.Event()

def handle_result(worker_ctx, res=None, exc_info=None):
result.send(res, exc_info)
return res, exc_info

container.spawn_worker(entrypoint, args, kwargs,
context_data=context_data,
handle_result=handle_result)
hook_result = event.Event()

def wait_for_entrypoint():
with entrypoint_waiter(container, method_name) as waiter_result:
container.spawn_worker(
entrypoint, args, kwargs,
context_data=context_data
)
try:
hook_result.send(waiter_result.get())
except Exception as exc:
hook_result.send_exception(exc)

# If the container errors (e.g. due to a bad entrypoint), handle_result
# is never called and we hang. To mitigate, we spawn a greenlet waiting
# for the container, and if that throws we send the exception back
# as our result
def catch_container_errors(gt):
def wait_for_container():
try:
gt.wait()
container.wait()
except Exception as exc:
result.send_exception(exc)
if not hook_result.ready():
hook_result.send_exception(exc)

gt = eventlet.spawn(container.wait)
gt.link(catch_container_errors)
# If the container errors (e.g. due to a bad entrypoint), the
# entrypoint_waiter never completes. To mitigate, we also wait on
# the container, and if that throws we send the exception back
# as our result.
eventlet.spawn_n(wait_for_entrypoint)
eventlet.spawn_n(wait_for_container)

return result.wait()
return hook_result.wait()

yield hook


class EntrypointWaiter(DependencyProvider):
"""Helper for `entrypoint_waiter`
@contextmanager
def entrypoint_waiter(container, method_name, timeout=30, callback=None):
""" Context manager that waits until an entrypoint has fired, and
the generated worker has exited and been torn down.
DependencyProvider to be manually (and temporarily) added to an existing
container. Takes an entrypoint name, and exposes a `wait` method, which
will return once the entrypoint has fired.
"""
It yields a :class:`nameko.testing.waiting.WaitResult` object that can be
used to get the result returned (exception raised) by the entrypoint
after the waiter has exited.
class Timeout(Exception):
pass
:Parameters:
container : ServiceContainer
The container hosting the service owning the entrypoint
method_name : str
The name of the entrypoint decorated method on the service class
timeout : int
Maximum seconds to wait
callback : callable
Function to conditionally control whether the entrypoint_waiter
should exit for a particular invocation
def __init__(self, entrypoint):
self.attr_name = '_entrypoint_waiter_{}'.format(entrypoint)
self.entrypoint = entrypoint
self.done = Semaphore(value=0)
The `timeout` argument specifies the maximum number of seconds the
`entrypoint_waiter` should wait before exiting. It can be disabled by
passing `None`. The default is 30 seconds.
def worker_teardown(self, worker_ctx):
entrypoint = worker_ctx.entrypoint
if entrypoint.method_name == self.entrypoint:
self.done.release()
Optionally allows a `callback` to be provided which is invoked whenever
the entrypoint fires. If provided, the callback must return `True`
for the `entrypoint_waiter` to exit. The signature for the callback
function is::
def wait(self):
self.done.acquire()
def callback(worker_ctx, result, exc_info):
pass
Where there parameters are as follows:
@contextmanager
def entrypoint_waiter(container, entrypoint, timeout=30):
"""Helper to wait for entrypoints to fire (and complete)
worker_ctx (WorkerContext): WorkerContext of the entrypoint call.
Usage::
result (object): The return value of the entrypoint.
container = ServiceContainer(ExampleService, config)
with entrypoint_waiter(container, 'example_handler'):
... # e.g. rpc call that will result in handler being called
"""
exc_info (tuple): Tuple as returned by `sys.exc_info` if the
entrypoint raised an exception, otherwise `None`.
waiter = EntrypointWaiter(entrypoint)
if not get_extension(container, Entrypoint, method_name=entrypoint):
**Usage**
::
class Service(object):
name = "service"
@event_handler('srcservice', 'eventtype')
def handle_event(self, msg):
return msg
container = ServiceContainer(Service, config)
container.start()
# basic
with entrypoint_waiter(container, 'handle_event'):
... # action that dispatches event
# giving access to the result
with entrypoint_waiter(container, 'handle_event') as result:
... # action that dispatches event
res = result.get()
# with custom timeout
with entrypoint_waiter(container, 'handle_event', timeout=5):
... # action that dispatches event
# with callback that waits until entrypoint stops raising
def callback(worker_ctx, result, exc_info):
if exc_info is None:
return True
with entrypoint_waiter(container, 'handle_event', callback=callback):
... # action that dispatches event
"""
if not get_extension(container, Entrypoint, method_name=method_name):
raise RuntimeError("{} has no entrypoint `{}`".format(
container.service_name, entrypoint))
if get_extension(container, EntrypointWaiter, entrypoint=entrypoint):
raise RuntimeError("Waiter already registered for {}".format(
entrypoint))

# can't mess with dependencies while container is running
wait_for_worker_idle(container)
container.dependencies.add(waiter)

try:
yield
exc = waiter.Timeout(
"Entrypoint {}.{} failed to complete within {} seconds".format(
container.service_name, entrypoint, timeout)
)
with eventlet.Timeout(timeout, exception=exc):
waiter.wait()
finally:
wait_for_worker_idle(container)
container.dependencies.remove(waiter)
container.service_name, method_name))

class Result(WaitResult):
worker_ctx = None

def send(self, worker_ctx, result, exc_info):
self.worker_ctx = worker_ctx
super(Result, self).send(result, exc_info)

waiter_callback = callback
waiter_result = Result()

def on_worker_result(worker_ctx, result, exc_info):
complete = False
if worker_ctx.entrypoint.method_name == method_name:
if not callable(waiter_callback):
complete = True
else:
complete = waiter_callback(worker_ctx, result, exc_info)

if complete:
waiter_result.send(worker_ctx, result, exc_info)
return complete

def on_worker_teardown(worker_ctx):
if waiter_result.worker_ctx is worker_ctx:
return True
return False

exc = entrypoint_waiter.Timeout(
"Timeout on {}.{} after {} seconds".format(
container.service_name, method_name, timeout)
)

with eventlet.Timeout(timeout, exception=exc):
with wait_for_call(
container, '_worker_teardown',
lambda args, kwargs, res, exc: on_worker_teardown(*args)
):
with wait_for_call(
container, '_worker_result',
lambda args, kwargs, res, exc: on_worker_result(*args)
):
yield waiter_result


class EntrypointWaiterTimeout(Exception):
pass

entrypoint_waiter.Timeout = EntrypointWaiterTimeout


def worker_factory(service_cls, **dependencies):
Expand Down
5 changes: 5 additions & 0 deletions nameko/testing/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Common testing utilities.
"""
import warnings
from contextlib import contextmanager
from functools import partial

Expand Down Expand Up @@ -59,6 +60,10 @@ def wait_for_worker_idle(container, timeout=10):
Raises an :class:`eventlet.Timeout` if the method was not called
within ``timeout`` seconds.
"""
warnings.warn(
"`wait_for_worker_idle` is deprecated. Use the `entrypoint_waiter` "
"to wait for specific entrypoints instead.", DeprecationWarning
)
with eventlet.Timeout(timeout):
container._worker_pool.waitall()

Expand Down

0 comments on commit 37298fe

Please sign in to comment.