Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsdb neighbouring block search and set it to global block iterator #22415

Merged
merged 8 commits into from
Aug 31, 2023
52 changes: 38 additions & 14 deletions source/dnode/vnode/src/tsdb/tsdbRead2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1102,10 +1102,10 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer);
}

static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) {
bool asc = ASCENDING_TRAVERSE(order);
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) {
return false;
}

Expand All @@ -1116,7 +1116,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
int32_t step = asc ? 1 : -1;
// *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
SBrinRecord* p = taosArrayGet(pTableBlockScanInfo->pBlockList, pBlockInfo->tbBlockIdx + step);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SBrinRecord* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
memcpy(pRecord, p, sizeof(SBrinRecord));

*nextIndex = pBlockInfo->tbBlockIdx + step;
Expand All @@ -1141,20 +1142,42 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
return -1;
}

static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1;
}

SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
pBlockIter->index += step;

if (index != pBlockIter->index) {
taosArrayRemove(pBlockIter->blockList, index);
taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);
if (index != pBlockIter->index) {
if (index > pBlockIter->index) {
for (int32_t i = index - 1; i >= pBlockIter->index; --i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);

SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i + 1;

taosArraySet(pBlockIter->blockList, i + 1, pBlockInfo);
}
} else if (index < pBlockIter->index) {
for (int32_t i = index + 1; i <= pBlockIter->index; ++i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);

STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i - 1;

taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo);
}

}

taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, fblock.uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, fblock.tbBlockIdx);
pTableDataBlockIdx->globalIndex = pBlockIter->index;
}

return TSDB_CODE_SUCCESS;
Expand Down Expand Up @@ -1260,7 +1283,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
int32_t neighborIndex = 0;
SBrinRecord rec = {0};

bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);

// overlap with neighbor
if (hasNeighbor) {
Expand Down Expand Up @@ -2232,7 +2255,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
*loadNeighbor = false;

SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec);
if (!hasNeighbor) { // do nothing
return code;
}
Expand All @@ -2242,11 +2265,11 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
SDataBlockIter* pBlockIter = &pStatus->blockIter;

// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
int32_t neighborIndex = tableDataBlockIdx->globalIndex;

// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);

// 3. load the neighbor block, and set it to be the currently accessed file data block
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
Expand Down Expand Up @@ -4178,6 +4201,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
}

pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList);
// TODO: keep skyline for reuse
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
}
Expand Down
25 changes: 24 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbReadUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {

p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList);
p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList);
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
p->pFileDelData = taosArrayDestroy(p->pFileDelData);
}
Expand All @@ -238,6 +239,7 @@ void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
// reset the index in last block when handing a new file
taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pBlockIdxList);
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
}

Expand Down Expand Up @@ -384,12 +386,21 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3

// since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
if (pTableScanInfo->pBlockIdxList == NULL) {
pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx));
}
for (int32_t i = 0; i < numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);

taosArrayPush(pBlockIter->blockList, &blockInfo);

STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
}
taosArrayDestroy(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = NULL;

int64_t et = taosGetTimestampUs();
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
Expand Down Expand Up @@ -420,7 +431,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);

taosArrayPush(pBlockIter->blockList, &blockInfo);

STableBlockScanInfo *pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
if (pTableScanInfo->pBlockIdxList == NULL) {
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
}
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
// set data block index overflow, in order to disable the offset comparator
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
Expand All @@ -430,6 +447,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}

for (int32_t i = 0; i < numOfTables; ++i) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
taosArrayDestroy(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = NULL;
}

int64_t et = taosGetTimestampUs();
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
(et - st) / 1000.0, pReader->idStr);
Expand Down
5 changes: 5 additions & 0 deletions source/dnode/vnode/src/tsdb/tsdbReadUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@ typedef struct {
bool hasVal;
} SIterInfo;

typedef struct STableDataBlockIdx {
int32_t globalIndex;
} STableDataBlockIdx;

typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
Expand Down