/
tmq_native.py
84 lines (70 loc) · 1.81 KB
/
tmq_native.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import taos
conn = taos.connect(
host="localhost",
user="root",
password="taosdata",
port=6030,
)
db = "power"
topic = "topic_meters"
conn.execute(f"DROP TOPIC IF EXISTS {topic}")
conn.execute(f"DROP DATABASE IF EXISTS {db}")
conn.execute(f"CREATE DATABASE {db}")
# change database. same as execute "USE db"
conn.select_db(db)
# create super table
conn.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
)
# ANCHOR: create_topic
# create topic
conn.execute(
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
)
# ANCHOR_END: create_topic
# ANCHOR: create_consumer
from taos.tmq import Consumer
consumer = Consumer(
{
"group.id": "1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
# ANCHOR_END: create_consumer
# ANCHOR: subscribe
consumer.subscribe([topic])
# ANCHOR_END: subscribe
try:
# ANCHOR: consume
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
# ANCHOR_END: consume
# ANCHOR: assignment
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
# ANCHOR_END: assignment
# ANCHOR: seek
offset = taos.tmq.TopicPartition(
topic=topic,
partition=assignment.partition,
offset=0,
)
consumer.seek(offset)
# ANCHOR_END: seek
finally:
# ANCHOR: unsubscribe
consumer.unsubscribe()
consumer.close()
# ANCHOR_END: unsubscribe
conn.close()