From 3092e3090d00705139143e792732885d69869652 Mon Sep 17 00:00:00 2001 From: Lili Nie Date: Sat, 9 Oct 2021 04:36:01 -0400 Subject: [PATCH 1/7] provide a way to config login_method users are not able to config login_method with current code,if the default one(AMQPLAIN) of amqp/connection doesn't work for them --- nameko/amqp/publish.py | 12 +++++++----- nameko/constants.py | 1 + nameko/messaging.py | 9 ++++++--- nameko/rpc.py | 16 +++++++++++----- nameko/standalone/events.py | 5 +++-- nameko/standalone/rpc.py | 5 +++-- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/nameko/amqp/publish.py b/nameko/amqp/publish.py index 042784ac3..d01dddf40 100644 --- a/nameko/amqp/publish.py +++ b/nameko/amqp/publish.py @@ -17,21 +17,21 @@ class UndeliverableMessage(Exception): @contextmanager -def get_connection(amqp_uri, ssl=None, transport_options=None): +def get_connection(amqp_uri, ssl=None, login_method=None, transport_options=None): if not transport_options: transport_options = DEFAULT_TRANSPORT_OPTIONS.copy() - conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl) + conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl, login_method=login_method) with connections[conn].acquire(block=True) as connection: yield connection @contextmanager -def get_producer(amqp_uri, confirms=True, ssl=None, transport_options=None): +def get_producer(amqp_uri, confirms=True, ssl=None, login_method=None, transport_options=None): if transport_options is None: transport_options = DEFAULT_TRANSPORT_OPTIONS.copy() transport_options['confirm_publish'] = confirms - conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl) + conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl, login_method=login_method) with producers[conn].acquire(block=True) as producer: yield producer @@ -120,10 +120,11 @@ class Publisher(object): def __init__( self, amqp_uri, use_confirms=None, serializer=None, compression=None, delivery_mode=None, mandatory=None, priority=None, expiration=None, - declare=None, retry=None, retry_policy=None, ssl=None, **publish_kwargs + declare=None, retry=None, retry_policy=None, ssl=None, login_method=None, **publish_kwargs ): self.amqp_uri = amqp_uri self.ssl = ssl + self.login_method = login_method # publish confirms if use_confirms is not None: @@ -192,6 +193,7 @@ def publish(self, payload, **kwargs): with get_producer(self.amqp_uri, use_confirms, self.ssl, + self.login_method, transport_options, ) as producer: try: diff --git a/nameko/constants.py b/nameko/constants.py index 846bfa024..6395e6c7d 100644 --- a/nameko/constants.py +++ b/nameko/constants.py @@ -7,6 +7,7 @@ HEARTBEAT_CONFIG_KEY = 'HEARTBEAT' AMQP_SSL_CONFIG_KEY = 'AMQP_SSL' TRANSPORT_OPTIONS_CONFIG_KEY = 'TRANSPORT_OPTIONS' +LOGIN_METHOD_CONFIG_KEY = 'LOGIN_METHOD' MAX_WORKERS_CONFIG_KEY = 'max_workers' PARENT_CALLS_CONFIG_KEY = 'parent_calls_tracked' diff --git a/nameko/messaging.py b/nameko/messaging.py index f8736a470..e47d5b949 100644 --- a/nameko/messaging.py +++ b/nameko/messaging.py @@ -19,7 +19,7 @@ from nameko.constants import ( AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_HEARTBEAT, DEFAULT_TRANSPORT_OPTIONS, HEADER_PREFIX, HEARTBEAT_CONFIG_KEY, - TRANSPORT_OPTIONS_CONFIG_KEY + LOGIN_METHOD_CONFIG_KEY, TRANSPORT_OPTIONS_CONFIG_KEY ) from nameko.exceptions import ContainerBeingKilled from nameko.extensions import ( @@ -158,7 +158,7 @@ def serializer(self): def setup(self): ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY) - + login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY) with get_connection(self.amqp_uri, ssl) as conn: for entity in self.declare: maybe_declare(entity, conn.channel()) @@ -171,6 +171,7 @@ def setup(self): exchange=self.exchange, declare=self.declare, ssl=ssl, + login_method=login_method, **self.options ) @@ -350,10 +351,12 @@ def connection(self): TRANSPORT_OPTIONS_CONFIG_KEY, DEFAULT_TRANSPORT_OPTIONS ) ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY) + login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY) conn = Connection(self.amqp_uri, transport_options=transport_options, heartbeat=heartbeat, - ssl=ssl + ssl=ssl, + login_method=login_method ) return conn diff --git a/nameko/rpc.py b/nameko/rpc.py index e7af0138f..5bb347ac5 100644 --- a/nameko/rpc.py +++ b/nameko/rpc.py @@ -13,7 +13,7 @@ from nameko.amqp.publish import Publisher, UndeliverableMessage from nameko.constants import ( AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_SERIALIZER, - RPC_EXCHANGE_CONFIG_KEY, SERIALIZER_CONFIG_KEY + RPC_EXCHANGE_CONFIG_KEY, SERIALIZER_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY ) from nameko.exceptions import ( ContainerBeingKilled, MalformedRequest, MethodNotFound, UnknownService, @@ -125,8 +125,9 @@ def handle_result(self, message, result, exc_info): ) exchange = get_rpc_exchange(self.container.config) ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY) + login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY) - responder = Responder(amqp_uri, exchange, serializer, message, ssl=ssl) + responder = Responder(amqp_uri, exchange, serializer, message, ssl=ssl, login_method=login_method) result, exc_info = responder.send_response(result, exc_info) self.queue_consumer.ack_message(message) @@ -179,13 +180,14 @@ class Responder(object): publisher_cls = Publisher def __init__( - self, amqp_uri, exchange, serializer, message, ssl=None + self, amqp_uri, exchange, serializer, message, ssl=None, login_method=None ): self.amqp_uri = amqp_uri self.serializer = serializer self.message = message self.exchange = exchange self.ssl = ssl + self.login_method = login_method def send_response(self, result, exc_info): @@ -215,7 +217,7 @@ def send_response(self, result, exc_info): routing_key = self.message.properties['reply_to'] correlation_id = self.message.properties.get('correlation_id') - publisher = self.publisher_cls(self.amqp_uri, ssl=self.ssl) + publisher = self.publisher_cls(self.amqp_uri, ssl=self.ssl, login_method= self.login_method) publisher.publish( payload, @@ -365,7 +367,7 @@ def __init__( serializer = options.pop('serializer', self.serializer) self.publisher = self.publisher_cls( - self.amqp_uri, serializer=serializer, ssl=self.ssl, **options + self.amqp_uri, serializer=serializer, ssl=self.ssl, login_method=self.login_method, **options ) def __call__(self, *args, **kwargs): @@ -385,6 +387,10 @@ def amqp_uri(self): def ssl(self): return self.container.config.get(AMQP_SSL_CONFIG_KEY) + @property + def login_method(self): + return self.container.config.get(LOGIN_METHOD_CONFIG_KEY) + @property def serializer(self): """ Default serializer to use when publishing message payloads. diff --git a/nameko/standalone/events.py b/nameko/standalone/events.py index f67a938a0..1c6c9f5ca 100644 --- a/nameko/standalone/events.py +++ b/nameko/standalone/events.py @@ -3,7 +3,7 @@ from nameko import serialization from nameko.amqp.publish import Publisher from nameko.constants import ( - AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, PERSISTENT + AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, PERSISTENT, LOGIN_METHOD_CONFIG_KEY ) @@ -27,11 +27,12 @@ def event_dispatcher(nameko_config, **kwargs): serializer = kwargs.pop('serializer', serializer) ssl = nameko_config.get(AMQP_SSL_CONFIG_KEY) + login_method = nameko_config.get(LOGIN_METHOD_CONFIG_KEY) # TODO: standalone event dispatcher should accept context event_data # and insert a call id - publisher = Publisher(amqp_uri, serializer=serializer, ssl=ssl, **kwargs) + publisher = Publisher(amqp_uri, serializer=serializer, ssl=ssl, login_method=login_method, **kwargs) def dispatch(service_name, event_type, event_data): """ Dispatch an event claiming to originate from `service_name` with diff --git a/nameko/standalone/rpc.py b/nameko/standalone/rpc.py index 16b9d62fd..95d8b26a9 100644 --- a/nameko/standalone/rpc.py +++ b/nameko/standalone/rpc.py @@ -9,7 +9,7 @@ from kombu.messaging import Consumer from nameko import serialization -from nameko.constants import AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY +from nameko.constants import AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY from nameko.containers import WorkerContext from nameko.exceptions import RpcTimeout from nameko.extensions import Entrypoint @@ -117,7 +117,8 @@ def register_provider(self, provider): amqp_uri = provider.container.config[AMQP_URI_CONFIG_KEY] ssl = provider.container.config.get(AMQP_SSL_CONFIG_KEY) - self.connection = Connection(amqp_uri, ssl=ssl) + login_method = provider.container.config.get(LOGIN_METHOD_CONFIG_KEY) + self.connection = Connection(amqp_uri, ssl=ssl, login_method=login_method) self.queue = provider.queue self._setup_consumer() From 75d8ed2a809f6c8d5929063bd2d73cdc17cdf284 Mon Sep 17 00:00:00 2001 From: Matt Yule-Bennett Date: Wed, 13 Oct 2021 16:06:23 +0100 Subject: [PATCH 2/7] import sort --- nameko/rpc.py | 2 +- nameko/standalone/events.py | 3 ++- nameko/standalone/rpc.py | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nameko/rpc.py b/nameko/rpc.py index 5bb347ac5..629737768 100644 --- a/nameko/rpc.py +++ b/nameko/rpc.py @@ -13,7 +13,7 @@ from nameko.amqp.publish import Publisher, UndeliverableMessage from nameko.constants import ( AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_SERIALIZER, - RPC_EXCHANGE_CONFIG_KEY, SERIALIZER_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY + LOGIN_METHOD_CONFIG_KEY, RPC_EXCHANGE_CONFIG_KEY, SERIALIZER_CONFIG_KEY ) from nameko.exceptions import ( ContainerBeingKilled, MalformedRequest, MethodNotFound, UnknownService, diff --git a/nameko/standalone/events.py b/nameko/standalone/events.py index 1c6c9f5ca..0a1912c9c 100644 --- a/nameko/standalone/events.py +++ b/nameko/standalone/events.py @@ -3,7 +3,8 @@ from nameko import serialization from nameko.amqp.publish import Publisher from nameko.constants import ( - AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, PERSISTENT, LOGIN_METHOD_CONFIG_KEY + AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY, + PERSISTENT ) diff --git a/nameko/standalone/rpc.py b/nameko/standalone/rpc.py index 95d8b26a9..ebf920dd4 100644 --- a/nameko/standalone/rpc.py +++ b/nameko/standalone/rpc.py @@ -9,7 +9,9 @@ from kombu.messaging import Consumer from nameko import serialization -from nameko.constants import AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY +from nameko.constants import ( + AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY +) from nameko.containers import WorkerContext from nameko.exceptions import RpcTimeout from nameko.extensions import Entrypoint From ce9602cc8ee6fd68d99b1d3c8e7a7911ce97d7f0 Mon Sep 17 00:00:00 2001 From: Matt Yule-Bennett Date: Wed, 13 Oct 2021 16:20:31 +0100 Subject: [PATCH 3/7] formatting --- nameko/amqp/publish.py | 17 +++++++++++++---- nameko/rpc.py | 11 ++++++++--- nameko/standalone/events.py | 4 +++- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/nameko/amqp/publish.py b/nameko/amqp/publish.py index d01dddf40..b64cae381 100644 --- a/nameko/amqp/publish.py +++ b/nameko/amqp/publish.py @@ -20,18 +20,26 @@ class UndeliverableMessage(Exception): def get_connection(amqp_uri, ssl=None, login_method=None, transport_options=None): if not transport_options: transport_options = DEFAULT_TRANSPORT_OPTIONS.copy() - conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl, login_method=login_method) + conn = Connection( + amqp_uri, transport_options=transport_options, ssl=ssl, + login_method=login_method + ) with connections[conn].acquire(block=True) as connection: yield connection @contextmanager -def get_producer(amqp_uri, confirms=True, ssl=None, login_method=None, transport_options=None): +def get_producer( + amqp_uri, confirms=True, ssl=None, login_method=None, transport_options=None +): if transport_options is None: transport_options = DEFAULT_TRANSPORT_OPTIONS.copy() transport_options['confirm_publish'] = confirms - conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl, login_method=login_method) + conn = Connection( + amqp_uri, transport_options=transport_options, ssl=ssl, + login_method=login_method + ) with producers[conn].acquire(block=True) as producer: yield producer @@ -120,7 +128,8 @@ class Publisher(object): def __init__( self, amqp_uri, use_confirms=None, serializer=None, compression=None, delivery_mode=None, mandatory=None, priority=None, expiration=None, - declare=None, retry=None, retry_policy=None, ssl=None, login_method=None, **publish_kwargs + declare=None, retry=None, retry_policy=None, ssl=None, login_method=None, + **publish_kwargs ): self.amqp_uri = amqp_uri self.ssl = ssl diff --git a/nameko/rpc.py b/nameko/rpc.py index 629737768..26d2d1e62 100644 --- a/nameko/rpc.py +++ b/nameko/rpc.py @@ -127,7 +127,9 @@ def handle_result(self, message, result, exc_info): ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY) login_method = self.container.config.get(LOGIN_METHOD_CONFIG_KEY) - responder = Responder(amqp_uri, exchange, serializer, message, ssl=ssl, login_method=login_method) + responder = Responder( + amqp_uri, exchange, serializer, message, ssl=ssl, login_method=login_method + ) result, exc_info = responder.send_response(result, exc_info) self.queue_consumer.ack_message(message) @@ -217,7 +219,9 @@ def send_response(self, result, exc_info): routing_key = self.message.properties['reply_to'] correlation_id = self.message.properties.get('correlation_id') - publisher = self.publisher_cls(self.amqp_uri, ssl=self.ssl, login_method= self.login_method) + publisher = self.publisher_cls( + self.amqp_uri, ssl=self.ssl, login_method=self.login_method + ) publisher.publish( payload, @@ -367,7 +371,8 @@ def __init__( serializer = options.pop('serializer', self.serializer) self.publisher = self.publisher_cls( - self.amqp_uri, serializer=serializer, ssl=self.ssl, login_method=self.login_method, **options + self.amqp_uri, serializer=serializer, ssl=self.ssl, + login_method=self.login_method, **options ) def __call__(self, *args, **kwargs): diff --git a/nameko/standalone/events.py b/nameko/standalone/events.py index 0a1912c9c..8e013a7ea 100644 --- a/nameko/standalone/events.py +++ b/nameko/standalone/events.py @@ -33,7 +33,9 @@ def event_dispatcher(nameko_config, **kwargs): # TODO: standalone event dispatcher should accept context event_data # and insert a call id - publisher = Publisher(amqp_uri, serializer=serializer, ssl=ssl, login_method=login_method, **kwargs) + publisher = Publisher( + amqp_uri, serializer=serializer, ssl=ssl, login_method=login_method, **kwargs + ) def dispatch(service_name, event_type, event_data): """ Dispatch an event claiming to originate from `service_name` with From 25f2bdef2d3338722579c6adcf41a1361785c8fa Mon Sep 17 00:00:00 2001 From: Matt Yule-Bennett Date: Wed, 13 Oct 2021 16:37:18 +0100 Subject: [PATCH 4/7] update test for new argument position --- test/amqp/test_publish.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/amqp/test_publish.py b/test/amqp/test_publish.py index a5064a02f..9e8f3cd4d 100644 --- a/test/amqp/test_publish.py +++ b/test/amqp/test_publish.py @@ -465,9 +465,9 @@ def test_use_confirms(self, get_producer): publisher = Publisher("memory://", use_confirms=False) publisher.publish("payload") - use_confirms = get_producer.call_args[0][3].get('confirm_publish') + use_confirms = get_producer.call_args[0][4].get('confirm_publish') assert use_confirms is False publisher.publish("payload", use_confirms=True) - use_confirms = get_producer.call_args[0][3].get('confirm_publish') + use_confirms = get_producer.call_args[0][4].get('confirm_publish') assert use_confirms is True From 6bcd41e25b617d24d8fb23001096680185071e1d Mon Sep 17 00:00:00 2001 From: Matt Yule-Bennett Date: Wed, 13 Oct 2021 16:56:31 +0100 Subject: [PATCH 5/7] update test for new argument position --- test/test_messaging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_messaging.py b/test/test_messaging.py index a0bd915ff..dbdae7063 100644 --- a/test/test_messaging.py +++ b/test/test_messaging.py @@ -1200,11 +1200,11 @@ def test_use_confirms(self, mock_container, get_producer): publish = publisher.get_dependency(worker_ctx) publish("payload") - use_confirms = get_producer.call_args[0][3].get('confirm_publish') + use_confirms = get_producer.call_args[0][4].get('confirm_publish') assert use_confirms is False publish("payload", use_confirms=True) - use_confirms = get_producer.call_args[0][3].get('confirm_publish') + use_confirms = get_producer.call_args[0][4].get('confirm_publish') assert use_confirms is True From 8b0dd11f043ad51d79fd9d329972585ed7e9dc18 Mon Sep 17 00:00:00 2001 From: Matt Yule-Bennett Date: Thu, 14 Oct 2021 15:30:06 +0100 Subject: [PATCH 6/7] test alternative login methods, inc EXTERNAL --- test/amqp/test_publish.py | 48 ++++++++++++++++++++++++ test/standalone/test_event_dispatcher.py | 25 ++++++++++++ test/standalone/test_rpc_proxy.py | 25 ++++++++++++ test/test_events.py | 25 ++++++++++-- test/test_messaging.py | 28 +++++++++++--- test/test_rpc.py | 28 +++++++++++--- 6 files changed, 165 insertions(+), 14 deletions(-) diff --git a/test/amqp/test_publish.py b/test/amqp/test_publish.py index 9e8f3cd4d..20714efeb 100644 --- a/test/amqp/test_publish.py +++ b/test/amqp/test_publish.py @@ -15,6 +15,7 @@ from mock import ANY, MagicMock, Mock, call, patch from six.moves import queue +from nameko.constants import AMQP_SSL_CONFIG_KEY from nameko.amqp.publish import ( Publisher, UndeliverableMessage, get_connection, get_producer ) @@ -95,6 +96,53 @@ def test_confirms_enabled(self, rabbit_config): ) +@pytest.mark.behavioural +class TestLoginMethod(object): + + @pytest.fixture + def routing_key(self): + return "routing_key" + + @pytest.fixture + def exchange(self): + exchange = Exchange(name="exchange") + return exchange + + @pytest.fixture + def queue(self, exchange, routing_key): + queue = Queue( + name="queue", exchange=exchange, routing_key=routing_key + ) + return queue + + @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"]) + def login_method(self, request): + return request.param + + def test_login_method( + self, rabbit_ssl_config, login_method, exchange, queue, routing_key, + get_message_from_queue + ): + """ Verify that login_method can be provided to the publisher. + + SSL config is required because the EXTERNAL login method uses the client + certificate for authentication. + """ + publisher = Publisher( + rabbit_ssl_config['AMQP_URI'], + serializer="json", + exchange=exchange, + routing_key=routing_key, + declare=[exchange, queue], + login_method=login_method, + ssl=rabbit_ssl_config[AMQP_SSL_CONFIG_KEY] + ) + + publisher.publish("payload") + message = get_message_from_queue(queue.name) + assert message.payload == "payload" + + @pytest.mark.behavioural class TestPublisher(object): diff --git a/test/standalone/test_event_dispatcher.py b/test/standalone/test_event_dispatcher.py index 0542dbb71..c222cfdf2 100644 --- a/test/standalone/test_event_dispatcher.py +++ b/test/standalone/test_event_dispatcher.py @@ -4,6 +4,7 @@ from six.moves import queue from nameko.amqp import UndeliverableMessage +from nameko.constants import LOGIN_METHOD_CONFIG_KEY from nameko.events import event_handler from nameko.standalone.events import event_dispatcher, get_event_exchange from nameko.testing.services import entrypoint_waiter @@ -147,6 +148,30 @@ def test_restricted_parameters( class TestSSL(object): + @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"]) + def login_method(self, request): + return request.param + + @pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"]) + def use_client_cert(self, request): + return request.param + + @pytest.fixture + def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method): + + if use_client_cert is False: + # remove certificate paths from config + rabbit_ssl_config['AMQP_SSL'] = True + + # set login method + rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method + + # skip if not a valid combination + if login_method == "EXTERNAL" and not use_client_cert: + pytest.skip("EXTERNAL login method requires cert verification") + + return rabbit_ssl_config + def test_event_dispatcher_over_ssl( self, container_factory, rabbit_ssl_config, rabbit_config ): diff --git a/test/standalone/test_rpc_proxy.py b/test/standalone/test_rpc_proxy.py index 1f09b524e..55ada38b4 100644 --- a/test/standalone/test_rpc_proxy.py +++ b/test/standalone/test_rpc_proxy.py @@ -8,6 +8,7 @@ from kombu.message import Message from mock import Mock, call +from nameko.constants import LOGIN_METHOD_CONFIG_KEY from nameko.containers import WorkerContext from nameko.exceptions import RemoteError, RpcTimeout from nameko.extensions import DependencyProvider @@ -799,6 +800,30 @@ def test_reuse_when_recovered(self, service_rpc, toxiproxy): class TestSSL(object): + @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"]) + def login_method(self, request): + return request.param + + @pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"]) + def use_client_cert(self, request): + return request.param + + @pytest.fixture + def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method): + + if use_client_cert is False: + # remove certificate paths from config + rabbit_ssl_config['AMQP_SSL'] = True + + # set login method + rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method + + # skip if not a valid combination + if login_method == "EXTERNAL" and not use_client_cert: + pytest.skip("EXTERNAL login method requires cert verification") + + return rabbit_ssl_config + def test_rpc_proxy_over_ssl( self, container_factory, rabbit_ssl_config, rabbit_config ): diff --git a/test/test_events.py b/test/test_events.py index f77f3ed89..6ae01841a 100644 --- a/test/test_events.py +++ b/test/test_events.py @@ -6,6 +6,7 @@ from mock import ANY, Mock, create_autospec, patch from six.moves import queue +from nameko.constants import LOGIN_METHOD_CONFIG_KEY from nameko.containers import WorkerContext from nameko.events import ( BROADCAST, SERVICE_POOL, SINGLETON, EventDispatcher, EventHandler, @@ -843,12 +844,28 @@ def test_restricted_parameters( class TestSSL(object): - @pytest.fixture(params=[True, False]) - def rabbit_ssl_config(self, request, rabbit_ssl_config): - verify_certs = request.param - if verify_certs is False: + @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"]) + def login_method(self, request): + return request.param + + @pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"]) + def use_client_cert(self, request): + return request.param + + @pytest.fixture + def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method): + + if use_client_cert is False: # remove certificate paths from config rabbit_ssl_config['AMQP_SSL'] = True + + # set login method + rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method + + # skip if not a valid combination + if login_method == "EXTERNAL" and not use_client_cert: + pytest.skip("EXTERNAL login method requires cert verification") + return rabbit_ssl_config def test_event_handler_over_ssl( diff --git a/test/test_messaging.py b/test/test_messaging.py index dbdae7063..3772a3c6e 100644 --- a/test/test_messaging.py +++ b/test/test_messaging.py @@ -13,7 +13,9 @@ from nameko.amqp.publish import Publisher as PublisherCore from nameko.amqp.publish import get_producer -from nameko.constants import AMQP_URI_CONFIG_KEY, HEARTBEAT_CONFIG_KEY +from nameko.constants import ( + AMQP_URI_CONFIG_KEY, HEARTBEAT_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY +) from nameko.containers import WorkerContext from nameko.exceptions import ContainerBeingKilled from nameko.messaging import ( @@ -1215,12 +1217,28 @@ def queue(self,): queue = Queue(name="queue") return queue - @pytest.fixture(params=[True, False]) - def rabbit_ssl_config(self, request, rabbit_ssl_config): - verify_certs = request.param - if verify_certs is False: + @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"]) + def login_method(self, request): + return request.param + + @pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"]) + def use_client_cert(self, request): + return request.param + + @pytest.fixture + def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method): + + if use_client_cert is False: # remove certificate paths from config rabbit_ssl_config['AMQP_SSL'] = True + + # set login method + rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method + + # skip if not a valid combination + if login_method == "EXTERNAL" and not use_client_cert: + pytest.skip("EXTERNAL login method requires cert verification") + return rabbit_ssl_config def test_consume_over_ssl( diff --git a/test/test_rpc.py b/test/test_rpc.py index 8cf336ff2..6e38ddf2d 100644 --- a/test/test_rpc.py +++ b/test/test_rpc.py @@ -12,7 +12,9 @@ from mock import Mock, call, create_autospec, patch from six.moves import queue -from nameko.constants import HEARTBEAT_CONFIG_KEY, MAX_WORKERS_CONFIG_KEY +from nameko.constants import ( + LOGIN_METHOD_CONFIG_KEY, HEARTBEAT_CONFIG_KEY, MAX_WORKERS_CONFIG_KEY +) from nameko.containers import WorkerContext from nameko.events import event_handler from nameko.exceptions import ( @@ -1703,12 +1705,28 @@ def method(self): class TestSSL(object): - @pytest.fixture(params=[True, False]) - def rabbit_ssl_config(self, request, rabbit_ssl_config): - verify_certs = request.param - if verify_certs is False: + @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"]) + def login_method(self, request): + return request.param + + @pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"]) + def use_client_cert(self, request): + return request.param + + @pytest.fixture + def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method): + + if use_client_cert is False: # remove certificate paths from config rabbit_ssl_config['AMQP_SSL'] = True + + # set login method + rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method + + # skip if not a valid combination + if login_method == "EXTERNAL" and not use_client_cert: + pytest.skip("EXTERNAL login method requires cert verification") + return rabbit_ssl_config def test_rpc_entrypoint_over_ssl( From aa3d3f00303d6c82b9461e3bc132200357f13b84 Mon Sep 17 00:00:00 2001 From: Matt Yule-Bennett Date: Thu, 14 Oct 2021 15:33:59 +0100 Subject: [PATCH 7/7] import sort --- test/amqp/test_publish.py | 2 +- test/test_rpc.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/amqp/test_publish.py b/test/amqp/test_publish.py index 20714efeb..02133b840 100644 --- a/test/amqp/test_publish.py +++ b/test/amqp/test_publish.py @@ -15,10 +15,10 @@ from mock import ANY, MagicMock, Mock, call, patch from six.moves import queue -from nameko.constants import AMQP_SSL_CONFIG_KEY from nameko.amqp.publish import ( Publisher, UndeliverableMessage, get_connection, get_producer ) +from nameko.constants import AMQP_SSL_CONFIG_KEY def test_get_connection(rabbit_config): diff --git a/test/test_rpc.py b/test/test_rpc.py index 6e38ddf2d..b33648dea 100644 --- a/test/test_rpc.py +++ b/test/test_rpc.py @@ -13,7 +13,7 @@ from six.moves import queue from nameko.constants import ( - LOGIN_METHOD_CONFIG_KEY, HEARTBEAT_CONFIG_KEY, MAX_WORKERS_CONFIG_KEY + HEARTBEAT_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY, MAX_WORKERS_CONFIG_KEY ) from nameko.containers import WorkerContext from nameko.events import event_handler