Skip to content

Commit

Permalink
fix: taosbenchmark ctrl-c (#545)
Browse files Browse the repository at this point in the history
* fix: taosbenchmark ctrl-c

* fix: optimize query callback to handle terminate signal
  • Loading branch information
sangshuduo committed Jan 8, 2023
1 parent 18d063a commit 94d6895
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 42 deletions.
1 change: 1 addition & 0 deletions inc/bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ void printVersion();
int32_t benchParseSingleOpt(int32_t key, char* arg);

void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res);
void printWarnCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res);

#ifndef LINUX
int32_t benchParseArgsNoArgp(int argc, char* argv[]);
Expand Down
93 changes: 55 additions & 38 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ static int getSuperTableFromServerTaosc(
res = taos_query(conn->taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
printErrCmdCodeStr(command, code, res);
printWarnCmdCodeStr(command, code, res);
infoPrint("stable %s does not exist, will create one\n",
stbInfo->stbName);
close_bench_conn(conn);
Expand Down Expand Up @@ -115,7 +115,7 @@ static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
int ret = 0;

char command[SQL_BUFF_LEN] = "\0";
snprintf(command, SQL_BUFF_LEN, "describe %s.`%s`", database->dbName,
snprintf(command, SQL_BUFF_LEN, "DESCRIBE %s.`%s`", database->dbName,
stbInfo->stbName);

if (REST_IFACE == stbInfo->iface) {
Expand Down Expand Up @@ -751,11 +751,13 @@ static int startMultiThreadCreateChildTable(
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->tables_created = 0;
pthread_create(pids + i, NULL, createTable, pThreadInfo);
if (!g_arguments->terminate)
pthread_create(pids + i, NULL, createTable, pThreadInfo);
}

for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
if (!g_arguments->terminate)
pthread_join(pids[i], NULL);
}

for (int i = 0; i < threads; i++) {
Expand Down Expand Up @@ -1314,13 +1316,14 @@ static void *syncWriteInterlace(void *sarg) {
}
free_of_interlace:
if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1;

succPrint(
"thread[%d] completed total inserted rows: %" PRIu64
"thread[%d] %s(), completed total inserted rows: %" PRIu64
", %.2f records/second\n",
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->threadID,
__func__,
pThreadInfo->totalInsertRows,
(double)(pThreadInfo->totalInsertRows /
((double)pThreadInfo->totalDelay / 1E6)));
((double)pThreadInfo->totalDelay / 1E6)));
return NULL;
}

Expand Down Expand Up @@ -1451,7 +1454,8 @@ void *syncWriteProgressive(void *sarg) {
disorderRange = stbInfo->disorderRange;
}
disorderTs = startTimestamp - disorderRange;
debugPrint("rand_num: %d, < disorderRatio: %d, disorderTs: %"PRId64"\n",
debugPrint("rand_num: %d, < disorderRatio:"
" %d, disorderTs: %"PRId64"\n",
rand_num, stbInfo->disorderRatio, disorderTs);
}
}
Expand Down Expand Up @@ -1638,11 +1642,13 @@ void *syncWriteProgressive(void *sarg) {
free_of_progressive:
if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1;
succPrint(
"thread[%d] completed total inserted rows: %" PRIu64
"thread[%d] %s(), completed total inserted rows: %" PRIu64
", %.2f records/second\n",
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->threadID,
__func__,
pThreadInfo->totalInsertRows,
(double)(pThreadInfo->totalInsertRows /
((double)pThreadInfo->totalDelay / 1E6)));
((double)pThreadInfo->totalDelay / 1E6)));
return NULL;
}

Expand Down Expand Up @@ -1853,8 +1859,8 @@ static int parseBufferToStmtBatch(

static int startMultiThreadInsertData(SDataBase* database,
SSuperTable* stbInfo) {
if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE) &&
!stbInfo->use_metric) {
if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE)
&& !stbInfo->use_metric) {
errorPrint("%s", "schemaless cannot work without stable\n");
return -1;
}
Expand Down Expand Up @@ -2261,16 +2267,21 @@ static int startMultiThreadInsertData(SDataBase* database,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
if (stbInfo->interlaceRows > 0) {
pthread_create(pids + i, NULL, syncWriteInterlace, pThreadInfo);
if (!g_arguments->terminate)
pthread_create(pids + i, NULL,
syncWriteInterlace, pThreadInfo);
} else {
pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo);
if (!g_arguments->terminate)
pthread_create(pids + i, NULL,
syncWriteProgressive, pThreadInfo);
}
}

int64_t start = toolsGetTimestampUs();

for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
if (!g_arguments->terminate)
pthread_join(pids[i], NULL);
}

int64_t end = toolsGetTimestampUs()+1;
Expand Down Expand Up @@ -2341,32 +2352,36 @@ static int startMultiThreadInsertData(SDataBase* database,
free(infos);

succPrint("Spent %.6f seconds to insert rows: %" PRIu64
" with %d thread(s) into %s %.2f records/second\n",
(end - start)/1E6, totalInsertRows, threads,
database->dbName,
(double)(totalInsertRows / ((end - start)/1E6)));
" with %d thread(s) into %s %.2f records/second\n",
(end - start)/1E6, totalInsertRows, threads,
database->dbName,
(double)(totalInsertRows / ((end - start)/1E6)));
if (!total_delay_list->size) {
benchArrayDestroy(total_delay_list);
return -1;
}

succPrint("insert delay, "
"min: %.4fms, "
"avg: %.4fms, "
"p90: %.4fms, "
"p95: %.4fms, "
"p99: %.4fms, "
"max: %.4fms\n",
*(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3,
(double)totalDelay/total_delay_list->size/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size * 0.9)))/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size * 0.95)))/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size * 0.99)))/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size - 1)))/1E3);
"min: %.4fms, "
"avg: %.4fms, "
"p90: %.4fms, "
"p95: %.4fms, "
"p99: %.4fms, "
"max: %.4fms\n",
*(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3,
(double)totalDelay/total_delay_list->size/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size
* 0.9)))/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size
* 0.95)))/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size
* 0.99)))/1E3,
*(int64_t *)(benchArrayGet(total_delay_list,
(int32_t)(total_delay_list->size
- 1)))/1E3);

benchArrayDestroy(total_delay_list);
if (g_fail) {
Expand Down Expand Up @@ -2574,7 +2589,9 @@ int insertTestProcess() {
pThreadInfo->dbName = database->dbName;
pThreadInfo->stbName = stbInfo->stbName;
pThreadInfo->tsmas = stbInfo->tsmas;
pthread_create(&tsmas_pid, NULL, create_tsmas, pThreadInfo);
if (!g_arguments->terminate)
pthread_create(&tsmas_pid, NULL,
create_tsmas, pThreadInfo);
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/benchMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@ int g_majorVersionOfClient = 0;

#ifdef LINUX
void benchQueryInterruptHandler(int32_t signum, void* sigingo, void* context) {
infoPrint("%s", "Receive SIGINT or other signal, quit taosBenchmark\n");
sem_post(&g_arguments->cancelSem);
}

void* benchCancelHandler(void* arg) {
if (bsem_wait(&g_arguments->cancelSem) != 0) {
toolsMsleep(10);
}
infoPrint("%s", "Receive SIGINT or other signal, quit taosBenchmark\n");
if(g_arguments->in_prompt) {

g_arguments->terminate = true;
toolsMsleep(10);

if (INSERT_TEST != g_arguments->test_mode) {
postFreeResource();
toolsMsleep(10);
exit(EXIT_SUCCESS);
}
g_arguments->terminate = true;
return NULL;
}
#endif
Expand Down
7 changes: 6 additions & 1 deletion src/benchQuery.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ extern int g_majorVersionOfClient;

int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
int ret = 0;

if (g_arguments->terminate) {
return -1;
}
uint32_t threadID = pThreadInfo->threadID;
char dbName[TSDB_DB_NAME_LEN] = {0};
tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
Expand Down Expand Up @@ -330,7 +334,8 @@ static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
g_queryInfo.superQueryInfo.threadCnt = threads;

for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
if (!g_arguments->terminate)
pthread_join(pidsOfSub[i], NULL);
threadInfo *pThreadInfo = infosOfSub + i;
if (iface == REST_IFACE) {
destroySockFd(pThreadInfo->sockfd);
Expand Down
5 changes: 5 additions & 0 deletions src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -1133,3 +1133,8 @@ FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) {
taos_free_result(res);
}

FORCE_INLINE void printWarnCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) {
warnPrint("failed to run command %s, code: 0x%08x, reason: %s\n",
cmd, code, taos_errstr(res));
taos_free_result(res);
}

0 comments on commit 94d6895

Please sign in to comment.