Skip to content

Commit

Permalink
Synced rpc and gettextutils modules from oslo-incubator
Browse files Browse the repository at this point in the history
The main reason for sync is to get the following oslo-rpc fixes in Neutron:
* I537015f452eb770acba41fdedfe221628f52a920 (reduces delays when reconnecting
  to Qpid in HA deployments)
* Ia148baa6e1ec632789ac3621c85173c2c16f3918 (fixed HA failover, Qpid part)
* I67923cb024bbd143edc8edccf35b9b400df31eb3 (fixed HA failover, RabbitMQ part)

Latest oslo-incubator commit at the moment of sync:
* 2eab986ef3c43f8d1e25065e3cbc1307860c25c7

Change-Id: I2f5bb0d195e050f755ecdbf06a6bbed587a04fbe
Closes-Bug: 1281148
Closes-Bug: 1261631
  • Loading branch information
booxter committed Mar 26, 2014
1 parent e75f485 commit 665222b
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 181 deletions.
17 changes: 17 additions & 0 deletions neutron/openstack/common/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import six


six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
99 changes: 63 additions & 36 deletions neutron/openstack/common/gettextutils.py
Expand Up @@ -23,18 +23,29 @@
"""

import copy
import functools
import gettext
import locale
from logging import handlers
import os
import re

from babel import localedata
import six

_localedir = os.environ.get('neutron'.upper() + '_LOCALEDIR')
_t = gettext.translation('neutron', localedir=_localedir, fallback=True)

# We use separate translation catalogs for each log level, so set up a
# mapping between the log level name and the translator. The domain
# for the log level is project_name + "-log-" + log_level so messages
# for each level end up in their own catalog.
_t_log_levels = dict(
(level, gettext.translation('neutron' + '-log-' + level,
localedir=_localedir,
fallback=True))
for level in ['info', 'warning', 'error', 'critical']
)

_AVAILABLE_LANGUAGES = {}
USE_LAZY = False

Expand All @@ -60,6 +71,28 @@ def _(msg):
return _t.ugettext(msg)


def _log_translation(msg, level):
"""Build a single translation of a log message
"""
if USE_LAZY:
return Message(msg, domain='neutron' + '-log-' + level)
else:
translator = _t_log_levels[level]
if six.PY3:
return translator.gettext(msg)
return translator.ugettext(msg)

# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = functools.partial(_log_translation, level='info')
_LW = functools.partial(_log_translation, level='warning')
_LE = functools.partial(_log_translation, level='error')
_LC = functools.partial(_log_translation, level='critical')


def install(domain, lazy=False):
"""Install a _() function using the given translation domain.
Expand Down Expand Up @@ -118,7 +151,8 @@ class Message(six.text_type):
and can be treated as such.
"""

def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
def __new__(cls, msgid, msgtext=None, params=None,
domain='neutron', *args):
"""Create a new Message object.
In order for translation to work gettext requires a message ID, this
Expand Down Expand Up @@ -213,47 +247,22 @@ def _sanitize_mod_params(self, other):
if other is None:
params = (other,)
elif isinstance(other, dict):
params = self._trim_dictionary_parameters(other)
else:
params = self._copy_param(other)
return params

def _trim_dictionary_parameters(self, dict_param):
"""Return a dict that only has matching entries in the msgid."""
# NOTE(luisg): Here we trim down the dictionary passed as parameters
# to avoid carrying a lot of unnecessary weight around in the message
# object, for example if someone passes in Message() % locals() but
# only some params are used, and additionally we prevent errors for
# non-deepcopyable objects by unicoding() them.

# Look for %(param) keys in msgid;
# Skip %% and deal with the case where % is first character on the line
keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)

# If we don't find any %(param) keys but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
# Apparently the full dictionary is the parameter
params = self._copy_param(dict_param)
else:
# Merge the dictionaries
# Copy each item in case one does not support deep copy.
params = {}
# Save our existing parameters as defaults to protect
# ourselves from losing values if we are called through an
# (erroneous) chain that builds a valid Message with
# arguments, and then does something like "msg % kwds"
# where kwds is an empty dictionary.
src = {}
if isinstance(self.params, dict):
src.update(self.params)
src.update(dict_param)
for key in keys:
params[key] = self._copy_param(src[key])

for key, val in self.params.items():
params[key] = self._copy_param(val)
for key, val in other.items():
params[key] = self._copy_param(val)
else:
params = self._copy_param(other)
return params

def _copy_param(self, param):
try:
return copy.deepcopy(param)
except TypeError:
except Exception:
# Fallback to casting to unicode this will handle the
# python code-like objects that can't be deep-copied
return six.text_type(param)
Expand Down Expand Up @@ -297,9 +306,27 @@ def get_available_languages(domain):
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()

for i in locale_identifiers:
if find(i) is not None:
language_list.append(i)

# NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported
# locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they
# are perfectly legitimate locales:
# https://github.com/mitsuhiko/babel/issues/37
# In Babel 1.3 they fixed the bug and they support these locales, but
# they are still not explicitly "listed" by locale_identifiers().
# That is why we add the locales here explicitly if necessary so that
# they are listed as supported.
aliases = {'zh': 'zh_CN',
'zh_Hant_HK': 'zh_HK',
'zh_Hant': 'zh_TW',
'fil': 'tl_PH'}
for (locale, alias) in six.iteritems(aliases):
if locale in language_list and alias not in language_list:
language_list.append(alias)

_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)

Expand Down
33 changes: 2 additions & 31 deletions neutron/openstack/common/rpc/__init__.py
Expand Up @@ -23,13 +23,9 @@
rpc.proxy
"""

import inspect

from oslo.config import cfg

from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import importutils
from neutron.openstack.common import local
from neutron.openstack.common import log as logging


Expand Down Expand Up @@ -93,24 +89,7 @@ def create_connection(new=True):
return _get_impl().create_connection(CONF, new=new)


def _check_for_lock():
if not CONF.debug:
return None

if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True

return False


def call(context, topic, msg, timeout=None, check_for_lock=False):
def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
Expand All @@ -124,16 +103,12 @@ def call(context, topic, msg, timeout=None, check_for_lock=False):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)


Expand Down Expand Up @@ -176,7 +151,7 @@ def fanout_cast(context, topic, msg):
return _get_impl().fanout_cast(CONF, context, topic, msg)


def multicall(context, topic, msg, timeout=None, check_for_lock=False):
def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
Expand All @@ -194,8 +169,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
Expand All @@ -205,8 +178,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)


Expand Down
24 changes: 12 additions & 12 deletions neutron/openstack/common/rpc/amqp.py
Expand Up @@ -37,7 +37,7 @@


from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import common as rpc_common
Expand Down Expand Up @@ -72,7 +72,7 @@ def __init__(self, conf, connection_cls, *args, **kwargs):

# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug(_('Pool creating new connection'))
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf)

def empty(self):
Expand Down Expand Up @@ -287,7 +287,7 @@ def unpack_context(conf, msg):
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
return ctx


Expand Down Expand Up @@ -339,7 +339,7 @@ def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
LOG.debug('UNIQUE_ID is %s.' % (unique_id))


class _ThreadPoolWithWait(object):
Expand Down Expand Up @@ -432,7 +432,7 @@ def __call__(self, message_data):
# the previous context is stored in local.store.context
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
rpc_common._safe_log(LOG.debug, 'received %s', message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
Expand Down Expand Up @@ -469,15 +469,15 @@ def _process_data(self, ctxt, version, method, namespace, args):
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except rpc_common.ClientException as e:
LOG.debug(_('Expected exception during message handling (%s)') %
LOG.debug('Expected exception during message handling (%s)' %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error(_('Exception during message handling'),
LOG.error(_LE('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)

Expand Down Expand Up @@ -551,10 +551,10 @@ def create_connection(conf, new, connection_pool):

def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
LOG.debug(_('Making synchronous call on %s ...'), topic)
LOG.debug('Making synchronous call on %s ...', topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
LOG.debug('MSG_ID is %s' % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)

Expand All @@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):

def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
LOG.debug('Making asynchronous cast on %s...', topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
Expand All @@ -589,7 +589,7 @@ def cast(conf, context, topic, msg, connection_pool):

def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
LOG.debug('Making asynchronous fanout cast...')
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
Expand Down Expand Up @@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,

def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
LOG.debug('Sending %(event_type)s on %(topic)s',
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
Expand Down
10 changes: 7 additions & 3 deletions neutron/openstack/common/rpc/common.py
Expand Up @@ -22,7 +22,7 @@
from oslo.config import cfg
import six

from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import local
Expand Down Expand Up @@ -85,7 +85,7 @@ def __init__(self, message=None, **kwargs):
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
LOG.exception(_LE('Exception in string format operation'))
for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
Expand Down Expand Up @@ -269,6 +269,10 @@ def _fix_passwords(d):
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
elif isinstance(d[k], list):
for e in d[k]:
if isinstance(e, dict):
_fix_passwords(e)
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
Expand All @@ -285,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
LOG.error(_("Returning exception %s to caller"),
LOG.error(_LE("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)

Expand Down

0 comments on commit 665222b

Please sign in to comment.