Skip to content

Commit

Permalink
Merge pull request #23455 from taosdata/fix/liaohj
Browse files Browse the repository at this point in the history
refactor: do some internal refactor.
  • Loading branch information
hjxilinx committed Nov 1, 2023
2 parents 06bf52e + 0fb4cfd commit 0cf0ff3
Show file tree
Hide file tree
Showing 21 changed files with 429 additions and 260 deletions.
30 changes: 15 additions & 15 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;

typedef struct {
int32_t nodeId;
SEpSet epset;
} SDownstreamTaskEpset;

typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
Expand Down Expand Up @@ -327,15 +332,10 @@ typedef struct SDispatchMsgInfo {
void* pTimer; // used to dispatch data after a given time duration
} SDispatchMsgInfo;

typedef struct STaskOutputQueue {
typedef struct STaskQueue {
int8_t status;
SStreamQueue* queue;
} STaskOutputQueue;

typedef struct STaskInputInfo {
int8_t status;
SStreamQueue* queue;
} STaskInputInfo;
} STaskQueue;

typedef struct STaskSchedInfo {
int8_t status;
Expand Down Expand Up @@ -384,6 +384,7 @@ typedef struct STaskOutputInfo {
};
int8_t type;
STokenBucket* pTokenBucket;
SArray* pDownstreamUpdateList;
} STaskOutputInfo;

typedef struct SUpstreamInfo {
Expand All @@ -395,8 +396,8 @@ struct SStreamTask {
int64_t ver;
SStreamTaskId id;
SSTaskBasicInfo info;
STaskOutputQueue outputq;
STaskInputInfo inputInfo;
STaskQueue outputq;
STaskQueue inputq;
STaskSchedInfo schedInfo;
STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo;
Expand Down Expand Up @@ -431,7 +432,7 @@ struct SStreamTask {
typedef struct STaskStartInfo {
int64_t startTs;
int64_t readyTs;
int32_t startedAfterNodeUpdate;
int32_t startAllTasksFlag;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime;
} STaskStartInfo;
Expand Down Expand Up @@ -645,7 +646,8 @@ typedef struct STaskStatusEntry {
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;

int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
Expand Down Expand Up @@ -733,7 +735,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);

//int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
Expand All @@ -744,7 +745,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);

int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);

int32_t streamTaskStop(SStreamTask* pTask);
Expand Down Expand Up @@ -796,7 +797,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
Expand All @@ -806,12 +806,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);

// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask);

int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
Expand Down
153 changes: 111 additions & 42 deletions source/dnode/mnode/impl/src/mndStream.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady);

static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);

Expand All @@ -91,6 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);

int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
Expand Down Expand Up @@ -1156,7 +1157,13 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
}

SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint")
taosArrayDestroy(pNodeSnapshot);
return 0;
}

SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
Expand Down Expand Up @@ -2059,11 +2066,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
return info;
}

static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;

*allReady = true;
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));

while (1) {
Expand All @@ -2075,7 +2083,22 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
SNodeEntry entry = {0};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
entry.nodeId = pVgroup->vgId;
entry.hbTimestamp = -1;
entry.hbTimestamp = pVgroup->updateTime;

if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (!pVgroup->vnodeGid[i].syncRestore) {
*allReady = false;
break;
}

ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
*allReady = false;
break;
}
}
}

char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
Expand Down Expand Up @@ -2119,7 +2142,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
continue;
}

mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name);
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id);

int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);

// todo: not continue, drop all and retry again
Expand Down Expand Up @@ -2216,23 +2241,26 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
}
}

static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}

if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));

for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t) pRemovedId->taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
break;
}
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);

int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
break;
}
}
return 0;

return TSDB_CODE_SUCCESS;
}

static bool taskNodeExists(SArray* pList, int32_t nodeId) {
Expand Down Expand Up @@ -2319,7 +2347,14 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}

SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
bool allVnodeReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
if (!allVnodeReady) {
taosArrayDestroy(pNodeSnapshot);
atomic_store_32(&mndNodeCheckSentinel, 0);
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
return 0;
}

taosThreadMutexLock(&execInfo.lock);
removeExpirednodeEntryAndTask(pNodeSnapshot);
Expand Down Expand Up @@ -2359,10 +2394,6 @@ typedef struct SMStreamNodeCheckMsg {
int8_t placeHolder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;

typedef struct SMStreamTaskResetMsg {
int8_t placeHolder;
} SMStreamTaskResetMsg;

static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
Expand Down Expand Up @@ -2577,6 +2608,43 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return 0;
}

int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);

for (int k = 0; k < num; ++k) {
int32_t* pVgId = taosArrayGet(pNodeList, k);

int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);

if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
break;
}
}
}

return TSDB_CODE_SUCCESS;
}

static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {

mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);

pNodeEntry->stageUpdated = true;
pTaskEntry->stage = stage;
break;
}
}
}

int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
Expand All @@ -2602,29 +2670,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream(pMnode);
}

setNodeEpsetExpiredFlag(req.pUpdateNodes);

for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pEntry == NULL) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
continue;
}

if (p->stage != pEntry->stage && pEntry->stage != -1) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);

pNodeEntry->stageUpdated = true;
pEntry->stage = p->stage;
break;
}
}
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
} else {
streamTaskStatusCopy(pEntry, p);
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId);
Expand All @@ -2638,7 +2697,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}

pEntry->status = p->status;
pTaskEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
Expand All @@ -2647,13 +2706,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
bool allReady = true;
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);

if (allReady) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);
mndResetFromCheckpoint(pMnode);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
}
}

taosThreadMutexUnlock(&execInfo.lock);

taosArrayDestroy(req.pTaskStatus);
taosArrayDestroy(req.pUpdateNodes);
return TSDB_CODE_SUCCESS;
}
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/inc/tq.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);

// tq util
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
int32_t tqLaunchStreamTaskAsync(STQ* pTq);

int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
Expand Down

0 comments on commit 0cf0ff3

Please sign in to comment.