Skip to content

Commit

Permalink
Move each drivers options into its own group
Browse files Browse the repository at this point in the history
All drivers options are current stored into the DEFAULT group.
This change makes the configuration clearer by putting driver options
into a group named oslo_messaging_<driver>.

Closes-bug: #1417040
Change-Id: I96a9682afe7eb0caf1fbf47bbb0291833aec245b
  • Loading branch information
Mehdi Abaakouk committed Feb 2, 2015
1 parent f5b9def commit 824313a
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 89 deletions.
6 changes: 4 additions & 2 deletions oslo_messaging/_drivers/amqp.py
Expand Up @@ -42,11 +42,13 @@
help='Use durable queues in AMQP.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),

# FIXME(markmc): this was toplevel in openstack.common.rpc
cfg.IntOpt('rpc_conn_pool_size',
default=30,
deprecated_group='DEFAULT',
help='Size of RPC connection pool.'),
]

Expand All @@ -56,11 +58,11 @@

class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, url, connection_cls):
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None

# TODO(comstud): Timeout connections not used in a while
Expand Down
45 changes: 32 additions & 13 deletions oslo_messaging/_drivers/impl_qpid.py
Expand Up @@ -41,41 +41,52 @@
qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
deprecated_group='DEFAULT',
help='Qpid broker hostname.'),
cfg.IntOpt('qpid_port',
default=5672,
deprecated_group='DEFAULT',
help='Qpid broker port.'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
deprecated_group='DEFAULT',
help='Qpid HA cluster host:port pairs.'),
cfg.StrOpt('qpid_username',
default='',
deprecated_group='DEFAULT',
help='Username for Qpid connection.'),
cfg.StrOpt('qpid_password',
default='',
deprecated_group='DEFAULT',
help='Password for Qpid connection.',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
deprecated_group='DEFAULT',
help='Space separated list of SASL mechanisms to use for '
'auth.'),
cfg.IntOpt('qpid_heartbeat',
default=60,
deprecated_group='DEFAULT',
help='Seconds between connection keepalive heartbeats.'),
cfg.StrOpt('qpid_protocol',
default='tcp',
deprecated_group='DEFAULT',
help="Transport to use, either 'tcp' or 'ssl'."),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
deprecated_group='DEFAULT',
help='Whether to disable the Nagle algorithm.'),
cfg.IntOpt('qpid_receiver_capacity',
default=1,
deprecated_group='DEFAULT',
help='The number of prefetched messages held by receiver.'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
deprecated_group='DEFAULT',
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
Expand Down Expand Up @@ -459,6 +470,7 @@ def __init__(self, conf, url):
self.session = None
self.consumers = {}
self.conf = conf
self.driver_conf = conf.oslo_messaging_qpid

self._consume_loop_stopped = False

Expand All @@ -476,7 +488,7 @@ def __init__(self, conf, url):
self.brokers_params.append(params)
else:
# Old configuration format
for adr in self.conf.qpid_hosts:
for adr in self.driver_conf.qpid_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=5672)

Expand All @@ -485,8 +497,8 @@ def __init__(self, conf, url):

params = {
'host': '%s:%d' % (hostname, port),
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
'username': self.driver_conf.qpid_username,
'password': self.driver_conf.qpid_password,
}
self.brokers_params.append(params)

Expand All @@ -505,12 +517,12 @@ def _connect(self, broker):
self.connection.username = broker['username']
self.connection.password = broker['password']

self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
self.connection.heartbeat = self.driver_conf.qpid_heartbeat
self.connection.transport = self.driver_conf.qpid_protocol
self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay
self.connection.open()

def _register_consumer(self, consumer):
Expand Down Expand Up @@ -633,7 +645,8 @@ def _connect_error(exc):
"%(err_str)s"), log_info)

def _declare_consumer():
consumer = consumer_cls(self.conf, self.session, topic, callback)
consumer = consumer_cls(self.driver_conf, self.session, topic,
callback)
self._register_consumer(consumer)
return consumer

Expand Down Expand Up @@ -693,7 +706,8 @@ def _connect_error(exc):
"'%(topic)s': %(err_str)s"), log_info)

def _publisher_send():
publisher = cls(self.conf, self.session, topic=topic, **kwargs)
publisher = cls(self.driver_conf, self.session, topic=topic,
**kwargs)
publisher.send(msg)

return self.ensure(_connect_error, _publisher_send, retry=retry)
Expand Down Expand Up @@ -764,10 +778,15 @@ class QpidDriver(amqpdriver.AMQPDriverBase):

def __init__(self, conf, url,
default_exchange=None, allowed_remote_exmods=None):
conf.register_opts(qpid_opts)
conf.register_opts(rpc_amqp.amqp_opts)

connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
opt_group = cfg.OptGroup(name='oslo_messaging_qpid',
title='QPID driver options')
conf.register_group(opt_group)
conf.register_opts(qpid_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)

connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
url, Connection)

super(QpidDriver, self).__init__(conf, url,
connection_pool,
Expand Down
83 changes: 54 additions & 29 deletions oslo_messaging/_drivers/impl_rabbit.py
Expand Up @@ -42,71 +42,88 @@
rabbit_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
deprecated_group='DEFAULT',
help='SSL version to use (valid only if SSL enabled). '
'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, '
'TLSv1_1, and TLSv1_2 may be available on some '
'distributions.'
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
deprecated_group='DEFAULT',
help='SSL key file (valid only if SSL enabled).'),
cfg.StrOpt('kombu_ssl_certfile',
default='',
deprecated_group='DEFAULT',
help='SSL cert file (valid only if SSL enabled).'),
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
deprecated_group='DEFAULT',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
deprecated_group='DEFAULT',
help='How long to wait before reconnecting in response to an '
'AMQP consumer cancel notification.'),
cfg.StrOpt('rabbit_host',
default='localhost',
deprecated_group='DEFAULT',
help='The RabbitMQ broker address where a single node is '
'used.'),
cfg.IntOpt('rabbit_port',
default=5672,
deprecated_group='DEFAULT',
help='The RabbitMQ broker port where a single node is used.'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
deprecated_group='DEFAULT',
help='RabbitMQ HA cluster host:port pairs.'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
deprecated_group='DEFAULT',
help='Connect over SSL for RabbitMQ.'),
cfg.StrOpt('rabbit_userid',
default='guest',
deprecated_group='DEFAULT',
help='The RabbitMQ userid.'),
cfg.StrOpt('rabbit_password',
default='guest',
deprecated_group='DEFAULT',
help='The RabbitMQ password.',
secret=True),
cfg.StrOpt('rabbit_login_method',
default='AMQPLAIN',
deprecated_group='DEFAULT',
help='The RabbitMQ login method.'),
cfg.StrOpt('rabbit_virtual_host',
default='/',
deprecated_group='DEFAULT',
help='The RabbitMQ virtual host.'),
cfg.IntOpt('rabbit_retry_interval',
default=1,
help='How frequently to retry connecting with RabbitMQ.'),
cfg.IntOpt('rabbit_retry_backoff',
default=2,
deprecated_group='DEFAULT',
help='How long to backoff for between retries when connecting '
'to RabbitMQ.'),
cfg.IntOpt('rabbit_max_retries',
default=0,
deprecated_group='DEFAULT',
help='Maximum number of RabbitMQ connection retries. '
'Default is 0 (infinite retry count).'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
deprecated_group='DEFAULT',
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),

# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
cfg.BoolOpt('fake_rabbit',
default=False,
deprecated_group='DEFAULT',
help='Deprecated, use rpc_backend=kombu+memory or '
'rpc_backend=fake'),
]
Expand Down Expand Up @@ -447,25 +464,26 @@ def __init__(self, conf, url):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
self.driver_conf = self.conf.oslo_messaging_rabbit
self.max_retries = self.driver_conf.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
self.interval_start = self.conf.rabbit_retry_interval
self.interval_stepping = self.conf.rabbit_retry_backoff
self.interval_start = self.driver_conf.rabbit_retry_interval
self.interval_stepping = self.driver_conf.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30

self._ssl_params = self._fetch_ssl_params()
self._login_method = self.conf.rabbit_login_method
self._login_method = self.driver_conf.rabbit_login_method

if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
virtual_host = self.conf.rabbit_virtual_host
virtual_host = self.driver_conf.rabbit_virtual_host

self._url = ''
if self.conf.fake_rabbit:
if self.driver_conf.fake_rabbit:
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
"rpc_backend to kombu+memory or use the fake "
"driver instead.")
Expand All @@ -487,13 +505,13 @@ def __init__(self, conf, url):
transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host)
else:
for adr in self.conf.rabbit_hosts:
for adr in self.driver_conf.rabbit_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
adr, default_port=self.driver_conf.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
parse.quote(self.conf.rabbit_userid),
parse.quote(self.conf.rabbit_password),
parse.quote(self.driver_conf.rabbit_userid),
parse.quote(self.driver_conf.rabbit_password),
hostname, port,
virtual_host)

Expand Down Expand Up @@ -561,15 +579,15 @@ def _fetch_ssl_params(self):
ssl_params = dict()

# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
if self.driver_conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
ssl_params['certfile'] = self.conf.kombu_ssl_certfile
if self.conf.kombu_ssl_ca_certs:
ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
self.driver_conf.kombu_ssl_version)
if self.driver_conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile
if self.driver_conf.kombu_ssl_certfile:
ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile
if self.driver_conf.kombu_ssl_ca_certs:
ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs
# We might want to allow variations in the
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
Expand Down Expand Up @@ -602,8 +620,9 @@ def ensure(self, error_callback, method, retry=None,
def on_error(exc, interval):
error_callback and error_callback(exc)

interval = (self.conf.kombu_reconnect_delay + interval
if self.conf.kombu_reconnect_delay > 0 else interval)
interval = (self.driver_conf.kombu_reconnect_delay + interval
if self.driver_conf.kombu_reconnect_delay > 0
else interval)

info = {'hostname': self.connection.hostname,
'port': self.connection.port,
Expand All @@ -628,8 +647,8 @@ def on_error(exc, interval):
# use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport
# connection object freed.
if self.conf.kombu_reconnect_delay > 0:
time.sleep(self.conf.kombu_reconnect_delay)
if self.driver_conf.kombu_reconnect_delay > 0:
time.sleep(self.driver_conf.kombu_reconnect_delay)

def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates
Expand Down Expand Up @@ -706,8 +725,8 @@ def _connect_error(exc):
"%(err_str)s"), log_info)

def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
six.next(self.consumer_num))
consumer = consumer_cls(self.driver_conf, self.channel, topic,
callback, six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer

Expand Down Expand Up @@ -766,7 +785,8 @@ def _error_callback(exc):
"'%(topic)s': %(err_str)s"), log_info)

def _publish():
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher = cls(self.driver_conf, self.channel, topic=topic,
**kwargs)
publisher.send(msg, timeout)

self.ensure(_error_callback, _publish, retry=retry)
Expand Down Expand Up @@ -851,10 +871,15 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url,
default_exchange=None,
allowed_remote_exmods=None):
conf.register_opts(rabbit_opts)
conf.register_opts(rpc_amqp.amqp_opts)

connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',
title='RabbitMQ driver options')
conf.register_group(opt_group)
conf.register_opts(rabbit_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)

connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)

super(RabbitDriver, self).__init__(conf, url,
connection_pool,
Expand Down

0 comments on commit 824313a

Please sign in to comment.