Skip to content

Commit

Permalink
refactor stream session window
Browse files Browse the repository at this point in the history
  • Loading branch information
54liuyao committed Aug 14, 2023
1 parent 4b435e2 commit d1c2898
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 162 deletions.
7 changes: 4 additions & 3 deletions source/libs/executor/inc/executil.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,10 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);

int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI);

void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI);
char* getStreamOpName(uint16_t opType);
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);

void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
Expand Down
53 changes: 53 additions & 0 deletions source/libs/executor/src/executil.c
Original file line number Diff line number Diff line change
Expand Up @@ -2178,6 +2178,45 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS;
}

char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
return "stream scan";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: {
return "interval single";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
return "interval final";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: {
return "interval semi";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: {
return "stream fill";
}
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: {
return "session single";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: {
return "session semi";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
return "session final";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
return "state single";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
return "stream partitionby";
};
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: {
return "stream event";
};
}
return "";
}

void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
Expand All @@ -2188,6 +2227,20 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
taosMemoryFree(pBuf);
}

void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL;
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
}

TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }

void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
Expand Down
12 changes: 6 additions & 6 deletions source/libs/executor/src/filloperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
}
if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
setOperatorCompleted(pOperator);
Expand All @@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
break;
}
printDataBlock(pBlock, "stream fill recv", GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));

if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
Expand All @@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillSup->hasDelete = true;
doDeleteFillResult(pOperator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "stream fill delete", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
continue;
Expand Down Expand Up @@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}

pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
printDataBlock(pInfo->pRes, "stream fill", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}

Expand Down
6 changes: 3 additions & 3 deletions source/libs/executor/src/groupoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0);
printDataBlock(pDest, "stream partitionby", GET_TASKID(pTaskInfo));
printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pDest;
}

Expand Down Expand Up @@ -1116,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
return NULL;
}
printDataBlock(pBlock, "stream partitionby recv", GET_TASKID(pTaskInfo));
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_PULL_DATA:
Expand All @@ -1126,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case STREAM_DELETE_DATA: {
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pInfo->pDelRes, "stream partitionby delete", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
} break;
default:
Expand Down

0 comments on commit d1c2898

Please sign in to comment.