-
Notifications
You must be signed in to change notification settings - Fork 186
/
Copy pathwrite_structured_data.py
66 lines (55 loc) · 1.68 KB
/
write_structured_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime, timezone
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
class Sensor(namedtuple('Sensor', 'name location version pressure temperature timestamp')):
    """Named-tuple record holding one sensor reading.

    Fields, in positional order: ``name``, ``location``, ``version``,
    ``pressure``, ``temperature`` and ``timestamp``.  Being a named tuple,
    an instance is immutable and exposes ``_asdict()`` / ``_fields``.
    """
@dataclass()
class Car:
    """Dataclass record describing a car.

    Attributes:
        engine: Engine designation, as text.
        type: Car category, as text.
        speed: Speed as a float (the unit is not stated in this file).
    """

    engine: str
    type: str
    speed: float
"""
Initialize client
"""
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Writer created with the SYNCHRONOUS write option; it is used for both
    # writes below and lives only as long as the surrounding client context.
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # Sensor "current" state, timestamped with the current time in UTC.
    sensor = Sensor(
        name="sensor_pt859",
        location="warehouse_125",
        version="2021.06.05.5874",
        pressure=125,
        temperature=10,
        timestamp=datetime.now(tz=timezone.utc),
    )
    print(sensor)

    # Synchronous write of the named tuple: the measurement and the time are
    # looked up by key in the record ("name" and "timestamp"), while the
    # remaining keys are split into tags and fields.
    write_api.write(
        bucket="my-bucket",
        record=sensor,
        record_measurement_key="name",
        record_time_key="timestamp",
        record_tag_keys=["location", "version"],
        record_field_keys=["pressure", "temperature"],
    )

    # Car "current" speed.
    car = Car(engine='12V-BT', type='sport-cars', speed=125.25)
    print(car)

    # Synchronous write of the dataclass: here the measurement is given as a
    # fixed name instead of a record key, and no time key is passed.
    write_api.write(
        bucket="my-bucket",
        record=car,
        record_measurement_name="performance",
        record_tag_keys=["engine", "type"],
        record_field_keys=["speed"],
    )