Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide a way to config login_method #713

Merged
merged 7 commits into from Oct 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 16 additions & 5 deletions nameko/amqp/publish.py
Expand Up @@ -17,21 +17,29 @@ class UndeliverableMessage(Exception):


@contextmanager
def get_connection(amqp_uri, ssl=None, transport_options=None):
def get_connection(amqp_uri, ssl=None, login_method=None, transport_options=None):
if not transport_options:
transport_options = DEFAULT_TRANSPORT_OPTIONS.copy()
conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl)
conn = Connection(
amqp_uri, transport_options=transport_options, ssl=ssl,
login_method=login_method
)

with connections[conn].acquire(block=True) as connection:
yield connection


@contextmanager
def get_producer(amqp_uri, confirms=True, ssl=None, transport_options=None):
def get_producer(
amqp_uri, confirms=True, ssl=None, login_method=None, transport_options=None
):
if transport_options is None:
transport_options = DEFAULT_TRANSPORT_OPTIONS.copy()
transport_options['confirm_publish'] = confirms
conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl)
conn = Connection(
amqp_uri, transport_options=transport_options, ssl=ssl,
login_method=login_method
)

with producers[conn].acquire(block=True) as producer:
yield producer
Expand Down Expand Up @@ -120,10 +128,12 @@ class Publisher(object):
def __init__(
self, amqp_uri, use_confirms=None, serializer=None, compression=None,
delivery_mode=None, mandatory=None, priority=None, expiration=None,
declare=None, retry=None, retry_policy=None, ssl=None, **publish_kwargs
declare=None, retry=None, retry_policy=None, ssl=None, login_method=None,
**publish_kwargs
):
self.amqp_uri = amqp_uri
self.ssl = ssl
self.login_method = login_method

# publish confirms
if use_confirms is not None:
Expand Down Expand Up @@ -192,6 +202,7 @@ def publish(self, payload, **kwargs):
with get_producer(self.amqp_uri,
use_confirms,
self.ssl,
self.login_method,
transport_options,
) as producer:
try:
Expand Down
1 change: 1 addition & 0 deletions nameko/constants.py
Expand Up @@ -7,6 +7,7 @@
HEARTBEAT_CONFIG_KEY = 'HEARTBEAT'
AMQP_SSL_CONFIG_KEY = 'AMQP_SSL'
TRANSPORT_OPTIONS_CONFIG_KEY = 'TRANSPORT_OPTIONS'
LOGIN_METHOD_CONFIG_KEY = 'LOGIN_METHOD'

MAX_WORKERS_CONFIG_KEY = 'max_workers'
PARENT_CALLS_CONFIG_KEY = 'parent_calls_tracked'
Expand Down
9 changes: 6 additions & 3 deletions nameko/messaging.py
Expand Up @@ -19,7 +19,7 @@
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_HEARTBEAT,
DEFAULT_TRANSPORT_OPTIONS, HEADER_PREFIX, HEARTBEAT_CONFIG_KEY,
TRANSPORT_OPTIONS_CONFIG_KEY
LOGIN_METHOD_CONFIG_KEY, TRANSPORT_OPTIONS_CONFIG_KEY
)
from nameko.exceptions import ContainerBeingKilled
from nameko.extensions import (
Expand Down Expand Up @@ -158,7 +158,7 @@ def serializer(self):
def setup(self):

ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY)

login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY)
with get_connection(self.amqp_uri, ssl) as conn:
for entity in self.declare:
maybe_declare(entity, conn.channel())
Expand All @@ -171,6 +171,7 @@ def setup(self):
exchange=self.exchange,
declare=self.declare,
ssl=ssl,
login_method=login_method,
**self.options
)

Expand Down Expand Up @@ -350,10 +351,12 @@ def connection(self):
TRANSPORT_OPTIONS_CONFIG_KEY, DEFAULT_TRANSPORT_OPTIONS
)
ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY)
login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY)
conn = Connection(self.amqp_uri,
transport_options=transport_options,
heartbeat=heartbeat,
ssl=ssl
ssl=ssl,
login_method=login_method
)

return conn
Expand Down
21 changes: 16 additions & 5 deletions nameko/rpc.py
Expand Up @@ -13,7 +13,7 @@
from nameko.amqp.publish import Publisher, UndeliverableMessage
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_SERIALIZER,
RPC_EXCHANGE_CONFIG_KEY, SERIALIZER_CONFIG_KEY
LOGIN_METHOD_CONFIG_KEY, RPC_EXCHANGE_CONFIG_KEY, SERIALIZER_CONFIG_KEY
)
from nameko.exceptions import (
ContainerBeingKilled, MalformedRequest, MethodNotFound, UnknownService,
Expand Down Expand Up @@ -125,8 +125,11 @@ def handle_result(self, message, result, exc_info):
)
exchange = get_rpc_exchange(self.container.config)
ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY)
login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY)

responder = Responder(amqp_uri, exchange, serializer, message, ssl=ssl)
responder = Responder(
amqp_uri, exchange, serializer, message, ssl=ssl, login_method=login_method
)
result, exc_info = responder.send_response(result, exc_info)

self.queue_consumer.ack_message(message)
Expand Down Expand Up @@ -179,13 +182,14 @@ class Responder(object):
publisher_cls = Publisher

def __init__(
self, amqp_uri, exchange, serializer, message, ssl=None
self, amqp_uri, exchange, serializer, message, ssl=None, login_method=None
):
self.amqp_uri = amqp_uri
self.serializer = serializer
self.message = message
self.exchange = exchange
self.ssl = ssl
self.login_method = login_method

def send_response(self, result, exc_info):

Expand Down Expand Up @@ -215,7 +219,9 @@ def send_response(self, result, exc_info):
routing_key = self.message.properties['reply_to']
correlation_id = self.message.properties.get('correlation_id')

publisher = self.publisher_cls(self.amqp_uri, ssl=self.ssl)
publisher = self.publisher_cls(
self.amqp_uri, ssl=self.ssl, login_method=self.login_method
)

publisher.publish(
payload,
Expand Down Expand Up @@ -365,7 +371,8 @@ def __init__(
serializer = options.pop('serializer', self.serializer)

self.publisher = self.publisher_cls(
self.amqp_uri, serializer=serializer, ssl=self.ssl, **options
self.amqp_uri, serializer=serializer, ssl=self.ssl,
login_method=self.login_method, **options
)

def __call__(self, *args, **kwargs):
Expand All @@ -385,6 +392,10 @@ def amqp_uri(self):
def ssl(self):
return self.container.config.get(AMQP_SSL_CONFIG_KEY)

@property
def login_method(self):
return self.container.config.get(LOGIN_METHOD_CONFIG_KEY)

@property
def serializer(self):
""" Default serializer to use when publishing message payloads.
Expand Down
8 changes: 6 additions & 2 deletions nameko/standalone/events.py
Expand Up @@ -3,7 +3,8 @@
from nameko import serialization
from nameko.amqp.publish import Publisher
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, PERSISTENT
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY,
PERSISTENT
)


Expand All @@ -27,11 +28,14 @@ def event_dispatcher(nameko_config, **kwargs):
serializer = kwargs.pop('serializer', serializer)

ssl = nameko_config.get(AMQP_SSL_CONFIG_KEY)
login_method = nameko_config.get(LOGIN_METHOD_CONFIG_KEY)

# TODO: standalone event dispatcher should accept context event_data
# and insert a call id

publisher = Publisher(amqp_uri, serializer=serializer, ssl=ssl, **kwargs)
publisher = Publisher(
amqp_uri, serializer=serializer, ssl=ssl, login_method=login_method, **kwargs
)

def dispatch(service_name, event_type, event_data):
""" Dispatch an event claiming to originate from `service_name` with
Expand Down
7 changes: 5 additions & 2 deletions nameko/standalone/rpc.py
Expand Up @@ -9,7 +9,9 @@
from kombu.messaging import Consumer

from nameko import serialization
from nameko.constants import AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY
)
from nameko.containers import WorkerContext
from nameko.exceptions import RpcTimeout
from nameko.extensions import Entrypoint
Expand Down Expand Up @@ -117,7 +119,8 @@ def register_provider(self, provider):

amqp_uri = provider.container.config[AMQP_URI_CONFIG_KEY]
ssl = provider.container.config.get(AMQP_SSL_CONFIG_KEY)
self.connection = Connection(amqp_uri, ssl=ssl)
login_method = provider.container.config.get(LOGIN_METHOD_CONFIG_KEY)
self.connection = Connection(amqp_uri, ssl=ssl, login_method=login_method)

self.queue = provider.queue
self._setup_consumer()
Expand Down
52 changes: 50 additions & 2 deletions test/amqp/test_publish.py
Expand Up @@ -18,6 +18,7 @@
from nameko.amqp.publish import (
Publisher, UndeliverableMessage, get_connection, get_producer
)
from nameko.constants import AMQP_SSL_CONFIG_KEY


def test_get_connection(rabbit_config):
Expand Down Expand Up @@ -95,6 +96,53 @@ def test_confirms_enabled(self, rabbit_config):
)


@pytest.mark.behavioural
class TestLoginMethod(object):

@pytest.fixture
def routing_key(self):
return "routing_key"

@pytest.fixture
def exchange(self):
exchange = Exchange(name="exchange")
return exchange

@pytest.fixture
def queue(self, exchange, routing_key):
queue = Queue(
name="queue", exchange=exchange, routing_key=routing_key
)
return queue

@pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"])
def login_method(self, request):
return request.param

def test_login_method(
self, rabbit_ssl_config, login_method, exchange, queue, routing_key,
get_message_from_queue
):
""" Verify that login_method can be provided to the publisher.

SSL config is required because the EXTERNAL login method uses the client
certificate for authentication.
"""
publisher = Publisher(
rabbit_ssl_config['AMQP_URI'],
serializer="json",
exchange=exchange,
routing_key=routing_key,
declare=[exchange, queue],
login_method=login_method,
ssl=rabbit_ssl_config[AMQP_SSL_CONFIG_KEY]
)

publisher.publish("payload")
message = get_message_from_queue(queue.name)
assert message.payload == "payload"


@pytest.mark.behavioural
class TestPublisher(object):

Expand Down Expand Up @@ -465,9 +513,9 @@ def test_use_confirms(self, get_producer):
publisher = Publisher("memory://", use_confirms=False)

publisher.publish("payload")
use_confirms = get_producer.call_args[0][3].get('confirm_publish')
use_confirms = get_producer.call_args[0][4].get('confirm_publish')
assert use_confirms is False

publisher.publish("payload", use_confirms=True)
use_confirms = get_producer.call_args[0][3].get('confirm_publish')
use_confirms = get_producer.call_args[0][4].get('confirm_publish')
assert use_confirms is True
25 changes: 25 additions & 0 deletions test/standalone/test_event_dispatcher.py
Expand Up @@ -4,6 +4,7 @@
from six.moves import queue

from nameko.amqp import UndeliverableMessage
from nameko.constants import LOGIN_METHOD_CONFIG_KEY
from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher, get_event_exchange
from nameko.testing.services import entrypoint_waiter
Expand Down Expand Up @@ -147,6 +148,30 @@ def test_restricted_parameters(

class TestSSL(object):

@pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"])
def login_method(self, request):
return request.param

@pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"])
def use_client_cert(self, request):
return request.param

@pytest.fixture
def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method):

if use_client_cert is False:
# remove certificate paths from config
rabbit_ssl_config['AMQP_SSL'] = True

# set login method
rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method

# skip if not a valid combination
if login_method == "EXTERNAL" and not use_client_cert:
pytest.skip("EXTERNAL login method requires cert verification")

return rabbit_ssl_config

def test_event_dispatcher_over_ssl(
self, container_factory, rabbit_ssl_config, rabbit_config
):
Expand Down
25 changes: 25 additions & 0 deletions test/standalone/test_rpc_proxy.py
Expand Up @@ -8,6 +8,7 @@
from kombu.message import Message
from mock import Mock, call

from nameko.constants import LOGIN_METHOD_CONFIG_KEY
from nameko.containers import WorkerContext
from nameko.exceptions import RemoteError, RpcTimeout
from nameko.extensions import DependencyProvider
Expand Down Expand Up @@ -799,6 +800,30 @@ def test_reuse_when_recovered(self, service_rpc, toxiproxy):

class TestSSL(object):

@pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"])
def login_method(self, request):
return request.param

@pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"])
def use_client_cert(self, request):
return request.param

@pytest.fixture
def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method):

if use_client_cert is False:
# remove certificate paths from config
rabbit_ssl_config['AMQP_SSL'] = True

# set login method
rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method

# skip if not a valid combination
if login_method == "EXTERNAL" and not use_client_cert:
pytest.skip("EXTERNAL login method requires cert verification")

return rabbit_ssl_config

def test_rpc_proxy_over_ssl(
self, container_factory, rabbit_ssl_config, rabbit_config
):
Expand Down