Skip to content

Commit

Permalink
Merge pull request #23003 from taosdata/feat-pipelined-agg/TD-25620
Browse files Browse the repository at this point in the history
feat: pipelined agg when partition by col slimit
  • Loading branch information
dapan1121 committed Sep 22, 2023
2 parents e3048d9 + f2832b5 commit 94ad93f
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 4 deletions.
1 change: 1 addition & 0 deletions source/libs/command/inc/commandInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ extern "C" {
#define EXPLAIN_VGROUP_SLOT_FORMAT "vgroup_slot=%d,%d"
#define EXPLAIN_UID_SLOT_FORMAT "uid_slot=%d,%d"
#define EXPLAIN_SRC_SCAN_FORMAT "src_scan=%d,%d"
// Format of the "blocking" field appended to an explain row. The agg node passes
// !forceCreateNonBlockingOptr, so 1 means blocking and 0 means forced non-blocking.
#define EXPLAIN_PLAN_BLOCKING "blocking=%d"

#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
Expand Down
2 changes: 2 additions & 0 deletions source/libs/command/src/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pAggNode->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pAggNode->node.pSlimit);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_PLAN_BLOCKING, !pAggNode->node.forceCreateNonBlockingOptr);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));

Expand Down
19 changes: 18 additions & 1 deletion source/libs/planner/src/planOptimizer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2843,10 +2843,26 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
case QUERY_NODE_LOGIC_PLAN_AGG: {
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT &&
(isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) {
// when part by tag, slimit will be cloned to agg, and it will be pipelined.
// when part by tag/tbname, slimit will be cloned to agg, and it will be pipelined.
// The scan below will do scanning with group order
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
// else if not part by tag and tbname, the partition node below indicates that results are sorted, the agg node can
// be pipelined.
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && LIST_LENGTH(pNodeLimitPushTo->pChildren) == 1) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNodeLimitPushTo->pChildren, 0);
if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_PARTITION) {
pNodeLimitPushTo->forceCreateNonBlockingOptr = true;
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
// Currently, partColOpt is executed after pushDownLimitOpt, and partColOpt will replace partition node with
// sort node.
// To avoid dependencies between these two optimizations, we add sort node too.
if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT && ((SSortLogicNode*)pChild)->calcGroupId) {
pNodeLimitPushTo->forceCreateNonBlockingOptr = true;
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
}
break;
}
case QUERY_NODE_LOGIC_PLAN_SCAN:
Expand Down Expand Up @@ -3593,6 +3609,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
pOrder->order = ORDER_ASC;
pOrder->pExpr = nodesCloneNode(node);
pOrder->nullOrder = NULL_ORDER_FIRST;
if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY;
}
}
Expand Down
112 changes: 109 additions & 3 deletions source/libs/planner/src/planSpliter.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ typedef struct SSplitRule {

typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);

// Forward declaration: stbSplAggNodeCreateMerge calls this helper to build merge keys
// from a sort node's sort keys and targets; the definition lives elsewhere in this file.
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput);

static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
Expand Down Expand Up @@ -883,15 +885,119 @@ static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitI
return code;
}

/**
 * @brief Build a `_group_key` aggregate function node wrapping a clone of pGroupCol.
 *
 * The new node first inherits the column's alias and user alias; the alias is then
 * replaced by an MD5 hash of "<functionName>.<node address>".
 *
 * @param pGroupCol column to wrap; it is cloned, not consumed.
 * @return the new function node, or NULL if node creation, parameter append or
 *         fmGetFuncInfo fails. On failure the partially built node is destroyed.
 */
static SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) {
  SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (!pFunc) {
    return NULL;
  }

  strcpy(pFunc->functionName, "_group_key");
  strcpy(pFunc->node.aliasName, pGroupCol->node.aliasName);
  strcpy(pFunc->node.userAlias, pGroupCol->node.userAlias);

  int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode((SNode*)pGroupCol));
  if (code == TSDB_CODE_SUCCESS) {
    code = fmGetFuncInfo(pFunc, NULL, 0);
  }
  if (TSDB_CODE_SUCCESS != code) {
    // Bail out here: the alias computation below dereferences pFunc, which was
    // previously done even after the node had been destroyed and set to NULL.
    nodesDestroyNode((SNode*)pFunc);
    return NULL;
  }

  // Derive the alias from the function name and the node address, then hash it.
  char    name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", pFunc->functionName, pFunc);
  taosCreateMD5Hash(name, len);
  strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1);

  return pFunc;
}

/**
 * @brief For a pipelined agg node, add a SortMergeNode to merge results from vnodes.
 *        For agg + partition, results are sorted by group id, so group sort is used.
 *        For agg + sort for group, results are sorted by partition keys, not group id, so the
 *        merge keys must be the same as the partition keys.
 *
 * @param pCtx      split context, forwarded to stbSplCreateMergeNode.
 * @param pInfo     split info; pInfo->pSplitNode is the node the merge node is created for.
 * @param pChildAgg the partial agg node; it must have exactly one child.
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_TSC_INTERNAL_ERROR if pChildAgg does not have
 *         exactly one child; otherwise the first error reported while building the merge keys,
 *         the extra `_group_key` functions, or the merge node.
 */
static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* pInfo, SLogicNode* pChildAgg) {
  bool       groupSort = true;
  SNodeList* pMergeKeys = NULL;
  int32_t    code = TSDB_CODE_SUCCESS;
  bool       sortForGroup = false;

  // LIST_LENGTH is used (as elsewhere in the planner) instead of reading ->length directly.
  if (LIST_LENGTH(pChildAgg->pChildren) != 1) return TSDB_CODE_TSC_INTERNAL_ERROR;

  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pChildAgg->pChildren, 0);
  if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT) {
    SSortLogicNode* pSort = (SSortLogicNode*)pChild;
    if (pSort->calcGroupId) {
      SNode *node, *node2;
      groupSort = false;
      sortForGroup = true;
      SNodeList* extraAggFuncs = NULL;
      uint32_t   originalLen = LIST_LENGTH(pSort->node.pTargets), idx = 0;
      code = stbSplCreateMergeKeys(pSort->pSortKeys, pSort->node.pTargets, &pMergeKeys);
      if (TSDB_CODE_SUCCESS != code) return code;

      // Create group_key func for all sort keys.
      // We only need newly added nodes in pSort.node.pTargets when stbSplCreateMergeKeys
      FOREACH(node, pSort->node.pTargets) {
        if (idx++ < originalLen) continue;
        SFunctionNode* pGroupKeyFunc = createGroupKeyAggFunc((SColumnNode*)node);
        if (!pGroupKeyFunc) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          break;
        }
        code = nodesListMakeStrictAppend(&extraAggFuncs, (SNode*)pGroupKeyFunc);
        if (code != TSDB_CODE_SUCCESS) {
          // NOTE(review): the "strict" append is expected to release the node itself on failure
          // (createGroupKeyAggFunc hands it an unowned clone), so it is not destroyed again
          // here - confirm against nodesListMakeStrictAppend.
          // Stop iterating so a later successful append cannot overwrite the failure.
          break;
        }
      }

      if (TSDB_CODE_SUCCESS == code) {
        // add these extra group_key funcs into targets
        code = createColumnByRewriteExprs(extraAggFuncs, &pChildAgg->pTargets);
      }
      if (code == TSDB_CODE_SUCCESS) {
        // Ownership of the extra funcs moves to the agg node's function list.
        nodesListAppendList(((SAggLogicNode*)pChildAgg)->pAggFuncs, extraAggFuncs);
        extraAggFuncs = NULL;
      }

      if (code == TSDB_CODE_SUCCESS) {
        FOREACH(node, pMergeKeys) {
          SOrderByExprNode* pOrder = (SOrderByExprNode*)node;
          SColumnNode*      pCol = (SColumnNode*)pOrder->pExpr;
          FOREACH(node2, ((SAggLogicNode*)pChildAgg)->pAggFuncs) {
            SFunctionNode* pFunc = (SFunctionNode*)node2;
            if (0 != strcmp(pFunc->functionName, "_group_key")) continue;
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
            if (!nodesEqualNode(pParam, (SNode*)pCol)) continue;

            // use the colName of group_key func to make sure finding the right slot id for merge keys.
            strcpy(pCol->colName, pFunc->node.aliasName);
            strcpy(pCol->node.aliasName, pFunc->node.aliasName);
            memset(pCol->tableAlias, 0, TSDB_TABLE_NAME_LEN);
            break;
          }
        }
      }
      if (TSDB_CODE_SUCCESS != code) {
        nodesDestroyList(pMergeKeys);
        nodesDestroyList(extraAggFuncs);
        // Return now: falling through would hand the destroyed pMergeKeys to
        // stbSplCreateMergeNode and overwrite the error code.
        return code;
      }
    }
  }

  code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort);
  if (TSDB_CODE_SUCCESS == code && sortForGroup) {
    // Flag the last child of the split node, taken to be the merge node just created.
    SMergeLogicNode* pMerge =
        (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
    pMerge->inputWithGroupId = true;
  }
  return code;
}

static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);


if (TSDB_CODE_SUCCESS == code) {
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
if ((SAggLogicNode*)pInfo->pSplitNode->pSlimit)
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, true);
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
else
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
} else {
Expand Down
4 changes: 4 additions & 0 deletions tests/parallel_test/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
Expand Down

0 comments on commit 94ad93f

Please sign in to comment.