Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 1. add limit for diskBasedBuf #22028

Merged
merged 1 commit into from Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions include/libs/nodes/plannodes.h
Expand Up @@ -246,7 +246,6 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
} SSortLogicNode;

typedef struct SPartitionLogicNode {
Expand Down Expand Up @@ -524,7 +523,6 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
int64_t maxRows;
} SSortPhysiNode;

typedef SSortPhysiNode SGroupSortPhysiNode;
Expand Down
10 changes: 8 additions & 2 deletions include/util/theap.h
Expand Up @@ -77,7 +77,7 @@ PriorityQueueNode* taosPQTop(PriorityQueue* pq);

size_t taosPQSize(PriorityQueue* pq);

void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);

void taosPQPop(PriorityQueue* pq);

Expand All @@ -89,7 +89,13 @@ void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);

void destroyBoundedQueue(BoundedQueue* q);

void taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
/*
* Push one node into BQ
* @retval NULL if n is upper than top node in q, and n is not freed
* @retval the pushed Node if pushing succeeded
* @note if maxSize exceeded, the original highest node is popped and freed with deleteFn
* */
PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n);

PriorityQueueNode* taosBQTop(BoundedQueue* q);

Expand Down
6 changes: 4 additions & 2 deletions source/libs/executor/inc/tsort.h
Expand Up @@ -64,15 +64,17 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
/**
*
* @param type
* @param maxRows keep maxRows at most
* @param maxTupleLength max len of one tuple, for check if heap sort is applicable
* @param maxRows keep maxRows at most, if 0, pq sort will not be used
* @param maxTupleLength max len of one tuple, for check if pq sort is applicable
* @param sortBufSize sort memory buf size, for check if heap sort is applicable
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize);

void tsortSetForceUsePQSort(SSortHandle* pHandle);

/**
*
* @param pSortHandle
Expand Down
8 changes: 6 additions & 2 deletions source/libs/executor/src/sortoperator.c
Expand Up @@ -55,7 +55,11 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = pSortNode->maxRows;
pInfo->maxRows = -1;
if (pSortNode->node.pLimit) {
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit;
}

int32_t numOfOutputCols = 0;
int32_t code =
Expand Down Expand Up @@ -718,7 +722,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}

if (p->info.rows > 0) {
if (p->info.rows > 0 || limitReached) {
break;
}
}
Expand Down
124 changes: 91 additions & 33 deletions source/libs/executor/src/tsort.c
Expand Up @@ -45,6 +45,7 @@ struct SSortHandle {
uint64_t maxRows;
uint32_t maxTupleLength;
uint32_t sortBufSize;
bool forceUsePQSort;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;

Expand Down Expand Up @@ -73,7 +74,7 @@ static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
return taosMemoryCalloc(1, totalLen);
}
static void destoryTuple(void* t) { taosMemoryFree(t); }
static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); }

#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
Expand Down Expand Up @@ -107,12 +108,65 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
return t + *tupleOffset(t, colIdx);
}

static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);

SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
return createOneDataBlock(pSortHandle->pDataBlock, false);
}

#define AllocatedTupleType 0
#define ReferencedTupleType 1 // tuple references to one row in pDataBlock
typedef struct TupleDesc {
uint8_t type;
char* data; // if type is AllocatedTuple, then points to the created tuple, otherwise points to the DataBlock
} TupleDesc;

typedef struct ReferencedTuple {
TupleDesc desc;
size_t rowIndex;
} ReferencedTuple;

static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) {
TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
void* pTuple = createTuple(colNum, tupleLen);
if (!pTuple) {
taosMemoryFree(t);
return NULL;
}
size_t colLen = 0;
uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset =
tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen);
}
}
t->type = AllocatedTupleType;
t->data = pTuple;
return t;
}

void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) {
if (pDesc->type == ReferencedTupleType) {
ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL;
return colDataGetData(pCol, pRefTuple->rowIndex);
} else {
return tupleGetField(pDesc->data, colIdx, colNum);
}
}

void destroyTuple(void* t) {
TupleDesc* pDesc = t;
if (pDesc->type == AllocatedTupleType) {
destoryAllocatedTuple(pDesc->data);
taosMemoryFree(pDesc);
}
}

/**
*
* @param type
Expand All @@ -130,11 +184,11 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->loops = 0;

pSortHandle->maxTupleLength = maxTupleLength;
if (maxRows < 0)
pSortHandle->sortBufSize = 0;
else
if (maxRows != 0) {
pSortHandle->sortBufSize = sortBufSize;
pSortHandle->maxRows = maxRows;
pSortHandle->maxRows = maxRows;
}
pSortHandle->forceUsePQSort = false;

if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
Expand Down Expand Up @@ -779,7 +833,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {

int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;

if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
Expand All @@ -804,6 +858,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}

if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;

Expand Down Expand Up @@ -936,8 +991,17 @@ static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
return &pHandle->tupleHandle;
}

static bool tsortIsForceUsePQSort(SSortHandle* pHandle) {
return pHandle->forceUsePQSort == true;
}

void tsortSetForceUsePQSort(SSortHandle* pHandle) {
pHandle->forceUsePQSort = true;
}

static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
if (tsortIsForceUsePQSort(pHandle)) return true;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows;
}
Expand All @@ -956,16 +1020,17 @@ static bool tsortPQComFnReverse(void*a, void* b, void* param) {
return 0;
}

static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
char* pLTuple = (char*)pLeft;
char* pRTuple = (char*)pRight;
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
TupleDesc* pRightDesc = (TupleDesc*)pRight;

SSortHandle* pHandle = (SSortHandle*)param;
SArray* orderInfo = (SArray*)pHandle->pSortInfo;
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
for (int32_t i = 0; i < orderInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum);
void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum);
if (!lData && !rData) continue;
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
Expand All @@ -984,9 +1049,9 @@ static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* para
}

static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, colDataComparFn);
tsortSetComparFp(pHandle, tupleComparFn);

SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
Expand Down Expand Up @@ -1018,24 +1083,17 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
}
}
}
size_t colLen = 0;
ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
void* pTuple = createTuple(colNum, tupleLen);
if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;

uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
tupleLen);
}
refTuple.rowIndex = rowIdx;
pqNode.data = &refTuple;
PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode);
if (!pPushedNode) {
// do nothing if push failed
} else {
pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx);
if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY;
}
pqNode.data = pTuple;
taosBQPush(pHandle->pBoundedQueue, &pqNode);
}
}
return TSDB_CODE_SUCCESS;
Expand All @@ -1044,7 +1102,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abondan the top tuple if queue size bigger than max size
// abandon the top tuple if queue size bigger than max size
if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
taosBQPop(pHandle->pBoundedQueue);
}
Expand All @@ -1056,7 +1114,7 @@ static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
if (taosBQSize(pHandle->pBoundedQueue) > 0) {
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
char* pTuple = (char*)node->data;
char* pTuple = ((TupleDesc*)node->data)->data;

for (uint32_t i = 0; i < colNum; ++i) {
void* pData = tupleGetField(pTuple, i, colNum);
Expand Down
1 change: 0 additions & 1 deletion source/libs/nodes/src/nodesCloneFuncs.c
Expand Up @@ -502,7 +502,6 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(maxRows);
return TSDB_CODE_SUCCESS;
}

Expand Down
6 changes: 0 additions & 6 deletions source/libs/nodes/src/nodesCodeFuncs.c
Expand Up @@ -2115,9 +2115,6 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
}

return code;
}
Expand All @@ -2135,9 +2132,6 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
}

return code;
}
Expand Down
8 changes: 1 addition & 7 deletions source/libs/nodes/src/nodesMsgFuncs.c
Expand Up @@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}

enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };

static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
Expand All @@ -2609,9 +2609,6 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows);
}

return code;
}
Expand All @@ -2635,9 +2632,6 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_SORT_CODE_MAX_ROWS:
code = tlvDecodeI64(pTlv, &pNode->maxRows);
break;
default:
break;
}
Expand Down
2 changes: 0 additions & 2 deletions source/libs/planner/src/planLogicCreater.c
Expand Up @@ -1027,7 +1027,6 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}

pSort->maxRows = -1;
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
Expand Down Expand Up @@ -1299,7 +1298,6 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
return TSDB_CODE_OUT_OF_MEMORY;
}

pSort->maxRows = -1;
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);

int32_t code = TSDB_CODE_SUCCESS;
Expand Down