Skip to content

Commit

Permalink
Merge pull request #21335 from taosdata/feat/TD-22746
Browse files Browse the repository at this point in the history
feat: support client meta control
  • Loading branch information
dapan1121 committed May 19, 2023
2 parents b7075c3 + efbc3fc commit 42c6589
Show file tree
Hide file tree
Showing 18 changed files with 577 additions and 110 deletions.
3 changes: 2 additions & 1 deletion include/common/tglobal.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod;
extern int32_t tsMaxRetryWaitTime;
extern bool tsUseAdapter;
extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogScope;

Expand Down Expand Up @@ -193,7 +194,7 @@ struct SConfig *taosGetCfg();

void taosSetAllDebugFlag(int32_t flag, bool rewrite);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite);
int32_t taosSetCfg(SConfig *pCfg, char *name);
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name);
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);

#ifdef __cplusplus
Expand Down
3 changes: 2 additions & 1 deletion include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -3191,7 +3191,8 @@ typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid;
int32_t version;
SArray* pIndex;
int32_t indexSize;
SArray* pIndex; // STableIndexInfo
} STableIndexRsp;

int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
Expand Down
2 changes: 1 addition & 1 deletion include/libs/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const

int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);

int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
int32_t catalogAsyncUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg);

int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);

Expand Down
24 changes: 10 additions & 14 deletions include/libs/qcom/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,23 @@ typedef struct STbVerInfo {
int32_t tversion;
} STbVerInfo;

/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
#pragma pack(push, 1)
typedef struct SCTableMeta {
int32_t vgId : 24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
int32_t vgId;
int8_t tableType;
} SCTableMeta;
#pragma pack(pop)

/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a
* SCTableMeta.
*/

#pragma pack(push, 1)
typedef struct STableMeta {
// BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t vgId : 24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
int32_t vgId;
int8_t tableType;
// END: KEEP THIS PART SAME WITH SCTableMeta

// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta
Expand All @@ -121,6 +116,7 @@ typedef struct STableMeta {
STableComInfo tableInfo;
SSchema schema[];
} STableMeta;
#pragma pack(pop)

typedef struct SDBVgInfo {
int32_t vgVersion;
Expand All @@ -130,7 +126,7 @@ typedef struct SDBVgInfo {
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray;
SArray* vgArray; // SVgroupInfo
} SDBVgInfo;

typedef struct SUseDbOutput {
Expand Down
2 changes: 1 addition & 1 deletion source/client/src/clientEnv.c
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
} else {
tscInfo("set cfg:%s to %s", pItem->name, str);
if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
code = taosSetCfg(pCfg, pItem->name);
code = taosApplyLocalCfg(pCfg, pItem->name);
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/client/src/clientHb.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return TSDB_CODE_TSC_INVALID_VALUE;
}

catalogUpdateTableMeta(pCatalog, rsp);
catalogAsyncUpdateTableMeta(pCatalog, rsp);
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/client/src/clientImpl.c
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
}

int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
}

int32_t handleQueryExecRsp(SRequestObj* pRequest) {
Expand Down
14 changes: 10 additions & 4 deletions source/common/src/tglobal.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@ int32_t tsRedirectFactor = 2;
int32_t tsRedirectMaxPeriod = 1000;
int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;




/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
* metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server.
Expand Down Expand Up @@ -351,6 +349,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1;

Expand Down Expand Up @@ -788,6 +787,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
tsMetaCacheMaxSize = cfgGetItem(pCfg, "metaCacheMaxSize")->i32;
tsSlowLogThreshold = cfgGetItem(pCfg, "slowLogThreshold")->i32;
if (taosSetSlowLogScope(cfgGetItem(pCfg, "slowLogScope")->str)) {
return -1;
Expand Down Expand Up @@ -916,7 +916,7 @@ void taosLocalCfgForbiddenToChange(char *name, bool *forbidden) {
*forbidden = false;
}

int32_t taosSetCfg(SConfig *pCfg, char *name) {
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
int32_t len = strlen(name);
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
Expand Down Expand Up @@ -1051,6 +1051,12 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
}
break;
}
case 'e': {
if (strcasecmp("metaCacheMaxSize", name) == 0) {
atomic_store_32(&tsMetaCacheMaxSize, cfgGetItem(pCfg, "metaCacheMaxSize")->i32);
}
break;
}
case 'i': {
if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
Expand Down
10 changes: 7 additions & 3 deletions source/common/src/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1566,21 +1566,21 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db);
taosHashPut(pRsp->createdDbs, db, len, db, len);
taosHashPut(pRsp->createdDbs, db, len, db, len + 1);
}

for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db);
taosHashPut(pRsp->readDbs, db, len, db, len);
taosHashPut(pRsp->readDbs, db, len, db, len + 1);
}

for (int32_t i = 0; i < numOfWriteDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
int32_t len = strlen(db);
taosHashPut(pRsp->writeDbs, db, len, db, len);
taosHashPut(pRsp->writeDbs, db, len, db, len + 1);
}

if (!tDecodeIsEnd(pDecoder)) {
Expand Down Expand Up @@ -3416,6 +3416,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->indexSize) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
Expand Down Expand Up @@ -3461,6 +3462,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->indexSize) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
Expand Down Expand Up @@ -3735,6 +3737,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->indexSize) < 0) return -1;
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
Expand Down Expand Up @@ -3797,6 +3800,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.indexSize) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
Expand Down
1 change: 1 addition & 0 deletions source/dnode/mnode/impl/src/mndSma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
return code;
}

rsp->indexSize += sizeof(info) + pSma->exprLen + 1;
*exist = true;

sdbRelease(pSdb, pSma);
Expand Down
Loading

0 comments on commit 42c6589

Please sign in to comment.