Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(vnode/destroy): delete objects #22482

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 51 additions & 54 deletions source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
pCfg->syncCfg.replicaNum++;
}
if(pCreate->selfIndex != -1){
if (pCreate->selfIndex != -1) {
pCfg->syncCfg.myIndex = pCreate->selfIndex;
}
for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
Expand All @@ -157,7 +157,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->syncCfg.totalReplicaNum++;
}
pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
if(pCreate->learnerSelfIndex != -1){
if (pCreate->learnerSelfIndex != -1) {
pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
}
}
Expand Down Expand Up @@ -201,38 +201,37 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}

if(req.learnerReplica == 0)
{
if (req.learnerReplica == 0) {
req.learnerSelfIndex = -1;
}

dInfo("vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d",
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
(uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix,
req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
dInfo(
"vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
"szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d",
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
(uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel,
req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod,
req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica,
req.learnerSelfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
req.replicas[i].id);
}
for (int32_t i = 0; i < req.learnerReplica; ++i) {
dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
req.learnerReplicas[i].port, req.replicas[i].id);
req.learnerReplicas[i].port, req.replicas[i].id);
}

SReplica *pReplica = NULL;
if(req.selfIndex != -1){
if (req.selfIndex != -1) {
pReplica = &req.replicas[req.selfIndex];
}
else{
} else {
pReplica = &req.learnerReplicas[req.learnerSelfIndex];
}
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
Expand Down Expand Up @@ -313,10 +312,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
_OVER:
if (code != 0) {
vnodeClose(pImpl);
vnodeDestroy(path, pMgmt->pTfs);
vnodeDestroy(0, path, pMgmt->pTfs);
} else {
dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created",
req.vgId, TMSG_INFO(pMsg->msgType));
dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
TMSG_INFO(pMsg->msgType));
}

tFreeSCreateVnodeReq(&req);
Expand All @@ -331,12 +330,12 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}

if(req.learnerReplicas == 0){
if (req.learnerReplicas == 0) {
req.learnerSelfIndex = -1;
}

dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request",
req.vgId, TMSG_INFO(pMsg->msgType));
dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
TMSG_INFO(pMsg->msgType));

SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
Expand All @@ -347,15 +346,15 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {

ESyncRole role = vnodeGetRole(pVnode->pImpl);
dInfo("vgId:%d, checking node role:%d", req.vgId, role);
if(role == TAOS_SYNC_ROLE_VOTER){
if (role == TAOS_SYNC_ROLE_VOTER) {
dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
vmReleaseVnode(pMgmt, pVnode);
return -1;
}

dInfo("vgId:%d, checking node catch up", req.vgId);
if(vnodeIsCatchUp(pVnode->pImpl) != 1){
if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
terrno = TSDB_CODE_VND_NOT_CATCH_UP;
vmReleaseVnode(pMgmt, pVnode);
return -1;
Expand All @@ -375,20 +374,18 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
}

if (req.replica <= 0 ||
(req.selfIndex < 0 && req.learnerSelfIndex <0)||
req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
req.learnerSelfIndex >= req.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
vmReleaseVnode(pMgmt, pVnode);
return -1;
}

SReplica *pReplica = NULL;
if(req.selfIndex != -1){
if (req.selfIndex != -1) {
pReplica = &req.replicas[req.selfIndex];
}
else{
} else {
pReplica = &req.learnerReplicas[req.learnerSelfIndex];
}

Expand All @@ -412,7 +409,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmCloseVnode(pMgmt, pVnode, false);

int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
Expand All @@ -439,7 +436,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}

dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
req.vgId, TMSG_INFO(pMsg->msgType));
req.vgId, TMSG_INFO(pMsg->msgType));
return 0;
}

Expand Down Expand Up @@ -510,8 +507,8 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmCloseVnode(pMgmt, pVnode, true);

int32_t diskPrimary = wrapperCfg.diskPrimary;
char srcPath[TSDB_FILENAME_LEN] = {0};
char dstPath[TSDB_FILENAME_LEN] = {0};
char srcPath[TSDB_FILENAME_LEN] = {0};
char dstPath[TSDB_FILENAME_LEN] = {0};
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);

Expand Down Expand Up @@ -555,15 +552,16 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}

if(alterReq.learnerReplica == 0){
if (alterReq.learnerReplica == 0) {
alterReq.learnerSelfIndex = -1;
}

int32_t vgId = alterReq.vgId;
dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
dInfo(
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
Expand All @@ -573,19 +571,17 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
}

if (alterReq.replica <= 0 ||
(alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)||
if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
return -1;
}

SReplica *pReplica = NULL;
if(alterReq.selfIndex != -1){
if (alterReq.selfIndex != -1) {
pReplica = &alterReq.replicas[alterReq.selfIndex];
}
else{
} else {
pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
}

Expand Down Expand Up @@ -615,7 +611,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmCloseVnode(pMgmt, pVnode, false);

int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);

dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
Expand All @@ -641,10 +637,11 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}

dInfo("vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
dInfo(
"vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mgmt/mgmt_vnode/src/vmInt.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
if (pVnode->dropped) {
dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
vnodeDestroy(path, pMgmt->pTfs);
vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs);
}

taosMemoryFree(pVnode->path);
Expand Down