Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: taosbenchmark supports specifying child table example data #590

Merged
merged 3 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 74 additions & 67 deletions inc/bench.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,93 +574,100 @@ enum CONTINUE_IF_FAIL_MODE {
SMART_IF_FAILED, // 2
};

typedef struct SChildTable_S {
char childTableName[TSDB_TABLE_NAME_LEN];
bool useOwnSample;
char *sampleDataBuf;
uint64_t insertRows;
} SChildTable;

typedef struct SSuperTable_S {
char * stbName;
bool random_data_source; // rand_gen or sample
bool escape_character;
bool use_metric;
char * childTblPrefix;
bool childTblExists;
uint64_t childTblCount;
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in
char *stbName;
bool random_data_source; // rand_gen or sample
bool escape_character;
bool use_metric;
char *childTblPrefix;
char *childTblSample;
bool childTblExists;
uint64_t childTblCount;
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in
// one sql
bool autoCreateTable;
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
uint16_t lineProtocol;
uint64_t childTblLimit;
uint64_t childTblOffset;
uint64_t childTblFrom;
uint64_t childTblTo;
bool autoCreateTable;
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
uint16_t lineProtocol;
uint64_t childTblLimit;
uint64_t childTblOffset;
uint64_t childTblFrom;
uint64_t childTblTo;
enum CONTINUE_IF_FAIL_MODE continueIfFail;

// int multiThreadWriteOneTbl; // 0: no, 1: yes
uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. according to database precision
uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. according to database precision

// ratio
uint8_t disRatio; // disorder ratio 0 ~ 100 %
uint8_t updRatio; // update ratio 0 ~ 100 %
uint8_t delRatio; // delete ratio 0 ~ 100 %
uint8_t disRatio; // disorder ratio 0 ~ 100 %
uint8_t updRatio; // update ratio 0 ~ 100 %
uint8_t delRatio; // delete ratio 0 ~ 100 %

// range
uint64_t disRange; // disorder range
uint64_t updRange; // update range
uint64_t delRange; // delete range
uint64_t disRange; // disorder range
uint64_t updRange; // update range
uint64_t delRange; // delete range

// generate row value rule see pre RULE_ define
uint8_t genRowRule;
uint8_t genRowRule;

// data position
uint8_t dataPos; // see define DATAPOS_
uint8_t dataPos; // see define DATAPOS_

uint32_t fillIntervalUpd; // fill Upd interval rows cnt
uint32_t fillIntervalDis; // fill Dis interval rows cnt
uint32_t fillIntervalUpd; // fill Upd interval rows cnt
uint32_t fillIntervalDis; // fill Dis interval rows cnt

// binary prefix
char* binaryPrefex;
char *binaryPrefex;
// nchar prefix
char* ncharPrefex;
char *ncharPrefex;

// random write future time
bool useNow;
bool writeFuture;
int32_t durMinute; // passed database->durMinute
int32_t checkInterval; // check correct interval

int64_t max_sql_len;
uint64_t insert_interval;
uint64_t insertRows;
uint64_t timestamp_step;
int64_t startTimestamp;
int64_t specifiedColumns;
char sampleFile[MAX_FILE_NAME_LEN];
char tagsFile[MAX_FILE_NAME_LEN];
uint32_t partialColNum;
char * partialColNameBuf;
BArray * cols;
BArray * tags;
BArray * tsmas;
char ** childTblName;
char * colsOfCreateChildTable;
uint32_t lenOfTags;
uint32_t lenOfCols;

char *sampleDataBuf;
bool useSampleTs;
char *tagDataBuf;
bool tcpTransfer;
bool non_stop;
char *comment;
int delay;
int file_factor;
char *rollup;
char* max_delay;
char* watermark;
int ttl;
int32_t keep_trying;
uint32_t trying_interval;
bool useNow;
bool writeFuture;
int32_t durMinute; // passed database->durMinute
int32_t checkInterval; // check correct interval

int64_t max_sql_len;
uint64_t insert_interval;
uint64_t insertRows;
uint64_t timestamp_step;
int64_t startTimestamp;
int64_t specifiedColumns;
char sampleFile[MAX_FILE_NAME_LEN];
char tagsFile[MAX_FILE_NAME_LEN];
uint32_t partialColNum;
char *partialColNameBuf;
BArray *cols;
BArray *tags;
BArray *tsmas;
SChildTable **childTblArray;
char *colsOfCreateChildTable;
uint32_t lenOfTags;
uint32_t lenOfCols;

char *sampleDataBuf;
bool useSampleTs;
char *tagDataBuf;
bool tcpTransfer;
bool non_stop;
char *comment;
int delay;
int file_factor;
char *rollup;
char *max_delay;
char *watermark;
int ttl;
int32_t keep_trying;
uint32_t trying_interval;
} SSuperTable;

typedef struct SDbCfg_S {
Expand Down
106 changes: 83 additions & 23 deletions src/benchData.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,26 @@ int prepareStmt(SSuperTable *stbInfo, TAOS_STMT *stmt, uint64_t tableSeq) {
return 0;
}

static int generateSampleFromCsvForStb(char *buffer,
char *file, int32_t length,
int64_t size) {
static bool getSampleFileNameByPattern(char *filePath,
SSuperTable *stbInfo,
int64_t child) {
char *pos = strstr(stbInfo->childTblSample, "XXXX");
snprintf(filePath, MAX_PATH_LEN, "%s", stbInfo->childTblSample);
int64_t offset = (int64_t)pos - (int64_t)stbInfo->childTblSample;
snprintf(filePath + offset,
MAX_PATH_LEN - offset,
"%s",
stbInfo->childTblArray[child]->childTableName);
size_t len = strlen(stbInfo->childTblArray[child]->childTableName);
snprintf(filePath + offset + len,
MAX_PATH_LEN - offset - len,
"%s", pos +4);
return true;
}

static int generateSampleFromCsv(char *buffer,
char *file, int32_t length,
int64_t size) {
size_t n = 0;
char * line = NULL;
int getRows = 0;
Expand Down Expand Up @@ -228,11 +245,11 @@ static int generateSampleFromCsvForStb(char *buffer,
return 0;
}

static int getAndSetRowsFromCsvFile(SSuperTable *stbInfo) {
FILE * fp = fopen(stbInfo->sampleFile, "r");
static int getAndSetRowsFromCsvFile(char *sampleFile, uint64_t *insertRows) {
FILE * fp = fopen(sampleFile, "r");
if (NULL == fp) {
errorPrint("Failed to open sample file: %s, reason:%s\n",
stbInfo->sampleFile, strerror(errno));
sampleFile, strerror(errno));
return -1;
}

Expand All @@ -249,7 +266,7 @@ static int getAndSetRowsFromCsvFile(SSuperTable *stbInfo) {
while (fgets(buf, TSDB_MAX_SQL_LEN, fp)) {
line_count++;
}
stbInfo->insertRows = line_count;
*insertRows = line_count;
fclose(fp);
tmfree(buf);
return 0;
Expand Down Expand Up @@ -1339,11 +1356,16 @@ int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf,
return -1;
}

int prepareChildTblSampleData(SDataBase* database, SSuperTable* stbInfo) {
return -1;
}

int prepareSampleData(SDataBase* database, SSuperTable* stbInfo) {
stbInfo->lenOfCols = accumulateRowLen(stbInfo->cols, stbInfo->iface);
stbInfo->lenOfTags = accumulateRowLen(stbInfo->tags, stbInfo->iface);
if (stbInfo->partialColNum != 0 &&
(stbInfo->iface == TAOSC_IFACE || stbInfo->iface == REST_IFACE)) {
if (stbInfo->partialColNum != 0
&& ((stbInfo->iface == TAOSC_IFACE
|| stbInfo->iface == REST_IFACE))) {
if (stbInfo->partialColNum > stbInfo->cols->size) {
stbInfo->partialColNum = stbInfo->cols->size;
} else {
Expand Down Expand Up @@ -1385,16 +1407,15 @@ int prepareSampleData(SDataBase* database, SSuperTable* stbInfo) {
stbInfo->partialColNum = stbInfo->cols->size;
}
stbInfo->sampleDataBuf =
benchCalloc(1,
stbInfo->lenOfCols*g_arguments->prepared_rand,
true);
benchCalloc(
1, stbInfo->lenOfCols*g_arguments->prepared_rand, true);
infoPrint(
"generate stable<%s> columns data with lenOfCols<%u> * "
"prepared_rand<%" PRIu64 ">\n",
stbInfo->stbName, stbInfo->lenOfCols, g_arguments->prepared_rand);
if (stbInfo->random_data_source) {
if (generateRandData(stbInfo, stbInfo->sampleDataBuf,
stbInfo->lenOfCols*g_arguments->prepared_rand,
stbInfo->lenOfCols*g_arguments->prepared_rand,
stbInfo->lenOfCols,
stbInfo->cols,
g_arguments->prepared_rand,
Expand All @@ -1403,30 +1424,67 @@ int prepareSampleData(SDataBase* database, SSuperTable* stbInfo) {
}
} else {
if (stbInfo->useSampleTs) {
if (getAndSetRowsFromCsvFile(stbInfo)) return -1;
if (getAndSetRowsFromCsvFile(
stbInfo->sampleFile, &stbInfo->insertRows)) {
return -1;
}
}
if (generateSampleFromCsvForStb(stbInfo->sampleDataBuf,
if (generateSampleFromCsv(stbInfo->sampleDataBuf,
stbInfo->sampleFile, stbInfo->lenOfCols,
g_arguments->prepared_rand)) {
errorPrint("Failed to generate sample from csv file %s\n",
stbInfo->sampleFile);
return -1;
}

if (stbInfo->childTblSample) {
if (NULL == strstr(stbInfo->childTblSample, "XXXX")) {
errorPrint("Child table sample file pattern has no %s\n",
"XXXX");
return -1;
}
for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
char sampleFilePath[MAX_PATH_LEN] = {0};
getSampleFileNameByPattern(sampleFilePath, stbInfo, child);
if (0 != access(sampleFilePath, F_OK)) {
continue;
}
SChildTable *childTbl = stbInfo->childTblArray[child];
if (getAndSetRowsFromCsvFile(sampleFilePath,
&(childTbl->insertRows))) {
return -1;
}

childTbl->sampleDataBuf =
benchCalloc(
1, stbInfo->lenOfCols*g_arguments->prepared_rand, true);
if (generateSampleFromCsv(
childTbl->sampleDataBuf,
sampleFilePath,
stbInfo->lenOfCols,
g_arguments->prepared_rand)) {
errorPrint("Failed to generate sample from file "
"for child table %"PRId64"\n",
child);
return -1;
}
childTbl->useOwnSample = true;
}
}
}
debugPrint("sampleDataBuf: %s\n", stbInfo->sampleDataBuf);

if (stbInfo->tags->size != 0) {
stbInfo->tagDataBuf =
benchCalloc(1,
stbInfo->childTblCount*stbInfo->lenOfTags,
true);
benchCalloc(
1, stbInfo->childTblCount*stbInfo->lenOfTags, true);
infoPrint(
"generate stable<%s> tags data with lenOfTags<%u> * "
"childTblCount<%" PRIu64 ">\n",
stbInfo->stbName, stbInfo->lenOfTags,
stbInfo->childTblCount);
stbInfo->childTblCount);
if (stbInfo->tagsFile[0] != 0) {
if (generateSampleFromCsvForStb(
if (generateSampleFromCsv(
stbInfo->tagDataBuf, stbInfo->tagsFile,
stbInfo->lenOfTags,
stbInfo->childTblCount)) {
Expand All @@ -1435,7 +1493,7 @@ int prepareSampleData(SDataBase* database, SSuperTable* stbInfo) {
} else {
if (generateRandData(stbInfo,
stbInfo->tagDataBuf,
stbInfo->childTblCount*stbInfo->lenOfTags,
stbInfo->childTblCount*stbInfo->lenOfTags,
stbInfo->lenOfTags,
stbInfo->tags,
stbInfo->childTblCount, true)) {
Expand All @@ -1445,8 +1503,10 @@ int prepareSampleData(SDataBase* database, SSuperTable* stbInfo) {
debugPrint("tagDataBuf: %s\n", stbInfo->tagDataBuf);
}

if (0 != convertServAddr(stbInfo->iface,
stbInfo->tcpTransfer, stbInfo->lineProtocol)) {
if (0 != convertServAddr(
stbInfo->iface,
stbInfo->tcpTransfer,
stbInfo->lineProtocol)) {
return -1;
}
return 0;
Expand Down
Loading