Skip to content

Commit

Permalink
Merge pull request #704 from taosdata/feat/TD-26260
Browse files Browse the repository at this point in the history
feat/TD-26260: support geometry (not include schemaless)
  • Loading branch information
DuanKuanJun committed Sep 19, 2023
2 parents f5bc51a + c9094f4 commit 78e769d
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 16 deletions.
6 changes: 4 additions & 2 deletions inc/benchData.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
/***** Global variables ******/
/***** Declare functions *****/
void rand_string(char *str, int size, bool chinese);
int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio,
int disorderRange);
void rand_geometry(char *str, int fieldLen, int maxType);
int geoCalcBufferSize(int fieldLen);
int getGeoMaxType(int fieldLen);
int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, int disorderRatio, int disorderRange);
int generateRandData(SSuperTable *stbInfo, char *sampleDataBuf,
int64_t bufLen,
int lenOfOneRow, BArray * fields, int64_t loop,
Expand Down
12 changes: 6 additions & 6 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
ADD_DEPENDENCIES(taosdump deps-jansson)
ADD_DEPENDENCIES(taosdump deps-snappy)
IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0")
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ELSE()
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ENDIF()
ELSE ()
INCLUDE_DIRECTORIES(/usr/local/include)
Expand All @@ -233,9 +233,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
SET(OS_ID "Darwin")

IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0")
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ELSE()
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ENDIF()
ENDIF ()

Expand Down Expand Up @@ -427,9 +427,9 @@ ELSE ()
SET(CMAKE_C_STANDARD 11)
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} /utf-8")
IF (${TD_VER_COMPATIBLE} STRGREATER_EQUAL "3.0.0.0")
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsString.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchTmq.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsString.c toolsSys.c toolsString.c)
ELSE ()
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ADD_EXECUTABLE(taosBenchmark benchMain.c benchSubscribe.c benchQuery.c benchJsonOpt.c benchInsert.c benchInsertMix.c benchDataMix.c wrapDb.c benchData.c benchDataGeometry.c benchCommandOpt.c benchUtil.c benchUtilDs.c benchSys.c toolstime.c toolsSys.c toolsString.c)
ENDIF ()

ADD_EXECUTABLE(taosdump taosdump.c toolsSys.c toolstime.c toolsDir.c toolsString.c)
Expand Down
34 changes: 34 additions & 0 deletions src/benchData.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ uint32_t accumulateRowLen(BArray *fields, int iface) {
switch (field->type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY:
len += field->length + 3;
break;
case TSDB_DATA_TYPE_INT:
Expand Down Expand Up @@ -387,6 +388,28 @@ static int tmpStr(char *tmp, int iface, Field *field, int i) {
return 0;
}

static int tmpGeometry(char *tmp, int iface, Field *field, int i) {
if (g_arguments->demo_mode) {
// TODO
} else if (field->values) {
int arraySize = tools_cJSON_GetArraySize(field->values);
if (arraySize) {
tools_cJSON *buf = tools_cJSON_GetArrayItem(field->values, taosRandom() % arraySize);
snprintf(tmp, field->length, "%s", buf->valuestring);
} else {
errorPrint(
"%s() cannot read correct value "
"from json file. array size: %d\n",
__func__, arraySize);
return -1;
}
} else {
int maxType = getGeoMaxType(field->length);
rand_geometry(tmp, field->length, maxType);
}
return 0;
}

FORCE_INLINE double tmpDoubleImpl(Field *field, int32_t angle) {
double doubleTmp = (double)(field->min);

Expand Down Expand Up @@ -708,6 +731,17 @@ static int generateRandDataSQL(SSuperTable *stbInfo, char *sampleDataBuf,
tmfree(tmp);
break;
}
case TSDB_DATA_TYPE_GEOMETRY: {
int bufferSize = geoCalcBufferSize(field->length);
char *tmp = benchCalloc(1, bufferSize, false);
if (0 != tmpGeometry(tmp, stbInfo->iface, field, i)) {
free(tmp);
return -1;
}
n = snprintf(sampleDataBuf + pos, bufLen - pos, "'%s',", tmp);
tmfree(tmp);
break;
}
case TSDB_DATA_TYPE_JSON: {
pos += tmpJson(sampleDataBuf, bufLen, pos,
fieldsSize, field);
Expand Down
158 changes: 158 additions & 0 deletions src/benchDataGeometry.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the MIT license as published by the Free Software
* Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*/

#include <bench.h>
#include <benchData.h>

typedef struct {
double x;
double y;
} SGeoCoord2D;

typedef enum { GEO_SUB_TYPE_POINT = 0, GEO_SUB_TYPE_LINESTRING, GEO_SUB_TYPE_POLYGON, GEO_SUB_TYPE_COUNT } EGeoSubType;

const int CCoordSize = 16;

typedef struct {
const char *preifx;
const char *suffix;
int baseLen;
int minCoordNum;
int maxCoordNum;
bool isClosed; // if true, first coord and last coord must be the same
} SGeoSubTypeInfo;

// should be ordered by (baseLen + minCoordNum * CCoordSize), ASC
const SGeoSubTypeInfo GeoInfo[] = {
{"POINT(", ")", 5, 1, 1, false}, {"LINESTRING(", ")", 9, 2, 4094, false}, {"POLYGON((", "))", 13, 3, 4094, true}};

typedef struct {
EGeoSubType type;
BArray *coordArray; // element: SGeoCoord2D
} SGeoObj2D;

static SGeoObj2D *geoObject2DInit(EGeoSubType subType, BArray *coordArray);
static SGeoObj2D *geoObject2DRandInit(EGeoSubType subType, int coordNum);
static void geoObject2DDestory(SGeoObj2D *pObj);

static int geoCoord2DToStr(char *buffer, SGeoCoord2D *pCoord);
static int geoCoord2DArrayToStr(char *buffer, BArray *coordArray);
static int geoObject2DToStr(char *buffer, SGeoObj2D *object);

static BArray *randCoordArray(int count);

/*--- init & destory ---*/
static SGeoObj2D *geoObject2DInit(EGeoSubType subType, BArray *coordArray) {
SGeoObj2D *pObj = (SGeoObj2D *)benchCalloc(1, sizeof(SGeoObj2D), true);
pObj->type = subType;
pObj->coordArray = coordArray;
return pObj;
}

static SGeoObj2D *geoObject2DRandInit(EGeoSubType subType, int coordNum) {
const SGeoSubTypeInfo *info = &GeoInfo[subType];

if (info->isClosed) {
coordNum = coordNum - 1;
}
BArray *array = randCoordArray(coordNum);
if (info->isClosed) {
SGeoCoord2D *pCoord = (SGeoCoord2D *)benchCalloc(1, sizeof(SGeoCoord2D), true);
SGeoCoord2D *pFirstCoord= benchArrayGet(array, 0);
memcpy(pCoord, pFirstCoord, sizeof(SGeoCoord2D));
benchArrayPush(array, pCoord);
}
return geoObject2DInit(subType, array);
}

static void geoObject2DDestory(SGeoObj2D *pObj) {
if (!pObj) return;
benchArrayDestroy(pObj->coordArray);
tmfree(pObj);
}

/*--- string formatters ---*/
static int geoCoord2DToStr(char *buffer, SGeoCoord2D *pCoord) { return sprintf(buffer, "%10.6lf %10.6lf", pCoord->x, pCoord->y); }

static int geoCoord2DArrayToStr(char *buffer, BArray *coordArray) {
int pos = 0;
bool firstCoord = true;
for (int i = 0; i < coordArray->size; i++) {
int size = 0;
if (firstCoord) {
firstCoord = false;
} else {
size = sprintf(buffer + pos, "%s", ", ");
pos += size;
}
size = geoCoord2DToStr(buffer + pos, benchArrayGet(coordArray, i));
pos += size;
}
return pos;
}

static int geoObject2DToStr(char *buffer, SGeoObj2D *object) {
int pos = sprintf(buffer, "%s", GeoInfo[object->type].preifx);
pos += geoCoord2DArrayToStr(buffer + pos, object->coordArray);
pos += sprintf(buffer + pos, "%s", GeoInfo[object->type].suffix);
return pos;
}

static BArray *randCoordArray(int count) {
BArray *array = benchArrayInit(8, sizeof(SGeoCoord2D));
int minVal = -1000, maxVal = 1000;
for (int i = 0; i < count; i++) {
double x = minVal + 1.0 * taosRandom() / RAND_MAX * (maxVal - minVal);
double y = minVal + 1.0 * taosRandom() / RAND_MAX * (maxVal - minVal);
SGeoCoord2D *pCoord = (SGeoCoord2D *)benchCalloc(1, sizeof(SGeoCoord2D), true);
pCoord->x = x;
pCoord->y = y;
benchArrayPush(array, pCoord);
}
return array;
}

int geoCalcBufferSize(int fieldLen) {
// not accurate, but enough
return fieldLen + 20;
}

int getGeoMaxType(int fieldLen) {
int maxType = -1;
for (int type = GEO_SUB_TYPE_COUNT - 1; type >= 0; type--) {
const SGeoSubTypeInfo *info = &GeoInfo[type];

int minLen = info->baseLen + info->minCoordNum * CCoordSize;
if (fieldLen >= minLen) {
maxType = type;
break;
}
}
return maxType;
}

void rand_geometry(char *str, int fieldLen, int maxType) {
EGeoSubType type = taosRandom() % (maxType + 1);
const SGeoSubTypeInfo *info = &GeoInfo[type];

int maxCoordNum = (fieldLen - info->baseLen) / CCoordSize;
maxCoordNum = min(maxCoordNum, info->maxCoordNum);

int coordNum = info->minCoordNum;
if (maxCoordNum > info->minCoordNum) {
coordNum = info->minCoordNum + taosRandom() % (maxCoordNum - info->minCoordNum);
}

SGeoObj2D *pObj = geoObject2DRandInit(type, coordNum);
geoObject2DToStr(str, pObj);
geoObject2DDestory(pObj);
}
18 changes: 15 additions & 3 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,16 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
Field * col = benchArrayGet(stbInfo->cols, colIndex);
int n;
if (col->type == TSDB_DATA_TYPE_BINARY ||
col->type == TSDB_DATA_TYPE_NCHAR) {
col->type == TSDB_DATA_TYPE_NCHAR ||
col->type == TSDB_DATA_TYPE_GEOMETRY) {
n = snprintf(colsBuf + len, col_buffer_len - len,
",%s %s(%d)", col->name,
convertDatatypeToString(col->type), col->length);
if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
colIndex);
return -1;
}
} else {
n = snprintf(colsBuf + len, col_buffer_len - len,
",%s %s", col->name,
Expand Down Expand Up @@ -282,10 +288,16 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
Field *tag = benchArrayGet(stbInfo->tags, tagIndex);
if (tag->type == TSDB_DATA_TYPE_BINARY ||
tag->type == TSDB_DATA_TYPE_NCHAR) {
tag->type == TSDB_DATA_TYPE_NCHAR ||
tag->type == TSDB_DATA_TYPE_GEOMETRY) {
n = snprintf(tagsBuf + len, tag_buffer_len - len,
"%s %s(%d),", tag->name,
convertDatatypeToString(tag->type), tag->length);
if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
tagIndex);
return -1;
}
} else if (tag->type == TSDB_DATA_TYPE_JSON) {
n = snprintf(tagsBuf + len, tag_buffer_len - len,
"%s json", tag->name);
Expand Down Expand Up @@ -1738,7 +1750,7 @@ static void *syncWriteInterlace(void *sarg) {
infoPrint(
"thread[%d] has currently inserted rows: %" PRIu64
", peroid insert rate: %.3f rows/s \n",
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
(double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
lastPrintTime = currentPrintTime;
lastTotalInsertRows = pThreadInfo->totalInsertRows;
Expand Down
5 changes: 3 additions & 2 deletions src/benchJsonOpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ static int getColumnAndTagTypeFromInsertJsonFile(
} else {
if (type == TSDB_DATA_TYPE_BINARY
|| type == TSDB_DATA_TYPE_JSON
|| type == TSDB_DATA_TYPE_NCHAR) {
|| type == TSDB_DATA_TYPE_NCHAR
|| type == TSDB_DATA_TYPE_GEOMETRY) {
length = g_arguments->binwidth;
} else {
length = convertTypeToLength(type);
Expand Down Expand Up @@ -1840,7 +1841,7 @@ static int getMetaFromTmqJsonFile(tools_cJSON *json) {
if (tools_cJSON_IsString(groupMode)) {
g_tmqInfo.consumerInfo.groupMode = groupMode->valuestring;
}


tools_cJSON *pollDelay = tools_cJSON_GetObjectItem(tmqInfo, "poll_delay");
if (tools_cJSON_IsNumber(pollDelay)) {
Expand Down
12 changes: 9 additions & 3 deletions src/benchUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ void closeBenchConn(SBenchConn* conn) {
if(conn->taos) {
taos_close(conn->taos);
conn->taos = NULL;
}
}
if (conn->ctaos) {
taos_close(conn->ctaos);
conn->ctaos = NULL;
Expand Down Expand Up @@ -840,6 +840,8 @@ char *convertDatatypeToString(int type) {
return "double";
case TSDB_DATA_TYPE_JSON:
return "json";
case TSDB_DATA_TYPE_GEOMETRY:
return "geometry";
default:
break;
}
Expand Down Expand Up @@ -972,6 +974,8 @@ int convertStringToDatatype(char *type, int length) {
return TSDB_DATA_TYPE_JSON;
} else if (0 == strcasecmp(type, "varchar")) {
return TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(type, "geometry")) {
return TSDB_DATA_TYPE_GEOMETRY;
} else {
errorPrint("unknown data type: %s\n", type);
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -1009,6 +1013,8 @@ int convertStringToDatatype(char *type, int length) {
return TSDB_DATA_TYPE_JSON;
} else if (0 == strncasecmp(type, "varchar", length)) {
return TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(type, "geometry")) {
return TSDB_DATA_TYPE_GEOMETRY;
} else {
errorPrint("unknown data type: %s\n", type);
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -1065,7 +1071,7 @@ void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems) {

void* dst = BARRAY_GET_ELEM(pArray, pArray->size);
memcpy(dst, pData, pArray->elemSize * elems);
tmfree(pData);
tmfree(pData); // TODO remove this
pArray->size += elems;
return dst;
}
Expand Down Expand Up @@ -1191,7 +1197,7 @@ void destroySockFd(int sockfd) {
if (sockfd < 0) {
return;
}

// shutdown the connection since no more data will be sent
int result;
result = shutdown(sockfd, SHUT_WR);
Expand Down

0 comments on commit 78e769d

Please sign in to comment.