Skip to content

Commit

Permalink
Collect request stats
Browse files Browse the repository at this point in the history
This is a thing that nodepool has been doing for ages. With the upcoming
changes to remove the task manager, the mechanism it has been using to
put activity in the right place isn't going to be available anymore. But
also, people using openstacksdk from within a service might also want to
be able to do the same logging.

This improves upon the old method as well, as it uses the history in the
response object to get and report on all of the calls made as part of a
request. This will catch things that do auto retries.

While we're in there, add support for reporting to prometheus instead.
The prometheus support does not read from config, and does not run an
http service, since openstacksdk is a library. It is expected that
an application that uses openstacksdk and wants request stats collected
will pass a prometheus_client.CollectorRegistry to collector_registry.

Change-Id: I7218179dd5f0c068a52a4704b2ce1a0942fdc0d1
  • Loading branch information
emonty committed Mar 11, 2019
1 parent f9b0911 commit c8b96cd
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 5 deletions.
2 changes: 2 additions & 0 deletions lower-constraints.txt
Expand Up @@ -24,6 +24,7 @@ os-client-config==1.28.0
os-service-types==1.2.0
oslotest==3.2.0
pbr==2.0.0
prometheus-client==0.4.2
Pygments==2.2.0
python-mimeparse==1.6.0
python-subunit==1.0.0
Expand All @@ -32,6 +33,7 @@ requests==2.18.0
requests-mock==1.2.0
requestsexceptions==1.2.0
six==1.10.0
statsd==3.3.0
stestr==1.0.0
stevedore==1.20.0
testrepository==0.0.18
Expand Down
4 changes: 4 additions & 0 deletions openstack/cloud/openstackcloud.py
Expand Up @@ -451,6 +451,10 @@ def _get_versioned_client(
interface=self.config.get_interface(service_type),
endpoint_override=self.config.get_endpoint(service_type),
region_name=self.config.region_name,
statsd_prefix=self.config.get_statsd_prefix(),
statsd_client=self.config.get_statsd_client(),
prometheus_counter=self.config.get_prometheus_counter(),
prometheus_histogram=self.config.get_prometheus_histogram(),
min_version=request_min_version,
max_version=request_max_version)
if adapter.get_endpoint():
Expand Down
81 changes: 80 additions & 1 deletion openstack/config/cloud_region.py
Expand Up @@ -21,6 +21,15 @@
import os_service_types
import requestsexceptions
from six.moves import urllib
try:
import statsd
except ImportError:
statsd = None
try:
import prometheus_client
except ImportError:
prometheus_client = None


from openstack import version as openstack_version
from openstack import _log
Expand Down Expand Up @@ -96,7 +105,9 @@ def __init__(self, name=None, region_name=None, config=None,
discovery_cache=None, extra_config=None,
cache_expiration_time=0, cache_expirations=None,
cache_path=None, cache_class='dogpile.cache.null',
cache_arguments=None, password_callback=None):
cache_arguments=None, password_callback=None,
statsd_host=None, statsd_port=None, statsd_prefix=None,
collector_registry=None):
self._name = name
self.region_name = region_name
self.config = _util.normalize_keys(config)
Expand All @@ -116,6 +127,11 @@ def __init__(self, name=None, region_name=None, config=None,
self._cache_class = cache_class
self._cache_arguments = cache_arguments
self._password_callback = password_callback
self._statsd_host = statsd_host
self._statsd_port = statsd_port
self._statsd_prefix = statsd_prefix
self._statsd_client = None
self._collector_registry = collector_registry

self._service_type_manager = os_service_types.ServiceTypes()

Expand Down Expand Up @@ -471,6 +487,11 @@ def get_session_client(
self.get_connect_retries(service_type))
kwargs.setdefault('status_code_retries',
self.get_status_code_retries(service_type))
kwargs.setdefault('statsd_prefix', self.get_statsd_prefix())
kwargs.setdefault('statsd_client', self.get_statsd_client())
kwargs.setdefault('prometheus_counter', self.get_prometheus_counter())
kwargs.setdefault(
'prometheus_histogram', self.get_prometheus_histogram())
endpoint_override = self.get_endpoint(service_type)
version = version_request.version
min_api_version = (
Expand Down Expand Up @@ -746,3 +767,61 @@ def get_rate_limit(self, service_type=None):
def get_concurrency(self, service_type=None):
return self._get_service_config(
'concurrency', service_type=service_type)

def get_statsd_client(self):
if not statsd:
return None
statsd_args = {}
if self._statsd_host:
statsd_args['host'] = self._statsd_host
if self._statsd_port:
statsd_args['port'] = self._statsd_port
if statsd_args:
return statsd.StatsClient(**statsd_args)
else:
return None

def get_statsd_prefix(self):
return self._statsd_prefix or 'openstack.api'

def get_prometheus_registry(self):
if not self._collector_registry and prometheus_client:
self._collector_registry = prometheus_client.REGISTRY
return self._collector_registry

def get_prometheus_histogram(self):
registry = self.get_prometheus_registry()
if not registry or not prometheus_client:
return
# We have to hide a reference to the histogram on the registry
# object, because it's collectors must be singletons for a given
# registry but register at creation time.
hist = getattr(registry, '_openstacksdk_histogram', None)
if not hist:
hist = prometheus_client.Histogram(
'openstack_http_response_time',
'Time taken for an http response to an OpenStack service',
labelnames=[
'method', 'endpoint', 'service_type', 'status_code'
],
registry=registry,
)
registry._openstacksdk_histogram = hist
return hist

def get_prometheus_counter(self):
registry = self.get_prometheus_registry()
if not registry or not prometheus_client:
return
counter = getattr(registry, '_openstacksdk_counter', None)
if not counter:
counter = prometheus_client.Counter(
'openstack_http_requests',
'Number of HTTP requests made to an OpenStack service',
labelnames=[
'method', 'endpoint', 'service_type', 'status_code'
],
registry=registry,
)
registry._openstacksdk_counter = counter
return counter
22 changes: 21 additions & 1 deletion openstack/config/loader.py
Expand Up @@ -140,7 +140,9 @@ def __init__(self, config_files=None, vendor_files=None,
envvar_prefix=None, secure_files=None,
pw_func=None, session_constructor=None,
app_name=None, app_version=None,
load_yaml_config=True, load_envvars=True):
load_yaml_config=True, load_envvars=True,
statsd_host=None, statsd_port=None,
statsd_prefix=None):
self.log = _log.setup_logging('openstack.config')
self._session_constructor = session_constructor
self._app_name = app_name
Expand Down Expand Up @@ -276,6 +278,21 @@ def __init__(self, config_files=None, vendor_files=None,
self._cache_expirations = cache_settings.get(
'expiration', self._cache_expirations)

if load_yaml_config:
statsd_config = self.cloud_config.get('statsd', {})
statsd_host = statsd_host or statsd_config.get('host')
statsd_port = statsd_port or statsd_config.get('port')
statsd_prefix = statsd_prefix or statsd_config.get('prefix')

if load_envvars:
statsd_host = statsd_host or os.environ.get('STATSD_HOST')
statsd_port = statsd_port or os.environ.get('STATSD_PORT')
statsd_prefix = statsd_prefix or os.environ.get('STATSD_PREFIX')

self._statsd_host = statsd_host
self._statsd_port = statsd_port
self._statsd_prefix = statsd_prefix

# Flag location to hold the peeked value of an argparse timeout value
self._argv_timeout = False

Expand Down Expand Up @@ -1091,6 +1108,9 @@ def get_one(
cache_class=self._cache_class,
cache_arguments=self._cache_arguments,
password_callback=self._pw_callback,
statsd_host=self._statsd_host,
statsd_port=self._statsd_port,
statsd_prefix=self._statsd_prefix,
)
# TODO(mordred) Backwards compat for OSC transition
get_one_cloud = get_one
Expand Down
45 changes: 42 additions & 3 deletions openstack/proxy.py
Expand Up @@ -59,7 +59,9 @@ def _extract_name(url, service_type=None):
# Strip leading version piece so that
# GET /v2.0/networks
# returns ['networks']
if url_parts[0] in ('v1', 'v2', 'v2.0'):
if (url_parts[0]
and url_parts[0][0] == 'v'
and url_parts[0][1] and url_parts[0][1].isdigit()):
url_parts = url_parts[1:]
name_parts = []
# Pull out every other URL portion - so that
Expand Down Expand Up @@ -118,12 +120,21 @@ class Proxy(adapter.Adapter):
``<service-type>_status_code_retries``.
"""

def __init__(self, *args, **kwargs):
def __init__(
self,
session,
statsd_client=None, statsd_prefix=None,
prometheus_counter=None, prometheus_histogram=None,
*args, **kwargs):
# NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None,
# override it with a class-level value.
kwargs.setdefault('retriable_status_codes',
self.retriable_status_codes)
super(Proxy, self).__init__(*args, **kwargs)
super(Proxy, self).__init__(session=session, *args, **kwargs)
self._statsd_client = statsd_client
self._statsd_prefix = statsd_prefix
self._prometheus_counter = prometheus_counter
self._prometheus_histogram = prometheus_histogram

def request(
self, url, method, error_message=None,
Expand All @@ -132,8 +143,36 @@ def request(
url, method,
connect_retries=connect_retries, raise_exc=False,
**kwargs)
for h in response.history:
self._report_stats(h)
self._report_stats(response)
return response

def _report_stats(self, response):
if self._statsd_client:
self._report_stats_statsd(response)
if self._prometheus_counter and self._prometheus_histogram:
self._report_stats_prometheus(response)

def _report_stats_statsd(self, response):
name_parts = _extract_name(response.request.url, self.service_type)
key = '.'.join(
[self._statsd_prefix, self.service_type, response.request.method]
+ name_parts)
self._statsd_client.timing(key, int(response.elapsed.seconds * 1000))
self._statsd_client.incr(key)

def _report_stats_prometheus(self, response):
labels = dict(
method=response.request.method,
endpoint=response.request.url,
service_type=self.service_type,
status_code=response.status_code,
)
self._prometheus_counter.labels(**labels).inc()
self._prometheus_histogram.labels(**labels).observe(
response.elapsed.seconds)

def _version_matches(self, version):
api_version = self.get_api_major_version()
if api_version:
Expand Down

0 comments on commit c8b96cd

Please sign in to comment.