Skip to content

Commit

Permalink
Merge pull request #253 from onefinestay/sensitive-variables
Browse files Browse the repository at this point in the history
Sensitive variables
  • Loading branch information
mattbennett committed May 11, 2015
2 parents 3f0cbc0 + 58bb1ca commit 6bf84dc
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 61 deletions.
127 changes: 71 additions & 56 deletions nameko/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,64 +101,78 @@ def dispatch(event_type, event_data):
class EventHandler(Consumer):

def __init__(self, source_service, event_type, handler_type=SERVICE_POOL,
reliable_delivery=True, requeue_on_error=False):
reliable_delivery=True, requeue_on_error=False,
sensitive_variables=()):
r"""
Decorate a method as a handler of ``event_type`` events on the service
called ``source_service``. ``event_type`` must be either a subclass of
:class:`~.Event` with a class attribute ``type`` or a string matching
the
value of this attribute.
``handler_type`` determines the behaviour of the handler:
- ``events.SERVICE_POOL``:
Event handlers are pooled by service type and method,
and one service instance from each pool receives the event. ::
.-[queue]- (service X handler-meth-1)
/
exchange o --[queue]- (service X handler-meth-2)
\
\ (service Y(instance 1) handler-meth)
\ /
[queue]
\
(service Y(instance 2) handler-meth)
- ``events.SINGLETON``:
Events are received by only one registered handler, regardless
of service type. If requeued on error, they may be handled
by a different service instance. ::
(service X handler-meth)
/
exchange o -- [queue]
\
(service Y handler-meth)
- ``events.BROADCAST``:
Events will be received by every handler. Events are broadcast
to every service instance, not just every service type
- use wisely! ::
[queue]- (service X(instance 1) handler-meth)
/
exchange o - [queue]- (service X(instance 2) handler-meth)
\
[queue]- (service Y handler-meth)
# TODO: this is defined by the Consumer actually...
If `requeue_on_error` is true, handlers will return the event to the
queue if an error occurs while handling it. Defaults to false.
If `reliable_delivery` is true, events will be held in the queue
until there is a handler to consume them. Defaults to true.
Raises an ``EventHandlerConfigurationError`` if the ``handler_type``
is set to ``BROADCAST`` and ``reliable_delivery`` is set to ``True``.
called ``source_service``.
:Parameters:
source_service : str
Name of the service that dispatches the event
event_type : str
Type of the event to handle
handler_type : str
Determines the behaviour of the handler in a cluster:
- ``events.SERVICE_POOL``:
Event handlers are pooled by service type and method,
and one service instance from each pool receives the
event. ::
.-[queue]- (service X handler-meth-1)
/
exchange o --[queue]- (service X handler-meth-2)
\
\ (service Y(inst. 1) handler-meth)
\ /
[queue]
\
(service Y(inst. 2) handler-meth)
- ``events.SINGLETON``:
Events are received by only one registered handler,
regardless of service type. If requeued on error, they may
be handled by a different service instance. ::
(service X handler-meth)
/
exchange o -- [queue]
\
(service Y handler-meth)
- ``events.BROADCAST``:
Events will be received by every handler. Events are
broadcast to every service instance, not just every service
type - use wisely! ::
[queue]- (service X(inst. 1) handler-meth)
/
exchange o - [queue]- (service X(inst. 2) handler-meth)
\
[queue]- (service Y handler-meth)
requeue_on_error : bool # TODO: defined by Consumer actually..
If true, handlers will return the event to the queue if an
error occurs while handling it. Defaults to False.
reliable_delivery : bool
Is true, events will be held in the queue until there is a
handler to consume them. Defaults to True.
sensitive_variables : string or tuple of strings
Mark an argument or part of an argument as sensitive. Saved
on the entrypoint instance as
``entrypoint.sensitive_variables`` for later inspection by
other extensions, for example a logging system.
:seealso: :func:`nameko.utils.get_redacted_args`
:Raises:
:exc:`EventHandlerConfigurationError` if the ``handler_type``
is set to ``BROADCAST`` and ``reliable_delivery`` is set to
``True``.
"""
if reliable_delivery and handler_type is BROADCAST:
raise EventHandlerConfigurationError(
Expand All @@ -169,6 +183,7 @@ def __init__(self, source_service, event_type, handler_type=SERVICE_POOL,
self.event_type = event_type
self.handler_type = handler_type
self.reliable_delivery = reliable_delivery
self.sensitive_variables = sensitive_variables

super(EventHandler, self).__init__(
queue=None, requeue_on_error=requeue_on_error)
Expand Down
18 changes: 14 additions & 4 deletions nameko/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,26 @@ class Rpc(Entrypoint, HeaderDecoder):

rpc_consumer = RpcConsumer()

def __init__(self, expected_exceptions=()):
def __init__(self, expected_exceptions=(), sensitive_variables=()):
""" Mark a method to be exposed over rpc
:Parameters:
expected_exceptions : exception class or tuple of exception classes
Stashed on the entrypoint instance for later inspection by
other extensions in the worker lifecycle. Use for exceptions
caused by the caller (e.g. bad arguments).
Specify exceptions that may be caused by the caller (e.g. by
providing bad arguments). Saved on the entrypoint instance as
``entrypoint.expected_exceptions`` for later inspection by
other extensions, for example a monitoring system.
sensitive_variables : string or tuple of strings
Mark an argument or part of an argument as sensitive. Saved on
the entrypoint instance as ``entrypoint.sensitive_variables``
for later inspection by other extensions, for example a
logging system.
:seealso: :func:`nameko.utils.get_redacted_args`
"""
self.expected_exceptions = expected_exceptions
self.sensitive_variables = sensitive_variables

def setup(self):
self.rpc_consumer.register_provider(self)
Expand Down
99 changes: 99 additions & 0 deletions nameko/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,106 @@
import sys
import inspect
import re

import eventlet
from eventlet.queue import LightQueue
import six

REDACTED = "********"


def get_redacted_args(entrypoint, *args, **kwargs):
""" Utility function for use with entrypoints that are marked with
``sensitive_variables`` -- e.g. :class:`nameko.rpc.Rpc` and
:class:`nameko.events.EventHandler`.
:Parameters:
entrypoint : :class:`~nameko.extensions.Entrypoint`
The entrypoint that fired.
args : tuple
Positional arguments for the method call.
kwargs : dict
Keyword arguments for the method call.
The entrypoint should have a ``sensitive_variables`` attribute, the value
of which is a string or tuple of strings specifying the arguments or
partial arguments that should be redacted. To partially redact an argument,
the following syntax is used::
<argument-name>.<dict-key>[<list-index>]
:Returns:
A dictionary as returned by :func:`inspect.getargspec`, but with
sensitive arguments or partial arguments redacted.
.. note::
This function does not raise if one of the ``sensitive_variables``
doesn't match or partially match the calling ``args`` and ``kwargs``.
This allows "fuzzier" pattern matching (e.g. redact a field if it is
present, and otherwise do nothing).
To avoid exposing sensitive variables through a typo, it is recommend
to test the configuration of each entrypoint with
``sensitive_variables`` individually. For example:
.. code-block:: python
class Service(object):
@rpc(sensitive_variables="foo.bar")
def method(self, foo):
pass
container = ServiceContainer(Service, {})
entrypoint = get_extension(container, Rpc, method_name="method")
# no redaction
foo = "arg"
expected_foo = {'foo': "arg"}
assert get_redacted_args(entrypoint, foo) == expected
# 'bar' key redacted
foo = {'bar': "secret value", 'baz': "normal value"}
expected = {'foo': {'bar': "********", 'baz': "normal value"}}
assert get_redacted_args(entrypoint, foo) == expected
.. seealso::
The tests for this utility demonstrate its full usage:
:class:`test.test_utils.TestGetRedactedArgs`
"""
sensitive_variables = entrypoint.sensitive_variables
if isinstance(sensitive_variables, six.string_types):
sensitive_variables = (sensitive_variables,)

method = getattr(entrypoint.container.service_cls, entrypoint.method_name)
argspec = inspect.getcallargs(method, None, *args, **kwargs)
del argspec['self']

def redact(data, keys):
key = keys[0]
if len(keys) == 1:
try:
data[key] = REDACTED
except (KeyError, IndexError, TypeError):
pass
else:
if key in data:
redact(data[key], keys[1:])

for variable in sensitive_variables:
keys = []
for dict_key, list_index in re.findall("(\w+)|\[(\d+)\]", variable):
if dict_key:
keys.append(dict_key)
elif list_index:
keys.append(int(list_index))

if keys[0] in argspec:
redact(argspec, keys)

return argspec


def fail_fast_imap(pool, call, items):
Expand Down
86 changes: 86 additions & 0 deletions test/test_sensitive_variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import pytest

from nameko.events import event_handler, EventHandler
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc, Rpc
from nameko.testing.services import entrypoint_hook
from nameko.testing.utils import get_extension
from nameko.utils import get_redacted_args, REDACTED


redacted = {}


@pytest.fixture(autouse=True)
def reset():
redacted.clear()


class Logger(DependencyProvider):
""" Example DependencyProvider that makes use of ``get_redacted_args``
to redact ``sensitive_variables`` on entrypoints.
"""

def worker_setup(self, worker_ctx):
entrypoint = worker_ctx.entrypoint
args = worker_ctx.args
kwargs = worker_ctx.kwargs

redacted.update(get_redacted_args(entrypoint, *args, **kwargs))


class Service(object):
name = "service"

logger = Logger()

@event_handler("service", "event_type",
sensitive_variables="event_data.foo")
def handle(self, event_data):
pass

@rpc(sensitive_variables=("a", "b.x[0]", "b.x[2]"))
def method(self, a, b, c):
return [a, b, c]


def test_sensitive_rpc(container_factory):

container = container_factory(Service, {})
rpc_entrypoint = get_extension(container, Rpc)

assert rpc_entrypoint.sensitive_variables == ("a", "b.x[0]", "b.x[2]")

a = "A"
b = {'x': [1, 2, 3], 'y': [4, 5, 6]}
c = "C"

with entrypoint_hook(container, "method") as method:
assert method(a, b, c) == [a, b, c]

assert redacted == {
'a': REDACTED,
'b': {
'x': [REDACTED, 2, REDACTED],
'y': [4, 5, 6]
},
'c': 'C'
}


def test_sensitive_event(container_factory):

container = container_factory(Service, {})
handler_entrypoint = get_extension(container, EventHandler)

assert handler_entrypoint.sensitive_variables == "event_data.foo"

with entrypoint_hook(container, "handle") as handler:
handler({'foo': 'FOO', 'bar': 'BAR'})

assert redacted == {
'event_data': {
'foo': REDACTED,
'bar': 'BAR'
}
}

0 comments on commit 6bf84dc

Please sign in to comment.