diff --git a/.gitignore b/.gitignore index 0ba244f6..fe6a1845 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,3 @@ dist .coverage .tox .*cache -htmlcov \ No newline at end of file diff --git a/prometheus_client/openmetrics/parser.py b/prometheus_client/openmetrics/parser.py index 6230d962..a964abb6 100644 --- a/prometheus_client/openmetrics/parser.py +++ b/prometheus_client/openmetrics/parser.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import math +import sys from .. import core @@ -112,11 +113,16 @@ def _parse_labels(it, text): elif char == '"': if not core._METRIC_LABEL_NAME_RE.match(''.join(labelname)): raise ValueError("Invalid line: " + text) - labels[''.join(labelname)] = ''.join(labelvalue) + utf8_str = ''.join(labelvalue) + try: + utf8_str.encode('utf-8') + except UnicodeError: + raise ValueError("Invalid line: " + text) + labels[''.join(labelname)] = utf8_str labelname = [] labelvalue = [] state = 'endoflabelvalue' - else: + else: labelvalue.append(char) elif state == 'endoflabelvalue': if char == ',': @@ -133,7 +139,7 @@ def _parse_labels(it, text): labelvalue.append('\n') elif char == '"': labelvalue.append('"') - else: + else: labelvalue.append('\\' + char) elif state == 'endoflabels': if char == ' ': @@ -142,7 +148,6 @@ def _parse_labels(it, text): raise ValueError("Invalid line: " + text) return labels - def _parse_sample(text): name = [] value = [] diff --git a/tests/openmetrics/test_parser.py b/tests/openmetrics/test_parser.py index 9339530a..47e844f2 100644 --- a/tests/openmetrics/test_parser.py +++ b/tests/openmetrics/test_parser.py @@ -247,6 +247,23 @@ def test_labels_with_curly_braces(self): metric_family.add_metric(["bar", "b{a}z"], 1) self.assertEqual([metric_family], list(families)) + def test_labels_with_invalid_utf8_values(self): + try: + if sys.version_info >= (3,): + inj = u'\uD802' + else: + inj = b'\xef' + str = '''# TYPE a counter +# HELP a help +a_total{foo="'''+inj+'''",bar="baz"} 1 +# EOF +''' + families = text_string_to_metric_families(str) + for f in families: pass + assert False + except ValueError: + assert True + def test_empty_help(self): families = text_string_to_metric_families("""# TYPE a counter # HELP a