Skip to content

Commit

Permalink
Merge pull request #23710 from taosdata/fix/3_liaohj
Browse files (browse the repository at this point in the history)
fix(stream): check the status before pause
  • Loading branch information
hjxilinx committed Nov 15, 2023
2 parents 90f674d + 1b89d4f commit f45c327
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 40 deletions.
2 changes: 0 additions & 2 deletions include/libs/stream/tstream.h
Expand Up @@ -140,8 +140,6 @@ typedef enum EStreamTaskEvent {
TASK_EVENT_RESUME = 0x9,
TASK_EVENT_HALT = 0xA,
TASK_EVENT_DROPPING = 0xB,
TASK_EVENT_SCAN_TSDB = 0xC,
TASK_EVENT_SCAN_WAL = 0xD,
} EStreamTaskEvent;

typedef struct {
Expand Down
100 changes: 62 additions & 38 deletions source/dnode/mnode/impl/src/mndStream.c
Expand Up @@ -1156,57 +1156,75 @@ static int32_t initStreamNodeList(SMnode* pMnode) {
return taosArrayGetSize(execInfo.pNodeList);
}

static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;

{ // check if the node update happens or not
int64_t ts = taosGetTimestampSec();
static bool taskNodeIsUpdated(SMnode* pMnode) {
// check if the node update happens or not
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);

taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
taosThreadMutexUnlock(&execInfo.lock);
return false;
}

if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
return 0;
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
taosThreadMutexUnlock(&execInfo.lock);
return true;
}
}

for(int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0;
}
}
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes ready");
taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}

bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot);
return 0;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);

SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update");
}

if (nodeUpdated) {
mDebug("stream task not ready due to node update, checkpoint not issued");
return 0;
}
taosThreadMutexUnlock(&execInfo.lock);
return nodeUpdated;
}

static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;

// check if the node update happens or not
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mWarn("checkpoint ignore, stream task nodes update detected");
return -1;
}

{ // check if all tasks are in TASK_STATUS__READY status
bool ready = true;

taosThreadMutexLock(&execInfo.lock);

// no streams exists, abort
int32_t numOfTasks = taosArrayGetSize(execInfo.pTaskList);
if (numOfTasks <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}

for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
Expand Down Expand Up @@ -1762,6 +1780,12 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}

bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
return -1;
}

STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
Expand Down

0 comments on commit f45c327

Please sign in to comment.