Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc): if all vnode in one group can not connect, return TSDB_CODE… #18935

Merged
merged 6 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/client/src/tscLocal.c
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
pSql->res.code = pHb->res.code;
}

if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pSql->res.code == TSDB_CODE_RPC_VGROUP_NOT_CONNECTED) {
taosReleaseRef(tscObjRef, pObj->hbrid);
return pSql->res.code;
}
Expand All @@ -920,7 +920,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
taosReleaseRef(tscObjRef, pObj->hbrid);
}

if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pSql->res.code == TSDB_CODE_RPC_VGROUP_NOT_CONNECTED) {
return pSql->res.code;
}

Expand Down
3 changes: 2 additions & 1 deletion src/client/src/tscParseLineProtocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) {
|| code == TSDB_CODE_VND_INVALID_VGROUP_ID
|| code == TSDB_CODE_TDB_TABLE_RECONFIGURE
|| code == TSDB_CODE_APP_NOT_READY
|| code == TSDB_CODE_RPC_VGROUP_NOT_CONNECTED
|| code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && batch->tryTimes < TSDB_MAX_REPLICA) {
batch->tryAgain = true;
}
Expand All @@ -985,7 +986,7 @@ static void insertCallback(void *param, TAOS_RES *res, int32_t notUsedCode) {
}
}

if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_VGROUP_NOT_CONNECTED) {
if (batch->tryAgain) {
batch->sleep = true;
}
Expand Down
1 change: 1 addition & 0 deletions src/client/src/tscServer.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ bool shouldRewTableMeta(SSqlObj* pSql, SRpcMsg* rpcMsg) {
rpcMsg->code != TSDB_CODE_VND_INVALID_VGROUP_ID &&
rpcMsg->code != TSDB_CODE_QRY_INVALID_SCHEMA_VERSION &&
rpcMsg->code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
rpcMsg->code != TSDB_CODE_RPC_VGROUP_NOT_CONNECTED &&
rpcMsg->code != TSDB_CODE_APP_NOT_READY ) {
return false;
}
Expand Down
1 change: 1 addition & 0 deletions src/client/src/tscSubquery.c
Original file line number Diff line number Diff line change
Expand Up @@ -3462,6 +3462,7 @@ static bool needRetryInsert(SSqlObj* pParentObj) {

if (code != TSDB_CODE_TDB_TABLE_RECONFIGURE && code != TSDB_CODE_TDB_INVALID_TABLE_ID &&
code != TSDB_CODE_VND_INVALID_VGROUP_ID && code != TSDB_CODE_RPC_NETWORK_UNAVAIL &&
code != TSDB_CODE_RPC_VGROUP_NOT_CONNECTED &&
code != TSDB_CODE_APP_NOT_READY) {
pParentObj->res.code = code;
ret = false;
Expand Down
1 change: 1 addition & 0 deletions src/inc/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015) //"Unable to resolve FQDN"
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //"Invalid app version"
#define TSDB_CODE_RPC_SHORTCUT TAOS_DEF_ERROR_CODE(0, 0x0017) //"Shortcut"
#define TSDB_CODE_RPC_VGROUP_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0018) //"Vgroup could not be connected"

//common & util
#define TSDB_CODE_COM_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //"Operation not supported"
Expand Down
6 changes: 6 additions & 0 deletions src/rpc/src/rpcMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,12 @@ static void rpcProcessConnError(void *param, void *id) {
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;

if( pContext->numOfTry >= pContext->epSet.numOfEps && rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if(pContext->msgType == TSDB_MSG_TYPE_SUBMIT || pContext->msgType == TSDB_MSG_TYPE_QUERY) {
rpcMsg.code = TSDB_CODE_RPC_VGROUP_NOT_CONNECTED;
}
}

tWarn("%s %p, connection error. notify client query over. numOfTry=%d msgType=%d", pRpc->label, pContext->ahandle, pContext->numOfTry, pContext->msgType);
rpcNotifyClient(pContext, &rpcMsg);
} else {
Expand Down
1 change: 1 addition & 0 deletions src/util/src/terror.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SHORTCUT, "Shortcut")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_VGROUP_NOT_CONNECTED, "Vgroup could not be connected")

//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, "Operation not supported")
Expand Down
5 changes: 3 additions & 2 deletions src/vnode/src/vnodeRead.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt

atomic_add_fetch_32(&pVnode->queuedRMsg, 1);

if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->code == TSDB_CODE_RPC_VGROUP_NOT_CONNECTED ||
pRead->msgType == TSDB_MSG_TYPE_FETCH) {
vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount,
pVnode->queuedRMsg);
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
Expand Down Expand Up @@ -229,7 +230,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
memset(pRet, 0, sizeof(SRspRet));

// qHandle needs to be freed correctly
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->code == TSDB_CODE_RPC_VGROUP_NOT_CONNECTED) {
vError("error rpc msg in query, %s", tstrerror(pRead->code));
}

Expand Down