diff --git a/README.rst b/README.rst
index 4044adec..db73e809 100644
--- a/README.rst
+++ b/README.rst
@@ -176,6 +176,7 @@ The following options are supported:
- ``ssl_ca_cert`` - set this to customize the certificate file to verify the peer
- ``connection_pool_maxsize`` - set the number of connections to save that can be reused by urllib3
- ``auth_basic`` - enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)
+- ``profilers`` - set the list of enabled `Flux profilers <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_
.. code-block:: python
@@ -204,11 +205,122 @@ Supported properties are:
- ``INFLUXDB_V2_SSL_CA_CERT`` - set this to customize the certificate file to verify the peer
- ``INFLUXDB_V2_CONNECTION_POOL_MAXSIZE`` - set the number of connections to save that can be reused by urllib3
- ``INFLUXDB_V2_AUTH_BASIC`` - enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)
+- ``INFLUXDB_V2_PROFILERS`` - set the list of enabled `Flux profilers <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_
.. code-block:: python
self.client = InfluxDBClient.from_env_properties()
+Profile query
+^^^^^^^^^^^^^
+
+The `Flux Profiler package <https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/>`_ provides
+performance profiling tools for Flux queries and operations.
+
+You can enable printing profiler information of the Flux query in client library by:
+
+- set QueryOptions.profilers in QueryApi,
+- set ``INFLUXDB_V2_PROFILERS`` environment variable,
+- set ``profilers`` option in configuration file.
+
+When the profiler is enabled, the result of flux query contains additional tables "profiler/\*".
+In order to have consistent behaviour with enabled/disabled profiler, ``FluxCSVParser`` excludes "profiler/\*" measurements
+from result.
+
+Example how to enable profilers using API:
+
+.. code-block:: python
+
+ q = '''
+ from(bucket: stringParam)
+ |> range(start: -5m, stop: now())
+ |> filter(fn: (r) => r._measurement == "mem")
+ |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
+ |> aggregateWindow(every: 1m, fn: mean)
+ |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+ '''
+ p = {
+ "stringParam": "my-bucket",
+ }
+
+ query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
+ csv_result = query_api.query(query=q, params=p)
+
+
+Example of a profiler output:
+
+.. code-block::
+
+ ===============
+ Profiler: query
+ ===============
+
+ from(bucket: stringParam)
+ |> range(start: -5m, stop: now())
+ |> filter(fn: (r) => r._measurement == "mem")
+ |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
+ |> aggregateWindow(every: 1m, fn: mean)
+ |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+
+ ========================
+ Profiler: profiler/query
+ ========================
+ result : _profiler
+ table : 0
+ _measurement : profiler/query
+ TotalDuration : 8924700
+ CompileDuration : 350900
+ QueueDuration : 33800
+ PlanDuration : 0
+ RequeueDuration : 0
+ ExecuteDuration : 8486500
+ Concurrency : 0
+ MaxAllocated : 2072
+ TotalAllocated : 0
+ flux/query-plan :
+
+ digraph {
+ ReadWindowAggregateByTime11
+ // every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
+ pivot8
+ generated_yield
+
+ ReadWindowAggregateByTime11 -> pivot8
+ pivot8 -> generated_yield
+ }
+
+
+ influxdb/scanned-bytes: 0
+ influxdb/scanned-values: 0
+
+ ===========================
+ Profiler: profiler/operator
+ ===========================
+ result : _profiler
+ table : 1
+ _measurement : profiler/operator
+ Type : *universe.pivotTransformation
+ Label : pivot8
+ Count : 3
+ MinDuration : 32600
+ MaxDuration : 126200
+ DurationSum : 193400
+ MeanDuration : 64466.666666666664
+
+ ===========================
+ Profiler: profiler/operator
+ ===========================
+ result : _profiler
+ table : 1
+ _measurement : profiler/operator
+ Type : *influxdb.readWindowAggregateSource
+ Label : ReadWindowAggregateByTime11
+ Count : 1
+ MinDuration : 940500
+ MaxDuration : 940500
+ DurationSum : 940500
+ MeanDuration : 940500.0
+
.. marker-index-end
diff --git a/examples/query.py b/examples/query.py
index ea2fff10..b9cc3ec9 100644
--- a/examples/query.py
+++ b/examples/query.py
@@ -62,7 +62,11 @@
"""
Query: using Stream
"""
- records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
+ records = query_api.query_stream('''
+ from(bucket:"my-bucket")
+ |> range(start: -10m)
+ |> filter(fn: (r) => r["_measurement"] == "my_measurement")
+ ''')
for record in records:
print(f'Temperature in {record["location"]} is {record["_value"]}')
@@ -70,7 +74,12 @@
"""
Interrupt a stream after retrieve a required data
"""
- large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
+ large_stream = query_api.query_stream('''
+ from(bucket:"my-bucket")
+ |> range(start: -100d)
+ |> filter(fn: (r) => r["_measurement"] == "my_measurement")
+ ''')
+
for record in large_stream:
if record["location"] == "New York":
print(f'New York temperature: {record["_value"]}')
diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py
index 4fbb5693..00ae04d9 100644
--- a/influxdb_client/client/flux_csv_parser.py
+++ b/influxdb_client/client/flux_csv_parser.py
@@ -46,13 +46,14 @@ class FluxCsvParser(object):
"""Parse to processing response from InfluxDB to FluxStructures or DataFrame."""
def __init__(self, response: HTTPResponse, serialization_mode: FluxSerializationMode,
- data_frame_index: List[str] = None) -> None:
+ data_frame_index: List[str] = None, profilers: List[str] = None) -> None:
"""Initialize defaults."""
self._response = response
self.tables = []
self._serialization_mode = serialization_mode
self._data_frame_index = data_frame_index
self._data_frame_values = []
+ self._profilers = profilers
pass
def __enter__(self):
@@ -101,7 +102,9 @@ def _parse_flux_response(self):
# Return already parsed DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
- yield self._prepare_data_frame()
+ df = self._prepare_data_frame()
+ if not self._is_profiler_table(table):
+ yield df
start_new_table = True
table = FluxTable()
@@ -152,6 +155,10 @@ def _parse_flux_response(self):
flux_record = self.parse_record(table_index - 1, table, csv)
+ if self._is_profiler_record(flux_record):
+ self._print_profiler_info(flux_record)
+ continue
+
if self._serialization_mode is FluxSerializationMode.tables:
self.tables[table_index - 1].records.append(flux_record)
@@ -164,7 +171,9 @@ def _parse_flux_response(self):
# Return latest DataFrame
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
- yield self._prepare_data_frame()
+ df = self._prepare_data_frame()
+ if not self._is_profiler_table(table):
+ yield df
def _prepare_data_frame(self):
from ..extras import pd
@@ -256,3 +265,42 @@ def add_column_names_and_tags(table, csv):
def _insert_table(self, table, table_index):
if self._serialization_mode is FluxSerializationMode.tables:
self.tables.insert(table_index, table)
+
+ def _is_profiler_record(self, flux_record: FluxRecord) -> bool:
+ if not self._profilers:
+ return False
+
+ for profiler in self._profilers:
+ if "_measurement" in flux_record.values and flux_record["_measurement"] == "profiler/" + profiler:
+ return True
+
+ return False
+
+ def _is_profiler_table(self, table: FluxTable) -> bool:
+
+ if not self._profilers:
+ return False
+
+ return any(filter(lambda column: (column.default_value == "_profiler" and column.label == "result"),
+ table.columns))
+
+ def table_list(self) -> List[FluxTable]:
+ """Get the list of flux tables."""
+ if not self._profilers:
+ return self.tables
+ else:
+ return list(filter(lambda table: not self._is_profiler_table(table), self.tables))
+
+ @staticmethod
+ def _print_profiler_info(flux_record: FluxRecord):
+ if flux_record.get_measurement().startswith("profiler/"):
+ msg = "Profiler: " + flux_record.get_measurement()
+ print("\n" + len(msg) * "=")
+ print(msg)
+ print(len(msg) * "=")
+ for name in flux_record.values:
+ val = flux_record[name]
+ if isinstance(val, str) and len(val) > 50:
+ print(f"{name:<20}: \n\n{val}")
+ elif val is not None:
+ print(f"{name:<20}: {val:<20}")
diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py
index 1fbd1d41..d56b79f1 100644
--- a/influxdb_client/client/influxdb_client.py
+++ b/influxdb_client/client/influxdb_client.py
@@ -12,7 +12,7 @@
from influxdb_client.client.delete_api import DeleteApi
from influxdb_client.client.labels_api import LabelsApi
from influxdb_client.client.organizations_api import OrganizationsApi
-from influxdb_client.client.query_api import QueryApi
+from influxdb_client.client.query_api import QueryApi, QueryOptions
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings
@@ -45,6 +45,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
:key bool auth_basic: Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that
does not use auth-enabled but is protected by a reverse proxy with basic authentication.
(defaults to false, don't set to true when talking to InfluxDB 2)
+ :key list[str] profilers: list of enabled Flux profilers
"""
self.url = url
self.token = token
@@ -75,6 +76,8 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
retries = kwargs.get('retries', False)
+ self.profilers = kwargs.get('profilers', None)
+
self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
header_value=auth_header_value, retries=retries)
@@ -111,6 +114,8 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
- ssl_ca_cert
- connection_pool_maxsize
- auth_basic
+ - profilers
+
config.ini example::
@@ -121,6 +126,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
timeout=6000
connection_pool_maxsize=25
auth_basic=false
+ profilers=query,operator
[tags]
id = 132-987-655
@@ -136,6 +142,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
+ profilers="query, operator"
[tags]
id = "132-987-655"
@@ -181,9 +188,14 @@ def config_value(key: str):
tags = {k: v.strip('"') for k, v in config.items('tags')}
default_tags = dict(tags)
+ profilers = None
+ if config.has_option('influx2', 'profilers'):
+ profilers = [x.strip() for x in config_value('profilers').split(',')]
+
return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
- connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic))
+ connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
+ profilers=profilers)
@classmethod
def from_env_properties(cls, debug=None, enable_gzip=False):
@@ -209,6 +221,11 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None)
auth_basic = os.getenv('INFLUXDB_V2_AUTH_BASIC', "False")
+ prof = os.getenv("INFLUXDB_V2_PROFILERS", None)
+ profilers = None
+ if prof is not None:
+ profilers = [x.strip() for x in prof.split(',')]
+
default_tags = dict()
for key, value in os.environ.items():
@@ -217,7 +234,8 @@ def from_env_properties(cls, debug=None, enable_gzip=False):
return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,
enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,
- connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic))
+ connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),
+ profilers=profilers)
def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
"""
@@ -229,13 +247,14 @@ def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()
"""
return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)
- def query_api(self) -> QueryApi:
+ def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
"""
Create a Query API instance.
+ :param query_options: optional query api configuration
:return: Query api instance
"""
- return QueryApi(self)
+ return QueryApi(self, query_options)
def close(self):
"""Shutdown the client."""
diff --git a/influxdb_client/client/query_api.py b/influxdb_client/client/query_api.py
index 56831721..18f477a1 100644
--- a/influxdb_client/client/query_api.py
+++ b/influxdb_client/client/query_api.py
@@ -10,26 +10,40 @@
from typing import List, Generator, Any
from influxdb_client import Dialect, IntegerLiteral, BooleanLiteral, FloatLiteral, DateTimeLiteral, StringLiteral, \
- VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression
+ VariableAssignment, Identifier, OptionStatement, File, DurationLiteral, Duration, UnaryExpression, \
+ ImportDeclaration, MemberAssignment, MemberExpression, ArrayExpression
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord
from influxdb_client.client.util.date_utils import get_date_helper
+class QueryOptions(object):
+ """Query options."""
+
+ def __init__(self, profilers: List[str] = None) -> None:
+ """
+ Initialize query options.
+
+ :param profilers: list of enabled flux profilers
+ """
+ self.profilers = profilers
+
+
class QueryApi(object):
"""Implementation for '/api/v2/query' endpoint."""
default_dialect = Dialect(header=True, delimiter=",", comment_prefix="#",
annotations=["datatype", "group", "default"], date_time_format="RFC3339")
- def __init__(self, influxdb_client):
+ def __init__(self, influxdb_client, query_options=QueryOptions()):
"""
Initialize query client.
:param influxdb_client: influxdb client
"""
self._influxdb_client = influxdb_client
+ self._query_options = query_options
self._query_api = QueryService(influxdb_client.api_client)
def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, params: dict = None):
@@ -82,11 +96,12 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
- _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables)
+ _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables,
+ profilers=self._profilers())
list(_parser.generator())
- return _parser.tables
+ return _parser.table_list()
def query_stream(self, query: str, org=None, params: dict = None) -> Generator['FluxRecord', Any, None]:
"""
@@ -102,8 +117,8 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
-
- _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream)
+ _parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,
+ profilers=self._profilers())
return _parser.generator()
@@ -150,14 +165,27 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
async_req=False, _preload_content=False, _return_http_data_only=False)
_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
- data_frame_index=data_frame_index)
+ data_frame_index=data_frame_index,
+ profilers=self._profilers())
return _parser.generator()
- # private helper for c
- @staticmethod
- def _create_query(query, dialect=default_dialect, params: dict = None):
- created = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params))
- return created
+ def _profilers(self):
+ if self._query_options and self._query_options.profilers:
+ return self._query_options.profilers
+ else:
+ return self._influxdb_client.profilers
+
+ def _create_query(self, query, dialect=default_dialect, params: dict = None):
+ profilers = self._profilers()
+ q = Query(query=query, dialect=dialect, extern=QueryApi._build_flux_ast(params, profilers))
+
+ if profilers:
+ print("\n===============")
+ print("Profiler: query")
+ print("===============")
+ print(query)
+
+ return q
@staticmethod
def _params_to_extern_ast(params: dict) -> List['OptionStatement']:
@@ -177,7 +205,6 @@ def _params_to_extern_ast(params: dict) -> List['OptionStatement']:
value = get_date_helper().to_utc(value)
literal = DateTimeLiteral("DateTimeLiteral", value.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
elif isinstance(value, timedelta):
- # convert to microsecodns
_micro_delta = int(value / timedelta(microseconds=1))
if _micro_delta < 0:
literal = UnaryExpression("UnaryExpression", argument=DurationLiteral("DurationLiteral", [
@@ -195,11 +222,40 @@ def _params_to_extern_ast(params: dict) -> List['OptionStatement']:
return statements
@staticmethod
- def _build_flux_ast(params: dict = None):
- if params is None:
- return None
+ def _build_flux_ast(params: dict = None, profilers: List[str] = None):
+
+ imports = []
+ body = []
+
+ if profilers is not None and len(profilers) > 0:
+ imports.append(ImportDeclaration(
+ "ImportDeclaration",
+ path=StringLiteral("StringLiteral", "profiler")))
+
+ elements = []
+ for profiler in profilers:
+ elements.append(StringLiteral("StringLiteral", value=profiler))
+
+ member = MemberExpression(
+ "MemberExpression",
+ object=Identifier("Identifier", "profiler"),
+ _property=Identifier("Identifier", "enabledProfilers"))
+
+ prof = OptionStatement(
+ "OptionStatement",
+ assignment=MemberAssignment(
+ "MemberAssignment",
+ member=member,
+ init=ArrayExpression(
+ "ArrayExpression",
+ elements=elements)))
+
+ body.append(prof)
+
+ if params is not None:
+ body.extend(QueryApi._params_to_extern_ast(params))
- return File(package=None, name=None, type=None, imports=[], body=QueryApi._params_to_extern_ast(params))
+ return File(package=None, name=None, type=None, imports=imports, body=body)
def __del__(self):
"""Close QueryAPI."""
diff --git a/tests/base_test.py b/tests/base_test.py
index a2db8140..131d1457 100644
--- a/tests/base_test.py
+++ b/tests/base_test.py
@@ -24,7 +24,7 @@ def setUp(self) -> None:
self.auth_token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
- self.client = InfluxDBClient(url=self.host, token=self.auth_token, debug=self.conf.debug, org=self.org)
+ self.client = InfluxDBClient(url=self.host, token=self.auth_token, debug=self.debug, org=self.org)
self.api_client = self.client.api_client
self.query_api = self.client.query_api()
diff --git a/tests/config.ini b/tests/config.ini
index e3a58cd0..32fc6d0c 100644
--- a/tests/config.ini
+++ b/tests/config.ini
@@ -5,6 +5,7 @@ token=my-token
timeout=6000
connection_pool_maxsize=55
auth_basic=false
+profilers=query, operator
[tags]
id = 132-987-655
diff --git a/tests/config.toml b/tests/config.toml
index af68b325..d0fc7cba 100644
--- a/tests/config.toml
+++ b/tests/config.toml
@@ -6,6 +6,7 @@
timeout = 6000
connection_pool_maxsize = 55
auth_basic = False
+ profilers = "query, operator"
[tags]
id = "132-987-655"
diff --git a/tests/test_InfluxDBClient.py b/tests/test_InfluxDBClient.py
index dc798fdc..faae6fef 100644
--- a/tests/test_InfluxDBClient.py
+++ b/tests/test_InfluxDBClient.py
@@ -73,6 +73,7 @@ def assertConfig(self):
self.assertEqual("${env.data_center}", self.client.default_tags["data_center"])
self.assertEqual(55, self.client.api_client.configuration.connection_pool_maxsize)
self.assertEqual(False, self.client.api_client.configuration.auth_basic)
+ self.assertEqual(["query", "operator"], self.client.profilers)
def test_init_from_file_ssl_default(self):
self.client = InfluxDBClient.from_config_file(f'{os.path.dirname(__file__)}/config.ini')
diff --git a/tests/test_QueryApi.py b/tests/test_QueryApi.py
index be2dd8a1..9a2a582f 100644
--- a/tests/test_QueryApi.py
+++ b/tests/test_QueryApi.py
@@ -2,13 +2,28 @@
import json
import unittest
-from influxdb_client import QueryApi, DurationLiteral, Duration, CallExpression, Expression, UnaryExpression, Identifier
+from dateutil.tz import tzutc
+from httpretty import httpretty
+
+from influxdb_client import QueryApi, DurationLiteral, Duration, CallExpression, Expression, UnaryExpression, \
+ Identifier, InfluxDBClient
+from influxdb_client.client.query_api import QueryOptions
from influxdb_client.client.util.date_utils import get_date_helper
from tests.base_test import BaseTest
class SimpleQueryTest(BaseTest):
+ def setUp(self) -> None:
+ super(SimpleQueryTest, self).setUp()
+
+ httpretty.enable()
+ httpretty.reset()
+
+ def tearDown(self) -> None:
+ self.client.close()
+ httpretty.disable()
+
def test_query_raw(self):
client = self.client
@@ -258,6 +273,194 @@ def test_parameter_ast(self):
self.assertEqual(json.dumps(got_sanitized, sort_keys=True, indent=2),
json.dumps(data[2], sort_keys=True, indent=2))
+ def test_query_profiler_enabled(self):
+ q = '''
+ from(bucket:stringParam)
+ |> range(start: 0, stop: callParam) |> last()
+ '''
+ p = {
+ "stringParam": "my-bucket",
+ "stopParam": get_date_helper().parse_date("2021-03-20T15:59:10.607352Z"),
+ "durationParam": DurationLiteral("DurationLiteral", [Duration(magnitude=1, unit="d")]),
+ "callParam": CallExpression(type="CallExpression", callee=Identifier(type="Identifier", name="now")),
+ }
+ query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
+ csv_result = query_api.query(query=q, params=p)
+
+ for table in csv_result:
+ self.assertFalse(any(filter(lambda column: (column.default_value == "_profiler"), table.columns)))
+ for flux_record in table:
+ self.assertFalse( flux_record["_measurement"].startswith("profiler/"))
+
+ records = self.client.query_api().query_stream(query=q, params=p)
+
+ for flux_record in records:
+ self.assertFalse(flux_record["_measurement"].startswith("profiler/"))
+
+ self.assertIsNotNone(csv_result)
+
+ def test_query_profiler_present(self):
+
+ client = self.client
+ q = '''
+ import "profiler"
+
+ option profiler.enabledProfilers = ["query", "operator"]
+
+ from(bucket:stringParam)
+ |> range(start: 0, stop: callParam) |> last()
+ '''
+
+ p = {
+ "stringParam": "my-bucket",
+ "stopParam": get_date_helper().parse_date("2021-03-20T15:59:10.607352Z"),
+ "durationParam": DurationLiteral("DurationLiteral", [Duration(magnitude=1, unit="d")]),
+ "callParam": CallExpression(type="CallExpression", callee=Identifier(type="Identifier", name="now")),
+ }
+ csv_result = client.query_api(query_options=QueryOptions(profilers=None)).query(query=q, params=p)
+ self.assertIsNotNone(csv_result)
+
+ found_profiler_table = False
+ found_profiler_records = False
+
+ for table in csv_result:
+ if any(filter(lambda column: (column.default_value == "_profiler"), table.columns)):
+ found_profiler_table = True
+ print(f"Profiler table : {table} ")
+ for flux_record in table:
+ if flux_record["_measurement"].startswith("profiler/"):
+ found_profiler_records = True
+ print(f"Profiler record: {flux_record}")
+
+ self.assertTrue(found_profiler_table)
+ self.assertTrue(found_profiler_records)
+
+ records = client.query_api().query_stream(query=q, params=p)
+
+ found_profiler_records = False
+ for flux_record in records:
+ if flux_record["_measurement"].startswith("profiler/"):
+ found_profiler_records = True
+ print(f"Profiler record: {flux_record}")
+ self.assertTrue(found_profiler_records)
+
+ def test_profiler_ast(self):
+
+ expect = {
+ "body": [
+ {
+ "assignment": {
+ "init": {
+ "elements": [
+ {
+ "type": "StringLiteral",
+ "value": "first-profiler"
+ },
+ {
+ "type": "StringLiteral",
+ "value": "second-profiler"
+ }
+ ],
+ "type": "ArrayExpression"
+ },
+ "member": {
+ "object": {
+ "name": "profiler",
+ "type": "Identifier"
+ },
+ "property": {
+ "name": "enabledProfilers",
+ "type": "Identifier"
+ },
+ "type": "MemberExpression"
+ },
+ "type": "MemberAssignment"
+ },
+ "type": "OptionStatement"
+ }
+ ],
+ "imports": [
+ {
+ "path": {
+ "type": "StringLiteral",
+ "value": "profiler"
+ },
+ "type": "ImportDeclaration"
+ }
+ ]
+ }
+
+ ast = QueryApi._build_flux_ast(params=None, profilers=["first-profiler", "second-profiler"])
+ got_sanitized = self.client.api_client.sanitize_for_serialization(ast)
+ print(json.dumps(got_sanitized, sort_keys=True, indent=2))
+
+ self.assertEqual(json.dumps(got_sanitized, sort_keys=True, indent=2),
+ json.dumps(expect, sort_keys=True, indent=2))
+
+ def test_profiler_mock(self):
+
+ query_response = """#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double,double,double
+#group,false,false,true,true,false,true,true,false,false,false
+#default,_result,,,,,,,,,
+,result,table,_start,_stop,_time,_measurement,host,available,free,used
+,,0,2021-05-24T08:40:44.7850004Z,2021-05-24T08:45:44.7850004Z,2021-05-24T08:41:00Z,mem,kozel.local,5832097792,317063168,11347771392
+,,0,2021-05-24T08:40:44.7850004Z,2021-05-24T08:45:44.7850004Z,2021-05-24T08:42:00Z,mem,kozel.local,5713765717.333333,118702080,11466103466.666666
+,,0,2021-05-24T08:40:44.7850004Z,2021-05-24T08:45:44.7850004Z,2021-05-24T08:43:00Z,mem,kozel.local,5776302080,135763968,11403567104
+,,0,2021-05-24T08:40:44.7850004Z,2021-05-24T08:45:44.7850004Z,2021-05-24T08:44:00Z,mem,kozel.local,5758485162.666667,85798229.33333333,11421384021.333334
+,,0,2021-05-24T08:40:44.7850004Z,2021-05-24T08:45:44.7850004Z,2021-05-24T08:45:00Z,mem,kozel.local,5788656981.333333,119243434.66666667,11391212202.666666
+,,0,2021-05-24T08:40:44.7850004Z,2021-05-24T08:45:44.7850004Z,2021-05-24T08:45:44.7850004Z,mem,kozel.local,5727718400,35330048,11452150784
+
+#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,string,long,long
+#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
+#default,_profiler,,,,,,,,,,,,,,,
+,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,flux/query-plan,influxdb/scanned-bytes,influxdb/scanned-values
+,,0,profiler/query,8924700,350900,33800,0,0,8486500,0,2072,0,,"digraph {
+ ReadWindowAggregateByTime11
+ // every = 1m, aggregates = [mean], createEmpty = true, timeColumn = ""_stop""
+ pivot8
+ generated_yield
+
+ ReadWindowAggregateByTime11 -> pivot8
+ pivot8 -> generated_yield
+}
+
+",0,0
+
+#datatype,string,long,string,string,string,long,long,long,long,double
+#group,false,false,true,false,false,false,false,false,false,false
+#default,_profiler,,,,,,,,,
+,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration
+,,1,profiler/operator,*universe.pivotTransformation,pivot8,3,32600,126200,193400,64466.666666666664
+,,1,profiler/operator,*influxdb.readWindowAggregateSource,ReadWindowAggregateByTime11,1,940500,940500,940500,940500
+"""
+
+ query = """
+ from(bucket: "my-bucket")
+ |> range(start: -5m, stop: now())
+ |> filter(fn: (r) => r._measurement == "mem")
+ |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
+ |> aggregateWindow(every: 1m, fn: mean)
+ |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+ """
+
+ httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body=query_response)
+ self.client = InfluxDBClient("http://localhost", "my-token", org="my-org", enable_gzip=False)
+ query_api = self.client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
+ tables = query_api.query(query=query)
+ self.assertEquals(len(tables), 1)
+ self.assertEquals(len(tables[0].columns), 10)
+ self.assertEquals(len(tables[0].records), 6)
+
+ self.assertEquals(tables[0].records[5].values,
+ {'result': '_result', 'table': 0,
+ '_start': datetime.datetime(2021, 5, 24, 8, 40, 44, 785000, tzinfo=tzutc()),
+ '_stop': datetime.datetime(2021, 5, 24, 8, 45, 44, 785000, tzinfo=tzutc()),
+ '_time': datetime.datetime(2021, 5, 24, 8, 45, 44, 785000, tzinfo=tzutc()),
+ '_measurement': 'mem',
+ 'host': 'kozel.local',
+ 'available': 5727718400, 'free': 35330048,
+ 'used': 11452150784})
+
if __name__ == '__main__':
unittest.main()