Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pipelined agg when partition by col slimit #23003

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions source/libs/command/inc/commandInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ extern "C" {
#define EXPLAIN_VGROUP_SLOT_FORMAT "vgroup_slot=%d,%d"
#define EXPLAIN_UID_SLOT_FORMAT "uid_slot=%d,%d"
#define EXPLAIN_SRC_SCAN_FORMAT "src_scan=%d,%d"
#define EXPLAIN_PLAN_BLOCKING "blocking=%d"

#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
Expand Down
2 changes: 2 additions & 0 deletions source/libs/command/src/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pAggNode->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pAggNode->node.pSlimit);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_PLAN_BLOCKING, !pAggNode->node.forceCreateNonBlockingOptr);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));

Expand Down
19 changes: 18 additions & 1 deletion source/libs/planner/src/planOptimizer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2843,10 +2843,26 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
case QUERY_NODE_LOGIC_PLAN_AGG: {
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT &&
(isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) {
// when part by tag, slimit will be cloned to agg, and it will be pipelined.
// when part by tag/tbname, slimit will be cloned to agg, and it will be pipelined.
// The scan below will do scanning with group order
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
// else if not part by tag and tbname, the partition node below indicates that results are sorted, the agg node can
// be pipelined.
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && LIST_LENGTH(pNodeLimitPushTo->pChildren) == 1) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNodeLimitPushTo->pChildren, 0);
if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_PARTITION) {
pNodeLimitPushTo->forceCreateNonBlockingOptr = true;
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
// Currently, partColOpt is executed after pushDownLimitOpt, and partColOpt will replace partition node with
// sort node.
// To avoid dependencies between these two optimizations, we add sort node too.
if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT && ((SSortLogicNode*)pChild)->calcGroupId) {
pNodeLimitPushTo->forceCreateNonBlockingOptr = true;
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
}
break;
}
case QUERY_NODE_LOGIC_PLAN_SCAN:
Expand Down Expand Up @@ -3593,6 +3609,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
pOrder->order = ORDER_ASC;
pOrder->pExpr = nodesCloneNode(node);
pOrder->nullOrder = NULL_ORDER_FIRST;
if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY;
}
}
Expand Down
112 changes: 109 additions & 3 deletions source/libs/planner/src/planSpliter.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ typedef struct SSplitRule {

typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);

static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput);

static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
Expand Down Expand Up @@ -883,15 +885,119 @@ static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitI
return code;
}

static SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (pFunc) {
strcpy(pFunc->functionName, "_group_key");
strcpy(pFunc->node.aliasName, pGroupCol->node.aliasName);
strcpy(pFunc->node.userAlias, pGroupCol->node.userAlias);
int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode((SNode*)pGroupCol));
if (code == TSDB_CODE_SUCCESS) {
code = fmGetFuncInfo(pFunc, NULL, 0);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc);
pFunc = NULL;
}
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", pFunc->functionName, pFunc);
taosCreateMD5Hash(name, len);
strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
}
return pFunc;
}

/**
* @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes.
* For agg + partition, results are sorted by group id, use group sort.
* For agg + sort for group, results are sorted by partition keys, not group id, merges keys should be the same
* as partition keys
*/
static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* pInfo, SLogicNode* pChildAgg) {
bool groupSort = true;
SNodeList* pMergeKeys = NULL;
int32_t code = TSDB_CODE_SUCCESS;
bool sortForGroup = false;

if (pChildAgg->pChildren->length != 1) return TSDB_CODE_TSC_INTERNAL_ERROR;

SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pChildAgg->pChildren, 0);
if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT) {
SSortLogicNode* pSort = (SSortLogicNode*)pChild;
if (pSort->calcGroupId) {
SNode *node, *node2;
groupSort = false;
sortForGroup = true;
SNodeList* extraAggFuncs = NULL;
uint32_t originalLen = LIST_LENGTH(pSort->node.pTargets), idx = 0;
code = stbSplCreateMergeKeys(pSort->pSortKeys, pSort->node.pTargets, &pMergeKeys);
if (TSDB_CODE_SUCCESS != code) return code;

// Create group_key func for all sort keys.
// We only need newly added nodes in pSort.node.pTargets when stbSplCreateMergeKeys
FOREACH(node, pSort->node.pTargets) {
if (idx++ < originalLen) continue;
SFunctionNode* pGroupKeyFunc = createGroupKeyAggFunc((SColumnNode*)node);
if (!pGroupKeyFunc) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
code = nodesListMakeStrictAppend(&extraAggFuncs, (SNode*)pGroupKeyFunc);
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pGroupKeyFunc);
}
}

if (TSDB_CODE_SUCCESS == code) {
// add these extra group_key funcs into targets
code = createColumnByRewriteExprs(extraAggFuncs, &pChildAgg->pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
nodesListAppendList(((SAggLogicNode*)pChildAgg)->pAggFuncs, extraAggFuncs);
extraAggFuncs = NULL;
}

if (code == TSDB_CODE_SUCCESS) {
FOREACH(node, pMergeKeys) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)node;
SColumnNode* pCol = (SColumnNode*)pOrder->pExpr;
FOREACH(node2, ((SAggLogicNode*)pChildAgg)->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)node2;
if (0 != strcmp(pFunc->functionName, "_group_key")) continue;
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (!nodesEqualNode(pParam, (SNode*)pCol)) continue;

// use the colName of group_key func to make sure finding the right slot id for merge keys.
strcpy(pCol->colName, pFunc->node.aliasName);
strcpy(pCol->node.aliasName, pFunc->node.aliasName);
memset(pCol->tableAlias, 0, TSDB_TABLE_NAME_LEN);
break;
}
}
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
nodesDestroyList(extraAggFuncs);
}
}
}
code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort);
if (TSDB_CODE_SUCCESS == code && sortForGroup) {
SMergeLogicNode* pMerge =
(SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
pMerge->inputWithGroupId = true;
}
return code;
}

static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);


if (TSDB_CODE_SUCCESS == code) {
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
if ((SAggLogicNode*)pInfo->pSplitNode->pSlimit)
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, true);
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
else
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
} else {
Expand Down
4 changes: 4 additions & 0 deletions tests/parallel_test/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col_agg.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
Expand Down