Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tmq test case to support data precision 'us' and 'ns' by charles #22478

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/parallel_test/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
Expand Down
139 changes: 139 additions & 0 deletions tests/system-test/7-tmq/tmqDataPrecisionUnit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import sys
import re
import time
import threading
from taos.tmq import *
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *


class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)

self.db_name = "tmq_db"
self.topic_name = "tmq_topic"
self.stable_name = "stb"
self.rows_per_table = 1000
self.ctb_num = 100

def prepareData(self, precisionUnit="ms"):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
startTS = 1672502400000
if precisionUnit == "us":
startTS = 1672502400000000
elif precisionUnit == "ns":
startTS = 1672502400000000000

paraDict = {
'dbName': self.db_name,
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': self.stable_name,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': self.ctb_num,
'rowsPerTbl': self.rows_per_table,
'batchNum': 100,
'startTs': startTS, # 2023-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0
}

# init the consumer database
tmqCom.initConsumerTable()

# create testing database、stable、ctables
tdCom.create_database(tdSql, paraDict["dbName"], paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, precision=precisionUnit)
tdLog.info("create database %s successfully" % paraDict["dbName"])
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"], stbName=paraDict["stbName"])
tdLog.info("create stable %s successfully" % paraDict["stbName"])
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"], ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create child tables successfully")

# insert data into tables and wait the async thread exit
tdLog.info("insert data into tables")
pThread = tmqCom.asyncInsertDataByInterlace(paraDict)
pThread.join()

def run(self):
"""Check tmq feature for different data precision unit like "ms、us、ns"
"""
precision_unit = ["ms", "us", "ns"]
for unit in precision_unit:
tdLog.info(f"start to test precision unit {unit}")
self.prepareData(precisionUnit=unit)
# drop database if exists
tdSql.execute(f"drop database if exists {self.db_name}")
self.prepareData(unit)

# create topic
tdLog.info("create topic from %s" % self.stable_name)
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(self.db_name, self.stable_name)
sqlString = "create topic %s as %s" %(self.topic_name, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)

# save consumer info
consumerId = 0
expectrowcnt = self.rows_per_table * self.ctb_num
topicList = self.topic_name
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit)

# start consume processor
paraDict = {
'pollDelay': 15,
'showMsg': 1,
'showRow': 1,
'snapshot': 0
}
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'], dbName=self.db_name, showMsg=paraDict['showMsg'], showRow=paraDict['showRow'], snapshot=paraDict['snapshot'])

tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]

tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d "%(totalConsumeRows, totalRowsFromQuery))

if totalConsumeRows < totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")

tmqCom.waitSubscriptionExit(tdSql, self.topic_name)
tdSql.query("drop topic %s" % self.topic_name)
tdSql.execute("drop database %s" % self.db_name)

def stop(self):
tdSql.execute(f"drop database if exists {self.db_name}")
tdSql.close()
tdLog.success(f"{__file__} successfully executed")


tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())