Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support show vnodes with restored status #22448

Merged
merged 10 commits into from
Aug 17, 2023
7 changes: 5 additions & 2 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,9 @@ typedef struct {
int32_t vgId;
int8_t syncState;
int8_t syncRestore;
int64_t syncTerm;
int64_t roleTimeMs;
int64_t startTimeMs;
int8_t syncCanRead;
int64_t cacheUsage;
int64_t numOfTables;
Expand All @@ -1181,9 +1184,9 @@ typedef struct {
} SVnodeLoad;

typedef struct {
int8_t syncState;
int8_t syncRestore;
int8_t syncState;
int64_t syncTerm;
int8_t syncRestore;
int64_t roleTimeMs;
} SMnodeLoad;

Expand Down
1 change: 1 addition & 0 deletions include/libs/sync/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ typedef struct SSyncState {
bool canRead;
SyncTerm term;
int64_t roleTimeMs;
int64_t startTimeMs;
} SSyncState;

int32_t syncInit();
Expand Down
10 changes: 5 additions & 5 deletions source/common/src/systable.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
};


static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
Expand All @@ -295,12 +294,13 @@ static const SSysDbTableSchema subscriptionSchema[] = {
};

static const SSysDbTableSchema vnodesSchema[] = {
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "replica", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "dnode_ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "restored", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
};

static const SSysDbTableSchema userUserPrivilegesSchema[] = {
Expand Down
32 changes: 27 additions & 5 deletions source/common/src/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1083,8 +1083,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
if (tEncodeI32(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1;
if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1;
}

// mnode loads
Expand All @@ -1108,6 +1108,16 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
if (tEncodeI8(&encoder, pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;

// vnode extra
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
int64_t reserved = 0;
if (tEncodeI64(&encoder, pload->syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
}
tEndEncode(&encoder);

int32_t tlen = encoder.pos;
Expand Down Expand Up @@ -1152,7 +1162,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {

for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0};
int64_t reserved64 = 0;
vload.syncTerm = -1;
int32_t reserved32 = 0;
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
Expand All @@ -1166,14 +1176,15 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1;
if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}

// mnode loads
if (tDecodeI8(&decoder, &pReq->mload.syncState) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->mload.syncRestore) < 0) return -1;

Expand Down Expand Up @@ -1204,6 +1215,17 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI8(&decoder, &pReq->clusterCfg.ttlChangeOnWrite) < 0) return -1;
}

// vnode extra
if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pLoad = taosArrayGet(pReq->pVloads, i);
int64_t reserved = 0;
if (tDecodeI64(&decoder, &pLoad->syncTerm) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
Expand Down
3 changes: 3 additions & 0 deletions source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,11 @@ typedef struct {
// Per-replica record kept for a vnode inside a vgroup. The sync-related fields
// are refreshed from the SVnodeLoad entries carried by dnode status messages
// (see mndUpdateVnodeState) and are read back when listing vnodes.
typedef struct {
int32_t dnodeId;  // dnode hosting this replica; matched against the dnode id of an incoming status request
ESyncState syncState;  // last sync state recorded for this replica; forced to TAOS_SYNC_STATE_OFFLINE by the offline check
int64_t syncTerm;  // last reported sync term; a reported value of -1 is not treated as a term change
bool syncRestore;  // last reported restore flag; shown as the "restored" column
bool syncCanRead;  // last reported can-read flag
int64_t roleTimeMs;  // reported role timestamp (ms); shown as "role_time", displayed as 0 while the dnode is offline
int64_t startTimeMs;  // reported start timestamp (ms); shown as "start_time", reset to 0 by the offline check
ESyncRole nodeRole;  // NOTE(review): not touched by the status-update path visible here — confirm its owner
} SVnodeGid;

Expand Down
81 changes: 52 additions & 29 deletions source/dnode/mnode/impl/src/mndDnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,47 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const S
return 0;
}

// Refresh the cached replica record (pGid) from the load reported in a status
// message (pVload).
//
// vgId   - vgroup id, used only for the log line
// pGid   - cached replica state to compare against and overwrite
// pVload - freshly reported vnode load
//
// Returns true when at least one tracked field differed and the cache was
// rewritten, false when the report matched the cache.
static bool mndUpdateVnodeState(int32_t vgId, SVnodeGid *pGid, SVnodeLoad *pVload) {
  // A reported term of -1 never counts as a term change on its own.
  bool termDiffers = (pVload->syncTerm != -1) && (pGid->syncTerm != pVload->syncTerm);
  bool roleDiffers =
      (pGid->syncState != pVload->syncState) || termDiffers || (pGid->roleTimeMs != pVload->roleTimeMs);
  bool restoreDiffers = pGid->syncRestore != pVload->syncRestore;
  bool canReadDiffers = pGid->syncCanRead != pVload->syncCanRead;
  bool startDiffers = pGid->startTimeMs != pVload->startTimeMs;

  if (!(roleDiffers || restoreDiffers || canReadDiffers || startDiffers)) {
    return false;
  }

  // Log old vs. new before overwriting, so the old values are still available.
  mInfo(
      "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
      "canRead:%d, dnode:%d",
      vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, syncStr(pVload->syncState),
      pVload->syncRestore, pVload->syncCanRead, pGid->dnodeId);

  // Every tracked field is copied, including a term of -1 if that is what was reported.
  pGid->syncState = pVload->syncState;
  pGid->syncTerm = pVload->syncTerm;
  pGid->syncRestore = pVload->syncRestore;
  pGid->syncCanRead = pVload->syncCanRead;
  pGid->startTimeMs = pVload->startTimeMs;
  pGid->roleTimeMs = pVload->roleTimeMs;
  return true;
}

// Refresh the cached mnode object (pObj) from the mnode load reported in a
// status message (pMload).
//
// Returns true when the sync state, term, role time or restore flag differed
// and the object was rewritten, false otherwise.
static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
  // A reported term of -1 never counts as a term change on its own.
  bool termDiffers = (pMload->syncTerm != -1) && (pObj->syncTerm != pMload->syncTerm);
  bool roleDiffers =
      (pObj->syncState != pMload->syncState) || termDiffers || (pObj->roleTimeMs != pMload->roleTimeMs);
  bool restoreDiffers = pObj->syncRestore != pMload->syncRestore;

  if (!roleDiffers && !restoreDiffers) {
    return false;
  }

  // Log the transition before the old values are overwritten.
  mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 " to %" PRId64,
        pObj->id, syncStr(pObj->syncState), syncStr(pMload->syncState), pObj->syncRestore, pMload->syncRestore,
        pObj->syncTerm, pMload->syncTerm);

  pObj->syncState = pMload->syncState;
  pObj->syncTerm = pMload->syncTerm;
  pObj->syncRestore = pMload->syncRestore;
  pObj->roleTimeMs = pMload->roleTimeMs;
  return true;
}

static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStatusReq statusReq = {0};
Expand Down Expand Up @@ -496,26 +537,21 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pVgroup->compStorage = pVload->compStorage;
pVgroup->pointsWritten = pVload->pointsWritten;
}
bool roleChanged = false;
bool stateChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
if (pGid->dnodeId == statusReq.dnodeId) {
if (pGid->syncState != pVload->syncState || pGid->syncRestore != pVload->syncRestore ||
pGid->syncCanRead != pVload->syncCanRead) {
mInfo(
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"canRead:%d, dnode:%d",
pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead,
syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead, pDnode->id);
pGid->syncState = pVload->syncState;
pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead;
roleChanged = true;
if (pVload->startTimeMs == 0) {
pVload->startTimeMs = statusReq.rebootTime;
}
if (pVload->roleTimeMs == 0) {
pVload->roleTimeMs = statusReq.rebootTime;
}
stateChanged = mndUpdateVnodeState(pVgroup->vgId, pGid, pVload);
break;
}
}
if (roleChanged) {
if (stateChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
Expand All @@ -531,23 +567,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {

SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) {
bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
if (roleChanged || restoreChanged) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
" to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
pObj->syncState = statusReq.mload.syncState;
pObj->syncRestore = statusReq.mload.syncRestore;
pObj->syncTerm = statusReq.mload.syncTerm;
if (statusReq.mload.roleTimeMs == 0) {
statusReq.mload.roleTimeMs = statusReq.rebootTime;
}

if (roleChanged) {
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
}

mndUpdateMnodeState(pObj, &statusReq.mload);
mndReleaseMnode(pMnode, pObj);
}

Expand Down
7 changes: 4 additions & 3 deletions source/dnode/mnode/impl/src/mndMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;

bool roleChanged = false;
bool stateChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
SVnodeGid *pGid = &pVgroup->vnodeGid[vg];
if (pGid->dnodeId == dnodeId) {
Expand All @@ -197,13 +197,14 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
pGid->syncState = TAOS_SYNC_STATE_OFFLINE;
pGid->syncRestore = 0;
pGid->syncCanRead = 0;
roleChanged = true;
pGid->startTimeMs = 0;
stateChanged = true;
}
break;
}
}

if (roleChanged) {
if (stateChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
Expand Down
12 changes: 2 additions & 10 deletions source/dnode/mnode/impl/src/mndMnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,6 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
ESdbStatus objStatus = 0;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
int64_t dummyTimeMs = 0;

pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
if (pSelfObj == NULL) {
Expand Down Expand Up @@ -858,16 +857,9 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);

int64_t roleTimeMs = (isDnodeOnline) ? pObj->roleTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
// state of old term / no status report => use dummyTimeMs
if (pObj->syncTerm > pSelfObj->syncTerm) {
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);

numOfRows++;
sdbRelease(pSdb, pObj);
Expand Down
44 changes: 27 additions & 17 deletions source/dnode/mnode/impl/src/mndVgroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -961,27 +961,24 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
int32_t cols = 0;
int64_t curMs = taosGetTimestampMs();

while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
if (pShow->pIter == NULL) break;

for (int32_t i = 0; i < pVgroup->replica && numOfRows < rows; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SVnodeGid *pGid = &pVgroup->vnodeGid[i];
SColumnInfoData *pColInfo = NULL;
cols = 0;

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->dnodeId, false);

char buf[20] = {0};
STR_TO_VARSTR(buf, syncStr(pVgid->syncState));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);

// db_name
const char *dbname = mndGetDbStr(pVgroup->dbName);
char b1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
if (dbname != NULL) {
Expand All @@ -992,20 +989,33 @@ static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)b1, false);

// dnode is online?
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pGid->dnodeId);
if (pDnode == NULL) {
mError("failed to acquire dnode. dnodeId:%d", pGid->dnodeId);
break;
}
bool isDnodeOnline = mndIsDnodeOnline(pDnode, curMs);

char buf[20] = {0};
ESyncState syncState = (isDnodeOnline) ? pGid->syncState : TAOS_SYNC_STATE_OFFLINE;
STR_TO_VARSTR(buf, syncStr(syncState));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgid->dnodeId, false);
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);

int64_t roleTimeMs = (isDnodeOnline) ? pGid->roleTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&roleTimeMs, false);

int64_t startTimeMs = (isDnodeOnline) ? pGid->startTimeMs : 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&startTimeMs, false);

SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
char b2[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
if (pDnode != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(b2, pDnode->ep, TSDB_EP_LEN + VARSTR_HEADER_SIZE);
} else {
STR_WITH_MAXSIZE_TO_VARSTR(b2, "NULL", TSDB_EP_LEN + VARSTR_HEADER_SIZE);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)b2, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGid->syncRestore, false);

numOfRows++;
sdbRelease(pSdb, pDnode);
}

sdbRelease(pSdb, pVgroup);
Expand Down
3 changes: 3 additions & 0 deletions source/dnode/vnode/src/vnd/vnodeQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;
pLoad->syncTerm = state.term;
pLoad->roleTimeMs = state.roleTimeMs;
pLoad->startTimeMs = state.startTimeMs;
pLoad->syncCanRead = state.canRead;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
Expand Down