Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:dead lock #22656

Merged
merged 5 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
118 changes: 23 additions & 95 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,86 +416,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.code = code;
tmsgSendRsp(&rsp);
return 0;

// SMqVgOffset vgOffset = {0};
// int32_t vgId = TD_VID(pTq->pVnode);
//
// SDecoder decoder;
// tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
// tqError("vgId:%d failed to decode seek msg", vgId);
// return -1;
// }
//
// tDecoderClear(&decoder);
//
// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
//
// STqOffset* pOffset = &vgOffset.offset;
// if (pOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
// return -1;
// }
//
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
// if (pHandle == NULL) {
// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
// // 2. check consumer-vg assignment status
// taosRLockLatch(&pTq->lock);
// if (pHandle->consumerId != vgOffset.consumerId) {
// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
// taosRUnLockLatch(&pTq->lock);
// return -1;
// }
// taosRUnLockLatch(&pTq->lock);
//
// // 3. check the offset info
// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
// if (pSavedOffset != NULL) {
// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
// return 0; // no need to update the offset value
// }
//
// if (pSavedOffset->val.version == pOffset->val.version) {
// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
// pOffset->val.version, pSavedOffset->val.version);
// return 0;
// }
// }
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// if (pOffset->val.version < sver) {
// pOffset->val.version = sver;
// } else if (pOffset->val.version > ever) {
// pOffset->val.version = ever;
// }
//
// // save the new offset value
// if (pSavedOffset != NULL) {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
// pSavedOffset->val.version);
// } else {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
// }
//
// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
// return -1;
// }
//
// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
// vgOffset.consumerId, vgOffset.offset.val.version);
//
// return 0;
}

int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
Expand Down Expand Up @@ -610,7 +530,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
taosWUnLockLatch(&pTq->lock);

tqDebug("tmq poll: consumer:0x%" PRIx64
"vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
" vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
consumerId, vgId, req.subKey, pHandle);
taosMsleep(10);
}
Expand Down Expand Up @@ -707,10 +627,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
taosRUnLockLatch(&pTq->lock);
return -1;
}
taosRUnLockLatch(&pTq->lock);

int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
taosRUnLockLatch(&pTq->lock);

SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, req.reqOffset);
Expand Down Expand Up @@ -766,27 +686,35 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0;

taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) {
while (tqIsHandleExec(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
pHandle->subKey, pHandle);
taosMsleep(10);
}
while (1) {
taosWLockLatch(&pTq->lock);
bool exec = tqIsHandleExec(pHandle);

if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
if(exec){
tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
taosMsleep(10);
continue;
}
if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}

tqUnregisterPushHandle(pTq, pHandle);
tqUnregisterPushHandle(pTq, pHandle);

code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
}
taosWUnLockLatch(&pTq->lock);
break;
}
}

taosWLockLatch(&pTq->lock);
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (code != 0) {
tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
Expand Down
15 changes: 10 additions & 5 deletions source/dnode/vnode/src/tq/tqRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
SWalReader* pWalReader = pReader->pWalReader;

// uint64_t st = taosGetTimestampMs();
while (1) {
SArray* pBlockList = pReader->submit.aSubmitTbData;
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
Expand Down Expand Up @@ -439,6 +440,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

pReader->msg.msgStr = NULL;

// if(taosGetTimestampMs() - st > 5){
// return false;
// }
}
}

Expand Down Expand Up @@ -489,7 +494,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
return true;
} else {
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
taosHashGetSize(pReader->tbIdHash), idstr);
}

Expand Down Expand Up @@ -850,7 +855,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
tDeleteSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));

block.info.id.uid = uid;
Expand All @@ -867,7 +872,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas

SSDataBlock* pBlock = taosArrayGetLast(blocks);

tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));

int32_t targetIdx = 0;
Expand Down Expand Up @@ -949,7 +954,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
tDeleteSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));

block.info.id.uid = uid;
Expand All @@ -966,7 +971,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas

SSDataBlock* pBlock = taosArrayGetLast(blocks);

tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));

int32_t targetIdx = 0;
Expand Down
23 changes: 4 additions & 19 deletions source/dnode/vnode/src/tq/tqUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}

//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
// if(offset->type == TMQ_OFFSET__LOG){
// offset->version = ver;
// }
//}

static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
Expand All @@ -140,7 +134,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,

SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, *pOffset);
// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset

qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
Expand All @@ -161,7 +154,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}

// setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);

end : {
Expand All @@ -182,7 +174,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, *offset);
// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset

if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
Expand Down Expand Up @@ -216,19 +207,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version;

// uint64_t st = taosGetTimestampMs();
int totalRows = 0;
while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break;
}
ASSERT (savedEpoch <= pRequest->epoch);

if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
Expand All @@ -241,7 +227,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
Expand Down Expand Up @@ -269,9 +254,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end;
}

// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) {
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
Expand Down Expand Up @@ -310,7 +295,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
} else { // todo handle the case where re-balance occurs.
} else {
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
}
Expand Down
3 changes: 0 additions & 3 deletions source/libs/executor/src/scanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1727,9 +1727,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
// qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
// pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
// pInfo->tqReader->pWalReader->curVersion);
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
return pResult;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/system-test/7-tmq/tmqDropStb.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TDTestCase:
'rowsPerTbl': 10000,
'batchNum': 2000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'pollDelay': 50,
'showMsg': 1,
'showRow': 1}

Expand All @@ -45,7 +45,7 @@ class TDTestCase:
autoCommitInterval = 'auto.commit.interval.ms:1000'
autoOffset = 'auto.offset.reset:earliest'

pollDelay = 20
pollDelay = 50
showMsg = 1
showRow = 1

Expand Down