Skip to content

Commit

Permalink
#2: Added example with iot sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Aug 9, 2019
1 parent b99c7e8 commit fb813e9
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## 1.0.0 [unreleased]

### Features
1. [#2](https://github.com/bonitoo-io/influxdb-client-python/issues/2): The write client is able to write data in batches (configuration: `batch_size`, `flush_interval`, `jitter_interval`, `retry_interval`)
87 changes: 87 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ InfluxDB 2.0 python client library. TODO...
- [Features](#how-to-use)
- [Writing data](#writes)
- [How to efficiently import large dataset](#how-to-efficiently-import-large-dataset)
- [Efficiency write data from IOT sensor](#efficiency-write-data-from-iot-sensor)

## Requirements

Expand Down Expand Up @@ -101,6 +102,8 @@ The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/m

#### How to efficiently import large dataset

- sources - [import_data_set.py](https://github.com/bonitoo-io/influxdb-client-python/blob/master/influxdb2_test/import_data_set.py)

```python
"""
Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0
Expand Down Expand Up @@ -189,5 +192,89 @@ for table in result:
Close client
"""
client.__del__()
```

#### Efficiency write data from IOT sensor

- sources - [iot_sensor.py](https://github.com/bonitoo-io/influxdb-client-python/blob/master/influxdb2_test/iot_sensor.py)

```python
"""
Efficiency write data from IOT sensor - write changed temperature every minute
"""
import atexit
import platform
from datetime import timedelta

import psutil as psutil
import rx
from rx import operators as ops

from influxdb2.client.influxdb_client import InfluxDBClient
from influxdb2.client.write_api import WriteApi
from influxdb2.client.write_api import WriteOptions


def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
"""Close clients after terminate a script.
:param db_client: InfluxDB client
:param write_api: WriteApi
:return: nothing
"""
write_api.__del__()
db_client.__del__()


def sensor_temperature():
"""Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].
:return: actual CPU temperature
"""
os_name = platform.system()
if os_name == 'Darwin':
from subprocess import check_output
output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
import re
return re.findall(r'\d+', str(output))[0]
else:
return psutil.sensors_temperatures()["coretemp"][0]


def line_protocol(temperature):
"""Create a InfluxDB line protocol with structure:
iot_sensor,hostname=mine_sensor_12,type=temperature value=68
:param temperature: the sensor temperature
:return: Line protocol to write into InfluxDB
"""

import socket
return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)


"""
Read temperature every minute; distinct_until_changed - produce only if temperature change
"""
data = rx.interval(period=timedelta(seconds=60))\
.pipe(ops.map(lambda t: sensor_temperature()),
ops.map(lambda temperature: line_protocol(temperature)),
ops.distinct_until_changed())

_db_client = InfluxDBClient(url="http://localhost:9999/api/v2", token="my-token-123", org="my-org", debug=True)

"""
Create client that writes data into InfluxDB
"""
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
_write_api.write(org="my-org", bucket="my-bucket", record=data)


"""
Call after terminate a script
"""
atexit.register(on_exit, _db_client, _write_api)

input()
```
4 changes: 2 additions & 2 deletions influxdb2/client/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from influxdb2.client.organizations_api import OrganizationsApi
from influxdb2.client.query_api import QueryApi
from influxdb2.client.users_api import UsersApi
from influxdb2.client.write_api import WriteApiClient, WriteOptions
from influxdb2.client.write_api import WriteApi, WriteOptions


class InfluxDBClient(object):
Expand Down Expand Up @@ -40,7 +40,7 @@ def __init__(self,

def write_api(self, write_options=WriteOptions()):
service = influxdb2.service.write_service.WriteService(self.api_client)
return WriteApiClient(service=service, write_options=write_options)
return WriteApi(service=service, write_options=write_options)

def query_api(self):
return QueryApi(self)
Expand Down
2 changes: 1 addition & 1 deletion influxdb2/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def _window_to_group(value):
ops.group_by(_group_by), ops.map(_group_to_batch), ops.merge_all())), ops.merge_all())


class WriteApiClient(AbstractClient):
class WriteApi(AbstractClient):

def __init__(self, service, write_options=WriteOptions()) -> None:
self._write_service = service
Expand Down
78 changes: 78 additions & 0 deletions influxdb2_test/iot_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Efficiency write data from IOT sensor - write changed temperature every minute
"""
import atexit
import platform
from datetime import timedelta

import psutil as psutil
import rx
from rx import operators as ops

from influxdb2.client.influxdb_client import InfluxDBClient
from influxdb2.client.write_api import WriteApi
from influxdb2.client.write_api import WriteOptions


def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
"""Close clients after terminate a script.
:param db_client: InfluxDB client
:param write_api: WriteApi
:return: nothing
"""
write_api.__del__()
db_client.__del__()


def sensor_temperature():
"""Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].
:return: actual CPU temperature
"""
os_name = platform.system()
if os_name == 'Darwin':
from subprocess import check_output
output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
import re
return re.findall(r'\d+', str(output))[0]
else:
return psutil.sensors_temperatures()["coretemp"][0]


def line_protocol(temperature):
"""Create a InfluxDB line protocol with structure:
iot_sensor,hostname=mine_sensor_12,type=temperature value=68
:param temperature: the sensor temperature
:return: Line protocol to write into InfluxDB
"""

import socket
return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)


"""
Read temperature every minute; distinct_until_changed - produce only if temperature change
"""
data = rx.interval(period=timedelta(seconds=60))\
.pipe(ops.map(lambda t: sensor_temperature()),
ops.map(lambda temperature: line_protocol(temperature)),
ops.distinct_until_changed())

_db_client = InfluxDBClient(url="http://localhost:9999/api/v2", token="my-token-123", org="my-org", debug=True)

"""
Create client that writes data into InfluxDB
"""
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
_write_api.write(org="my-org", bucket="my-bucket", record=data)


"""
Call after terminate a script
"""
atexit.register(on_exit, _db_client, _write_api)

input()
14 changes: 7 additions & 7 deletions influxdb2_test/test_WriteApiBatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import influxdb2
from influxdb2 import WritePrecision, WriteService
from influxdb2.client.write.point import Point
from influxdb2.client.write_api import WriteOptions, WriteApiClient
from influxdb2.client.write_api import WriteOptions, WriteApi
from influxdb2_test.base_test import BaseTest


Expand All @@ -33,9 +33,9 @@ def setUp(self) -> None:
self._api_client = influxdb2.ApiClient(configuration=conf, header_name="Authorization",
header_value="Token my-token")

self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
retry_interval=3_000))
self._write_client = WriteApi(service=WriteService(api_client=self._api_client),
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
retry_interval=3_000))

def tearDown(self) -> None:
pass
Expand Down Expand Up @@ -159,9 +159,9 @@ def test_flush_interval(self):

def test_jitter_interval(self):
self._write_client.__del__()
self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
jitter_interval=3_000))
self._write_client = WriteApi(service=WriteService(api_client=self._api_client),
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
jitter_interval=3_000))

httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)

Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ py>=1.4.31
randomize>=0.13
pytest>=5.0.0
httpretty>=0.9.6
psutil>=5.6.3

0 comments on commit fb813e9

Please sign in to comment.