Skip to content

Commit

Permalink
pause source task
Browse files Browse the repository at this point in the history
  • Loading branch information
54liuyao committed May 16, 2023
1 parent ad7509b commit 3c81137
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
4 changes: 2 additions & 2 deletions source/dnode/mnode/impl/src/mndStream.c
Expand Up @@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPauseStreamTask(pTrans, pTask) < 0) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
}
Expand Down Expand Up @@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/libs/stream/src/streamExec.c
Expand Up @@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*

while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL) {
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2);
Expand Down Expand Up @@ -408,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);

if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
}
Expand Down

0 comments on commit 3c81137

Please sign in to comment.