diff --git a/.travis.yml b/.travis.yml index 63ec221f..b7e4db11 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ env: install: - sudo pip install tox - sudo pip install coveralls - - wget http://get.influxdb.org/influxdb_0.9.0_amd64.deb && sudo dpkg -i influxdb_0.9.0_amd64.deb + - wget https://s3.amazonaws.com/influxdb/influxdb_0.9.1_amd64.deb && sudo dpkg -i influxdb_0.9.1_amd64.deb script: - travis_wait 30 tox -e $TOX_ENV after_success: diff --git a/influxdb/client.py b/influxdb/client.py old mode 100755 new mode 100644 index 624cafb7..409c9cc5 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -11,6 +11,7 @@ import requests.exceptions from sys import version_info +from influxdb.line_protocol import make_lines from influxdb.resultset import ResultSet from .exceptions import InfluxDBClientError from .exceptions import InfluxDBServerError @@ -96,7 +97,8 @@ def __init__(self, self._headers = { 'Content-type': 'application/json', - 'Accept': 'text/plain'} + 'Accept': 'text/plain' + } @staticmethod def from_DSN(dsn, **kwargs): @@ -182,7 +184,7 @@ def switch_user(self, username, password): self._password = password def request(self, url, method='GET', params=None, data=None, - expected_response_code=200): + expected_response_code=200, headers=None): """Make a HTTP request to the InfluxDB API. :param url: the path of the HTTP request, e.g. write, query, etc. @@ -203,17 +205,13 @@ def request(self, url, method='GET', params=None, data=None, """ url = "{0}/{1}".format(self._baseurl, url) + if headers is None: + headers = self._headers + if params is None: params = {} - auth = { - 'u': self._username, - 'p': self._password - } - - params.update(auth) - - if data is not None and not isinstance(data, str): + if isinstance(data, (dict, list)): data = json.dumps(data) # Try to send the request a maximum of three times. (see #103) @@ -223,9 +221,10 @@ def request(self, url, method='GET', params=None, data=None, response = self._session.request( method=method, url=url, + auth=(self._username, self._password), params=params, data=data, - headers=self._headers, + headers=headers, verify=self._verify_ssl, timeout=self._timeout ) @@ -254,18 +253,24 @@ def write(self, data, params=None, expected_response_code=204): :returns: True, if the write operation is successful :rtype: bool """ + + headers = self._headers + headers['Content-type'] = 'application/octet-stream' + self.request( url="write", method='POST', params=params, - data=data, - expected_response_code=expected_response_code + data=make_lines(data).encode('utf-8'), + expected_response_code=expected_response_code, + headers=headers ) return True def query(self, query, params={}, + epoch=None, expected_response_code=200, database=None, raise_errors=True): @@ -294,6 +299,9 @@ def query(self, params['q'] = query params['db'] = database or self._database + if epoch is not None: + params['epoch'] = epoch + response = self.request( url="query", method='GET', @@ -391,22 +399,25 @@ def _write_points(self, 'points': points } - if time_precision: - data['precision'] = time_precision + if tags is not None: + data['tags'] = tags - if retention_policy: - data['retentionPolicy'] = retention_policy + params = { + 'db': database or self._database + } - if tags: - data['tags'] = tags + if time_precision is not None: + params['precision'] = time_precision - data['database'] = database or self._database + if retention_policy is not None: + params['rp'] = retention_policy if self.use_udp: self.send_packet(data) else: self.write( data=data, + params=params, expected_response_code=204 ) @@ -578,15 +589,20 @@ def get_list_users(self): """ return list(self.query("SHOW USERS").get_points()) - def create_user(self, username, password): + def create_user(self, username, password, admin=False): """Create a new user in InfluxDB :param username: the new username to create :type username: str :param password: the password for the new user :type password: str + :param admin: whether the user should have cluster administration + privileges or not + :type admin: boolean """ text = "CREATE USER {} WITH PASSWORD '{}'".format(username, password) + if admin: + text += ' WITH ALL PRIVILEGES' self.query(text) def drop_user(self, username): @@ -609,29 +625,27 @@ def set_user_password(self, username, password): text = "SET PASSWORD FOR {} = '{}'".format(username, password) self.query(text) - def delete_series(self, id, database=None): - """Delete series from a database. + def delete_series(self, database=None, measurement=None, tags=None): + """Delete series from a database. Series can be filtered by + measurement and tags. - :param id: the id of the series to be deleted - :type id: int + :param measurement: Delete all series from a measurement + :type id: string + :param tags: Delete all series that match given tags + :type id: dict :param database: the database from which the series should be deleted, defaults to client's current database :type database: str """ database = database or self._database - self.query('DROP SERIES %s' % id, database=database) - - def grant_admin_privileges(self, username): - """Grant cluster administration privileges to an user. - - :param username: the username to grant privileges to - :type username: str + query_str = 'DROP SERIES' + if measurement: + query_str += ' FROM "{}"'.format(measurement) - .. note:: Only a cluster administrator can create/ drop databases - and manage users. - """ - text = "GRANT ALL PRIVILEGES TO {}".format(username) - self.query(text) + if tags: + query_str += ' WHERE ' + ' and '.join(["{}='{}'".format(k, v) + for k, v in tags.items()]) + self.query(query_str, database=database) def revoke_admin_privileges(self, username): """Revoke cluster administration privileges from an user. @@ -683,9 +697,8 @@ def send_packet(self, packet): :param packet: the packet to be sent :type packet: dict """ - data = json.dumps(packet) - byte = data.encode('utf-8') - self.udp_socket.sendto(byte, (self._host, self.udp_port)) + data = make_lines(packet).encode('utf-8') + self.udp_socket.sendto(data, (self._host, self.udp_port)) class InfluxDBClusterClient(object): diff --git a/influxdb/line_protocol.py b/influxdb/line_protocol.py new file mode 100644 index 00000000..266cf59e --- /dev/null +++ b/influxdb/line_protocol.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from calendar import timegm +from copy import copy +from datetime import datetime + +from dateutil.parser import parse +from pytz import utc +from six import binary_type, text_type + + +def _convert_timestamp(timestamp): + if isinstance(timestamp, int): + return timestamp + if isinstance(_force_text(timestamp), text_type): + timestamp = parse(timestamp) + if isinstance(timestamp, datetime): + if timestamp.tzinfo: + timestamp = timestamp.astimezone(utc) + timestamp.replace(tzinfo=None) + return ( + timegm(timestamp.timetuple()) * 1e9 + + timestamp.microsecond * 1e3 + ) + raise ValueError(timestamp) + + +def _escape_tag(tag): + return tag.replace( + "\\", "\\\\" + ).replace( + " ", "\\ " + ).replace( + ",", "\\," + ).replace( + "=", "\\=" + ) + + +def _escape_value(value): + value = _force_text(value) + if isinstance(value, text_type): + return "\"{}\"".format(value.replace( + "\"", "\\\"" + )) + else: + return str(value) + + +def _force_text(data): + """ + Try to return a text aka unicode object from the given data. + """ + if isinstance(data, binary_type): + return data.decode('utf-8') + else: + return data + + +def make_lines(data): + """ + Extracts the points from the given dict and returns a Unicode string + matching the line protocol introduced in InfluxDB 0.9.0. + """ + lines = "" + static_tags = data.get('tags', None) + for point in data['points']: + # add measurement name + lines += _escape_tag(_force_text( + point.get('measurement', data.get('measurement')) + )) + "," + + # add tags + if static_tags is None: + tags = point.get('tags', {}) + else: + tags = copy(static_tags) + tags.update(point.get('tags', {})) + # tags should be sorted client-side to take load off server + for tag_key in sorted(tags.keys()): + lines += "{key}={value},".format( + key=_escape_tag(tag_key), + value=_escape_tag(tags[tag_key]), + ) + lines = lines[:-1] + " " # strip the trailing comma + + # add fields + for field_key in sorted(point['fields'].keys()): + lines += "{key}={value},".format( + key=_escape_tag(field_key), + value=_escape_value(point['fields'][field_key]), + ) + lines = lines[:-1] # strip the trailing comma + + # add timestamp + if 'time' in point: + lines += " " + _force_text(str(int( + _convert_timestamp(point['time']) + ))) + + lines += "\n" + return lines diff --git a/requirements.txt b/requirements.txt index 45cc6284..3445ca42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ +python-dateutil>=2.0.0 +pytz requests>=1.0.3 -six==1.9.0 \ No newline at end of file +six==1.9.0 diff --git a/tests/influxdb/client_test.py b/tests/influxdb/client_test.py index ebb6ad92..43ecb68e 100644 --- a/tests/influxdb/client_test.py +++ b/tests/influxdb/client_test.py @@ -154,14 +154,9 @@ def test_write(self): ) self.assertEqual( - json.loads(m.last_request.body), - {"database": "mydb", - "retentionPolicy": "mypolicy", - "points": [{"measurement": "cpu_load_short", - "tags": {"host": "server01", - "region": "us-west"}, - "time": "2009-11-10T23:00:00Z", - "fields": {"value": 0.64}}]} + m.last_request.body, + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000000000\n", ) def test_write_points(self): @@ -176,12 +171,10 @@ def test_write_points(self): cli.write_points( self.dummy_points, ) - self.assertDictEqual( - { - "database": "db", - "points": self.dummy_points, - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west " + "value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_toplevel_attributes(self): @@ -199,14 +192,10 @@ def test_write_points_toplevel_attributes(self): tags={"tag": "hello"}, retention_policy="somepolicy" ) - self.assertDictEqual( - { - "database": "testdb", - "tags": {"tag": "hello"}, - "points": self.dummy_points, - "retentionPolicy": "somepolicy" - }, - json.loads(m.last_request.body) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west,tag=hello " + "value=0.64 1257894000000000000\n", + m.last_request.body.decode('utf-8'), ) def test_write_points_batch(self): @@ -218,13 +207,11 @@ def test_write_points_batch(self): {"measurement": "network", "tags": {"direction": "out"}, "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} ] - expected_last_body = {"tags": {"host": "server01", - "region": "us-west"}, - "database": "db", - "points": [{"measurement": "network", - "tags": {"direction": "out"}, - "time": "2009-11-10T23:00:00Z", - "fields": {"value": 12.00}}]} + expected_last_body = ( + "network,direction=out,host=server01,region=us-west " + "value=12.0 1257894000000000000\n" + ) + with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, "http://localhost:8086/write", @@ -236,7 +223,8 @@ def test_write_points_batch(self): "region": "us-west"}, batch_size=2) self.assertEqual(m.call_count, 2) - self.assertEqual(expected_last_body, m.last_request.json()) + self.assertEqual(expected_last_body, + m.last_request.body.decode('utf-8')) def test_write_points_udp(self): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -251,12 +239,10 @@ def test_write_points_udp(self): received_data, addr = s.recvfrom(1024) - self.assertDictEqual( - { - "points": self.dummy_points, - "database": "test" - }, - json.loads(received_data.decode(), strict=True) + self.assertEqual( + "cpu_load_short,host=server01,region=us-west " + "value=0.64 1257894000000000000\n", + received_data.decode() ) def test_write_bad_precision_udp(self): @@ -294,12 +280,10 @@ def test_write_points_with_precision(self): time_precision='n' ) - self.assertDictEqual( - {'points': self.dummy_points, - 'database': 'db', - 'precision': 'n', - }, - json.loads(m.last_request.body) + self.assertEqual( + b"cpu_load_short,host=server01,region=us-west " + b"value=0.64 1257894000000000000\n", + m.last_request.body, ) def test_write_points_bad_precision(self): @@ -631,28 +615,6 @@ def test_get_list_users_empty(self): self.assertListEqual(self.cli.get_list_users(), []) - def test_grant_admin_privileges(self): - example_response = '{"results":[{}]}' - - with requests_mock.Mocker() as m: - m.register_uri( - requests_mock.GET, - "http://localhost:8086/query", - text=example_response - ) - self.cli.grant_admin_privileges('test') - - self.assertEqual( - m.last_request.qs['q'][0], - 'grant all privileges to test' - ) - - @raises(Exception) - def test_grant_admin_privileges_invalid(self): - cli = InfluxDBClient('host', 8086, 'username', 'password') - with _mocked_session(cli, 'get', 400): - self.cli.grant_admin_privileges('') - def test_revoke_admin_privileges(self): example_response = '{"results":[{}]}' diff --git a/tests/influxdb/client_test_with_server.py b/tests/influxdb/client_test_with_server.py index def0289f..0d4f36f7 100644 --- a/tests/influxdb/client_test_with_server.py +++ b/tests/influxdb/client_test_with_server.py @@ -15,7 +15,6 @@ import distutils.spawn from functools import partial import os -import re import shutil import subprocess import sys @@ -110,7 +109,7 @@ def point(serie_name, timestamp=None, tags=None, **fields): }, "time": "2009-11-10T23:01:35Z", "fields": { - "value": 33 + "value": 33.0 } } ] @@ -381,6 +380,12 @@ def test_create_user(self): self.assertIn({'user': 'test_user', 'admin': False}, rsp) + def test_create_user_admin(self): + self.cli.create_user('test_user', 'secret_password', True) + rsp = list(self.cli.query("SHOW USERS")['results']) + self.assertIn({'user': 'test_user', 'admin': True}, + rsp) + def test_create_user_blank_password(self): self.cli.create_user('test_user', '') rsp = list(self.cli.query("SHOW USERS")['results']) @@ -440,26 +445,9 @@ def test_drop_user_invalid(self): 'found invalid, expected', ctx.exception.content) - @unittest.skip("Broken as of 0.9.0") - def test_grant_admin_privileges(self): - self.cli.create_user('test', 'test') - self.assertEqual([{'user': 'test', 'admin': False}], - self.cli.get_list_users()) - self.cli.grant_admin_privileges('test') - self.assertEqual([{'user': 'test', 'admin': True}], - self.cli.get_list_users()) - - def test_grant_admin_privileges_invalid(self): - with self.assertRaises(InfluxDBClientError) as ctx: - self.cli.grant_admin_privileges('') - self.assertEqual(400, ctx.exception.code) - self.assertIn('{"error":"error parsing query: ', - ctx.exception.content) - @unittest.skip("Broken as of 0.9.0") def test_revoke_admin_privileges(self): - self.cli.create_user('test', 'test') - self.cli.grant_admin_privileges('test') + self.cli.create_user('test', 'test', admin=True) self.assertEqual([{'user': 'test', 'admin': True}], self.cli.get_list_users()) self.cli.revoke_admin_privileges('test') @@ -514,27 +502,18 @@ class CommonTests(ManyTestCasesWithServerMixin, influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template') def test_write(self): - new_dummy_point = dummy_point[0].copy() - new_dummy_point['database'] = 'db' - self.assertIs(True, self.cli.write(new_dummy_point)) + self.assertIs(True, self.cli.write( + {'points': dummy_point}, + params={'db': 'db'}, + )) - @unittest.skip("fail against real server instance, " - "don't know if it should succeed actually..") def test_write_check_read(self): self.test_write() - # hmmmm damn, - # after write has returned, if we directly query for the data it's not - # directly available.. (don't know if this is expected behavior ( - # but it maybe)) - # So we have to : - time.sleep(5) - # so that then the data is available through select : + time.sleep(1) rsp = self.cli.query('SELECT * FROM cpu_load_short', database='db') - self.assertEqual( - {'cpu_load_short': [ - {'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]}, - rsp - ) + self.assertListEqual([{'value': 0.64, + 'time': '2009-11-10T23:00:00Z'}], + list(rsp.get_points())) def test_write_points(self): self.assertIs(True, self.cli.write_points(dummy_point)) @@ -652,96 +631,6 @@ def test_write_points_batch(self): self.assertIn(12, net_out['series'][0]['values'][0]) self.assertIn(12.34, cpu['series'][0]['values'][0]) - def test_write_points_with_precision(self): - """ check that points written with an explicit precision have - actually that precision used. - """ - # for that we'll check that - for each precision - the actual 'time' - # value returned by a select has the correct regex format.. - # n : u'2015-03-20T15:23:36.615654966Z' - # u : u'2015-03-20T15:24:10.542554Z' - # ms : u'2015-03-20T15:24:50.878Z' - # s : u'2015-03-20T15:20:24Z' - # m : u'2015-03-20T15:25:00Z' - # h : u'2015-03-20T15:00:00Z' - base_regex = '\d{4}-\d{2}-\d{2}T\d{2}:' # YYYY-MM-DD 'T' hh: - base_s_regex = base_regex + '\d{2}:\d{2}' # base_regex + mm:ss - - point = { - "measurement": "cpu_load_short", - "tags": { - "host": "server01", - "region": "us-west" - }, - "time": "2009-11-10T12:34:56.123456789Z", - "fields": { - "value": 0.64 - } - } - - # As far as we can see the values aren't directly available depending - # on the precision used. - # The less the precision, the more to wait for the value to be - # actually written/available. - for idx, (precision, expected_regex, sleep_time) in enumerate(( - ('n', base_s_regex + '\.\d{9}Z', 1), - ('u', base_s_regex + '\.\d{6}Z', 1), - ('ms', base_s_regex + '\.\d{3}Z', 1), - ('s', base_s_regex + 'Z', 1), - - # ('h', base_regex + '00:00Z', ), - # that would require a sleep of possibly up to 3600 secs (/ 2 ?).. - )): - db = 'db1' # to not shoot us in the foot/head, - # we work on a fresh db each time: - self.cli.create_database(db) - before = datetime.datetime.now() - self.assertIs( - True, - self.cli.write_points( - [point], - time_precision=precision, - database=db)) - - # sys.stderr.write('checking presision with %r : - # before=%s\n' % (precision, before)) - after = datetime.datetime.now() - - if sleep_time > 1: - sleep_time -= (after if before.min != after.min - else before).second - - start = time.time() - timeout = start + sleep_time - # sys.stderr.write('should sleep %s ..\n' % sleep_time) - while time.time() < timeout: - rsp = self.cli.query('SELECT * FROM cpu_load_short', - database=db) - if rsp != {'cpu_load_short': []}: - # sys.stderr.write('already ? only slept %s\n' % ( - # time.time() - start)) - break - time.sleep(1) - else: - pass - # sys.stderr.write('ok !\n') - - # sys.stderr.write('sleeping %s..\n' % sleep_time) - - if sleep_time: - time.sleep(sleep_time) - - rsp = self.cli.query('SELECT * FROM cpu_load_short', database=db) - # sys.stderr.write('precision=%s rsp_timestamp = %r\n' % ( - # precision, rsp['cpu_load_short'][0]['time'])) - - m = re.match( - expected_regex, - list(rsp['cpu_load_short'])[0]['time'] - ) - self.assertIsNotNone(m) - self.cli.drop_database(db) - def test_query(self): self.assertIs(True, self.cli.write_points(dummy_point)) @@ -782,19 +671,18 @@ def test_get_list_series_and_delete(self): rsp ) - @unittest.skip("broken on 0.9.0") + def test_delete_series_invalid(self): + with self.assertRaises(InfluxDBClientError): + self.cli.delete_series() + def test_delete_series(self): - self.assertEqual( - len(self.cli.get_list_series()), 0 - ) - self.cli.write_points(dummy_point) - self.assertEqual( - len(self.cli.get_list_series()), 1 - ) - self.cli.delete_series(1) - self.assertEqual( - len(self.cli.get_list_series()), 0 - ) + self.assertEqual(len(self.cli.get_list_series()), 0) + self.cli.write_points(dummy_points) + self.assertEqual(len(self.cli.get_list_series()), 2) + self.cli.delete_series(measurement='cpu_load_short') + self.assertEqual(len(self.cli.get_list_series()), 1) + self.cli.delete_series(tags={'region': 'us-west'}) + self.assertEqual(len(self.cli.get_list_series()), 0) @unittest.skip("Broken as of 0.9.0") def test_get_list_series_DF(self): @@ -972,13 +860,11 @@ def test_query_multiple_series(self): ############################################################################ -@unittest.skip("Broken as of 0.9.0") @unittest.skipIf(not is_influxdb_bin_ok, "could not find influxd binary") class UdpTests(ManyTestCasesWithServerMixin, unittest.TestCase): influxdb_udp_enabled = True - influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template') @@ -989,14 +875,15 @@ def test_write_points_udp(self): 'root', '', database='db', - use_udp=True, udp_port=self.influxd_inst.udp_port + use_udp=True, + udp_port=self.influxd_inst.udp_port ) cli.write_points(dummy_point) # The points are not immediately available after write_points. # This is to be expected because we are using udp (no response !). # So we have to wait some time, - time.sleep(1) # 1 sec seems to be a good choice. + time.sleep(3) # 3 sec seems to be a good choice. rsp = self.cli.query('SELECT * FROM cpu_load_short') self.assertEqual( diff --git a/tests/influxdb/dataframe_client_test.py b/tests/influxdb/dataframe_client_test.py index 61b411fd..481eefbc 100644 --- a/tests/influxdb/dataframe_client_test.py +++ b/tests/influxdb/dataframe_client_test.py @@ -31,24 +31,11 @@ def test_write_points_from_dataframe(self): index=[now, now + timedelta(hours=1)], columns=["column_one", "column_two", "column_three"]) - expected = { - 'database': 'db', - 'points': [ - {'time': '1970-01-01T00:00:00+00:00', - 'fields': { - 'column_two': 1, - 'column_three': 1.0, - 'column_one': '1'}, - 'tags': {}, - 'measurement': 'foo'}, - {'time': '1970-01-01T01:00:00+00:00', - 'fields': { - 'column_two': 2, - 'column_three': 2.0, - 'column_one': '2'}, - 'tags': {}, - 'measurement': 'foo'}] - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 " + b"3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -58,10 +45,10 @@ def test_write_points_from_dataframe(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, 'foo') - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) cli.write_points(dataframe, 'foo', tags=None) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_in_batches(self): now = pd.Timestamp('1970-01-01 00:00+00:00') @@ -83,24 +70,10 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], index=[now, now + timedelta(hours=1)]) - expected = { - 'database': 'db', - 'points': [ - {'fields': { - '0': '1', - '1': 1, - '2': 1.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T00:00:00+00:00', - 'measurement': 'foo'}, - {'fields': { - '0': '2', - '1': 2, - '2': 2.0}, - 'tags': {'hello': 'there'}, - 'time': '1970-01-01T01:00:00+00:00', - 'measurement': 'foo'}], - } + expected = ( + b"foo,hello=there 0=\"1\",1=1,2=1.0 0\n" + b"foo,hello=there 0=\"2\",1=2,2=2.0 3600000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -110,7 +83,7 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo", {"hello": "there"}) - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_period_index(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], @@ -118,24 +91,11 @@ def test_write_points_from_dataframe_with_period_index(self): pd.Period('1970-01-02')], columns=["column_one", "column_two", "column_three"]) - expected = { - 'points': [ - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '1', - 'column_two': 1, - 'column_three': 1.0}, - 'time': '1970-01-01T00:00:00+00:00'}, - {'measurement': 'foo', - 'tags': {}, - 'fields': { - 'column_one': '2', - 'column_two': 2, - 'column_three': 2.0}, - 'time': '1970-01-02T00:00:00+00:00'}], - 'database': 'db', - } + expected = ( + b"foo column_one=\"1\",column_three=1.0,column_two=1 0\n" + b"foo column_one=\"2\",column_three=2.0,column_two=2 " + b"86400000000000\n" + ) with requests_mock.Mocker() as m: m.register_uri(requests_mock.POST, @@ -145,7 +105,7 @@ def test_write_points_from_dataframe_with_period_index(self): cli = DataFrameClient(database='db') cli.write_points(dataframe, "foo") - self.assertEqual(json.loads(m.last_request.body), expected) + self.assertEqual(m.last_request.body, expected) def test_write_points_from_dataframe_with_time_precision(self): now = pd.Timestamp('1970-01-01 00:00+00:00') @@ -182,16 +142,15 @@ def test_write_points_from_dataframe_with_time_precision(self): measurement = "foo" cli.write_points(dataframe, measurement, time_precision='s') - points.update(precision='s') - self.assertEqual(json.loads(m.last_request.body), points) + self.assertEqual(m.last_request.qs['precision'], ['s']) cli.write_points(dataframe, measurement, time_precision='m') points.update(precision='m') - self.assertEqual(json.loads(m.last_request.body), points) + self.assertEqual(m.last_request.qs['precision'], ['m']) cli.write_points(dataframe, measurement, time_precision='u') points.update(precision='u') - self.assertEqual(json.loads(m.last_request.body), points) + self.assertEqual(m.last_request.qs['precision'], ['u']) @raises(TypeError) def test_write_points_from_dataframe_fails_without_time_index(self): diff --git a/tests/influxdb/misc.py b/tests/influxdb/misc.py index 4761d0e9..70e16f11 100644 --- a/tests/influxdb/misc.py +++ b/tests/influxdb/misc.py @@ -1,6 +1,7 @@ import socket +import time def get_free_port(ip='127.0.0.1'): @@ -12,6 +13,10 @@ def get_free_port(ip='127.0.0.1'): finally: sock.close() + # Is there a better way than a sleep? + # There were issues on Travis where the port was not yet free. + time.sleep(0.1) + def is_port_open(port, ip='127.0.0.1'): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)