From f127af5bd57985c389c8f188332335780079dce8 Mon Sep 17 00:00:00 2001 From: Jason Stirnaman Date: Wed, 29 May 2024 18:17:04 -0500 Subject: [PATCH] fix: reference: python v3 client library examples should use write_options, not WriteOptions Fixes #5475 --- .../reference/client-libraries/v3/python.md | 913 +++++++++++------ .../reference/client-libraries/v3/python.md | 922 +++++++++++------ .../reference/client-libraries/v3/python.md | 924 ++++++++++++------ 3 files changed, 1864 insertions(+), 895 deletions(-) diff --git a/content/influxdb/cloud-dedicated/reference/client-libraries/v3/python.md b/content/influxdb/cloud-dedicated/reference/client-libraries/v3/python.md index e2123e7290..2fdd4e9f97 100644 --- a/content/influxdb/cloud-dedicated/reference/client-libraries/v3/python.md +++ b/content/influxdb/cloud-dedicated/reference/client-libraries/v3/python.md @@ -7,33 +7,49 @@ menu: name: Python parent: v3 client libraries identifier: influxdb3-python -influxdb/cloud-dedicated/tags: [python, gRPC, SQL, client libraries] +influxdb/cloud-dedicated/tags: [Flight API, python, gRPC, SQL, client libraries] +metadata: [influxdb3-python v0.5.0] weight: 201 aliases: - /influxdb/cloud-dedicated/reference/client-libraries/v3/pyinflux3/ related: - /influxdb/cloud-dedicated/query-data/execute-queries/troubleshoot/ list_code_example: > + + + ```python + from influxdb_client_3 import(InfluxDBClient3, + WriteOptions, + write_client_options) + + # Instantiate batch writing options for the client - # Instantiate an InfluxDB client configured for a database + write_options = WriteOptions() + wco = write_client_options(write_options=write_options) - client = InfluxDBClient3( - "https://{{< influxdb/host >}}", - database="DATABASE_NAME", - token="DATABASE_TOKEN") + # Instantiate an InfluxDB v3 client - # Execute the query and retrieve data formatted as a PyArrow Table + with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco) as client: - table = client.query( - '''SELECT * - FROM home - WHERE time >= now() - INTERVAL '90 days' - ORDER BY time''' - ) + # Write data in batches + client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', + tag_columns=["room"]) - ``` + # Execute a query and retrieve data formatted as a PyArrow Table + + table = client.query( + '''SELECT * + FROM home + WHERE time >= now() - INTERVAL '90 days' + ORDER BY time''') + ``` --- The InfluxDB v3 [`influxdb3-python` Python client library](https://github.com/InfluxCommunity/influxdb3-python) @@ -44,20 +60,32 @@ Client libraries can be used to construct line protocol data, transform data fro to line protocol, and batch write line protocol data to InfluxDB HTTP APIs. InfluxDB v3 client libraries can query {{% product-name %}} using SQL or InfluxQL. -The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client +The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client in a convenient InfluxDB v3 interface for executing SQL and InfluxQL queries, requesting server metadata, and retrieving data from {{% product-name %}} using the Flight protocol with gRPC. - +{{% note %}} +Code samples in this page use the [Get started home sensor sample data](/influxdb/cloud-dedicated/reference/sample-data/#get-started-home-sensor-data). +{{% /note %}} - [Installation](#installation) - [Importing the module](#importing-the-module) - [API reference](#api-reference) - [Classes](#classes) - [Class InfluxDBClient3](#class-influxdbclient3) + - [Parameters](#parameters) + - [Writing modes](#writing-modes) + - [InfluxDBClient3 instance methods](#influxdbclient3-instance-methods) + - [InfluxDBClient3.write](#influxdbclient3write) + - [InfluxDBClient3.write_file](#influxdbclient3write_file) + - [InfluxDBClient3.query](#influxdbclient3query) + - [InfluxDBClient3.close](#influxdbclient3close) - [Class Point](#class-point) - [Class WriteOptions](#class-writeoptions) + - [Parameters](#parameters-4) - [Functions](#functions) + - [Function write_client_options(\*\*kwargs)](#function-write_client_optionskwargs) + - [Function flight_client_options(\*\*kwargs)](#function-flight_client_optionskwargs) - [Constants](#constants) - [Exceptions](#exceptions) @@ -87,17 +115,15 @@ from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions ``` - [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): a class for interacting with InfluxDB -- [`influxdb_client_3.Point`](#class-point):a class for constructing a time series data -point +- [`influxdb_client_3.Point`](#class-point): a class for constructing a time series data + point - `influxdb_client_3.WriteOptions`: a class for configuring client -write options. + write options. ## API reference The `influxdb_client_3` module includes the following classes and functions. - - - [Classes](#classes) - [Functions](#functions) - [Constants](#constants) @@ -106,117 +132,103 @@ The `influxdb_client_3` module includes the following classes and functions. ## Classes - [Class InfluxDBClient3](#class-influxdbclient3) + - [Parameters](#parameters) + - [Writing modes](#writing-modes) + - [InfluxDBClient3 instance methods](#influxdbclient3-instance-methods) - [InfluxDBClient3.write](#influxdbclient3write) - [InfluxDBClient3.write_file](#influxdbclient3write_file) - [InfluxDBClient3.query](#influxdbclient3query) - [InfluxDBClient3.close](#influxdbclient3close) - [Class Point](#class-point) - [Class WriteOptions](#class-writeoptions) + - [Parameters](#parameters-4) ## Class InfluxDBClient3 Provides an interface for interacting with InfluxDB APIs for writing and querying data. -### Syntax - -```python -__init__(self, host=None, org=None, database=None, token=None, - write_client_options=None, flight_client_options=None, **kwargs) -``` - -Initializes and returns an `InfluxDBClient3` instance with the following: +The `InfluxDBClient3` constructor initializes and returns a client instance with the following: - A singleton _write client_ configured for writing to the database. - A singleton _Flight client_ configured for querying the database. ### Parameters -- **org** (str): The organization name (for {{% product-name %}}, set this to an empty string (`""`)). -- **database** (str): The database to use for writing and querying. -- **write_client_options** (dict): Options to use when writing to InfluxDB. +- **`host`** (string): The host URL of the InfluxDB instance. +- **`database`** (string): The database to use for writing and querying. +- **`token`** (string): A database token with read/write permissions. +- _Optional_ **`write_client_options`** (dict): Options to use when writing to InfluxDB. If `None`, writes are [synchronous](#synchronous-writing). -- **flight_client_options** (dict): Options to use when querying InfluxDB. - -#### Non-batch writing +- _Optional_ **`flight_client_options`** (dict): Options to use when querying InfluxDB. -When writing data in non-batching mode, the client immediately tries to write the data, doesn't retry failed requests, and doesn't invoke response callbacks. +### Writing modes -#### Batch writing +When writing data, the client uses one of the following modes: -In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. -The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`. -If a write fails, the client reschedules the write according to the `write_client_options` retry options. -When using batching mode, you can define `success_callback`, `error_callback`, and `retry_callback` functions. +- [Synchronous writing](#synchronous-writing) +- [Batch writing](#batch-writing) +- Asynchronous writing: Deprecated -To use batching mode, pass `WriteOptions` as key-value pairs to the client `write_client_options` parameter--for example: +#### Synchronous writing -1. Instantiate `WriteOptions()` with defaults or with -`WriteOptions.write_type=WriteType.batching`. +Default. When no `write_client_options` are provided during the initialization of `InfluxDBClient3`, writes are synchronous. +When writing data in synchronous mode, the client immediately tries to write the provided data to InfluxDB, doesn't retry failed requests, and doesn't invoke response callbacks. - ```python - # Create a WriteOptions instance for batch writes with batch size, flush, and retry defaults. - write_options = WriteOptions() - ``` +##### Example: initialize a client with synchronous (non-batch) defaults -2. Pass `write_options` from the preceding step to the `write_client_options` function. +The following example initializes a client for writing and querying data in an {{% product-name %}} database. +Given that `write_client_options` isn't specified, the client uses the default [synchronous writing](#synchronous-writing) mode. - ```python - wco = write_client_options(WriteOptions=write_options) - ``` +{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} - The output is a dict with `WriteOptions` key-value pairs. + -3. Initialize the client, setting the `write_client_options` argument to `wco` from the preceding step. +--> - {{< tabs-wrapper >}} -{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} ```python -from influxdb_client_3 import Point, InfluxDBClient3 - -points = [ - Point("home") - .tag("room", "Kitchen") - .field("temp", 25.3) - .field('hum', 20.2) - .field('co', 9), - Point("home") - .tag("room", "Living Room") - .field("temp", 24.0) - .field('hum', 20.0) - .field('co', 5)] - -with InfluxDBClient3(token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME", - write_client_options=wco) as client: +from influxdb_client_3 import InfluxDBClient3 - client.write(record=points) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") ``` -{{% /code-placeholders %}} - {{< /tabs-wrapper >}} -#### Synchronous writing +{{% /code-placeholders %}} -Synchronous mode is the default mode for writing data (in batch and non-batch modes). -To specify synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`. +Replace the following: -### Examples +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with read/write permissions on the specified database -##### Initialize a client +To explicitly specify synchronous mode, create a client with `write_options=SYNCHRONOUS`--for example: -The following example initializes a client for writing and querying data in a {{% product-name %}} database. -When writing or querying, the client waits synchronously for the response. +{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} -Given `client.write_client_options` doesn't set `WriteOptions`, the client uses the default [non-batch writing](#non-batch-writing) mode. + + -{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} ```python -from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS + +wco = write_client_options(write_options=SYNCHRONOUS) -client = InfluxDBClient3(token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco, + flight_client_options=None) ``` + {{% /code-placeholders %}} Replace the following: @@ -224,49 +236,81 @@ Replace the following: - {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) - {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) - with read permissions on the specified database + with write permissions on the specified database + +#### Batch writing -##### Initialize a client for batch writing +Batch writing is particularly useful for efficient bulk data operations. +Options include setting batch size, flush intervals, retry intervals, and more. -The following example shows how to initialize a client for batch writing data to the database. -When writing data, the client uses batch mode with default options and -invokes the callback function, if specified, for the response status (success, error, or retryable error). +Batch writing groups multiple writes into a single request to InfluxDB. +In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. +The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`. +If a write fails, the client reschedules the write according to the `write_client_options` retry options. -{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} -```python - from influxdb_client_3 import InfluxDBClient3, - write_client_options, - WriteOptions, - InfluxDBError +##### Configuring write client options - # Define callbacks for write responses - def success(self, data: str): - print(f"Successfully wrote batch: data: {data}") +Use `WriteOptions` and `write_client_options` to configure batch writing and response handling for the client: - def error(self, data: str, exception: InfluxDBError): - print(f"Failed writing batch: config: {self}, data: {data}, - error: {exception}") +1. Instantiate `WriteOptions`. To use batch defaults, call the constructor without specifying parameters. +2. Call `write_client_options` and use the `write_options` parameter to specify the `WriteOptions` instance from the preceding step. + Specify callback parameters (success, error, and retry) to invoke functions on success or error. +3. Instantiate `InfluxDBClient3` and use the `write_client_options` parameter to specify the `dict` output from the preceding step. - def retry(self, data: str, exception: InfluxDBError): - print(f"Failed retry writing batch: config: {self}, data: {data}, - error: {exception}") +##### Example: initialize a client using batch defaults and callbacks - # Instantiate WriteOptions for batching - write_options = WriteOptions() - wco = write_client_options(success_callback=success, - error_callback=error, - retry_callback=retry, - WriteOptions=write_options) - - # Use the with...as statement to ensure the file is properly closed and resources - # are released. - with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}", - org="", database="DATABASE_NAME", - write_client_options=wco) as client: +The following example shows how to use batch mode with defaults and +specify callback functions for the response status (success, error, or retryable error). + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} - client.write_file(file='./home.csv', - timestamp_column='time', tag_columns=["room"]) + + + +```python +from influxdb_client_3 import(InfluxDBClient3, + write_client_options, + WriteOptions, + InfluxDBError) + +status = None + +# Define callbacks for write responses +def success(self, data: str): + status = "Success writing batch: data: {data}" + assert status.startsWith('Success'), f"Expected {status} to be success" + +def error(self, data: str, err: InfluxDBError): + status = f"Error writing batch: config: {self}, data: {data}, error: {err}" + assert status.startsWith('Success'), f"Expected {status} to be success" + + +def retry(self, data: str, err: InfluxDBError): + status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}" + assert status.startsWith('Success'), f"Expected {status} to be success" + +# Instantiate WriteOptions for batching +write_options = WriteOptions() +wco = write_client_options(success_callback=success, + error_callback=error, + retry_callback=retry, + write_options=write_options) + +# Use the with...as statement to ensure the file is properly closed and resources +# are released. +with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco) as client: + + client.write_file(file='./data/home-sensor-data.csv', + timestamp_column='time', tag_columns=["room"], write_precision='s') +``` + {{% /code-placeholders %}} Replace the following: @@ -274,7 +318,7 @@ Replace the following: - {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) - {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) - with read permissions on the specified database + with write permissions on the specified database ### InfluxDBClient3 instance methods @@ -282,62 +326,123 @@ Replace the following: Writes a record or a list of records to InfluxDB. -#### Syntax - -```python -write(self, record=None, **kwargs) -``` - #### Parameters -- **`record`**: A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`. -- **`database`**: The database to write to. Default is to write to the database specified for the client. -- **`**kwargs`**: Additional write options--for example: - - **`write_precision`**: _Optional_. Default is `"ns"`. - Specifies the [precision](/influxdb/cloud-dedicated/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. - - **`write_client_options`**: _Optional_. - Specifies callback functions and options for [batch writing](#batch-writing) mode. +- **`record`** (_record_ or list): A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`. +- **`database`** (string): The database to write to. Default is to write to the database specified for the client. +- **`**kwargs`\*\*: Additional write options--for example: + - **`write_precision`** (string): _Optional_. Default is `"ns"`. + Specifies the [precision](/influxdb/cloud-dedicated/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. + - **`write_client_options`** (dict): _Optional_. + Specifies callback functions and options for [batch writing](#batch-writing) mode. + To generate the `dict`, use the [`write_client_options` function](#function-write_client_optionskwargs). -#### Examples - -##### Write a line protocol string +#### Example: write a line protocol string {{% influxdb/custom-timestamps %}} {{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + ```python from influxdb_client_3 import InfluxDBClient3 -points = "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1641024000" +point = "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1641024000" -client = InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}", - database="DATABASE_NAME") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") -client.write(record=points, write_precision="s") +client.write(record=point, write_precision="s") +``` + +The following sample code executes an SQL query to retrieve the point: + + + +```python +# Execute an SQL query +table = client.query(query='''SELECT room + FROM home + WHERE temp=21.1 + AND time=from_unixtime(1641024000)''') +# table is a pyarrow.Table +room = table[0][0] +assert f"{room}" == 'Living Room', f"Expected {room} to be Living Room" ``` + {{% /code-placeholders %}} {{% /influxdb/custom-timestamps %}} -##### Write data using points +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with write permissions on the specified database + +#### Example: write data using points The `influxdb_client_3.Point` class provides an interface for constructing a data point for a measurement and setting fields, tags, and the timestamp for the point. The following example shows how to create a `Point` object, and then write the data to InfluxDB. +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + ```python from influxdb_client_3 import Point, InfluxDBClient3 -point = Point("home").tag("room", "Kitchen").field("temp", 72) -... + +point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") client.write(point) ``` -###### Write data using a dict + + +The following sample code executes an InfluxQL query to retrieve the written data: + +```python +# Execute an InfluxQL query +table = client.query(query='''SELECT DISTINCT(temp) as val + FROM home + WHERE temp > 21.0 + AND time >= now() - 10m''', language="influxql") +# table is a pyarrow.Table +df = table.to_pandas() +assert 21.5 in df['val'].values, f"Expected value in {df['val']}" +``` + +{{% /code-placeholders %}} + +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with write permissions on the specified database + +##### Example: write data using a dict `InfluxDBClient3` can serialize a dictionary object into line protocol. If you pass a `dict` to `InfluxDBClient3.write`, the client expects the `dict` to have the following _point_ attributes: -- **measurement** (str): the measurement name +- **measurement** (string): the measurement name - **tags** (dict): a dictionary of tag key-value pairs - **fields** (dict): a dictionary of field key-value pairs - **time**: the [timestamp](/influxdb/cloud-dedicated/reference/glossary/#timestamp) for the record @@ -347,6 +452,14 @@ data to InfluxDB. {{% influxdb/custom-timestamps %}} {{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + ```python from influxdb_client_3 import InfluxDBClient3 @@ -358,81 +471,106 @@ points = { "time": 1641067200 } -client = InfluxDBClient3(token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME", - org="") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") client.write(record=points, write_precision="s") ``` + {{% /code-placeholders %}} {{% /influxdb/custom-timestamps %}} +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with write permissions on the specified database + ### InfluxDBClient3.write_file Writes data from a file to InfluxDB. Execution is synchronous. -#### Syntax - -```python -write_file(self, file, measurement_name=None, tag_columns=[], - timestamp_column='time', **kwargs) -``` #### Parameters -- **`file`** (str): A path to a file containing records to write to InfluxDB. - The filename must end with one of the following supported extensions. - For more information about encoding and formatting data, see the documentation for each supported format: - - - `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html) - - `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html) - - `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) - - `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) - - `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html) - -- **`measurement_name`**: Defines the measurement name for records in the file. +- **`file`** (string): A path to a file containing records to write to InfluxDB. + The filename must end with one of the following supported extensions. + For more information about encoding and formatting data, see the documentation for each supported format: + + - `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html) + - `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html) + - `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) + - `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) + - `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html) + +- **`measurement_name`** (string): Defines the measurement name for records in the file. The specified value takes precedence over `measurement` and `iox::measurement` columns in the file. If no value is specified for the parameter, and a `measurement` column exists in the file, the `measurement` column value is used for the measurement name. If no value is specified for the parameter, and no `measurement` column exists, the `iox::measurement` column value is used for the measurement name. -- **`tag_columns`**: A list containing the names of tag columns. +- **`tag_columns`** (list): Tag column names. Columns not included in the list and not specified by another parameter are assumed to be fields. -- **`timestamp_column`**: The name of the column that contains timestamps. Default is `'time'`. -- **`database`**: The database to write to. Default is to write to the database specified for the client. -- **`**kwargs`**: Additional write options--for example: - - **`write_precision`**: _Optional_. Default is `"ns"`. - Specifies the [precision](/influxdb/cloud-dedicated/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. - - **`write_client_options`**: _Optional_. - Specifies callback functions and options for [batch writing](#batch-writing) mode. - -#### Examples - -##### Write data from a file - -The following example shows how to configure write options for batching, retries, and callbacks, +- **`timestamp_column`** (string): The name of the column that contains timestamps. Default is `'time'`. +- **`database`** (`str`): The database to write to. Default is to write to the database specified for the client. +- **`file_parser_options`** (callable): A function for providing additional arguments to the file parser. +- **`**kwargs`**: Additional options to pass to the `WriteAPI`--for example: + - **`write_precision`** (string): _Optional_. Default is `"ns"`. + Specifies the [precision](/influxdb/cloud-dedicated/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. + - **`write_client_options`** (dict): _Optional_. + Specifies callback functions and options for [batch writing](#batch-writing) mode. + To generate the `dict`, use the [`write_client_options` function](#function-write_client_optionskwargs). + +#### Example: use batch options when writing file data + +The following example shows how to specify customized write options for batching, retries, and response callbacks, and how to write data from CSV and JSON files to InfluxDB: {{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} -```python -from influxdb_client_3 import InfluxDBClient3, write_client_options, - WritePrecision, WriteOptions, InfluxDBError -class BatchingCallback(object): - - # Define callbacks for write responses - def success(self, data: str): - print(f"Successfully wrote batch: data: {data}") - - def error(self, data: str, exception: InfluxDBError): - print(f"Failed writing batch: config: {self}, data: {data}, - error: {exception}") - - def retry(self, data: str, exception: InfluxDBError): - print(f"Failed retry writing batch: config: {self}, data: {data}, - error: {exception}") + + -# Instantiate the callbacks -callback = BatchingCallback() +```python +from influxdb_client_3 import(InfluxDBClient3, write_client_options, + WritePrecision, WriteOptions, InfluxDBError) + +# Define the result object +result = { + 'config': None, + 'status': None, + 'data': None, + 'error': None +} + +# Define callbacks for write responses +def success_callback(self, data: str): + result['config'] = self + result['status'] = 'success' + result['data'] = data + + assert result['data'] != None, f"Expected {result['data']}" + print("Successfully wrote data: {result['data']}") + +def error_callback(self, data: str, exception: InfluxDBError): + result['config'] = self + result['status'] = 'error' + result['data'] = data + result['error'] = exception + + assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}" + +def retry_callback(self, data: str, exception: InfluxDBError): + result['config'] = self + result['status'] = 'retry_error' + result['data'] = data + result['error'] = exception + + assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}" write_options = WriteOptions(batch_size=500, flush_interval=10_000, @@ -442,199 +580,372 @@ write_options = WriteOptions(batch_size=500, max_retry_delay=30_000, exponential_base=2) -wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options) -with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}", - database="DATABASE_NAME", - _write_client_options=wco) as client: +wco = write_client_options(success_callback=success_callback, + error_callback=error_callback, + retry_callback=retry_callback, + write_options=write_options) + +with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco) as client: + + client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', + tag_columns=["room"], write_precision='s') - client.write_file(file='./home.csv', timestamp_column='time', - tag_columns=["room"]) - - client.write_file(file='./home.json', timestamp_column='time', - tag_columns=["room"], date_unit='ns') + client.write_file(file='./data/home-sensor-data.json', timestamp_column='time', + tag_columns=["room"], write_precision='s') ``` + {{% /code-placeholders %}} -### InfluxDBClient3.query +Replace the following: -Sends a Flight request to execute the specified SQL or InfluxQL query. -Returns all data in the query result as an Arrow table. +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with write permissions on the specified database -#### Syntax +### InfluxDBClient3.query -```python -query(self, query, language="sql", mode="all", **kwargs ) -``` +Sends a Flight request to execute the specified SQL or InfluxQL query. +Returns all data in the query result as an Arrow table ([`pyarrow.Table`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html) instance). #### Parameters -- **`query`** (str): the SQL or InfluxQL to execute. -- **`language`** (str): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`. -- **`mode`** (str): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). - Default is `"all"`. - - `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`. - - `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`. - Returns `null` if there are no more messages. - - `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`. - - `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader). - - `schema`: Return the schema for all record batches in the stream. +- **`query`** (string): the SQL or InfluxQL to execute. +- **`language`** (string): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`. +- **`mode`** (string): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). + Default is `"all"`. + - `all`: Read the entire contents of the stream and return it as a [`pyarrow.Table`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html). + - `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`. + Returns `null` if there are no more messages. + - `pandas`: Read the contents of the stream and return it as a [`pandas.DataFrame`](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html). + - `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader). + - `schema`: Return the schema for all record batches in the stream. - **`**kwargs`**: [`FlightCallOptions`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions) -#### Examples +#### Example: query using SQL -##### Query using SQL +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -table = client.query("SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'") +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") + +table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'") + # Filter columns. print(table.select(['room', 'temp'])) + # Use PyArrow to aggregate data. print(table.group_by('hum').aggregate([])) ``` -##### Query using InfluxQL +{{% /code-placeholders %}} + +In the examples, replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with read permission on the specified database + +#### Example: query using InfluxQL + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= -90d" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= -90d" table = client.query(query=query, language="influxql") + # Filter columns. print(table.select(['room', 'temp'])) ``` -##### Read all data from the stream and return a pandas DataFrame +{{% /code-placeholders %}} + +##### Example: read all data from the stream and return a pandas DataFrame + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" pd = client.query(query=query, mode="pandas") # Print the pandas DataFrame formatted as a Markdown table. print(pd.to_markdown()) ``` -##### View the schema for all batches in the stream +{{% /code-placeholders %}} + +##### Example: view the schema for all batches in the stream + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -table = client.query(''' - SELECT * - FROM measurement - WHERE time >= now() - INTERVAL '90 days'''' -) -# Get the schema attribute value. +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +table = client.query("""SELECT * + from home + WHERE time >= now() - INTERVAL '90 days'""") + +# View the table schema. print(table.schema) ``` -##### Retrieve the result schema and no data +{{% /code-placeholders %}} + +##### Example: retrieve the result schema and no data + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" schema = client.query(query=query, mode="schema") print(schema) ``` +{{% /code-placeholders %}} + ##### Specify a timeout Pass `timeout=` for [`FlightCallOptions`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions) to use a custom timeout. +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + +```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" client.query(query=query, timeout=5) ``` +{{% /code-placeholders %}} + ### InfluxDBClient3.close Sends all remaining records from the batch to InfluxDB, and then closes the underlying write client and Flight client to release resources. -#### Syntax +#### Example: close a client + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + ```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") client.close() ``` -## Class Point +{{% /code-placeholders %}} -```python -influxdb_client_3.Point -``` +## Class Point Provides an interface for constructing a time series data point for a measurement, and setting fields, tags, and timestamp. -The following example shows how to create a `Point`, and then write the -data to InfluxDB. + + ```python +from influxdb_client_3 import Point point = Point("home").tag("room", "Living Room").field("temp", 72) -client.write(point) ``` +See how to [write data using points](#example-write-data-using-points). + ## Class WriteOptions +Provides an interface for constructing options that customize batch writing behavior, such as batch size and retry. + + + -Options for configuring client behavior (batch size, retry, callbacks, etc.) when writing data to InfluxDB. +```python +from influxdb_client_3 import WriteOptions -For client configuration examples, see [Initialize a client](#initialize-a-client). +write_options = WriteOptions(batch_size=500, + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) +``` -### Syntax +See how to [use batch options for writing data](#example-use-batch-options-when-writing-file-data). -```python -__init__(self, write_type: WriteType = WriteType.batching, - batch_size=1_000, flush_interval=1_000, - jitter_interval=0, - retry_interval=5_000, - max_retries=5, - max_retry_delay=125_000, - max_retry_time=180_000, - exponential_base=2, - max_close_wait=300_000, - write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: +### Parameters -``` +- **`batch_size`**: Default is `1000`. +- **`flush_interval`**: Default is `1000`. +- **`jitter_interval`**: Default is `0`. +- **`retry_interval`**: Default is `5000`. +- **`max_retries`**: Default is `5`. +- **`max_retry_delay`**: Default is `125000`. +- **`max_retry_time`**: Default is `180000`. +- **`exponential_base`**: Default is `2`. +- **`max_close_wait`**: Default is `300000`. +- **`write_scheduler`**: Default is `ThreadPoolScheduler(max_workers=1)`. ## Functions - [influxdb_client_3.write_client_options](#function-write_client_optionskwargs) - [influxdb_client_3.flight_client_options](#function-flight_client_optionskwargs) -### Function write_client_options(**kwargs) +### Function write_client_options(\*\*kwargs) -```python -influxdb_client_3.write_client_options(kwargs) -``` +Returns a `dict` with the specified write client options. -- Takes the following parameters: +#### Parameters - - `kwargs`: keyword arguments for `WriteApi` +The function takes the following keyword arguments: -- Returns a dictionary of write client options. +- **`write_options`** ([`WriteOptions`](#class-writeoptions)): Specifies whether the client writes data using synchronous mode or batching mode. If using batching mode, the client uses the specified batching options. +- **`point_settings`** (dict): Default tags that the client will add to each point when writing the data to InfluxDB. +- **`success_callback`** (callable): If using batching mode, a function to call after data is written successfully to InfluxDB (HTTP status `204`) +- **`error_callback`** (callable): if using batching mode, a function to call if data is not written successfully (the response has a non-`204` HTTP status) +- **`retry_callback`** (callable): if using batching mode, a function to call if the request is a retry (using batching mode) and data is not written successfully -### Function flight_client_options(**kwargs) +#### Example: instantiate options for batch writing ```python -influxdb_client_3.flight_client_options(kwargs) +from influxdb_client_3 import write_client_options, WriteOptions +from influxdb_client_3.write_client.client.write_api import WriteType + +def success(): + print("Success") +def error(): + print("Error") +def retry(): + print("Retry error") + +write_options = WriteOptions() +wco = write_client_options(success_callback=success, + error_callback=error, + retry_callback=retry, + write_options=write_options) + +assert wco['success_callback'] +assert wco['error_callback'] +assert wco['retry_callback'] +assert wco['write_options'].write_type == WriteType.batching ``` -- Takes the following parameters: +#### Example: instantiate options for synchronous writing - - `kwargs`: keyword arguments for `FlightClient` +```python +from influxdb_client_3 import write_client_options, SYNCHRONOUS +from influxdb_client_3.write_client.client.write_api import WriteType + +wco = write_client_options(write_options=SYNCHRONOUS) -- Returns a dictionary of Flight client options. +assert wco['write_options'].write_type == WriteType.synchronous +``` -#### Examples +### Function flight_client_options(\*\*kwargs) -##### Specify the root certificate path +Returns a `dict` with the specified [FlightClient](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html) parameters. + +#### Parameters + +- `kwargs`: keyword arguments for [`pyarrow.flight.FlightClient`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html) parameters + +#### Example: specify the root certificate path + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python from influxdb_client_3 import InfluxDBClient3, flight_client_options @@ -644,14 +955,22 @@ fh = open(certifi.where(), "r") cert = fh.read() fh.close() -client = InfluxDBClient3( - token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME", - flight_client_options=flight_client_options( - tls_root_certs=cert)) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + fco=flight_client_options(tls_root_certs=cert)) ``` +{{% /code-placeholders %}} + +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: + the name of your {{% product-name %}} [database](/influxdb/cloud-dedicated/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/cloud-dedicated/admin/tokens/#database-tokens) + with read permission on the specified database + ## Constants - `influxdb_client_3.SYNCHRONOUS`: Represents synchronous write mode diff --git a/content/influxdb/cloud-serverless/reference/client-libraries/v3/python.md b/content/influxdb/cloud-serverless/reference/client-libraries/v3/python.md index 69989962ad..f75a6d1754 100644 --- a/content/influxdb/cloud-serverless/reference/client-libraries/v3/python.md +++ b/content/influxdb/cloud-serverless/reference/client-libraries/v3/python.md @@ -7,33 +7,50 @@ menu: name: Python parent: v3 client libraries identifier: influxdb3-python +influxdb/cloud-serverless/tags: + [Flight API, python, gRPC, SQL, client libraries] +metadata: [influxdb3-python v0.5.0] weight: 201 -influxdb/cloud-serverless/tags: [python, gRPC, SQL, client libraries] aliases: - /influxdb/cloud-serverless/reference/client-libraries/v3/pyinflux3/ related: - /influxdb/cloud-serverless/query-data/execute-queries/troubleshoot/ list_code_example: > + + + ```python + from influxdb_client_3 import(InfluxDBClient3, + WriteOptions, + write_client_options) + + # Instantiate batch writing options for the client - # Instantiate an InfluxDB client configured for a bucket + write_options = WriteOptions() + wco = write_client_options(write_options=write_options) - client = InfluxDBClient3( - "https://{{< influxdb/host >}}", - database="BUCKET_NAME", - token="API_TOKEN") + # Instantiate an InfluxDB v3 client - # Execute the query and retrieve data formatted as a PyArrow Table + with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN", + write_client_options=wco) as client: - table = client.query( - '''SELECT * - FROM home - WHERE time >= now() - INTERVAL '90 days' - ORDER BY time''' - ) + # Write data in batches + client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', + tag_columns=["room"]) - ``` + # Execute a query and retrieve data formatted as a PyArrow Table + + table = client.query( + '''SELECT * + FROM home + WHERE time >= now() - INTERVAL '90 days' + ORDER BY time''') + ``` --- The InfluxDB v3 [`influxdb3-python` Python client library](https://github.com/InfluxCommunity/influxdb3-python) @@ -44,20 +61,32 @@ Client libraries can be used to construct line protocol data, transform data fro to line protocol, and batch write line protocol data to InfluxDB HTTP APIs. InfluxDB v3 client libraries can query {{% product-name %}} using SQL or InfluxQL. -The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client +The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client in a convenient InfluxDB v3 interface for executing SQL and InfluxQL queries, requesting server metadata, and retrieving data from {{% product-name %}} using the Flight protocol with gRPC. - +{{% note %}} +Code samples in this page use the [Get started home sensor sample data](/influxdb/cloud-serverless/reference/sample-data/#get-started-home-sensor-data). +{{% /note %}} - [Installation](#installation) - [Importing the module](#importing-the-module) - [API reference](#api-reference) - [Classes](#classes) - [Class InfluxDBClient3](#class-influxdbclient3) + - [Parameters](#parameters) + - [Writing modes](#writing-modes) + - [InfluxDBClient3 instance methods](#influxdbclient3-instance-methods) + - [InfluxDBClient3.write](#influxdbclient3write) + - [InfluxDBClient3.write_file](#influxdbclient3write_file) + - [InfluxDBClient3.query](#influxdbclient3query) + - [InfluxDBClient3.close](#influxdbclient3close) - [Class Point](#class-point) - [Class WriteOptions](#class-writeoptions) + - [Parameters](#parameters-4) - [Functions](#functions) + - [Function write_client_options(\*\*kwargs)](#function-write_client_optionskwargs) + - [Function flight_client_options(\*\*kwargs)](#function-flight_client_optionskwargs) - [Constants](#constants) - [Exceptions](#exceptions) @@ -87,17 +116,15 @@ from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions ``` - [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): a class for interacting with InfluxDB -- [`influxdb_client_3.Point`](#class-point):a class for constructing a time series data -point +- [`influxdb_client_3.Point`](#class-point): a class for constructing a time series data + point - `influxdb_client_3.WriteOptions`: a class for configuring client -write options. + write options. ## API reference The `influxdb_client_3` module includes the following classes and functions. - - - [Classes](#classes) - [Functions](#functions) - [Constants](#constants) @@ -106,169 +133,193 @@ The `influxdb_client_3` module includes the following classes and functions. ## Classes - [Class InfluxDBClient3](#class-influxdbclient3) + - [Parameters](#parameters) + - [Writing modes](#writing-modes) + - [InfluxDBClient3 instance methods](#influxdbclient3-instance-methods) - [InfluxDBClient3.write](#influxdbclient3write) - [InfluxDBClient3.write_file](#influxdbclient3write_file) - [InfluxDBClient3.query](#influxdbclient3query) - [InfluxDBClient3.close](#influxdbclient3close) - [Class Point](#class-point) - [Class WriteOptions](#class-writeoptions) + - [Parameters](#parameters-4) ## Class InfluxDBClient3 Provides an interface for interacting with InfluxDB APIs for writing and querying data. -### Syntax - -```python -__init__(self, host=None, org=None, database=None, token=None, - write_client_options=None, flight_client_options=None, **kwargs) -``` - -Initializes and returns an `InfluxDBClient3` instance with the following: +The `InfluxDBClient3` constructor initializes and returns a client instance with the following: -- A singleton _write client_ configured for writing to the database (bucket). -- A singleton _Flight client_ configured for querying the database (bucket). +- A singleton _write client_ configured for writing to the database. +- A singleton _Flight client_ configured for querying the database. ### Parameters -- **org** (str): The organization name (for {{% product-name %}}, set this to an empty string (`""`)). -- **database** (str): The database (bucket) to use for writing and querying. -- **write_client_options** (dict): Options to use when writing to InfluxDB. +- **`host`** (string): The host URL of the InfluxDB instance. +- **`database`** (string): The bucket to use for writing and querying. +- **`token`** (string): An API token with read/write permissions. +- _Optional_ **`write_client_options`** (dict): Options to use when writing to InfluxDB. If `None`, writes are [synchronous](#synchronous-writing). -- **flight_client_options** (dict): Options to use when querying InfluxDB. +- _Optional_ **`flight_client_options`** (dict): Options to use when querying InfluxDB. -#### Non-batch writing +### Writing modes -When writing data in non-batching mode, the client immediately tries to write the data, doesn't retry failed requests, and doesn't invoke response callbacks. +When writing data, the client uses one of the following modes: -#### Batch writing +- [Synchronous writing](#synchronous-writing) +- [Batch writing](#batch-writing) +- Asynchronous writing: Deprecated -In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. -The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`. -If a write fails, the client reschedules the write according to the `write_client_options` retry options. -When using batching mode, you can define `success_callback`, `error_callback`, and `retry_callback` functions. +#### Synchronous writing -To use batching mode, pass `WriteOptions` as key-value pairs to the client `write_client_options` parameter--for example: +Default. When no `write_client_options` are provided during the initialization of `InfluxDBClient3`, writes are synchronous. +When writing data in synchronous mode, the client immediately tries to write the provided data to InfluxDB, doesn't retry failed requests, and doesn't invoke response callbacks. -1. Instantiate `WriteOptions()` with defaults or with -`WriteOptions.write_type=WriteType.batching`. +##### Example: initialize a client with synchronous (non-batch) defaults - ```python - # Create a WriteOptions instance for batch writes with batch size, flush, and retry defaults. - write_options = WriteOptions() - ``` +The following example initializes a client for writing and querying data in an {{% product-name %}} database. +Given that `write_client_options` isn't specified, the client uses the default [synchronous writing](#synchronous-writing) mode. -2. Pass `write_options` from the preceding step to the `write_client_options` function. - - ```python - wco = write_client_options(WriteOptions=write_options) - ``` +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} - The output is a dict with `WriteOptions` key-value pairs. + -3. Initialize the client, setting the `write_client_options` argument to `wco` from the preceding step. +--> - {{< tabs-wrapper >}} -{{% code-placeholders "(BUCKET|API)_(NAME|TOKEN)" %}} ```python -from influxdb_client_3 import Point, InfluxDBClient3 - -points = [ - Point("home") - .tag("room", "Kitchen") - .field("temp", 25.3) - .field('hum', 20.2) - .field('co', 9), - Point("home") - .tag("room", "Living Room") - .field("temp", 24.0) - .field('hum', 20.0) - .field('co', 5)] - -with InfluxDBClient3(token="API_TOKEN", - host="{{< influxdb/host >}}", - org="", database="BUCKET_NAME", - write_client_options=wco) as client: +from influxdb_client_3 import InfluxDBClient3 - client.write(record=points) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") ``` + {{% /code-placeholders %}} - {{< /tabs-wrapper >}} -#### Synchronous writing +Replace the following: -Synchronous mode is the default mode for writing data (in batch and non-batch modes). -To specify synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`. +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with read/write permissions on the specified database -### Examples +To explicitly specify synchronous mode, create a client with `write_options=SYNCHRONOUS`--for example: -#### Initialize a client +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} -The following example initializes a client for writing and querying data in a {{% product-name %}} database (bucket). -When writing or querying, the client waits synchronously for the response. + + -{{% code-placeholders "BUCKET_(NAME|TOKEN)|API_TOKEN" %}} ```python -from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS + +wco = write_client_options(write_options=SYNCHRONOUS) -client = InfluxDBClient3(token="API_TOKEN", - host="{{< influxdb/host >}}", - database="BUCKET_NAME") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN", + write_client_options=wco, + flight_client_options=None) ``` + {{% /code-placeholders %}} Replace the following: -- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [bucket](/influxdb/cloud-serverless/admin/buckets/) -- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) with read permissions on the specified bucket +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with write permissions on the specified database -##### Initialize a client for batch writing +#### Batch writing -The following example shows how to initialize a client for batch writing data to the bucket. -When writing data, the client uses batch mode with default options and -invokes the callback function, if specified, for the response status (success, error, or retryable error). +Batch writing is particularly useful for efficient bulk data operations. +Options include setting batch size, flush intervals, retry intervals, and more. -{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} -```python - from influxdb_client_3 import InfluxDBClient3, - write_client_options, - WriteOptions, - InfluxDBError +Batch writing groups multiple writes into a single request to InfluxDB. +In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. +The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`. +If a write fails, the client reschedules the write according to the `write_client_options` retry options. - # Define callbacks for write responses - def success(self, data: str): - print(f"Successfully wrote batch: data: {data}") +##### Configuring write client options - def error(self, data: str, exception: InfluxDBError): - print(f"Failed writing batch: config: {self}, data: {data}, - error: {exception}") +Use `WriteOptions` and `write_client_options` to configure batch writing and response handling for the client: - def retry(self, data: str, exception: InfluxDBError): - print(f"Failed retry writing batch: config: {self}, data: {data}, - error: {exception}") +1. Instantiate `WriteOptions`. To use batch defaults, call the constructor without specifying parameters. +2. Call `write_client_options` and use the `write_options` parameter to specify the `WriteOptions` instance from the preceding step. + Specify callback parameters (success, error, and retry) to invoke functions on success or error. +3. Instantiate `InfluxDBClient3` and use the `write_client_options` parameter to specify the `dict` output from the preceding step. - # Instantiate WriteOptions for batching - write_options = WriteOptions() - wco = write_client_options(success_callback=success, - error_callback=error, - retry_callback=retry, - WriteOptions=write_options) - - # Use the with...as statement to ensure the file is properly closed and resources - # are released. - with InfluxDBClient3(token="API_TOKEN", host="{{< influxdb/host >}}", - org="", database="BUCKET_NAME", - write_client_options=wco) as client: +##### Example: initialize a client using batch defaults and callbacks + +The following example shows how to use batch mode with defaults and +specify callback functions for the response status (success, error, or retryable error). - client.write_file(file='./home.csv', - timestamp_column='time', tag_columns=["room"]) +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + + +```python +from influxdb_client_3 import(InfluxDBClient3, + write_client_options, + WriteOptions, + InfluxDBError) + +status = None + +# Define callbacks for write responses +def success(self, data: str): + status = "Success writing batch: data: {data}" + assert status.startsWith('Success'), f"Expected {status} to be success" + +def error(self, data: str, err: InfluxDBError): + status = f"Error writing batch: config: {self}, data: {data}, error: {err}" + assert status.startsWith('Success'), f"Expected {status} to be success" + + +def retry(self, data: str, err: InfluxDBError): + status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}" + assert status.startsWith('Success'), f"Expected {status} to be success" + +# Instantiate WriteOptions for batching +write_options = WriteOptions() +wco = write_client_options(success_callback=success, + error_callback=error, + retry_callback=retry, + write_options=write_options) + +# Use the with...as statement to ensure the file is properly closed and resources +# are released. +with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN", + write_client_options=wco) as client: + + client.write_file(file='./data/home-sensor-data.csv', + timestamp_column='time', tag_columns=["room"], write_precision='s') +``` + {{% /code-placeholders %}} Replace the following: -- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [bucket](/influxdb/cloud-serverless/admin/buckets/) -- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) with read permissions on the specified bucket +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with write permissions on the specified database ### InfluxDBClient3 instance methods @@ -276,62 +327,123 @@ Replace the following: Writes a record or a list of records to InfluxDB. -#### Syntax - -```python -write(self, record=None, **kwargs) -``` - #### Parameters -- **`record`**: A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`. -- **`database`**: The database (bucket) to write to. Default is to write to the database specified for the client. -- **`**kwargs`**: Additional write options--for example: - - **`write_precision`**: _Optional_. Default is `"ns"`. - Specifies the [precision](/influxdb/cloud-serverless/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. - - **`write_client_options`**: _Optional_. - Specifies callback functions and options for [batch writing](#batch-writing) mode. - -#### Examples +- **`record`** (_record_ or list): A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`. +- **`bucket`** (string): _Optional_. The bucket to write to. Default is to write to the bucket specified for the client. +- **`**kwargs`\*\*: Additional write options--for example: + - **`write_precision` (string)**: _Optional_. Default is `"ns"`. + Specifies the [precision](/influxdb/cloud-serverless/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. + - **`write_client_options`** (dict): _Optional_. + Specifies callback functions and options for [batch writing](#batch-writing) mode. + To generate the `dict`, use the [`write_client_options` function](#function-write_client_optionskwargs). -##### Write a line protocol string +#### Example: write a line protocol string {{% influxdb/custom-timestamps %}} {{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + + ```python from influxdb_client_3 import InfluxDBClient3 -points = "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1641024000" +point = "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1641024000" -client = InfluxDBClient3(token="API_TOKEN", host="{{< influxdb/host >}}", - database="BUCKET_NAME") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") -client.write(record=points, write_precision="s") +client.write(record=point, write_precision="s") ``` + +The following sample code executes an SQL query to retrieve the point: + + + +```python +# Execute an SQL query +table = client.query(query='''SELECT room + FROM home + WHERE temp=21.1 + AND time=from_unixtime(1641024000)''') +# table is a pyarrow.Table +room = table[0][0] +assert f"{room}" == 'Living Room', f"Expected {room} to be Living Room" +``` + {{% /code-placeholders %}} {{% /influxdb/custom-timestamps %}} -##### Write data using points +Replace the following: + +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [bucket](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with write permissions on the specified bucket + +#### Example: write data using points The `influxdb_client_3.Point` class provides an interface for constructing a data point for a measurement and setting fields, tags, and the timestamp for the point. The following example shows how to create a `Point` object, and then write the data to InfluxDB. +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + + ```python from influxdb_client_3 import Point, InfluxDBClient3 -point = Point("home").tag("room", "Kitchen").field("temp", 72) -... + +point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") client.write(point) ``` -###### Write data using a dict + + +The following sample code executes an InfluxQL query to retrieve the written data: + +```python +# Execute an InfluxQL query +table = client.query(query='''SELECT DISTINCT(temp) as val + FROM home + WHERE temp > 21.0 + AND time >= now() - 10m''', language="influxql") +# table is a pyarrow.Table +df = table.to_pandas() +assert 21.5 in df['val'].values, f"Expected value in {df['val']}" +``` + +{{% /code-placeholders %}} + +Replace the following: + +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with write permissions on the specified database + +##### Example: write data using a dict `InfluxDBClient3` can serialize a dictionary object into line protocol. If you pass a `dict` to `InfluxDBClient3.write`, the client expects the `dict` to have the following _point_ attributes: -- **measurement** (str): the measurement name +- **measurement** (string): the measurement name - **tags** (dict): a dictionary of tag key-value pairs - **fields** (dict): a dictionary of field key-value pairs - **time**: the [timestamp](/influxdb/cloud-serverless/reference/glossary/#timestamp) for the record @@ -341,6 +453,14 @@ data to InfluxDB. {{% influxdb/custom-timestamps %}} {{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + + ```python from influxdb_client_3 import InfluxDBClient3 @@ -352,81 +472,106 @@ points = { "time": 1641067200 } -client = InfluxDBClient3(token="API_TOKEN", - host="{{< influxdb/host >}}", - database="BUCKET_NAME", - org="") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") client.write(record=points, write_precision="s") ``` + {{% /code-placeholders %}} {{% /influxdb/custom-timestamps %}} +Replace the following: + +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with write permissions on the specified database + ### InfluxDBClient3.write_file Writes data from a file to InfluxDB. Execution is synchronous. -#### Syntax - -```python -write_file(self, file, measurement_name=None, tag_columns=[], - timestamp_column='time', **kwargs) -``` #### Parameters -- **`file`** (str): A path to a file containing records to write to InfluxDB. - The filename must end with one of the following supported extensions. - For more information about encoding and formatting data, see the documentation for each supported format: - - - `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html) - - `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html) - - `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) - - `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) - - `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html) - -- **`measurement_name`**: Defines the measurement name for records in the file. +- **`file`** (string): A path to a file containing records to write to InfluxDB. + The filename must end with one of the following supported extensions. + For more information about encoding and formatting data, see the documentation for each supported format: + + - `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html) + - `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html) + - `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) + - `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) + - `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html) + +- **`measurement_name`** (string): Defines the measurement name for records in the file. The specified value takes precedence over `measurement` and `iox::measurement` columns in the file. If no value is specified for the parameter, and a `measurement` column exists in the file, the `measurement` column value is used for the measurement name. If no value is specified for the parameter, and no `measurement` column exists, the `iox::measurement` column value is used for the measurement name. -- **`tag_columns`**: A list containing the names of tag columns. +- **`tag_columns`** (list): Tag column names. Columns not included in the list and not specified by another parameter are assumed to be fields. -- **`timestamp_column`**: The name of the column that contains timestamps. Default is `'time'`. -- **`database`**: The database (bucket) to write to. Default is to write to the database (bucket) specified for the client. -- **`**kwargs`**: Additional write options--for example: - - **`write_precision`**: _Optional_. Default is `"ns"`. - Specifies the [precision](/influxdb/cloud-serverless/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. - - **`write_client_options`**: _Optional_. - Specifies callback functions and options for [batch writing](#batch-writing) mode. - -#### Examples - -##### Write data from a file - -The following example shows how to configure write options for batching, retries, and callbacks, +- **`timestamp_column`** (string): The name of the column that contains timestamps. Default is `'time'`. +- **`database`** (string): The bucket to write to. Default is to write to the bucket specified for the client. +- **`file_parser_options`** (callable): A function for providing additional arguments to the file parser. +- **`**kwargs`**: Additional options to pass to the `WriteAPI`--for example: + - **`write_precision`** (string): _Optional_. Default is `"ns"`. + Specifies the [precision](/influxdb/cloud-serverless/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. + - **`write_client_options`** (dict): _Optional_. + Specifies callback functions and options for [batch writing](#batch-writing) mode. + To generate the `dict`, use the [`write_client_options` function](#function-write_client_optionskwargs). + +#### Example: use batch options when writing file data + +The following example shows how to specify customized write options for batching, retries, and response callbacks, and how to write data from CSV and JSON files to InfluxDB: {{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} -```python -from influxdb_client_3 import InfluxDBClient3, write_client_options, - WritePrecision, WriteOptions, InfluxDBError - -class BatchingCallback(object): - - # Define callbacks for write responses - def success(self, data: str): - print(f"Successfully wrote batch: data: {data}") - def error(self, data: str, exception: InfluxDBError): - print(f"Failed writing batch: config: {self}, data: {data}, - error: {exception}") - - def retry(self, data: str, exception: InfluxDBError): - print(f"Failed retry writing batch: config: {self}, data: {data}, - error: {exception}") + + -# Instantiate the callbacks -callback = BatchingCallback() +```python +from influxdb_client_3 import(InfluxDBClient3, write_client_options, + WritePrecision, WriteOptions, InfluxDBError) + +# Define the result object +result = { + 'config': None, + 'status': None, + 'data': None, + 'error': None +} + +# Define callbacks for write responses +def success_callback(self, data: str): + result['config'] = self + result['status'] = 'success' + result['data'] = data + + assert result['data'] != None, f"Expected {result['data']}" + print("Successfully wrote data: {result['data']}") + +def error_callback(self, data: str, exception: InfluxDBError): + result['config'] = self + result['status'] = 'error' + result['data'] = data + result['error'] = exception + + assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}" + +def retry_callback(self, data: str, exception: InfluxDBError): + result['config'] = self + result['status'] = 'retry_error' + result['data'] = data + result['error'] = exception + + assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}" write_options = WriteOptions(batch_size=500, flush_interval=10_000, @@ -436,199 +581,372 @@ write_options = WriteOptions(batch_size=500, max_retry_delay=30_000, exponential_base=2) -wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options) -with InfluxDBClient3(token="API_TOKEN", host="{{< influxdb/host >}}", - database="BUCKET_NAME", +wco = write_client_options(success_callback=success_callback, + error_callback=error_callback, + retry_callback=retry_callback, + write_options=write_options) + +with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN", write_client_options=wco) as client: - client.write_file(file='./home.csv', timestamp_column='time', - tag_columns=["room"]) - - client.write_file(file='./home.json', timestamp_column='time', - tag_columns=["room"], date_unit='ns') + client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', + tag_columns=["room"], write_precision='s') + + client.write_file(file='./data/home-sensor-data.json', timestamp_column='time', + tag_columns=["room"], write_precision='s') ``` + {{% /code-placeholders %}} -### InfluxDBClient3.query +Replace the following: -Sends a Flight request to execute the specified SQL or InfluxQL query. -Returns all data in the query result as an Arrow table. +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with write permissions on the specified database -#### Syntax +### InfluxDBClient3.query -```python -query(self, query, language="sql", mode="all", **kwargs ) -``` +Sends a Flight request to execute the specified SQL or InfluxQL query. +Returns all data in the query result as an Arrow table ([`pyarrow.Table`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html) instance). #### Parameters -- **`query`** (str): the SQL or InfluxQL to execute. -- **`language`** (str): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`. -- **`mode`** (str): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). - Default is `"all"`. - - `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`. - - `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`. - Returns `null` if there are no more messages. - - `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`. - - `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader). - - `schema`: Return the schema for all record batches in the stream. +- **`query`** (string): the SQL or InfluxQL to execute. +- **`language`** (string): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`. +- **`mode`** (string): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). + Default is `"all"`. + - `all`: Read the entire contents of the stream and return it as a [`pyarrow.Table`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html). + - `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`. + Returns `null` if there are no more messages. + - `pandas`: Read the contents of the stream and return it as a [`pandas.DataFrame`](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html). + - `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader). + - `schema`: Return the schema for all record batches in the stream. - **`**kwargs`**: [`FlightCallOptions`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions) -#### Examples +#### Example: query using SQL -##### Query using SQL +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + +```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") + +table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'") + # Filter columns. print(table.select(['room', 'temp'])) + # Use PyArrow to aggregate data. print(table.group_by('hum').aggregate([])) ``` -##### Query using InfluxQL +{{% /code-placeholders %}} + +In the examples, replace the following: + +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with read permission on the specified database + +#### Example: query using InfluxQL + +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= -90d" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") +query = "SELECT * from home WHERE time >= -90d" table = client.query(query=query, language="influxql") + # Filter columns. print(table.select(['room', 'temp'])) ``` -##### Read all data from the stream and return a pandas DataFrame +{{% /code-placeholders %}} + +##### Example: read all data from the stream and return a pandas DataFrame +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + + +```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" pd = client.query(query=query, mode="pandas") # Print the pandas DataFrame formatted as a Markdown table. print(pd.to_markdown()) ``` -##### View the schema for all batches in the stream +{{% /code-placeholders %}} + +##### Example: view the schema for all batches in the stream + +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + ```python -table = client.query(''' - SELECT * - FROM measurement - WHERE time >= now() - INTERVAL '90 days'''' -) -# Get the schema attribute value. +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") +table = client.query("""SELECT * + from home + WHERE time >= now() - INTERVAL '90 days'""") + +# View the table schema. print(table.schema) ``` -##### Retrieve the result schema and no data +{{% /code-placeholders %}} + +##### Example: retrieve the result schema and no data + +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" schema = client.query(query=query, mode="schema") print(schema) ``` +{{% /code-placeholders %}} + ##### Specify a timeout Pass `timeout=` for [`FlightCallOptions`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions) to use a custom timeout. +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + + ```python -query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" client.query(query=query, timeout=5) ``` +{{% /code-placeholders %}} + ### InfluxDBClient3.close Sends all remaining records from the batch to InfluxDB, and then closes the underlying write client and Flight client to release resources. -#### Syntax +#### Example: close a client + +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + ```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN") client.close() ``` -## Class Point +{{% /code-placeholders %}} -```python -influxdb_client_3.Point -``` +## Class Point Provides an interface for constructing a time series data point for a measurement, and setting fields, tags, and timestamp. -The following example shows how to create a `Point`, and then write the -data to InfluxDB. + + ```python +from influxdb_client_3 import Point point = Point("home").tag("room", "Living Room").field("temp", 72) -client.write(point) ``` +See how to [write data using points](#example-write-data-using-points). + ## Class WriteOptions +Provides an interface for constructing options that customize batch writing behavior, such as batch size and retry. + + + -Options for configuring client behavior (batch size, retry, callbacks, etc.) when writing data to InfluxDB. +```python +from influxdb_client_3 import WriteOptions -For client configuration examples, see [Initialize a client](#initialize-a-client). +write_options = WriteOptions(batch_size=500, + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) +``` -### Syntax +See how to [use batch options for writing data](#example-use-batch-options-when-writing-file-data). -```python -__init__(self, write_type: WriteType = WriteType.batching, - batch_size=1_000, flush_interval=1_000, - jitter_interval=0, - retry_interval=5_000, - max_retries=5, - max_retry_delay=125_000, - max_retry_time=180_000, - exponential_base=2, - max_close_wait=300_000, - write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: +### Parameters -``` +- **`batch_size`**: Default is `1000`. +- **`flush_interval`**: Default is `1000`. +- **`jitter_interval`**: Default is `0`. +- **`retry_interval`**: Default is `5000`. +- **`max_retries`**: Default is `5`. +- **`max_retry_delay`**: Default is `125000`. +- **`max_retry_time`**: Default is `180000`. +- **`exponential_base`**: Default is `2`. +- **`max_close_wait`**: Default is `300000`. +- **`write_scheduler`**: Default is `ThreadPoolScheduler(max_workers=1)`. ## Functions - [influxdb_client_3.write_client_options](#function-write_client_optionskwargs) - [influxdb_client_3.flight_client_options](#function-flight_client_optionskwargs) -### Function write_client_options(**kwargs) +### Function write_client_options(\*\*kwargs) -```python -influxdb_client_3.write_client_options(kwargs) -``` +Returns a `dict` with the specified write client options. -- Takes the following parameters: +#### Parameters - - `kwargs`: keyword arguments for `WriteApi` +The function takes the following keyword arguments: -- Returns a dictionary of write client options. +- **`write_options`** ([`WriteOptions`](#class-writeoptions)): Specifies whether the client writes data using synchronous mode or batching mode. If using batching mode, the client uses the specified batching options. +- **`point_settings`** (dict): Default tags that the client will add to each point when writing the data to InfluxDB. +- **`success_callback`** (callable): If using batching mode, a function to call after data is written successfully to InfluxDB (HTTP status `204`) +- **`error_callback`** (callable): if using batching mode, a function to call if data is not written successfully (the response has a non-`204` HTTP status) +- **`retry_callback`** (callable): if using batching mode, a function to call if the request is a retry (using batching mode) and data is not written successfully -### Function flight_client_options(**kwargs) +#### Example: instantiate options for batch writing ```python -influxdb_client_3.flight_client_options(kwargs) +from influxdb_client_3 import write_client_options, WriteOptions +from influxdb_client_3.write_client.client.write_api import WriteType + +def success(): + print("Success") +def error(): + print("Error") +def retry(): + print("Retry error") + +write_options = WriteOptions() +wco = write_client_options(success_callback=success, + error_callback=error, + retry_callback=retry, + write_options=write_options) + +assert wco['success_callback'] +assert wco['error_callback'] +assert wco['retry_callback'] +assert wco['write_options'].write_type == WriteType.batching ``` -- Takes the following parameters: +#### Example: instantiate options for synchronous writing - - `kwargs`: keyword arguments for `FlightClient` +```python +from influxdb_client_3 import write_client_options, SYNCHRONOUS +from influxdb_client_3.write_client.client.write_api import WriteType + +wco = write_client_options(write_options=SYNCHRONOUS) -- Returns a dictionary of Flight client options. +assert wco['write_options'].write_type == WriteType.synchronous +``` -#### Examples +### Function flight_client_options(\*\*kwargs) -##### Specify the root certificate path +Returns a `dict` with the specified [FlightClient](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html) parameters. + +#### Parameters + +- `kwargs`: keyword arguments for [`pyarrow.flight.FlightClient`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html) parameters + +#### Example: specify the root certificate path + +{{% code-placeholders "BUCKET_NAME|API_TOKEN" %}} + + + ```python from influxdb_client_3 import InfluxDBClient3, flight_client_options @@ -638,14 +956,22 @@ fh = open(certifi.where(), "r") cert = fh.read() fh.close() -client = InfluxDBClient3( - token="API_TOKEN", - host="{{< influxdb/host >}}", - database="BUCKET_NAME", - flight_client_options=flight_client_options( - tls_root_certs=cert)) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"BUCKET_NAME", + token=f"API_TOKEN", + fco=flight_client_options(tls_root_certs=cert)) ``` +{{% /code-placeholders %}} + +Replace the following: + +- {{% code-placeholder-key %}}`BUCKET_NAME`{{% /code-placeholder-key %}}: + the name of your {{% product-name %}} [database](/influxdb/cloud-serverless/admin/buckets/) +- {{% code-placeholder-key %}}`API_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [API token](/influxdb/cloud-serverless/admin/tokens/) + with read permission on the specified database + ## Constants - `influxdb_client_3.SYNCHRONOUS`: Represents synchronous write mode diff --git a/content/influxdb/clustered/reference/client-libraries/v3/python.md b/content/influxdb/clustered/reference/client-libraries/v3/python.md index 6a08488c5e..47f571a8ba 100644 --- a/content/influxdb/clustered/reference/client-libraries/v3/python.md +++ b/content/influxdb/clustered/reference/client-libraries/v3/python.md @@ -7,33 +7,49 @@ menu: name: Python parent: v3 client libraries identifier: influxdb3-python -influxdb/clustered/tags: [Flight client, python, gRPC, SQL, Flight SQL, client libraries] +influxdb/clustered/tags: [Flight API, python, gRPC, SQL, client libraries] +metadata: [influxdb3-python v0.5.0] weight: 201 aliases: - /influxdb/clustered/reference/client-libraries/v3/pyinflux3/ related: - /influxdb/clustered/query-data/execute-queries/troubleshoot/ list_code_example: > - ```py - from influxdb_client_3 import InfluxDBClient3 + + + ```python + from influxdb_client_3 import(InfluxDBClient3, + WriteOptions, + write_client_options) - # Instantiate an InfluxDB client configured for a database + # Instantiate batch writing options for the client - client = InfluxDBClient3( - "https://{{< influxdb/host >}}", - database="DATABASE_NAME", - token="DATABASE_TOKEN") + write_options = WriteOptions() + wco = write_client_options(write_options=write_options) - # Execute the query and retrieve data formatted as a PyArrow Table + # Instantiate an InfluxDB v3 client - table = client.query( - '''SELECT * - FROM home - WHERE time >= now() - INTERVAL '90 days' - ORDER BY time''' - ) + with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco) as client: - ``` + # Write data in batches + client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', + tag_columns=["room"]) + + # Execute a query and retrieve data formatted as a PyArrow Table + + table = client.query( + '''SELECT * + FROM home + WHERE time >= now() - INTERVAL '90 days' + ORDER BY time''') + ``` --- The InfluxDB v3 [`influxdb3-python` Python client library](https://github.com/InfluxCommunity/influxdb3-python) @@ -44,20 +60,32 @@ Client libraries can be used to construct line protocol data, transform data fro to line protocol, and batch write line protocol data to InfluxDB HTTP APIs. InfluxDB v3 client libraries can query {{% product-name %}} using SQL or InfluxQL. -The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client +The `influxdb3-python` Python client library wraps the Apache Arrow `pyarrow.flight` client in a convenient InfluxDB v3 interface for executing SQL and InfluxQL queries, requesting server metadata, and retrieving data from {{% product-name %}} using the Flight protocol with gRPC. - +{{% note %}} +Code samples in this page use the [Get started home sensor sample data](/influxdb/clustered/reference/sample-data/#get-started-home-sensor-data). +{{% /note %}} - [Installation](#installation) - [Importing the module](#importing-the-module) - [API reference](#api-reference) - [Classes](#classes) - [Class InfluxDBClient3](#class-influxdbclient3) + - [Parameters](#parameters) + - [Writing modes](#writing-modes) + - [InfluxDBClient3 instance methods](#influxdbclient3-instance-methods) + - [InfluxDBClient3.write](#influxdbclient3write) + - [InfluxDBClient3.write_file](#influxdbclient3write_file) + - [InfluxDBClient3.query](#influxdbclient3query) + - [InfluxDBClient3.close](#influxdbclient3close) - [Class Point](#class-point) - [Class WriteOptions](#class-writeoptions) + - [Parameters](#parameters-4) - [Functions](#functions) + - [Function write_client_options(\*\*kwargs)](#function-write_client_optionskwargs) + - [Function flight_client_options(\*\*kwargs)](#function-flight_client_optionskwargs) - [Constants](#constants) - [Exceptions](#exceptions) @@ -87,17 +115,15 @@ from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions ``` - [`influxdb_client_3.InfluxDBClient3`](#class-influxdbclient3): a class for interacting with InfluxDB -- [`influxdb_client_3.Point`](#class-point):a class for constructing a time series data -point +- [`influxdb_client_3.Point`](#class-point): a class for constructing a time series data + point - `influxdb_client_3.WriteOptions`: a class for configuring client -write options. + write options. ## API reference The `influxdb_client_3` module includes the following classes and functions. - - - [Classes](#classes) - [Functions](#functions) - [Constants](#constants) @@ -106,171 +132,193 @@ The `influxdb_client_3` module includes the following classes and functions. ## Classes - [Class InfluxDBClient3](#class-influxdbclient3) + - [Parameters](#parameters) + - [Writing modes](#writing-modes) + - [InfluxDBClient3 instance methods](#influxdbclient3-instance-methods) - [InfluxDBClient3.write](#influxdbclient3write) - [InfluxDBClient3.write_file](#influxdbclient3write_file) - [InfluxDBClient3.query](#influxdbclient3query) - [InfluxDBClient3.close](#influxdbclient3close) - [Class Point](#class-point) - [Class WriteOptions](#class-writeoptions) + - [Parameters](#parameters-4) ## Class InfluxDBClient3 Provides an interface for interacting with InfluxDB APIs for writing and querying data. -### Syntax - -```python -__init__(self, host=None, org=None, database=None, token=None, - write_client_options=None, flight_client_options=None, **kwargs) -``` - -Initializes and returns an `InfluxDBClient3` instance with the following: +The `InfluxDBClient3` constructor initializes and returns a client instance with the following: - A singleton _write client_ configured for writing to the database. - A singleton _Flight client_ configured for querying the database. ### Parameters -- **org** (str): The organization name (for {{% product-name %}}, set this to an empty string (`""`)). -- **database** (str): The database to use for writing and querying. -- **write_client_options** (dict): Options to use when writing to InfluxDB. +- **`host`** (string): The host URL of the InfluxDB instance. +- **`database`** (string): The database to use for writing and querying. +- **`token`** (string): A database token with read/write permissions. +- _Optional_ **`write_client_options`** (dict): Options to use when writing to InfluxDB. If `None`, writes are [synchronous](#synchronous-writing). -- **flight_client_options** (dict): Options to use when querying InfluxDB. +- _Optional_ **`flight_client_options`** (dict): Options to use when querying InfluxDB. -#### Non-batch writing +### Writing modes -When writing data in non-batching mode, the client immediately tries to write the data, doesn't retry failed requests, and doesn't invoke response callbacks. +When writing data, the client uses one of the following modes: -#### Batch writing - -In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. -The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`. -If a write fails, the client reschedules the write according to the `write_client_options` retry options. -When using batching mode, you can define `success_callback`, `error_callback`, and `retry_callback` functions. +- [Synchronous writing](#synchronous-writing) +- [Batch writing](#batch-writing) +- Asynchronous writing: Deprecated -To use batching mode, pass `WriteOptions` as key-value pairs to the client `write_client_options` parameter--for example: +#### Synchronous writing -1. Instantiate `WriteOptions()` with defaults or with -`WriteOptions.write_type=WriteType.batching`. +Default. When no `write_client_options` are provided during the initialization of `InfluxDBClient3`, writes are synchronous. +When writing data in synchronous mode, the client immediately tries to write the provided data to InfluxDB, doesn't retry failed requests, and doesn't invoke response callbacks. - ```python - # Create a WriteOptions instance for batch writes with batch size, flush, and retry defaults. - write_options = WriteOptions() - ``` +##### Example: initialize a client with synchronous (non-batch) defaults -2. Pass `write_options` from the preceding step to the `write_client_options` function. +The following example initializes a client for writing and querying data in an {{% product-name %}} database. +Given that `write_client_options` isn't specified, the client uses the default [synchronous writing](#synchronous-writing) mode. - ```py - wco = write_client_options(WriteOptions=write_options) - ``` +{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} - The output is a dict with `WriteOptions` key-value pairs. + -3. Initialize the client, setting the `write_client_options` argument to `wco` from the preceding step. +--> - {{< tabs-wrapper >}} -{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} -```py -from influxdb_client_3 import Point, InfluxDBClient3 - -points = [ - Point("home") - .tag("room", "Kitchen") - .field("temp", 25.3) - .field('hum', 20.2) - .field('co', 9), - Point("home") - .tag("room", "Living Room") - .field("temp", 24.0) - .field('hum', 20.0) - .field('co', 5)] - -with InfluxDBClient3(token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME", - write_client_options=wco) as client: +```python +from influxdb_client_3 import InfluxDBClient3 - client.write(record=points) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") ``` -{{% /code-placeholders %}} - {{< /tabs-wrapper >}} -#### Synchronous writing +{{% /code-placeholders %}} -Synchronous mode is the default mode for writing data (in batch and non-batch modes). -To specify synchronous mode, set `_write_client_options=None` or `_write_client_options.write_type=WriteType.synchronous`. +Replace the following: -### Examples +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with read/write permissions on the specified database -##### Initialize a client +To explicitly specify synchronous mode, create a client with `write_options=SYNCHRONOUS`--for example: -The following example initializes a client for writing and querying data in a {{% product-name %}} database. -When writing or querying, the client waits synchronously for the response. +{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} -Given `client.write_client_options` doesn't set `WriteOptions`, the client uses the default [non-batch writing](#non-batch-writing) mode. + + -{{% code-placeholders "DATABASE_(NAME|TOKEN)" %}} ```python -from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS -client = InfluxDBClient3(token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME") +wco = write_client_options(write_options=SYNCHRONOUS) + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco, + flight_client_options=None) ``` + {{% /code-placeholders %}} Replace the following: - {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) -- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: a [database token](/influxdb/clustered/admin/tokens/#database-tokens) with read permissions on the specified database +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with write permissions on the specified database + +#### Batch writing -##### Initialize a client for batch writing +Batch writing is particularly useful for efficient bulk data operations. +Options include setting batch size, flush intervals, retry intervals, and more. -The following example shows how to initialize a client for batch writing data to the database. -When writing data, the client uses batch mode with default options and -invokes the callback function, if specified, for the response status (success, error, or retryable error). +Batch writing groups multiple writes into a single request to InfluxDB. +In batching mode, the client adds the record or records to a batch, and then schedules the batch for writing to InfluxDB. +The client writes the batch to InfluxDB after reaching `write_client_options.batch_size` or `write_client_options.flush_interval`. +If a write fails, the client reschedules the write according to the `write_client_options` retry options. -{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} -```python - from influxdb_client_3 import InfluxDBClient3, - write_client_options, - WriteOptions, - InfluxDBError +##### Configuring write client options - # Define callbacks for write responses - def success(self, data: str): - print(f"Successfully wrote batch: data: {data}") +Use `WriteOptions` and `write_client_options` to configure batch writing and response handling for the client: - def error(self, data: str, exception: InfluxDBError): - print(f"Failed writing batch: config: {self}, data: {data}, - error: {exception}") +1. Instantiate `WriteOptions`. To use batch defaults, call the constructor without specifying parameters. +2. Call `write_client_options` and use the `write_options` parameter to specify the `WriteOptions` instance from the preceding step. + Specify callback parameters (success, error, and retry) to invoke functions on success or error. +3. Instantiate `InfluxDBClient3` and use the `write_client_options` parameter to specify the `dict` output from the preceding step. - def retry(self, data: str, exception: InfluxDBError): - print(f"Failed retry writing batch: config: {self}, data: {data}, - error: {exception}") +##### Example: initialize a client using batch defaults and callbacks - # Instantiate WriteOptions for batching - write_options = WriteOptions() - wco = write_client_options(success_callback=success, - error_callback=error, - retry_callback=retry, - WriteOptions=write_options) - - # Use the with...as statement to ensure the file is properly closed and resources - # are released. - with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}", - org="", database="DATABASE_NAME", - write_client_options=wco) as client: +The following example shows how to use batch mode with defaults and +specify callback functions for the response status (success, error, or retryable error). + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} - client.write_file(file='./home.csv', - timestamp_column='time', tag_columns=["room"]) + + + +```python +from influxdb_client_3 import(InfluxDBClient3, + write_client_options, + WriteOptions, + InfluxDBError) + +status = None + +# Define callbacks for write responses +def success(self, data: str): + status = "Success writing batch: data: {data}" + assert status.startsWith('Success'), f"Expected {status} to be success" + +def error(self, data: str, err: InfluxDBError): + status = f"Error writing batch: config: {self}, data: {data}, error: {err}" + assert status.startsWith('Success'), f"Expected {status} to be success" + + +def retry(self, data: str, err: InfluxDBError): + status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}" + assert status.startsWith('Success'), f"Expected {status} to be success" + +# Instantiate WriteOptions for batching +write_options = WriteOptions() +wco = write_client_options(success_callback=success, + error_callback=error, + retry_callback=retry, + write_options=write_options) + +# Use the with...as statement to ensure the file is properly closed and resources +# are released. +with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco) as client: + + client.write_file(file='./data/home-sensor-data.csv', + timestamp_column='time', tag_columns=["room"], write_precision='s') +``` + {{% /code-placeholders %}} Replace the following: - {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) -- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: a [database token](/influxdb/clustered/admin/tokens/#database-tokens) with read permissions on the specified database +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with write permissions on the specified database ### InfluxDBClient3 instance methods @@ -278,62 +326,123 @@ Replace the following: Writes a record or a list of records to InfluxDB. -#### Syntax - -```python -write(self, record=None, **kwargs) -``` - #### Parameters -- **`record`**: A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`. -- **`database`**: The database to write to. Default is to write to the database specified for the client. -- **`**kwargs`**: Additional write options--for example: - - **`write_precision`**: _Optional_. Default is `"ns"`. - Specifies the [precision](/influxdb/clustered/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. - - **`write_client_options`**: _Optional_. - Specifies callback functions and options for [batch writing](#batch-writing) mode. - -#### Examples +- **`record`** (_record_ or list): A record or list of records to write. A record can be a `Point` object, a dict that represents a point, a line protocol string, or a `DataFrame`. +- **`database`** (string): The database to write to. Default is to write to the database specified for the client. +- **`**kwargs`\*\*: Additional write options--for example: + - **`write_precision`** (string): _Optional_. Default is `"ns"`. + Specifies the [precision](/influxdb/clustered/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. + - **`write_client_options`** (dict): _Optional_. + Specifies callback functions and options for [batch writing](#batch-writing) mode. + To generate the `dict`, use the [`write_client_options` function](#function-write_client_optionskwargs). -##### Write a line protocol string +#### Example: write a line protocol string {{% influxdb/custom-timestamps %}} {{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + ```python from influxdb_client_3 import InfluxDBClient3 -points = "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1641024000" +point = "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1641024000" -client = InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}", - database="DATABASE_NAME") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") -client.write(record=points, write_precision="s") +client.write(record=point, write_precision="s") +``` + +The following sample code executes an SQL query to retrieve the point: + + + +```python +# Execute an SQL query +table = client.query(query='''SELECT room + FROM home + WHERE temp=21.1 + AND time=from_unixtime(1641024000)''') +# table is a pyarrow.Table +room = table[0][0] +assert f"{room}" == 'Living Room', f"Expected {room} to be Living Room" ``` + {{% /code-placeholders %}} {{% /influxdb/custom-timestamps %}} -##### Write data using points +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with write permissions on the specified database + +#### Example: write data using points The `influxdb_client_3.Point` class provides an interface for constructing a data point for a measurement and setting fields, tags, and the timestamp for the point. The following example shows how to create a `Point` object, and then write the data to InfluxDB. +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + ```python from influxdb_client_3 import Point, InfluxDBClient3 -point = Point("home").tag("room", "Kitchen").field("temp", 72) -... + +point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") client.write(point) ``` -###### Write data using a dict + + +The following sample code executes an InfluxQL query to retrieve the written data: + +```python +# Execute an InfluxQL query +table = client.query(query='''SELECT DISTINCT(temp) as val + FROM home + WHERE temp > 21.0 + AND time >= now() - 10m''', language="influxql") +# table is a pyarrow.Table +df = table.to_pandas() +assert 21.5 in df['val'].values, f"Expected value in {df['val']}" +``` + +{{% /code-placeholders %}} + +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with write permissions on the specified database + +##### Example: write data using a dict `InfluxDBClient3` can serialize a dictionary object into line protocol. If you pass a `dict` to `InfluxDBClient3.write`, the client expects the `dict` to have the following _point_ attributes: -- **measurement** (str): the measurement name +- **measurement** (string): the measurement name - **tags** (dict): a dictionary of tag key-value pairs - **fields** (dict): a dictionary of field key-value pairs - **time**: the [timestamp](/influxdb/clustered/reference/glossary/#timestamp) for the record @@ -343,7 +452,15 @@ data to InfluxDB. {{% influxdb/custom-timestamps %}} {{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} -```py + + + + +```python from influxdb_client_3 import InfluxDBClient3 # Using point dictionary structure @@ -354,80 +471,106 @@ points = { "time": 1641067200 } -client = InfluxDBClient3(token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME") +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") client.write(record=points, write_precision="s") ``` + {{% /code-placeholders %}} {{% /influxdb/custom-timestamps %}} +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with write permissions on the specified database + ### InfluxDBClient3.write_file Writes data from a file to InfluxDB. Execution is synchronous. -#### Syntax - -```python -write_file(self, file, measurement_name=None, tag_columns=[], - timestamp_column='time', **kwargs) -``` #### Parameters -- **`file`** (str): A path to a file containing records to write to InfluxDB. - The filename must end with one of the following supported extensions. - For more information about encoding and formatting data, see the documentation for each supported format: - - - `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html) - - `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html) - - `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) - - `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) - - `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html) - -- **`measurement_name`**: Defines the measurement name for records in the file. +- **`file`** (string): A path to a file containing records to write to InfluxDB. + The filename must end with one of the following supported extensions. + For more information about encoding and formatting data, see the documentation for each supported format: + + - `.feather`: [Feather](https://arrow.apache.org/docs/python/feather.html) + - `.parquet`: [Parquet](https://arrow.apache.org/docs/python/parquet.html) + - `.csv`: [Comma-separated values](https://arrow.apache.org/docs/python/csv.html) + - `.json`: [JSON](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) + - `.orc`: [ORC](https://arrow.apache.org/docs/python/orc.html) + +- **`measurement_name`** (string): Defines the measurement name for records in the file. The specified value takes precedence over `measurement` and `iox::measurement` columns in the file. If no value is specified for the parameter, and a `measurement` column exists in the file, the `measurement` column value is used for the measurement name. If no value is specified for the parameter, and no `measurement` column exists, the `iox::measurement` column value is used for the measurement name. -- **`tag_columns`**: A list containing the names of tag columns. +- **`tag_columns`** (list): Tag column names. Columns not included in the list and not specified by another parameter are assumed to be fields. -- **`timestamp_column`**: The name of the column that contains timestamps. Default is `'time'`. -- **`database`**: The database to write to. Default is to write to the database specified for the client. -- **`**kwargs`**: Additional write options--for example: - - **`write_precision`**: _Optional_. Default is `"ns"`. - Specifies the [precision](/influxdb/clustered/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. - - **`write_client_options`**: _Optional_. - Specifies callback functions and options for [batch writing](#batch-writing) mode. - -#### Examples - -##### Write data from a file - -The following example shows how to configure write options for batching, retries, and callbacks, +- **`timestamp_column`** (string): The name of the column that contains timestamps. Default is `'time'`. +- **`database`** (`str`): The database to write to. Default is to write to the database specified for the client. +- **`file_parser_options`** (callable): A function for providing additional arguments to the file parser. +- **`**kwargs`**: Additional options to pass to the `WriteAPI`--for example: + - **`write_precision`** (string): _Optional_. Default is `"ns"`. + Specifies the [precision](/influxdb/clustered/reference/glossary/#precision) (`"ms"`, `"s"`, `"us"`, `"ns"`) for timestamps in `record`. + - **`write_client_options`** (dict): _Optional_. + Specifies callback functions and options for [batch writing](#batch-writing) mode. + To generate the `dict`, use the [`write_client_options` function](#function-write_client_optionskwargs). + +#### Example: use batch options when writing file data + +The following example shows how to specify customized write options for batching, retries, and response callbacks, and how to write data from CSV and JSON files to InfluxDB: {{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} -```python -from influxdb_client_3 import InfluxDBClient3, write_client_options, - WritePrecision, WriteOptions, InfluxDBError - -class BatchingCallback(object): - # Define callbacks for write responses - def success(self, data: str): - print(f"Successfully wrote batch: data: {data}") - - def error(self, data: str, exception: InfluxDBError): - print(f"Failed writing batch: config: {self}, data: {data}, - error: {exception}") - - def retry(self, data: str, exception: InfluxDBError): - print(f"Failed retry writing batch: config: {self}, data: {data}, - error: {exception}") + + -# Instantiate the callbacks -callback = BatchingCallback() +```python +from influxdb_client_3 import(InfluxDBClient3, write_client_options, + WritePrecision, WriteOptions, InfluxDBError) + +# Define the result object +result = { + 'config': None, + 'status': None, + 'data': None, + 'error': None +} + +# Define callbacks for write responses +def success_callback(self, data: str): + result['config'] = self + result['status'] = 'success' + result['data'] = data + + assert result['data'] != None, f"Expected {result['data']}" + print("Successfully wrote data: {result['data']}") + +def error_callback(self, data: str, exception: InfluxDBError): + result['config'] = self + result['status'] = 'error' + result['data'] = data + result['error'] = exception + + assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}" + +def retry_callback(self, data: str, exception: InfluxDBError): + result['config'] = self + result['status'] = 'retry_error' + result['data'] = data + result['error'] = exception + + assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}" write_options = WriteOptions(batch_size=500, flush_interval=10_000, @@ -437,199 +580,372 @@ write_options = WriteOptions(batch_size=500, max_retry_delay=30_000, exponential_base=2) -wco = write_client_options(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry, - WriteOptions=write_options) -with InfluxDBClient3(token="DATABASE_TOKEN", host="{{< influxdb/host >}}", - database="DATABASE_NAME", - _write_client_options=wco) as client: +wco = write_client_options(success_callback=success_callback, + error_callback=error_callback, + retry_callback=retry_callback, + write_options=write_options) + +with InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + write_client_options=wco) as client: + + client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', + tag_columns=["room"], write_precision='s') - client.write_file(file='./home.csv', timestamp_column='time', - tag_columns=["room"]) - - client.write_file(file='./home.json', timestamp_column='time', - tag_columns=["room"], date_unit='ns') + client.write_file(file='./data/home-sensor-data.json', timestamp_column='time', + tag_columns=["room"], write_precision='s') ``` + {{% /code-placeholders %}} -### InfluxDBClient3.query +Replace the following: -Sends a Flight request to execute the specified SQL or InfluxQL query. -Returns all data in the query result as an Arrow table. +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with write permissions on the specified database -#### Syntax +### InfluxDBClient3.query -```python -query(self, query, language="sql", mode="all", **kwargs ) -``` +Sends a Flight request to execute the specified SQL or InfluxQL query. +Returns all data in the query result as an Arrow table ([`pyarrow.Table`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html) instance). #### Parameters -- **`query`** (str): the SQL or InfluxQL to execute. -- **`language`** (str): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`. -- **`mode`** (str): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). - Default is `"all"`. - - `all`: Read the entire contents of the stream and return it as a `pyarrow.Table`. - - `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`. - Returns `null` if there are no more messages. - - `pandas`: Read the contents of the stream and return it as a `pandas.DataFrame`. - - `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader). - - `schema`: Return the schema for all record batches in the stream. +- **`query`** (string): the SQL or InfluxQL to execute. +- **`language`** (string): the query language used in the `query` parameter--`"sql"` or `"influxql"`. Default is `"sql"`. +- **`mode`** (string): Specifies the output to return from the [`pyarrow.flight.FlightStreamReader`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightStreamReader.html#pyarrow.flight.FlightStreamReader). + Default is `"all"`. + - `all`: Read the entire contents of the stream and return it as a [`pyarrow.Table`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html). + - `chunk`: Read the next message (a `FlightStreamChunk`) and return `data` and `app_metadata`. + Returns `null` if there are no more messages. + - `pandas`: Read the contents of the stream and return it as a [`pandas.DataFrame`](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html). + - `reader`: Convert the `FlightStreamReader` into a [`pyarrow.RecordBatchReader`](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html#pyarrow-recordbatchreader). + - `schema`: Return the schema for all record batches in the stream. - **`**kwargs`**: [`FlightCallOptions`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions) -#### Examples +#### Example: query using SQL + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} -##### Query using SQL + + ```python -table = client.query("SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'") +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") + +table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'") + # Filter columns. print(table.select(['room', 'temp'])) + # Use PyArrow to aggregate data. print(table.group_by('hum').aggregate([])) ``` -##### Query using InfluxQL +{{% /code-placeholders %}} + +In the examples, replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with read permission on the specified database + +#### Example: query using InfluxQL + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= -90d" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= -90d" table = client.query(query=query, language="influxql") + # Filter columns. print(table.select(['room', 'temp'])) ``` -##### Read all data from the stream and return a pandas DataFrame +{{% /code-placeholders %}} + +##### Example: read all data from the stream and return a pandas DataFrame + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" pd = client.query(query=query, mode="pandas") # Print the pandas DataFrame formatted as a Markdown table. print(pd.to_markdown()) ``` -##### View the schema for all batches in the stream +{{% /code-placeholders %}} + +##### Example: view the schema for all batches in the stream +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + +```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +table = client.query("""SELECT * + from home + WHERE time >= now() - INTERVAL '90 days'""") + +# View the table schema. print(table.schema) ``` -##### Retrieve the result schema and no data +{{% /code-placeholders %}} + +##### Example: retrieve the result schema and no data + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python -query = "SELECT * FROM measurement WHERE time >= now() - INTERVAL '90 days'" +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" schema = client.query(query=query, mode="schema") print(schema) ``` +{{% /code-placeholders %}} + ##### Specify a timeout Pass `timeout=` for [`FlightCallOptions`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightCallOptions.html#pyarrow.flight.FlightCallOptions) to use a custom timeout. +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + + +```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") +query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" client.query(query=query, timeout=5) ``` +{{% /code-placeholders %}} + ### InfluxDBClient3.close Sends all remaining records from the batch to InfluxDB, and then closes the underlying write client and Flight client to release resources. -#### Syntax +#### Example: close a client + +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + ```python +from influxdb_client_3 import InfluxDBClient3 + +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN") client.close() ``` -## Class Point +{{% /code-placeholders %}} -```python -influxdb_client_3.Point -``` +## Class Point Provides an interface for constructing a time series data point for a measurement, and setting fields, tags, and timestamp. -The following example shows how to create a `Point`, and then write the -data to InfluxDB. + + -```py +```python +from influxdb_client_3 import Point point = Point("home").tag("room", "Living Room").field("temp", 72) -client.write(point) ``` +See how to [write data using points](#example-write-data-using-points). + ## Class WriteOptions +Provides an interface for constructing options that customize batch writing behavior, such as batch size and retry. + + + -Options for configuring client behavior (batch size, retry, callbacks, etc.) when writing data to InfluxDB. +```python +from influxdb_client_3 import WriteOptions -For client configuration examples, see [Initialize a client](#initialize-a-client). +write_options = WriteOptions(batch_size=500, + flush_interval=10_000, + jitter_interval=2_000, + retry_interval=5_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) +``` -### Syntax +See how to [use batch options for writing data](#example-use-batch-options-when-writing-file-data). -```python -__init__(self, write_type: WriteType = WriteType.batching, - batch_size=1_000, flush_interval=1_000, - jitter_interval=0, - retry_interval=5_000, - max_retries=5, - max_retry_delay=125_000, - max_retry_time=180_000, - exponential_base=2, - max_close_wait=300_000, - write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: +### Parameters -``` +- **`batch_size`**: Default is `1000`. +- **`flush_interval`**: Default is `1000`. +- **`jitter_interval`**: Default is `0`. +- **`retry_interval`**: Default is `5000`. +- **`max_retries`**: Default is `5`. +- **`max_retry_delay`**: Default is `125000`. +- **`max_retry_time`**: Default is `180000`. +- **`exponential_base`**: Default is `2`. +- **`max_close_wait`**: Default is `300000`. +- **`write_scheduler`**: Default is `ThreadPoolScheduler(max_workers=1)`. ## Functions - [influxdb_client_3.write_client_options](#function-write_client_optionskwargs) - [influxdb_client_3.flight_client_options](#function-flight_client_optionskwargs) -### Function write_client_options(**kwargs) +### Function write_client_options(\*\*kwargs) -```python -influxdb_client_3.write_client_options(kwargs) -``` +Returns a `dict` with the specified write client options. + +#### Parameters + +The function takes the following keyword arguments: -- Takes the following parameters: +- **`write_options`** ([`WriteOptions`](#class-writeoptions)): Specifies whether the client writes data using synchronous mode or batching mode. If using batching mode, the client uses the specified batching options. +- **`point_settings`** (dict): Default tags that the client will add to each point when writing the data to InfluxDB. +- **`success_callback`** (callable): If using batching mode, a function to call after data is written successfully to InfluxDB (HTTP status `204`) +- **`error_callback`** (callable): if using batching mode, a function to call if data is not written successfully (the response has a non-`204` HTTP status) +- **`retry_callback`** (callable): if using batching mode, a function to call if the request is a retry (using batching mode) and data is not written successfully - - `kwargs`: keyword arguments for `WriteApi` +#### Example: instantiate options for batch writing -- Returns a dictionary of write client options. +```python +from influxdb_client_3 import write_client_options, WriteOptions +from influxdb_client_3.write_client.client.write_api import WriteType + +def success(): + print("Success") +def error(): + print("Error") +def retry(): + print("Retry error") + +write_options = WriteOptions() +wco = write_client_options(success_callback=success, + error_callback=error, + retry_callback=retry, + write_options=write_options) + +assert wco['success_callback'] +assert wco['error_callback'] +assert wco['retry_callback'] +assert wco['write_options'].write_type == WriteType.batching +``` -### Function flight_client_options(**kwargs) +#### Example: instantiate options for synchronous writing ```python -influxdb_client_3.flight_client_options(kwargs) +from influxdb_client_3 import write_client_options, SYNCHRONOUS +from influxdb_client_3.write_client.client.write_api import WriteType + +wco = write_client_options(write_options=SYNCHRONOUS) + +assert wco['write_options'].write_type == WriteType.synchronous ``` -- Takes the following parameters: +### Function flight_client_options(\*\*kwargs) - - `kwargs`: keyword arguments for `FlightClient` +Returns a `dict` with the specified [FlightClient](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html) parameters. -- Returns a dictionary of Flight client options. +#### Parameters + +- `kwargs`: keyword arguments for [`pyarrow.flight.FlightClient`](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html) parameters -#### Examples +#### Example: specify the root certificate path -##### Specify the root certificate path +{{% code-placeholders "DATABASE_NAME|DATABASE_TOKEN" %}} + + + ```python from influxdb_client_3 import InfluxDBClient3, flight_client_options @@ -639,14 +955,22 @@ fh = open(certifi.where(), "r") cert = fh.read() fh.close() -client = InfluxDBClient3( - token="DATABASE_TOKEN", - host="{{< influxdb/host >}}", - database="DATABASE_NAME", - flight_client_options=flight_client_options( - tls_root_certs=cert)) +client = InfluxDBClient3(host=f"{{< influxdb/host >}}", + database=f"DATABASE_NAME", + token=f"DATABASE_TOKEN", + fco=flight_client_options(tls_root_certs=cert)) ``` +{{% /code-placeholders %}} + +Replace the following: + +- {{% code-placeholder-key %}}`DATABASE_NAME`{{% /code-placeholder-key %}}: + the name of your {{% product-name %}} [database](/influxdb/clustered/admin/databases/) +- {{% code-placeholder-key %}}`DATABASE_TOKEN`{{% /code-placeholder-key %}}: + an {{% product-name %}} [database token](/influxdb/clustered/admin/tokens/#database-tokens) + with read permission on the specified database + ## Constants - `influxdb_client_3.SYNCHRONOUS`: Represents synchronous write mode