Skip to content

Commit

Permalink
Merge pull request #741 from taosdata/TD-29996
Browse files Browse the repository at this point in the history
add stmt delay1 delay2 delay3 feature
  • Loading branch information
DuanKuanJun committed May 16, 2024
2 parents 876cf83 + 7cf0214 commit 572cd96
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 19 deletions.
3 changes: 3 additions & 0 deletions inc/bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,9 @@ typedef struct SThreadInfo_S {
uint64_t totalInsertRows;
uint64_t totalQueried;
int64_t totalDelay;
int64_t totalDelay1;
int64_t totalDelay2;
int64_t totalDelay3;
uint64_t querySeq;
TAOS_SUB *tsub;
char ** lines;
Expand Down
3 changes: 2 additions & 1 deletion inc/benchData.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf,
bool tag, BArray *childCols);
int prepareStmt(SSuperTable *stbInfo, TAOS_STMT *stmt, char* tagData, uint64_t tableSeq);
uint32_t bindParamBatch(threadInfo *pThreadInfo,
uint32_t batch, int64_t startTime, SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n);
uint32_t batch, int64_t startTime,
SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n, int64_t *delay2, int64_t *delay3);
int prepareSampleData(SDataBase* database, SSuperTable* stbInfo);
void generateSmlJsonTags(tools_cJSON *tagsList,
char **sml_tags_json_array,
Expand Down
6 changes: 5 additions & 1 deletion src/benchData.c
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio,

uint32_t bindParamBatch(threadInfo *pThreadInfo,
uint32_t batch, int64_t startTime,
SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n) {
SChildTable *childTbl, int32_t *pkCur, int32_t *pkCnt, int32_t *n, int64_t *delay2, int64_t *delay3) {
TAOS_STMT *stmt = pThreadInfo->conn->stmt;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
uint32_t columnCount = stbInfo->cols->size;
Expand Down Expand Up @@ -1858,12 +1858,14 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo,
}
}

int64_t start = toolsGetTimestampUs();
if (taos_stmt_bind_param_batch(
stmt, (TAOS_MULTI_BIND *)pThreadInfo->bindParams)) {
errorPrint("taos_stmt_bind_param_batch() failed! reason: %s\n",
taos_stmt_errstr(stmt));
return 0;
}
*delay2 += toolsGetTimestampUs() - start;

for (int c = 0; c < stbInfo->cols->size + 1; c++) {
TAOS_MULTI_BIND *param =
Expand All @@ -1873,11 +1875,13 @@ uint32_t bindParamBatch(threadInfo *pThreadInfo,
}

// if msg > 3MB, break
start = toolsGetTimestampUs();
if (taos_stmt_add_batch(stmt)) {
errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
taos_stmt_errstr(stmt));
return 0;
}
*delay3 += toolsGetTimestampUs() - start;
return batch;
}

Expand Down
64 changes: 51 additions & 13 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,10 @@ static void *syncWriteInterlace(void *sarg) {
if (stbInfo->autoTblCreating) {
csvFile = openTagCsv(stbInfo);
tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
}
}
int64_t delay1 = 0;
int64_t delay2 = 0;
int64_t delay3 = 0;

while (insertRows > 0) {
int64_t tmp_total_insert_rows = 0;
Expand Down Expand Up @@ -1693,6 +1696,7 @@ static void *syncWriteInterlace(void *sarg) {
snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
tableName);
}
int64_t start = toolsGetTimestampUs();
if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
escapedTbName)) {
errorPrint(
Expand All @@ -1702,9 +1706,12 @@ static void *syncWriteInterlace(void *sarg) {
g_fail = true;
goto free_of_interlace;
}
delay1 += toolsGetTimestampUs() - start;

int32_t n = 0;
generated = bindParamBatch(pThreadInfo, interlaceRows,
childTbl->ts, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);
childTbl->ts, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);

childTbl->ts += stbInfo->timestamp_step * n;
break;
}
Expand Down Expand Up @@ -1852,7 +1859,8 @@ static void *syncWriteInterlace(void *sarg) {
break;
}

int64_t delay = endTs - startTs;
int64_t delay4 = endTs - startTs;
int64_t delay = delay1 + delay2 + delay3 + delay4;
if (delay <=0) {
debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
pThreadInfo->threadID, startTs, endTs);
Expand All @@ -1866,7 +1874,11 @@ static void *syncWriteInterlace(void *sarg) {
tmfree(pdelay);
}
pThreadInfo->totalDelay += delay;
pThreadInfo->totalDelay1 += delay1;
pThreadInfo->totalDelay2 += delay2;
pThreadInfo->totalDelay3 += delay3;
}
delay1 = delay2 = delay3 = 0;

int64_t currentPrintTime = toolsGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30 * 1000) {
Expand All @@ -1876,7 +1888,7 @@ static void *syncWriteInterlace(void *sarg) {
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
(double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
lastPrintTime = currentPrintTime;
lastTotalInsertRows = pThreadInfo->totalInsertRows;
lastTotalInsertRows = pThreadInfo->totalInsertRows;
}
}
free_of_interlace:
Expand All @@ -1891,7 +1903,7 @@ static void *syncWriteInterlace(void *sarg) {
static int32_t prepareProgressDataStmt(
threadInfo *pThreadInfo,
SChildTable *childTbl,
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
SSuperTable *stbInfo = pThreadInfo->stbInfo;
char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
if (g_arguments->escape_character) {
Expand All @@ -1901,6 +1913,7 @@ static int32_t prepareProgressDataStmt(
snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
childTbl->name);
}
int64_t start = toolsGetTimestampUs();
if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
escapedTbName)) {
errorPrint(
Expand All @@ -1909,13 +1922,14 @@ static int32_t prepareProgressDataStmt(
taos_stmt_errstr(pThreadInfo->conn->stmt));
return -1;
}
*delay1 = toolsGetTimestampUs() - start;
int32_t n =0;
int32_t generated = bindParamBatch(
pThreadInfo,
(g_arguments->reqPerReq > (stbInfo->insertRows - i))
? (stbInfo->insertRows - i)
: g_arguments->reqPerReq,
*timestamp, childTbl, pkCur, pkCnt, &n);
*timestamp, childTbl, pkCur, pkCnt, &n, delay2, delay3);
*timestamp += n * stbInfo->timestamp_step;
return generated;
}
Expand Down Expand Up @@ -2398,6 +2412,9 @@ void *syncWriteProgressive(void *sarg) {
int32_t pos = 0;
int32_t pkCur = 0; // record generate same timestamp current count
int32_t pkCnt = 0; // record generate same timestamp count
int64_t delay1 = 0;
int64_t delay2 = 0;
int64_t delay3 = 0;
if (stmt) {
taos_stmt_close(pThreadInfo->conn->stmt);
pThreadInfo->conn->stmt = taos_stmt_init(pThreadInfo->conn->taos);
Expand Down Expand Up @@ -2457,7 +2474,7 @@ void *syncWriteProgressive(void *sarg) {
case STMT_IFACE: {
generated = prepareProgressDataStmt(
pThreadInfo,
childTbl, &timestamp, i, ttl, &pkCur, &pkCnt);
childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
break;
}
case SML_REST_IFACE:
Expand Down Expand Up @@ -2597,7 +2614,8 @@ void *syncWriteProgressive(void *sarg) {
break;
}

int64_t delay = endTs - startTs;
int64_t delay4 = endTs - startTs;
int64_t delay = delay1 + delay2 + delay3 + delay4;
if (delay <= 0) {
debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
pThreadInfo->threadID, startTs, endTs);
Expand All @@ -2611,7 +2629,11 @@ void *syncWriteProgressive(void *sarg) {
tmfree(pDelay);
}
pThreadInfo->totalDelay += delay;
pThreadInfo->totalDelay1 += delay1;
pThreadInfo->totalDelay2 += delay2;
pThreadInfo->totalDelay3 += delay3;
}
delay1 = delay2 = delay3 = 0;

int64_t currentPrintTime = toolsGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30 * 1000) {
Expand Down Expand Up @@ -2991,6 +3013,9 @@ static void preProcessArgument(SSuperTable *stbInfo) {

static int printTotalDelay(SDataBase *database,
int64_t totalDelay,
int64_t totalDelay1,
int64_t totalDelay2,
int64_t totalDelay3,
BArray *total_delay_list,
int threads,
int64_t totalInsertRows,
Expand All @@ -3000,9 +3025,17 @@ static int printTotalDelay(SDataBase *database,
return -1;
}

succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
char subDelay[128] = "";
if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
sprintf(subDelay, "delay1=%.2f delay2=%.2f delay3=%.2f",
totalDelay1/threads/1E6,
totalDelay2/threads/1E6,
totalDelay3/threads/1E6);
}

succPrint("Spent %.6f ( real %.6f %s) seconds to insert rows: %" PRIu64
" with %d thread(s) into %s %.2f (real %.2f) records/second\n",
(end - start)/1E6, totalDelay/threads/1E6, totalInsertRows, threads,
(end - start)/1E6, totalDelay/threads/1E6, subDelay, totalInsertRows, threads,
database->dbName,
(double)(totalInsertRows / ((end - start)/1E6)),
(double)(totalInsertRows / (totalDelay/threads/1E6)));
Expand Down Expand Up @@ -3591,6 +3624,9 @@ static int startMultiThreadInsertData(SDataBase* database,

BArray * total_delay_list = benchArrayInit(1, sizeof(int64_t));
int64_t totalDelay = 0;
int64_t totalDelay1 = 0;
int64_t totalDelay2 = 0;
int64_t totalDelay3 = 0;
uint64_t totalInsertRows = 0;

// free threads resource
Expand Down Expand Up @@ -3682,6 +3718,9 @@ static int startMultiThreadInsertData(SDataBase* database,
}
totalInsertRows += pThreadInfo->totalInsertRows;
totalDelay += pThreadInfo->totalDelay;
totalDelay1 += pThreadInfo->totalDelay1;
totalDelay2 += pThreadInfo->totalDelay2;
totalDelay3 += pThreadInfo->totalDelay3;
benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
pThreadInfo->delayList->size);
tmfree(pThreadInfo->delayList);
Expand All @@ -3702,9 +3741,8 @@ static int startMultiThreadInsertData(SDataBase* database,
free(pids);
free(infos);

int ret = printTotalDelay(database, totalDelay,
total_delay_list, threads,
totalInsertRows, start, end);
int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
total_delay_list, threads, totalInsertRows, start, end);
benchArrayDestroy(total_delay_list);
if (g_fail || ret) {
return -1;
Expand Down
14 changes: 10 additions & 4 deletions src/benchMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,17 @@ int main(int argc, char* argv[]) {
modifyArgument();
}

g_arguments->fpOfInsertResult = fopen(g_arguments->output_file, "a");
if (NULL == g_arguments->fpOfInsertResult) {
errorPrint("failed to open %s for save result\n",
g_arguments->output_file);
if(g_arguments->output_file[0] == 0) {
infoPrint("%s","result_file is empty, ignore output.");
g_arguments->fpOfInsertResult = NULL;
} else {
g_arguments->fpOfInsertResult = fopen(g_arguments->output_file, "a");
if (NULL == g_arguments->fpOfInsertResult) {
errorPrint("failed to open %s for save result\n",
g_arguments->output_file);
}
}

infoPrint("client version: %s\n", taos_get_client_info());

if (g_arguments->test_mode == INSERT_TEST) {
Expand Down

0 comments on commit 572cd96

Please sign in to comment.