This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Merge pull request #827 from rolincova/fix/data-frame-client-custom-indexes

Fix: add support for custom indexes for query in the DataFrameClient (#785)
russorat committed Jun 3, 2020
2 parents 95e0efb + cb3156c commit fc0235e
Showing 6 changed files with 57 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added
- Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom)
- Add support for custom indexes for query in the DataFrameClient (#785)

### Changed
- Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski)
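For orientation, a minimal usage sketch of the new data_frame_index option from the caller's side; the connection settings and measurement name below are hypothetical and not taken from this commit:

from influxdb import DataFrameClient

# Hypothetical connection settings, for illustration only.
cli = DataFrameClient('localhost', 8086, 'user', 'password', 'mydb')

# Default behaviour: each returned DataFrame is indexed by the UTC time column.
frames = cli.query("SELECT value, host FROM cpu_load_short")

# New in this change: index the DataFrame by arbitrary result columns instead.
frames = cli.query("SELECT value, host FROM cpu_load_short",
                   data_frame_index=["time", "host"])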
23 changes: 16 additions & 7 deletions influxdb/_dataframe_client.py
@@ -152,7 +152,8 @@ def query(self,
chunked=False,
chunk_size=0,
method="GET",
dropna=True):
dropna=True,
data_frame_index=None):
"""
Query data into a DataFrame.
@@ -181,6 +182,8 @@
containing all results within that chunk
:param chunk_size: Size of each chunk to tell InfluxDB to use.
:param dropna: drop columns where all values are missing
:param data_frame_index: the list of columns that
are used as DataFrame index
:returns: the queried data
:rtype: :class:`~.ResultSet`
"""
@@ -196,13 +199,14 @@
results = super(DataFrameClient, self).query(query, **query_args)
if query.strip().upper().startswith("SELECT"):
if len(results) > 0:
return self._to_dataframe(results, dropna)
return self._to_dataframe(results, dropna,
data_frame_index=data_frame_index)
else:
return {}
else:
return results

def _to_dataframe(self, rs, dropna=True):
def _to_dataframe(self, rs, dropna=True, data_frame_index=None):
result = defaultdict(list)
if isinstance(rs, list):
return map(self._to_dataframe, rs,
@@ -216,10 +220,15 @@ def _to_dataframe(self, rs, dropna=True):
key = (name, tuple(sorted(tags.items())))
df = pd.DataFrame(data)
df.time = pd.to_datetime(df.time)
df.set_index('time', inplace=True)
if df.index.tzinfo is None:
df.index = df.index.tz_localize('UTC')
df.index.name = None

if data_frame_index:
df.set_index(data_frame_index, inplace=True)
else:
df.set_index('time', inplace=True)
if df.index.tzinfo is None:
df.index = df.index.tz_localize('UTC')
df.index.name = None

result[key].append(df)
for key, data in result.items():
df = pd.concat(data).sort_index()
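A standalone sketch of the branch added above, using made-up data rather than anything from the diff: with data_frame_index the frame gets an index (or MultiIndex) over the requested columns as-is, while the default path keeps the single time index, localizes it to UTC and clears its name. Note that the tz_localize step only runs on the default path.

import pandas as pd

data = {"time": [1, 2, 3],
        "value": [0.55, 23422, 0.64],
        "host": ["local", "local", "local"]}
df = pd.DataFrame(data)
df.time = pd.to_datetime(df.time)  # always converted, as in _to_dataframe

# Custom path: index over the requested columns; no timezone handling here.
custom = df.set_index(["time", "host"])
print(list(custom.index.names))  # ['time', 'host']

# Default path: 'time' becomes the index, localized to UTC and left unnamed.
default = df.set_index("time")
if default.index.tzinfo is None:
    default.index = default.index.tz_localize("UTC")
default.index.name = None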
4 changes: 2 additions & 2 deletions influxdb/client.py
@@ -860,7 +860,7 @@ def alter_retention_policy(self, name, database=None,
query_string = (
"ALTER RETENTION POLICY {0} ON {1}"
).format(quote_ident(name),
quote_ident(database or self._database), shard_duration)
quote_ident(database or self._database))
if duration:
query_string += " DURATION {0}".format(duration)
if shard_duration:
@@ -958,7 +958,7 @@ def drop_user(self, username):
:param username: the username to drop
:type username: str
"""
text = "DROP USER {0}".format(quote_ident(username), method="POST")
text = "DROP USER {0}".format(quote_ident(username))
self.query(text, method="POST")

def set_user_password(self, username, password):
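Both hunks above drop arguments that str.format was silently ignoring, since no replacement field referenced them; the old calls were therefore misleading rather than broken. A quick illustration, not part of the diff:

# str.format ignores surplus positional and keyword arguments, which is why
# the removed shard_duration and method="POST" arguments had no effect.
print("DROP USER {0}".format("jdoe", method="POST"))
# DROP USER jdoe
print("ALTER RETENTION POLICY {0} ON {1}".format("rp0", "mydb", "1h"))
# ALTER RETENTION POLICY rp0 ON mydb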
2 changes: 1 addition & 1 deletion influxdb/helper.py
@@ -82,7 +82,7 @@ def __new__(cls, *args, **kwargs):
allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None]
if cls._time_precision not in allowed_time_precisions:
raise AttributeError(
'In {0}, time_precision is set, but invalid use any of {}.'
'In {}, time_precision is set, but invalid use any of {}.'
.format(cls.__name__, ','.join(allowed_time_precisions)))

cls._retention_policy = getattr(_meta, 'retention_policy', None)
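The one-character change above matters because str.format cannot mix explicit field numbering ({0}) with automatic numbering ({}) in the same string, so the old message raised ValueError instead of reporting the configuration problem. A minimal reproduction, not taken from the diff (the class name and precision list are illustrative):

# Mixing manual and automatic field numbering raises ValueError.
try:
    "In {0}, time_precision is set, but invalid use any of {}.".format(
        "MySeriesHelper", "h,m,s")
except ValueError as exc:
    print(exc)  # cannot switch from manual field specification to automatic field numbering

# With consistent automatic numbering the intended message is produced.
print("In {}, time_precision is set, but invalid use any of {}.".format(
    "MySeriesHelper", "h,m,s"))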
6 changes: 3 additions & 3 deletions influxdb/influxdb08/client.py
@@ -292,10 +292,10 @@ def write_points(self, data, time_precision='s', *args, **kwargs):
:type batch_size: int
"""
def list_chunks(l, n):
def list_chunks(data_list, n):
"""Yield successive n-sized chunks from l."""
for i in xrange(0, len(l), n):
yield l[i:i + n]
for i in xrange(0, len(data_list), n):
yield data_list[i:i + n]

batch_size = kwargs.get('batch_size')
if batch_size and batch_size > 0:
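The hunk above only renames the loop variable (l is flagged as ambiguous by common linters); the chunking behaviour is unchanged. For reference, an equivalent Python 3 sketch where xrange becomes range:

def list_chunks(data_list, n):
    """Yield successive n-sized chunks from data_list."""
    for i in range(0, len(data_list), n):
        yield data_list[i:i + n]

print(list(list_chunks([1, 2, 3, 4, 5], 2)))
# [[1, 2], [3, 4], [5]]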
34 changes: 34 additions & 0 deletions influxdb/tests/dataframe_client_test.py
@@ -1240,3 +1240,37 @@ def test_write_points_from_dataframe_with_tags_and_nan_json(self):
cli.write_points(dataframe, 'foo', tags=None, protocol='json',
tag_columns=['tag_one', 'tag_two'])
self.assertEqual(m.last_request.body, expected)

def test_query_custom_index(self):
"""Test query with custom indexes."""
data = {
"results": [
{
"series": [
{
"name": "cpu_load_short",
"columns": ["time", "value", "host"],
"values": [
[1, 0.55, "local"],
[2, 23422, "local"],
[3, 0.64, "local"]
]
}
]
}
]
}

cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \
"SELECT count(value) FROM cpu_load_short WHERE region=$region"
bind_params = {'region': 'us-west'}
with _mocked_session(cli, 'GET', 200, data):
result = cli.query(iql, bind_params=bind_params,
data_frame_index=["time", "host"])

_data_frame = result['cpu_load_short']
print(_data_frame)

self.assertListEqual(["time", "host"],
list(_data_frame.index.names))
