diff --git a/influxdb/client.py b/influxdb/client.py index 5e39f490..c80ddaeb 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -12,11 +12,11 @@ import socket import struct import time +from itertools import chain, islice import msgpack import requests import requests.exceptions -from six.moves import xrange from six.moves.urllib.parse import urlparse from influxdb.line_protocol import make_lines, quote_ident, quote_literal @@ -565,8 +565,17 @@ def ping(self): @staticmethod def _batches(iterable, size): - for i in xrange(0, len(iterable), size): - yield iterable[i:i + size] + # Iterate over an iterable producing iterables of batches. Based on: + # http://code.activestate.com/recipes/303279-getting-items-in-batches/ + iterator = iter(iterable) + while True: + try: # Try get the first element in the iterator... + head = (next(iterator),) + except StopIteration: + return # ...so that we can stop if there isn't one + # Otherwise, lazily slice the rest of the batch + rest = islice(iterator, size - 1) + yield chain(head, rest) def _write_points(self, points, diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index 54116f7e..95f410bc 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -265,6 +265,36 @@ def test_write_points_batch(self): self.assertEqual(expected_last_body, m.last_request.body.decode('utf-8')) + def test_write_points_batch_generator(self): + """Test write points batch from a generator for TestInfluxDBClient.""" + dummy_points = [ + {"measurement": "cpu_usage", "tags": {"unit": "percent"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + {"measurement": "network", "tags": {"direction": "in"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + {"measurement": "network", "tags": {"direction": "out"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + ] + dummy_points_generator = (point for point in dummy_points) + expected_last_body = ( + "network,direction=out,host=server01,region=us-west " + "value=12.0 1257894000000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + cli = InfluxDBClient(database='db') + cli.write_points(points=dummy_points_generator, + database='db', + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + self.assertEqual(m.call_count, 2) + self.assertEqual(expected_last_body, + m.last_request.body.decode('utf-8')) + def test_write_points_udp(self): """Test write points UDP for TestInfluxDBClient object.""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) diff --git a/influxdb/tests/server_tests/client_test_with_server.py b/influxdb/tests/server_tests/client_test_with_server.py index fda3f720..bf63bbfa 100644 --- a/influxdb/tests/server_tests/client_test_with_server.py +++ b/influxdb/tests/server_tests/client_test_with_server.py @@ -450,6 +450,33 @@ def test_write_points_batch(self): self.assertIn(12, net_out['series'][0]['values'][0]) self.assertIn(12.34, cpu['series'][0]['values'][0]) + def test_write_points_batch_generator(self): + """Test writing points in a batch from a generator.""" + dummy_points = [ + {"measurement": "cpu_usage", "tags": {"unit": "percent"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}}, + {"measurement": "network", "tags": {"direction": "in"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}}, + {"measurement": "network", "tags": {"direction": "out"}, + "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}} + ] + dummy_points_generator = (point for point in dummy_points) + self.cli.write_points(points=dummy_points_generator, + tags={"host": "server01", + "region": "us-west"}, + batch_size=2) + time.sleep(5) + net_in = self.cli.query("SELECT value FROM network " + "WHERE direction=$dir", + bind_params={'dir': 'in'} + ).raw + net_out = self.cli.query("SELECT value FROM network " + "WHERE direction='out'").raw + cpu = self.cli.query("SELECT value FROM cpu_usage").raw + self.assertIn(123, net_in['series'][0]['values'][0]) + self.assertIn(12, net_out['series'][0]['values'][0]) + self.assertIn(12.34, cpu['series'][0]['values'][0]) + def test_query(self): """Test querying data back from server.""" self.assertIs(True, self.cli.write_points(dummy_point))