From 078350e35f43e63a4183b2c36cca9de1c54f09cf Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sun, 21 Jun 2015 12:08:32 +0200 Subject: [PATCH 1/8] initial implementation of line protocol as introduced in https://github.com/influxdb/influxdb/pull/2696 --- influxdb/client.py | 23 +++------ influxdb/line_protocol.py | 103 ++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 +- 3 files changed, 114 insertions(+), 16 deletions(-) create mode 100644 influxdb/line_protocol.py diff --git a/influxdb/client.py b/influxdb/client.py index 007115fc..0187e617 100755 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -11,6 +11,7 @@ import requests.exceptions from sys import version_info +from influxdb.line_protocol import make_lines from influxdb.resultset import ResultSet try: @@ -221,14 +222,7 @@ def request(self, url, method='GET', params=None, data=None, if params is None: params = {} - auth = { - 'u': self._username, - 'p': self._password - } - - params.update(auth) - - if data is not None and not isinstance(data, str): + if isinstance(data, dict) or isinstance(data, list): data = json.dumps(data) # Try to send the request a maximum of three times. (see #103) @@ -238,6 +232,7 @@ def request(self, url, method='GET', params=None, data=None, response = self._session.request( method=method, url=url, + auth=(self._username, self._password), params=params, data=data, headers=self._headers, @@ -270,10 +265,10 @@ def write(self, data, params=None, expected_response_code=204): :rtype: bool """ self.request( - url="write", + url="write_points", method='POST', params=params, - data=data, + data=make_lines(data).encode('utf-8'), expected_response_code=expected_response_code ) return True @@ -396,13 +391,12 @@ def _write_points(self, if tags: data['tags'] = tags - data['database'] = database or self._database - if self.use_udp: self.send_packet(data) else: self.write( data=data, + params={'db': database or self._database}, expected_response_code=204 ) @@ -679,9 +673,8 @@ def send_packet(self, packet): :param packet: the packet to be sent :type packet: dict """ - data = json.dumps(packet) - byte = data.encode('utf-8') - self.udp_socket.sendto(byte, (self._host, self.udp_port)) + data = make_lines(packet).encode('utf-8') + self.udp_socket.sendto(data, (self._host, self.udp_port)) class InfluxDBClusterClient(object): diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py new file mode 100644 index 00000000..11fb6d1e --- /dev/null +++ b/influxdb/line_protocol.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from copy import copy +from datetime import datetime +from time import mktime + +from dateutil.parser import parse +from pytz import utc +from six import binary_type, text_type + + +def _convert_timestamp(timestamp): + if isinstance(timestamp, int): + return timestamp + if isinstance(_force_text(timestamp), text_type): + timestamp = parse(timestamp) + if isinstance(timestamp, datetime): + if timestamp.tzinfo: + timestamp = timestamp.astimezone(utc) + timestamp.replace(tzinfo=None) + return ( + mktime(timestamp.timetuple()) * 1e9 + + timestamp.microsecond * 1e3 + ) + raise ValueError(timestamp) + + +def _escape_tag(tag): + return tag.replace( + "\\", "\\\\" + ).replace( + " ", "\\ " + ).replace( + ",", "\\," + ).replace( + "=", "\\=" + ) + + +def _escape_value(value): + value = _force_text(value) + if isinstance(value, text_type): + return "\"{}\"".format(value.replace( + "\"", "\\\"" + )) + else: + return str(value) + + +def _force_text(data): + """ + Try to return a text aka unicode object from the given data. + """ + if isinstance(data, binary_type): + return data.decode('utf-8', 'replace') + else: + return data + + +def make_lines(data): + """ + Extracts the points from the given dict and returns a Unicode string + matching the line protocol introduced in InfluxDB 0.9.0. + """ + lines = "" + static_tags = data.get('tags', None) + for point in data['points']: + # add measurement name + lines += _escape_tag(_force_text( + point.get('measurement', data.get('measurement')) + )) + "," + + # add tags + if static_tags is None: + tags = point.get('tags', {}) + else: + tags = copy(static_tags) + tags.update(point.get('tags', {})) + # tags should be sorted client-side to take load off server + for tag_key in sorted(tags.keys()): + lines += "{key}={value},".format( + key=_escape_tag(tag_key), + value=_escape_tag(tags[tag_key]), + ) + lines = lines[:-1] + " " # strip the trailing comma + + # add fields + for field_key in sorted(point['fields'].keys()): + lines += "{key}={value},".format( + key=_escape_tag(field_key), + value=_escape_value(point['fields'][field_key]), + ) + lines = lines[:-1] # strip the trailing comma + + # add timestamp + if 'timestamp' in point: + lines += " " + _force_text(str(int( + _convert_timestamp(point['timestamp']) + ))) + + lines += "\n" + return lines diff --git a/requirements.txt b/requirements.txt index 45cc6284..3445ca42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ +python-dateutil>=2.0.0 +pytz requests>=1.0.3 -six==1.9.0 \ No newline at end of file +six==1.9.0 From 9b753de079d3b46f8a3ada1f27f791d739b44f47 Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sun, 21 Jun 2015 12:09:28 +0200 Subject: [PATCH 2/8] update client tests for line protocol --- tests/influxdb/client_test.py | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 605713fb..e329eeb0 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -139,7 +139,7 @@ def test_write(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) cli = InfluxDBClient(database='db') @@ -154,21 +154,15 @@ def test_write(self): ) self.assertEqual( - json.loads(m.last_request.body), - {"database": "mydb", - "retentionPolicy": "mypolicy", - "points": [{"measurement": "cpu_load_short", - "tags": {"host": "server01", - "region": "us-west"}, - "timestamp": "2009-11-10T23:00:00Z", - "fields": {"value": 0.64}}]} + m.last_request.body, + b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", ) def test_write_points(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) @@ -188,7 +182,7 @@ def test_write_points_toplevel_attributes(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) @@ -227,7 +221,7 @@ def test_write_points_batch(self): "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204) cli = InfluxDBClient(database='db') cli.write_points(points=dummy_points, @@ -284,7 +278,7 @@ def test_write_points_with_precision(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write", + "http://localhost:8086/write_points", status_code=204 ) @@ -294,12 +288,9 @@ def test_write_points_with_precision(self): time_precision='n' ) - self.assertDictEqual( - {'points': self.dummy_points, - 'database': 'db', - 'precision': 'n', - }, - json.loads(m.last_request.body) + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", + m.last_request.body, ) def test_write_points_bad_precision(self): From 99cc8caf8e599e86c8dd03bdc0ccbad2d8aa1a98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20=C5=A0tetiar?= Date: Wed, 24 Jun 2015 17:19:14 +0200 Subject: [PATCH 3/8] Fixes for line-protocol feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Working again with https://github.com/influxdb/influxdb/commit/246ce61b85d3d7b1eb5da354c35e2c4b4b42020e Signed-off-by: Petr Štetiar --- influxdb/client.py | 16 ++++++++++++---- tests/influxdb/client_test.py | 10 +++++----- 2 files changed, 17 insertions(+), 9 deletions(-) mode change 100755 => 100644 influxdb/client.py diff --git a/influxdb/client.py b/influxdb/client.py old mode 100755 new mode 100644 index 0187e617..3e87b739 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -198,7 +198,7 @@ def switch_user(self, username, password): self._password = password def request(self, url, method='GET', params=None, data=None, - expected_response_code=200): + expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. :param url: the path of the HTTP request, e.g. write, query, etc. @@ -219,6 +219,9 @@ def request(self, url, method='GET', params=None, data=None, """ url = "{0}/{1}".format(self._baseurl, url) + if headers is None: + headers = self._headers + if params is None: params = {} @@ -235,7 +238,7 @@ def request(self, url, method='GET', params=None, data=None, auth=(self._username, self._password), params=params, data=data, - headers=self._headers, + headers=headers, verify=self._verify_ssl, timeout=self._timeout ) @@ -264,12 +267,17 @@ def write(self, data, params=None, expected_response_code=204): :returns: True, if the write operation is successful :rtype: bool """ + + headers = self._headers + headers['Content-type'] = 'application/octet-stream' + self.request( - url="write_points", + url="write", method='POST', params=params, data=make_lines(data).encode('utf-8'), - expected_response_code=expected_response_code + expected_response_code=expected_response_code, + headers=headers ) return True diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index e329eeb0..8e3ef5f8 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -139,7 +139,7 @@ def test_write(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) cli = InfluxDBClient(database='db') @@ -162,7 +162,7 @@ def test_write_points(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) @@ -182,7 +182,7 @@ def test_write_points_toplevel_attributes(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) @@ -221,7 +221,7 @@ def test_write_points_batch(self): "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204) cli = InfluxDBClient(database='db') cli.write_points(points=dummy_points, @@ -278,7 +278,7 @@ def test_write_points_with_precision(self): with requests_mock.Mocker() as m: m.register_uri( requests_mock.POST, - "http://localhost:8086/write_points", + "http://localhost:8086/write", status_code=204 ) From ca10f5e31bf4a01443f6847b04c5c8e3ae01f6a4 Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 24 Jun 2015 22:47:48 +0200 Subject: [PATCH 4/8] line protocol: fix timestamp UTC conversion --- influxdb/line_protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index 11fb6d1e..27eafa0f 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals +from calendar import timegm from copy import copy from datetime import datetime -from time import mktime from dateutil.parser import parse from pytz import utc @@ -20,7 +20,7 @@ def _convert_timestamp(timestamp): timestamp = timestamp.astimezone(utc) timestamp.replace(tzinfo=None) return ( - mktime(timestamp.timetuple()) * 1e9 + + timegm(timestamp.timetuple()) * 1e9 + timestamp.microsecond * 1e3 ) raise ValueError(timestamp) From f02967766a3f70e4efa6df2c8878ec27242cd01a Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 24 Jun 2015 22:48:47 +0200 Subject: [PATCH 5/8] the timestamp field is called 'time' --- influxdb/line_protocol.py | 4 ++-- tests/influxdb/client_test.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py index 27eafa0f..2b99d469 100644 --- a/influxdb/line_protocol.py +++ b/influxdb/line_protocol.py @@ -94,9 +94,9 @@ def make_lines(data): lines = lines[:-1] # strip the trailing comma # add timestamp - if 'timestamp' in point: + if 'time' in point: lines += " " + _force_text(str(int( - _convert_timestamp(point['timestamp']) + _convert_timestamp(point['time']) ))) lines += "\n" diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 8e3ef5f8..ca913860 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -88,7 +88,7 @@ def setUp(self): "host": "server01", "region": "us-west" }, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": { "value": 0.64 } @@ -149,7 +149,7 @@ def test_write(self): "points": [{"measurement": "cpu_load_short", "tags": {"host": "server01", "region": "us-west"}, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": {"value": 0.64}}]} ) @@ -206,18 +206,18 @@ def test_write_points_toplevel_attributes(self): def test_write_points_batch(self): dummy_points = [ {"measurement": "cpu_usage", "tags": {"unit": "percent"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, {"measurement": "network", "tags": {"direction": "in"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, {"measurement": "network", "tags": {"direction": "out"}, - "timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} ] expected_last_body = {"tags": {"host": "server01", "region": "us-west"}, "database": "db", "points": [{"measurement": "network", "tags": {"direction": "out"}, - "timestamp": "2009-11-10T23:00:00Z", + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}]} with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, From dae6a8a69a86d452f7ffe2a0603d6b581fd31d1f Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Wed, 24 Jun 2015 22:49:15 +0200 Subject: [PATCH 6/8] fix some expected timestamps --- tests/influxdb/client_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index ca913860..45532907 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -155,7 +155,7 @@ def test_write(self): self.assertEqual( m.last_request.body, - b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", + b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", ) def test_write_points(self): @@ -289,7 +289,7 @@ def test_write_points_with_precision(self): ) self.assertEqual( - b"cpu_load_short,host=server01,region=us-west value=0.64 1257890400000000000\n", + b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", m.last_request.body, ) From 7fae65ff0fcabb998b02d22c136edb19ffd4da7f Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sat, 27 Jun 2015 01:32:31 +0200 Subject: [PATCH 7/8] set precision and retention policy in GET params --- influxdb/client.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 3e87b739..c295a424 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -390,21 +390,25 @@ def _write_points(self, 'points': points } + if tags: + data['tags'] = tags + + params = { + 'db': database or self._database + } + if time_precision: - data['precision'] = time_precision + params['precision'] = time_precision if retention_policy: - data['retentionPolicy'] = retention_policy - - if tags: - data['tags'] = tags + params['rp'] = retention_policy if self.use_udp: self.send_packet(data) else: self.write( data=data, - params={'db': database or self._database}, + params=params, expected_response_code=204 ) From 1350d05a6c60f9d916c991c3bfa1ab503cf35c3a Mon Sep 17 00:00:00 2001 From: Torsten Rehn Date: Sat, 27 Jun 2015 01:39:19 +0200 Subject: [PATCH 8/8] update some more test cases --- tests/influxdb/client_test.py | 20 ++---- tests/influxdb/client_test_with_server.py | 7 ++- tests/influxdb/dataframe_client_test.py | 74 +++++------------------ 3 files changed, 26 insertions(+), 75 deletions(-) diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index 45532907..3d40d9a6 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -170,12 +170,9 @@ def test_write_points(self): cli.write_points( self.dummy_points, ) - self.assertDictEqual( - { - "database": "db", - "points": self.dummy_points, - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_toplevel_attributes(self): @@ -193,14 +190,9 @@ def test_write_points_toplevel_attributes(self): tags={"tag": "hello"}, retention_policy="somepolicy" ) - self.assertDictEqual( - { - "database": "testdb", - "tags": {"tag": "hello"}, - "points": self.dummy_points, - "retentionPolicy": "somepolicy" - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west,tag=hello value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_batch(self): diff --git a/tests/influxdb/client_test_with_server.py b/tests/influxdb/client_test_with_server.py index 8c0bdc2d..1238e5cb 100644 --- a/tests/influxdb/client_test_with_server.py +++ b/tests/influxdb/client_test_with_server.py @@ -504,9 +504,10 @@ class CommonTests(ManyTestCasesWithServerMixin, influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template') def test_write(self): - new_dummy_point = dummy_point[0].copy() - new_dummy_point['database'] = 'db' - self.assertIs(True, self.cli.write(new_dummy_point)) + self.assertIs(True, self.cli.write( + {'points': dummy_point}, + params={'db': 'db'}, + )) @unittest.skip("fail against real server instance, " "don't know if it should succeed actually..") diff --git a/tests/influxdb/dataframe_client_test.py b/tests/influxdb/dataframe_client_test.py index 61b411fd..8f7e8e81 100644 --- a/tests/influxdb/dataframe_client_test.py +++ b/tests/influxdb/dataframe_client_test.py @@ -31,24 +31,10 @@ def test_write_points_from_dataframe(self): index=[now, now + timedelta(hours=1)], columns=["column_one", "column_two", "column_three"]) - expected = { - 'database': 'db', - 'points': [ - {'time': '1970-01-01T00:00:00+00:00', - 'fields': { - 'column_two': 1, - 'column_three': 1.0, - 'column_one': '1'}, - 'tags': {}, - 'measurement': 'foo'}, - {'time': '1970-01-01T01:00:00+00:00', - 'fields': { - 'column_two': 2, - 'column_three': 2.0, - 'column_one': '2'}, - 'tags': {}, - 'measurement': 'foo'}] - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -58,10 +44,10 @@ def test_write_points_from_dataframe(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, 'foo') - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) cli.write_points(dataframe, 'foo', tags=None) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_in_batches(self): now = pd.Timestamp('1970-01-01 00:00+00:00') @@ -83,24 +69,10 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], index=[now, now + timedelta(hours=1)]) - expected = { - 'database': 'db', - 'points': [ - {'fields': { - '0': '1', - '1': 1, - '2': 1.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T00:00:00+00:00', - 'measurement': 'foo'}, - {'fields': { - '0': '2', - '1': 2, - '2': 2.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T01:00:00+00:00', - 'measurement': 'foo'}], - } + expected = ( + b"foo,hello=there 0=\"1\",1=1,2=1.0 0\n" + b"foo,hello=there 0=\"2\",1=2,2=2.0 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -110,7 +82,7 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo", {"hello": "there"}) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_period_index(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], @@ -118,24 +90,10 @@ def test_write_points_from_dataframe_with_period_index(self): pd.Period('1970-01-02')], columns=["column_one", "column_two", "column_three"]) - expected = { - 'points': [ - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '1', - 'column_two': 1, - 'column_three': 1.0}, - 'time': '1970-01-01T00:00:00+00:00'}, - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '2', - 'column_two': 2, - 'column_three': 2.0}, - 'time': '1970-01-02T00:00:00+00:00'}], - 'database': 'db', - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 86400000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -145,7 +103,7 @@ def test_write_points_from_dataframe_with_period_index(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo") - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_time_precision(self): now = pd.Timestamp('1970-01-01 00:00+00:00')