From dffe02e3835fd4c77166ebd687eb5875f0fab368 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 22 Dec 2022 09:08:23 +0800 Subject: [PATCH 1/2] fix:modify delete msg type to tmq meta --- source/client/src/clientRawBlockWrite.c | 76 ++++++++++++------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 150194aa278..734ba58cee1 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -179,7 +179,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { } string = cJSON_PrintUnformatted(json); -end: + end: cJSON_Delete(json); tFreeSMAltertbReq(&req); return string; @@ -200,7 +200,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { } string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); -_err: + _err: tDecoderClear(&coder); return string; } @@ -220,7 +220,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { } string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); -_err: + _err: tDecoderClear(&coder); return string; } @@ -302,7 +302,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON_AddItemToArray(tags, tag); } -end: + end: cJSON_AddItemToObject(json, "tags", tags); taosArrayDestroy(pTagVals); } @@ -360,7 +360,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { } } -_exit: + _exit: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -393,7 +393,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { } string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); -_exit: + _exit: for (int i = 0; i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); @@ -515,7 +515,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { } string = cJSON_PrintUnformatted(json); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -548,7 +548,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -590,7 +590,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -678,7 +678,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); tDecoderClear(&coder); @@ -748,7 +748,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tDecoderClear(&coder); return code; @@ -809,9 +809,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table @@ -891,7 +891,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; -end: + end: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -961,9 +961,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { @@ -1015,7 +1015,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } code = pRequest->code; -end: + end: taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); tDecoderClear(&coder); @@ -1073,7 +1073,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { char sql[256] = {0}; snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); - printf("delete sql:%s\n", sql); + uDebug("delete sql:%s\n", sql); TAOS_RES* res = taos_query(taos, sql); SRequestObj* pRequest = (SRequestObj*)res; @@ -1083,7 +1083,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } taos_free_result(res); -end: + end: tDecoderClear(&coder); return code; } @@ -1130,9 +1130,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1191,7 +1191,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } -end: + end: taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); @@ -1267,14 +1267,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch uint16_t fLen = 0; int32_t rowSize = 0; int16_t nVar = 0; - for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { - SSchema* schema = pTableMeta->schema + i; - fLen += TYPE_BYTES[schema->type]; - rowSize += schema->bytes; - if (IS_VAR_DATA_TYPE(schema->type)) { - nVar++; - } + for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + SSchema* schema = pTableMeta->schema + i; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if (IS_VAR_DATA_TYPE(schema->type)) { + nVar++; } + } fLen -= sizeof(TSKEY); @@ -1597,7 +1597,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); taosMemoryFree(subReq); @@ -1652,7 +1652,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - printf("raw data block num:%d\n", rspObj.rsp.blockNum); + uDebug("raw data block num:%d\n", rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { @@ -1675,7 +1675,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { goto end; } - printf("raw data tbname:%s\n", tbName); + uDebug("raw data tbname:%s\n", tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbName); @@ -1861,7 +1861,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: tDeleteSMqDataRsp(&rspObj.rsp); rspObj.resInfo.pRspMsg = NULL; doFreeReqResultInfo(&rspObj.resInfo); @@ -1922,7 +1922,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - printf("raw data block num:%d\n", rspObj.rsp.blockNum); + uDebug("raw data block num:%d\n", rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { @@ -1945,7 +1945,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } - printf("raw data tbname:%s\n", tbName); + uDebug("raw data tbname:%s\n", tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbName); From 9a5c327b7bd88899096b4a7d32b8fc6381117041 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 22 Dec 2022 17:34:48 +0800 Subject: [PATCH 2/2] fix:modify delete msg type to tmq meta --- source/client/src/clientRawBlockWrite.c | 44 +++++++++++++++++++++++++ utils/test/c/tmq_taosx_ci.c | 1 + 2 files changed, 45 insertions(+) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 734ba58cee1..8ceb2a3380c 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -553,6 +553,45 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { tDecoderClear(&decoder); return string; } +static char* processDeleteTable(SMqMetaRsp* metaRsp){ + SDeleteRes req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; + cJSON* json = NULL; + char* string = NULL; + + // decode and process req + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + + tDecoderInit(&coder, data, len); + if (tDecodeDeleteRes(&coder, &req) < 0) { + code = TSDB_CODE_INVALID_PARA; + goto _exit; + } + + // getTbName(req.tableFName); + char sql[256] = {0}; + snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, + req.tsColName, req.skey, req.tsColName, req.ekey); + uDebug("delete sql:%s\n", sql); + + json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("delete"); + cJSON_AddItemToObject(json, "type", type); + cJSON* sqlJson = cJSON_CreateString(sql); + cJSON_AddItemToObject(json, "sql", sqlJson); + + string = cJSON_PrintUnformatted(json); + + _exit: + cJSON_Delete(json); + tDecoderClear(&coder); + return string; +} static char* processDropTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; @@ -2202,7 +2241,12 @@ char* tmq_get_json_meta(TAOS_RES* res) { return processAlterTable(&pMetaRspObj->metaRsp); } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { return processDropTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE) { + return processDropTable(&pMetaRspObj->metaRsp); + } else if (pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DELETE) { + return processDeleteTable(&pMetaRspObj->metaRsp); } + return NULL; } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 5af9ba68b26..024a253a2e2 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -661,6 +661,7 @@ void initLogFile() { "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}", "{\"type\":\"alter\",\"tableType\":\"child\",\"tableName\":\"ct3\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}", + "{\"type\":\"delete\",\"sql\":\"delete from `ct3` where `ts` >= 1626006833600 and `ts` <= 1626006833605\"}", "{\"type\":\"create\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}", "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}", "{\"type\":\"alter\",\"tableType\":\"normal\",\"tableName\":\"n1\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}",