Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tsdb/cache): reuse data file reader #22166

Merged
merged 2 commits into from Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions source/dnode/vnode/src/inc/tsdb.h
Expand Up @@ -877,6 +877,7 @@ typedef struct SCacheRowsReader {
SSHashObj *pTableMap;
SArray *pLDataIterArray;
struct SDataFileReader *pFileReader;
STFileSet *pCurFileSet;
STsdbReadSnap *pReadSnap;
char *idstr;
int64_t lastTs;
Expand Down
103 changes: 65 additions & 38 deletions source/dnode/vnode/src/tsdb/tsdbCache.c
Expand Up @@ -1949,35 +1949,39 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie

STFileObj **pFileObj = state->pFileSet->farr;
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;
if (state->pFileSet != state->pr->pCurFileSet) {
SDataFileReaderConfig conf = {.tsdb = state->pTsdb, .szPage = state->pTsdb->pVnode->config.szPage};
const char *filesName[4] = {0};
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;

conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;

conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
}

conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;
if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
}

conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
}
code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}

if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
conf.files[3].file = *pFileObj[3]->f;
filesName[3] = pFileObj[3]->fname;
}
loadDataTomb(state->pr, state->pr->pFileReader);

code = tsdbDataFileReaderOpen(filesName, &conf, &state->pr->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
state->pr->pCurFileSet = state->pFileSet;
}

loadDataTomb(state->pr, state->pr->pFileReader);

if (!state->pIndexList) {
state->pIndexList = taosArrayInit(1, sizeof(SBrinBlk));
} else {
Expand Down Expand Up @@ -2053,7 +2057,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}

if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}

clearLastFileSet(state);
state->state = SFSNEXTROW_FILESET;
Expand Down Expand Up @@ -2154,7 +2160,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
}

if (!state->pLastRow) {
lastIterClose(&state->pLastIter);
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}

*ppRow = &state->row;
--state->iRow;
Expand Down Expand Up @@ -2214,19 +2222,6 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
return code;
}

int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;

SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}

clearLastFileSet(state);

return code;
}

typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT,
Expand Down Expand Up @@ -2346,6 +2341,36 @@ typedef struct CacheNextRowIter {
STsdb *pTsdb;
} CacheNextRowIter;

int32_t clearNextRowFromFS(void *iter) {
int32_t code = 0;

SFSNextRowIter *state = (SFSNextRowIter *)iter;
if (!state) {
return code;
}

if (state->pLastIter) {
lastIterClose(&state->pLastIter);
}

if (state->pBlockData) {
tBlockDataDestroy(state->pBlockData);
state->pBlockData = NULL;
}

if (state->pTSRow) {
taosMemoryFree(state->pTSRow);
state->pTSRow = NULL;
}

if (state->pRowIter->pSkyline) {
taosArrayDestroy(state->pRowIter->pSkyline);
state->pRowIter->pSkyline = NULL;
}

return code;
}

static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pLastIter) {
lastIterClose(&state->pLastIter);
Expand All @@ -2359,6 +2384,8 @@ static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pr->pFileReader) {
tsdbDataFileReaderClose(&state->pr->pFileReader);
state->pr->pFileReader = NULL;

state->pr->pCurFileSet = NULL;
}

if (state->pTSRow) {
Expand Down