From a9ae378a91a7c65bf52944f319c103bc0a93c173 Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Sun, 22 Mar 2015 11:50:25 +0000 Subject: [PATCH 1/3] Add support for exporting metrics to a pushgateway --- README.md | 16 ++++++++++++++ prometheus_client/__init__.py | 40 +++++++++++++++++++++++++++++++++++ tests/test_client.py | 19 +++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/README.md b/README.md index d3c9e62a..e2bdf6e2 100644 --- a/README.md +++ b/README.md @@ -246,6 +246,22 @@ write_to_textfile('/configured/textfile/path/raid.prom', registry) A separate registry is used, as the default registry may contain other metrics such as those from the Process Collector. +## Exporting to a Pushgateway + +The [Pushgateway](https://github.com/prometheus/pushgateway) +allows ephemeral and batch jobs to expose their metrics to Prometheus. + +```python +from prometheus_client import CollectorRegistry,Gauge,push_to_gateway +registry = CollectorRegistry() +g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) +g.set_to_current_time() +push_to_gateway(registry, job='batchA') +``` + +A separate registry is used, as the default registry may contain other metrics +such as those from the Process Collector. + ## Bridges It is also possible to expose metrics to systems other than Prometheus. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index f40261ce..649b1466 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -4,6 +4,14 @@ from . import exposition from . import process_collector +try: + from urllib2 import urlopen, quote +except ImportError: + # Python 3 + from urllib.request import urlopen + from urllib.parse import quote + + __all__ = ['Counter', 'Gauge', 'Summary', 'Histogram'] # http://stackoverflow.com/questions/19913653/no-unicode-in-all-for-a-packages-init __all__ = [n.encode('ascii') for n in __all__] @@ -22,6 +30,38 @@ start_http_server = exposition.start_http_server write_to_textfile = exposition.write_to_textfile + +def build_pushgateway_url(job, instance=None, host='localhost', port=9091): + ''' + Build a valid pushgateway url + ''' + + if instance: + instancestr = '/instances/{}'.format(instance) + else: + instancestr = '' + + url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, + quote(job), + quote(instancestr)) + return url + + +def push_to_gateway_url(url, registry, timeout=None): + '''Push metrics to the given url''' + + resp = urlopen(url, data=generate_latest(registry), timeout=timeout) + if resp.code >= 400: + raise IOError("error pushing to pushgateway: {0} {1}".format( + resp.code, resp.msg)) + + +def push_to_gateway(registry, job, instance=None, host='localhost', port=9091, timeout=None): + '''Push metrics to a pushgateway''' + + url = build_pushgateway_url(job, instance, host, port) + push_to_gateway_url(url, registry, timeout) + ProcessCollector = process_collector.ProcessCollector PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR diff --git a/tests/test_client.py b/tests/test_client.py index 70204dad..2ad098e7 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -5,6 +5,7 @@ from prometheus_client import Gauge, Counter, Summary, Histogram, Metric from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector +from prometheus_client import build_pushgateway_url class TestCounter(unittest.TestCase): @@ -372,6 +373,24 @@ def test_working_fake_pid(self): self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace')) +class TestBuildPushgatewayUrl(unittest.TestCase): + def test_job_instance(self): + expected = 'http://localhost:9091/metrics/jobs/foojob/instances/fooinstance' + + url = build_pushgateway_url('foojob', 'fooinstance') + self.assertEqual(url, expected) + + def test_host_port(self): + expected = 'http://foohost:9092/metrics/jobs/foojob' + + url = build_pushgateway_url('foojob', host='foohost', port=9092) + self.assertEqual(url, expected) + + def test_url_escaping(self): + expected = 'http://localhost:9091/metrics/jobs/foo%20job' + + url = build_pushgateway_url('foo job') + self.assertEqual(url, expected) if __name__ == '__main__': From 6b312cd9e77bcb23cde5c7caf8f28acca2c6de5d Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Sun, 12 Jul 2015 21:52:52 +0100 Subject: [PATCH 2/3] Rework push gateway code. --- README.md | 2 +- prometheus_client/__init__.py | 43 +------------- prometheus_client/exposition.py | 49 +++++++++++++++ tests/test_client.py | 102 ++++++++++++++++++++++++++------ 4 files changed, 136 insertions(+), 60 deletions(-) diff --git a/README.md b/README.md index e2bdf6e2..f2efaa98 100644 --- a/README.md +++ b/README.md @@ -256,7 +256,7 @@ from prometheus_client import CollectorRegistry,Gauge,push_to_gateway registry = CollectorRegistry() g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) g.set_to_current_time() -push_to_gateway(registry, job='batchA') +push_to_gateway('localhost:9091', job='batchA', registry=registry) ``` A separate registry is used, as the default registry may contain other metrics diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 649b1466..29e889b5 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -4,14 +4,6 @@ from . import exposition from . import process_collector -try: - from urllib2 import urlopen, quote -except ImportError: - # Python 3 - from urllib.request import urlopen - from urllib.parse import quote - - __all__ = ['Counter', 'Gauge', 'Summary', 'Histogram'] # http://stackoverflow.com/questions/19913653/no-unicode-in-all-for-a-packages-init __all__ = [n.encode('ascii') for n in __all__] @@ -29,38 +21,9 @@ MetricsHandler = exposition.MetricsHandler start_http_server = exposition.start_http_server write_to_textfile = exposition.write_to_textfile - - -def build_pushgateway_url(job, instance=None, host='localhost', port=9091): - ''' - Build a valid pushgateway url - ''' - - if instance: - instancestr = '/instances/{}'.format(instance) - else: - instancestr = '' - - url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, - quote(job), - quote(instancestr)) - return url - - -def push_to_gateway_url(url, registry, timeout=None): - '''Push metrics to the given url''' - - resp = urlopen(url, data=generate_latest(registry), timeout=timeout) - if resp.code >= 400: - raise IOError("error pushing to pushgateway: {0} {1}".format( - resp.code, resp.msg)) - - -def push_to_gateway(registry, job, instance=None, host='localhost', port=9091, timeout=None): - '''Push metrics to a pushgateway''' - - url = build_pushgateway_url(job, instance, host, port) - push_to_gateway_url(url, registry, timeout) +push_to_gateway = exposition.push_to_gateway +pushadd_to_gateway = exposition.pushadd_to_gateway +delete_from_gateway = exposition.delete_from_gateway ProcessCollector = process_collector.ProcessCollector PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 51e57c07..6eeae54e 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -10,11 +10,15 @@ try: from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer + from urllib2 import build_opener, Request, HTTPHandler + from urllib import quote_plus except ImportError: # Python 3 unicode = str from http.server import BaseHTTPRequestHandler from http.server import HTTPServer + from urllib.request import build_opener, Request, HTTPHandler + from urllib.parse import quote_plus CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8' @@ -72,3 +76,48 @@ def write_to_textfile(path, registry): f.write(generate_latest(registry)) # rename(2) is atomic. os.rename(tmppath, path) + + +def push_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None): + '''Push metrics to the given pushgateway. + + This overwrites all metrics with the same job and grouping_key. + This uses the PUT HTTP method.''' + _use_gateway('PUT', gateway, job, registry, grouping_key, timeout) + + +def pushadd_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None): + '''PushAdd metrics to the given pushgateway. + + This replaces metrics with the same name, job and grouping_key. + This uses the POST HTTP method.''' + _use_gateway('POST', gateway, job, registry, grouping_key, timeout) + + +def delete_from_gateway(gateway, job, grouping_key=None, timeout=None): + '''Delete metrics from the given pushgateway. + + This deletes metrics with the given job and grouping_key. + This uses the DELETE HTTP method.''' + _use_gateway('DELETE', gateway, job, None, grouping_key, timeout) + + +def _use_gateway(method, gateway, job, registry, grouping_key, timeout): + url = 'http://{0}/job/{1}'.format(gateway, quote_plus(job)) + + data = b'' + if method != 'DELETE': + data = generate_latest(registry) + + if grouping_key is None: + grouping_key = {} + url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v))) + for k, v in sorted(grouping_key.items())]) + + request = Request(url, data=data) + request.add_header('Content-Type', CONTENT_TYPE_LATEST) + request.get_method = lambda: method + resp = build_opener(HTTPHandler).open(request, timeout=timeout) + if resp.code >= 400: + raise IOError("error talking to pushgateway: {0} {1}".format( + resp.code, resp.msg)) diff --git a/tests/test_client.py b/tests/test_client.py index 2ad098e7..d387aa0c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,11 +1,22 @@ from __future__ import unicode_literals import os +import threading import unittest from prometheus_client import Gauge, Counter, Summary, Histogram, Metric from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector -from prometheus_client import build_pushgateway_url +from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway +from prometheus_client import CONTENT_TYPE_LATEST + +try: + from BaseHTTPServer import BaseHTTPRequestHandler + from BaseHTTPServer import HTTPServer +except ImportError: + # Python 3 + from http.server import BaseHTTPRequestHandler + from http.server import HTTPServer + class TestCounter(unittest.TestCase): @@ -373,24 +384,77 @@ def test_working_fake_pid(self): self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace')) -class TestBuildPushgatewayUrl(unittest.TestCase): - def test_job_instance(self): - expected = 'http://localhost:9091/metrics/jobs/foojob/instances/fooinstance' - - url = build_pushgateway_url('foojob', 'fooinstance') - self.assertEqual(url, expected) - - def test_host_port(self): - expected = 'http://foohost:9092/metrics/jobs/foojob' - - url = build_pushgateway_url('foojob', host='foohost', port=9092) - self.assertEqual(url, expected) - - def test_url_escaping(self): - expected = 'http://localhost:9091/metrics/jobs/foo%20job' - - url = build_pushgateway_url('foo job') - self.assertEqual(url, expected) +class TestPushGateway(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.counter = Gauge('g', 'help', registry=self.registry) + self.requests = requests = [] + class TestHandler(BaseHTTPRequestHandler): + def do_PUT(self): + self.send_response(201) + length = int(self.headers['content-length']) + requests.append((self, self.rfile.read(length))) + + do_POST = do_PUT + do_DELETE = do_PUT + + httpd = HTTPServer(('', 0), TestHandler) + self.address = ':'.join([str(x) for x in httpd.server_address]) + class TestServer(threading.Thread): + def run(self): + httpd.handle_request() + self.server = TestServer() + self.server.daemon = True + self.server.start() + + def test_push(self): + push_to_gateway(self.address, "my_job", self.registry) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_groupingkey(self): + push_to_gateway(self.address, "my_job", self.registry, {'a': 9}) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_complex_groupingkey(self): + push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'}) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9/b/a%2F+z') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_pushadd(self): + pushadd_to_gateway(self.address, "my_job", self.registry) + self.assertEqual(self.requests[0][0].command, 'POST') + self.assertEqual(self.requests[0][0].path, '/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_pushadd_with_groupingkey(self): + pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9}) + self.assertEqual(self.requests[0][0].command, 'POST') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_delete(self): + delete_from_gateway(self.address, "my_job") + self.assertEqual(self.requests[0][0].command, 'DELETE') + self.assertEqual(self.requests[0][0].path, '/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'') + + def test_pushadd_with_groupingkey(self): + delete_from_gateway(self.address, "my_job", {'a': 9}) + self.assertEqual(self.requests[0][0].command, 'DELETE') + self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'') if __name__ == '__main__': From 3632050602be6e094ec54257cd450fd357ee9f6b Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 15 Jul 2015 21:32:29 +0100 Subject: [PATCH 3/3] Add function to provide IP as grouping key --- README.md | 11 +++++++++++ prometheus_client/__init__.py | 1 + prometheus_client/exposition.py | 7 +++++++ tests/test_client.py | 5 ++++- 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f2efaa98..33769544 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,17 @@ push_to_gateway('localhost:9091', job='batchA', registry=registry) A separate registry is used, as the default registry may contain other metrics such as those from the Process Collector. +Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics +with the same grouping key, `pushadd_to_gateway` only replaces metrics with the +same name and grouping key and `delete_from_gateway` deletes metrics with the +given job and grouping key. See the +[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md) +for more information. + +`instance_ip_grouping_key` returns a grouping key with the instance label set +to the host's IP address. + + ## Bridges It is also possible to expose metrics to systems other than Prometheus. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 29e889b5..80424dbf 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -24,6 +24,7 @@ push_to_gateway = exposition.push_to_gateway pushadd_to_gateway = exposition.pushadd_to_gateway delete_from_gateway = exposition.delete_from_gateway +instance_ip_grouping_key = exposition.instance_ip_grouping_key ProcessCollector = process_collector.ProcessCollector PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 6eeae54e..4b71fc44 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import os +import socket import time import threading @@ -121,3 +122,9 @@ def _use_gateway(method, gateway, job, registry, grouping_key, timeout): if resp.code >= 400: raise IOError("error talking to pushgateway: {0} {1}".format( resp.code, resp.msg)) + +def instance_ip_grouping_key(): + '''Grouping key with instance set to the IP Address of this host.''' + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(('', 0)) + return {'instance': s.getsockname()[0]} diff --git a/tests/test_client.py b/tests/test_client.py index d387aa0c..2ee2fc08 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -7,7 +7,7 @@ from prometheus_client import Gauge, Counter, Summary, Histogram, Metric from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway -from prometheus_client import CONTENT_TYPE_LATEST +from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key try: from BaseHTTPServer import BaseHTTPRequestHandler @@ -456,6 +456,9 @@ def test_pushadd_with_groupingkey(self): self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) self.assertEqual(self.requests[0][1], b'') + def test_instance_ip_grouping_key(self): + self.assertTrue('' != instance_ip_grouping_key()['instance']) + if __name__ == '__main__': unittest.main()