Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: rsma stream state adaption #21969

Merged
merged 45 commits into from Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
43a029e
enh: coverity scan for sma
kailixu Mar 15, 2023
4317564
Merge branch 'main' into enh/TD-22404-M
kailixu Mar 18, 2023
11bf751
Merge branch 'main' into enh/TD-22404-M
kailixu Mar 27, 2023
eb301a1
enh: coverage for sma
kailixu Mar 29, 2023
fabd96d
enh: coverity scan for sma
kailixu Mar 29, 2023
9ee43c2
enh: coverity scan for sma
kailixu Mar 29, 2023
9552ac2
enh: coverity scan for sma
kailixu Mar 30, 2023
3530ae0
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 7, 2023
7f6fdd4
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 9, 2023
85e11d8
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 12, 2023
fcb587b
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 17, 2023
751a821
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 17, 2023
b93c85f
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 18, 2023
6292fb7
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 21, 2023
df40699
Merge branch 'main' into enh/TD-22404-M
kailixu Apr 23, 2023
94d945a
Merge branch 'main' into enh/TD-22404-M
kailixu May 2, 2023
4827ee7
Merge branch 'main' into enh/TD-22404-M
kailixu May 5, 2023
00f15c1
merge main
kailixu May 12, 2023
0c9aa5b
merge main
kailixu May 12, 2023
a39ff5f
chore: more code
kailixu May 12, 2023
c1c1e72
Merge branch 'main' into enh/TD-22404-M
kailixu May 13, 2023
9faefb7
Merge branch 'main' into enh/TD-22404-M
kailixu May 16, 2023
d093b0e
chore: merge 3.0
kailixu Jul 5, 2023
59e83fc
chore: build fix
kailixu Jul 5, 2023
df7866e
chore: refactor
kailixu Jul 5, 2023
3f6768d
chore: rsma rocksdb adaption
kailixu Jul 5, 2023
25d9405
chore: rsma test case adaption
kailixu Jul 6, 2023
61380b7
chore: test case adaption
kailixu Jul 6, 2023
e8cdf13
chore: rsma code refactor
kailixu Jul 6, 2023
c1393fb
chore: invoke streamStateCommit for rsma
kailixu Jul 6, 2023
64ef48e
Merge branch '3.0' into enh/TD-23769-3.0
kailixu Jul 7, 2023
cb8b10f
Merge branch '3.0' into enh/TD-23769-3.0
kailixu Jul 7, 2023
a746d28
chore: add debug info
kailixu Jul 7, 2023
9e1912b
Merge branch '3.0' into enh/TD-23769-3.0
kailixu Jul 9, 2023
d37a760
Merge branch '3.0' into enh/TD-23769-3.0
kailixu Jul 11, 2023
314123e
chore: add check
kailixu Jul 11, 2023
7c85f77
chore: more check
kailixu Jul 11, 2023
30f8c9c
chore: more check
kailixu Jul 11, 2023
ba0bce8
chore: code revert
kailixu Jul 11, 2023
8c8bcab
Merge branch '3.0' into enh/TD-23769-3.0
kailixu Jul 12, 2023
f282acb
chore: rsma refactor
kailixu Jul 12, 2023
8cb6356
chore: more code
kailixu Jul 12, 2023
466f103
Merge branch '3.0' into enh/TD-23769-3.0
kailixu Jul 12, 2023
67461d9
chore: merge 3.0 and solve conflict
kailixu Jul 13, 2023
6ee135c
chore: more code
kailixu Jul 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions include/util/taoserror.h
Expand Up @@ -757,9 +757,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3153)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_RESULT TAOS_DEF_ERROR_CODE(0, 0x3157)

//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
Expand Down
1 change: 0 additions & 1 deletion source/dnode/vnode/CMakeLists.txt
Expand Up @@ -32,7 +32,6 @@ target_sources(
# sma
"src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaFS.c"
"src/sma/smaOpen.c"
"src/sma/smaCommit.c"
"src/sma/smaRollup.c"
Expand Down
57 changes: 21 additions & 36 deletions source/dnode/vnode/src/inc/sma.h
Expand Up @@ -105,17 +105,16 @@ struct SRSmaFS {

struct SRSmaStat {
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer
int64_t refId; // shared by fetch tasks
volatile int64_t nBufItems; // number of items in queue buffer
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
volatile int32_t nFetchAll; // active number of fetch all
volatile int8_t triggerStat; // shared by fetch tasks
volatile int8_t commitStat; // 0 not in committing, 1 in committing
volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer
};

struct SSmaStat {
Expand Down Expand Up @@ -156,12 +155,9 @@ struct SRSmaInfo {
int16_t padding;
T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
STaosQall *iQall; // immutable buffer qall of SubmitReq
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
};

#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
Expand Down Expand Up @@ -191,6 +187,12 @@ typedef enum {
RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType;

#define TD_SMA_LOOPS_CHECK(n, limit) \
if (++(n) > limit) { \
sched_yield(); \
(n) = 0; \
}

// sma
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
Expand All @@ -213,27 +215,12 @@ int32_t smaPreClose(SSma *pSma);

// rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma);
int32_t tdRSmaFSFinishCommit(SSma *pSma);
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(SVnode *pVnode, int64_t suid, int8_t level, int64_t version, STfs *pTfs,
char *outputName);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, int8_t level, STfs *pTfs, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);

static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo);
Expand All @@ -244,8 +231,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
}

void tdRSmaGetFileName(SVnode *pVnode, STfs *pTfs, const char *fname, int64_t suid, int8_t level, int64_t version,
char *outputName);
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName);

#ifdef __cplusplus
Expand Down
1 change: 0 additions & 1 deletion source/dnode/vnode/src/inc/vnodeInt.h
Expand Up @@ -319,7 +319,6 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);

typedef struct {
Expand Down
68 changes: 5 additions & 63 deletions source/dnode/vnode/src/sma/smaCommit.c
Expand Up @@ -108,9 +108,6 @@ int32_t smaFinishCommit(SSma *pSma) {
int32_t lino = 0;
SVnode *pVnode = pSma->pVnode;

code = tdRSmaFSFinishCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);

if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
TSDB_CHECK_CODE(code, lino, _exit);
}
Expand Down Expand Up @@ -150,18 +147,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
if (isCommit) {
while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}

pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1",
pRSmaStat->commitAppliedVer)) {
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
TD_SMA_LOOPS_CHECK(nLoops, 1000)
}
}
// step 2: wait for all triggered fetch tasks to finish
Expand All @@ -173,11 +159,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
} else {
smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}

/**
Expand All @@ -189,40 +171,17 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
(void *)taosGetSelfPthreadId());
nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}

if (!isCommit) goto _exit;

smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
// code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);

smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());

#if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall
// lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));

void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);

while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
TSWAP(pInfo->iQall, pInfo->qall);
TSWAP(pInfo->iQueue, pInfo->queue);
TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
}

// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());

// all rsma results are written completely
STsdb *pTsdb = NULL;
Expand Down Expand Up @@ -258,9 +217,6 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
goto _exit;
}

code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit);

code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit);

Expand Down Expand Up @@ -310,20 +266,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {

continue;
}
#if 0
if (pRSmaInfo->taskInfo[0]) {
if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
pRSmaInfo->iTaskInfo[0] = NULL;
}
} else {
TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
}

taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
#endif
}

// unlock
Expand Down
58 changes: 11 additions & 47 deletions source/dnode/vnode/src/sma/smaEnv.c
Expand Up @@ -30,7 +30,6 @@ static int32_t tdRsmaStartExecutor(const SSma *pSma);
static int32_t tdRsmaStopExecutor(const SSma *pSma);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat);

/**
Expand Down Expand Up @@ -63,19 +62,15 @@ int32_t smaInit() {

int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
if (!smaMgmt.refHash) {
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr hanle since %s", terrstr());
return TSDB_CODE_FAILED;
}

// init fetch timer handle
smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
if (!smaMgmt.tmrHandle) {

if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
if (smaMgmt.refHash) {
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
}
atomic_store_8(&smaMgmt.inited, 0);
smaError("failed to init sma tmr handle since %s", terrstr());
return TSDB_CODE_FAILED;
Expand Down Expand Up @@ -143,10 +138,6 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
}

static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
if (!ppEnv) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}

if (!(*ppEnv)) {
if (tdNewSmaEnv(pSma, smaType, ppEnv) != TSDB_CODE_SUCCESS) {
Expand Down Expand Up @@ -196,10 +187,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
int32_t code = 0;
int32_t lino = 0;

if (ASSERTS(pSmaStat != NULL, "pSmaStat is NULL")) {
terrno = TSDB_CODE_RSMA_INVALID_ENV;
TSDB_CHECK_CODE(code, lino, _exit);
}

if (*pSmaStat) { // no lock
return code; // success, return directly
Expand Down Expand Up @@ -255,15 +242,13 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat));
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
// TODO
} else {
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
code = TSDB_CODE_APP_ERROR;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
}
return code;
}
Expand All @@ -277,12 +262,6 @@ static void tdDestroyTSmaStat(STSmaStat *pStat) {
}
}

static void *tdFreeTSmaStat(STSmaStat *pStat) {
tdDestroyTSmaStat(pStat);
taosMemoryFreeClear(pStat);
return NULL;
}

static void tdDestroyRSmaStat(void *pRSmaStat) {
if (pRSmaStat) {
SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
Expand All @@ -300,11 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
} else {
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}

// step 3:
Expand All @@ -313,10 +288,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
// step 4: destroy the rsma info and associated fetch tasks
taosHashCleanup(RSMA_INFO_HASH(pStat));

// step 5:
tdRSmaFSClose(RSMA_FS(pStat));

// step 6: free pStat
// step 5: free pStat
tsem_destroy(&(pStat->notEmpty));
taosMemoryFreeClear(pStat);
}
Expand Down Expand Up @@ -354,10 +326,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
}
} else {
ASSERTS(0, "unknown smaType:%" PRIi8, smaType);
terrno = TSDB_CODE_APP_ERROR;
smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr());
return -1;
smaError("%s failed at line %d since Unknown type", __func__, __LINE__);
}
}
return 0;
Expand All @@ -375,11 +344,6 @@ int32_t tdLockSma(SSma *pSma) {
}

int32_t tdUnLockSma(SSma *pSma) {
if (ASSERTS(SMA_LOCKED(pSma), "pSma %p is not locked:%d", pSma, pSma->locked)) {
terrno = TSDB_CODE_APP_ERROR;
smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno));
return -1;
}

pSma->locked = false;
int code = taosThreadMutexUnlock(&pSma->mutex);
Expand Down