Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize partition node, replace with sort node #22629

Merged
merged 3 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/common/tdatablock.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ typedef struct SBlockOrderInfo {
bool nullFirst;
int32_t order;
int32_t slotId;
void* compFn;
SColumnInfoData* pColData;
} SBlockOrderInfo;

Expand Down Expand Up @@ -82,6 +83,15 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
}
}

static FORCE_INLINE bool colDataIsNull_t(const SColumnInfoData* pColumnInfoData, uint32_t row, bool isVarType) {
if (!pColumnInfoData->hasNull) return false;
if (isVarType) {
return colDataIsNull_var(pColumnInfoData, row);
} else {
return pColumnInfoData->nullbitmap ? colDataIsNull_f(pColumnInfoData->nullbitmap, row) : false;
}
}

static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {
Expand Down Expand Up @@ -210,6 +220,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);

int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
/**
* @brief find how many rows already in order start from first row
*/
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);

int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
Expand Down
1 change: 1 addition & 0 deletions include/common/ttokendef.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@
#define TK_NK_BIN 605 // bin format data 0b111
#define TK_BATCH_SCAN 606
#define TK_NO_BATCH_SCAN 607
#define TK_SORT_FOR_GROUP 608


#define TK_NK_NIL 65535
Expand Down
17 changes: 16 additions & 1 deletion include/libs/nodes/plannodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ typedef struct SAggLogicNode {
bool hasTimeLineFunc;
bool onlyHasKeepOrderFunc;
bool hasGroupKeyOptimized;
bool isGroupTb;
bool isPartTb; // true if partition keys has tbname
} SAggLogicNode;

typedef struct SProjectLogicNode {
Expand Down Expand Up @@ -221,6 +223,7 @@ typedef struct SMergeLogicNode {
int32_t srcGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
} SMergeLogicNode;

typedef enum EWindowType {
Expand Down Expand Up @@ -263,6 +266,7 @@ typedef struct SWindowLogicNode {
int8_t igExpired;
int8_t igCheckUpdate;
EWindowAlgorithm windowAlgo;
bool isPartTb;
} SWindowLogicNode;

typedef struct SFillLogicNode {
Expand All @@ -279,15 +283,20 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
bool skipPKSortOpt;
bool calcGroupId;
bool excludePkCol; // exclude PK ts col when calc group id
} SSortLogicNode;

typedef struct SPartitionLogicNode {
SLogicNode node;
SNodeList* pPartitionKeys;
SNodeList* pTags;
SNode* pSubtable;

bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained
int32_t pkTsColId;
uint64_t pkTsColTbId;
} SPartitionLogicNode;

typedef enum ESubplanType {
Expand Down Expand Up @@ -527,6 +536,7 @@ typedef struct SMergePhysiNode {
int32_t srcGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
} SMergePhysiNode;

typedef struct SWindowPhysiNode {
Expand Down Expand Up @@ -603,6 +613,8 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
bool calcGroupId;
bool excludePkCol;
} SSortPhysiNode;

typedef SSortPhysiNode SGroupSortPhysiNode;
Expand All @@ -612,6 +624,9 @@ typedef struct SPartitionPhysiNode {
SNodeList* pExprs; // these are expression list of partition_by_clause
SNodeList* pPartitionKeys;
SNodeList* pTargets;

bool needBlockOutputTsOrder;
int32_t tsSlotId;
} SPartitionPhysiNode;

typedef struct SStreamPartitionPhysiNode {
Expand Down
1 change: 1 addition & 0 deletions include/libs/nodes/querynodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ typedef struct SLeftValueNode {
typedef enum EHintOption {
HINT_NO_BATCH_SCAN = 1,
HINT_BATCH_SCAN,
HINT_SORT_FOR_GROUP,
} EHintOption;

typedef struct SHintNode {
Expand Down
26 changes: 25 additions & 1 deletion source/common/src/tdatablock.c
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
return 0;
}
}
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);

__compar_fn_t fn;
if (pOrder->compFn) {
fn = pOrder->compFn;
} else {
fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
}

int ret = fn(left1, right1);
if (ret == 0) {
Expand Down Expand Up @@ -1099,6 +1105,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
}

terrno = 0;
Expand Down Expand Up @@ -2515,3 +2522,20 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
}

int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
if (!pDataBlock || !pOrderInfo) return 0;
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是因为目前只有一列吗,要不这个逻辑就不对了吧

SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
}
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
int32_t rowIdx = 0, nextRowIdx = 1;
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
break;
}
}
return nextRowIdx;
}
18 changes: 18 additions & 0 deletions source/libs/executor/inc/executil.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,22 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t

SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);

/**
* @brief build a tuple into keyBuf
* @param [out] keyBuf the output buf
* @param [in] pSortGroupCols the cols to build
* @param [in] pBlock block the tuple in
*/
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex);

int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock,
int32_t rowIndex);

uint64_t calcGroupId(char *pData, int32_t len);

SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);

int32_t extractKeysLen(const SArray* keys);

#endif // TDENGINE_EXECUTIL_H
10 changes: 10 additions & 0 deletions source/libs/executor/inc/tsort.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ void tsortSetClosed(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle);
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);

/**
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
* @param [in] pSortCols cols to comp and build
* @param [in, out] pass in the old keys, if comp not equal, new keys will be built in it.
* @param [in, out] keyLen the old keysLen, if comp not equal, new keysLen will be stored in it.
* @param [in] the tuple to comp with
* @retval 0 if comp equal, 1 if not
*/
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);

#ifdef __cplusplus
}
#endif
Expand Down
1 change: 1 addition & 0 deletions source/libs/executor/src/aggregateoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc

SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS;
Expand Down
101 changes: 101 additions & 0 deletions source/libs/executor/src/executil.c
Original file line number Diff line number Diff line change
Expand Up @@ -2261,3 +2261,104 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}

int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
const char* isNull = oldkeyBuf;
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;

for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];

if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
if (isNull[i] != 1) return 1;
} else {
if (isNull[i] != 0) return 1;
const char* val = colDataGetData(pColInfoData, rowIndex);
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(val);
if (memcmp(p, val, len) != 0) return 1;
p += len;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
p += varDataTLen(val);
} else {
if (0 != memcmp(p, val, pCol->bytes)) return 1;
p += pCol->bytes;
}
}
}
if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
return 0;
}

int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
int32_t rowIndex) {
uint32_t colNum = pSortGroupCols->size;
SColumnDataAgg* pColAgg = NULL;
char* isNull = keyBuf;
char* p = keyBuf + sizeof(int8_t) * colNum;

for (int32_t i = 0; i < colNum; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pCol->slotId > pBlock->pDataBlock->size) continue;

if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];

if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
isNull[i] = 1;
} else {
isNull[i] = 0;
const char* val = colDataGetData(pColInfoData, rowIndex);
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(val);
memcpy(p, val, len);
p += len;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
varDataCopy(p, val);
p += varDataTLen(val);
} else {
memcpy(p, val, pCol->bytes);
p += pCol->bytes;
}
}
}
return (int32_t)(p - keyBuf);
}

uint64_t calcGroupId(char* pData, int32_t len) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pData, len);
tMD5Final(&context);

// NOTE: only extract the initial 8 bytes of the final MD5 digest
uint64_t id = 0;
memcpy(&id, context.digest, sizeof(uint64_t));
if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
return id;
}

SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
SNode* node;
SNodeList* ret = NULL;
FOREACH(node, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
nodesListMakeAppend(&ret, pSortKey->pExpr);
}
return ret;
}

int32_t extractKeysLen(const SArray* keys) {
int32_t len = 0;
int32_t keyNum = taosArrayGetSize(keys);
for (int32_t i = 0; i < keyNum; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
len += pCol->bytes;
}
len += sizeof(int8_t) * keyNum; //null flag
return len;
}