21 changes: 14 additions & 7 deletions prometheus_client/core.py
@@ -185,7 +185,7 @@ def get_sample_value(self, name, labels=None):
REGISTRY = CollectorRegistry(auto_describe=True)
'''The default registry.'''

_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram',
_METRIC_TYPES = ('counter', 'gauge', 'summary', 'histogram',
'gaugehistogram', 'unknown', 'info', 'stateset')


@@ -378,8 +378,8 @@ def add_metric(self, labels, buckets, sum_value, timestamp=None):
exemplar = None
if len(b) == 3:
exemplar = b[2]
self.samples.append(Sample(self.name + '_bucket',
dict(list(zip(self._labelnames, labels)) + [('le', bucket)]),
self.samples.append(Sample(self.name + '_bucket',
dict(list(zip(self._labelnames, labels)) + [('le', bucket)]),
value, timestamp, exemplar))
# +Inf is last and provides the count value.
self.samples.append(Sample(self.name + '_count', dict(zip(self._labelnames, labels)), buckets[-1][1], timestamp))
@@ -411,7 +411,7 @@ def add_metric(self, labels, buckets, timestamp=None):
'''
for bucket, value in buckets:
self.samples.append(Sample(
self.name + '_bucket',
self.name + '_bucket',
dict(list(zip(self._labelnames, labels)) + [('le', bucket)]),
value, timestamp))

@@ -438,7 +438,7 @@ def add_metric(self, labels, value, timestamp=None):
labels: A list of label values
value: A dict of labels
'''
self.samples.append(Sample(self.name + '_info',
self.samples.append(Sample(self.name + '_info',
dict(dict(zip(self._labelnames, labels)), **value), 1, timestamp))


@@ -586,6 +586,13 @@ def close(self):
self._f = None


def _mmap_key(metric_name, name, labelnames, labelvalues):
"""Format a key for use in the mmap file."""
# ensure labels are in consistent order for identity
labels = dict(zip(labelnames, labelvalues))
return json.dumps([metric_name, name, labels], sort_keys=True)


def _MultiProcessValue(_pidFunc=os.getpid):
files = {}
values = []
@@ -618,7 +625,7 @@ def __reset(self):
'{0}_{1}.db'.format(file_prefix, pid['value']))
files[file_prefix] = _MmapedDict(filename)
self._file = files[file_prefix]
self._key = json.dumps((metric_name, name, labelnames, labelvalues))
self._key = _mmap_key(metric_name, name, labelnames, labelvalues)
self._value = self._file.read_value(self._key)

def __check_for_pid_change(self):
@@ -1143,7 +1150,7 @@ class Enum(object):
Example usage:
from prometheus_client import Enum

e = Enum('task_state', 'Description of enum',
e = Enum('task_state', 'Description of enum',
states=['starting', 'running', 'stopped'])
e.state('running')

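For reference, here is a minimal standalone sketch of the key format the new _mmap_key helper writes (the mmap_key name and the example label values below are illustrative, not part of this PR). Because the labels are serialized as a dict with sort_keys=True, label ordering no longer affects a sample's identity in the mmap file:

import json

def mmap_key(metric_name, name, labelnames, labelvalues):
    # Mirrors _mmap_key above: labels are serialized as a sorted JSON object.
    labels = dict(zip(labelnames, labelvalues))
    return json.dumps([metric_name, name, labels], sort_keys=True)

# Differently ordered label names now map to the same key.
k1 = mmap_key('h', 'h_bucket', ('a', 'le'), ('1', '0.5'))
k2 = mmap_key('h', 'h_bucket', ('le', 'a'), ('0.5', '1'))
assert k1 == k2 == '["h", "h_bucket", {"a": "1", "le": "0.5"}]'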
33 changes: 26 additions & 7 deletions prometheus_client/multiprocess.py
@@ -23,13 +23,24 @@ def __init__(self, registry, path=None):
registry.register(self)

def collect(self):
files = glob.glob(os.path.join(self._path, '*.db'))
return self.merge(files, accumulate=True)

def merge(self, files, accumulate=True):
"""Merge metrics from given mmap files.

By default, histograms are accumulated, as per prometheus wire format.
But if writing the merged data back to mmap files, use
accumulate=False to avoid compound accumulation.
"""
metrics = {}
for f in glob.glob(os.path.join(self._path, '*.db')):
for f in files:
parts = os.path.basename(f).split('_')
typ = parts[0]
d = core._MmapedDict(f, read_mode=True)
for key, value in d.read_all_values():
metric_name, name, labelnames, labelvalues = json.loads(key)
metric_name, name, labels = json.loads(key)
labels_key = tuple(sorted(labels.items()))

metric = metrics.get(metric_name)
if metric is None:
@@ -39,10 +50,10 @@ def collect(self):
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, tuple(zip(labelnames, labelvalues)) + (('pid', pid), ), value)
metric.add_sample(name, labels_key + (('pid', pid), ), value)
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, tuple(zip(labelnames, labelvalues)), value)
metric.add_sample(name, labels_key, value)
d.close()

for metric in metrics.values():
@@ -86,9 +97,17 @@ def collect(self):
for labels, values in buckets.items():
acc = 0.0
for bucket, value in sorted(values.items()):
acc += value
samples[(metric.name + '_bucket', labels + (('le', core._floatToGoString(bucket)), ))] = acc
samples[(metric.name + '_count', labels)] = acc
sample_key = (
metric.name + '_bucket',
labels + (('le', core._floatToGoString(bucket)), ),
)
if accumulate:
acc += value
samples[sample_key] = acc
else:
samples[sample_key] = value
if accumulate:
samples[(metric.name + '_count', labels)] = acc

# Convert to correct sample format.
metric.samples = [core.Sample(name, dict(labels), value) for (name, labels), value in samples.items()]
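A hedged usage sketch for the new merge() API (the directory wiring and the print loop are assumptions for illustration; only MultiProcessCollector.merge(files, accumulate=...) comes from this PR). With accumulate=False the per-bucket values are returned as stored, so the merged data could be written back to a single mmap file without double counting; with the default accumulate=True you get the cumulative buckets and _count samples the Prometheus exposition format expects:

import glob
import os

from prometheus_client import CollectorRegistry
from prometheus_client.multiprocess import MultiProcessCollector

# Assumes prometheus_multiproc_dir is set, as in normal multiprocess usage.
path = os.environ['prometheus_multiproc_dir']
registry = CollectorRegistry()
collector = MultiProcessCollector(registry, path)

# Merge a chosen batch of mmap files (e.g. from processes that have exited)
# without accumulating histogram buckets.
files = glob.glob(os.path.join(path, '*.db'))
for metric in collector.merge(files, accumulate=False):
    for sample in metric.samples:
        print(metric.name, sample)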
120 changes: 118 additions & 2 deletions tests/test_multiprocess.py
@@ -1,16 +1,25 @@
from __future__ import unicode_literals

import glob
import os
import shutil
import sys
import tempfile
import unittest

if sys.version_info < (2, 7):
# We need the skip decorators from unittest2 on Python 2.6.
import unittest2 as unittest
else:
import unittest


from prometheus_client import core
from prometheus_client.core import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
Sample,
Summary,
)
from prometheus_client.multiprocess import (
@@ -25,7 +34,7 @@ def setUp(self):
os.environ['prometheus_multiproc_dir'] = self.tempdir
core._ValueClass = core._MultiProcessValue(lambda: 123)
self.registry = CollectorRegistry()
MultiProcessCollector(self.registry, self.tempdir)
self.collector = MultiProcessCollector(self.registry, self.tempdir)

def tearDown(self):
del os.environ['prometheus_multiproc_dir']
@@ -137,6 +146,113 @@ def test_counter_across_forks(self):
self.assertEqual(3, self.registry.get_sample_value('c_total'))
self.assertEqual(1, c1._value.get())

@unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
def test_collect(self):
pid = 0
core._ValueClass = core._MultiProcessValue(lambda: pid)
labels = dict((i, i) for i in 'abcd')

def add_label(key, value):
l = labels.copy()
l[key] = value
return l

c = Counter('c', 'help', labelnames=labels.keys(), registry=None)
g = Gauge('g', 'help', labelnames=labels.keys(), registry=None)
h = Histogram('h', 'help', labelnames=labels.keys(), registry=None)

c.labels(**labels).inc(1)
g.labels(**labels).set(1)
h.labels(**labels).observe(1)

pid = 1

c.labels(**labels).inc(1)
g.labels(**labels).set(1)
h.labels(**labels).observe(5)

metrics = dict((m.name, m) for m in self.collector.collect())

self.assertEqual(
metrics['c'].samples, [Sample('c_total', labels, 2.0)]
)
metrics['g'].samples.sort(key=lambda x: x[1]['pid'])
self.assertEqual(metrics['g'].samples, [
Sample('g', add_label('pid', '0'), 1.0),
Sample('g', add_label('pid', '1'), 1.0),
])

metrics['h'].samples.sort(
key=lambda x: (x[0], float(x[1].get('le', 0)))
)
expected_histogram = [
Sample('h_bucket', add_label('le', '0.005'), 0.0),
Sample('h_bucket', add_label('le', '0.01'), 0.0),
Sample('h_bucket', add_label('le', '0.025'), 0.0),
Sample('h_bucket', add_label('le', '0.05'), 0.0),
Sample('h_bucket', add_label('le', '0.075'), 0.0),
Sample('h_bucket', add_label('le', '0.1'), 0.0),
Sample('h_bucket', add_label('le', '0.25'), 0.0),
Sample('h_bucket', add_label('le', '0.5'), 0.0),
Sample('h_bucket', add_label('le', '0.75'), 0.0),
Sample('h_bucket', add_label('le', '1.0'), 1.0),
Sample('h_bucket', add_label('le', '2.5'), 1.0),
Sample('h_bucket', add_label('le', '5.0'), 2.0),
Sample('h_bucket', add_label('le', '7.5'), 2.0),
Sample('h_bucket', add_label('le', '10.0'), 2.0),
Sample('h_bucket', add_label('le', '+Inf'), 2.0),
Sample('h_count', labels, 2.0),
Sample('h_sum', labels, 6.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)

@unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
def test_merge_no_accumulate(self):
pid = 0
core._ValueClass = core._MultiProcessValue(lambda: pid)
labels = dict((i, i) for i in 'abcd')

def add_label(key, value):
l = labels.copy()
l[key] = value
return l

h = Histogram('h', 'help', labelnames=labels.keys(), registry=None)
h.labels(**labels).observe(1)
pid = 1
h.labels(**labels).observe(5)

path = os.path.join(os.environ['prometheus_multiproc_dir'], '*.db')
files = glob.glob(path)
metrics = dict(
(m.name, m) for m in self.collector.merge(files, accumulate=False)
)

metrics['h'].samples.sort(
key=lambda x: (x[0], float(x[1].get('le', 0)))
)
expected_histogram = [
Sample('h_bucket', add_label('le', '0.005'), 0.0),
Sample('h_bucket', add_label('le', '0.01'), 0.0),
Sample('h_bucket', add_label('le', '0.025'), 0.0),
Sample('h_bucket', add_label('le', '0.05'), 0.0),
Sample('h_bucket', add_label('le', '0.075'), 0.0),
Sample('h_bucket', add_label('le', '0.1'), 0.0),
Sample('h_bucket', add_label('le', '0.25'), 0.0),
Sample('h_bucket', add_label('le', '0.5'), 0.0),
Sample('h_bucket', add_label('le', '0.75'), 0.0),
Sample('h_bucket', add_label('le', '1.0'), 1.0),
Sample('h_bucket', add_label('le', '2.5'), 0.0),
Sample('h_bucket', add_label('le', '5.0'), 1.0),
Sample('h_bucket', add_label('le', '7.5'), 0.0),
Sample('h_bucket', add_label('le', '10.0'), 0.0),
Sample('h_bucket', add_label('le', '+Inf'), 0.0),
Sample('h_sum', labels, 6.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)


class TestMmapedDict(unittest.TestCase):
def setUp(self):
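As a quick sanity check on the two expected_histogram lists in the tests above, here is the bucket arithmetic for the two observations (1 and 5) in plain Python; only the buckets that change are listed, which is an abbreviation for brevity rather than the library's full default bucket set:

# Each observation increments exactly one bucket in the per-process mmap file,
# so accumulate=False returns these per-bucket counts unchanged
# (test_merge_no_accumulate), while accumulate=True folds them into cumulative
# counts plus h_count (test_collect).
raw = {'1.0': 1.0, '5.0': 1.0}

acc = 0.0
cumulative = {}
for le in ['0.5', '1.0', '2.5', '5.0', '+Inf']:
    acc += raw.get(le, 0.0)
    cumulative[le] = acc
assert cumulative == {'0.5': 0.0, '1.0': 1.0, '2.5': 1.0, '5.0': 2.0, '+Inf': 2.0}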