From 078350e35f43e63a4183b2c36cca9de1c54f09cf Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sun, 21 Jun 2015 12:08:32 +0200 Subject: [PATCH 01/17] initial implementation of line protocol as introduced in https://github.com/influxdb/influxdb/pull/2696 --- influxdb/client.py | 23 +++------ influxdb/line_protocol.py | 103 ++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 +- 3 files changed, 114 insertions(+), 16 deletions(-) create mode 100644 influxdb/line_protocol.py diff --git a/influxdb/client.py b/influxdb/client.py index 007115fc..0187e617 100755 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -11,6 +11,7 @@ import requests.exceptions from sys import version_info +from influxdb.line_protocol import make_lines from influxdb.resultset import ResultSet try: @@ -221,14 +222,7 @@ def request(self, url, method='GET', params=None, data=None, if params is None: params = {} - auth = { - 'u': self._username, - 'p': self._password - } - - params.update(auth) - - if data is not None and not isinstance(data, str): + if isinstance(data, dict) or isinstance(data, list): data = json.dumps(data) # Try to send the request a maximum of three times. (see #103) @@ -238,6 +232,7 @@ def request(self, url, method='GET', params=None, data=None, response = self._session.request( method=method, url=url, + auth=(self._username, self._password), params=params, data=data, headers=self._headers, @@ -270,10 +265,10 @@ def write(self, data, params=None, expected_response_code=204): :rtype: bool """ self.request( - url="write", + url="write_points", method='POST', params=params, - data=data, + data=make_lines(data).encode('utf-8'), expected_response_code=expected_response_code ) return True @@ -396,13 +391,12 @@ def _write_points(self, if tags: data['tags'] = tags - data['database'] = database or self._database - if self.use_udp: self.send_packet(data) else: self.write( data=data, + params={'db': database or self._database}, expected_response_code=204 ) @@ -679,9 +673,8 @@ def send_packet(self, packet): :param packet: the packet to be sent :type packet: dict """ - data = json.dumps(packet) - byte = data.encode('utf-8') - self.udp_socket.sendto(byte, (self._host, self.udp_port)) + data = make_lines(packet).encode('utf-8') + self.udp_socket.sendto(data, (self._host, self.udp_port)) class InfluxDBClusterClient(object): diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py new file mode 100644 index 00000000..11fb6d1e --- /dev/null +++ b/influxdb/line_protocol.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from copy import copy +from datetime import datetime +from time import mktime + +from dateutil.parser import parse +from pytz import utc +from six import binary_type, text_type + + +def _convert_timestamp(timestamp): + if isinstance(timestamp, int): + return timestamp + if isinstance(_force_text(timestamp), text_type): + timestamp = parse(timestamp) + if isinstance(timestamp, datetime): + if timestamp.tzinfo: + timestamp = timestamp.astimezone(utc) + timestamp.replace(tzinfo=None) + return ( + mktime(timestamp.timetuple()) * 1e9 + + timestamp.microsecond * 1e3 + ) + raise ValueError(timestamp) + + +def _escape_tag(tag): + return tag.replace( + "\\", "\\\\" + ).replace( + " ", "\\ " + ).replace( + ",", "\\," + ).replace( + "=", "\\=" + ) + + +def _escape_value(value): + value = _force_text(value) + if isinstance(value, text_type): + return "\"{}\"".format(value.replace( + "\"", "\\\"" + )) + else: + return str(value) + + +def _force_text(data): + """ + Try to return a text aka unicode object from the given data. + """ + if isinstance(data, binary_type): + return data.decode('utf-8', 'replace') + else: + return data + + +def make_lines(data): + """ + Extracts the points from the given dict and returns a Unicode string + matching the line protocol introduced in InfluxDB 0.9.0. + """ + lines = "" + static_tags = data.get('tags', None) + for point in data['points']: + # add measurement name + lines += _escape_tag(_force_text( + point.get('measurement', data.get('measurement')) + )) + "," + + # add tags + if static_tags is None: + tags = point.get('tags', {}) + else: + tags = copy(static_tags) + tags.update(point.get('tags', {})) + # tags should be sorted client-side to take load off server + for tag_key in sorted(tags.keys()): + lines += "{key}={value},".format( + key=_escape_tag(tag_key), + value=_escape_tag(tags[tag_key]), + ) + lines = lines[:-1] + " " # strip the trailing comma + + # add fields + for field_key in sorted(point['fields'].keys()): + lines += "{key}={value},".format( + key=_escape_tag(field_key), + value=_escape_value(point['fields'][field_key]), + ) + lines = lines[:-1] # strip the trailing comma + + # add timestamp + if 'timestamp' in point: + lines += " " + _force_text(str(int( + _convert_timestamp(point['timestamp']) + ))) + + lines += "\n" + return lines diff --git a/requirements.txt b/requirements.txt index 45cc6284..3445ca42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ +python-dateutil>=2.0.0 +pytz requests>=1.0.3 -six==1.9.0 \ No newline at end of file +six==1.9.0 From 9b753de079d3b46f8a3ada1f27f791d739b44f47 Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sun, 21 Jun 2015 12:09:28 +0200 Subject: [PATCH 02/17] update client tests for line protocol --- tests/influxdb/client_test.py | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 605713fb..e329eeb0 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -139,7 +139,7 @@ def test_write(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) cli = InfluxDBClient(database='db') @@ -154,21 +154,15 @@ def test_write(self): ) self.assertEqual( - json.loads(m.last_request.body), - {"database": "mydb", - "retentionPolicy": "mypolicy", - "points": [{"measurement": "cpu_load_short", - "tags": {"host": "server01", - "region": "us-west"}, - "timestamp": "2009-11-10T23:00:00Z", - "fields": {"value": 0.64}}]} + m.last_request.body, + b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", ) def test_write_points(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) @@ -188,7 +182,7 @@ def test_write_points_toplevel_attributes(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) @@ -227,7 +221,7 @@ def test_write_points_batch(self): "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204) cli = InfluxDBClient(database='db') cli.write_points(points=dummy_points, @@ -284,7 +278,7 @@ def test_write_points_with_precision(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) @@ -294,12 +288,9 @@ def test_write_points_with_precision(self): time_precision='n' ) - self.assertDictEqual( - {'points': self.dummy_points, - 'database': 'db', - 'precision': 'n', - }, - json.loads(m.last_request.body) + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", + m.last_request.body, ) def test_write_points_bad_precision(self): From 99cc8caf8e599e86c8dd03bdc0ccbad2d8aa1a98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C5=A0tetiar?= Date: Wed, 24 Jun 2015 17:19:14 +0200 Subject: [PATCH 03/17] Fixes for line-protocol feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Working again with https://github.com/influxdb/influxdb/commit/246ce61b85d3d7b1eb5da354c35e2c4b4b42020e Signed-off-by: Petr Štetiar --- influxdb/client.py | 16 ++++++++++++---- tests/influxdb/client_test.py | 10 +++++----- 2 files changed, 17 insertions(+), 9 deletions(-) mode change 100755 => 100644 influxdb/client.py diff --git a/influxdb/client.py b/influxdb/client.py old mode 100755 new mode 100644 index 0187e617..3e87b739 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -198,7 +198,7 @@ def switch_user(self, username, password): self._password = password def request(self, url, method='GET', params=None, data=None, - expected_response_code=200): + expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. :param url: the path of the HTTP request, e.g. write, query, etc. @@ -219,6 +219,9 @@ def request(self, url, method='GET', params=None, data=None, """ url = "{0}/{1}".format(self._baseurl, url) + if headers is None: + headers = self._headers + if params is None: params = {} @@ -235,7 +238,7 @@ def request(self, url, method='GET', params=None, data=None, auth=(self._username, self._password), params=params, data=data, - headers=self._headers, + headers=headers, verify=self._verify_ssl, timeout=self._timeout ) @@ -264,12 +267,17 @@ def write(self, data, params=None, expected_response_code=204): :returns: True, if the write operation is successful :rtype: bool """ + + headers = self._headers + headers['Content-type'] = 'application/octet-stream' + self.request( - url="write_points", + url="write", method='POST', params=params, data=make_lines(data).encode('utf-8'), - expected_response_code=expected_response_code + expected_response_code=expected_response_code, + headers=headers ) return True diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index e329eeb0..8e3ef5f8 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -139,7 +139,7 @@ def test_write(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) cli = InfluxDBClient(database='db') @@ -162,7 +162,7 @@ def test_write_points(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) @@ -182,7 +182,7 @@ def test_write_points_toplevel_attributes(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) @@ -221,7 +221,7 @@ def test_write_points_batch(self): "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204) cli = InfluxDBClient(database='db') cli.write_points(points=dummy_points, @@ -278,7 +278,7 @@ def test_write_points_with_precision(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) From ca10f5e31bf4a01443f6847b04c5c8e3ae01f6a4 Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 24 Jun 2015 22:47:48 +0200 Subject: [PATCH 04/17] line protocol: fix timestamp UTC conversion --- influxdb/line_protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index 11fb6d1e..27eafa0f 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals +from calendar import timegm from copy import copy from datetime import datetime -from time import mktime from dateutil.parser import parse from pytz import utc @@ -20,7 +20,7 @@ def _convert_timestamp(timestamp): timestamp = timestamp.astimezone(utc) timestamp.replace(tzinfo=None) return ( - mktime(timestamp.timetuple()) * 1e9 + + timegm(timestamp.timetuple()) * 1e9 + timestamp.microsecond * 1e3 ) raise ValueError(timestamp) From f02967766a3f70e4efa6df2c8878ec27242cd01a Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 24 Jun 2015 22:48:47 +0200 Subject: [PATCH 05/17] the timestamp field is called 'time' --- influxdb/line_protocol.py | 4 ++-- tests/influxdb/client_test.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index 27eafa0f..2b99d469 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -94,9 +94,9 @@ def make_lines(data): lines = lines[:-1] # strip the trailing comma # add timestamp - if 'timestamp' in point: + if 'time' in point: lines += " " + _force_text(str(int( - _convert_timestamp(point['timestamp']) + _convert_timestamp(point['time']) ))) lines += "\n" diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 8e3ef5f8..ca913860 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -88,7 +88,7 @@ def setUp(self): "host": "server01", "region": "us-west" }, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": { "value": 0.64 } @@ -149,7 +149,7 @@ def test_write(self): "points": [{"measurement": "cpu_load_short", "tags": {"host": "server01", "region": "us-west"}, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": {"value": 0.64}}]} ) @@ -206,18 +206,18 @@ def test_write_points_toplevel_attributes(self): def test_write_points_batch(self): dummy_points = [ {"measurement": "cpu_usage", "tags": {"unit": "percent"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, {"measurement": "network", "tags": {"direction": "in"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, {"measurement": "network", "tags": {"direction": "out"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} ] expected_last_body = {"tags": {"host": "server01", "region": "us-west"}, "database": "db", "points": [{"measurement": "network", "tags": {"direction": "out"}, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, From dae6a8a69a86d452f7ffe2a0603d6b581fd31d1f Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 24 Jun 2015 22:49:15 +0200 Subject: [PATCH 06/17] fix some expected timestamps --- tests/influxdb/client_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index ca913860..45532907 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -155,7 +155,7 @@ def test_write(self): self.assertEqual( m.last_request.body, - b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", + b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", ) def test_write_points(self): @@ -289,7 +289,7 @@ def test_write_points_with_precision(self): ) self.assertEqual( - b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", + b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", m.last_request.body, ) From 7fae65ff0fcabb998b02d22c136edb19ffd4da7f Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sat, 27 Jun 2015 01:32:31 +0200 Subject: [PATCH 07/17] set precision and retention policy in GET params --- influxdb/client.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 3e87b739..c295a424 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -390,21 +390,25 @@ def _write_points(self, 'points': points } + if tags: + data['tags'] = tags + + params = { + 'db': database or self._database + } + if time_precision: - data['precision'] = time_precision + params['precision'] = time_precision if retention_policy: - data['retentionPolicy'] = retention_policy - - if tags: - data['tags'] = tags + params['rp'] = retention_policy if self.use_udp: self.send_packet(data) else: self.write( data=data, - params={'db': database or self._database}, + params=params, expected_response_code=204 ) From 1350d05a6c60f9d916c991c3bfa1ab503cf35c3a Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sat, 27 Jun 2015 01:39:19 +0200 Subject: [PATCH 08/17] update some more test cases --- tests/influxdb/client_test.py | 20 ++---- tests/influxdb/client_test_with_server.py | 7 ++- tests/influxdb/dataframe_client_test.py | 74 +++++------------------ 3 files changed, 26 insertions(+), 75 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 45532907..3d40d9a6 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -170,12 +170,9 @@ def test_write_points(self): cli.write_points( self.dummy_points, ) - self.assertDictEqual( - { - "database": "db", - "points": self.dummy_points, - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_toplevel_attributes(self): @@ -193,14 +190,9 @@ def test_write_points_toplevel_attributes(self): tags={"tag": "hello"}, retention_policy="somepolicy" ) - self.assertDictEqual( - { - "database": "testdb", - "tags": {"tag": "hello"}, - "points": self.dummy_points, - "retentionPolicy": "somepolicy" - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west,tag=hello value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_batch(self): diff --git a/tests/influxdb/client_test_with_server.py b/tests/influxdb/client_test_with_server.py index 8c0bdc2d..1238e5cb 100644 --- a/tests/influxdb/client_test_with_server.py +++ b/tests/influxdb/client_test_with_server.py @@ -504,9 +504,10 @@ class CommonTests(ManyTestCasesWithServerMixin, influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template') def test_write(self): - new_dummy_point = dummy_point[0].copy() - new_dummy_point['database'] = 'db' - self.assertIs(True, self.cli.write(new_dummy_point)) + self.assertIs(True, self.cli.write( + {'points': dummy_point}, + params={'db': 'db'}, + )) @unittest.skip("fail against real server instance, " "don't know if it should succeed actually..") diff --git a/tests/influxdb/dataframe_client_test.py b/tests/influxdb/dataframe_client_test.py index 61b411fd..8f7e8e81 100644 --- a/tests/influxdb/dataframe_client_test.py +++ b/tests/influxdb/dataframe_client_test.py @@ -31,24 +31,10 @@ def test_write_points_from_dataframe(self): index=[now, now + timedelta(hours=1)], columns=["column_one", "column_two", "column_three"]) - expected = { - 'database': 'db', - 'points': [ - {'time': '1970-01-01T00:00:00+00:00', - 'fields': { - 'column_two': 1, - 'column_three': 1.0, - 'column_one': '1'}, - 'tags': {}, - 'measurement': 'foo'}, - {'time': '1970-01-01T01:00:00+00:00', - 'fields': { - 'column_two': 2, - 'column_three': 2.0, - 'column_one': '2'}, - 'tags': {}, - 'measurement': 'foo'}] - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -58,10 +44,10 @@ def test_write_points_from_dataframe(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, 'foo') - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) cli.write_points(dataframe, 'foo', tags=None) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_in_batches(self): now = pd.Timestamp('1970-01-01 00:00+00:00') @@ -83,24 +69,10 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], index=[now, now + timedelta(hours=1)]) - expected = { - 'database': 'db', - 'points': [ - {'fields': { - '0': '1', - '1': 1, - '2': 1.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T00:00:00+00:00', - 'measurement': 'foo'}, - {'fields': { - '0': '2', - '1': 2, - '2': 2.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T01:00:00+00:00', - 'measurement': 'foo'}], - } + expected = ( + b"foo,hello=there 0=\"1\",1=1,2=1.0 0\n" + b"foo,hello=there 0=\"2\",1=2,2=2.0 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -110,7 +82,7 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo", {"hello": "there"}) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_period_index(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], @@ -118,24 +90,10 @@ def test_write_points_from_dataframe_with_period_index(self): pd.Period('1970-01-02')], columns=["column_one", "column_two", "column_three"]) - expected = { - 'points': [ - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '1', - 'column_two': 1, - 'column_three': 1.0}, - 'time': '1970-01-01T00:00:00+00:00'}, - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '2', - 'column_two': 2, - 'column_three': 2.0}, - 'time': '1970-01-02T00:00:00+00:00'}], - 'database': 'db', - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 86400000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -145,7 +103,7 @@ def test_write_points_from_dataframe_with_period_index(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo") - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_time_precision(self): now = pd.Timestamp('1970-01-01 00:00+00:00') From 6627efac8540c65ae1d0c38ddcebefb0d9b6aaea Mon Sep 17 00:00:00 2001 From: aviau Date: Fri, 3 Jul 2015 10:40:44 -0400 Subject: [PATCH 09/17] Formatting improvements * ``if value`` can lead to errors --- influxdb/client.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 2fa4226a..def990c8 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -97,7 +97,8 @@ def __init__(self, self._headers = { 'Content-type': 'application/json', - 'Accept': 'text/plain'} + 'Accept': 'text/plain' + } @staticmethod def from_DSN(dsn, **kwargs): @@ -394,17 +395,17 @@ def _write_points(self, 'points': points } - if tags: + if tags is not None: data['tags'] = tags params = { 'db': database or self._database } - if time_precision: + if time_precision is not None: params['precision'] = time_precision - if retention_policy: + if retention_policy is not None: params['rp'] = retention_policy if self.use_udp: From 2ea2437a894a8b704ef60f9ef4731c2720b9d576 Mon Sep 17 00:00:00 2001 From: aviau Date: Fri, 3 Jul 2015 11:14:58 -0400 Subject: [PATCH 10/17] New epoch parameter + pep8 + enable udp tests * Added epoch parameter to ``query`` * Removed test_write_points_with_precision as queries now always return nanosecond precision See doc: > The format of the returned timestamps complies with RFC3339, and > has nanosecond precision. --- influxdb/client.py | 4 + tests/influxdb/client_test.py | 12 ++- tests/influxdb/client_test_with_server.py | 98 +---------------------- tests/influxdb/dataframe_client_test.py | 6 +- 4 files changed, 19 insertions(+), 101 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index def990c8..9ebf3c0f 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -270,6 +270,7 @@ def write(self, data, params=None, expected_response_code=204): def query(self, query, params={}, + epoch=None, expected_response_code=200, database=None, raise_errors=True): @@ -298,6 +299,9 @@ def query(self, params['q'] = query params['db'] = database or self._database + if epoch is not None: + params['epoch'] = epoch + response = self.request( url="query", method='GET', diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 4da32d7b..e9185208 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -155,7 +155,8 @@ def test_write(self): self.assertEqual( m.last_request.body, - b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000000000\n", ) def test_write_points(self): @@ -171,7 +172,8 @@ def test_write_points(self): self.dummy_points, ) self.assertEqual( - "cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + "cpu_load_short,host=server01,region=us-west " + "value=0.64 1257894000000000000\n", m.last_request.body.decode('utf-8'), ) @@ -191,7 +193,8 @@ def test_write_points_toplevel_attributes(self): retention_policy="somepolicy" ) self.assertEqual( - "cpu_load_short,host=server01,region=us-west,tag=hello value=0.64 1257894000000000000\n", + "cpu_load_short,host=server01,region=us-west,tag=hello " + "value=0.64 1257894000000000000\n", m.last_request.body.decode('utf-8'), ) @@ -281,7 +284,8 @@ def test_write_points_with_precision(self): ) self.assertEqual( - b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000000000\n", m.last_request.body, ) diff --git a/tests/influxdb/client_test_with_server.py b/tests/influxdb/client_test_with_server.py index 99bd7a71..a298d3de 100644 --- a/tests/influxdb/client_test_with_server.py +++ b/tests/influxdb/client_test_with_server.py @@ -15,7 +15,6 @@ import distutils.spawn from functools import partial import os -import re import shutil import subprocess import sys @@ -653,96 +652,6 @@ def test_write_points_batch(self): self.assertIn(12, net_out['series'][0]['values'][0]) self.assertIn(12.34, cpu['series'][0]['values'][0]) - def test_write_points_with_precision(self): - """ check that points written with an explicit precision have - actually that precision used. - """ - # for that we'll check that - for each precision - the actual 'time' - # value returned by a select has the correct regex format.. - # n : u'2015-03-20T15:23:36.615654966Z' - # u : u'2015-03-20T15:24:10.542554Z' - # ms : u'2015-03-20T15:24:50.878Z' - # s : u'2015-03-20T15:20:24Z' - # m : u'2015-03-20T15:25:00Z' - # h : u'2015-03-20T15:00:00Z' - base_regex = '\d{4}-\d{2}-\d{2}T\d{2}:' # YYYY-MM-DD 'T' hh: - base_s_regex = base_regex + '\d{2}:\d{2}' # base_regex + mm:ss - - point = { - "measurement": "cpu_load_short", - "tags": { - "host": "server01", - "region": "us-west" - }, - "time": "2009-11-10T12:34:56.123456789Z", - "fields": { - "value": 0.64 - } - } - - # As far as we can see the values aren't directly available depending - # on the precision used. - # The less the precision, the more to wait for the value to be - # actually written/available. - for idx, (precision, expected_regex, sleep_time) in enumerate(( - ('n', base_s_regex + '\.\d{9}Z', 1), - ('u', base_s_regex + '\.\d{6}Z', 1), - ('ms', base_s_regex + '\.\d{3}Z', 1), - ('s', base_s_regex + 'Z', 1), - - # ('h', base_regex + '00:00Z', ), - # that would require a sleep of possibly up to 3600 secs (/ 2 ?).. - )): - db = 'db1' # to not shoot us in the foot/head, - # we work on a fresh db each time: - self.cli.create_database(db) - before = datetime.datetime.now() - self.assertIs( - True, - self.cli.write_points( - [point], - time_precision=precision, - database=db)) - - # sys.stderr.write('checking presision with %r : - # before=%s\n' % (precision, before)) - after = datetime.datetime.now() - - if sleep_time > 1: - sleep_time -= (after if before.min != after.min - else before).second - - start = time.time() - timeout = start + sleep_time - # sys.stderr.write('should sleep %s ..\n' % sleep_time) - while time.time() < timeout: - rsp = self.cli.query('SELECT * FROM cpu_load_short', - database=db) - if rsp != {'cpu_load_short': []}: - # sys.stderr.write('already ? only slept %s\n' % ( - # time.time() - start)) - break - time.sleep(1) - else: - pass - # sys.stderr.write('ok !\n') - - # sys.stderr.write('sleeping %s..\n' % sleep_time) - - if sleep_time: - time.sleep(sleep_time) - - rsp = self.cli.query('SELECT * FROM cpu_load_short', database=db) - # sys.stderr.write('precision=%s rsp_timestamp = %r\n' % ( - # precision, rsp['cpu_load_short'][0]['time'])) - - m = re.match( - expected_regex, - list(rsp['cpu_load_short'])[0]['time'] - ) - self.assertIsNotNone(m) - self.cli.drop_database(db) - def test_query(self): self.assertIs(True, self.cli.write_points(dummy_point)) @@ -973,13 +882,11 @@ def test_query_multiple_series(self): ############################################################################ -@unittest.skip("Broken as of 0.9.0") @unittest.skipIf(not is_influxdb_bin_ok, "could not find influxd binary") class UdpTests(ManyTestCasesWithServerMixin, unittest.TestCase): influxdb_udp_enabled = True - influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template') @@ -990,14 +897,15 @@ def test_write_points_udp(self): 'root', '', database='db', - use_udp=True, udp_port=self.influxd_inst.udp_port + use_udp=True, + udp_port=self.influxd_inst.udp_port ) cli.write_points(dummy_point) # The points are not immediately available after write_points. # This is to be expected because we are using udp (no response !). # So we have to wait some time, - time.sleep(1) # 1 sec seems to be a good choice. + time.sleep(3) # 3 sec seems to be a good choice. rsp = self.cli.query('SELECT * FROM cpu_load_short') self.assertEqual( diff --git a/tests/influxdb/dataframe_client_test.py b/tests/influxdb/dataframe_client_test.py index 8f7e8e81..7f54990a 100644 --- a/tests/influxdb/dataframe_client_test.py +++ b/tests/influxdb/dataframe_client_test.py @@ -33,7 +33,8 @@ def test_write_points_from_dataframe(self): "column_three"]) expected = ( b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" - b"foo column_one=\"2\",column_three=2.0,column_two=2 3600000000000\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 " + b"3600000000000\n" ) with requests_mock.Mocker() as m: @@ -92,7 +93,8 @@ def test_write_points_from_dataframe_with_period_index(self): "column_three"]) expected = ( b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" - b"foo column_one=\"2\",column_three=2.0,column_two=2 86400000000000\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 " + b"86400000000000\n" ) with requests_mock.Mocker() as m: From a17e743c0823db3508a08f196ff4568aa9e3edbe Mon Sep 17 00:00:00 2001 From: aviau Date: Fri, 3 Jul 2015 11:22:47 -0400 Subject: [PATCH 11/17] Fixed tests for new line protocol --- tests/influxdb/client_test.py | 24 ++++++++++-------------- tests/influxdb/dataframe_client_test.py | 7 +++---- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index e9185208..80fa49db 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -207,13 +207,11 @@ def test_write_points_batch(self): {"measurement": "network", "tags": {"direction": "out"}, "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} ] - expected_last_body = {"tags": {"host": "server01", - "region": "us-west"}, - "database": "db", - "points": [{"measurement": "network", - "tags": {"direction": "out"}, - "time": "2009-11-10T23:00:00Z", - "fields": {"value": 12.00}}]} + expected_last_body = ( + "network,direction=out,host=server01,region=us-west " + "value=12.0 1257894000000000000\n" + ) + with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, "http://localhost:8086/write", @@ -225,7 +223,7 @@ def test_write_points_batch(self): "region": "us-west"}, batch_size=2) self.assertEqual(m.call_count, 2) - self.assertEqual(expected_last_body, m.last_request.json()) + self.assertEqual(expected_last_body, m.last_request.body) def test_write_points_udp(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -240,12 +238,10 @@ def test_write_points_udp(self): received_data, addr = s.recvfrom(1024) - self.assertDictEqual( - { - "points": self.dummy_points, - "database": "test" - }, - json.loads(received_data.decode(), strict=True) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west " + "value=0.64 1257894000000000000\n", + received_data.decode() ) def test_write_bad_precision_udp(self): diff --git a/tests/influxdb/dataframe_client_test.py b/tests/influxdb/dataframe_client_test.py index 7f54990a..481eefbc 100644 --- a/tests/influxdb/dataframe_client_test.py +++ b/tests/influxdb/dataframe_client_test.py @@ -142,16 +142,15 @@ def test_write_points_from_dataframe_with_time_precision(self): measurement = "foo" cli.write_points(dataframe, measurement, time_precision='s') - points.update(precision='s') - self.assertEqual(json.loads(m.last_request.body), points) + self.assertEqual(m.last_request.qs['precision'], ['s']) cli.write_points(dataframe, measurement, time_precision='m') points.update(precision='m') - self.assertEqual(json.loads(m.last_request.body), points) + self.assertEqual(m.last_request.qs['precision'], ['m']) cli.write_points(dataframe, measurement, time_precision='u') points.update(precision='u') - self.assertEqual(json.loads(m.last_request.body), points) + self.assertEqual(m.last_request.qs['precision'], ['u']) @raises(TypeError) def test_write_points_from_dataframe_fails_without_time_index(self): From 439f46c2a7eba21207a28eabb7eaac68577b153d Mon Sep 17 00:00:00 2001 From: aviau Date: Fri, 3 Jul 2015 12:19:48 -0400 Subject: [PATCH 12/17] isinstance: Use tuple --- influxdb/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb/client.py b/influxdb/client.py index 9ebf3c0f..8fde6103 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -211,7 +211,7 @@ def request(self, url, method='GET', params=None, data=None, if params is None: params = {} - if isinstance(data, dict) or isinstance(data, list): + if isinstance(data, (dict, list)): data = json.dumps(data) # Try to send the request a maximum of three times. (see #103) From 58dac7b1522ea6e4bbb82cdcd6c0e6a362ea61a6 Mon Sep 17 00:00:00 2001 From: Georgi Dimitrov Date: Fri, 3 Jul 2015 22:53:30 +0100 Subject: [PATCH 13/17] Fix write_points_batch test for python3. --- tests/influxdb/client_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 80fa49db..d175b4f0 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -223,7 +223,8 @@ def test_write_points_batch(self): "region": "us-west"}, batch_size=2) self.assertEqual(m.call_count, 2) - self.assertEqual(expected_last_body, m.last_request.body) + self.assertEqual(expected_last_body, + m.last_request.body.decode('utf-8')) def test_write_points_udp(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) From 126b39d42e4b179d14e59a3cbbbb46b7d0e35158 Mon Sep 17 00:00:00 2001 From: Georgi Dimitrov Date: Sat, 4 Jul 2015 01:30:38 +0100 Subject: [PATCH 14/17] Update queries and tests for InfluxDB v0.9.1. * Update delete_series query. Instead of an id, it has FROM and WHERE = clauses. Update tests. * Remove grant_admin_privileges method. Cluster administration privileges are granted upon user creation. Remove tests. * Update create_user method to accept an optional argument to determine whether the user should be granted cluster administration privileges. Update tests. * Fix the test_write_check_read test case in the tests against a real server. * Fix revoke_admin_privileges test. It was failing because of the grant_admin_privileges method malfunction. --- influxdb/client.py | 39 +++++++------ tests/influxdb/client_test.py | 22 -------- tests/influxdb/client_test_with_server.py | 68 ++++++++--------------- 3 files changed, 44 insertions(+), 85 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 8fde6103..409c9cc5 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -589,15 +589,20 @@ def get_list_users(self): """ return list(self.query("SHOW USERS").get_points()) - def create_user(self, username, password): + def create_user(self, username, password, admin=False): """Create a new user in InfluxDB :param username: the new username to create :type username: str :param password: the password for the new user :type password: str + :param admin: whether the user should have cluster administration + privileges or not + :type admin: boolean """ text = "CREATE USER {} WITH PASSWORD '{}'".format(username, password) + if admin: + text += ' WITH ALL PRIVILEGES' self.query(text) def drop_user(self, username): @@ -620,29 +625,27 @@ def set_user_password(self, username, password): text = "SET PASSWORD FOR {} = '{}'".format(username, password) self.query(text) - def delete_series(self, id, database=None): - """Delete series from a database. + def delete_series(self, database=None, measurement=None, tags=None): + """Delete series from a database. Series can be filtered by + measurement and tags. - :param id: the id of the series to be deleted - :type id: int + :param measurement: Delete all series from a measurement + :type id: string + :param tags: Delete all series that match given tags + :type id: dict :param database: the database from which the series should be deleted, defaults to client's current database :type database: str """ database = database or self._database - self.query('DROP SERIES %s' % id, database=database) - - def grant_admin_privileges(self, username): - """Grant cluster administration privileges to an user. - - :param username: the username to grant privileges to - :type username: str - - .. note:: Only a cluster administrator can create/ drop databases - and manage users. - """ - text = "GRANT ALL PRIVILEGES TO {}".format(username) - self.query(text) + query_str = 'DROP SERIES' + if measurement: + query_str += ' FROM "{}"'.format(measurement) + + if tags: + query_str += ' WHERE ' + ' and '.join(["{}='{}'".format(k, v) + for k, v in tags.items()]) + self.query(query_str, database=database) def revoke_admin_privileges(self, username): """Revoke cluster administration privileges from an user. diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index d175b4f0..43ecb68e 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -615,28 +615,6 @@ def test_get_list_users_empty(self): self.assertListEqual(self.cli.get_list_users(), []) - def test_grant_admin_privileges(self): - example_response = '{"results":[{}]}' - - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.GET, - "http://localhost:8086/query", - text=example_response - ) - self.cli.grant_admin_privileges('test') - - self.assertEqual( - m.last_request.qs['q'][0], - 'grant all privileges to test' - ) - - @raises(Exception) - def test_grant_admin_privileges_invalid(self): - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - self.cli.grant_admin_privileges('') - def test_revoke_admin_privileges(self): example_response = '{"results":[{}]}' diff --git a/tests/influxdb/client_test_with_server.py b/tests/influxdb/client_test_with_server.py index a298d3de..0d4f36f7 100644 --- a/tests/influxdb/client_test_with_server.py +++ b/tests/influxdb/client_test_with_server.py @@ -109,7 +109,7 @@ def point(serie_name, timestamp=None, tags=None, **fields): }, "time": "2009-11-10T23:01:35Z", "fields": { - "value": 33 + "value": 33.0 } } ] @@ -380,6 +380,12 @@ def test_create_user(self): self.assertIn({'user': 'test_user', 'admin': False}, rsp) + def test_create_user_admin(self): + self.cli.create_user('test_user', 'secret_password', True) + rsp = list(self.cli.query("SHOW USERS")['results']) + self.assertIn({'user': 'test_user', 'admin': True}, + rsp) + def test_create_user_blank_password(self): self.cli.create_user('test_user', '') rsp = list(self.cli.query("SHOW USERS")['results']) @@ -439,26 +445,9 @@ def test_drop_user_invalid(self): 'found invalid, expected', ctx.exception.content) - @unittest.skip("Broken as of 0.9.0") - def test_grant_admin_privileges(self): - self.cli.create_user('test', 'test') - self.assertEqual([{'user': 'test', 'admin': False}], - self.cli.get_list_users()) - self.cli.grant_admin_privileges('test') - self.assertEqual([{'user': 'test', 'admin': True}], - self.cli.get_list_users()) - - def test_grant_admin_privileges_invalid(self): - with self.assertRaises(InfluxDBClientError) as ctx: - self.cli.grant_admin_privileges('') - self.assertEqual(400, ctx.exception.code) - self.assertIn('{"error":"error parsing query: ', - ctx.exception.content) - @unittest.skip("Broken as of 0.9.0") def test_revoke_admin_privileges(self): - self.cli.create_user('test', 'test') - self.cli.grant_admin_privileges('test') + self.cli.create_user('test', 'test', admin=True) self.assertEqual([{'user': 'test', 'admin': True}], self.cli.get_list_users()) self.cli.revoke_admin_privileges('test') @@ -518,23 +507,13 @@ def test_write(self): params={'db': 'db'}, )) - @unittest.skip("fail against real server instance, " - "don't know if it should succeed actually..") def test_write_check_read(self): self.test_write() - # hmmmm damn, - # after write has returned, if we directly query for the data it's not - # directly available.. (don't know if this is expected behavior ( - # but it maybe)) - # So we have to : - time.sleep(5) - # so that then the data is available through select : + time.sleep(1) rsp = self.cli.query('SELECT * FROM cpu_load_short', database='db') - self.assertEqual( - {'cpu_load_short': [ - {'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]}, - rsp - ) + self.assertListEqual([{'value': 0.64, + 'time': '2009-11-10T23:00:00Z'}], + list(rsp.get_points())) def test_write_points(self): self.assertIs(True, self.cli.write_points(dummy_point)) @@ -692,19 +671,18 @@ def test_get_list_series_and_delete(self): rsp ) - @unittest.skip("broken on 0.9.0") + def test_delete_series_invalid(self): + with self.assertRaises(InfluxDBClientError): + self.cli.delete_series() + def test_delete_series(self): - self.assertEqual( - len(self.cli.get_list_series()), 0 - ) - self.cli.write_points(dummy_point) - self.assertEqual( - len(self.cli.get_list_series()), 1 - ) - self.cli.delete_series(1) - self.assertEqual( - len(self.cli.get_list_series()), 0 - ) + self.assertEqual(len(self.cli.get_list_series()), 0) + self.cli.write_points(dummy_points) + self.assertEqual(len(self.cli.get_list_series()), 2) + self.cli.delete_series(measurement='cpu_load_short') + self.assertEqual(len(self.cli.get_list_series()), 1) + self.cli.delete_series(tags={'region': 'us-west'}) + self.assertEqual(len(self.cli.get_list_series()), 0) @unittest.skip("Broken as of 0.9.0") def test_get_list_series_DF(self): From ab590e33d4eb0a02c6b7598fb2d727637fb2c86a Mon Sep 17 00:00:00 2001 From: aviau Date: Sun, 5 Jul 2015 10:21:28 -0400 Subject: [PATCH 15/17] Travis.yml: Updated to InfluxDB 0.9.1 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 564ce172..70e210aa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ env: install: - sudo pip install tox - sudo pip install coveralls - - wget http://get.influxdb.org/influxdb_0.9.0_amd64.deb && sudo dpkg -i influxdb_0.9.0_amd64.deb + - wget https://s3.amazonaws.com/influxdb/influxdb_0.9.1_amd64.deb && sudo dpkg -i influxdb_0.9.1_amd64.deb script: - travis_wait 30 tox -e $TOX_ENV after_success: From 32f5bed33c0ef5ac5cf0762b3bbc3a64498a1a1d Mon Sep 17 00:00:00 2001 From: aviau Date: Mon, 6 Jul 2015 09:06:13 -0400 Subject: [PATCH 16/17] 'time.sleep(0.1)' before returning free port --- tests/influxdb/misc.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/influxdb/misc.py b/tests/influxdb/misc.py index 4761d0e9..70e16f11 100644 --- a/tests/influxdb/misc.py +++ b/tests/influxdb/misc.py @@ -1,6 +1,7 @@ import socket +import time def get_free_port(ip='127.0.0.1'): @@ -12,6 +13,10 @@ def get_free_port(ip='127.0.0.1'): finally: sock.close() + # Is there a better way than a sleep? + # There were issues on Travis where the port was not yet free. + time.sleep(0.1) + def is_port_open(port, ip='127.0.0.1'): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) From 1775b244206d678cc08cb7fd1bf4c1a632cde89b Mon Sep 17 00:00:00 2001 From: aviau Date: Tue, 7 Jul 2015 09:21:47 -0400 Subject: [PATCH 17/17] line_protocol: Don't replace when decoding utf-8 --- influxdb/line_protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index 2b99d469..266cf59e 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -53,7 +53,7 @@ def _force_text(data): Try to return a text aka unicode object from the given data. """ if isinstance(data, binary_type): - return data.decode('utf-8', 'replace') + return data.decode('utf-8') else: return data