-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22225 from taosdata/test/TS-3717-3.0
test : add batch create topic and batch consume topic
- Loading branch information
Showing
2 changed files
with
218 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
################################################################### | ||
# Copyright (c) 2016 by TAOS Technologies, Inc. | ||
# All rights reserved. | ||
# | ||
# This file is proprietary and confidential to TAOS Technologies. | ||
# No part of this file may be reproduced, stored, transmitted, | ||
# disclosed or used in any form or by any means other than as | ||
# expressly provided by the written permission from Jianhui Tao | ||
# | ||
################################################################### | ||
|
||
# -*- coding: utf-8 -*- | ||
|
||
# | ||
# The option for wal_retetion_period and wal_retention_size is work well | ||
# | ||
|
||
import taos | ||
from taos.tmq import Consumer | ||
|
||
import os | ||
import sys | ||
import threading | ||
import json | ||
import time | ||
import random | ||
from datetime import date | ||
from datetime import datetime | ||
from datetime import timedelta | ||
from os import path | ||
|
||
|
||
topicName = "topic" | ||
topicNum = 100 | ||
|
||
# consume topic | ||
def consume_topic(topic_name, group,consume_cnt, index, wait): | ||
consumer = Consumer( | ||
{ | ||
"group.id": group, | ||
"td.connect.user": "root", | ||
"td.connect.pass": "taosdata", | ||
"enable.auto.commit": "true", | ||
} | ||
) | ||
|
||
print(f"start consumer topic:{topic_name} group={group} index={index} ...") | ||
consumer.subscribe([topic_name]) | ||
cnt = 0 | ||
try: | ||
while True and cnt < consume_cnt: | ||
res = consumer.poll(1) | ||
if not res: | ||
if wait: | ||
continue | ||
else: | ||
break | ||
err = res.error() | ||
if err is not None: | ||
raise err | ||
val = res.value() | ||
cnt += 1 | ||
print(f" consume {cnt} ") | ||
for block in val: | ||
datas = block.fetchall() | ||
data = datas[0][:50] | ||
|
||
print(f" {topic_name}_{group}_{index} {cnt} {data}") | ||
|
||
finally: | ||
consumer.unsubscribe() | ||
consumer.close() | ||
|
||
def consumerThread(index): | ||
global topicName, topicNum | ||
print(f' thread {index} start...') | ||
while True: | ||
idx = random.randint(0, topicNum - 1) | ||
name = f"{topicName}{idx}" | ||
group = f"group_{index}_{idx}" | ||
consume_topic(name, group, 100, index, True) | ||
|
||
|
||
|
||
if __name__ == "__main__": | ||
print(sys.argv) | ||
threadCnt = 10 | ||
|
||
if len(sys.argv) == 1: | ||
threadCnt = int(sys.argv[1]) | ||
|
||
|
||
threads = [] | ||
print(f'consumer with {threadCnt} threads...') | ||
for i in range(threadCnt): | ||
x = threading.Thread(target=consumerThread, args=(i,)) | ||
x.start() | ||
threads.append(x) | ||
|
||
# wait | ||
for i, thread in enumerate(threads): | ||
thread.join() | ||
print(f'join thread {i} end.') | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
################################################################### | ||
# Copyright (c) 2016 by TAOS Technologies, Inc. | ||
# All rights reserved. | ||
# | ||
# This file is proprietary and confidential to TAOS Technologies. | ||
# No part of this file may be reproduced, stored, transmitted, | ||
# disclosed or used in any form or by any means other than as | ||
# expressly provided by the written permission from Jianhui Tao | ||
# | ||
################################################################### | ||
|
||
# -*- coding: utf-8 -*- | ||
|
||
import os | ||
import sys | ||
import random | ||
import time | ||
|
||
from util.log import * | ||
from util.cases import * | ||
from util.sql import * | ||
from util.common import * | ||
from util.sqlset import * | ||
|
||
class TDTestCase: | ||
def init(self, conn, logSql, replicaVar=1): | ||
self.replicaVar = int(replicaVar) | ||
tdLog.debug("start to execute %s" % __file__) | ||
tdSql.init(conn.cursor()) | ||
self.setsql = TDSetSql() | ||
|
||
# prepareEnv | ||
def prepareEnv(self): | ||
self.dbName = "mullevel" | ||
self.stbName = "meters" | ||
self.topicName = "topic" | ||
self.topicNum = 100 | ||
self.loop = 50000 | ||
|
||
sql = f"use {self.dbName}" | ||
tdSql.execute(sql) | ||
|
||
# generate topic sql | ||
self.sqls = [ | ||
f"select * from {self.stbName}", | ||
f"select * from {self.stbName} where ui < 200", | ||
f"select * from {self.stbName} where fc > 20.1", | ||
f"select * from {self.stbName} where nch like '%%a%%'", | ||
f"select * from {self.stbName} where fc > 20.1", | ||
f"select lower(bin) from {self.stbName} where length(bin) < 10;", | ||
f"select upper(bin) from {self.stbName} where length(nch) > 10;", | ||
f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;", | ||
f"select * from {self.stbName} where ic < 100 " | ||
] | ||
|
||
|
||
|
||
# prepareEnv | ||
def createTopics(self): | ||
for i in range(self.topicNum): | ||
topicName = f"{self.topicName}{i}" | ||
sql = random.choice(self.sqls) | ||
createSql = f"create topic if not exists {topicName} as {sql}" | ||
try: | ||
tdSql.execute(createSql, 3, True) | ||
except: | ||
tdLog.info(f" create topic {topicName} failed.") | ||
|
||
|
||
# random del topic | ||
def managerTopics(self): | ||
|
||
for i in range(self.loop): | ||
tdLog.info(f"start modify loop={i}") | ||
idx = random.randint(0, self.topicNum - 1) | ||
# delete | ||
topicName = f"{self.topicName}{idx}" | ||
sql = f"drop topic if exist {topicName}" | ||
try: | ||
tdSql.execute(sql, 3, True) | ||
except: | ||
tdLog.info(f" drop topic {topicName} failed.") | ||
|
||
|
||
# create topic | ||
sql = random.choice(self.sqls) | ||
createSql = f"create topic if not exists {topicName} as {sql}" | ||
try: | ||
tdSql.execute(createSql, 3, True) | ||
except: | ||
tdLog.info(f" create topic {topicName} failed.") | ||
|
||
seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2] | ||
time.sleep(random.choice(seconds)) | ||
|
||
|
||
# run | ||
def run(self): | ||
# prepare env | ||
self.prepareEnv() | ||
|
||
# create topic | ||
self.createTopics() | ||
|
||
# modify topic | ||
self.managerTopics() | ||
|
||
|
||
def stop(self): | ||
tdSql.close() | ||
tdLog.success("%s successfully executed" % __file__) | ||
|
||
tdCases.addWindows(__file__, TDTestCase()) | ||
tdCases.addLinux(__file__, TDTestCase()) |