Skip to content

Commit

Permalink
fix: serialize Pandas NaN values into LineProtocol (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Apr 16, 2024
1 parent a645ea9 commit e2cb42e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.42.0 [unreleased]

### Bug Fixes
1. [#648](https://github.com/influxdata/influxdb-client-python/pull/648): Fix `DataFrame` serialization with `NaN` values

## 1.41.0 [2024-03-01]

### Features
Expand Down
33 changes: 13 additions & 20 deletions influxdb_client/client/write/dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ def _itertuples(data_frame):
return zip(data_frame.index, *cols)


def _not_nan(x):
return x == x


def _any_not_nan(p, indexes):
return any(map(lambda x: _not_nan(p[x]), indexes))


class DataframeSerializer:
"""Serialize DataFrame into LineProtocols."""

Expand Down Expand Up @@ -77,7 +69,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# When NaNs are present, the expression looks like this (split
# across two lines to satisfy the code-style checker)
#
# lambda p: f"""{measurement_name} {"" if math.isnan(p[1])
# lambda p: f"""{measurement_name} {"" if pd.isna(p[1])
# else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}"""
#
# When there's a NaN value in column a, we'll end up with a comma at the start of the
Expand Down Expand Up @@ -175,7 +167,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# This column is a tag column.
if null_columns.iloc[index]:
key_value = f"""{{
'' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else
'' if {val_format} == '' or pd.isna({val_format}) else
f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
}}"""
else:
Expand All @@ -192,19 +184,16 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
# field column has no nulls, we don't run the comma-removal
# regexp substitution step.
sep = '' if len(field_indexes) == 0 else ','
if issubclass(value.type, np.integer):
field_value = f"{sep}{key_format}={{{val_format}}}i"
elif issubclass(value.type, np.bool_):
field_value = f'{sep}{key_format}={{{val_format}}}'
elif issubclass(value.type, np.floating):
if issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or issubclass(value.type, np.bool_): # noqa: E501
suffix = 'i' if issubclass(value.type, np.integer) else ''
if null_columns.iloc[index]:
field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}"""
field_value = f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}""" # noqa: E501
else:
field_value = f'{sep}{key_format}={{{val_format}}}'
field_value = f"{sep}{key_format}={{{val_format}}}{suffix}"
else:
if null_columns.iloc[index]:
field_value = f"""{{
'' if type({val_format}) == float and math.isnan({val_format}) else
'' if pd.isna({val_format}) else
f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
}}"""
else:
Expand All @@ -229,17 +218,21 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
'_ESCAPE_KEY': _ESCAPE_KEY,
'_ESCAPE_STRING': _ESCAPE_STRING,
'keys': keys,
'math': math,
'pd': pd,
})

for k, v in dict(data_frame.dtypes).items():
if k in data_frame_tag_columns:
data_frame = data_frame.replace({k: ''}, np.nan)

def _any_not_nan(p, indexes):
return any(map(lambda x: not pd.isna(p[x]), indexes))

self.data_frame = data_frame
self.f = f
self.field_indexes = field_indexes
self.first_field_maybe_null = null_columns.iloc[field_indexes[0] - 1]
self._any_not_nan = _any_not_nan

#
# prepare chunks
Expand All @@ -266,7 +259,7 @@ def serialize(self, chunk_idx: int = None):
# When the first field is null (None/NaN), we'll have
# a spurious leading comma which needs to be removed.
lp = (re.sub('^(( |[^ ])* ),([a-zA-Z0-9])(.*)', '\\1\\3\\4', self.f(p))
for p in filter(lambda x: _any_not_nan(x, self.field_indexes), _itertuples(chunk)))
for p in filter(lambda x: self._any_not_nan(x, self.field_indexes), _itertuples(chunk)))
return list(lp)
else:
return list(map(self.f, _itertuples(chunk)))
Expand Down
26 changes: 26 additions & 0 deletions tests/test_WriteApiDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,32 @@ def test_write_object_field_nan(self):
self.assertEqual("measurement val=2i 1586046600000000000",
points[1])

def test_write_missing_values(self):
    """Missing values in pandas nullable dtypes are skipped in LineProtocol.

    Columns use the pandas extension dtypes (BooleanDtype, Int64Dtype,
    Float64Dtype, StringDtype) whose missing marker is ``pd.NA`` --
    ``math.isnan`` raises on ``pd.NA``, so serialization must go through
    ``pd.isna`` instead; a missing field is omitted from the point entirely.
    """
    from influxdb_client.extras import pd

    data_frame = pd.DataFrame({
        "a_bool": [True, None, False],
        "b_int": [None, 1, 2],
        "c_float": [1.0, 2.0, None],
        "d_str": ["a", "b", None],
    })

    # Convert to nullable extension dtypes so the None entries become pd.NA.
    data_frame['a_bool'] = data_frame['a_bool'].astype(pd.BooleanDtype())
    data_frame['b_int'] = data_frame['b_int'].astype(pd.Int64Dtype())
    data_frame['c_float'] = data_frame['c_float'].astype(pd.Float64Dtype())
    data_frame['d_str'] = data_frame['d_str'].astype(pd.StringDtype())

    points = data_frame_to_list_of_points(
        data_frame=data_frame,
        point_settings=PointSettings(),
        data_frame_measurement_name='measurement')

    # Each row serializes only its non-missing fields.
    self.assertEqual(3, len(points))
    self.assertEqual("measurement a_bool=True,c_float=1.0,d_str=\"a\" 0", points[0])
    self.assertEqual("measurement b_int=1i,c_float=2.0,d_str=\"b\" 1", points[1])
    self.assertEqual("measurement a_bool=False,b_int=2i 2", points[2])

def test_write_field_bool(self):
from influxdb_client.extras import pd

Expand Down

0 comments on commit e2cb42e

Please sign in to comment.