From 5dc80c0bb1818a5c690c74ed0eb884f2f40b296f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 30 Dec 2022 11:20:25 +0800 Subject: [PATCH] fix: invalid msg order issue --- source/libs/qworker/src/qworker.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 6513845035d..cf6e251d723 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -681,6 +681,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { void *rsp = NULL; int32_t dataLen = 0; bool queryStop = false; + bool qComplete = false; do { ctx = NULL; @@ -705,11 +706,12 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (rsp) { - bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); + atomic_store_8((int8_t *)&ctx->queryContinue, 0); } qwMsg->connInfo = ctx->dataConnInfo; @@ -743,8 +745,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || - 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if (qComplete || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock);