Skip to content

Commit

Permalink
Merge pull request #22866 from taosdata/fix/3_liaohj
Browse files Browse the repository at this point in the history
refactor: do  some internal refactor.
  • Loading branch information
hjxilinx committed Sep 12, 2023
2 parents 4f6f953 + 13c20fa commit 4325a0a
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 66 deletions.
6 changes: 2 additions & 4 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,12 @@ typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SPackedData submit;
} SStreamDataSubmit;

typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit;

Expand Down Expand Up @@ -672,7 +670,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);

// common
int32_t streamRestoreParam(SStreamTask* pTask);
Expand All @@ -696,7 +694,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);

// agg level
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pTask->status.pauseAllowed == true);
}

streamSourceScanHistoryData(pTask);
streamScanHistoryData(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tq/tqSink.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("s-task:%s tq sink pipe2, row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
}

if (IS_SET_NULL(pCol)) {
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/tq/tqStreamTask.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}

int32_t numOfItems = streamTaskGetInputQItems(pTask);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue);
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;

taosThreadMutexLock(&pTask->lock);
Expand Down
13 changes: 10 additions & 3 deletions source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
}

int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
int8_t status = TASK_INPUT_STATUS__NORMAL;

// enqueue
Expand Down Expand Up @@ -223,9 +223,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock);
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
code = taosWriteQitem(pQueue, pBlock);

int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
if (code != 0) {
qError("s-task:%s failed to put res into outputQ", pTask->id.idStr);
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}

streamDispatchStreamBlock(pTask);
Expand Down
38 changes: 8 additions & 30 deletions source/libs/stream/src/streamData.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,28 +115,16 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
return NULL;
}

pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
if (pDataSubmit->dataRef == NULL) {
taosFreeQitem(pDataSubmit);
return NULL;
}

pDataSubmit->ver = pData->ver;
pDataSubmit->submit = *pData;
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = type;

return pDataSubmit;
}

void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

if (ref == 0) {
taosMemoryFree(pDataSubmit->submit.msgStr);
taosMemoryFree(pDataSubmit->dataRef);
}
ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
taosMemoryFree(pDataSubmit->submit.msgStr);
}

SStreamMergedSubmit* streamMergedSubmitNew() {
Expand All @@ -146,11 +134,8 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
}

pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));

if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
if (pMerged->submits == NULL) {
taosArrayDestroy(pMerged->submits);
taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
Expand All @@ -160,9 +145,10 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
}

int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver;
if (pSubmit->ver > pMerged->ver) {
pMerged->ver = pSubmit->ver;
}
return 0;
}

Expand Down Expand Up @@ -222,18 +208,10 @@ void streamFreeQitem(SStreamQueueItem* data) {

int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);

if (ref == 0) {
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
taosMemoryFree(pSubmit->msgStr);
taosMemoryFree(pRef);
}
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
taosMemoryFree(pSubmit->msgStr);
}
taosArrayDestroy(pMerge->submits);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
Expand Down
5 changes: 3 additions & 2 deletions source/libs/stream/src/streamDispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));

const char* id = pTask->id.idStr;
int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue);
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems);
qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
}

// to make sure only one dispatch is running
Expand Down
18 changes: 5 additions & 13 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code;
}

int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
int32_t streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);

int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
int32_t outputBatchSize = 100;

qSetStreamOpOpen(exec);

Expand Down Expand Up @@ -217,8 +219,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);

if ((++numOfBlocks) >= batchSize) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize);
if ((++numOfBlocks) >= outputBatchSize) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
break;
}
}
Expand Down Expand Up @@ -248,13 +250,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
return 0;
}

int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue);
int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall);

return numOfItems1 + numOfItems2;
}

// wait for the stream task to be idle
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
const char* id = pTask->id.idStr;
Expand Down Expand Up @@ -576,7 +571,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskBuildCheckpoint(pTask);
Expand Down Expand Up @@ -608,8 +602,6 @@ int32_t streamTryExec(SStreamTask* pTask) {
return -1;
}

// streamTaskBuildCheckpoint(pTask);

atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
Expand Down
31 changes: 24 additions & 7 deletions source/libs/stream/src/streamQueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ bool streamQueueIsFull(const STaosQueue* pQueue) {
return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
}

int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue);
int32_t numOfItems2 = taosQallItemSize(pQueue->qall);

return numOfItems1 + numOfItems2;
}

int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
Expand Down Expand Up @@ -269,14 +276,14 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
int8_t type = pItem->type;
STaosQueue* pQueue = pTask->inputInfo.queue->pQueue;
int32_t total = taosQueueItemSize(pQueue) + 1;
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));

if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) {
// qError(
// "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
qTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1;
Expand All @@ -292,31 +299,39 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return code;
}

double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));

// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pQueue)) {
// qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
// pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));

qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return -1;
}

qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*)pItem);
return code;
}

double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) {
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem);
return code;
}

double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
} else if (type == STREAM_INPUT__GET_RES) {
Expand All @@ -326,6 +341,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
taosFreeQitem(pItem);
return code;
}

double size = SIZE_IN_MB(taosQueueMemorySize(pQueue));
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else {
ASSERT(0);
Expand Down
4 changes: 0 additions & 4 deletions source/libs/stream/src/streamRecover.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
return 0;
}

int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
return streamScanExec(pTask, 100);
}

int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
Expand Down

0 comments on commit 4325a0a

Please sign in to comment.