From 3adf95f5c31696c7959d76c94505b4cccc8f6916 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Mon, 13 Jan 2020 10:27:37 +0100 Subject: [PATCH] Add support for default tags (#50) --- CHANGELOG.md | 3 + README.rst | 28 ++++ influxdb_client/client/influxdb_client.py | 7 +- influxdb_client/client/write_api.py | 44 ++++++- tests/test_WriteApi.py | 150 +++++++++++++++++++++- 5 files changed, 225 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d4e1a0f..befcac94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.3.0 [unreleased] +### Features +1. [#49](https://github.com/influxdata/influxdb-client-python/issues/50): Implemented default tags + ### API 1. [#47](https://github.com/influxdata/influxdb-client-python/pull/47): Updated swagger to latest version diff --git a/README.rst b/README.rst index 845f1591..7efaa295 100644 --- a/README.rst +++ b/README.rst @@ -252,6 +252,34 @@ The batching is configurable by ``write_options``\ : .. marker-batching-end +Default Tags +"""""""""""" +.. marker-default-tags-start + +Sometimes it is useful to store the same information in every measurement, e.g. ``hostname``, ``location``, ``customer``. +The client is able to use a static value or an environment property as a tag value. + +The expressions: + +- ``California Miner`` - static value +- ``${env.hostname}`` - environment property + +.. code-block:: python + + point_settings = PointSettings() + point_settings.add_default_tag("id", "132-987-655") + point_settings.add_default_tag("customer", "California Miner") + point_settings.add_default_tag("data_center", "${env.data_center}") + + self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings) + +.. code-block:: python + + self.write_client = self.client.write_api(write_options=SYNCHRONOUS, + point_settings=PointSettings(**{"id": "132-987-655", + "customer": "California Miner"})) +..
marker-default-tags-end + Asynchronous client """"""""""""""""""" diff --git a/influxdb_client/client/influxdb_client.py b/influxdb_client/client/influxdb_client.py index ea582149..b0fb64c2 100644 --- a/influxdb_client/client/influxdb_client.py +++ b/influxdb_client/client/influxdb_client.py @@ -9,7 +9,7 @@ from influxdb_client.client.query_api import QueryApi from influxdb_client.client.tasks_api import TasksApi from influxdb_client.client.users_api import UsersApi -from influxdb_client.client.write_api import WriteApi, WriteOptions +from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings class InfluxDBClient(object): @@ -45,14 +45,15 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org self.api_client = ApiClient(configuration=conf, header_name=auth_header_name, header_value=auth_header_value) - def write_api(self, write_options=WriteOptions()) -> WriteApi: + def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi: """ Creates a Write API instance + :param point_settings: :param write_options: write api configuration :return: write api instance """ - return WriteApi(influxdb_client=self, write_options=write_options) + return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings) def query_api(self) -> QueryApi: """ diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 8bc988e6..b69e0646 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -6,6 +6,8 @@ from time import sleep from typing import Union, List +import os + import rx from rx import operators as ops, Observable from rx.scheduler import ThreadPoolScheduler @@ -55,6 +57,33 @@ def __init__(self, write_type: WriteType = WriteType.batching, ASYNCHRONOUS = WriteOptions(write_type=WriteType.asynchronous) +class PointSettings(object): + + def __init__(self, **default_tags) -> None: + + """ + Creates 
point settings for write api. + + :param default_tags: Default tags which will be added to each point written by api. + """ + + self.defaultTags = dict() + + for key, val in default_tags.items(): + self.add_default_tag(key, val) + + @staticmethod + def _get_value(value): + + if value.startswith("${env."): + return os.environ.get(value[6:-1]) + + return value + + def add_default_tag(self, key, value) -> None: + self.defaultTags[key] = self._get_value(value) + + class _BatchItem(object): def __init__(self, key, data, size=1) -> None: self.key = key @@ -103,10 +132,12 @@ def _body_reduce(batch_items): class WriteApi(AbstractClient): - def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()) -> None: + def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(), + point_settings: PointSettings = PointSettings()) -> None: self._influxdb_client = influxdb_client self._write_service = WriteService(influxdb_client.api_client) self._write_options = write_options + self._point_settings = point_settings if self._write_options.write_type is WriteType.batching: # Define Subject that listen incoming data and produces writes into InfluxDB self._subject = Subject() @@ -153,6 +184,17 @@ def write(self, bucket: str, org: str = None, if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision) + if self._point_settings.defaultTags and record: + for key, val in self._point_settings.defaultTags.items(): + if isinstance(record, dict): + record.get("tags")[key] = val + else: + for r in record: + if isinstance(r, dict): + r.get("tags")[key] = val + elif isinstance(r, Point): + r.tag(key, val) + final_string = self._serialize(record, write_precision) _async_req = True if self._write_options.write_type == WriteType.asynchronous else False diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index cb4f0c9b..cfdf8e65 100644 --- a/tests/test_WriteApi.py +++ 
b/tests/test_WriteApi.py @@ -3,12 +3,13 @@ from __future__ import absolute_import import datetime +import os import unittest import time from multiprocessing.pool import ApplyResult from influxdb_client import Point, WritePrecision -from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS +from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings from influxdb_client.rest import ApiException from tests.base_test import BaseTest @@ -17,7 +18,18 @@ class SynchronousWriteTest(BaseTest): def setUp(self) -> None: super().setUp() - self.write_client = self.client.write_api(write_options=SYNCHRONOUS) + + os.environ['data_center'] = "LA" + + self.id_tag = "132-987-655" + self.customer_tag = "California Miner" + self.data_center_key = "data_center" + + self.write_client = self.client.write_api(write_options=SYNCHRONOUS, + point_settings=PointSettings(**{"id": self.id_tag, + "customer": self.customer_tag, + self.data_center_key: + '${env.data_center}'})) def tearDown(self) -> None: self.write_client.__del__() @@ -97,6 +109,7 @@ def test_write_points_unicode(self): p.field(field_name, utf8_val) p.tag(tag, tag_value) record_list = [p] + print(record_list) self.write_client.write(bucket.name, self.org, record_list) @@ -105,10 +118,55 @@ def test_write_points_unicode(self): self.assertEqual(1, len(flux_result)) rec = flux_result[0].records[0] + self.assertEqual(self.id_tag, rec["id"]) + self.assertEqual(self.customer_tag, rec["customer"]) + self.assertEqual("LA", rec[self.data_center_key]) + self.assertEqual(measurement, rec.get_measurement()) self.assertEqual(utf8_val, rec.get_value()) self.assertEqual(field_name, rec.get_field()) + def test_write_using_default_tags(self): + bucket = self.create_test_bucket() + + measurement = "h2o_feet" + field_name = "water_level" + val = "1.0" + val2 = "2.0" + tag = "location" + tag_value = "creek level" + + p = Point(measurement) + p.field(field_name, val) + p.tag(tag, tag_value) + p.time(1) + + p2 
= Point(measurement) + p2.field(field_name, val2) + p2.tag(tag, tag_value) + p2.time(2) + + record_list = [p, p2] + print(record_list) + + self.write_client.write(bucket.name, self.org, record_list) + + query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' + flux_result = self.client.query_api().query(query) + self.assertEqual(1, len(flux_result)) + rec = flux_result[0].records[0] + rec2 = flux_result[0].records[1] + + self.assertEqual(self.id_tag, rec["id"]) + self.assertEqual(self.customer_tag, rec["customer"]) + self.assertEqual("LA", rec[self.data_center_key]) + + self.assertEqual(self.id_tag, rec2["id"]) + self.assertEqual(self.customer_tag, rec2["customer"]) + self.assertEqual("LA", rec2[self.data_center_key]) + + self.delete_test_bucket(bucket) + def test_write_result(self): _bucket = self.create_test_bucket() @@ -205,7 +263,18 @@ class AsynchronousWriteTest(BaseTest): def setUp(self) -> None: super().setUp() - self.write_client = self.client.write_api(write_options=ASYNCHRONOUS) + + os.environ['data_center'] = "LA" + + self.id_tag = "132-987-655" + self.customer_tag = "California Miner" + self.data_center_key = "data_center" + + self.write_client = self.client.write_api(write_options=ASYNCHRONOUS, + point_settings=PointSettings(**{"id": self.id_tag, + "customer": self.customer_tag, + self.data_center_key: + '${env.data_center}'})) def tearDown(self) -> None: self.write_client.__del__() @@ -261,6 +330,43 @@ def test_write_dictionaries(self): self.delete_test_bucket(bucket) + def test_use_default_tags_with_dictionaries(self): + bucket = self.create_test_bucket() + + _point1 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}} + _point2 = {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, + "time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}} + + _point_list = [_point1, _point2] + + 
self.write_client.write(bucket.name, self.org, _point_list) + time.sleep(1) + + query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' + print(query) + + flux_result = self.client.query_api().query(query) + + self.assertEqual(1, len(flux_result)) + + records = flux_result[0].records + + self.assertEqual(2, len(records)) + + rec = records[0] + rec2 = records[1] + + self.assertEqual(self.id_tag, rec["id"]) + self.assertEqual(self.customer_tag, rec["customer"]) + self.assertEqual("LA", rec[self.data_center_key]) + + self.assertEqual(self.id_tag, rec2["id"]) + self.assertEqual(self.customer_tag, rec2["customer"]) + self.assertEqual("LA", rec2[self.data_center_key]) + + self.delete_test_bucket(bucket) + def test_write_bytes(self): bucket = self.create_test_bucket() @@ -300,5 +406,43 @@ def test_write_bytes(self): self.delete_test_bucket(bucket) +class PointSettingTest(BaseTest): + + def setUp(self) -> None: + super().setUp() + self.id_tag = "132-987-655" + self.customer_tag = "California Miner" + + def tearDown(self) -> None: + self.write_client.__del__() + super().tearDown() + + def test_point_settings(self): + self.write_client = self.client.write_api(write_options=SYNCHRONOUS, + point_settings=PointSettings(**{"id": self.id_tag, + "customer": self.customer_tag})) + + default_tags = self.write_client._point_settings.defaultTags + + self.assertEqual(self.id_tag, default_tags.get("id")) + self.assertEqual(self.customer_tag, default_tags.get("customer")) + + def test_point_settings_with_add(self): + os.environ['data_center'] = "LA" + + point_settings = PointSettings() + point_settings.add_default_tag("id", self.id_tag) + point_settings.add_default_tag("customer", self.customer_tag) + point_settings.add_default_tag("data_center", "${env.data_center}") + + self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings) + + default_tags = self.write_client._point_settings.defaultTags + + 
self.assertEqual(self.id_tag, default_tags.get("id")) + self.assertEqual(self.customer_tag, default_tags.get("customer")) + self.assertEqual("LA", default_tags.get("data_center")) + + if __name__ == '__main__': unittest.main()