Skip to content

Commit

Permalink
fix: sml rest ctrlc (#577)
Browse files Browse the repository at this point in the history
* fix: sml-rest mem leak if ctrl-c

* test: change test branch

* fix: codacy quality issue

* fix: revert bench utils to 2dd1938

* fix: sml-rest telnet response 204

* fix: json-rest and taosjson-rest

* fix: reduce mem leak

* fix: reduce mem leak for interlace mode

* fix: address codacy issues

* fix: redudant mem cleanup

* test: add taosbenchmark/commandline-sml-rest.py

* fix: exclude sml-rest from 2.0 workflow
  • Loading branch information
sangshuduo committed Feb 12, 2023
1 parent 05b61e0 commit 22627d7
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/2.x-taosbenchmark-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ jobs:
sudo pkill -9 taosadapter || :
sudo pkill -9 taosd || :
for i in `find taosbenchmark -name "*.py"|grep -Ev "websocket|slow-query|mixed-query|tmq|vgroups|cloud"`; do python3 ./test.py -f $i > /dev/null && echo -e "\033[32m taosbenchmark-debug-test/$i success! \033[0m"|| echo -e "\033[31m taosbenchmark-debug-test/$i failed! \033[0m" | tee -a ~/taosbenchmark-debug-failed.txt ;done
for i in `find taosbenchmark -name "*.py"|grep -Ev "websocket|slow-query|mixed-query|tmq|vgroups|cloud|sml-rest"`; do python3 ./test.py -f $i > /dev/null && echo -e "\033[32m taosbenchmark-debug-test/$i success! \033[0m"|| echo -e "\033[31m taosbenchmark-debug-test/$i failed! \033[0m" | tee -a ~/taosbenchmark-debug-failed.txt ;done
- name: Check Test Result
if: steps.changed-files-specific.outputs.any_changed == 'true'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/2.x-taosbenchmark-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ jobs:
if find taosbenchmark -name "*.py"|grep -q .;
then
for i in `find taosbenchmark -name "*.py"|grep -Ev "websocket|slow-query|mixed-query|tmq|vgroups|cloud"`; do python3 ./test.py -f $i > /dev/null && echo -e "\033[32m taosbenchmark-release-test/$i success! \033[0m"|| echo -e "\033[31m taosbenchmark-release-test/$i failed! \033[0m" | tee -a ~/taosbenchmark-release-failed.txt ;done
for i in `find taosbenchmark -name "*.py"|grep -Ev "websocket|slow-query|mixed-query|tmq|vgroups|cloud|sml-rest"`; do python3 ./test.py -f $i > /dev/null && echo -e "\033[32m taosbenchmark-release-test/$i success! \033[0m"|| echo -e "\033[31m taosbenchmark-release-test/$i failed! \033[0m" | tee -a ~/taosbenchmark-release-failed.txt ;done
fi
- name: Check Test Result
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-coveralls.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-macos-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-macos-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-non-x64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ jobs:
echo "clone TDengine 3.0 on ${{ steps.setup.outputs.uname }}"
git clone --branch fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine --depth 1 https://github.com/taosdata/TDengine > /dev/null || exit 1
git clone --branch fix/sangshuduo/TD-22334-sml-rest-ctrlc --depth 1 https://github.com/taosdata/TDengine > /dev/null || exit 1
echo "build TDengine 3.0 on ${{ steps.setup.outputs.uname }}"
cd TDengine || exit 1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-taosBenchmark-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-taosBenchmark-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-taosdump-debug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-taosdump-release-ws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-taosdump-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: Change time zone
if:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/3.0-windows-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
with:
repository: 'taosdata/TDengine'
path: 'TDengine'
ref: 'fix/sangshuduo/TD-21932-taosbenchmark-schemaless-refine'
ref: 'fix/sangshuduo/TD-22334-sml-rest-ctrlc'

- name: create debug directory
if: |
Expand Down
17 changes: 17 additions & 0 deletions src/benchCommandOpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ int32_t benchParseSingleOpt(int32_t key, char* arg) {
} else if (0 == strcasecmp(arg, "sml-taosjson")) {
stbInfo->iface = SML_IFACE;
stbInfo->lineProtocol = SML_JSON_TAOS_FORMAT;
} else if (0 == strcasecmp(arg, "sml-rest")
|| (0 == strcasecmp(arg, "sml-rest-line"))) {
stbInfo->iface = SML_REST_IFACE;
stbInfo->lineProtocol = TSDB_SML_LINE_PROTOCOL;
g_arguments->nthreads_auto = false;
} else if (0 == strcasecmp(arg, "sml-rest-telnet")) {
stbInfo->iface = SML_REST_IFACE;
stbInfo->lineProtocol = TSDB_SML_TELNET_PROTOCOL;
g_arguments->nthreads_auto = false;
} else if (0 == strcasecmp(arg, "sml-rest-json")) {
stbInfo->iface = SML_REST_IFACE;
stbInfo->lineProtocol = TSDB_SML_JSON_PROTOCOL;
g_arguments->nthreads_auto = false;
} else if (0 == strcasecmp(arg, "sml-rest-taosjson")) {
stbInfo->iface = SML_REST_IFACE;
stbInfo->lineProtocol = SML_JSON_TAOS_FORMAT;
g_arguments->nthreads_auto = false;
} else {
errorPrint(
"Invalid -I: %s, will auto set to default (taosc)\n",
Expand Down
79 changes: 64 additions & 15 deletions src/benchInsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
: "CREATE TABLE %s.%s (ts TIMESTAMP%s) TAGS %s",
database->dbName, stbInfo->stbName, cols, tags);
tmfree(cols);
cols = NULL;
tmfree(tags);
tags = NULL;
if (stbInfo->comment != NULL) {
length += snprintf(command + length, BUFFER_SIZE - length,
" COMMENT '%s'", stbInfo->comment);
Expand Down Expand Up @@ -761,6 +763,7 @@ static void *createTable(void *sarg) {
}
create_table_end:
tmfree(pThreadInfo->buffer);
pThreadInfo->buffer = NULL;
return NULL;
}

Expand Down Expand Up @@ -798,7 +801,9 @@ static int startMultiThreadCreateChildTable(
int sockfd = createSockFd();
if (sockfd < 0) {
tmfree(pids);
pids = NULL;
tmfree(infos);
infos = NULL;
return -1;
}
pThreadInfo->sockfd = sockfd;
Expand Down Expand Up @@ -826,9 +831,9 @@ static int startMultiThreadCreateChildTable(
threadInfo *pThreadInfo = infos + i;
g_arguments->actualChildTables += pThreadInfo->tables_created;

if (REST_IFACE != stbInfo->iface) {
if (pThreadInfo && pThreadInfo->conn)
closeBenchConn(pThreadInfo->conn);
if ((REST_IFACE != stbInfo->iface)
&& pThreadInfo && pThreadInfo->conn) {
closeBenchConn(pThreadInfo->conn);
}
}

Expand Down Expand Up @@ -904,6 +909,7 @@ void postFreeResource() {
SDbCfg *cfg = benchArrayGet(database->cfgs, c);
if ((NULL == root) && (0 == strcmp(cfg->name, "replica"))) {
tmfree(cfg->name);
cfg->name = NULL;
}
}
benchArrayDestroy(database->cfgs);
Expand All @@ -912,19 +918,26 @@ void postFreeResource() {
for (uint64_t j = 0; j < database->superTbls->size; j++) {
SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
tmfree(stbInfo->colsOfCreateChildTable);
stbInfo->colsOfCreateChildTable = NULL;
tmfree(stbInfo->sampleDataBuf);
stbInfo->sampleDataBuf = NULL;
tmfree(stbInfo->tagDataBuf);
stbInfo->tagDataBuf = NULL;
tmfree(stbInfo->partialColNameBuf);
stbInfo->partialColNameBuf = NULL;
for (int k = 0; k < stbInfo->tags->size; ++k) {
Field * tag = benchArrayGet(stbInfo->tags, k);
tmfree(tag->data);
tag->data = NULL;
}
benchArrayDestroy(stbInfo->tags);

for (int k = 0; k < stbInfo->cols->size; ++k) {
Field * col = benchArrayGet(stbInfo->cols, k);
tmfree(col->data);
col->data = NULL;
tmfree(col->is_null);
col->is_null = NULL;
}
benchArrayDestroy(stbInfo->cols);
if (g_arguments->test_mode == INSERT_TEST &&
Expand All @@ -933,10 +946,12 @@ void postFreeResource() {
++k) {
if (stbInfo->childTblName) {
tmfree(stbInfo->childTblName[k]);
stbInfo->childTblName[k] = NULL;
}
}
}
tmfree(stbInfo->childTblName);
stbInfo->childTblName = NULL;
benchArrayDestroy(stbInfo->tsmas);
#ifdef TD_VER_COMPATIBLE_3_0_0_0
if ((0 == stbInfo->interlaceRows)
Expand All @@ -945,8 +960,10 @@ void postFreeResource() {
SVGroup *vg = benchArrayGet(database->vgArray, v);
for (int64_t t = 0; t < vg->tbCountPerVgId; t ++) {
tmfree(vg->childTblName[t]);
vg->childTblName[t] = NULL;
}
tmfree(vg->childTblName);
vg->childTblName = NULL;
}
}
#endif // TD_VER_COMPATIBLE_3_0_0_0
Expand Down Expand Up @@ -1066,7 +1083,7 @@ int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
}
}

if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
if (code != TSDB_CODE_SUCCESS) {
errorPrint(
"failed to execute schemaless insert. "
"content: %s, code: 0x%08x reason: %s\n",
Expand All @@ -1080,7 +1097,8 @@ int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) {
case SML_REST_IFACE: {
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL
|| stbInfo->lineProtocol == SML_JSON_TAOS_FORMAT) {
pThreadInfo->lines[0] = tools_cJSON_Print(pThreadInfo->json_array);
pThreadInfo->lines[0] = tools_cJSON_PrintUnformatted(
pThreadInfo->json_array);
code = postProceSql(pThreadInfo->lines[0], database->dbName,
database->precision, stbInfo->iface,
stbInfo->lineProtocol, g_arguments->port,
Expand Down Expand Up @@ -1383,7 +1401,10 @@ static void *syncWriteInterlace(void *sarg) {
pThreadInfo->json_array = NULL;
}
pThreadInfo->json_array = tools_cJSON_CreateArray();
tmfree(pThreadInfo->lines[0]);
if (pThreadInfo->lines && pThreadInfo->lines[0]) {
tmfree(pThreadInfo->lines[0]);
pThreadInfo->lines[0] = NULL;
}
} else {
for (int j = 0; j < generated; ++j) {
if (pThreadInfo && pThreadInfo->lines
Expand Down Expand Up @@ -1424,7 +1445,13 @@ static void *syncWriteInterlace(void *sarg) {
}
}
free_of_interlace:
if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1;
if (pThreadInfo && pThreadInfo->json_array) {
tools_cJSON_Delete(pThreadInfo->json_array);
pThreadInfo->json_array = NULL;
}
if ((pThreadInfo) && (0 == pThreadInfo->totalDelay)) {
pThreadInfo->totalDelay = 1;
}
succPrint(
"thread[%d] %s(), completed total inserted rows: %" PRIu64
", %.2f records/second\n",
Expand Down Expand Up @@ -1691,6 +1718,8 @@ void *syncWriteProgressive(void *sarg) {
}
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL
|| stbInfo->lineProtocol == SML_JSON_TAOS_FORMAT) {
tmfree(pThreadInfo->lines[0]);
pThreadInfo->lines[0] = NULL;
pThreadInfo->lines[0] =
tools_cJSON_PrintUnformatted(
pThreadInfo->json_array);
Expand Down Expand Up @@ -1747,8 +1776,8 @@ void *syncWriteProgressive(void *sarg) {
ret = queryDbExecCall(pThreadInfo->conn, buffer);
int32_t trying = g_arguments->keep_trying;
while (ret && trying) {
infoPrint("will sleep %"PRIu32" milliseconds then re-create "
"table %s\n",
infoPrint("will sleep %"PRIu32" milliseconds then "
"re-create table %s\n",
g_arguments->trying_interval, buffer);
toolsMsleep(g_arguments->trying_interval);
ret = queryDbExecCall(pThreadInfo->conn, buffer);
Expand Down Expand Up @@ -1802,14 +1831,15 @@ void *syncWriteProgressive(void *sarg) {
case SML_IFACE:
if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL
|| stbInfo->lineProtocol == SML_JSON_TAOS_FORMAT) {
if (pThreadInfo->lines && pThreadInfo->lines[0]) {
tmfree(pThreadInfo->lines[0]);
pThreadInfo->lines[0] = NULL;
}
if (pThreadInfo->json_array) {
tools_cJSON_Delete(pThreadInfo->json_array);
pThreadInfo->json_array = NULL;
}
pThreadInfo->json_array = tools_cJSON_CreateArray();
if (pThreadInfo->lines) {
tmfree(pThreadInfo->lines[0]);
}
} else {
for (int j = 0; j < generated; ++j) {
debugPrint("pThreadInfo->lines[%d]: %s\n",
Expand Down Expand Up @@ -1851,7 +1881,13 @@ void *syncWriteProgressive(void *sarg) {
} // insertRows
} // tableSeq
free_of_progressive:
if (0 == pThreadInfo->totalDelay) pThreadInfo->totalDelay = 1;
if (pThreadInfo && pThreadInfo->json_array) {
tools_cJSON_Delete(pThreadInfo->json_array);
pThreadInfo->json_array = NULL;
}
if (pThreadInfo && (0 == pThreadInfo->totalDelay)) {
pThreadInfo->totalDelay = 1;
}
succPrint(
"thread[%d] %s(), completed total inserted rows: %" PRIu64
", %.2f records/second\n",
Expand Down Expand Up @@ -2583,12 +2619,14 @@ static int startMultiThreadInsertData(SDataBase* database,
free_ds(&pThreadInfo->buffer);
} else {
tmfree(pThreadInfo->buffer);
pThreadInfo->buffer = NULL;
}
break;
case SML_REST_IFACE:
if (g_arguments->terminate)
toolsMsleep(100);
tmfree(pThreadInfo->buffer);
// on-purpose no break here
case SML_IFACE:
if (stbInfo->lineProtocol != TSDB_SML_JSON_PROTOCOL
&& stbInfo->lineProtocol != SML_JSON_TAOS_FORMAT) {
Expand All @@ -2598,7 +2636,10 @@ static int startMultiThreadInsertData(SDataBase* database,
for (int j = 0; j < g_arguments->reqPerReq; j++) {
tmfree(pThreadInfo->lines[j]);
}
tmfree(pThreadInfo->sml_tags);
if (pThreadInfo->sml_tags) {
tmfree(pThreadInfo->sml_tags);
pThreadInfo->sml_tags = NULL;
}

} else {
if (pThreadInfo->sml_json_tags) {
Expand All @@ -2611,8 +2652,12 @@ static int startMultiThreadInsertData(SDataBase* database,
}
}
closeBenchConn(pThreadInfo->conn);
tmfree(pThreadInfo->lines);
if (pThreadInfo->lines) {
tmfree(pThreadInfo->lines);
pThreadInfo->lines = NULL;
}
break;

case STMT_IFACE:
taos_stmt_close(pThreadInfo->conn->stmt);
closeBenchConn(pThreadInfo->conn);
Expand All @@ -2621,14 +2666,17 @@ static int startMultiThreadInsertData(SDataBase* database,
tmfree(pThreadInfo->bindParams);
tmfree(pThreadInfo->is_null);
break;

case TAOSC_IFACE:
if (stbInfo->interlaceRows > 0) {
free_ds(&pThreadInfo->buffer);
} else {
tmfree(pThreadInfo->buffer);
pThreadInfo->buffer = NULL;
}
closeBenchConn(pThreadInfo->conn);
break;

default:
break;
}
Expand All @@ -2637,6 +2685,7 @@ static int startMultiThreadInsertData(SDataBase* database,
benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
pThreadInfo->delayList->size);
tmfree(pThreadInfo->delayList);
pThreadInfo->delayList = NULL;
}
qsort(total_delay_list->pData, total_delay_list->size,
total_delay_list->elemSize, compare);
Expand Down
Loading

0 comments on commit 22627d7

Please sign in to comment.