Skip to content

Commit

Permalink
feat: 1. add limit for diskBasedBuf
Browse files Browse the repository at this point in the history
2. use referenced tuple before actually pushing into pq
3. use limitInfo instead of maxRows in sort pyhsical node
  • Loading branch information
wangjiaming0909 committed Jul 11, 2023
1 parent 638a394 commit 6f03185
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 74 deletions.
2 changes: 0 additions & 2 deletions include/libs/nodes/plannodes.h
Expand Up @@ -246,7 +246,6 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
} SSortLogicNode;

typedef struct SPartitionLogicNode {
Expand Down Expand Up @@ -524,7 +523,6 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
int64_t maxRows;
} SSortPhysiNode;

typedef SSortPhysiNode SGroupSortPhysiNode;
Expand Down
10 changes: 8 additions & 2 deletions include/util/theap.h
Expand Up @@ -77,7 +77,7 @@ PriorityQueueNode* taosPQTop(PriorityQueue* pq);

size_t taosPQSize(PriorityQueue* pq);

void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);

void taosPQPop(PriorityQueue* pq);

Expand All @@ -89,7 +89,13 @@ void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);

void destroyBoundedQueue(BoundedQueue* q);

void taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
/*
* Push one node into BQ
* @retval NULL if n is upper than top node in q, and n is not freed
* @retval the pushed Node if pushing succeeded
* @note if maxSize exceeded, the original highest node is popped and freed with deleteFn
* */
PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n);

PriorityQueueNode* taosBQTop(BoundedQueue* q);

Expand Down
8 changes: 6 additions & 2 deletions source/libs/executor/src/sortoperator.c
Expand Up @@ -55,7 +55,11 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = pSortNode->maxRows;
pInfo->maxRows = -1;
if (pSortNode->node.pLimit) {
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit;
}

int32_t numOfOutputCols = 0;
int32_t code =
Expand Down Expand Up @@ -718,7 +722,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}

if (p->info.rows > 0) {
if (p->info.rows > 0 || limitReached) {
break;
}
}
Expand Down
106 changes: 77 additions & 29 deletions source/libs/executor/src/tsort.c
Expand Up @@ -73,7 +73,7 @@ static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
return taosMemoryCalloc(1, totalLen);
}
static void destoryTuple(void* t) { taosMemoryFree(t); }
static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); }

#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
Expand Down Expand Up @@ -107,12 +107,65 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
return t + *tupleOffset(t, colIdx);
}

static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);

SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
return createOneDataBlock(pSortHandle->pDataBlock, false);
}

#define AllocatedTupleType 0
#define ReferencedTupleType 1 // tuple references to one row in pDataBlock
typedef struct TupleDesc {
uint8_t type;
char* data; // if type is AllocatedTuple, then points to the created tuple, otherwise points to the DataBlock
} TupleDesc;

typedef struct ReferencedTuple {
TupleDesc desc;
size_t rowIndex;
} ReferencedTuple;

static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) {
TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
void* pTuple = createTuple(colNum, tupleLen);
if (!pTuple) {
taosMemoryFree(t);
return NULL;
}
size_t colLen = 0;
uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset =
tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen);
}
}
t->type = AllocatedTupleType;
t->data = pTuple;
return t;
}

void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) {
if (pDesc->type == ReferencedTupleType) {
ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL;
return colDataGetData(pCol, pRefTuple->rowIndex);
} else {
return tupleGetField(pDesc->data, colIdx, colNum);
}
}

void destroyTuple(void* t) {
TupleDesc* pDesc = t;
if (pDesc->type == AllocatedTupleType) {
destoryAllocatedTuple(pDesc->data);
taosMemoryFree(pDesc);
}
}

/**
*
* @param type
Expand Down Expand Up @@ -779,7 +832,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {

int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;

if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
Expand All @@ -804,6 +857,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}

if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;

Expand Down Expand Up @@ -956,16 +1010,17 @@ static bool tsortPQComFnReverse(void*a, void* b, void* param) {
return 0;
}

static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
char* pLTuple = (char*)pLeft;
char* pRTuple = (char*)pRight;
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
TupleDesc* pRightDesc = (TupleDesc*)pRight;

SSortHandle* pHandle = (SSortHandle*)param;
SArray* orderInfo = (SArray*)pHandle->pSortInfo;
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
for (int32_t i = 0; i < orderInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum);
void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum);
if (!lData && !rData) continue;
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
Expand All @@ -984,9 +1039,9 @@ static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* para
}

static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, colDataComparFn);
tsortSetComparFp(pHandle, tupleComparFn);

SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
Expand Down Expand Up @@ -1018,24 +1073,17 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
}
}
}
size_t colLen = 0;
ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
void* pTuple = createTuple(colNum, tupleLen);
if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;

uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
tupleLen);
}
refTuple.rowIndex = rowIdx;
pqNode.data = &refTuple;
PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode);
if (!pPushedNode) {
// do nothing if push failed
} else {
pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx);
if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY;
}
pqNode.data = pTuple;
taosBQPush(pHandle->pBoundedQueue, &pqNode);
}
}
return TSDB_CODE_SUCCESS;
Expand All @@ -1044,7 +1092,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abondan the top tuple if queue size bigger than max size
// abandon the top tuple if queue size bigger than max size
if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
taosBQPop(pHandle->pBoundedQueue);
}
Expand All @@ -1056,7 +1104,7 @@ static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
if (taosBQSize(pHandle->pBoundedQueue) > 0) {
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
char* pTuple = (char*)node->data;
char* pTuple = ((TupleDesc*)node->data)->data;

for (uint32_t i = 0; i < colNum; ++i) {
void* pData = tupleGetField(pTuple, i, colNum);
Expand Down
1 change: 0 additions & 1 deletion source/libs/nodes/src/nodesCloneFuncs.c
Expand Up @@ -502,7 +502,6 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(maxRows);
return TSDB_CODE_SUCCESS;
}

Expand Down
6 changes: 0 additions & 6 deletions source/libs/nodes/src/nodesCodeFuncs.c
Expand Up @@ -2115,9 +2115,6 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
}

return code;
}
Expand All @@ -2135,9 +2132,6 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
}

return code;
}
Expand Down
8 changes: 1 addition & 7 deletions source/libs/nodes/src/nodesMsgFuncs.c
Expand Up @@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}

enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };

static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
Expand All @@ -2609,9 +2609,6 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows);
}

return code;
}
Expand All @@ -2635,9 +2632,6 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_SORT_CODE_MAX_ROWS:
code = tlvDecodeI64(pTlv, &pNode->maxRows);
break;
default:
break;
}
Expand Down
2 changes: 0 additions & 2 deletions source/libs/planner/src/planLogicCreater.c
Expand Up @@ -1027,7 +1027,6 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}

pSort->maxRows = -1;
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
Expand Down Expand Up @@ -1299,7 +1298,6 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
return TSDB_CODE_OUT_OF_MEMORY;
}

pSort->maxRows = -1;
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);

int32_t code = TSDB_CODE_SUCCESS;
Expand Down
19 changes: 9 additions & 10 deletions source/libs/planner/src/planOptimizer.c
Expand Up @@ -2635,11 +2635,13 @@ static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
}

SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
// push down to sort node
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit);
// if we have pushed down, we skip it
if ((*(SSortLogicNode*)pChild).maxRows != -1) return false;
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
if (pChild->pLimit) return false;
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
// push down to table scan node
// if pNode is sortNode, we skip push down limit info to table scan node
return false;
}
return true;
Expand All @@ -2654,13 +2656,10 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
nodesDestroyNode(pChild->pLimit);
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit;
int64_t maxRows = -1;
if (pLimitNode->limit != -1) {
maxRows = pLimitNode->limit;
if (pLimitNode->offset != -1) maxRows += pLimitNode->offset;
}
((SSortLogicNode*)pChild)->maxRows = maxRows;
pChild->pLimit = nodesCloneNode(pNode->pLimit);
SLimitNode* pLimit = (SLimitNode*)pChild->pLimit;
pLimit->limit += pLimit->offset;
pLimit->offset = 0;
} else {
pChild->pLimit = pNode->pLimit;
pNode->pLimit = NULL;
Expand Down
1 change: 0 additions & 1 deletion source/libs/planner/src/planPhysiCreater.c
Expand Up @@ -1374,7 +1374,6 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = pSortLogicNode->maxRows;

SNodeList* pPrecalcExprs = NULL;
SNodeList* pSortKeys = NULL;
Expand Down
1 change: 0 additions & 1 deletion source/libs/planner/src/planSpliter.c
Expand Up @@ -1018,7 +1018,6 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
splSetParent((SLogicNode*)pPartSort);
pPartSort->pSortKeys = pSortKeys;
pPartSort->groupSort = pSort->groupSort;
pPartSort->maxRows = pSort->maxRows;
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}

Expand Down

0 comments on commit 6f03185

Please sign in to comment.