Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: skip request to restore a vnode of single replica in failed mode #23787

Merged
merged 4 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ typedef struct {
} SVnodeThread;

// vmInt.c
int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);
Expand Down
25 changes: 13 additions & 12 deletions source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {

vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode != NULL && !pVnode->failed) {
SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
dError("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode);
Expand All @@ -291,10 +291,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}

ASSERT(pVnode == NULL || pVnode->failed);

wrapperCfg.diskPrimary = pVnode ? pVnode->diskPrimary : vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
int32_t diskPrimary = wrapperCfg.diskPrimary;
int32_t diskPrimary = vmGetPrimaryDisk(pMgmt, vnodeCfg.vgId);
if (diskPrimary < 0) {
diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
}
wrapperCfg.diskPrimary = diskPrimary;

snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

Expand Down Expand Up @@ -371,7 +372,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
TMSG_INFO(pMsg->msgType));

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
Expand Down Expand Up @@ -489,7 +490,7 @@ int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.vgId, TMSG_INFO(pMsg->msgType));

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
Expand Down Expand Up @@ -532,7 +533,7 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
Expand Down Expand Up @@ -565,7 +566,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
req.dstVgId);
pVnode = vmAcquireVnode(pMgmt, srcVgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
Expand Down Expand Up @@ -680,7 +681,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
if (pVnode) vmReleaseVnode(pMgmt, pVnode);
Expand Down Expand Up @@ -748,7 +749,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
if (pVnode == NULL) {
dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
Expand Down
39 changes: 34 additions & 5 deletions source/dnode/mgmt/mgmt_vnode/src/vmInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@
#include "vnd.h"
#include "libs/function/tudf.h"

int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
int32_t diskId = -1;
SVnodeObj *pVnode = NULL;

taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode != NULL) {
diskId = pVnode->diskPrimary;
}
taosThreadRwlockUnlock(&pMgmt->lock);
return diskId;
}

int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;
int32_t diskId = 0;
Expand Down Expand Up @@ -74,12 +87,12 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
return diskId;
}

SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
SVnodeObj *pVnode = NULL;

taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || pVnode->dropped) {
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
pVnode = NULL;
} else {
Expand All @@ -91,6 +104,8 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
return pVnode;
}

SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVnodeImpl(pMgmt, vgId, true); }

void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return;

Expand All @@ -100,6 +115,15 @@ void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
taosThreadRwlockUnlock(&pMgmt->lock);
}

static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
if (!ppVnode || !(*ppVnode)) return;

SVnodeObj *pVnode = *ppVnode;
taosMemoryFree(pVnode->path);
taosMemoryFree(pVnode);
ppVnode[0] = NULL;
}

int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodeObj *pVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
Expand Down Expand Up @@ -134,6 +158,12 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
}

taosThreadRwlockWrlock(&pMgmt->lock);
SVnodeObj *pOld = NULL;
taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (pOld) {
ASSERT(pOld->failed);
vmFreeVnodeObj(&pOld);
}
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosThreadRwlockUnlock(&pMgmt->lock);

Expand Down Expand Up @@ -223,8 +253,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs);
}

taosMemoryFree(pVnode->path);
taosMemoryFree(pVnode);
vmFreeVnodeObj(&pVnode);
}

static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
Expand Down Expand Up @@ -621,7 +650,7 @@ static void *vmRestoreVnodeInThread(void *param) {
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = pThread->ppVnodes[v];
if (pVnode->failed) {
dError("vgId:%d, skip restoring vnode in failure mode.", pVnode->vgId);
dError("vgId:%d, cannot restore a vnode in failed mode.", pVnode->vgId);
continue;
}

Expand Down
4 changes: 2 additions & 2 deletions source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
pHead->vgId = ntohl(pHead->vgId);

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL || pVnode->failed) {
if (pVnode == NULL) {
dGDebug("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
terrno = (terrno != 0) ? terrno : -1;
Expand Down Expand Up @@ -316,7 +316,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t size = -1;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode != NULL && !pVnode->failed) {
if (pVnode != NULL) {
switch (qtype) {
case WRITE_QUEUE:
size = taosQueueItemSize(pVnode->pWriteW.queue);
Expand Down