From d94eb77e25e15f8dff714288cee04ddd8e185f88 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 15 May 2024 17:18:29 +0800 Subject: [PATCH 1/2] feat: stmt add delay1 delay2 delay3 for settbname bind setbatch --- inc/bench.h | 3 +++ inc/benchData.h | 3 ++- src/benchData.c | 6 ++++- src/benchInsert.c | 64 +++++++++++++++++++++++++++++++++++++---------- src/benchMain.c | 14 ++++++++--- 5 files changed, 71 insertions(+), 19 deletions(-) diff --git a/inc/bench.h b/inc/bench.h index 2678a0f0..d8925a12 100644 --- a/inc/bench.h +++ b/inc/bench.h @@ -996,6 +996,9 @@ typedef struct SThreadInfo_S { uint64_t totalInsertRows; uint64_t totalQueried; int64_t totalDelay; + int64_t totalDelay1; + int64_t totalDelay2; + int64_t totalDelay3; uint64_t querySeq; TAOS_SUB *tsub; char ** lines; diff --git a/inc/benchData.h b/inc/benchData.h index 6ae487bb..a88271b1 100644 --- a/inc/benchData.h +++ b/inc/benchData.h @@ -28,7 +28,8 @@ int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf, bool tag, BArray *childCols); int prepareStmt(SSuperTable *stbInfo, TAOS_STMT *stmt, char* tagData, uint64_t tableSeq); uint32_t bindParamBatch(threadInfo *pThreadInfo, - uint32_t batch, int64_t startTime, SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n); + uint32_t batch, int64_t startTime, + SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n, int64_t *delay2, int64_t *delay3); int prepareSampleData(SDataBase* database, SSuperTable* stbInfo); void generateSmlJsonTags(tools_cJSON *tagsList, char **sml_tags_json_array, diff --git a/src/benchData.c b/src/benchData.c index 5aee8da1..ebd3aedc 100644 --- a/src/benchData.c +++ b/src/benchData.c @@ -1798,7 +1798,7 @@ int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, uint32_t bindParamBatch(threadInfo *pThreadInfo, uint32_t batch, int64_t startTime, - SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n) { + SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n, int64_t *delay2, int64_t *delay3) { TAOS_STMT *stmt = pThreadInfo->conn->stmt; SSuperTable *stbInfo = pThreadInfo->stbInfo; uint32_t columnCount = stbInfo->cols->size; @@ -1858,12 +1858,14 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo, } } + int64_t start = toolsGetTimestampUs(); if (taos_stmt_bind_param_batch( stmt, (TAOS_MULTI_BIND *)pThreadInfo->bindParams)) { errorPrint("taos_stmt_bind_param_batch() failed! reason: %s\n", taos_stmt_errstr(stmt)); return 0; } + *delay2 += toolsGetTimestampUs() - start; for (int c = 0; c < stbInfo->cols->size + 1; c++) { TAOS_MULTI_BIND *param = @@ -1873,11 +1875,13 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo, } // if msg > 3MB, break + start = toolsGetTimestampUs(); if (taos_stmt_add_batch(stmt)) { errorPrint("taos_stmt_add_batch() failed! reason: %s\n", taos_stmt_errstr(stmt)); return 0; } + *delay3 += toolsGetTimestampUs() - start; return batch; } diff --git a/src/benchInsert.c b/src/benchInsert.c index 405a218a..0760a54e 100644 --- a/src/benchInsert.c +++ b/src/benchInsert.c @@ -1536,7 +1536,10 @@ static void *syncWriteInterlace(void *sarg) { if (stbInfo->autoTblCreating) { csvFile = openTagCsv(stbInfo); tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false); - } + } + int64_t delay1 = 0; + int64_t delay2 = 0; + int64_t delay3 = 0; while (insertRows > 0) { int64_t tmp_total_insert_rows = 0; @@ -1693,6 +1696,7 @@ static void *syncWriteInterlace(void *sarg) { snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s", tableName); } + int64_t start = toolsGetTimestampUs(); if (taos_stmt_set_tbname(pThreadInfo->conn->stmt, escapedTbName)) { errorPrint( @@ -1702,9 +1706,12 @@ static void *syncWriteInterlace(void *sarg) { g_fail = true; goto free_of_interlace; } + delay1 += toolsGetTimestampUs() - start; + int32_t n = 0; generated = bindParamBatch(pThreadInfo, interlaceRows, - childTbl->ts, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n); + childTbl->ts, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3); + childTbl->ts += stbInfo->timestamp_step * n; break; } @@ -1852,7 +1859,8 @@ static void *syncWriteInterlace(void *sarg) { break; } - int64_t delay = endTs - startTs; + int64_t delay4 = endTs - startTs; + int64_t delay = delay1 + delay2 + delay3 + delay4; if (delay <=0) { debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n", pThreadInfo->threadID, startTs, endTs); @@ -1866,7 +1874,11 @@ static void *syncWriteInterlace(void *sarg) { tmfree(pdelay); } pThreadInfo->totalDelay += delay; + pThreadInfo->totalDelay1 += delay1; + pThreadInfo->totalDelay2 += delay2; + pThreadInfo->totalDelay3 += delay3; } + delay1 = delay2 = delay3 = 0; int64_t currentPrintTime = toolsGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30 * 1000) { @@ -1876,7 +1888,7 @@ static void *syncWriteInterlace(void *sarg) { pThreadInfo->threadID, pThreadInfo->totalInsertRows, (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime)); lastPrintTime = currentPrintTime; - lastTotalInsertRows = pThreadInfo->totalInsertRows; + lastTotalInsertRows = pThreadInfo->totalInsertRows; } } free_of_interlace: @@ -1891,7 +1903,7 @@ static void *syncWriteInterlace(void *sarg) { static int32_t prepareProgressDataStmt( threadInfo *pThreadInfo, SChildTable *childTbl, - int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) { + int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) { SSuperTable *stbInfo = pThreadInfo->stbInfo; char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0"; if (g_arguments->escape_character) { @@ -1901,6 +1913,7 @@ static int32_t prepareProgressDataStmt( snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s", childTbl->name); } + int64_t start = toolsGetTimestampUs(); if (taos_stmt_set_tbname(pThreadInfo->conn->stmt, escapedTbName)) { errorPrint( @@ -1909,13 +1922,14 @@ static int32_t prepareProgressDataStmt( taos_stmt_errstr(pThreadInfo->conn->stmt)); return -1; } + *delay1 = toolsGetTimestampUs() - start; int32_t n =0; int32_t generated = bindParamBatch( pThreadInfo, (g_arguments->reqPerReq > (stbInfo->insertRows - i)) ? (stbInfo->insertRows - i) : g_arguments->reqPerReq, - *timestamp, childTbl, pkCur, pkCnt, &n); + *timestamp, childTbl, pkCur, pkCnt, &n, delay2, delay3); *timestamp += n * stbInfo->timestamp_step; return generated; } @@ -2398,6 +2412,9 @@ void *syncWriteProgressive(void *sarg) { int32_t pos = 0; int32_t pkCur = 0; // record generate same timestamp current count int32_t pkCnt = 0; // record generate same timestamp count + int64_t delay1 = 0; + int64_t delay2 = 0; + int64_t delay3 = 0; if (stmt) { taos_stmt_close(pThreadInfo->conn->stmt); pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos); @@ -2457,7 +2474,7 @@ void *syncWriteProgressive(void *sarg) { case STMT_IFACE: { generated = prepareProgressDataStmt( pThreadInfo, - childTbl, ×tamp, i, ttl, &pkCur, &pkCnt); + childTbl, ×tamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3); break; } case SML_REST_IFACE: @@ -2597,7 +2614,8 @@ void *syncWriteProgressive(void *sarg) { break; } - int64_t delay = endTs - startTs; + int64_t delay4 = endTs - startTs; + int64_t delay = delay1 + delay2 + delay3 + delay4; if (delay <= 0) { debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n", pThreadInfo->threadID, startTs, endTs); @@ -2611,7 +2629,11 @@ void *syncWriteProgressive(void *sarg) { tmfree(pDelay); } pThreadInfo->totalDelay += delay; + pThreadInfo->totalDelay1 += delay1; + pThreadInfo->totalDelay2 += delay2; + pThreadInfo->totalDelay3 += delay3; } + delay1 = delay2 = delay3 = 0; int64_t currentPrintTime = toolsGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30 * 1000) { @@ -2991,6 +3013,9 @@ static void preProcessArgument(SSuperTable *stbInfo) { static int printTotalDelay(SDataBase *database, int64_t totalDelay, + int64_t totalDelay1, + int64_t totalDelay2, + int64_t totalDelay3, BArray *total_delay_list, int threads, int64_t totalInsertRows, @@ -3000,9 +3025,17 @@ static int printTotalDelay(SDataBase *database, return -1; } - succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64 + char subDelay[128] = ""; + if(totalDelay1 + totalDelay2 + totalDelay3 > 0) { + sprintf(subDelay, "delay1=%.2f delay2=%.2f delay3=%.2f", + totalDelay1/threads/1E6, + totalDelay2/threads/1E6, + totalDelay3/threads/1E6); + } + + succPrint("Spent %.6f ( real %.6f %s) seconds to insert rows: %" PRIu64 " with %d thread(s) into %s %.2f (real %.2f) records/second\n", - (end - start)/1E6, totalDelay/threads/1E6, totalInsertRows, threads, + (end - start)/1E6, totalDelay/threads/1E6, subDelay, totalInsertRows, threads, database->dbName, (double)(totalInsertRows / ((end - start)/1E6)), (double)(totalInsertRows / (totalDelay/threads/1E6))); @@ -3591,6 +3624,9 @@ static int startMultiThreadInsertData(SDataBase* database, BArray * total_delay_list = benchArrayInit(1, sizeof(int64_t)); int64_t totalDelay = 0; + int64_t totalDelay1 = 0; + int64_t totalDelay2 = 0; + int64_t totalDelay3 = 0; uint64_t totalInsertRows = 0; // free threads resource @@ -3682,6 +3718,9 @@ static int startMultiThreadInsertData(SDataBase* database, } totalInsertRows += pThreadInfo->totalInsertRows; totalDelay += pThreadInfo->totalDelay; + totalDelay1 += pThreadInfo->totalDelay1; + totalDelay2 += pThreadInfo->totalDelay2; + totalDelay3 += pThreadInfo->totalDelay3; benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData, pThreadInfo->delayList->size); tmfree(pThreadInfo->delayList); @@ -3702,9 +3741,8 @@ static int startMultiThreadInsertData(SDataBase* database, free(pids); free(infos); - int ret = printTotalDelay(database, totalDelay, - total_delay_list, threads, - totalInsertRows, start, end); + int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3, + total_delay_list, threads, totalInsertRows, start, end); benchArrayDestroy(total_delay_list); if (g_fail || ret) { return -1; diff --git a/src/benchMain.c b/src/benchMain.c index 46b461c6..04f6a03c 100644 --- a/src/benchMain.c +++ b/src/benchMain.c @@ -99,11 +99,17 @@ int main(int argc, char* argv[]) { modifyArgument(); } - g_arguments->fpOfInsertResult = fopen(g_arguments->output_file, "a"); - if (NULL == g_arguments->fpOfInsertResult) { - errorPrint("failed to open %s for save result\n", - g_arguments->output_file); + if(_arguments->output_file[0] == 0) { + infoPrint("%s","result_file is empty, ignore output."); + g_arguments->fpOfInsertResult = NULL; + } else { + g_arguments->fpOfInsertResult = fopen(g_arguments->output_file, "a"); + if (NULL == g_arguments->fpOfInsertResult) { + errorPrint("failed to open %s for save result\n", + g_arguments->output_file); + } } + infoPrint("client version: %s\n", taos_get_client_info()); if (g_arguments->test_mode == INSERT_TEST) { From 7cf021468cf0aa4ad916145ed4ae8025ad8b350a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 15 May 2024 20:30:16 +0800 Subject: [PATCH 2/2] fix: build error --- src/benchMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/benchMain.c b/src/benchMain.c index 04f6a03c..82e45ddb 100644 --- a/src/benchMain.c +++ b/src/benchMain.c @@ -99,7 +99,7 @@ int main(int argc, char* argv[]) { modifyArgument(); } - if(_arguments->output_file[0] == 0) { + if(g_arguments->output_file[0] == 0) { infoPrint("%s","result_file is empty, ignore output."); g_arguments->fpOfInsertResult = NULL; } else {