forked from influxdata/influxdb-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_MultiprocessingWriter.py
72 lines (56 loc) · 3.09 KB
/
test_MultiprocessingWriter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import unittest
from datetime import datetime
from influxdb_client import WritePrecision, InfluxDBClient
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
from influxdb_client.client.write_api import SYNCHRONOUS
# noinspection PyMethodMayBeStatic
class MultiprocessingWriterTest(unittest.TestCase):
def setUp(self) -> None:
self.url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
self.writer = None
def tearDown(self) -> None:
if self.writer:
self.writer.__del__()
def test_write_without_start(self):
self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
write_options=SYNCHRONOUS)
with self.assertRaises(AssertionError) as ve:
self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5")
self.assertEqual('Cannot write data: the writer is not started.', f'{ve.exception}')
def test_write_after_terminate(self):
self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
write_options=SYNCHRONOUS)
self.writer.start()
self.writer.__del__()
with self.assertRaises(AssertionError) as ve:
self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5")
self.assertEqual('Cannot write data: the writer is closed.', f'{ve.exception}')
def test_terminate_twice(self):
with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
writer.__del__()
writer.terminate()
writer.terminate()
writer.__del__()
def test_use_context_manager(self):
with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
self.assertIsNotNone(writer)
def test_pass_parameters(self):
unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0))
# write data
with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S)
# query data
with InfluxDBClient(url=self.url, token=self.token, org=self.org) as client:
query_api = client.query_api()
tables = query_api.query(
f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "mem_{unique}")',
self.org)
record = tables[0].records[0]
self.assertIsNotNone(record)
self.assertEqual("a", record["tag"])
self.assertEqual(5, record["_value"])
self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"])