Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh(stream): wait for the outputQ is available for new result blocks. #22876

Merged
merged 4 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);

int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const STaosQueue* pQueue);
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ);

typedef struct {
SMsgHead head;
Expand Down Expand Up @@ -645,12 +646,10 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);

int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);

char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tq/tqStreamTask.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}

if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) {
if (streamQueueIsFull(pTask->inputInfo.queue->pQueue, true)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
Expand Down
30 changes: 0 additions & 30 deletions source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,36 +212,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
code = taosWriteQitem(pQueue, pBlock);

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
if (code != 0) {
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}

streamDispatchStreamBlock(pTask);
return code;
}

return 0;
}

int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
Expand Down
2 changes: 1 addition & 1 deletion source/libs/stream/src/streamDispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {

const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
if (numOfElems > 0) {
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
}

Expand Down
67 changes: 46 additions & 21 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,58 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__PAUSE);
}

static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

streamDispatchStreamBlock(pTask);
return code;
}

return 0;
}

static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
int32_t* totalBlocks) {
int32_t numOfBlocks = taosArrayGetSize(pRes);
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
}
if (numOfBlocks == 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TSDB_CODE_SUCCESS;
}

qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
SIZE_IN_MB(size));
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TSDB_CODE_OUT_OF_MEMORY;
}

int32_t code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks);
return -1;
}
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
SIZE_IN_MB(size));

*totalSize += size;
*totalBlocks += numOfBlocks;
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
destroyStreamDataBlock(pStreamBlocks);
return code;
}

return TSDB_CODE_SUCCESS;
*totalSize += size;
*totalBlocks += numOfBlocks;

return code;
}

static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
Expand Down Expand Up @@ -236,7 +261,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;

code = streamTaskOutputResultBlock(pTask, qRes);
code = doOutputResultBlockImpl(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
Expand Down Expand Up @@ -536,7 +561,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {

if (type == STREAM_INPUT__DATA_BLOCK) {
qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
continue;
}
}
Expand Down
64 changes: 51 additions & 13 deletions source/libs/stream/src/streamQueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

#include "streamInt.h"

#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)

// todo refactor:
// read data from input queue
Expand Down Expand Up @@ -159,10 +160,15 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
}
#endif

bool streamQueueIsFull(const STaosQueue* pQueue) {
bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue));
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) {
bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY;
if (isFull) {
return true;
}

int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*)pQueue));
return (size >= threahold);
}

int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
Expand Down Expand Up @@ -275,15 +281,15 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
int32_t total = taosQueueItemSize(pQueue) + 1;
int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;

if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) {
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
qTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1;
Expand All @@ -306,11 +312,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pQueue)) {
if (streamQueueIsFull(pQueue, true)) {
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));

qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return -1;
}
Expand Down Expand Up @@ -356,6 +362,38 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}

// the result should be put into the outputQ in any cases, otherwise, the result may be lost
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;

while (streamQueueIsFull(pQueue, false)) {
if (streamTaskShouldStop(&pTask->status)) {
qInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
// let's wait for there are enough space to hold this result pBlock
qDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
total, size);
taosMsleep(500);
}

int32_t code = taosWriteQitem(pQueue, pBlock);

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
if (code != 0) {
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total + 1, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}

return TSDB_CODE_SUCCESS;
}

int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
if (cap < 100 || rate < 50 || pBucket == NULL) {
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
Expand Down