Skip to content

Commit

Permalink
Merge pull request #22236 from taosdata/fix/TD-25416
Browse files Browse the repository at this point in the history
fix(tsdb/cache/del): use lru mutex to fix race with committing
  • Loading branch information
gccgdb1234 committed Jul 28, 2023
2 parents f3b862e + 1fcbf34 commit c19feb2
Showing 1 changed file with 15 additions and 107 deletions.
122 changes: 15 additions & 107 deletions source/dnode/vnode/src/tsdb/tsdbCache.c
Expand Up @@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache

return code;
}
/*
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);

if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayPush(pLastArray, &lastCol);
}
return code;
}
*/
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0;
// fetch schema
Expand Down Expand Up @@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
taosThreadMutexLock(&pTsdb->lruMutex);
taosThreadMutexLock(&pTsdb->rCache.rMutex);
rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
Expand Down Expand Up @@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);

taosThreadMutexLock(&pTsdb->lruMutex);
// taosThreadMutexLock(&pTsdb->lruMutex);

LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) {
Expand All @@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);

taosThreadMutexUnlock(&pTsdb->lruMutex);
// taosThreadMutexUnlock(&pTsdb->lruMutex);
}
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
Expand All @@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE

rocksMayWrite(pTsdb, true, false, true);

taosThreadMutexUnlock(&pTsdb->lruMutex);

_exit:
taosMemoryFree(pTSchema);

Expand Down Expand Up @@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {

return code;
}
/*
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;

// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
*/
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
int32_t code = 0;
STSRow *cacheRow = NULL;
Expand Down Expand Up @@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
}

if (record.version <= pReader->info.verRange.maxVer) {
/*
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
*/
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pInfo->pTombData, &delData);
}
Expand Down Expand Up @@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err;
}

loadDataTomb(state->pr, state->pr->pFileReader);

state->pr->pCurFileSet = state->pFileSet;

loadDataTomb(state->pr, state->pr->pFileReader);
}

if (!state->pIndexList) {
Expand Down Expand Up @@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
state->iBrinIndex = indexSize;
}

if (state->pFileSet != state->pr->pCurFileSet) {
state->pr->pCurFileSet = state->pFileSet;
}

code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
state->pr, state->lastTs, aCols, nCols);
if (code != TSDB_CODE_SUCCESS) {
Expand Down

0 comments on commit c19feb2

Please sign in to comment.