Skip to content

Commit

Permalink
Revert "[TD-4533]<fix>: taosdemo resub if resubAfterConsume != -1 (#6243
Browse files Browse the repository at this point in the history
)"

This reverts commit 0c4075e.
  • Loading branch information
sangshuduo committed May 29, 2021
1 parent 5694f03 commit 9d40d4d
Showing 1 changed file with 12 additions and 40 deletions.
52 changes: 12 additions & 40 deletions src/kit/taosdemo/taosdemo.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ typedef struct SpecifiedQueryInfo_S {
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
int endAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT];
Expand All @@ -412,7 +411,6 @@ typedef struct SuperQueryInfo_S {
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume;
int endAfterConsume;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];

char* childTblName;
Expand Down Expand Up @@ -4378,26 +4376,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);

cJSON* endAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (endAfterConsume
&& endAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
= endAfterConsume->valueint;
} else if (!endAfterConsume) {
// default value is -1, which mean infinite loop
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
}

cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume
&& resubAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
} else if (!resubAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
}

cJSON *result = cJSON_GetObjectItem(sql, "result");
Expand Down Expand Up @@ -4541,26 +4529,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
}

cJSON* superEndAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superEndAfterConsume
&& superEndAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint;
} else if (!superEndAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
}

cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
//printf("failed to read json, subscribe interval no found\n");
////goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
}

// supert table sqls
Expand Down Expand Up @@ -7284,10 +7262,7 @@ static void *superSubscribe(void *sarg) {

uint64_t st = 0, et = 0;

while ((g_queryInfo.superQueryInfo.endAfterConsume == -1)
|| (g_queryInfo.superQueryInfo.endAfterConsume <
consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) {

while(1) {
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from;
Expand Down Expand Up @@ -7316,7 +7291,7 @@ static void *superSubscribe(void *sarg) {
}
consumed[tsubSeq] ++;

if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1)
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume)) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
Expand Down Expand Up @@ -7398,10 +7373,7 @@ static void *specifiedSubscribe(void *sarg) {
// start loop to consume result

g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1)
|| (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] <
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) {

while(1) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
Expand All @@ -7420,7 +7392,7 @@ static void *specifiedSubscribe(void *sarg) {
}

g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1)
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
Expand Down

0 comments on commit 9d40d4d

Please sign in to comment.