Skip to content

Commit

Permalink
Merge pull request #22877 from taosdata/fix/3_1_patch_liaohj
Browse files Browse the repository at this point in the history
refactor: do some internal refactor.
  • Loading branch information
hjxilinx committed Sep 13, 2023
2 parents 4aa78ce + bafa2df commit 0d0a45b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 69 deletions.
5 changes: 2 additions & 3 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo)
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId);

int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const STaosQueue* pQueue);
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ);

typedef struct {
SMsgHead head;
Expand Down Expand Up @@ -645,12 +646,10 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);

int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);

char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tq/tqStreamTask.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}

if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) {
if (streamQueueIsFull(pTask->inputInfo.queue->pQueue, true)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
Expand Down
30 changes: 0 additions & 30 deletions source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,36 +212,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}

int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
code = taosWriteQitem(pQueue, pBlock);

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
if (code != 0) {
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}

streamDispatchStreamBlock(pTask);
return code;
}

return 0;
}

int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
Expand Down
2 changes: 1 addition & 1 deletion source/libs/stream/src/streamDispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {

const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
if (numOfElems > 0) {
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
}

Expand Down
67 changes: 46 additions & 21 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,58 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__PAUSE);
}

static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t code = 0;
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else if (type == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

streamDispatchStreamBlock(pTask);
return code;
}

return 0;
}

static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
int32_t* totalBlocks) {
int32_t numOfBlocks = taosArrayGetSize(pRes);
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
}
if (numOfBlocks == 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TSDB_CODE_SUCCESS;
}

qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
SIZE_IN_MB(size));
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TSDB_CODE_OUT_OF_MEMORY;
}

int32_t code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks);
return -1;
}
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
SIZE_IN_MB(size));

*totalSize += size;
*totalBlocks += numOfBlocks;
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
destroyStreamDataBlock(pStreamBlocks);
return code;
}

return TSDB_CODE_SUCCESS;
*totalSize += size;
*totalBlocks += numOfBlocks;

return code;
}

static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
Expand Down Expand Up @@ -236,7 +261,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;

code = streamTaskOutputResultBlock(pTask, qRes);
code = doOutputResultBlockImpl(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
Expand Down Expand Up @@ -536,7 +561,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {

if (type == STREAM_INPUT__DATA_BLOCK) {
qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
continue;
}
}
Expand Down
64 changes: 51 additions & 13 deletions source/libs/stream/src/streamQueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

#include "streamInt.h"

#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)

// todo refactor:
// read data from input queue
Expand Down Expand Up @@ -159,10 +160,15 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
}
#endif

bool streamQueueIsFull(const STaosQueue* pQueue) {
bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue));
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) {
bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY;
if (isFull) {
return true;
}

int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*)pQueue));
return (size >= threahold);
}

int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
Expand Down Expand Up @@ -275,15 +281,15 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
int32_t total = taosQueueItemSize(pQueue) + 1;
int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1;

if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) {
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
qTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1;
Expand All @@ -306,11 +312,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pQueue)) {
if (streamQueueIsFull(pQueue, true)) {
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));

qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return -1;
}
Expand Down Expand Up @@ -356,6 +362,38 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}

// the result should be put into the outputQ in any cases, otherwise, the result may be lost
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;

while (streamQueueIsFull(pQueue, false)) {
if (streamTaskShouldStop(&pTask->status)) {
qInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
// let's wait for there are enough space to hold this result pBlock
qDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
total, size);
taosMsleep(500);
}

int32_t code = taosWriteQitem(pQueue, pBlock);

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
if (code != 0) {
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total + 1, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}

return TSDB_CODE_SUCCESS;
}

int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
if (cap < 100 || rate < 50 || pBucket == NULL) {
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
Expand Down

0 comments on commit 0d0a45b

Please sign in to comment.