Skip to content

Commit

Permalink
Merge pull request #22166 from taosdata/fix/TD-24422
Browse files Browse the repository at this point in the history
fix(tsdb/cache): reuse data file reader
  • Loading branch information
gccgdb1234 committed Jul 25, 2023
2 parents f963c43 + 424dd40 commit 7ea9cf1
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 38 deletions.
1 change: 1 addition & 0 deletions source/dnode/vnode/src/inc/tsdb.h
Expand Up @@ -877,6 +877,7 @@ typedef struct SCacheRowsReader {
SSHashObj *pTableMap;
SArray *pLDataIterArray;
struct SDataFileReader *pFileReader;
STFileSet *pCurFileSet;
STsdbReadSnap *pReadSnap;
char *idstr;
int64_t lastTs;
Expand Down
103 changes: 65 additions & 38 deletions source/dnode/vnode/src/tsdb/tsdbCache.c
Expand Up @@ -1949,35 +1949,39 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie

STFileObj **pFileObj = state->pFileSet->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;
if (state->pFileSet != state->pr->pCurFileSet) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;

conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;

conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
}

conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;
if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
}

conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
}
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}

if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
}
loadDataTomb(state->pr, state->pr->pFileReader);

code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
state->pr->pCurFileSet = state->pFileSet;
}

loadDataTomb(state->pr, state->pr->pFileReader);

if (!state->pIndexList) {
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
} else {
Expand Down Expand Up @@ -2053,7 +2057,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}

if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}

clearLastFileSet(state);
state->state = SFSNEXTROW_FILESET;
Expand Down Expand Up @@ -2154,7 +2160,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}

if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}

*ppRow = &state->row;
--state->iRow;
Expand Down Expand Up @@ -2214,19 +2222,6 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
return code;
}

int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;

SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}

clearLastFileSet(state);

return code;
}

typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT,
Expand Down Expand Up @@ -2346,6 +2341,36 @@ typedef struct CacheNextRowIter {
STsdb *pTsdb;
} CacheNextRowIter;

int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;

SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}

if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}

if (state->pBlockData) {
tBlockDataDestroy(state->pBlockData);
state->pBlockData = NULL;
}

if (state->pTSRow) {
taosMemoryFree(state->pTSRow);
state->pTSRow = NULL;
}

if (state->pRowIter->pSkyline) {
taosArrayDestroy(state->pRowIter->pSkyline);
state->pRowIter->pSkyline = NULL;
}

return code;
}

static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
Expand All @@ -2359,6 +2384,8 @@ static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pr->pFileReader) {
tsdbDataFileReaderClose(&state->pr->pFileReader);
state->pr->pFileReader = NULL;

state->pr->pCurFileSet = NULL;
}

if (state->pTSRow) {
Expand Down

0 comments on commit 7ea9cf1

Please sign in to comment.