Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.3.0 [unreleased]

### Features
1. [#50](https://github.com/influxdata/influxdb-client-python/issues/50): Implemented default tags

### API
1. [#47](https://github.com/influxdata/influxdb-client-python/pull/47): Updated swagger to latest version

Expand Down
28 changes: 28 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,34 @@ The batching is configurable by ``write_options``\ :

.. marker-batching-end

Default Tags
""""""""""""
.. marker-default-tags-start

Sometimes it is useful to store the same information in every measurement, e.g. ``hostname``, ``location``, ``customer``.
The client can use a static value or an environment property as a tag value.

The expressions:

- ``California Miner`` - static value
- ``${env.hostname}`` - environment property

.. code-block:: python

point_settings = PointSettings()
point_settings.add_default_tag("id", "132-987-655")
point_settings.add_default_tag("customer", "California Miner")
point_settings.add_default_tag("data_center", "${env.data_center}")

self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)

.. code-block:: python

self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
point_settings=PointSettings(**{"id": "132-987-655",
"customer": "California Miner"}))
.. marker-default-tags-end

Asynchronous client
"""""""""""""""""""

Expand Down
7 changes: 4 additions & 3 deletions influxdb_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings


class InfluxDBClient(object):
Expand Down Expand Up @@ -45,14 +45,15 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org
self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
header_value=auth_header_value)

def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
    """
    Creates a Write API instance.

    :param write_options: write api configuration (sync/async/batching behaviour)
    :param point_settings: default tags added to every point written by the api
    :return: write api instance

    NOTE(review): the defaults ``WriteOptions()`` and ``PointSettings()`` are
    evaluated once at function definition time and shared by all callers that
    omit the argument — confirm both types are effectively immutable here.
    """
    return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)

def query_api(self) -> QueryApi:
"""
Expand Down
44 changes: 43 additions & 1 deletion influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from time import sleep
from typing import Union, List

import os

import rx
from rx import operators as ops, Observable
from rx.scheduler import ThreadPoolScheduler
Expand Down Expand Up @@ -55,6 +57,33 @@ def __init__(self, write_type: WriteType = WriteType.batching,
ASYNCHRONOUS = WriteOptions(write_type=WriteType.asynchronous)


class PointSettings(object):
    """Settings applied to every point written through a write api.

    Holds a dictionary of default tags (``defaultTags``) that the write api
    adds to each point or dictionary record it serializes.
    """

    def __init__(self, **default_tags) -> None:
        """
        Creates point settings for write api.

        :param default_tags: Default tags which will be added to each point written by api.
                             A value of the form ``${env.NAME}`` is resolved from the
                             environment variable ``NAME``.
        """
        self.defaultTags = dict()

        for key, val in default_tags.items():
            self.add_default_tag(key, val)

    @staticmethod
    def _get_value(value):
        # Resolve "${env.NAME}" expressions from the environment; everything
        # else passes through unchanged. The original stripped the last
        # character without verifying the closing "}" and crashed with
        # AttributeError on non-string values.
        if isinstance(value, str) and value.startswith("${env.") and value.endswith("}"):
            # len("${env.") == 6; drop the trailing "}". Missing variables
            # resolve to None, matching os.environ.get semantics.
            return os.environ.get(value[6:-1])

        return value

    def add_default_tag(self, key, value) -> None:
        """Adds a default tag; ``${env.NAME}`` values are resolved immediately."""
        self.defaultTags[key] = self._get_value(value)


class _BatchItem(object):
def __init__(self, key, data, size=1) -> None:
self.key = key
Expand Down Expand Up @@ -103,10 +132,12 @@ def _body_reduce(batch_items):

class WriteApi(AbstractClient):

def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()) -> None:
def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(),
point_settings: PointSettings = PointSettings()) -> None:
self._influxdb_client = influxdb_client
self._write_service = WriteService(influxdb_client.api_client)
self._write_options = write_options
self._point_settings = point_settings
if self._write_options.write_type is WriteType.batching:
# Define Subject that listen incoming data and produces writes into InfluxDB
self._subject = Subject()
Expand Down Expand Up @@ -153,6 +184,17 @@ def write(self, bucket: str, org: str = None,
if self._write_options.write_type is WriteType.batching:
return self._write_batching(bucket, org, record, write_precision)

if self._point_settings.defaultTags and record:
for key, val in self._point_settings.defaultTags.items():
if isinstance(record, dict):
record.get("tags")[key] = val
else:
for r in record:
if isinstance(r, dict):
r.get("tags")[key] = val
elif isinstance(r, Point):
r.tag(key, val)

final_string = self._serialize(record, write_precision)

_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
Expand Down
150 changes: 147 additions & 3 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from __future__ import absolute_import

import datetime
import os
import unittest
import time
from multiprocessing.pool import ApplyResult

from influxdb_client import Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings
from influxdb_client.rest import ApiException
from tests.base_test import BaseTest

Expand All @@ -17,7 +18,18 @@ class SynchronousWriteTest(BaseTest):

def setUp(self) -> None:
    """Configures a synchronous write client with default tags (id, customer, data_center).

    The scraped diff retained the stale pre-change assignment that built the
    client without point settings; that duplicate is removed here so the
    client is created exactly once.
    """
    super().setUp()

    # data_center is resolved from the environment via the "${env.data_center}" expression
    os.environ['data_center'] = "LA"

    self.id_tag = "132-987-655"
    self.customer_tag = "California Miner"
    self.data_center_key = "data_center"

    self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
                                              point_settings=PointSettings(**{"id": self.id_tag,
                                                                              "customer": self.customer_tag,
                                                                              self.data_center_key:
                                                                                  '${env.data_center}'}))

def tearDown(self) -> None:
self.write_client.__del__()
Expand Down Expand Up @@ -97,6 +109,7 @@ def test_write_points_unicode(self):
p.field(field_name, utf8_val)
p.tag(tag, tag_value)
record_list = [p]
print(record_list)

self.write_client.write(bucket.name, self.org, record_list)

Expand All @@ -105,10 +118,55 @@ def test_write_points_unicode(self):
self.assertEqual(1, len(flux_result))
rec = flux_result[0].records[0]

self.assertEqual(self.id_tag, rec["id"])
self.assertEqual(self.customer_tag, rec["customer"])
self.assertEqual("LA", rec[self.data_center_key])

self.assertEqual(measurement, rec.get_measurement())
self.assertEqual(utf8_val, rec.get_value())
self.assertEqual(field_name, rec.get_field())

def test_write_using_default_tags(self):
    """Default tags configured in setUp must appear on every point written."""
    bucket = self.create_test_bucket()

    measurement = "h2o_feet"
    field_name = "water_level"
    location_value = "creek level"

    # Two points, one second apart, sharing measurement/tag/field names.
    points = []
    for ts, level in ((1, "1.0"), (2, "2.0")):
        point = Point(measurement)
        point.field(field_name, level)
        point.tag("location", location_value)
        point.time(ts)
        points.append(point)
    print(points)

    self.write_client.write(bucket.name, self.org, points)

    query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
    flux_result = self.client.query_api().query(query)
    self.assertEqual(1, len(flux_result))

    # Both records must carry every default tag from the point settings.
    for record in (flux_result[0].records[0], flux_result[0].records[1]):
        self.assertEqual(self.id_tag, record["id"])
        self.assertEqual(self.customer_tag, record["customer"])
        self.assertEqual("LA", record[self.data_center_key])

    self.delete_test_bucket(bucket)

def test_write_result(self):
_bucket = self.create_test_bucket()

Expand Down Expand Up @@ -205,7 +263,18 @@ class AsynchronousWriteTest(BaseTest):

def setUp(self) -> None:
    """Configures an asynchronous write client with default tags (id, customer, data_center).

    The scraped diff retained the stale pre-change assignment that built the
    client without point settings; that duplicate is removed here so the
    client is created exactly once.
    """
    super().setUp()

    # data_center is resolved from the environment via the "${env.data_center}" expression
    os.environ['data_center'] = "LA"

    self.id_tag = "132-987-655"
    self.customer_tag = "California Miner"
    self.data_center_key = "data_center"

    self.write_client = self.client.write_api(write_options=ASYNCHRONOUS,
                                              point_settings=PointSettings(**{"id": self.id_tag,
                                                                              "customer": self.customer_tag,
                                                                              self.data_center_key:
                                                                                  '${env.data_center}'}))

def tearDown(self) -> None:
self.write_client.__del__()
Expand Down Expand Up @@ -261,6 +330,43 @@ def test_write_dictionaries(self):

self.delete_test_bucket(bucket)

def test_use_default_tags_with_dictionaries(self):
    """Default tags must be merged into dictionary-style records as well."""
    bucket = self.create_test_bucket()

    records = [
        {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
         "time": "2009-11-10T22:00:00Z", "fields": {"water_level": 1.0}},
        {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
         "time": "2009-11-10T23:00:00Z", "fields": {"water_level": 2.0}},
    ]

    self.write_client.write(bucket.name, self.org, records)
    # Asynchronous write: give the batch a moment to land before querying.
    time.sleep(1)

    query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
    print(query)

    tables = self.client.query_api().query(query)
    self.assertEqual(1, len(tables))

    flux_records = tables[0].records
    self.assertEqual(2, len(flux_records))

    # Every record must carry the default tags from the point settings.
    for record in flux_records:
        self.assertEqual(self.id_tag, record["id"])
        self.assertEqual(self.customer_tag, record["customer"])
        self.assertEqual("LA", record[self.data_center_key])

    self.delete_test_bucket(bucket)

def test_write_bytes(self):
bucket = self.create_test_bucket()

Expand Down Expand Up @@ -300,5 +406,43 @@ def test_write_bytes(self):
self.delete_test_bucket(bucket)


class PointSettingTest(BaseTest):
    """Unit tests for wiring PointSettings default tags into the write api."""

    def setUp(self) -> None:
        super().setUp()
        self.id_tag = "132-987-655"
        self.customer_tag = "California Miner"

    def tearDown(self) -> None:
        self.write_client.__del__()
        super().tearDown()

    def test_point_settings(self):
        # Default tags supplied as constructor keyword arguments.
        self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
                                                  point_settings=PointSettings(id=self.id_tag,
                                                                               customer=self.customer_tag))

        tags = self.write_client._point_settings.defaultTags
        self.assertEqual(self.id_tag, tags.get("id"))
        self.assertEqual(self.customer_tag, tags.get("customer"))

    def test_point_settings_with_add(self):
        # Default tags added one at a time; "${env.*}" expressions are resolved.
        os.environ['data_center'] = "LA"

        settings = PointSettings()
        for key, value in (("id", self.id_tag),
                           ("customer", self.customer_tag),
                           ("data_center", "${env.data_center}")):
            settings.add_default_tag(key, value)

        self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=settings)

        tags = self.write_client._point_settings.defaultTags
        self.assertEqual(self.id_tag, tags.get("id"))
        self.assertEqual(self.customer_tag, tags.get("customer"))
        self.assertEqual("LA", tags.get("data_center"))


if __name__ == '__main__':
unittest.main()