diff --git a/README.md b/README.md index 8aa1d326..0e730682 100644 --- a/README.md +++ b/README.md @@ -321,6 +321,24 @@ for more information. `instance_ip_grouping_key` returns a grouping key with the instance label set to the host's IP address. +### Handlers for authentication + +If the push gateway you are connecting to is protected with HTTP Basic Auth, +you can use a special handler to set the Authorization header. + +```python +from prometheus_client import CollectorRegistry, Gauge, push_to_gateway +from prometheus_client.exposition import basic_auth_handler + +def my_auth_handler(url, method, timeout, headers, data): + username = 'foobar' + password = 'secret123' + return basic_auth_handler(url, method, timeout, headers, data, username, password) +registry = CollectorRegistry() +g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) +g.set_to_current_time() +push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler) +``` ## Bridges diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 9ef4d259..26ac0c8b 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -8,6 +8,8 @@ import threading from contextlib import closing from wsgiref.simple_server import make_server +import base64 +import sys from . import core try: @@ -118,7 +120,46 @@ def write_to_textfile(path, registry): os.rename(tmppath, path) -def push_to_gateway(gateway, job, registry, grouping_key=None, timeout=None): +def default_handler(url, method, timeout, headers, data): + '''Default handler that implements HTTP/HTTPS connections. + + Used by the push_to_gateway functions. Can be re-used by other handlers.''' + def handle(): + request = Request(url, data=data) + request.get_method = lambda: method + for k, v in headers: + request.add_header(k, v) + resp = build_opener(HTTPHandler).open(request, timeout=timeout) + if resp.code >= 400: + raise IOError("error talking to pushgateway: {0} {1}".format( + resp.code, resp.msg)) + + return handle + + +def basic_auth_handler(url, method, timeout, headers, data, username=None, password=None): + '''Handler that implements HTTP/HTTPS connections with Basic Auth. + + Sets auth headers using supplied 'username' and 'password', if set. + Used by the push_to_gateway functions. Can be re-used by other handlers.''' + def handle(): + '''Handler that implements HTTP Basic Auth. + ''' + if username is not None and password is not None: + if sys.version_info >= (3,0): + auth_value = bytes('{0}:{1}'.format(username, password), 'utf8') + auth_token = str(base64.b64encode(auth_value), 'utf8') + else: + auth_value = '{0}:{1}'.format(username, password) + auth_token = base64.b64encode(auth_value) + auth_header = "Basic {0}".format(auth_token) + headers.append(['Authorization', auth_header]) + default_handler(url, method, timeout, headers, data)() + + return handle + + +def push_to_gateway(gateway, job, registry, grouping_key=None, timeout=None, handler=default_handler): '''Push metrics to the given pushgateway. `gateway` the url for your push gateway. Either of the form @@ -130,13 +171,37 @@ def push_to_gateway(gateway, job, registry, grouping_key=None, timeout=None): Defaults to None `timeout` is how long push will attempt to connect before giving up. Defaults to None + `handler` is an optional function which can be provided to perform + requests to the 'gateway'. + Defaults to None, in which case an http or https request + will be carried out by a default handler. + If not None, the argument must be a function which accepts + the following arguments: + url, method, timeout, headers, and content + May be used to implement additional functionality not + supported by the built-in default handler (such as SSL + client certicates, and HTTP authentication mechanisms). + 'url' is the URL for the request, the 'gateway' argument + described earlier will form the basis of this URL. + 'method' is the HTTP method which should be used when + carrying out the request. + 'timeout' requests not successfully completed after this + many seconds should be aborted. If timeout is None, then + the handler should not set a timeout. + 'headers' is a list of ("header-name","header-value") tuples + which must be passed to the pushgateway in the form of HTTP + request headers. + The function should raise an exception (e.g. IOError) on + failure. + 'content' is the data which should be used to form the HTTP + Message Body. This overwrites all metrics with the same job and grouping_key. This uses the PUT HTTP method.''' - _use_gateway('PUT', gateway, job, registry, grouping_key, timeout) + _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler) -def pushadd_to_gateway(gateway, job, registry, grouping_key=None, timeout=None): +def pushadd_to_gateway(gateway, job, registry, grouping_key=None, timeout=None, handler=default_handler): '''PushAdd metrics to the given pushgateway. `gateway` the url for your push gateway. Either of the form @@ -148,13 +213,19 @@ def pushadd_to_gateway(gateway, job, registry, grouping_key=None, timeout=None): Defaults to None `timeout` is how long push will attempt to connect before giving up. Defaults to None + `handler` is an optional function which can be provided to perform + requests to the 'gateway'. + Defaults to None, in which case an http or https request + will be carried out by a default handler. + See the 'prometheus_client.push_to_gateway' documentation + for implementation requirements. This replaces metrics with the same name, job and grouping_key. This uses the POST HTTP method.''' - _use_gateway('POST', gateway, job, registry, grouping_key, timeout) + _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler) -def delete_from_gateway(gateway, job, grouping_key=None, timeout=None): +def delete_from_gateway(gateway, job, grouping_key=None, timeout=None, handler=default_handler): '''Delete metrics from the given pushgateway. `gateway` the url for your push gateway. Either of the form @@ -165,14 +236,21 @@ def delete_from_gateway(gateway, job, grouping_key=None, timeout=None): Defaults to None `timeout` is how long delete will attempt to connect before giving up. Defaults to None + `handler` is an optional function which can be provided to perform + requests to the 'gateway'. + Defaults to None, in which case an http or https request + will be carried out by a default handler. + See the 'prometheus_client.push_to_gateway' documentation + for implementation requirements. This deletes metrics with the given job and grouping_key. This uses the DELETE HTTP method.''' - _use_gateway('DELETE', gateway, job, None, grouping_key, timeout) + _use_gateway('DELETE', gateway, job, None, grouping_key, timeout, handler) -def _use_gateway(method, gateway, job, registry, grouping_key, timeout): - if not (gateway.startswith('http://') or gateway.startswith('https://')): +def _use_gateway(method, gateway, job, registry, grouping_key, timeout, handler): + gateway_url = urlparse(gateway) + if not gateway_url.scheme: gateway = 'http://{0}'.format(gateway) url = '{0}/metrics/job/{1}'.format(gateway, quote_plus(job)) @@ -185,13 +263,9 @@ def _use_gateway(method, gateway, job, registry, grouping_key, timeout): url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v))) for k, v in sorted(grouping_key.items())]) - request = Request(url, data=data) - request.add_header('Content-Type', CONTENT_TYPE_LATEST) - request.get_method = lambda: method - resp = build_opener(HTTPHandler).open(request, timeout=timeout) - if resp.code >= 400: - raise IOError("error talking to pushgateway: {0} {1}".format( - resp.code, resp.msg)) + headers=[('Content-Type', CONTENT_TYPE_LATEST)] + handler(url=url, method=method, timeout=timeout, + headers=headers, data=data)() def instance_ip_grouping_key(): '''Grouping key with instance set to the IP Address of this host.''' diff --git a/tests/test_exposition.py b/tests/test_exposition.py index fa5cfdb0..3b1fb16f 100644 --- a/tests/test_exposition.py +++ b/tests/test_exposition.py @@ -13,6 +13,7 @@ from prometheus_client import CollectorRegistry, generate_latest from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key +from prometheus_client.exposition import default_handler, basic_auth_handler try: from BaseHTTPServer import BaseHTTPRequestHandler @@ -22,7 +23,6 @@ from http.server import BaseHTTPRequestHandler from http.server import HTTPServer - class TestGenerateText(unittest.TestCase): def setUp(self): self.registry = CollectorRegistry() @@ -99,7 +99,10 @@ def setUp(self): self.requests = requests = [] class TestHandler(BaseHTTPRequestHandler): def do_PUT(self): - self.send_response(201) + if 'with_basic_auth' in self.requestline and self.headers['authorization'] != 'Basic Zm9vOmJhcg==': + self.send_response(401) + else: + self.send_response(201) length = int(self.headers['content-length']) requests.append((self, self.rfile.read(length))) self.end_headers() @@ -108,7 +111,7 @@ def do_PUT(self): do_DELETE = do_PUT httpd = HTTPServer(('localhost', 0), TestHandler) - self.address = ':'.join([str(x) for x in httpd.server_address]) + self.address = 'http://localhost:{0}'.format(httpd.server_address[1]) class TestServer(threading.Thread): def run(self): httpd.handle_request() @@ -165,6 +168,26 @@ def test_delete_with_groupingkey(self): self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) self.assertEqual(self.requests[0][1], b'') + def test_push_with_handler(self): + def my_test_handler(url, method, timeout, headers, data): + headers.append(['X-Test-Header', 'foobar']) + return default_handler(url, method, timeout, headers, data) + push_to_gateway(self.address, "my_job", self.registry, handler=my_test_handler) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][0].headers.get('x-test-header'), 'foobar') + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_basic_auth_handler(self): + def my_auth_handler(url, method, timeout, headers, data): + return basic_auth_handler(url, method, timeout, headers, data, "foo", "bar") + push_to_gateway(self.address, "my_job_with_basic_auth", self.registry, handler=my_auth_handler) + self.assertEqual(self.requests[0][0].command, 'PUT') + self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job_with_basic_auth') + self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST) + self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + @unittest.skipIf( sys.platform == "darwin", "instance_ip_grouping_key() does not work on macOS."