From f40f77065ca49334db3e3f587673d3844d925f37 Mon Sep 17 00:00:00 2001 From: pho Date: Thu, 28 Sep 2017 10:38:48 +0200 Subject: [PATCH 1/3] [ENH] : added feature: allow push DataFrame with NaNs values [FIX] : Timestamps not correctly converted to epoch on some systems --- influxdb/_dataframe_client.py | 32 ++++++++--- influxdb/tests/dataframe_client_test.py | 75 +++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 7 deletions(-) diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 31ee1c32..3aa15373 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -10,6 +10,7 @@ from collections import defaultdict import pandas as pd +import numpy as np from .client import InfluxDBClient from .line_protocol import _escape_tag @@ -274,6 +275,10 @@ def _convert_dataframe_to_lines(self, time_precision=None, numeric_precision=None): + dataframe = dataframe.dropna(how='all').copy() + if len(dataframe) == 0: + return [] + if not isinstance(dataframe, pd.DataFrame): raise TypeError('Must be DataFrame, but type was: {0}.' .format(type(dataframe))) @@ -325,11 +330,11 @@ def _convert_dataframe_to_lines(self, # Make array of timestamp ints if isinstance(dataframe.index, pd.PeriodIndex): - time = ((dataframe.index.to_timestamp().values.astype(int) / - precision_factor).astype(int).astype(str)) + time = ((dataframe.index.to_timestamp().values.astype(np.int64) / + precision_factor).astype(np.int64).astype(str)) else: - time = ((pd.to_datetime(dataframe.index).values.astype(int) / - precision_factor).astype(int).astype(str)) + time = ((pd.to_datetime(dataframe.index).values.astype(np.int64) / + precision_factor).astype(np.int64).astype(str)) # If tag columns exist, make an array of formatted tag keys and values if tag_columns: @@ -350,12 +355,16 @@ def _convert_dataframe_to_lines(self, # Make an array of formatted field keys and values field_df = dataframe[field_columns] + field_df = self._stringify_dataframe(field_df, numeric_precision, datatype='field') - field_df = (field_df.columns.values + '=').tolist() + field_df - field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]] - fields = field_df.sum(axis=1) + + def format_line(line): + line = line[~line.isnull()] # drop None entries + return ",".join((line.index + '=' + line.values)) + + fields = field_df.apply(format_line, axis=1) del field_df # Generate line protocol string @@ -364,6 +373,13 @@ def _convert_dataframe_to_lines(self, @staticmethod def _stringify_dataframe(dframe, numeric_precision, datatype='field'): + + # Prevent modification of input dataframe + dframe = dframe.copy() + + # Keep the positions where Null values are found + mask_null = dframe.isnull().values + # Find int and string columns for field-type data int_columns = dframe.select_dtypes(include=['integer']).columns string_columns = dframe.select_dtypes(include=['object']).columns @@ -407,6 +423,8 @@ def _stringify_dataframe(dframe, numeric_precision, datatype='field'): dframe = dframe.apply(_escape_pandas_series) dframe.columns = dframe.columns.astype(str) + + dframe = dframe.where(~mask_null, None) return dframe def _datetime_to_epoch(self, datetime, time_precision='s'): diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 02aaac5f..269261d5 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -59,6 +59,81 @@ def test_write_points_from_dataframe(self): cli.write_points(dataframe, 'foo', tags=None) self.assertEqual(m.last_request.body, expected) + def test_write_points_from_dataframe_with_none(self): + """Test write points from df in TestDataFrameClient object.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[["1", None, 1.0], ["2", 2.0, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"foo column_one=\"1\",column_three=1.0 0\n" + b"foo column_one=\"2\",column_two=2.0,column_three=2.0 " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_line_of_none(self): + """Test write points from df in TestDataFrameClient object.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[[None, None, None], ["2", 2.0, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"foo column_one=\"2\",column_two=2.0,column_three=2.0 " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_all_none(self): + """Test write points from df in TestDataFrameClient object.""" + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[[None, None, None], [None, None, None]], + index=[now, now + timedelta(hours=1)], + columns=["column_one", "column_two", + "column_three"]) + expected = ( + b"\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo') + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', tags=None) + self.assertEqual(m.last_request.body, expected) + def test_write_points_from_dataframe_in_batches(self): """Test write points in batch from df in TestDataFrameClient object.""" now = pd.Timestamp('1970-01-01 00:00+00:00') From accd9f05a38bd7f62df518fc3cc214b6ba4dede2 Mon Sep 17 00:00:00 2001 From: pho Date: Thu, 28 Sep 2017 16:18:35 +0200 Subject: [PATCH 2/3] [BLD] : version bump 4.1.1b --- influxdb/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb/__init__.py b/influxdb/__init__.py index 6442e26b..80160b4b 100644 --- a/influxdb/__init__.py +++ b/influxdb/__init__.py @@ -18,4 +18,4 @@ ] -__version__ = '4.1.1' +__version__ = '4.1.1b' From f1a5aebd4212857fbe70aa0eb2a142498863aba2 Mon Sep 17 00:00:00 2001 From: Patrick Hoebeke Date: Thu, 17 May 2018 14:38:52 +0200 Subject: [PATCH 3/3] Update __init__.py --- influxdb/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb/__init__.py b/influxdb/__init__.py index be0ab289..005c963a 100644 --- a/influxdb/__init__.py +++ b/influxdb/__init__.py @@ -18,4 +18,4 @@ ] -__version__ = '5.0.0' \ No newline at end of file +__version__ = '5.0.0pho'