Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update test case tmqSeekAndCommit.py by charles #22578

Merged
merged 5 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/parallel_test/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
Expand Down
2 changes: 1 addition & 1 deletion tests/parallel_test/run_case.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so
#define taospy 2.7.10
pip3 list|grep taospy
pip3 uninstall taospy -y
pip3 install --default-timeout=120 taospy==2.7.10
pip3 install --default-timeout=120 taospy==2.7.12

$TIMEOUT_CMD $cmd
RET=$?
Expand Down
93 changes: 86 additions & 7 deletions tests/system-test/7-tmq/tmqSeekAndCommit.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def init(self, conn, logSql, replicaVar=1):
self.db_name = "tmq_db"
self.topic_name = "tmq_topic"
self.stable_name = "tmqst"
self.prepareData()


def prepareData(self):
Expand Down Expand Up @@ -73,8 +74,9 @@ def tmqSubscribe(self, inputDict):
return consumer

def test_seek_and_committed_position_with_autocommit(self):
"""Check the position and committed offset of the topic for autocommit scenario
"""
try:
self.prepareData()
inputDict = {
"topic_name": self.topic_name,
"group_id": "1",
Expand All @@ -95,31 +97,108 @@ def test_seek_and_committed_position_with_autocommit(self):

partitions = consumer.assignment()
position_partitions = consumer.position(partitions)
tdLog.info("position_partitions: %s"%(position_partitions))
for i in range(len(position_partitions)):
tdLog.info("position_partitions[%s].offset: %s"%(i, position_partitions[i].offset))
committed_partitions = consumer.committed(partitions)
tdLog.info("committed_partitions: %s"%(committed_partitions))
origin_committed_position = []
for i in range(len(committed_partitions)):
tdLog.info("committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
origin_committed_position.append(committed_partitions[i].offset)
assert(len(position_partitions) == len(committed_partitions))
for i in range(len(position_partitions)):
assert(position_partitions[i].offset == committed_partitions[i].offset)
# seek to the beginning of the topic

# seek to the specified offset of the topic, then check position and committed offset
for partition in partitions:
partition.offset = 5
consumer.seek(partition)
position_partitions = consumer.position(partitions)
for i in range(len(position_partitions)):
assert(position_partitions[i].offset == 5)
committed_partitions = consumer.committed(partitions)
for i in range(len(committed_partitions)):
assert(committed_partitions[i].offset != 5 and committed_partitions[i].offset == origin_committed_position[i])
except Exception as ex:
raise Exception("Failed to test seek and committed position with autocommit with error: {}".format(str(ex)))
finally:
consumer.unsubscribe()
consumer.close()

def test_commit_by_offset(self):
pass

"""Check the position and committed offset of the topic for commit by offset scenario
"""
try:
inputDict = {
"topic_name": self.topic_name,
"group_id": "1",
"auto_commit": "false",
"offset_reset": "earliest"
}
consumer = self.tmqSubscribe(inputDict)
origin_committed_position = []
while(True):
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
partitions = consumer.assignment()
consumer.commit(offsets=partitions)
val = res.value()
for block in val:
tdLog.info("block.fetchall() number: %s"%(len(block.fetchall())))
position_partitions = consumer.position(partitions)
committed_partitions = consumer.committed(partitions)
for i in range(len(position_partitions)):
assert(position_partitions[i].offset == committed_partitions[i].offset)
committed_partitions = consumer.committed(partitions)
for i in range(len(committed_partitions)):
origin_committed_position.append(committed_partitions[i].offset)
tdLog.info("original committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
# seek to the specified offset of the topic, then check position and committed offset
for partition in partitions:
partition.offset = 2
consumer.seek(partition)
position_partitions = consumer.position(partitions)
for i in range(len(position_partitions)):
assert(position_partitions[i].offset == 2)
committed_partitions = consumer.committed(partitions)
for i in range(len(committed_partitions)):
tdLog.info("after seek committed_partitions[%s].offset: %s"%(i, committed_partitions[i].offset))
assert(committed_partitions[i].offset != 2 and committed_partitions[i].offset == origin_committed_position[i])
# continue to consume data from seek offset
while(True):
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
partitions = consumer.assignment()
# commit by offset
consumer.commit(offsets=partitions)
val = res.value()
for block in val:
tdLog.info("block.fetchall() number: %s"%(len(block.fetchall())))
partitions = consumer.assignment()
position_partitions = consumer.position(partitions)
committed_partitions = consumer.committed(partitions)
assert(len(position_partitions) == len(committed_partitions))
for i in range(len(position_partitions)):
assert(position_partitions[i].offset == committed_partitions[i].offset)
except Exception as ex:
raise Exception("Failed to test commit by offset with error: {}".format(str(ex)))
finally:
consumer.unsubscribe()
consumer.close()

def run(self):
self.test_seek_and_committed_position_with_autocommit()
self.test_commit_by_offset()

def stop(self):
tdSql.execute("drop topic %s" % self.topic_name)
tdSql.execute("drop database %s"%(self.db_name))
tdSql.close()
tdLog.success(f"{__file__} successfully executed")

Expand Down