diff --git a/influxdb/client.py b/influxdb/client.py old mode 100755 new mode 100644 index 007115fc..c295a424 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -11,6 +11,7 @@ import requests.exceptions from sys import version_info +from influxdb.line_protocol import make_lines from influxdb.resultset import ResultSet try: @@ -197,7 +198,7 @@ def switch_user(self, username, password): self._password = password def request(self, url, method='GET', params=None, data=None, - expected_response_code=200): + expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. :param url: the path of the HTTP request, e.g. write, query, etc. @@ -218,17 +219,13 @@ def request(self, url, method='GET', params=None, data=None, """ url = "{0}/{1}".format(self._baseurl, url) + if headers is None: + headers = self._headers + if params is None: params = {} - auth = { - 'u': self._username, - 'p': self._password - } - - params.update(auth) - - if data is not None and not isinstance(data, str): + if isinstance(data, dict) or isinstance(data, list): data = json.dumps(data) # Try to send the request a maximum of three times. (see #103) @@ -238,9 +235,10 @@ def request(self, url, method='GET', params=None, data=None, response = self._session.request( method=method, url=url, + auth=(self._username, self._password), params=params, data=data, - headers=self._headers, + headers=headers, verify=self._verify_ssl, timeout=self._timeout ) @@ -269,12 +267,17 @@ def write(self, data, params=None, expected_response_code=204): :returns: True, if the write operation is successful :rtype: bool """ + + headers = self._headers + headers['Content-type'] = 'application/octet-stream' + self.request( url="write", method='POST', params=params, - data=data, - expected_response_code=expected_response_code + data=make_lines(data).encode('utf-8'), + expected_response_code=expected_response_code, + headers=headers ) return True @@ -387,22 +390,25 @@ def _write_points(self, 'points': points } - if time_precision: - data['precision'] = time_precision - - if retention_policy: - data['retentionPolicy'] = retention_policy - if tags: data['tags'] = tags - data['database'] = database or self._database + params = { + 'db': database or self._database + } + + if time_precision: + params['precision'] = time_precision + + if retention_policy: + params['rp'] = retention_policy if self.use_udp: self.send_packet(data) else: self.write( data=data, + params=params, expected_response_code=204 ) @@ -679,9 +685,8 @@ def send_packet(self, packet): :param packet: the packet to be sent :type packet: dict """ - data = json.dumps(packet) - byte = data.encode('utf-8') - self.udp_socket.sendto(byte, (self._host, self.udp_port)) + data = make_lines(packet).encode('utf-8') + self.udp_socket.sendto(data, (self._host, self.udp_port)) class InfluxDBClusterClient(object): diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py new file mode 100644 index 00000000..2b99d469 --- /dev/null +++ b/influxdb/line_protocol.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from calendar import timegm +from copy import copy +from datetime import datetime + +from dateutil.parser import parse +from pytz import utc +from six import binary_type, text_type + + +def _convert_timestamp(timestamp): + if isinstance(timestamp, int): + return timestamp + if isinstance(_force_text(timestamp), text_type): + timestamp = parse(timestamp) + if isinstance(timestamp, datetime): + if timestamp.tzinfo: + timestamp = timestamp.astimezone(utc) + timestamp.replace(tzinfo=None) + return ( + timegm(timestamp.timetuple()) * 1e9 + + timestamp.microsecond * 1e3 + ) + raise ValueError(timestamp) + + +def _escape_tag(tag): + return tag.replace( + "\\", "\\\\" + ).replace( + " ", "\\ " + ).replace( + ",", "\\," + ).replace( + "=", "\\=" + ) + + +def _escape_value(value): + value = _force_text(value) + if isinstance(value, text_type): + return "\"{}\"".format(value.replace( + "\"", "\\\"" + )) + else: + return str(value) + + +def _force_text(data): + """ + Try to return a text aka unicode object from the given data. + """ + if isinstance(data, binary_type): + return data.decode('utf-8', 'replace') + else: + return data + + +def make_lines(data): + """ + Extracts the points from the given dict and returns a Unicode string + matching the line protocol introduced in InfluxDB 0.9.0. + """ + lines = "" + static_tags = data.get('tags', None) + for point in data['points']: + # add measurement name + lines += _escape_tag(_force_text( + point.get('measurement', data.get('measurement')) + )) + "," + + # add tags + if static_tags is None: + tags = point.get('tags', {}) + else: + tags = copy(static_tags) + tags.update(point.get('tags', {})) + # tags should be sorted client-side to take load off server + for tag_key in sorted(tags.keys()): + lines += "{key}={value},".format( + key=_escape_tag(tag_key), + value=_escape_tag(tags[tag_key]), + ) + lines = lines[:-1] + " " # strip the trailing comma + + # add fields + for field_key in sorted(point['fields'].keys()): + lines += "{key}={value},".format( + key=_escape_tag(field_key), + value=_escape_value(point['fields'][field_key]), + ) + lines = lines[:-1] # strip the trailing comma + + # add timestamp + if 'time' in point: + lines += " " + _force_text(str(int( + _convert_timestamp(point['time']) + ))) + + lines += "\n" + return lines diff --git a/requirements.txt b/requirements.txt index 45cc6284..3445ca42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ +python-dateutil>=2.0.0 +pytz requests>=1.0.3 -six==1.9.0 \ No newline at end of file +six==1.9.0 diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 605713fb..3d40d9a6 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -88,7 +88,7 @@ def setUp(self): "host": "server01", "region": "us-west" }, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": { "value": 0.64 } @@ -149,19 +149,13 @@ def test_write(self): "points": [{"measurement": "cpu_load_short", "tags": {"host": "server01", "region": "us-west"}, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": {"value": 0.64}}]} ) self.assertEqual( - json.loads(m.last_request.body), - {"database": "mydb", - "retentionPolicy": "mypolicy", - "points": [{"measurement": "cpu_load_short", - "tags": {"host": "server01", - "region": "us-west"}, - "timestamp": "2009-11-10T23:00:00Z", - "fields": {"value": 0.64}}]} + m.last_request.body, + b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", ) def test_write_points(self): @@ -176,12 +170,9 @@ def test_write_points(self): cli.write_points( self.dummy_points, ) - self.assertDictEqual( - { - "database": "db", - "points": self.dummy_points, - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_toplevel_attributes(self): @@ -199,31 +190,26 @@ def test_write_points_toplevel_attributes(self): tags={"tag": "hello"}, retention_policy="somepolicy" ) - self.assertDictEqual( - { - "database": "testdb", - "tags": {"tag": "hello"}, - "points": self.dummy_points, - "retentionPolicy": "somepolicy" - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west,tag=hello value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_batch(self): dummy_points = [ {"measurement": "cpu_usage", "tags": {"unit": "percent"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, {"measurement": "network", "tags": {"direction": "in"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, {"measurement": "network", "tags": {"direction": "out"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} ] expected_last_body = {"tags": {"host": "server01", "region": "us-west"}, "database": "db", "points": [{"measurement": "network", "tags": {"direction": "out"}, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -294,12 +280,9 @@ def test_write_points_with_precision(self): time_precision='n' ) - self.assertDictEqual( - {'points': self.dummy_points, - 'database': 'db', - 'precision': 'n', - }, - json.loads(m.last_request.body) + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + m.last_request.body, ) def test_write_points_bad_precision(self): diff --git a/tests/influxdb/client_test_with_server.py b/tests/influxdb/client_test_with_server.py index 8c0bdc2d..1238e5cb 100644 --- a/tests/influxdb/client_test_with_server.py +++ b/tests/influxdb/client_test_with_server.py @@ -504,9 +504,10 @@ class CommonTests(ManyTestCasesWithServerMixin, influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template') def test_write(self): - new_dummy_point = dummy_point[0].copy() - new_dummy_point['database'] = 'db' - self.assertIs(True, self.cli.write(new_dummy_point)) + self.assertIs(True, self.cli.write( + {'points': dummy_point}, + params={'db': 'db'}, + )) @unittest.skip("fail against real server instance, " "don't know if it should succeed actually..") diff --git a/tests/influxdb/dataframe_client_test.py b/tests/influxdb/dataframe_client_test.py index 61b411fd..8f7e8e81 100644 --- a/tests/influxdb/dataframe_client_test.py +++ b/tests/influxdb/dataframe_client_test.py @@ -31,24 +31,10 @@ def test_write_points_from_dataframe(self): index=[now, now + timedelta(hours=1)], columns=["column_one", "column_two", "column_three"]) - expected = { - 'database': 'db', - 'points': [ - {'time': '1970-01-01T00:00:00+00:00', - 'fields': { - 'column_two': 1, - 'column_three': 1.0, - 'column_one': '1'}, - 'tags': {}, - 'measurement': 'foo'}, - {'time': '1970-01-01T01:00:00+00:00', - 'fields': { - 'column_two': 2, - 'column_three': 2.0, - 'column_one': '2'}, - 'tags': {}, - 'measurement': 'foo'}] - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -58,10 +44,10 @@ def test_write_points_from_dataframe(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, 'foo') - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) cli.write_points(dataframe, 'foo', tags=None) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_in_batches(self): now = pd.Timestamp('1970-01-01 00:00+00:00') @@ -83,24 +69,10 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], index=[now, now + timedelta(hours=1)]) - expected = { - 'database': 'db', - 'points': [ - {'fields': { - '0': '1', - '1': 1, - '2': 1.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T00:00:00+00:00', - 'measurement': 'foo'}, - {'fields': { - '0': '2', - '1': 2, - '2': 2.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T01:00:00+00:00', - 'measurement': 'foo'}], - } + expected = ( + b"foo,hello=there 0=\"1\",1=1,2=1.0 0\n" + b"foo,hello=there 0=\"2\",1=2,2=2.0 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -110,7 +82,7 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo", {"hello": "there"}) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_period_index(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], @@ -118,24 +90,10 @@ def test_write_points_from_dataframe_with_period_index(self): pd.Period('1970-01-02')], columns=["column_one", "column_two", "column_three"]) - expected = { - 'points': [ - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '1', - 'column_two': 1, - 'column_three': 1.0}, - 'time': '1970-01-01T00:00:00+00:00'}, - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '2', - 'column_two': 2, - 'column_three': 2.0}, - 'time': '1970-01-02T00:00:00+00:00'}], - 'database': 'db', - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 86400000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -145,7 +103,7 @@ def test_write_points_from_dataframe_with_period_index(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo") - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_time_precision(self): now = pd.Timestamp('1970-01-01 00:00+00:00')