enh(stmt2): refactoring stmt2 retry strategy#35139
Conversation
Track tRowBuild-allocated rows explicitly in stmtPatch (aHeapRows) and free them before tDestroySubmitReq instead of inferring heap vs slab by address range, fixing invalid free of decoded-in-place SRow pointers. Use asyncQueryCb for internal async retries so the user asyncExecFn runs once with the final result; invoke the user callback when retry setup fails; remove asyncQueryCbRetry. In stmt2Case.exec_retry, accept NULL for backfilled INT columns after ALTER ADD COLUMN (taos_fetch_row uses null pointers for SQL NULL).
There was a problem hiding this comment.
Code Review
This pull request implements an internal retry mechanism for the STscStmt2 component to handle errors such as metadata refreshes and schema version mismatches. It introduces functionality to save and restore serialized data blocks, rebuild rows to match updated schemas, and update metadata from the catalog in both synchronous and asynchronous execution paths. Review feedback identifies a memory leak in the asynchronous callback where the original request object is not freed when a retry is successfully initiated. Additionally, the use of arbitrary sleeps in test cases is flagged as a potential indicator of underlying synchronization issues that should be addressed more deterministically.
There was a problem hiding this comment.
Pull request overview
This PR refactors the stmt2 retry strategy to make retries safer and more deterministic: it snapshots serialized submit blocks for NEED_CLIENT_HANDLE_ERROR retries, patches schema/table meta for retry execution, and adjusts async retry handling so the user async callback is intended to run only once with the final outcome. It also extends stmt2 tests to cover schema/meta changes between bind and exec.
Changes:
- Add serialization snapshot/restore support for vnode modify data blocks and patching logic (schema ver + table uid/suid/sver) for internal retries.
- Update async retry flow to retry internally and aim to invoke the user callback only once with the final result.
- Expand
stmt2Case.exec_retrytest coverage for ALTER/DROP/RECREATE scenarios and adapter query timing.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| source/client/src/clientStmt2.c | Implements saved submit-block cloning/restoration and schema/meta patching for retry; modifies sync + async retry flows; adds stmtErrstr2 alias. |
| source/client/inc/clientStmt2.h | Adds pVgDataBlocksForRetry to persist retry payload snapshots across planner ownership changes. |
| source/client/test/stmt2Test.cpp | Extends stmt2 retry tests and adds timing delay in adapter async query test. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
代码审查报告 — PR #35139共发现 7 个已验证问题(1 高危、2 中危、4 低危)。 🔴 高危1.
|
342b201 to
0fedab0
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…to enh/main/6944442785
0fedab0 to
61f94cf
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (c != TSDB_CODE_SUCCESS) { | ||
| if (pMeta != NULL) { | ||
| taosMemoryFree(pMeta); | ||
| } | ||
| taosHashCleanup(pHash); | ||
| return c; | ||
| } | ||
| if (pMeta != NULL) { | ||
| STableMeta* pDup = stmtCloneTableMetaForRetry(pMeta); | ||
| taosMemoryFree(pMeta); | ||
| pMeta = NULL; | ||
| if (pDup != NULL) { | ||
| int32_t putCode = taosHashPut(pHash, &pDup->uid, sizeof(uint64_t), &pDup, POINTER_BYTES); | ||
| if (putCode != TSDB_CODE_SUCCESS) { | ||
| STMT2_ELOG("stmtBuildUidToTableMetaHash taosHashPut failed uid:%" PRIu64 ", code:%s", (uint64_t)pDup->uid, | ||
| tstrerror(putCode)); | ||
| taosMemoryFree(pDup); | ||
| taosHashCleanup(pHash); | ||
| return putCode; | ||
| } |
There was a problem hiding this comment.
On several error paths this function calls taosHashCleanup(pHash) without first freeing the heap-allocated STableMeta* values already inserted into the hash. Since the hash stores pointer bytes (not owning/freeing the pointed-to metas), this leaks memory when catalogGetTableMeta or taosHashPut fails. Consider using stmtFreeUidTableMetaHash(pHash) (or equivalent iteration) on these early returns so inserted metas are freed before cleanup.
| int32_t nList = (int32_t)taosArrayGetSize(pRequest->tableList); | ||
| int32_t nonStbOrd = 0; | ||
| for (int32_t li = 0; li < nList; ++li) { | ||
| SName* pName = taosArrayGet(pRequest->tableList, li); | ||
| STableMeta* pMeta = NULL; | ||
| int32_t c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta); | ||
| if (c != TSDB_CODE_SUCCESS) { | ||
| taosMemoryFreeClear(pMeta); | ||
| return c; | ||
| } | ||
| if (pMeta == NULL) { | ||
| return TSDB_CODE_INTERNAL_ERROR; | ||
| } | ||
| if (stmtRetryTbMetaIsSuperTable(pMeta)) { | ||
| taosMemoryFree(pMeta); | ||
| continue; | ||
| } | ||
| if (nonStbOrd == tbIdx) { | ||
| pPatch->uid = pMeta->uid; | ||
| pPatch->suid = pMeta->suid; | ||
| pPatch->sver = pMeta->sversion; | ||
| taosMemoryFree(pMeta); | ||
| return TSDB_CODE_SUCCESS; | ||
| } | ||
| taosMemoryFree(pMeta); | ||
| nonStbOrd++; |
There was a problem hiding this comment.
stmtFetchOneRetryTbMetaPatch does a full scan of pRequest->tableList and calls catalogGetTableMeta for each entry. Since stmtUpdateVgDataBlocksTbMetaFromCatalog calls this once per SSubmitTbData, retries with many tables become O(nSubmitTb * nTableList) catalog RPCs. Consider pre-building an index (e.g., array of non-super-table metas/patches aligned to tbIdx, or a uid/name->meta hash) once per retry and reusing it for all tables in the submit block.
| int32_t nList = (int32_t)taosArrayGetSize(pRequest->tableList); | |
| int32_t nonStbOrd = 0; | |
| for (int32_t li = 0; li < nList; ++li) { | |
| SName* pName = taosArrayGet(pRequest->tableList, li); | |
| STableMeta* pMeta = NULL; | |
| int32_t c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta); | |
| if (c != TSDB_CODE_SUCCESS) { | |
| taosMemoryFreeClear(pMeta); | |
| return c; | |
| } | |
| if (pMeta == NULL) { | |
| return TSDB_CODE_INTERNAL_ERROR; | |
| } | |
| if (stmtRetryTbMetaIsSuperTable(pMeta)) { | |
| taosMemoryFree(pMeta); | |
| continue; | |
| } | |
| if (nonStbOrd == tbIdx) { | |
| pPatch->uid = pMeta->uid; | |
| pPatch->suid = pMeta->suid; | |
| pPatch->sver = pMeta->sversion; | |
| taosMemoryFree(pMeta); | |
| return TSDB_CODE_SUCCESS; | |
| } | |
| taosMemoryFree(pMeta); | |
| nonStbOrd++; | |
| static _Thread_local SRequestObj* sRetryPatchCacheReq = NULL; | |
| static _Thread_local void* sRetryPatchCacheTableList = NULL; | |
| static _Thread_local int32_t sRetryPatchCacheListSize = -1; | |
| static _Thread_local SArray* sRetryPatchCache = NULL; | |
| int32_t nList = (int32_t)taosArrayGetSize(pRequest->tableList); | |
| if (sRetryPatchCacheReq != pRequest || sRetryPatchCacheTableList != pRequest->tableList || | |
| sRetryPatchCacheListSize != nList) { | |
| if (sRetryPatchCache != NULL) { | |
| taosArrayDestroy(sRetryPatchCache); | |
| sRetryPatchCache = NULL; | |
| } | |
| sRetryPatchCache = taosArrayInit(nList > 0 ? nList : 1, sizeof(SRetryTbMetaPatch)); | |
| if (sRetryPatchCache == NULL) { | |
| return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); | |
| } | |
| for (int32_t li = 0; li < nList; ++li) { | |
| SName* pName = taosArrayGet(pRequest->tableList, li); | |
| STableMeta* pMeta = NULL; | |
| int32_t c = catalogGetTableMeta(pStmt->pCatalog, &conn, pName, &pMeta); | |
| if (c != TSDB_CODE_SUCCESS) { | |
| taosMemoryFreeClear(pMeta); | |
| taosArrayDestroy(sRetryPatchCache); | |
| sRetryPatchCache = NULL; | |
| sRetryPatchCacheReq = NULL; | |
| sRetryPatchCacheTableList = NULL; | |
| sRetryPatchCacheListSize = -1; | |
| return c; | |
| } | |
| if (pMeta == NULL) { | |
| taosArrayDestroy(sRetryPatchCache); | |
| sRetryPatchCache = NULL; | |
| sRetryPatchCacheReq = NULL; | |
| sRetryPatchCacheTableList = NULL; | |
| sRetryPatchCacheListSize = -1; | |
| return TSDB_CODE_INTERNAL_ERROR; | |
| } | |
| if (stmtRetryTbMetaIsSuperTable(pMeta)) { | |
| taosMemoryFree(pMeta); | |
| continue; | |
| } | |
| SRetryTbMetaPatch patch = {0}; | |
| patch.uid = pMeta->uid; | |
| patch.suid = pMeta->suid; | |
| patch.sver = pMeta->sversion; | |
| taosMemoryFree(pMeta); | |
| if (taosArrayPush(sRetryPatchCache, &patch) == NULL) { | |
| taosArrayDestroy(sRetryPatchCache); | |
| sRetryPatchCache = NULL; | |
| sRetryPatchCacheReq = NULL; | |
| sRetryPatchCacheTableList = NULL; | |
| sRetryPatchCacheListSize = -1; | |
| return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); | |
| } | |
| } | |
| sRetryPatchCacheReq = pRequest; | |
| sRetryPatchCacheTableList = pRequest->tableList; | |
| sRetryPatchCacheListSize = nList; | |
| } | |
| if (tbIdx >= 0 && tbIdx < (int32_t)taosArrayGetSize(sRetryPatchCache)) { | |
| SRetryTbMetaPatch* pCachedPatch = taosArrayGet(sRetryPatchCache, tbIdx); | |
| if (pCachedPatch != NULL) { | |
| *pPatch = *pCachedPatch; | |
| return TSDB_CODE_SUCCESS; | |
| } |
Track tRowBuild-allocated rows explicitly in stmtPatch (aHeapRows) and free them before tDestroySubmitReq instead of inferring heap vs slab by address range, fixing invalid free of decoded-in-place SRow pointers. Use asyncQueryCb for internal async retries so the user asyncExecFn runs once with the final result; invoke the user callback when retry setup fails; remove asyncQueryCbRetry.
In stmt2Case.exec_retry, accept NULL for backfilled INT columns after ALTER ADD COLUMN (taos_fetch_row uses null pointers for SQL NULL).
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.