Skip to content

Commit

Permalink
fix: handle null values in data (#636)
Browse files Browse the repository at this point in the history
* fix: handle null values in Flux data

* test: add tests for null value handling and extension dtypes

* test: fix failures with empty warnings cases

* test: comment out some extra dtype assertions until solved

* test: skip extension dtypes test on Python 3.7

* fix: single place of dtypes conversion

* fix: bump pandas dependency version

* docs: update CHANGELOG

* chore(build): trigger CI/CD pipeline

* fix: add use_extension_dtypes also to async query API methods
  • Loading branch information
alespour committed Jan 31, 2024
1 parent 27777d1 commit 7a5f655
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.41.0 [unreleased]

### Bug Fixes
1. [#636](https://github.com/influxdata/influxdb-client-python/pull/636): Handle missing data in data frames

## 1.40.0 [2024-01-30]

### Features
Expand Down
18 changes: 12 additions & 6 deletions influxdb_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,27 @@ async def _to_flux_record_stream_async(self, response, query_options=None, respo
return (await _parser.__aenter__()).generator_async()

def _to_data_frame_stream(self, data_frame_index, response, query_options=None,
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full):
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full,
use_extension_dtypes=False):
"""
Parse HTTP response to DataFrame stream.
:param response: HTTP response from an HTTP client. Expected type: `urllib3.response.HTTPResponse`.
"""
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode,
use_extension_dtypes)
return _parser.generator()

async def _to_data_frame_stream_async(self, data_frame_index, response, query_options=None, response_metadata_mode:
FluxResponseMetadataMode = FluxResponseMetadataMode.full):
FluxResponseMetadataMode = FluxResponseMetadataMode.full,
use_extension_dtypes=False):
"""
Parse HTTP response to DataFrame stream.
:param response: HTTP response from an HTTP client. Expected type: `aiohttp.client_reqrep.ClientResponse`.
"""
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)
_parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode,
use_extension_dtypes)
return (await _parser.__aenter__()).generator_async()

def _to_tables_parser(self, response, query_options, response_metadata_mode):
Expand All @@ -304,10 +308,12 @@ def _to_flux_record_stream_parser(self, query_options, response, response_metada
return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
query_options=query_options, response_metadata_mode=response_metadata_mode)

def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode):
def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode,
use_extension_dtypes):
return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index, query_options=query_options,
response_metadata_mode=response_metadata_mode)
response_metadata_mode=response_metadata_mode,
use_extension_dtypes=use_extension_dtypes)

def _to_data_frames(self, _generator):
"""Parse stream of DataFrames into expected type."""
Expand Down
18 changes: 13 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FluxCsvParser(object):

def __init__(self, response, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None, query_options=None,
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> None:
response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full,
use_extension_dtypes=False) -> None:
"""
Initialize defaults.
Expand All @@ -75,6 +76,7 @@ def __init__(self, response, serialization_mode: FluxSerializationMode,
self.tables = TableList()
self._serialization_mode = serialization_mode
self._response_metadata_mode = response_metadata_mode
self._use_extension_dtypes = use_extension_dtypes
self._data_frame_index = data_frame_index
self._data_frame_values = []
self._profilers = query_options.profilers if query_options is not None else None
Expand Down Expand Up @@ -211,7 +213,7 @@ def _parse_flux_response_row(self, metadata, csv):
pass
else:

# to int converions todo
# to int conversions todo
current_id = int(csv[2])
if metadata.table_id == -1:
metadata.table_id = current_id
Expand Down Expand Up @@ -253,7 +255,11 @@ def _prepare_data_frame(self):
_temp_df = _temp_df.set_index(self._data_frame_index)

# Append data
return pd.concat([self._data_frame.astype(_temp_df.dtypes), _temp_df])
df = pd.concat([self._data_frame.astype(_temp_df.dtypes), _temp_df])

if self._use_extension_dtypes:
return df.convert_dtypes()
return df

def parse_record(self, table_index, table, csv):
"""Parse one record."""
Expand All @@ -273,8 +279,10 @@ def _to_value(self, str_val, column):
default_value = column.default_value
if default_value == '' or default_value is None:
if self._serialization_mode is FluxSerializationMode.dataFrame:
from ..extras import np
return self._to_value(np.nan, column)
if self._use_extension_dtypes:
from ..extras import pd
return pd.NA
return None
return None
return self._to_value(default_value, column)

Expand Down
22 changes: 18 additions & 4 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
async_req=False, _preload_content=False, _return_http_data_only=False)
return self._to_flux_record_stream(response, query_options=self._get_query_options())

def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
use_extension_dtypes: bool = False):
"""
Execute synchronous Flux query and return Pandas DataFrame.
Expand All @@ -234,6 +235,11 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
If not specified the default value from ``InfluxDBClient.org`` is used.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:param use_extension_dtypes: set to ``True`` to use pandas extension data types.
Useful for queries with ``pivot`` function.
When data has missing values, column data type may change (to ``object`` or ``float64``).
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``pandas.NA`` value.
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
:return: :class:`~DataFrame` or :class:`~List[DataFrame]`
.. warning:: For optimal processing of the query results, use the ``pivot()`` function, which aligns results as a table.
Expand All @@ -250,10 +256,12 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
- https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/
- https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/
""" # noqa: E501
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params)
_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params,
use_extension_dtypes=use_extension_dtypes)
return self._to_data_frames(_generator)

def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
use_extension_dtypes: bool = False):
"""
Execute synchronous Flux query and return stream of Pandas DataFrame as a :class:`~Generator[DataFrame]`.
Expand All @@ -265,6 +273,11 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
If not specified the default value from ``InfluxDBClient.org`` is used.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:param use_extension_dtypes: set to ``True`` to use pandas extension data types.
Useful for queries with ``pivot`` function.
When data has missing values, column data type may change (to ``object`` or ``float64``).
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``pandas.NA`` value.
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
:return: :class:`~Generator[DataFrame]`
.. warning:: For optimal processing of the query results, use the ``pivot()`` function, which aligns results as a table.
Expand All @@ -289,7 +302,8 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s

return self._to_data_frame_stream(data_frame_index=data_frame_index,
response=response,
query_options=self._get_query_options())
query_options=self._get_query_options(),
use_extension_dtypes=use_extension_dtypes)

def __del__(self):
"""Close QueryAPI."""
Expand Down
20 changes: 16 additions & 4 deletions influxdb_client/client/query_api_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ async def query_stream(self, query: str, org=None, params: dict = None) -> Async

return await self._to_flux_record_stream_async(response, query_options=self._get_query_options())

async def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None):
async def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None,
use_extension_dtypes: bool = False):
"""
Execute asynchronous Flux query and return :class:`~pandas.core.frame.DataFrame`.
Expand All @@ -132,6 +133,11 @@ async def query_data_frame(self, query: str, org=None, data_frame_index: List[st
If not specified the default value from ``InfluxDBClientAsync.org`` is used.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:param use_extension_dtypes: set to ``True`` to use pandas extension data types.
Useful for queries with ``pivot`` function.
When data has missing values, column data type may change (to ``object`` or ``float64``).
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``pandas.NA`` value.
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
:return: :class:`~DataFrame` or :class:`~List[DataFrame]`
.. warning:: For optimal processing of the query results, use the ``pivot()`` function, which aligns results as a table.
Expand All @@ -149,7 +155,7 @@ async def query_data_frame(self, query: str, org=None, data_frame_index: List[st
- https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/
""" # noqa: E501
_generator = await self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index,
params=params)
params=params, use_extension_dtypes=use_extension_dtypes)

dataframes = []
async for dataframe in _generator:
Expand All @@ -158,7 +164,7 @@ async def query_data_frame(self, query: str, org=None, data_frame_index: List[st
return self._to_data_frames(dataframes)

async def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None,
params: dict = None):
params: dict = None, use_extension_dtypes: bool = False):
"""
Execute asynchronous Flux query and return stream of :class:`~pandas.core.frame.DataFrame` as an AsyncGenerator[:class:`~pandas.core.frame.DataFrame`].
Expand All @@ -170,6 +176,11 @@ async def query_data_frame_stream(self, query: str, org=None, data_frame_index:
If not specified the default value from ``InfluxDBClientAsync.org`` is used.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:param use_extension_dtypes: set to ``True`` to use pandas extension data types.
Useful for queries with ``pivot`` function.
When data has missing values, column data type may change (to ``object`` or ``float64``).
Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``pandas.NA`` value.
For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
:return: :class:`~AsyncGenerator[DataFrame]`
.. warning:: For optimal processing of the query results, use the ``pivot()`` function, which aligns results as a table.
Expand All @@ -192,7 +203,8 @@ async def query_data_frame_stream(self, query: str, org=None, data_frame_index:
dataframe_query=True))

return await self._to_data_frame_stream_async(data_frame_index=data_frame_index, response=response,
query_options=self._get_query_options())
query_options=self._get_query_options(),
use_extension_dtypes=use_extension_dtypes)

async def query_raw(self, query: str, org=None, dialect=_BaseQueryApi.default_dialect, params: dict = None):
"""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
]

extra_requires = [
'pandas>=0.25.3',
'pandas>=1.0.0',
'numpy'
]

Expand Down

0 comments on commit 7a5f655

Please sign in to comment.