Skip to content

Commit

Permalink
feat: taosbenchmark supports specifying child table example data
Browse files Browse the repository at this point in the history
  • Loading branch information
sangshuduo committed Feb 27, 2023
1 parent 1e15545 commit d155a9d
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 44 deletions.
18 changes: 12 additions & 6 deletions inc/bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,19 @@ enum CONTINUE_IF_FAIL_MODE {
SMART_IF_FAILED, // 2
};

typedef struct SChildTable_S {
char childTableName[TSDB_TABLE_NAME_LEN];
bool useOwnSample;
char *sampleDataFilename;
} SChildTable;

typedef struct SSuperTable_S {
char * stbName;
char *stbName;
bool random_data_source; // rand_gen or sample
bool escape_character;
bool use_metric;
char * childTblPrefix;
char *childTblPrefix;
char *childTblSample;
bool childTblExists;
uint64_t childTblCount;
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in
Expand Down Expand Up @@ -619,9 +625,9 @@ typedef struct SSuperTable_S {
uint32_t fillIntervalDis; // fill Dis interval rows cnt

// binary prefix
char* binaryPrefex;
char *binaryPrefex;
// nchar prefix
char* ncharPrefex;
char *ncharPrefex;

// random write future time
bool useNow;
Expand All @@ -642,12 +648,12 @@ typedef struct SSuperTable_S {
BArray * cols;
BArray * tags;
BArray * tsmas;
char ** childTblName;
SChildTable **childTblArray;
char * colsOfCreateChildTable;
uint32_t lenOfTags;
uint32_t lenOfCols;

char *sampleDataBuf;
char *sampleDataBuf;
bool useSampleTs;
char *tagDataBuf;
bool tcpTransfer;
Expand Down
85 changes: 50 additions & 35 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -1017,18 +1017,22 @@ void postFreeResource() {
col->is_null = NULL;
}
benchArrayDestroy(stbInfo->cols);
if (g_arguments->test_mode == INSERT_TEST &&
stbInfo->insertRows != 0) {
if (g_arguments->test_mode == INSERT_TEST
&& stbInfo->insertRows != 0) {
for (int64_t k = 0; k < stbInfo->childTblCount;
++k) {
if (stbInfo->childTblName) {
tmfree(stbInfo->childTblName[k]);
stbInfo->childTblName[k] = NULL;
if (stbInfo->childTblArray) {
for (int64_t child = 0;
child < stbInfo->childTblCount;
child ++) {
tmfree(stbInfo->childTblArray[child]->sampleDataFilename);
tmfree(stbInfo->childTblArray[child]);
}
}
}
}
tmfree(stbInfo->childTblName);
stbInfo->childTblName = NULL;
tmfree(stbInfo->childTblArray);
stbInfo->childTblArray = NULL;
benchArrayDestroy(stbInfo->tsmas);
#ifdef TD_VER_COMPATIBLE_3_0_0_0
if ((0 == stbInfo->interlaceRows)
Expand Down Expand Up @@ -1320,7 +1324,7 @@ static void *syncWriteInterlace(void *sarg) {
goto free_of_interlace;
}
int64_t timestamp = pThreadInfo->start_time;
char * tableName = stbInfo->childTblName[tableSeq];
char * tableName = stbInfo->childTblArray[tableSeq]->childTableName;
char ttl[TTL_BUFF_LEN] = "";
if (stbInfo->ttl != 0) {
snprintf(ttl, TTL_BUFF_LEN, "TTL %d", stbInfo->ttl);
Expand Down Expand Up @@ -2038,10 +2042,10 @@ void *syncWriteProgressive(void *sarg) {
if (g_arguments->nthreads_auto) {
tableName = pThreadInfo->vg->childTblName[tableSeq];
} else {
tableName = stbInfo->childTblName[tableSeq];
tableName = stbInfo->childTblArray[tableSeq]->childTableName;
}
#else
tableName = stbInfo->childTblName[tableSeq];
tableName = stbInfo->childTblArray[tableSeq]->childTableName;
#endif
int64_t timestamp = pThreadInfo->start_time;
uint64_t len = 0;
Expand Down Expand Up @@ -2443,10 +2447,12 @@ static int parseBufferToStmtBatch(SSuperTable* stbInfo) {
static void fillChildTblNameByCount(SSuperTable *stbInfo) {
for (int64_t i = 0; i < stbInfo->childTblCount; ++i) {
if (stbInfo->escape_character) {
snprintf(stbInfo->childTblName[i], TSDB_TABLE_NAME_LEN,
snprintf(stbInfo->childTblArray[i]->childTableName,
TSDB_TABLE_NAME_LEN,
"`%s%" PRIu64 "`", stbInfo->childTblPrefix, i);
} else {
snprintf(stbInfo->childTblName[i], TSDB_TABLE_NAME_LEN,
snprintf(stbInfo->childTblArray[i]->childTableName,
TSDB_TABLE_NAME_LEN,
"%s%" PRIu64 "", stbInfo->childTblPrefix, i);
}
}
Expand All @@ -2456,11 +2462,11 @@ static int64_t fillChildTblNameByFromTo(SDataBase *database,
SSuperTable* stbInfo) {
for (int64_t i = stbInfo->childTblFrom; i < stbInfo->childTblTo; i++) {
if (stbInfo->escape_character) {
snprintf(stbInfo->childTblName[i-stbInfo->childTblFrom],
snprintf(stbInfo->childTblArray[i-stbInfo->childTblFrom]->childTableName,
TSDB_TABLE_NAME_LEN,
"`%s%" PRIu64 "`", stbInfo->childTblPrefix, i);
} else {
snprintf(stbInfo->childTblName[i-stbInfo->childTblTo],
snprintf(stbInfo->childTblArray[i-stbInfo->childTblTo]->childTableName,
TSDB_TABLE_NAME_LEN,
"%s%" PRIu64 "", stbInfo->childTblPrefix, i);
}
Expand Down Expand Up @@ -2501,12 +2507,12 @@ static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
TAOS_ROW row = NULL;
while ((row = taos_fetch_row(res)) != NULL) {
int *lengths = taos_fetch_lengths(res);
stbInfo->childTblName[count][0] = '`';
strncpy(stbInfo->childTblName[count] + 1, row[0], lengths[0]);
stbInfo->childTblName[count][lengths[0] + 1] = '`';
stbInfo->childTblName[count][lengths[0] + 2] = '\0';
debugPrint("stbInfo->childTblName[%" PRId64 "]: %s\n",
count, stbInfo->childTblName[count]);
stbInfo->childTblArray[count]->childTableName[0] = '`';
strncpy(stbInfo->childTblArray[count]->childTableName+1, row[0], lengths[0]);
stbInfo->childTblArray[count]->childTableName[lengths[0] + 1] = '`';
stbInfo->childTblArray[count]->childTableName[lengths[0] + 2] = '\0';
debugPrint("stbInfo->childTblArray[%" PRId64 "]->childTableName: %s\n",
count, stbInfo->childTblArray[count]->childTableName);
count++;
}
taos_free_result(res);
Expand Down Expand Up @@ -2602,10 +2608,11 @@ static int startMultiThreadInsertData(SDataBase* database,

uint64_t tableFrom = 0;
int64_t ntables = stbInfo->childTblCount;
stbInfo->childTblName = benchCalloc(stbInfo->childTblCount,
sizeof(char *), true);
for (int64_t i = 0; i < stbInfo->childTblCount; ++i) {
stbInfo->childTblName[i] = benchCalloc(1, TSDB_TABLE_NAME_LEN, true);
stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
sizeof(SChildTable*), true);
for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
stbInfo->childTblArray[child] =
benchCalloc(1, sizeof(SChildTable), true);
}

if ((stbInfo->iface != SML_IFACE && stbInfo->iface != SML_REST_IFACE)
Expand All @@ -2624,10 +2631,10 @@ static int startMultiThreadInsertData(SDataBase* database,
} else if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
// Normal table
if (stbInfo->escape_character) {
snprintf(stbInfo->childTblName[0], TSDB_TABLE_NAME_LEN,
snprintf(stbInfo->childTblArray[0]->childTableName, TSDB_TABLE_NAME_LEN,
"`%s`", stbInfo->stbName);
} else {
snprintf(stbInfo->childTblName[0], TSDB_TABLE_NAME_LEN,
snprintf(stbInfo->childTblArray[0]->childTableName, TSDB_TABLE_NAME_LEN,
"%s", stbInfo->stbName);
}
} else {
Expand All @@ -2652,15 +2659,18 @@ static int startMultiThreadInsertData(SDataBase* database,

for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
int vgId;
int ret = taos_get_table_vgId(conn->taos, database->dbName,
stbInfo->childTblName[i], &vgId);
int ret = taos_get_table_vgId(
conn->taos, database->dbName,
stbInfo->childTblArray[i]->childTableName, &vgId);
if (ret < 0) {
errorPrint("Failed to get %s db's %s table's vgId\n",
database->dbName, stbInfo->childTblName[i]);
database->dbName,
stbInfo->childTblArray[i]->childTableName);
return -1;
}
debugPrint("Db %s\'s table\'s %s vgId is: %d\n",
database->dbName, stbInfo->childTblName[i], vgId);
database->dbName,
stbInfo->childTblArray[i]->childTableName, vgId);
for (int32_t v = 0; v < database->vgroups; v ++) {
SVGroup *vg = benchArrayGet(database->vgArray, v);
if (vgId == vg->vgId) {
Expand Down Expand Up @@ -2688,20 +2698,25 @@ static int startMultiThreadInsertData(SDataBase* database,
}
for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
int vgId;
int ret = taos_get_table_vgId(conn->taos, database->dbName,
stbInfo->childTblName[i], &vgId);
int ret = taos_get_table_vgId(
conn->taos, database->dbName,
stbInfo->childTblArray[i]->childTableName, &vgId);
if (ret < 0) {
errorPrint("Failed to get %s db's %s table's vgId\n",
database->dbName, stbInfo->childTblName[i]);
database->dbName,
stbInfo->childTblArray[i]->childTableName);
return -1;
}
debugPrint("Db %s\'s table\'s %s vgId is: %d\n",
database->dbName, stbInfo->childTblName[i], vgId);
database->dbName,
stbInfo->childTblArray[i]->childTableName, vgId);
for (int32_t v = 0; v < database->vgroups; v++) {
SVGroup *vg = benchArrayGet(database->vgArray, v);
if (vgId == vg->vgId) {
strncpy(vg->childTblName[vg->tbOffset],
stbInfo->childTblName[i], TSDB_TABLE_NAME_LEN);
stbInfo->childTblArray[i]->childTableName,
TSDB_TABLE_NAME_LEN);

vg->tbOffset++;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/benchInsertMix.c
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) {

// loop insert child tables
for (uint64_t tbIdx = info->start_table_from; tbIdx <= info->end_table_to; ++tbIdx) {
char* tbName = stb->childTblName[tbIdx];
char* tbName = stb->childTblArray[tbIdx]->childTableName;

SMixRatio mixRatio;
mixRatioInit(&mixRatio, stb);
Expand Down
9 changes: 7 additions & 2 deletions src/benchJsonOpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,11 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
if (tools_cJSON_IsString(prefix)) {
superTable->childTblPrefix = prefix->valuestring;
}
tools_cJSON *childTbleSample =
tools_cJSON_GetObjectItem(stbInfo, "childtable_sample_file");
if (tools_cJSON_IsString(childTbleSample)) {
superTable->childTblSample = childTbleSample->valuestring;
}
tools_cJSON *escapeChar =
tools_cJSON_GetObjectItem(stbInfo, "escape_character");
if (tools_cJSON_IsString(escapeChar) &&
Expand Down Expand Up @@ -510,8 +515,8 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {

tools_cJSON *dataSource =
tools_cJSON_GetObjectItem(stbInfo, "data_source");
if (tools_cJSON_IsString(dataSource) &&
(0 == strcasecmp(dataSource->valuestring, "sample"))) {
if (tools_cJSON_IsString(dataSource)
&& (0 == strcasecmp(dataSource->valuestring, "sample"))) {
superTable->random_data_source = false;
}

Expand Down
4 changes: 4 additions & 0 deletions tests/taosbenchmark/csv/sample_use_ts-stb3.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1641976781445,300
1641976781446,600
1641976781447,900
1641976781448,NULL
4 changes: 4 additions & 0 deletions tests/taosbenchmark/csv/sample_use_ts-stb5.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1641976781445,500
1641976781446,1000
1641976781447,1500
1641976781448,NULL
49 changes: 49 additions & 0 deletions tests/taosbenchmark/json/taosc_sample_use_ts-subtable.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"connection_pool_size": 20,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"prepared_rand": 10,
"chinese": "yes",
"insert_interval": 0,
"num_of_records_per_req": 10,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes"
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb_",
"child_table_sample_file": "./taosbenchmark/csv/sample_use_ts-XXXX.csv",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "sample",
"insert_mode": "taosc",
"line_protocol": "line",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 20,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "./taosbenchmark/csv/sample_use_ts.csv",
"use_sample_ts": "yes",
"tags_file": "./taosbenchmark/csv/sample_tags.csv",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT"}]
}]
}]
}
Loading

0 comments on commit d155a9d

Please sign in to comment.