Skip to content

Commit

Permalink
Merge "Ensure kombu channels are closed"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Jan 30, 2015
2 parents 6fc9099 + e7e5506 commit f5b9def
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions oslo_messaging/_drivers/impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ def __init__(self, conf, url):
'port': self.connection.port})
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda channel: True)
method=lambda: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname,
'port': self.connection.port})
Expand Down Expand Up @@ -600,8 +600,6 @@ def ensure(self, error_callback, method, retry=None,
retry = None

def on_error(exc, interval):
self.channel = None

error_callback and error_callback(exc)

interval = (self.conf.kombu_reconnect_delay + interval
Expand Down Expand Up @@ -637,6 +635,7 @@ def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates
a new channel, we use it the reconfigure our consumers.
"""
self._set_current_channel(new_channel)
self.consumer_num = itertools.count(1)
for consumer in self.consumers:
consumer.reconnect(new_channel)
Expand All @@ -646,22 +645,26 @@ def on_reconnection(new_channel):
{'hostname': self.connection.hostname,
'port': self.connection.port})

def execute_method(channel):
self._set_current_channel(channel)
method()

recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
autoretry_method = self.connection.autoretry(
method, channel=self.channel,
execute_method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
on_revive=on_reconnection,
)
ret, channel = autoretry_method()
self.channel = channel
self._set_current_channel(channel)
return ret
except recoverable_errors as exc:
self.channel = None
self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
msg = _('Unable to connect to AMQP server on '
Expand All @@ -674,17 +677,21 @@ def on_reconnection(new_channel):
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)

def _set_current_channel(self, new_channel):
if self.channel is not None and new_channel != self.channel:
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel

def close(self):
"""Close/release this connection."""
if self.connection:
self._set_current_channel(None)
self.connection.release()
self.connection = None

def reset(self):
"""Reset a connection so it can be used again."""
if self.channel is not None:
self.channel.close()
self.channel = self.connection.channel()
self._set_current_channel(self.connection.channel())
self.consumers = []
self.consumer_num = itertools.count(1)

Expand All @@ -698,8 +705,8 @@ def _connect_error(exc):
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)

def _declare_consumer(channel):
consumer = consumer_cls(self.conf, channel, topic, callback,
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
Expand All @@ -722,7 +729,7 @@ def _error_callback(exc):
LOG.exception(_('Failed to consume message from queue: %s'),
exc)

def _consume(channel):
def _consume():
if self.do_consume:
queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] # fanout
Expand Down Expand Up @@ -758,8 +765,8 @@ def _error_callback(exc):
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)

def _publish(channel):
publisher = cls(self.conf, channel, topic=topic, **kwargs)
def _publish():
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher.send(msg, timeout)

self.ensure(_error_callback, _publish, retry=retry)
Expand Down

0 comments on commit f5b9def

Please sign in to comment.