Skip to content

Commit

Permalink
Merge pull request #22476 from taosdata/fix/3_liaohj
Browse files Browse the repository at this point in the history
fix(stream): pause when outputQ is blocked.
  • Loading branch information
hjxilinx committed Aug 18, 2023
2 parents a9d88b7 + 9e9d83d commit 1087224
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
3 changes: 3 additions & 0 deletions source/libs/stream/inc/streamInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
extern "C" {
#endif

#define ONE_MB_F (1048576.0)
#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F)

typedef struct {
int8_t inited;
void* timer;
Expand Down
11 changes: 5 additions & 6 deletions source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define ONE_MB_F (1048576.0)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)

#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
SStreamGlobalEnv streamEnv;

int32_t streamInit() {
Expand Down Expand Up @@ -178,7 +177,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

// todo add log
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
Expand All @@ -191,11 +189,12 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
if (code != 0) { // todo failed to add it into the output queue, free it.
return code;
if (code != 0) {
qError("s-task:%s failed to put res into outputQ", pTask->id.idStr);
}

streamDispatchStreamBlock(pTask);
return code;
}

return 0;
Expand Down Expand Up @@ -359,7 +358,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {

// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
msgLen, ver, total, size + msgLen/1048576.0);
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
Expand Down
16 changes: 11 additions & 5 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
#define STREAM_RESULT_DUMP_THRESHOLD 100

static int32_t updateCheckPointInfo(SStreamTask* pTask);
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
Expand Down Expand Up @@ -51,7 +51,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
}

qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
size / 1048576.0);
SIZE_IN_MB(size));

code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
Expand Down Expand Up @@ -90,6 +90,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return 0;
}

if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
taosMsleep(1000);
continue;
}

SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
Expand Down Expand Up @@ -137,10 +143,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
taosArrayPush(pRes, &block);

qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, size / 1048576.0);
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));

// current output should be dispatched to down stream nodes
if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
Expand Down Expand Up @@ -636,7 +642,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {

double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, el, resSize / 1048576.0, totalBlocks);
id, el, SIZE_IN_MB(resSize), totalBlocks);

streamFreeQitem(pInput);
}
Expand Down

0 comments on commit 1087224

Please sign in to comment.