Permalink
Browse files

Bump kombu to version 4.2 (#564)

* Bump kombu version

* backport not needed anymore

* Auth Failure is now handled in py-amqp: celery/py-amqp#106

* Use channel instead of connection.

* Tox update

* use mock_channel instead of mock_connection

* Use proper exceptions from kombu.

* Use proper exceptions from kombu.

* This should timestamp not datetime,

* Fixed toxiproxy based tests (in really ugly way).

* flake fixes

* Test fixes.

* Coverage fixes.

* DEFAULT_HEARTBEAT config.

* Better DEFAULT_TRANSPORT_OPTIONS config.

* Even better DEFAULT_TRANSPORT_OPTIONS config.

* Nitpicks.
  • Loading branch information...
s-maj authored and mattbennett committed Aug 9, 2018
1 parent 0bffbf7 commit 650da160289c03d7cfdd25242a65a2302bee1574
View
@@ -5,6 +5,13 @@ Here you can see the full list of changes between nameko versions. Versions
are in form of *headline.major.minor* numbers. Backwards-compatible changes
increment the minor version number only.
Version 2.11.0
-------------
Released: TBA
* Bump the minimum supported kombu version to 4.2 (#564)
Version 2.10.0
-------------
View
@@ -19,6 +19,7 @@ David Weterings (@davidweterings)
Radek Jankiewicz (@radekj)
Miguel Flores Ruiz de Eguino (@miguelfrde)
Artur Stawiarski (@astawiarski)
Sebastian Maj (@s-maj)
Jakub Borys (@kooba)
Andriy Kogut (@andriykohut)
Fabrizio Romano (@gianchub)
View
@@ -2,8 +2,3 @@
from .utils import verify_amqp_uri # noqa: F401
from .publish import ( # noqa: F401
get_connection, get_producer, UndeliverableMessage)
from amqp.transport import _AbstractTransport
# backport http://bit.do/do-not-implement-del
del _AbstractTransport.__del__
View
@@ -2,10 +2,12 @@
from contextlib import contextmanager
from kombu import Connection
from kombu.exceptions import ChannelError
from kombu.pools import connections, producers
from six.moves import queue as Queue
from nameko.constants import DEFAULT_RETRY_POLICY, PERSISTENT
from nameko.constants import (
DEFAULT_RETRY_POLICY, DEFAULT_TRANSPORT_OPTIONS, PERSISTENT
)
class UndeliverableMessage(Exception):
@@ -15,17 +17,20 @@ class UndeliverableMessage(Exception):
@contextmanager
def get_connection(amqp_uri, ssl=None):
conn = Connection(amqp_uri, ssl=ssl)
def get_connection(amqp_uri, ssl=None, transport_options=None):
if not transport_options:
transport_options = DEFAULT_TRANSPORT_OPTIONS.copy()
conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl)
with connections[conn].acquire(block=True) as connection:
yield connection
@contextmanager
def get_producer(amqp_uri, confirms=True, ssl=None):
transport_options = {
'confirm_publish': confirms
}
def get_producer(amqp_uri, confirms=True, ssl=None, transport_options=None):
if transport_options is None:
transport_options = DEFAULT_TRANSPORT_OPTIONS.copy()
transport_options['confirm_publish'] = confirms
conn = Connection(amqp_uri, transport_options=transport_options, ssl=ssl)
with producers[conn].acquire(block=True) as producer:
@@ -48,6 +53,13 @@ class Publisher(object):
aren't lost, for example due to stale connections.
"""
transport_options = DEFAULT_TRANSPORT_OPTIONS.copy()
"""
A dict of additional connection arguments to pass to alternate kombu
channel implementations. Consult the transport documentation for
available options.
"""
delivery_mode = PERSISTENT
"""
Default delivery mode for messages published by this Publisher.
@@ -108,8 +120,7 @@ class Publisher(object):
def __init__(
self, amqp_uri, use_confirms=None, serializer=None, compression=None,
delivery_mode=None, mandatory=None, priority=None, expiration=None,
declare=None, retry=None, retry_policy=None, ssl=None,
**publish_kwargs
declare=None, retry=None, retry_policy=None, ssl=None, **publish_kwargs
):
self.amqp_uri = amqp_uri
self.ssl = ssl
@@ -159,6 +170,10 @@ def publish(self, payload, **kwargs):
headers.update(kwargs.pop('extra_headers', {}))
use_confirms = kwargs.pop('use_confirms', self.use_confirms)
transport_options = kwargs.pop('transport_options',
self.transport_options
)
transport_options['confirm_publish'] = use_confirms
delivery_mode = kwargs.pop('delivery_mode', self.delivery_mode)
mandatory = kwargs.pop('mandatory', self.mandatory)
@@ -174,22 +189,30 @@ def publish(self, payload, **kwargs):
publish_kwargs.update(kwargs) # remaining publish-time kwargs win
with get_producer(self.amqp_uri, use_confirms, self.ssl) as producer:
producer.publish(
payload,
headers=headers,
delivery_mode=delivery_mode,
mandatory=mandatory,
priority=priority,
expiration=expiration,
compression=compression,
declare=declare,
retry=retry,
retry_policy=retry_policy,
serializer=serializer,
**publish_kwargs
)
with get_producer(self.amqp_uri,
use_confirms,
self.ssl,
transport_options,
) as producer:
try:
producer.publish(
payload,
headers=headers,
delivery_mode=delivery_mode,
mandatory=mandatory,
priority=priority,
expiration=expiration,
compression=compression,
declare=declare,
retry=retry,
retry_policy=retry_policy,
serializer=serializer,
**publish_kwargs
)
except ChannelError as exc:
if "NO_ROUTE" in str(exc):
raise UndeliverableMessage()
raise
if mandatory:
if not use_confirms:
@@ -198,10 +221,3 @@ def publish(self, payload, **kwargs):
"unroutable messages cannot be detected without "
"publish confirms enabled."
)
try:
returned_messages = producer.channel.returned_messages
returned = returned_messages.get_nowait()
except Queue.Empty:
pass
else:
raise UndeliverableMessage(returned)
View
@@ -7,9 +7,6 @@
from kombu.transport.pyamqp import Transport
BAD_CREDENTIALS = (
'Error connecting to broker, probably caused by invalid credentials'
)
BAD_VHOST = (
'Error connecting to broker, probably caused by using an invalid '
'or unauthorized vhost'
@@ -24,13 +21,8 @@ class ConnectionTester(amqp.Connection):
def __init__(self, *args, **kwargs):
try:
super(ConnectionTester, self).__init__(*args, **kwargs)
except IOError as exc:
if not hasattr(self, '_wait_tune_ok'):
raise
elif self._wait_tune_ok:
six.raise_from(IOError(BAD_CREDENTIALS), exc)
else: # pragma: no cover (rabbitmq >= 3.6.0)
six.raise_from(IOError(BAD_VHOST), exc)
except IOError as exc: # pragma: no cover (rabbitmq >= 3.6.0)
six.raise_from(IOError(BAD_VHOST), exc)
except NotAllowed as exc: # pragma: no cover (rabbitmq < 3.6.0)
six.raise_from(IOError(BAD_VHOST), exc)
View
@@ -6,6 +6,7 @@
ACCEPT_CONFIG_KEY = 'ACCEPT'
HEARTBEAT_CONFIG_KEY = 'HEARTBEAT'
AMQP_SSL_CONFIG_KEY = 'AMQP_SSL'
TRANSPORT_OPTIONS_CONFIG_KEY = 'TRANSPORT_OPTIONS'
MAX_WORKERS_CONFIG_KEY = 'max_workers'
PARENT_CALLS_CONFIG_KEY = 'parent_calls_tracked'
@@ -15,6 +16,12 @@
DEFAULT_SERIALIZER = 'json'
DEFAULT_RETRY_POLICY = {'max_retries': 3}
DEFAULT_HEARTBEAT = 60
DEFAULT_TRANSPORT_OPTIONS = {
'max_retries': 3,
'interval_start': 2,
'interval_step': 1,
'interval_max': 5
}
CALL_ID_STACK_CONTEXT_KEY = 'call_id_stack'
AUTH_TOKEN_CONTEXT_KEY = 'auth_token'
View
@@ -18,8 +18,9 @@
from nameko.amqp.publish import get_connection
from nameko.amqp.utils import verify_amqp_uri
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_HEARTBEAT, HEADER_PREFIX,
HEARTBEAT_CONFIG_KEY
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, DEFAULT_HEARTBEAT,
DEFAULT_TRANSPORT_OPTIONS, HEADER_PREFIX, HEARTBEAT_CONFIG_KEY,
TRANSPORT_OPTIONS_CONFIG_KEY
)
from nameko.exceptions import ContainerBeingKilled
from nameko.extensions import (
@@ -157,12 +158,11 @@ def serializer(self):
def setup(self):
ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY)
verify_amqp_uri(self.amqp_uri, ssl=ssl)
with get_connection(self.amqp_uri, ssl) as conn:
for entity in self.declare:
maybe_declare(entity, conn)
maybe_declare(entity, conn.channel())
serializer = self.options.pop('serializer', self.serializer)
@@ -351,8 +351,17 @@ def connection(self):
heartbeat = self.container.config.get(
HEARTBEAT_CONFIG_KEY, DEFAULT_HEARTBEAT
)
transport_options = self.container.config.get(
TRANSPORT_OPTIONS_CONFIG_KEY, DEFAULT_TRANSPORT_OPTIONS
)
ssl = self.container.config.get(AMQP_SSL_CONFIG_KEY)
return Connection(self.amqp_uri, heartbeat=heartbeat, ssl=ssl)
conn = Connection(self.amqp_uri,
transport_options=transport_options,
heartbeat=heartbeat,
ssl=ssl
)
return conn
def handle_message(self, provider, body, message):
ident = u"{}.handle_message[{}]".format(
View
@@ -22,7 +22,7 @@
packages=find_packages(exclude=['test', 'test.*']),
install_requires=[
"eventlet>=0.20.1",
"kombu>=3.0.1,<4",
"kombu>=4.2.0,<5",
"mock>=1.2",
"path.py>=6.2",
"pyyaml>=3.10",
View
@@ -1,6 +1,6 @@
from __future__ import absolute_import
from datetime import datetime
from time import time
import pytest
from amqp.exceptions import (
@@ -9,6 +9,7 @@
from kombu import Connection
from kombu.common import maybe_declare
from kombu.compression import get_encoder
from kombu.exceptions import OperationalError
from kombu.messaging import Exchange, Producer, Queue
from kombu.serialization import registry
from mock import ANY, MagicMock, Mock, call, patch
@@ -282,7 +283,7 @@ def test_message_properties(
def test_timestamp(
self, publisher, get_message_from_queue, queue
):
now = datetime.now().replace(microsecond=0)
now = int(time())
publisher.publish("payload", timestamp=now)
message = get_message_from_queue(queue.name)
@@ -336,7 +337,7 @@ def test_retry(
# with retry
with patch.object(Producer, '_publish', new=mock_publish):
with pytest.raises(RecoverableConnectionError):
with pytest.raises(OperationalError):
publisher.publish("payload", retry=True)
assert mock_publish.call_count == 1 + expected_retries
@@ -360,7 +361,7 @@ def test_retry_policy(
expected_retries = retry_policy['max_retries'] + 1
with patch.object(Producer, '_publish', new=mock_publish):
with pytest.raises(RecoverableConnectionError):
with pytest.raises(OperationalError):
publisher.publish("payload", retry_policy=retry_policy)
assert mock_publish.call_count == 1 + expected_retries
@@ -463,9 +464,9 @@ def test_use_confirms(self, get_producer):
publisher = Publisher("memory://", use_confirms=False)
publisher.publish("payload")
(_, use_confirms, _), _ = get_producer.call_args
use_confirms = get_producer.call_args[0][3].get('confirm_publish')
assert use_confirms is False
publisher.publish("payload", use_confirms=True)
(_, use_confirms, _), _ = get_producer.call_args
use_confirms = get_producer.call_args[0][3].get('confirm_publish')
assert use_confirms is True
View
@@ -2,6 +2,7 @@
import ssl
import pytest
from amqp.exceptions import AccessRefused, NotAllowed
from urllib3.util import Url, parse_url
from nameko.amqp import verify_amqp_uri
@@ -16,22 +17,20 @@ def test_bad_user(rabbit_config):
scheme, auth, host, port, path, _, _ = parse_url(rabbit_config['AMQP_URI'])
amqp_uri = Url(scheme, 'invalid:invalid', host, port, path).url
with pytest.raises(IOError) as exc_info:
with pytest.raises(AccessRefused) as exc_info:
verify_amqp_uri(amqp_uri)
message = str(exc_info.value)
assert 'Error connecting to broker' in message
assert 'invalid credentials' in message
assert 'Login was refused' in message
def test_bad_vhost(rabbit_config):
scheme, auth, host, port, path, _, _ = parse_url(rabbit_config['AMQP_URI'])
amqp_uri = Url(scheme, auth, host, port, '/unknown').url
with pytest.raises(IOError) as exc_info:
with pytest.raises(NotAllowed) as exc_info:
verify_amqp_uri(amqp_uri)
message = str(exc_info.value)
assert 'Error connecting to broker' in message
assert 'invalid or unauthorized vhost' in message
assert 'access to vhost' in message
def test_other_error(rabbit_config):
View
@@ -43,10 +43,10 @@ def mock_producer():
@pytest.yield_fixture
def mock_connection():
def mock_channel():
with patch('nameko.amqp.publish.connections') as patched:
with patched[ANY].acquire() as connection:
yield connection
yield connection.channel()
@pytest.yield_fixture(scope='session')
@@ -4,6 +4,7 @@
import pytest
from eventlet.event import Event
from kombu.connection import Connection
from kombu.exceptions import OperationalError
from kombu.message import Message
from mock import Mock, call
@@ -646,15 +647,15 @@ def test_normal(self, service_rpc):
def test_down(self, service_rpc, toxiproxy):
toxiproxy.disable()
with pytest.raises(socket.error) as exc_info:
with pytest.raises(OperationalError) as exc_info:
service_rpc.echo(1)
assert "ECONNREFUSED" in str(exc_info.value)
@pytest.mark.usefixtures('use_confirms')
def test_timeout(self, service_rpc, toxiproxy):
toxiproxy.set_timeout()
with pytest.raises(IOError) as exc_info:
with pytest.raises(OperationalError) as exc_info:
service_rpc.echo(1)
assert "Socket closed" in str(exc_info.value)
Oops, something went wrong.

0 comments on commit 650da16

Please sign in to comment.