Skip to content

Commit

Permalink
Merge pull request #22465 from taosdata/fix/3_liaohj
Browse files Browse the repository at this point in the history
fix(stream): add the back pressure for sink node.
  • Loading branch information
hjxilinx committed Aug 16, 2023
2 parents f135907 + 59d5858 commit f8f8e7a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
7 changes: 7 additions & 0 deletions source/dnode/vnode/src/tq/tqRestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}

// downstream task has blocked the output, stopped for a while
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}

*pScanIdle = false;

// seek the stored version and extract data from WAL
Expand Down
2 changes: 1 addition & 1 deletion source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
msgLen, ver, total, size + msgLen/1048576.0);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size);
Expand Down
6 changes: 6 additions & 0 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
return 0;
}

if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
}

SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
Expand Down

0 comments on commit f8f8e7a

Please sign in to comment.