-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
tmq_example.js
90 lines (80 loc) · 2.77 KB
/
tmq_example.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
const taos = require("@tdengine/websocket");
// Database name interpolated into the SQL statements built in prepare().
const db = 'power';
// Name of the STABLE created in prepare() and selected from by the topic.
const stable = 'meters';
// Topic names: topics[0] is created in prepare(); the whole list is passed to consumer.subscribe().
const topics = ['power_meters_topic'];
// ANCHOR: create_consumer
/**
 * Builds the TMQ configuration map and opens a consumer connection with it.
 *
 * The map is keyed by the constants exposed on `taos.TMQConstants` and is
 * handed unchanged to `taos.tmqConnect`.
 *
 * @returns {Promise<*>} Whatever `taos.tmqConnect` resolves to (the consumer).
 */
async function createConsumer() {
    const { TMQConstants } = taos;
    const configMap = new Map();

    // Insertion order mirrors the order of the settings in the example docs.
    configMap.set(TMQConstants.GROUP_ID, "gId");
    configMap.set(TMQConstants.CONNECT_USER, "root");
    configMap.set(TMQConstants.CONNECT_PASS, "taosdata");
    configMap.set(TMQConstants.AUTO_OFFSET_RESET, "latest");
    configMap.set(TMQConstants.CLIENT_ID, 'test_tmq_client');
    configMap.set(TMQConstants.WS_URL, 'ws://localhost:6041');
    configMap.set(TMQConstants.ENABLE_AUTO_COMMIT, 'true');
    configMap.set(TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000');

    const consumer = await taos.tmqConnect(configMap);
    return consumer;
}
// ANCHOR_END: create_consumer
/**
 * Prepares the server-side state the example consumes from.
 *
 * Connects with `taos.sqlConnect`, then executes, in order: CREATE DATABASE,
 * CREATE STABLE, CREATE TOPIC for `topics[0]`, and ten INSERT statements into
 * table `d1001`. The connection is closed afterwards, including when one of
 * the statements fails.
 *
 * @returns {Promise<void>}
 * @throws Rethrows any error raised by `taos.sqlConnect` or `wsSql.exec`.
 */
async function prepare() {
    const conf = new taos.WSConfig('ws://localhost:6041');
    conf.setUser('root');
    conf.setPwd('taosdata');
    // NOTE(review): the connection selects `db` before CREATE DATABASE runs;
    // confirm the server accepts this on a first run when `db` does not exist yet.
    conf.setDb(db);

    // The database name comes from `db` only; a stray extra identifier before
    // it made the statement read "... IF NOT EXISTS POWER power KEEP ...".
    const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
    const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;

    const wsSql = await taos.sqlConnect(conf);
    try {
        await wsSql.exec(createDB);
        await wsSql.exec(createStable);
        // ANCHOR: create_topic
        const createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
        await wsSql.exec(createTopic);
        // ANCHOR_END: create_topic
        for (let i = 0; i < 10; i++) {
            await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
        }
    } finally {
        // Release the connection even if a statement above failed.
        await wsSql.Close();
    }
}
// ANCHOR: subscribe
/**
 * Subscribes the consumer to `topics` and drains a few batches of messages.
 *
 * Polls at most five times, passing 500 to `poll` (presumably a timeout in
 * milliseconds — confirm against the connector docs). Each entry of a
 * non-empty result is printed and the batch is then committed; the first
 * empty result ends the loop without committing.
 *
 * @param {object} consumer - Object exposing async `subscribe(topics)`,
 *   `poll(timeout)` (resolving to an iterable of [key, value] pairs with a
 *   `size` property) and `commit()`.
 * @returns {Promise<void>}
 */
async function subscribe(consumer) {
    await consumer.subscribe(topics);
    for (let attempt = 0; attempt < 5; attempt++) {
        const res = await consumer.poll(500);
        // Nothing arrived within this poll: stop early, nothing to commit.
        if (res.size === 0) {
            break;
        }
        for (const [key, value] of res) {
            console.log(key, value);
        }
        await consumer.commit();
    }
}
// ANCHOR_END: subscribe
/**
 * Runs the whole example: prepares data, creates a consumer, consumes
 * messages, prints the assignment, seeks it to the beginning and unsubscribes.
 *
 * Errors raised by those steps are logged (code and message) rather than
 * rethrown. The consumer, if one was created, is closed and `taos.destroy()`
 * is called regardless of success or failure.
 *
 * @returns {Promise<void>}
 */
async function test() {
    let consumer = null;
    try {
        await prepare();
        // Assign to the outer binding: declaring a new `let consumer` here would
        // shadow it and leave `finally` with null, so close() would never run.
        consumer = await createConsumer();
        await subscribe(consumer);
        // ANCHOR: assignment
        let assignment = await consumer.assignment();
        console.log(assignment);
        assignment = await consumer.seekToBeginning(assignment);
        // `|| []` keeps this a no-op if seekToBeginning resolves to nothing.
        for (const partition of assignment || []) {
            console.log("seek after:", partition);
        }
        // ANCHOR_END: assignment
        await consumer.unsubscribe();
    } catch (err) {
        console.error(err.code, err.message);
    } finally {
        try {
            if (consumer) {
                await consumer.close();
            }
        } finally {
            // Runs even when closing the consumer fails.
            taos.destroy();
        }
    }
}
// Entry point: run the example when this script is executed.
test()