diff --git a/CHANGELOG.md b/CHANGELOG.md index a182a7b4..636b73e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.7.0 [unreleased] +### Bug Fixes +1. [#85](https://github.com/influxdata/influxdb-client-python/issues/85): Fixed a possibility to generate empty write batch + ## 1.6.0 [2020-04-17] ### Documentation diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 5c401e21..d51eafa7 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -1,13 +1,12 @@ # coding: utf-8 import logging +import os from datetime import timedelta from enum import Enum from random import random from time import sleep from typing import Union, List -import os - import rx from rx import operators as ops, Observable from rx.scheduler import ThreadPoolScheduler @@ -172,8 +171,9 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions() ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), ops.merge_all())), # Write data into InfluxDB (possibility to retry if its fail) - ops.map(mapper=lambda batch: self._retryable(data=batch, delay=self._jitter_delay())), # - ops.merge_all())\ + ops.filter(lambda batch: batch.size > 0), + ops.map(mapper=lambda batch: self._retryable(data=batch, delay=self._jitter_delay())), + ops.merge_all()) \ .subscribe(self._on_next, self._on_error, self._on_complete) else: diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 393fa586..b661c202 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -378,6 +378,35 @@ def test_user_agent_header(self): self.assertEqual(1, len(requests)) self.assertEqual(f'influxdb-client-python/{influxdb_client.__version__}', requests[0].headers['User-Agent']) + def test_to_low_flush_interval(self): + + self._write_client.__del__() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(batch_size=8, + flush_interval=1, + jitter_interval=1000)) + + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) + + for i in range(50): + val_one = float(i) + val_two = float(i) + 0.5 + point_one = Point("OneMillis").tag("sensor", "sensor1").field("PSI", val_one).time(time=i) + point_two = Point("OneMillis").tag("sensor", "sensor2").field("PSI", val_two).time(time=i) + + self._write_client.write("my-bucket", "my-org", [point_one, point_two]) + time.sleep(0.1) + + self._write_client.__del__() + + _requests = httpretty.httpretty.latest_requests + + for _request in _requests: + body = _request.parsed_body + self.assertTrue(body, msg="Parsed body should be not empty " + str(_request)) + + httpretty.reset() + if __name__ == '__main__': unittest.main()