diff --git a/prometheus_client/core.py b/prometheus_client/core.py index da3034d0..cb1e7c5b 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -185,7 +185,7 @@ def get_sample_value(self, name, labels=None): REGISTRY = CollectorRegistry(auto_describe=True) '''The default registry.''' -_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram', +_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram', 'gaugehistogram', 'unknown', 'info', 'stateset') @@ -378,8 +378,8 @@ def add_metric(self, labels, buckets, sum_value, timestamp=None): exemplar = None if len(b) == 3: exemplar = b[2] - self.samples.append(Sample(self.name + '_bucket', - dict(list(zip(self._labelnames, labels)) + [('le', bucket)]), + self.samples.append(Sample(self.name + '_bucket', + dict(list(zip(self._labelnames, labels)) + [('le', bucket)]), value, timestamp, exemplar)) # +Inf is last and provides the count value. self.samples.append(Sample(self.name + '_count', dict(zip(self._labelnames, labels)), buckets[-1][1], timestamp)) @@ -411,7 +411,7 @@ def add_metric(self, labels, buckets, timestamp=None): ''' for bucket, value in buckets: self.samples.append(Sample( - self.name + '_bucket', + self.name + '_bucket', dict(list(zip(self._labelnames, labels)) + [('le', bucket)]), value, timestamp)) @@ -438,7 +438,7 @@ def add_metric(self, labels, value, timestamp=None): labels: A list of label values value: A dict of labels ''' - self.samples.append(Sample(self.name + '_info', + self.samples.append(Sample(self.name + '_info', dict(dict(zip(self._labelnames, labels)), **value), 1, timestamp)) @@ -586,6 +586,13 @@ def close(self): self._f = None +def _mmap_key(metric_name, name, labelnames, labelvalues): + """Format a key for use in the mmap file.""" + # ensure labels are in consistent order for identity + labels = dict(zip(labelnames, labelvalues)) + return json.dumps([metric_name, name, labels], sort_keys=True) + + def _MultiProcessValue(_pidFunc=os.getpid): files = {} values = [] @@ -618,7 +625,7 @@ def __reset(self): '{0}_{1}.db'.format(file_prefix, pid['value'])) files[file_prefix] = _MmapedDict(filename) self._file = files[file_prefix] - self._key = json.dumps((metric_name, name, labelnames, labelvalues)) + self._key = _mmap_key(metric_name, name, labelnames, labelvalues) self._value = self._file.read_value(self._key) def __check_for_pid_change(self): @@ -1143,7 +1150,7 @@ class Enum(object): Example usage: from prometheus_client import Enum - e = Enum('task_state', 'Description of enum', + e = Enum('task_state', 'Description of enum', states=['starting', 'running', 'stopped']) e.state('running') diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py index 7dd74b2b..55213153 100644 --- a/prometheus_client/multiprocess.py +++ b/prometheus_client/multiprocess.py @@ -23,13 +23,24 @@ def __init__(self, registry, path=None): registry.register(self) def collect(self): + files = glob.glob(os.path.join(self._path, '*.db')) + return self.merge(files, accumulate=True) + + def merge(self, files, accumulate=True): + """Merge metrics from given mmap files. + + By default, histograms are accumulated, as per prometheus wire format. + But if writing the merged data back to mmap files, use + accumulate=False to avoid compound accumulation. + """ metrics = {} - for f in glob.glob(os.path.join(self._path, '*.db')): + for f in files: parts = os.path.basename(f).split('_') typ = parts[0] d = core._MmapedDict(f, read_mode=True) for key, value in d.read_all_values(): - metric_name, name, labelnames, labelvalues = json.loads(key) + metric_name, name, labels = json.loads(key) + labels_key = tuple(sorted(labels.items())) metric = metrics.get(metric_name) if metric is None: @@ -39,10 +50,10 @@ def collect(self): if typ == 'gauge': pid = parts[2][:-3] metric._multiprocess_mode = parts[1] - metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value) + metric.add_sample(name, labels_key + (('pid', pid), ), value) else: # The duplicates and labels are fixed in the next for. - metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value) + metric.add_sample(name, labels_key, value) d.close() for metric in metrics.values(): @@ -86,9 +97,17 @@ def collect(self): for labels, values in buckets.items(): acc = 0.0 for bucket, value in sorted(values.items()): - acc += value - samples[(metric.name + '_bucket', labels + (('le', core._floatToGoString(bucket)), ))] = acc - samples[(metric.name + '_count', labels)] = acc + sample_key = ( + metric.name + '_bucket', + labels + (('le', core._floatToGoString(bucket)), ), + ) + if accumulate: + acc += value + samples[sample_key] = acc + else: + samples[sample_key] = value + if accumulate: + samples[(metric.name + '_count', labels)] = acc # Convert to correct sample format. metric.samples = [core.Sample(name, dict(labels), value) for (name, labels), value in samples.items()] diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index ca84913f..501cb62f 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -1,9 +1,17 @@ from __future__ import unicode_literals +import glob import os import shutil +import sys import tempfile -import unittest + +if sys.version_info < (2, 7): + # We need the skip decorators from unittest2 on Python 2.6. + import unittest2 as unittest +else: + import unittest + from prometheus_client import core from prometheus_client.core import ( @@ -11,6 +19,7 @@ Counter, Gauge, Histogram, + Sample, Summary, ) from prometheus_client.multiprocess import ( @@ -25,7 +34,7 @@ def setUp(self): os.environ['prometheus_multiproc_dir'] = self.tempdir core._ValueClass = core._MultiProcessValue(lambda: 123) self.registry = CollectorRegistry() - MultiProcessCollector(self.registry, self.tempdir) + self.collector = MultiProcessCollector(self.registry, self.tempdir) def tearDown(self): del os.environ['prometheus_multiproc_dir'] @@ -137,6 +146,113 @@ def test_counter_across_forks(self): self.assertEqual(3, self.registry.get_sample_value('c_total')) self.assertEqual(1, c1._value.get()) + @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.") + def test_collect(self): + pid = 0 + core._ValueClass = core._MultiProcessValue(lambda: pid) + labels = dict((i, i) for i in 'abcd') + + def add_label(key, value): + l = labels.copy() + l[key] = value + return l + + c = Counter('c', 'help', labelnames=labels.keys(), registry=None) + g = Gauge('g', 'help', labelnames=labels.keys(), registry=None) + h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) + + c.labels(**labels).inc(1) + g.labels(**labels).set(1) + h.labels(**labels).observe(1) + + pid = 1 + + c.labels(**labels).inc(1) + g.labels(**labels).set(1) + h.labels(**labels).observe(5) + + metrics = dict((m.name, m) for m in self.collector.collect()) + + self.assertEqual( + metrics['c'].samples, [Sample('c_total', labels, 2.0)] + ) + metrics['g'].samples.sort(key=lambda x: x[1]['pid']) + self.assertEqual(metrics['g'].samples, [ + Sample('g', add_label('pid', '0'), 1.0), + Sample('g', add_label('pid', '1'), 1.0), + ]) + + metrics['h'].samples.sort( + key=lambda x: (x[0], float(x[1].get('le', 0))) + ) + expected_histogram = [ + Sample('h_bucket', add_label('le', '0.005'), 0.0), + Sample('h_bucket', add_label('le', '0.01'), 0.0), + Sample('h_bucket', add_label('le', '0.025'), 0.0), + Sample('h_bucket', add_label('le', '0.05'), 0.0), + Sample('h_bucket', add_label('le', '0.075'), 0.0), + Sample('h_bucket', add_label('le', '0.1'), 0.0), + Sample('h_bucket', add_label('le', '0.25'), 0.0), + Sample('h_bucket', add_label('le', '0.5'), 0.0), + Sample('h_bucket', add_label('le', '0.75'), 0.0), + Sample('h_bucket', add_label('le', '1.0'), 1.0), + Sample('h_bucket', add_label('le', '2.5'), 1.0), + Sample('h_bucket', add_label('le', '5.0'), 2.0), + Sample('h_bucket', add_label('le', '7.5'), 2.0), + Sample('h_bucket', add_label('le', '10.0'), 2.0), + Sample('h_bucket', add_label('le', '+Inf'), 2.0), + Sample('h_count', labels, 2.0), + Sample('h_sum', labels, 6.0), + ] + + self.assertEqual(metrics['h'].samples, expected_histogram) + + @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.") + def test_merge_no_accumulate(self): + pid = 0 + core._ValueClass = core._MultiProcessValue(lambda: pid) + labels = dict((i, i) for i in 'abcd') + + def add_label(key, value): + l = labels.copy() + l[key] = value + return l + + h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) + h.labels(**labels).observe(1) + pid = 1 + h.labels(**labels).observe(5) + + path = os.path.join(os.environ['prometheus_multiproc_dir'], '*.db') + files = glob.glob(path) + metrics = dict( + (m.name, m) for m in self.collector.merge(files, accumulate=False) + ) + + metrics['h'].samples.sort( + key=lambda x: (x[0], float(x[1].get('le', 0))) + ) + expected_histogram = [ + Sample('h_bucket', add_label('le', '0.005'), 0.0), + Sample('h_bucket', add_label('le', '0.01'), 0.0), + Sample('h_bucket', add_label('le', '0.025'), 0.0), + Sample('h_bucket', add_label('le', '0.05'), 0.0), + Sample('h_bucket', add_label('le', '0.075'), 0.0), + Sample('h_bucket', add_label('le', '0.1'), 0.0), + Sample('h_bucket', add_label('le', '0.25'), 0.0), + Sample('h_bucket', add_label('le', '0.5'), 0.0), + Sample('h_bucket', add_label('le', '0.75'), 0.0), + Sample('h_bucket', add_label('le', '1.0'), 1.0), + Sample('h_bucket', add_label('le', '2.5'), 0.0), + Sample('h_bucket', add_label('le', '5.0'), 1.0), + Sample('h_bucket', add_label('le', '7.5'), 0.0), + Sample('h_bucket', add_label('le', '10.0'), 0.0), + Sample('h_bucket', add_label('le', '+Inf'), 0.0), + Sample('h_sum', labels, 6.0), + ] + + self.assertEqual(metrics['h'].samples, expected_histogram) + class TestMmapedDict(unittest.TestCase): def setUp(self):