Skip to content

Commit

Permalink
[TD-5067]<fix>: taosdemo stmt use sample data (#6759)
Browse files Browse the repository at this point in the history
  • Loading branch information
sangshuduo committed Jul 6, 2021
1 parent 2bf253d commit d53b734
Showing 1 changed file with 120 additions and 38 deletions.
158 changes: 120 additions & 38 deletions src/kit/taosdemo/taosdemo.c
Original file line number Diff line number Diff line change
Expand Up @@ -3216,13 +3216,6 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return 0;
}

#if 0
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
// TODO
return 0;
}
#endif

/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
Expand Down Expand Up @@ -5338,7 +5331,7 @@ static int64_t generateInterlaceDataWithoutStb(

#if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr)
char *dataType, int32_t dataLen, char **ptr, char *value)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
Expand All @@ -5348,12 +5341,18 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
return -1;
}
char *bind_binary = (char *)*ptr;
rand_string(bind_binary, dataLen);

bind->buffer_type = TSDB_DATA_TYPE_BINARY;
bind->buffer_length = dataLen;
bind->buffer = bind_binary;
if (value) {
strncpy(bind_binary, value, strlen(value));
bind->buffer_length = strlen(bind_binary);
} else {
rand_string(bind_binary, dataLen);
bind->buffer_length = dataLen;
}

bind->length = &bind->buffer_length;
bind->buffer = bind_binary;
bind->is_null = NULL;

*ptr += bind->buffer_length;
Expand All @@ -5365,9 +5364,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
return -1;
}
char *bind_nchar = (char *)*ptr;
rand_string(bind_nchar, dataLen);

bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
if (value) {
strncpy(bind_nchar, value, strlen(value));
} else {
rand_string(bind_nchar, dataLen);
}

bind->buffer_length = strlen(bind_nchar);
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
Expand All @@ -5378,7 +5382,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"INT", strlen("INT"))) {
int32_t *bind_int = (int32_t *)*ptr;

*bind_int = rand_int();
if (value) {
*bind_int = atoi(value);
} else {
*bind_int = rand_int();
}
bind->buffer_type = TSDB_DATA_TYPE_INT;
bind->buffer_length = sizeof(int32_t);
bind->buffer = bind_int;
Expand All @@ -5390,7 +5398,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = (int64_t *)*ptr;

*bind_bigint = rand_bigint();
if (value) {
*bind_bigint = atoll(value);
} else {
*bind_bigint = rand_bigint();
}
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_bigint;
Expand All @@ -5402,7 +5414,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *) *ptr;

*bind_float = rand_float();
if (value) {
*bind_float = (float)atof(value);
} else {
*bind_float = rand_float();
}
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
bind->buffer_length = sizeof(float);
bind->buffer = bind_float;
Expand All @@ -5414,7 +5430,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)*ptr;

*bind_double = rand_double();
if (value) {
*bind_double = atof(value);
} else {
*bind_double = rand_double();
}
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
bind->buffer_length = sizeof(double);
bind->buffer = bind_double;
Expand All @@ -5426,7 +5446,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = (int16_t *)*ptr;

*bind_smallint = rand_smallint();
if (value) {
*bind_smallint = (int16_t)atoi(value);
} else {
*bind_smallint = rand_smallint();
}
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
bind->buffer_length = sizeof(int16_t);
bind->buffer = bind_smallint;
Expand All @@ -5438,7 +5462,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)*ptr;

*bind_tinyint = rand_tinyint();
if (value) {
*bind_tinyint = (int8_t)atoi(value);
} else {
*bind_tinyint = rand_tinyint();
}
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_tinyint;
Expand All @@ -5461,7 +5489,11 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *) *ptr;

*bind_ts2 = rand_bigint();
if (value) {
*bind_ts2 = atoll(value);
} else {
*bind_ts2 = rand_bigint();
}
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts2;
Expand Down Expand Up @@ -5527,12 +5559,13 @@ static int32_t prepareStmtWithoutStb(
ptr += bind->buffer_length;

for (int i = 0; i < g_args.num_of_CPR; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
bind = (TAOS_BIND *)((char *)bindArray
+ (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
bind,
data_type[i],
g_args.len_of_binary,
&ptr)) {
&ptr, NULL)) {
return -1;
}
}
Expand All @@ -5551,12 +5584,14 @@ static int32_t prepareStmtWithoutStb(
return k;
}

static int32_t prepareStbStmt(SSuperTable *stbInfo,
static int32_t prepareStbStmt(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime, char *buffer)
int64_t startTime,
int64_t *pSamplePos)
{
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
Expand All @@ -5567,16 +5602,24 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,

char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("Failed to allocate %d bind params\n",
(stbInfo->columnCount + 1));
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}

bool tsRand;
bool sourceRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
sourceRand = true;
} else {
tsRand = false;
sourceRand = false; // from sample data file
}

char *bindBuffer = malloc(g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
free(bindArray);
return -1;
}

uint32_t k;
Expand All @@ -5592,7 +5635,7 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,

bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (tsRand) {
if (sourceRand) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
Expand All @@ -5607,14 +5650,46 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,

ptr += bind->buffer_length;

int cursor = 0;
for (int i = 0; i < stbInfo->columnCount; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr)) {
return -1;

if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr,
NULL)) {
free(bindArray);
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);

int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}

memset(bindBuffer, 0, g_args.len_of_binary);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too

if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr,
bindBuffer)) {
free(bindArray);
free(bindBuffer);
return -1;
}
}
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
Expand All @@ -5623,11 +5698,16 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,

k++;
recordFrom ++;

if (!sourceRand) {
(*pSamplePos) ++;
}
if (recordFrom >= insertRows) {
break;
}
}

free(bindBuffer);
free(bindArray);
return k;
}
Expand Down Expand Up @@ -5820,13 +5900,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt(superTblInfo,
generated = prepareStbStmt(
superTblInfo,
pThreadInfo->stmt,
tableName,
batchPerTbl,
insertRows, i,
startTime,
pThreadInfo->buffer);
&(pThreadInfo->samplePos));
#else
generated = -1;
#endif
Expand Down Expand Up @@ -6051,7 +6132,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i, start_time, pstr);
insertRows, i, start_time,
&(pThreadInfo->samplePos));
#else
generated = -1;
#endif
Expand Down

0 comments on commit d53b734

Please sign in to comment.