diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 0297e1e30..939a3cec2 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -15,7 +15,6 @@ import functools import itertools import logging -import random import socket import ssl import time @@ -24,8 +23,10 @@ import kombu import kombu.connection import kombu.entity +import kombu.exceptions import kombu.messaging import six +from six.moves.urllib import parse from oslo.config import cfg from oslo.messaging._drivers import amqp as rpc_amqp @@ -35,6 +36,7 @@ from oslo.messaging import exceptions from oslo.utils import netutils + rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', default='', @@ -424,6 +426,7 @@ class Connection(object): def __init__(self, conf, url): self.consumers = [] + self.consumer_num = itertools.count(1) self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -433,62 +436,62 @@ def __init__(self, conf, url): self.interval_stepping = self.conf.rabbit_retry_backoff # max retry-interval = 30 seconds self.interval_max = 30 - self.memory_transport = False - ssl_params = self._fetch_ssl_params() + self._ssl_params = self._fetch_ssl_params() + self._login_method = self.conf.rabbit_login_method if url.virtual_host is not None: virtual_host = url.virtual_host else: virtual_host = self.conf.rabbit_virtual_host - self.brokers_params = [] - if url.hosts: + self._url = '' + if self.conf.fake_rabbit: + # TODO(sileht): use memory://virtual_host into + # unit tests to remove cfg.CONF.fake_rabbit + self._url = 'memory://%s/' % virtual_host + elif url.hosts: for host in url.hosts: - params = { - 'hostname': host.hostname, - 'port': host.port or 5672, - 'userid': host.username or '', - 'password': host.password or '', - 'login_method': self.conf.rabbit_login_method, - 'virtual_host': virtual_host - } - if self.conf.fake_rabbit: - params['transport'] = 'memory' - if self.conf.rabbit_use_ssl: - params['ssl'] = ssl_params - - self.brokers_params.append(params) + transport = url.transport.replace('kombu+', '') + transport = url.transport.replace('rabbit', 'amqp') + self._url += '%s%s://%s:%s@%s:%s/%s' % ( + ";" if self._url else '', + transport, + parse.quote(host.username or ''), + parse.quote(host.password or ''), + host.hostname or '', str(host.port or 5672), + virtual_host) else: - # Old configuration format for adr in self.conf.rabbit_hosts: hostname, port = netutils.parse_host_port( adr, default_port=self.conf.rabbit_port) + self._url += '%samqp://%s:%s@%s:%s/%s' % ( + ";" if self._url else '', + parse.quote(self.conf.rabbit_userid), + parse.quote(self.conf.rabbit_password), + hostname, port, + virtual_host) - params = { - 'hostname': hostname, - 'port': port, - 'userid': self.conf.rabbit_userid, - 'password': self.conf.rabbit_password, - 'login_method': self.conf.rabbit_login_method, - 'virtual_host': virtual_host - } - - if self.conf.fake_rabbit: - params['transport'] = 'memory' - if self.conf.rabbit_use_ssl: - params['ssl'] = ssl_params - - self.brokers_params.append(params) - - random.shuffle(self.brokers_params) - self.brokers = itertools.cycle(self.brokers_params) + self.do_consume = True - self.memory_transport = self.conf.fake_rabbit + self.channel = None + self.connection = kombu.connection.Connection( + self._url, ssl=self._ssl_params, login_method=self._login_method, + failover_strategy="shuffle") + + LOG.info(_('Connecting to AMQP server on %(hostname)s:%(port)d'), + {'hostname': self.connection.hostname, + 'port': self.connection.port}) + # NOTE(sileht): just ensure the connection is setuped at startup + self.ensure(error_callback=None, + method=lambda channel: True) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), + {'hostname': self.connection.hostname, + 'port': self.connection.port}) - self.connection = None - self.do_consume = None - self.reconnect() + if self.conf.fake_rabbit: + # Kludge to speed up tests. + self.connection.transport.polling_interval = 0.0 # FIXME(markmc): use oslo sslutils when it is available as a library _SSL_PROTOCOLS = { @@ -531,153 +534,87 @@ def _fetch_ssl_params(self): ssl_params['cert_reqs'] = ssl.CERT_REQUIRED # Return the extended behavior or just have the default behavior - return ssl_params or True + return ssl_params or None - def _connect(self, broker): - """Connect to rabbit. Re-establish any queues that may have - been declared before if we are reconnecting. Exceptions should - be handled by the caller. + def _setup_new_channel(self, new_channel): + """Callback invoked when the kombu connection have created + a new channel, we use it the reconfigure our consumers. """ - LOG.info(_("Connecting to AMQP server on " - "%(hostname)s:%(port)d"), broker) - self.connection = kombu.connection.BrokerConnection(**broker) - self.connection_errors = self.connection.connection_errors - self.channel_errors = self.connection.channel_errors - if self.memory_transport: - # Kludge to speed up tests. - self.connection.transport.polling_interval = 0.0 - self.do_consume = True self.consumer_num = itertools.count(1) - self.connection.connect() - self.channel = self.connection.channel() - # work around 'memory' transport bug in 1.1.3 - if self.memory_transport: - self.channel._new_queue('ae.undeliver') for consumer in self.consumers: - consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), - broker) - - def _disconnect(self): - if self.connection: - # XXX(nic): when reconnecting to a RabbitMQ cluster - # with mirrored queues in use, the attempt to release the - # connection can hang "indefinitely" somewhere deep down - # in Kombu. Blocking the thread for a bit prior to - # release seems to kludge around the problem where it is - # otherwise reproduceable. - if self.conf.kombu_reconnect_delay > 0: - LOG.info(_("Delaying reconnect for %1.1f seconds...") % - self.conf.kombu_reconnect_delay) - time.sleep(self.conf.kombu_reconnect_delay) - - try: - self.connection.release() - except self.connection_errors: - pass - self.connection = None + consumer.reconnect(new_channel) - def reconnect(self, retry=None): - """Handles reconnecting and re-establishing queues. - Will retry up to retry number of times. + def ensure(self, error_callback, method, retry=None, + timeout_is_error=True): + """Will retry up to retry number of times. retry = None means use the value of rabbit_max_retries retry = -1 means to retry forever retry = 0 means no retry retry = N means N retries - Sleep between tries, starting at self.interval_start - seconds, backing off self.interval_stepping number of seconds - each attempt. """ - attempt = 0 - loop_forever = False if retry is None: retry = self.max_retries if retry is None or retry < 0: - loop_forever = True + retry = None - while True: - self._disconnect() + def on_error(exc, interval): + error_callback and error_callback(exc) - broker = six.next(self.brokers) - attempt += 1 - try: - self._connect(broker) - return - except IOError as ex: - e = ex - except self.connection_errors as ex: - e = ex - except Exception as ex: - # NOTE(comstud): Unfortunately it's possible for amqplib - # to return an error not covered by its transport - # connection_errors in the case of a timeout waiting for - # a protocol response. (See paste link in LP888621) - # So, we check all exceptions for 'timeout' in them - # and try to reconnect in this case. - if 'timeout' not in six.text_type(e): - raise - e = ex - - log_info = {} - log_info['err_str'] = e - log_info['retry'] = retry or 0 - log_info.update(broker) - - if not loop_forever and attempt > retry: - msg = _('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(retry)d ' - 'tries: %(err_str)s') % log_info - LOG.error(msg) - raise exceptions.MessageDeliveryFailure(msg) + info = {'hostname': self.connection.hostname, + 'port': self.connection.port, + 'err_str': exc, 'sleep_time': interval} + + if 'Socket closed' in six.text_type(exc): + LOG.error(_('AMQP server %(hostname)s:%(port)d closed' + ' the connection. Check login credentials:' + ' %(err_str)s'), info) else: - if attempt == 1: - sleep_time = self.interval_start or 1 - elif attempt > 1: - sleep_time += self.interval_stepping - - sleep_time = min(sleep_time, self.interval_max) - - log_info['sleep_time'] = sleep_time - if 'Socket closed' in six.text_type(e): - LOG.error(_('AMQP server %(hostname)s:%(port)d closed' - ' the connection. Check login credentials:' - ' %(err_str)s'), log_info) - else: - LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' - 'unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.'), log_info) - time.sleep(sleep_time) - - def ensure(self, error_callback, method, retry=None): - while True: - try: - return method() - except self.connection_errors as e: - if error_callback: - error_callback(e) - except self.channel_errors as e: - if error_callback: - error_callback(e) - except (socket.timeout, IOError) as e: - if error_callback: - error_callback(e) - except Exception as e: - # NOTE(comstud): Unfortunately it's possible for amqplib - # to return an error not covered by its transport - # connection_errors in the case of a timeout waiting for - # a protocol response. (See paste link in LP888621) - # So, we check all exceptions for 'timeout' in them - # and try to reconnect in this case. - if 'timeout' not in six.text_type(e): - raise - if error_callback: - error_callback(e) - self.reconnect(retry=retry) - - def get_channel(self): - """Convenience call for bin/clear_rabbit_queues.""" - return self.channel + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.'), info) + + # XXX(nic): when reconnecting to a RabbitMQ cluster + # with mirrored queues in use, the attempt to release the + # connection can hang "indefinitely" somewhere deep down + # in Kombu. Blocking the thread for a bit prior to + # release seems to kludge around the problem where it is + # otherwise reproduceable. + # TODO(sileht): Check if this is useful since we + # use kombu for HA connection, the interval_step + # should sufficient, because the underlying kombu transport + # connection object freed. + if self.conf.kombu_reconnect_delay > 0: + LOG.info(_("Delaying reconnect for %1.1f seconds...") % + self.conf.kombu_reconnect_delay) + time.sleep(self.conf.kombu_reconnect_delay) + + recoverable_errors = (self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + try: + autoretry_method = self.connection.autoretry( + method, channel=self.channel, + max_retries=retry, + errback=on_error, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + on_revive=self._setup_new_channel, + ) + ret, channel = autoretry_method() + self.channel = channel + return ret + except recoverable_errors as exc: + # NOTE(sileht): number of retry exceeded and the connection + # is still broken + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(retry)d ' + 'tries: %(err_str)s') % { + 'hostname': self.connection.hostname, + 'port': self.connection.port, + 'err_str': exc, + 'retry': retry} + LOG.error(msg) + raise exceptions.MessageDeliveryFailure(msg) def close(self): """Close/release this connection.""" @@ -689,10 +626,8 @@ def reset(self): """Reset a connection so it can be used again.""" self.channel.close() self.channel = self.connection.channel() - # work around 'memory' transport bug in 1.1.3 - if self.memory_transport: - self.channel._new_queue('ae.undeliver') self.consumers = [] + self._setup_new_channel(self.channel) def declare_consumer(self, consumer_cls, topic, callback): """Create a Consumer using the class that was passed in and @@ -704,8 +639,8 @@ def _connect_error(exc): LOG.error(_("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s"), log_info) - def _declare_consumer(): - consumer = consumer_cls(self.conf, self.channel, topic, callback, + def _declare_consumer(channel): + consumer = consumer_cls(self.conf, channel, topic, callback, six.next(self.consumer_num)) self.consumers.append(consumer) return consumer @@ -716,15 +651,11 @@ def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" def _error_callback(exc): - if isinstance(exc, socket.timeout): - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() - else: - LOG.exception(_('Failed to consume message from queue: %s'), - exc) - self.do_consume = True + LOG.exception(_('Failed to consume message from queue: %s'), + exc) + self.do_consume = True - def _consume(): + def _consume(channel): if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout @@ -732,7 +663,11 @@ def _consume(): queue.consume(nowait=True) queues_tail.consume(nowait=False) self.do_consume = False - return self.connection.drain_events(timeout=timeout) + try: + return self.connection.drain_events(timeout=timeout) + except socket.timeout as exc: + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() for iteration in itertools.count(0): if limit and iteration >= limit: @@ -748,8 +683,8 @@ def _error_callback(exc): LOG.exception(_("Failed to publish message to topic " "'%(topic)s': %(err_str)s"), log_info) - def _publish(): - publisher = cls(self.conf, self.channel, topic=topic, **kwargs) + def _publish(channel): + publisher = cls(self.conf, channel, topic=topic, **kwargs) publisher.send(msg, timeout) self.ensure(_error_callback, _publish, retry=retry) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index de331ee8f..dc0d1a3f8 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -13,7 +13,6 @@ # under the License. import datetime -import operator import sys import threading import uuid @@ -21,6 +20,7 @@ import fixtures import kombu import mock +from oslotest import mockpatch import testscenarios from oslo import messaging @@ -49,87 +49,43 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): scenarios = [ ('none', dict(url=None, - expected=[dict(hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='/')])), + expected=["amqp://guest:guest@localhost:5672//"])), ('empty', dict(url='rabbit:///', - expected=[dict(hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='')])), + expected=['amqp://guest:guest@localhost:5672/'])), ('localhost', dict(url='rabbit://localhost/', - expected=[dict(hostname='localhost', - port=5672, - userid='', - password='', - virtual_host='')])), + expected=['amqp://:@localhost:5672/'])), ('virtual_host', dict(url='rabbit:///vhost', - expected=[dict(hostname='localhost', - port=5672, - userid='guest', - password='guest', - virtual_host='vhost')])), + expected=['amqp://guest:guest@localhost:5672/vhost'])), ('no_creds', dict(url='rabbit://host/virtual_host', - expected=[dict(hostname='host', - port=5672, - userid='', - password='', - virtual_host='virtual_host')])), + expected=['amqp://:@host:5672/virtual_host'])), ('no_port', dict(url='rabbit://user:password@host/virtual_host', - expected=[dict(hostname='host', - port=5672, - userid='user', - password='password', - virtual_host='virtual_host')])), + expected=['amqp://user:password@host:5672/virtual_host'])), ('full_url', dict(url='rabbit://user:password@host:10/virtual_host', - expected=[dict(hostname='host', - port=10, - userid='user', - password='password', - virtual_host='virtual_host')])), + expected=['amqp://user:password@host:10/virtual_host'])), ('full_two_url', dict(url='rabbit://user:password@host:10,' 'user2:password2@host2:12/virtual_host', - expected=[dict(hostname='host', - port=10, - userid='user', - password='password', - virtual_host='virtual_host'), - dict(hostname='host2', - port=12, - userid='user2', - password='password2', - virtual_host='virtual_host') - ] + expected=["amqp://user:password@host:10/virtual_host", + "amqp://user2:password2@host2:12/virtual_host"] )), - ] - def test_transport_url(self): - self.messaging_conf.in_memory = True + @mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure') + def test_transport_url(self, fake_ensure): + self.messaging_conf.in_memory = False transport = messaging.get_transport(self.conf, self.url) self.addCleanup(transport.cleanup) driver = transport._driver - brokers_params = driver._get_connection().brokers_params[:] - brokers_params = [dict((k, v) for k, v in broker.items() - if k not in ['transport', 'login_method']) - for broker in brokers_params] - - self.assertEqual(sorted(self.expected, - key=operator.itemgetter('hostname')), - sorted(brokers_params, - key=operator.itemgetter('hostname'))) + urls = driver._get_connection()._url.split(";") + self.assertEqual(sorted(self.expected), sorted(urls)) class TestSendReceive(test_utils.BaseTestCase): @@ -674,62 +630,38 @@ def setUp(self): self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5'] self.config(rabbit_hosts=self.brokers) - hostname_sets = set() - self.info = {'attempt': 0, - 'fail': False} - - def _connect(myself, params): - # do as little work that is enough to pass connection attempt - myself.connection = kombu.connection.BrokerConnection(**params) - myself.connection_errors = myself.connection.connection_errors - - hostname = params['hostname'] - self.assertNotIn(hostname, hostname_sets) - hostname_sets.add(hostname) - - self.info['attempt'] += 1 - if self.info['fail']: - raise IOError('fake fail') - - # just make sure connection instantiation does not fail with an - # exception - self.stubs.Set(rabbit_driver.Connection, '_connect', _connect) + self.kombu_connect = mock.Mock() + self.useFixture(mockpatch.Patch( + 'kombu.connection.Connection.connect', + side_effect=self.kombu_connect)) + self.useFixture(mockpatch.Patch( + 'kombu.connection.Connection.channel')) # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) self.connection = rabbit_driver.Connection(self.conf, url) self.addCleanup(self.connection.close) - self.info.update({'attempt': 0, - 'fail': True}) - hostname_sets.clear() - - def test_reconnect_order(self): - self.assertRaises(messaging.MessageDeliveryFailure, - self.connection.reconnect, - retry=len(self.brokers) - 1) - self.assertEqual(len(self.brokers), self.info['attempt']) - def test_ensure_four_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(messaging.MessageDeliveryFailure, self.connection.ensure, None, mock_callback, retry=4) - self.assertEqual(5, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) + self.assertEqual(5, self.kombu_connect.call_count) + self.assertEqual(6, mock_callback.call_count) def test_ensure_one_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(messaging.MessageDeliveryFailure, self.connection.ensure, None, mock_callback, retry=1) - self.assertEqual(2, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) + self.assertEqual(2, self.kombu_connect.call_count) + self.assertEqual(3, mock_callback.call_count) def test_ensure_no_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(messaging.MessageDeliveryFailure, self.connection.ensure, None, mock_callback, retry=0) - self.assertEqual(1, self.info['attempt']) - self.assertEqual(1, mock_callback.call_count) + self.assertEqual(1, self.kombu_connect.call_count) + self.assertEqual(2, mock_callback.call_count)