Skip to content

Commit

Permalink
optimize sort perf
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjiaming0909 committed Sep 13, 2023
1 parent 4a13100 commit 9ceebeb
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 91 deletions.
9 changes: 9 additions & 0 deletions include/common/tdatablock.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
}
}

static FORCE_INLINE bool colDataIsNull_t(const SColumnInfoData* pColumnInfoData, uint32_t row, bool isVarType) {
if (!pColumnInfoData->hasNull) return false;
if (isVarType) {
return colDataIsNull_var(pColumnInfoData, row);
} else {
return pColumnInfoData->nullbitmap ? colDataIsNull_f(pColumnInfoData->nullbitmap, row) : false;
}
}

static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {
Expand Down
5 changes: 4 additions & 1 deletion include/libs/nodes/plannodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ typedef struct SMergeLogicNode {
int32_t srcGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
} SMergeLogicNode;

typedef enum EWindowType {
Expand Down Expand Up @@ -294,7 +295,8 @@ typedef struct SPartitionLogicNode {
SNode* pSubtable;

bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained
int32_t tsSlotId;
int32_t pkTsColId;
uint64_t pkTsColTbId;
} SPartitionLogicNode;

typedef enum ESubplanType {
Expand Down Expand Up @@ -534,6 +536,7 @@ typedef struct SMergePhysiNode {
int32_t srcGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
} SMergePhysiNode;

typedef struct SWindowPhysiNode {
Expand Down
16 changes: 8 additions & 8 deletions source/libs/executor/src/executil.c
Original file line number Diff line number Diff line change
Expand Up @@ -2265,11 +2265,11 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
const char* isNull = oldkeyBuf;
const char* p = oldkeyBuf + sizeof(int8_t) * taosArrayGetSize(pSortGroupCols);
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;

for (int32_t i = 0; i < taosArrayGetSize(pSortGroupCols); ++i) {
const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];

if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
Expand All @@ -2296,15 +2296,15 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol

int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
int32_t rowIndex) {
uint32_t colNum = taosArrayGetSize(pSortGroupCols);
uint32_t colNum = pSortGroupCols->size;
SColumnDataAgg* pColAgg = NULL;
char* isNull = keyBuf;
char* p = keyBuf + sizeof(int8_t) * colNum;

for (int32_t i = 0; i < colNum; ++i) {
const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) continue;
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pCol->slotId > pBlock->pDataBlock->size) continue;

if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];

Expand Down
6 changes: 4 additions & 2 deletions source/libs/executor/src/sortoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ typedef struct SMultiwayMergeOperatorInfo {
bool ignoreGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
bool inputWithGroupId;
} SMultiwayMergeOperatorInfo;

int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
Expand Down Expand Up @@ -742,7 +743,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*

while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->groupSort) {
if (pInfo->groupSort || pInfo->inputWithGroupId) {
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
Expand All @@ -763,7 +764,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
break;
}

if (pInfo->groupSort) {
if (pInfo->groupSort || pInfo->inputWithGroupId) {
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
Expand Down Expand Up @@ -943,6 +944,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;

setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
Expand Down
68 changes: 41 additions & 27 deletions source/libs/executor/src/tsort.c
Original file line number Diff line number Diff line change
Expand Up @@ -616,47 +616,61 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
int ret = pParam->cmpFn(left1, right1);
return ret;
} else {
bool isVarType;
for (int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);

if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
} else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]);
}
}

bool leftNull = false;
if (pLeftColInfoData->hasNull) {
if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
} else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
if (pRightBlock->pBlockAgg == NULL) {
rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
pRightBlock->pBlockAgg[i]);
}
}
}

bool rightNull = false;
if (pRightColInfoData->hasNull) {
if (pRightBlock->pBlockAgg == NULL) {
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
pRightBlock->pBlockAgg[i]);
if (leftNull && rightNull) {
continue; // continue to next slot
}
}

if (leftNull && rightNull) {
continue; // continue to next slot
}
if (rightNull) {
return pOrder->nullFirst ? 1 : -1;
}

if (rightNull) {
return pOrder->nullFirst ? 1 : -1;
if (leftNull) {
return pOrder->nullFirst ? -1 : 1;
}
}

if (leftNull) {
return pOrder->nullFirst ? -1 : 1;
void* left1, *right1;
if (isVarType) {
left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex);
right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex);
} else {
left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex);
right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex);
}

void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);

__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
__compar_fn_t fn = pOrder->compFn;
if (!fn) {
fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
pOrder->compFn = fn;
}

int ret = fn(left1, right1);
if (ret == 0) {
Expand Down
4 changes: 3 additions & 1 deletion source/libs/nodes/src/nodesCloneFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(ignoreGroupId);
COPY_SCALAR_FIELD(inputWithGroupId);
return TSDB_CODE_SUCCESS;
}

Expand Down Expand Up @@ -543,7 +544,8 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
COPY_SCALAR_FIELD(tsSlotId);
COPY_SCALAR_FIELD(pkTsColId);
COPY_SCALAR_FIELD(pkTsColTbId);
return TSDB_CODE_SUCCESS;
}

Expand Down
4 changes: 4 additions & 0 deletions source/libs/nodes/src/nodesCodeFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,7 @@ static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID";
static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId";

static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
Expand All @@ -2294,6 +2295,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId);
}

return code;
}
Expand Down
7 changes: 7 additions & 0 deletions source/libs/nodes/src/nodesMsgFuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,7 @@ enum {
PHY_MERGE_CODE_SRC_GROUP_ID,
PHY_MERGE_CODE_GROUP_SORT,
PHY_MERGE_CODE_IGNORE_GROUP_ID,
PHY_MERGE_CODE_INPUT_WITH_GROUP_ID,
};

static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
Expand All @@ -2706,6 +2707,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId);
}

return code;
}
Expand Down Expand Up @@ -2738,6 +2742,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_MERGE_CODE_IGNORE_GROUP_ID:
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
break;
case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID:
code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId);
break;
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion source/libs/planner/src/planLogicCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
pPartition->needBlockOutputTsOrder = true;
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow;
SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol;
pPartition->tsSlotId = pTsCol->slotId;
pPartition->pkTsColId = pTsCol->colId;
pPartition->pkTsColTbId = pTsCol->tableId;
}

if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) {
Expand Down
22 changes: 12 additions & 10 deletions source/libs/planner/src/planOptimizer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2838,13 +2838,14 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
return true;
}
case QUERY_NODE_LOGIC_PLAN_SORT:
if (((SSortLogicNode*)pNodeLimitPushTo)->calcGroupId) break;
// fall through
case QUERY_NODE_LOGIC_PLAN_FILL:
case QUERY_NODE_LOGIC_PLAN_SORT: {
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
SNode* pChild = NULL;
FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
return true;
}
case QUERY_NODE_LOGIC_PLAN_AGG: {
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT &&
(isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) {
Expand Down Expand Up @@ -3585,11 +3586,13 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
if (pSort) {
bool alreadyPartByPKTs = false;
pSort->groupSort = false;
TSWAP(pSort->node.pChildren, pPartition->node.pChildren);
optResetParent((SLogicNode*)pSort);
FOREACH(node, pPartition->pPartitionKeys) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (QUERY_NODE_COLUMN == nodeType(node) && ((SColumnNode*)node)->colId == pPartition->pkTsColId &&
((SColumnNode*)node)->tableId == pPartition->pkTsColTbId)
alreadyPartByPKTs = true;
if (!pOrder) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
Expand All @@ -3600,7 +3603,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
}
}

if (pPartition->needBlockOutputTsOrder) {
if (pPartition->needBlockOutputTsOrder && !alreadyPartByPKTs) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (!pOrder) {
code = TSDB_CODE_OUT_OF_MEMORY;
Expand All @@ -3612,7 +3615,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
FOREACH(node, pPartition->node.pTargets) {
if (nodeType(node) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)node;
if (pCol->slotId == pPartition->tsSlotId) {
if (pCol->colId == pPartition->pkTsColId && pCol->tableId == pPartition->pkTsColTbId) {
pOrder->pExpr = nodesCloneNode((SNode*)pCol);
break;
}
Expand All @@ -3624,10 +3627,6 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
}
}
}
if (code == TSDB_CODE_SUCCESS) {
pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets);
if (!pSort->node.pTargets) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pSort);
pSort = NULL;
Expand All @@ -3651,6 +3650,9 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
// if sort create failed, we eat the error, skip the optimization
code = TSDB_CODE_SUCCESS;
} else {
TSWAP(pSort->node.pChildren, pNode->node.pChildren);
TSWAP(pSort->node.pTargets, pNode->node.pTargets);
optResetParent((SLogicNode*)pSort);
pSort->calcGroupId = true;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
if (code == TSDB_CODE_SUCCESS) {
Expand Down
18 changes: 17 additions & 1 deletion source/libs/planner/src/planPhysiCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -1800,7 +1800,6 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
SNodeList* pPartitionKeys = NULL;
int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;
pPart->tsSlotId = pPartLogicNode->tsSlotId;

SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
Expand All @@ -1822,6 +1821,22 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
}
}

if (pPart->needBlockOutputTsOrder) {
SNode* node;
bool found = false;
FOREACH(node, pPartLogicNode->node.pTargets) {
if (nodeType(node) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)node;
if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) {
pPart->tsSlotId = pCol->slotId;
found = true;
break;
}
}
}
if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR;
}

if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
}
Expand Down Expand Up @@ -1948,6 +1963,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
pMerge->groupSort = pMergeLogicNode->groupSort;
pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;

int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);

Expand Down
5 changes: 5 additions & 0 deletions source/libs/planner/src/planSpliter.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
}
break;
}
case QUERY_NODE_LOGIC_PLAN_SORT: {
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
if (pSort->calcGroupId) pMerge->inputWithGroupId = true;
break;
}
default:
break;
}
Expand Down

0 comments on commit 9ceebeb

Please sign in to comment.