From 578ef7bb7708e2d629c74f46ea4d863214e4e779 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Mon, 12 Aug 2019 22:32:17 +0200 Subject: [PATCH 1/4] Make batched writing support all iterables --- influxdb/client.py | 15 ++++++++++++--- influxdb/tests/client_test.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index ad4c6b66..da009d71 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -6,6 +6,7 @@ from __future__ import print_function from __future__ import unicode_literals +import itertools import time import random @@ -13,7 +14,6 @@ import socket import requests import requests.exceptions -from six.moves import xrange from six.moves.urllib.parse import urlparse from influxdb.line_protocol import make_lines, quote_ident, quote_literal @@ -544,8 +544,17 @@ def ping(self): @staticmethod def _batches(iterable, size): - for i in xrange(0, len(iterable), size): - yield iterable[i:i + size] + # Iterate over an iterable producing iterables of batches. Based on: + # http://code.activestate.com/recipes/303279-getting-items-in-batches/ + iterator = iter(iterable) + while True: + try: # Try get the first element in the iterator... + head = (next(iterator),) + except StopIteration: + return # ...so that we can stop if there isn't one + # Otherwise, lazily slice the rest of the batch + rest = itertools.islice(iterator, size - 1) + yield itertools.chain(head, rest) def _write_points(self, points, diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 571b7ebc..b005554e 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -265,6 +265,37 @@ def test_write_points_batch(self): self.assertEqual(expected_last_body, m.last_request.body.decode('utf-8')) + def test_write_points_batch_generator(self): + """Test write points batch from a generator for TestInfluxDBClient object. + """ + dummy_points = [ + {"measurement": "cpu_usage", "tags": {"unit": "percent"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + {"measurement": "network", "tags": {"direction": "in"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + {"measurement": "network", "tags": {"direction": "out"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + ] + dummy_points_generator = (point for point in dummy_points) + expected_last_body = ( + "network,direction=out,host=server01,region=us-west " + "value=12.0 1257894000000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + cli = InfluxDBClient(database='db') + cli.write_points(points=dummy_points_generator, + database='db', + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + self.assertEqual(m.call_count, 2) + self.assertEqual(expected_last_body, + m.last_request.body.decode('utf-8')) + def test_write_points_udp(self): """Test write points UDP for TestInfluxDBClient object.""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) From 16803cb490cba7165a18b30021de7e0295791ebb Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Mon, 12 Aug 2019 22:43:09 +0200 Subject: [PATCH 2/4] Also test batching generator against real server --- .../server_tests/client_test_with_server.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index fda3f720..bf63bbfa 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -450,6 +450,33 @@ def test_write_points_batch(self): self.assertIn(12, net_out['series'][0]['values'][0]) self.assertIn(12.34, cpu['series'][0]['values'][0]) + def test_write_points_batch_generator(self): + """Test writing points in a batch from a generator.""" + dummy_points = [ + {"measurement": "cpu_usage", "tags": {"unit": "percent"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + {"measurement": "network", "tags": {"direction": "in"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + {"measurement": "network", "tags": {"direction": "out"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + ] + dummy_points_generator = (point for point in dummy_points) + self.cli.write_points(points=dummy_points_generator, + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + time.sleep(5) + net_in = self.cli.query("SELECT value FROM network " + "WHERE direction=$dir", + bind_params={'dir': 'in'} + ).raw + net_out = self.cli.query("SELECT value FROM network " + "WHERE direction='out'").raw + cpu = self.cli.query("SELECT value FROM cpu_usage").raw + self.assertIn(123, net_in['series'][0]['values'][0]) + self.assertIn(12, net_out['series'][0]['values'][0]) + self.assertIn(12.34, cpu['series'][0]['values'][0]) + def test_query(self): """Test querying data back from server.""" self.assertIs(True, self.cli.write_points(dummy_point)) From 325190113143fcd23b5457b2d445f637a19d5c5a Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Wed, 14 Aug 2019 11:52:22 +0200 Subject: [PATCH 3/4] Fix PEP257 error --- influxdb/tests/client_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index b005554e..9ab9cdfd 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -266,8 +266,7 @@ def test_write_points_batch(self): m.last_request.body.decode('utf-8')) def test_write_points_batch_generator(self): - """Test write points batch from a generator for TestInfluxDBClient object. - """ + """Test write points batch from a generator for TestInfluxDBClient.""" dummy_points = [ {"measurement": "cpu_usage", "tags": {"unit": "percent"}, "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, From d6e7d1cf57b61951a6d410b44f8d8a312d87ea42 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Tue, 10 Sep 2019 16:21:36 +0200 Subject: [PATCH 4/4] Import itertools functions directly --- influxdb/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index da009d71..dcc4934f 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -6,9 +6,9 @@ from __future__ import print_function from __future__ import unicode_literals -import itertools import time import random +from itertools import chain, islice import json import socket @@ -553,8 +553,8 @@ def _batches(iterable, size): except StopIteration: return # ...so that we can stop if there isn't one # Otherwise, lazily slice the rest of the batch - rest = itertools.islice(iterator, size - 1) - yield itertools.chain(head, rest) + rest = islice(iterator, size - 1) + yield chain(head, rest) def _write_points(self, points,