Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ The following options are supported:
- ``ssl_ca_cert`` - set this to customize the certificate file to verify the peer
- ``connection_pool_maxsize`` - set the number of connections to save that can be reused by urllib3
- ``auth_basic`` - enable http basic authentication when talking to an InfluxDB 1.8.x instance that does not use authentication but is accessed via a reverse proxy with basic authentication (defaults to false)
- ``profilers`` - set the list of enabled `Flux profilers <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_

.. code-block:: python

Expand Down Expand Up @@ -204,11 +205,122 @@ Supported properties are:
- ``INFLUXDB_V2_SSL_CA_CERT`` - set this to customize the certificate file to verify the peer
- ``INFLUXDB_V2_CONNECTION_POOL_MAXSIZE`` - set the number of connections to save that can be reused by urllib3
- ``INFLUXDB_V2_AUTH_BASIC`` - enable http basic authentication when talking to an InfluxDB 1.8.x instance that does not use authentication but is accessed via a reverse proxy with basic authentication (defaults to false)
- ``INFLUXDB_V2_PROFILERS`` - set the list of enabled `Flux profilers <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_

.. code-block:: python

self.client = InfluxDBClient.from_env_properties()

Profile query
^^^^^^^^^^^^^

The `Flux Profiler package <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_ provides
performance profiling tools for Flux queries and operations.

You can enable printing of Flux query profiler information in the client library by any of the following:

- set QueryOptions.profilers in QueryApi,
- set ``INFLUXDB_V2_PROFILERS`` environment variable,
- set ``profilers`` option in configuration file.

When the profiler is enabled, the result of the Flux query contains additional "profiler/\*" tables.
To keep the result consistent whether the profiler is enabled or disabled, ``FluxCSVParser`` excludes the "profiler/\*" measurements
from the parsed result.

Example how to enable profilers using API:

.. code-block:: python

q = '''
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
"stringParam": "my-bucket",
}

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
csv_result = query_api.query(query=q, params=p)


Example of a profiler output:

.. code-block::

===============
Profiler: query
===============

from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

========================
Profiler: profiler/query
========================
result : _profiler
table : 0
_measurement : profiler/query
TotalDuration : 8924700
CompileDuration : 350900
QueueDuration : 33800
PlanDuration : 0
RequeueDuration : 0
ExecuteDuration : 8486500
Concurrency : 0
MaxAllocated : 2072
TotalAllocated : 0
flux/query-plan :

digraph {
ReadWindowAggregateByTime11
// every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
pivot8
generated_yield

ReadWindowAggregateByTime11 -> pivot8
pivot8 -> generated_yield
}


influxdb/scanned-bytes: 0
influxdb/scanned-values: 0

===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *universe.pivotTransformation
Label : pivot8
Count : 3
MinDuration : 32600
MaxDuration : 126200
DurationSum : 193400
MeanDuration : 64466.666666666664

===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *influxdb.readWindowAggregateSource
Label : ReadWindowAggregateByTime11
Count : 1
MinDuration : 940500
MaxDuration : 940500
DurationSum : 940500
MeanDuration : 940500.0

.. marker-index-end


Expand Down
13 changes: 11 additions & 2 deletions examples/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,24 @@
"""
Query: using Stream
"""
records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
records = query_api.query_stream('''
from(bucket:"my-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
''')

for record in records:
print(f'Temperature in {record["location"]} is {record["_value"]}')

"""
Interrupt a stream after retrieving the required data
"""
large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
large_stream = query_api.query_stream('''
from(bucket:"my-bucket")
|> range(start: -100d)
|> filter(fn: (r) => r["_measurement"] == "my_measurement")
''')

for record in large_stream:
if record["location"] == "New York":
print(f'New York temperature: {record["_value"]}')
Expand Down
54 changes: 51 additions & 3 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ class FluxCsvParser(object):
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""

def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
data_frame_index: List[str] = None) -> None:
data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
"""Initialize defaults."""
self._response = response
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
self._data_frame_values = []
self._profilers = profilers
pass

def __enter__(self):
Expand Down Expand Up @@ -101,7 +102,9 @@ def _parse_flux_response(self):

# Return already parsed DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
yield self._prepare_data_frame()
df = self._prepare_data_frame()
if not self._is_profiler_table(table):
yield df

start_new_table = True
table = FluxTable()
Expand Down Expand Up @@ -152,6 +155,10 @@ def _parse_flux_response(self):

flux_record = self.parse_record(table_index - 1, table, csv)

if self._is_profiler_record(flux_record):
self._print_profiler_info(flux_record)
continue

if self._serialization_mode is FluxSerializationMode.tables:
self.tables[table_index - 1].records.append(flux_record)

Expand All @@ -164,7 +171,9 @@ def _parse_flux_response(self):

# Return latest DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
yield self._prepare_data_frame()
df = self._prepare_data_frame()
if not self._is_profiler_table(table):
yield df

def _prepare_data_frame(self):
from ..extras import pd
Expand Down Expand Up @@ -256,3 +265,42 @@ def add_column_names_and_tags(table, csv):
def _insert_table(self, table, table_index):
if self._serialization_mode is FluxSerializationMode.tables:
self.tables.insert(table_index, table)

def _is_profiler_record(self, flux_record: FluxRecord) -> bool:
    """Return True if the record's measurement matches an enabled profiler ("profiler/<name>")."""
    if not self._profilers:
        return False
    # A record is a profiler record when its _measurement equals
    # "profiler/<profiler>" for any profiler the client enabled.
    return any(
        "_measurement" in flux_record.values
        and flux_record["_measurement"] == f"profiler/{profiler}"
        for profiler in self._profilers
    )

def _is_profiler_table(self, table: FluxTable) -> bool:
    """Return True if the table holds profiler output (its "result" column defaults to "_profiler")."""
    if not self._profilers:
        return False
    for column in table.columns:
        if column.label == "result" and column.default_value == "_profiler":
            return True
    return False

def table_list(self) -> List[FluxTable]:
    """Get the list of flux tables, excluding profiler tables when profilers are enabled."""
    if not self._profilers:
        return self.tables
    # With profilers enabled, hide the internal "profiler/*" tables from callers.
    return [table for table in self.tables if not self._is_profiler_table(table)]

@staticmethod
def _print_profiler_info(flux_record: FluxRecord):
    """Pretty-print one profiler record to stdout as an '='-framed, aligned key/value section."""
    measurement = flux_record.get_measurement()
    if not measurement.startswith("profiler/"):
        return
    msg = "Profiler: " + measurement
    separator = len(msg) * "="
    print("\n" + separator)
    print(msg)
    print(separator)
    for name in flux_record.values:
        val = flux_record[name]
        if isinstance(val, str) and len(val) > 50:
            # Long values (e.g. the flux/query-plan graph) get their own paragraph.
            print(f"{name:<20}: \n\n{val}")
        elif val is not None:
            print(f"{name:<20}: {val:<20}")
29 changes: 24 additions & 5 deletions influxdb_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from influxdb_client.client.delete_api import DeleteApi
from influxdb_client.client.labels_api import LabelsApi
from influxdb_client.client.organizations_api import OrganizationsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.query_api import QueryApi, QueryOptions
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings
Expand Down Expand Up @@ -45,6 +45,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
:key bool auth_basic: Set this to true to enable basic authentication when talking to an InfluxDB 1.8.x
instance that does not use authentication but is protected by a reverse proxy with basic authentication.
(defaults to false, don't set to true when talking to InfluxDB 2)
:key list[str] profilers: list of enabled Flux profilers
"""
self.url = url
self.token = token
Expand Down Expand Up @@ -75,6 +76,8 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or

retries = kwargs.get('retries', False)

self.profilers = kwargs.get('profilers', None)

self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
header_value=auth_header_value, retries=retries)

Expand Down Expand Up @@ -111,6 +114,8 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
- ssl_ca_cert
- connection_pool_maxsize
- auth_basic
- profilers


config.ini example::

Expand All @@ -121,6 +126,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator

[tags]
id = 132-987-655
Expand All @@ -136,6 +142,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
profilers="query, operator"

[tags]
id = "132-987-655"
Expand Down Expand Up @@ -181,9 +188,14 @@ def config_value(key: str):
tags = {k: v.strip('"') for k, v in config.items('tags')}
default_tags = dict(tags)

profilers = None
if config.has_option('influx2', 'profilers'):
profilers = [x.strip() for x in config_value('profilers').split(',')]

return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic))
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
profilers=profilers)

@classmethod
def from_env_properties(cls, debug=None, enable_gzip=False):
Expand All @@ -209,6 +221,11 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None)
auth_basic = os.getenv('INFLUXDB_V2_AUTH_BASIC', "False")

prof = os.getenv("INFLUXDB_V2_PROFILERS", None)
profilers = None
if prof is not None:
profilers = [x.strip() for x in prof.split(',')]

default_tags = dict()

for key, value in os.environ.items():
Expand All @@ -217,7 +234,8 @@ def from_env_properties(cls, debug=None, enable_gzip=False):

return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic))
connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
profilers=profilers)

def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
"""
Expand All @@ -229,13 +247,14 @@ def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()
"""
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)

def query_api(self) -> QueryApi:
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
"""
Create a Query API instance.

:param query_options: optional query api configuration
:return: Query api instance
"""
return QueryApi(self)
return QueryApi(self, query_options)

def close(self):
"""Shutdown the client."""
Expand Down
Loading