enh(tmq): optimize logic in altering tag with tmq#34711
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request focuses on optimizing the performance and stability of updating table lists within the TDengine Message Queue (TMQ) when tags are altered. It achieves this by introducing a defensive copy of table UID lists in the core C logic, preventing unintended modifications. Concurrently, the associated system test has been substantially upgraded to simulate high-frequency tag alteration scenarios with a significantly larger dataset and multi-threaded operations, ensuring robust validation of the implemented optimizations. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request aims to optimize the speed of updating table lists in TMQ when altering tags. The core change in the C code involves creating a copy of the table UID list before processing, which prevents side effects on the original list and improves correctness, especially in concurrent scenarios. The Python test has been substantially updated to a multi-threaded stress test, which is a great addition for verifying the fix under load. I've identified a few areas for improvement in the test script to enhance its correctness and reliability.
| continue | ||
| for block in val: | ||
| print(block.fetchall(),len(block.fetchall())) | ||
| cnt += len(block.fetchall()) |
There was a problem hiding this comment.
The fetchall() method on a result block is stateful. It consumes the rows from the block and moves an internal cursor. Subsequent calls to fetchall() on the same block will likely return an empty list.
Using len(block.fetchall()) directly is not robust. It's better practice to store the result of fetchall() in a variable and then get its length to avoid potential issues.
For example:
rows = block.fetchall()
cnt += len(rows)| if tableIndex >= endTable: | ||
| tableIndex = startTable | ||
| print(f"[Thread {threadId}] Restart alter table tag from {startTable}") | ||
| alterSql = '' |
There was a problem hiding this comment.
In the alterTag method, when tableIndex >= endTable, the loop restarts and alterSql is reset. If the number of tables in the range (endTable - startTable) is not a multiple of 10, the last few ALTER statements that have been concatenated into alterSql but not yet executed will be lost. You should execute the remaining statements in alterSql before resetting it and restarting the loop.
| if tableIndex >= endTable: | |
| tableIndex = startTable | |
| print(f"[Thread {threadId}] Restart alter table tag from {startTable}") | |
| alterSql = '' | |
| if tableIndex >= endTable: | |
| if alterSql: | |
| try: | |
| tdSqlTmp.execute(alterSql) | |
| except Exception as e: | |
| tdLog.warning(f"[Thread {threadId}] Alter failed: {e}") | |
| tableIndex = startTable | |
| print(f"[Thread {threadId}] Restart alter table tag from {startTable}") | |
| alterSql = '' |
| for i in range(alterCnt): | ||
| alterThread.append(threading.Thread(target=self.alterTag, kwargs={"start": i * batchAlterNum, "end": (i + 1) * batchAlterNum})) | ||
| alterThread[i].start() | ||
| tdSqlTmp = TDSql() | ||
| tdSqlTmp.init(self.conn.cursor()) | ||
| self.tdSqlList.append(tdSqlTmp) | ||
| thread = threading.Thread( | ||
| target=self.alterTag, | ||
| kwargs={"start": i * batchAlterNum, "end": (i + 1) * batchAlterNum, "tdSql": tdSqlTmp}, | ||
| name=f"AlterThread-{i}" | ||
| ) | ||
| self.alterThread.append(thread) | ||
| thread.start() | ||
| tdLog.info(f"Started AlterThread-{i}") |
There was a problem hiding this comment.
In the run method, you are creating multiple alterTag threads. Each thread gets a TDSql object initialized with self.conn.cursor(). According to the Python DB-API 2.0 specification, connection objects are generally not thread-safe, and using cursors from the same connection across multiple threads can lead to race conditions.
While the taos-python driver might have internal locking to handle this, it's safer and better practice for each thread to have its own database connection. Consider creating a new taos.connect(...) within each alterTag thread to ensure thread safety.
There was a problem hiding this comment.
Pull request overview
This PR aims to improve TMQ performance when tag-alter operations cause frequent updates to the internal “qualified table list” for stable subscriptions, and updates the related system test to stress/observe the behavior under heavier load and concurrency.
Changes:
- In vnode TQ read path, avoid in-place mutation of the shared table UID list by filtering a copied list per consumer handle before adding to the reader.
- Expand the TMQ alter-tag frequency system test to larger scale and multi-consumer + multi-alter-thread execution, with optional signal-based pause/stop controls.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/system-test/7-tmq/tmq_alter_tag_frequency.py | Scales up workload, adds multiple consumers and per-thread SQL executors, and adds signal-driven control of alter threads. |
| source/dnode/vnode/src/tq/tqRead.c | Copies UID list before qFilterTableList() to prevent side effects across handles and then adds filtered UIDs to the reader. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| self.stopAlter = True | ||
| self.pauseAlter.set() | ||
| for tdSqlTmp in self.tdSqlList: | ||
| tdSqlTmp.close() |
There was a problem hiding this comment.
stop() no longer closes the global tdSql cursor created in init() (it used to call tdSql.close()). This can leak a cursor/statement handle across system tests; consider closing tdSql in addition to closing the per-thread tdSqlTmp instances.
| tdSqlTmp.close() | |
| tdSqlTmp.close() | |
| tdSql.close() |
| try: | ||
| consumer.subscribe(["t0"]) | ||
| except TmqError: | ||
| tdLog.exit(f"subscribe error") |
There was a problem hiding this comment.
consume() catches TmqError, but this file only imports Consumer from taos.tmq. Unless TmqError is injected by another wildcard import, this will raise NameError the first time subscribe fails. Import TmqError explicitly (e.g., from taos.tmq import Consumer, TmqError) or catch the concrete exception type you intend.
| # 注册信号处理器 | ||
| signal.signal(signal.SIGUSR1, self.signalHandler) # 暂停/恢复 alter 线程 | ||
| signal.signal(signal.SIGUSR2, self.signalHandler) # 停止 alter 线程 | ||
| signal.signal(signal.SIGINT, self.signalHandler) # Ctrl+C 停止所有 |
There was a problem hiding this comment.
The test is registered for Windows, but SIGUSR1/SIGUSR2 are not available on Windows' signal module, so signal.signal(signal.SIGUSR1/2, ...) will raise at runtime. Please guard these registrations (e.g., only on os.name == 'posix' or hasattr(signal, 'SIGUSR1')), and keep SIGINT handling cross-platform.
Code reviewNo issues found. Checked for bugs and CLAUDE.md compliance. 🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
…tags in tmq
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.