From 94d68955f8b708a7d0113d101ae1ddd5c55c9adf Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sun, 8 Jan 2023 19:34:10 +0800 Subject: [PATCH] fix: taosbenchmark ctrl-c (#545) * fix: taosbenchmark ctrl-c * fix: optimize query callback to handle terminate signal --- inc/bench.h | 1 + src/benchInsert.c | 93 ++++++++++++++++++++++++++++------------------- src/benchMain.c | 11 ++++-- src/benchQuery.c | 7 +++- src/benchUtil.c | 5 +++ 5 files changed, 75 insertions(+), 42 deletions(-) diff --git a/inc/bench.h b/inc/bench.h index 88de21f4..86a26839 100644 --- a/inc/bench.h +++ b/inc/bench.h @@ -837,6 +837,7 @@ void printVersion(); int32_t benchParseSingleOpt(int32_t key, char* arg); void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res); +void printWarnCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res); #ifndef LINUX int32_t benchParseArgsNoArgp(int argc, char* argv[]); diff --git a/src/benchInsert.c b/src/benchInsert.c index 878eebfa..daab20d9 100644 --- a/src/benchInsert.c +++ b/src/benchInsert.c @@ -55,7 +55,7 @@ static int getSuperTableFromServerTaosc( res = taos_query(conn->taos, command); int32_t code = taos_errno(res); if (code != 0) { - printErrCmdCodeStr(command, code, res); + printWarnCmdCodeStr(command, code, res); infoPrint("stable %s does not exist, will create one\n", stbInfo->stbName); close_bench_conn(conn); @@ -115,7 +115,7 @@ static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) { int ret = 0; char command[SQL_BUFF_LEN] = "\0"; - snprintf(command, SQL_BUFF_LEN, "describe %s.`%s`", database->dbName, + snprintf(command, SQL_BUFF_LEN, "DESCRIBE %s.`%s`", database->dbName, stbInfo->stbName); if (REST_IFACE == stbInfo->iface) { @@ -751,11 +751,13 @@ static int startMultiThreadCreateChildTable( pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->tables_created = 0; - pthread_create(pids + i, NULL, createTable, pThreadInfo); + if (!g_arguments->terminate) + pthread_create(pids + i, NULL, createTable, pThreadInfo); } for (int i = 0; i < threads; i++) { - pthread_join(pids[i], NULL); + if (!g_arguments->terminate) + pthread_join(pids[i], NULL); } for (int i = 0; i < threads; i++) { @@ -1314,13 +1316,14 @@ static void *syncWriteInterlace(void *sarg) { } free_of_interlace: if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1; - succPrint( - "thread[%d] completed total inserted rows: %" PRIu64 + "thread[%d] %s(), completed total inserted rows: %" PRIu64 ", %.2f records/second\n", - pThreadInfo->threadID, pThreadInfo->totalInsertRows, + pThreadInfo->threadID, + __func__, + pThreadInfo->totalInsertRows, (double)(pThreadInfo->totalInsertRows / - ((double)pThreadInfo->totalDelay / 1E6))); + ((double)pThreadInfo->totalDelay / 1E6))); return NULL; } @@ -1451,7 +1454,8 @@ void *syncWriteProgressive(void *sarg) { disorderRange = stbInfo->disorderRange; } disorderTs = startTimestamp - disorderRange; - debugPrint("rand_num: %d, < disorderRatio: %d, disorderTs: %"PRId64"\n", + debugPrint("rand_num: %d, < disorderRatio:" + " %d, disorderTs: %"PRId64"\n", rand_num, stbInfo->disorderRatio, disorderTs); } } @@ -1638,11 +1642,13 @@ void *syncWriteProgressive(void *sarg) { free_of_progressive: if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1; succPrint( - "thread[%d] completed total inserted rows: %" PRIu64 + "thread[%d] %s(), completed total inserted rows: %" PRIu64 ", %.2f records/second\n", - pThreadInfo->threadID, pThreadInfo->totalInsertRows, + pThreadInfo->threadID, + __func__, + pThreadInfo->totalInsertRows, (double)(pThreadInfo->totalInsertRows / - ((double)pThreadInfo->totalDelay / 1E6))); + ((double)pThreadInfo->totalDelay / 1E6))); return NULL; } @@ -1853,8 +1859,8 @@ static int parseBufferToStmtBatch( static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) { - if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE) && - !stbInfo->use_metric) { + if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE) + && !stbInfo->use_metric) { errorPrint("%s", "schemaless cannot work without stable\n"); return -1; } @@ -2261,16 +2267,21 @@ static int startMultiThreadInsertData(SDataBase* database, for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; if (stbInfo->interlaceRows > 0) { - pthread_create(pids + i, NULL, syncWriteInterlace, pThreadInfo); + if (!g_arguments->terminate) + pthread_create(pids + i, NULL, + syncWriteInterlace, pThreadInfo); } else { - pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo); + if (!g_arguments->terminate) + pthread_create(pids + i, NULL, + syncWriteProgressive, pThreadInfo); } } int64_t start = toolsGetTimestampUs(); for (int i = 0; i < threads; i++) { - pthread_join(pids[i], NULL); + if (!g_arguments->terminate) + pthread_join(pids[i], NULL); } int64_t end = toolsGetTimestampUs()+1; @@ -2341,32 +2352,36 @@ static int startMultiThreadInsertData(SDataBase* database, free(infos); succPrint("Spent %.6f seconds to insert rows: %" PRIu64 - " with %d thread(s) into %s %.2f records/second\n", - (end - start)/1E6, totalInsertRows, threads, - database->dbName, - (double)(totalInsertRows / ((end - start)/1E6))); + " with %d thread(s) into %s %.2f records/second\n", + (end - start)/1E6, totalInsertRows, threads, + database->dbName, + (double)(totalInsertRows / ((end - start)/1E6))); if (!total_delay_list->size) { benchArrayDestroy(total_delay_list); return -1; } succPrint("insert delay, " - "min: %.4fms, " - "avg: %.4fms, " - "p90: %.4fms, " - "p95: %.4fms, " - "p99: %.4fms, " - "max: %.4fms\n", - *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3, - (double)totalDelay/total_delay_list->size/1E3, - *(int64_t *)(benchArrayGet(total_delay_list, - (int32_t)(total_delay_list->size * 0.9)))/1E3, - *(int64_t *)(benchArrayGet(total_delay_list, - (int32_t)(total_delay_list->size * 0.95)))/1E3, - *(int64_t *)(benchArrayGet(total_delay_list, - (int32_t)(total_delay_list->size * 0.99)))/1E3, - *(int64_t *)(benchArrayGet(total_delay_list, - (int32_t)(total_delay_list->size - 1)))/1E3); + "min: %.4fms, " + "avg: %.4fms, " + "p90: %.4fms, " + "p95: %.4fms, " + "p99: %.4fms, " + "max: %.4fms\n", + *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3, + (double)totalDelay/total_delay_list->size/1E3, + *(int64_t *)(benchArrayGet(total_delay_list, + (int32_t)(total_delay_list->size + * 0.9)))/1E3, + *(int64_t *)(benchArrayGet(total_delay_list, + (int32_t)(total_delay_list->size + * 0.95)))/1E3, + *(int64_t *)(benchArrayGet(total_delay_list, + (int32_t)(total_delay_list->size + * 0.99)))/1E3, + *(int64_t *)(benchArrayGet(total_delay_list, + (int32_t)(total_delay_list->size + - 1)))/1E3); benchArrayDestroy(total_delay_list); if (g_fail) { @@ -2574,7 +2589,9 @@ int insertTestProcess() { pThreadInfo->dbName = database->dbName; pThreadInfo->stbName = stbInfo->stbName; pThreadInfo->tsmas = stbInfo->tsmas; - pthread_create(&tsmas_pid, NULL, create_tsmas, pThreadInfo); + if (!g_arguments->terminate) + pthread_create(&tsmas_pid, NULL, + create_tsmas, pThreadInfo); } } } diff --git a/src/benchMain.c b/src/benchMain.c index 2fbeed33..27a599a0 100644 --- a/src/benchMain.c +++ b/src/benchMain.c @@ -24,6 +24,7 @@ int g_majorVersionOfClient = 0; #ifdef LINUX void benchQueryInterruptHandler(int32_t signum, void* sigingo, void* context) { + infoPrint("%s", "Receive SIGINT or other signal, quit taosBenchmark\n"); sem_post(&g_arguments->cancelSem); } @@ -31,11 +32,15 @@ void* benchCancelHandler(void* arg) { if (bsem_wait(&g_arguments->cancelSem) != 0) { toolsMsleep(10); } - infoPrint("%s", "Receive SIGINT or other signal, quit taosBenchmark\n"); - if(g_arguments->in_prompt) { + + g_arguments->terminate = true; + toolsMsleep(10); + + if (INSERT_TEST != g_arguments->test_mode) { + postFreeResource(); + toolsMsleep(10); exit(EXIT_SUCCESS); } - g_arguments->terminate = true; return NULL; } #endif diff --git a/src/benchQuery.c b/src/benchQuery.c index 744598a4..ef0880f5 100644 --- a/src/benchQuery.c +++ b/src/benchQuery.c @@ -15,6 +15,10 @@ extern int g_majorVersionOfClient; int selectAndGetResult(threadInfo *pThreadInfo, char *command) { int ret = 0; + + if (g_arguments->terminate) { + return -1; + } uint32_t threadID = pThreadInfo->threadID; char dbName[TSDB_DB_NAME_LEN] = {0}; tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN); @@ -330,7 +334,8 @@ static int multi_thread_super_table_query(uint16_t iface, char* dbName) { g_queryInfo.superQueryInfo.threadCnt = threads; for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { - pthread_join(pidsOfSub[i], NULL); + if (!g_arguments->terminate) + pthread_join(pidsOfSub[i], NULL); threadInfo *pThreadInfo = infosOfSub + i; if (iface == REST_IFACE) { destroySockFd(pThreadInfo->sockfd); diff --git a/src/benchUtil.c b/src/benchUtil.c index 3e643883..af916625 100644 --- a/src/benchUtil.c +++ b/src/benchUtil.c @@ -1133,3 +1133,8 @@ FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) { taos_free_result(res); } +FORCE_INLINE void printWarnCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) { + warnPrint("failed to run command %s, code: 0x%08x, reason: %s\n", + cmd, code, taos_errstr(res)); + taos_free_result(res); +}