diff --git a/src/defrag.c b/src/defrag.c
index a730c62891ff..5b98ec228c89 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -70,6 +70,22 @@ sds activeDefragSds(sds sdsptr) {
     return NULL;
 }
 
+/* Defrag helper for hfield strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+hfield activeDefragHfield(hfield hf) {
+    void *ptr = hfieldGetAllocPtr(hf);
+    void *newptr = activeDefragAlloc(ptr);
+    if (newptr) {
+        size_t offset = hf - (char*)ptr;
+        hf = (char*)newptr + offset;
+        return hf;
+    }
+    return NULL;
+}
+
 /* Defrag helper for robj and/or string objects with expected refcount.
  *
  * Like activeDefragStringOb, but it requires the caller to pass in the expected
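Aside, not part of the patch: activeDefragHfield() works because an hfield is an interior pointer into a single mstr allocation, so once the allocation moves it only needs to re-apply the same byte offset to the new block. A minimal, self-contained sketch of that pattern; move_alloc() is a hypothetical stand-in for activeDefragAlloc(), not a Redis function:

    /* Illustrative sketch only: relocating an interior pointer after the
     * allocation that contains it has moved. */
    #include <stdlib.h>
    #include <string.h>

    static void *move_alloc(void *ptr, size_t size) {
        void *newptr = malloc(size);
        if (!newptr) return NULL;          /* treat failure as "not moved" */
        memcpy(newptr, ptr, size);
        free(ptr);
        return newptr;
    }

    static char *relocate_interior(char *interior, void *alloc_start, size_t alloc_size) {
        size_t offset = interior - (char*)alloc_start;  /* offset inside the original block */
        void *newptr = move_alloc(alloc_start, alloc_size);
        if (!newptr) return NULL;                       /* not moved: old pointer stays valid */
        return (char*)newptr + offset;                  /* same offset, new block */
    }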
@@ -250,6 +266,31 @@ void activeDefragSdsDictCallback(void *privdata, const dictEntry *de) {
     UNUSED(de);
 }
 
+void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de) {
+    dict *d = privdata;
+    hfield newhf, hf = dictGetKey(de);
+
+    if (hfieldGetExpireTime(hf) == EB_EXPIRE_TIME_INVALID) {
+        /* If the hfield does not have TTL, we directly defrag it. */
+        newhf = activeDefragHfield(hf);
+    } else {
+        /* Update its reference in the ebucket while defragging it. */
+        ebuckets *eb = hashTypeGetDictMetaHFE(d);
+        newhf = ebDefragItem(eb, &hashFieldExpireBucketsType, hf, (ebDefragFunction *)activeDefragHfield);
+    }
+    if (newhf) {
+        /* We can't search in dict for that key after we've released
+         * the pointer it holds, since it won't be able to do the string
+         * compare, but we can find the entry using key hash and pointer. */
+        dictUseStoredKeyApi(d, 1);
+        uint64_t hash = dictGetHash(d, newhf);
+        dictUseStoredKeyApi(d, 0);
+        dictEntry *de = dictFindEntryByPtrAndHash(d, hf, hash);
+        serverAssert(de);
+        dictSetKey(d, de, newhf);
+    }
+}
+
 /* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
 void activeDefragSdsDict(dict* d, int val_type) {
     unsigned long cursor = 0;
@@ -268,6 +309,20 @@ void activeDefragSdsDict(dict* d, int val_type) {
     } while (cursor != 0);
 }
 
+/* Defrag a dict with hfield key and sds value. */
+void activeDefragHfieldDict(dict *d) {
+    unsigned long cursor = 0;
+    dictDefragFunctions defragfns = {
+        .defragAlloc = activeDefragAlloc,
+        .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
+        .defragVal = (dictDefragAllocFunction *)activeDefragSds
+    };
+    do {
+        cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
+                                &defragfns, d);
+    } while (cursor != 0);
+}
+
 /* Defrag a list of ptr, sds or robj string values */
 void activeDefragList(list *l, int val_type) {
     listNode *ln, *newln;
@@ -422,10 +477,10 @@ void scanLaterHash(robj *ob, unsigned long *cursor) {
     dict *d = ob->ptr;
     dictDefragFunctions defragfns = {
         .defragAlloc = activeDefragAlloc,
-        .defragKey = (dictDefragAllocFunction *)activeDefragSds,
+        .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
         .defragVal = (dictDefragAllocFunction *)activeDefragSds
     };
-    *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
+    *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
 }
 
 void defragQuicklist(redisDb *db, dictEntry *kde) {
@@ -477,7 +532,7 @@ void defragHash(redisDb *db, dictEntry *kde) {
     if (dictSize(d) > server.active_defrag_max_scan_fields)
         defragLater(db, kde);
     else
-        activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
+        activeDefragHfieldDict(d);
     /* defrag the dict struct and tables */
     if ((newd = dictDefragTables(ob->ptr)))
         ob->ptr = newd;
@@ -672,7 +727,7 @@ void defragModule(redisDb *db, dictEntry *kde) {
  * all the various pointers it has. */
 void defragKey(defragCtx *ctx, dictEntry *de) {
     sds keysds = dictGetKey(de);
-    robj *newob, *ob;
+    robj *newob, *ob = dictGetVal(de);
     unsigned char *newzl;
     sds newsds;
     redisDb *db = ctx->privdata;
@@ -689,11 +744,22 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
             dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
             if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
         }
+
+        /* Update the key's reference in the dict's metadata or the listpackEx. */
+        if (unlikely(ob->type == OBJ_HASH))
+            hashTypeUpdateKeyRef(ob, newsds);
     }
 
     /* Try to defrag robj and / or string value. */
-    ob = dictGetVal(de);
-    if ((newob = activeDefragStringOb(ob))) {
+    if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob) != EB_EXPIRE_TIME_INVALID)) {
+        /* Update its reference in the ebucket while defragging it. */
+        newob = ebDefragItem(&db->hexpires, &hashExpireBucketsType, ob,
+                             (ebDefragFunction *)activeDefragStringOb);
+    } else {
+        /* If the dict doesn't have metadata, we directly defrag it. */
+        newob = activeDefragStringOb(ob);
+    }
+    if (newob) {
         kvstoreDictSetVal(db->keys, slot, de, newob);
         ob = newob;
     }
@@ -734,6 +800,12 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
         if (ob->encoding == OBJ_ENCODING_LISTPACK) {
             if ((newzl = activeDefragAlloc(ob->ptr)))
                 ob->ptr = newzl;
+        } else if (ob->encoding == OBJ_ENCODING_LISTPACK_EX) {
+            listpackEx *newlpt, *lpt = (listpackEx*)ob->ptr;
+            if ((newlpt = activeDefragAlloc(lpt)))
+                ob->ptr = lpt = newlpt;
+            if ((newzl = activeDefragAlloc(lpt->lp)))
+                lpt->lp = newzl;
         } else if (ob->encoding == OBJ_ENCODING_HT) {
             defragHash(db, de);
         } else {
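Aside, not part of the patch: the new OBJ_ENCODING_LISTPACK_EX branch above has to defrag two allocations, the listpackEx wrapper struct first and then the listpack buffer it owns, re-reading lpt in between in case the wrapper itself moved. The same two-step pattern in isolation; Wrapper and defrag_alloc are illustrative names only, not the Redis API:

    /* Illustrative sketch only: defragging a wrapper struct and the buffer it
     * owns. defrag_alloc() returns the new address, or NULL if the allocation
     * was not moved. */
    typedef struct {
        void *buf;                      /* inner allocation owned by the wrapper */
    } Wrapper;

    static Wrapper *defrag_wrapper(Wrapper *w, void *(*defrag_alloc)(void *)) {
        Wrapper *neww = defrag_alloc(w);
        if (neww) w = neww;             /* wrapper moved: keep using the new copy */
        void *newbuf = defrag_alloc(w->buf);
        if (newbuf) w->buf = newbuf;    /* inner buffer moved: fix the wrapper's pointer */
        return w;                       /* caller must store w back wherever it was referenced */
    }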
diff --git a/src/ebuckets.c b/src/ebuckets.c
index 7f3b900daf8c..b5dacb3dd916 100644
--- a/src/ebuckets.c
+++ b/src/ebuckets.c
@@ -1780,6 +1780,68 @@ void ebValidate(ebuckets eb, EbucketsType *type) {
     ebValidateRax(ebGetRaxPtr(eb), type);
 }
 
+/* Reallocates the memory used by the item using the provided allocation function.
+ * This feature was added for the active defrag feature.
+ *
+ * The 'defragfn' callbacks are called with a pointer to memory that callback
+ * can reallocate. The callbacks should return a new memory address or NULL,
+ * where NULL means that no reallocation happened and the old memory is still valid.
+ *
+ * Note: It is the caller's responsibility to ensure that the item has a valid expire time. */
+eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *defragfn) {
+    assert(!ebIsEmpty(*eb));
+    if (ebIsList(*eb)) {
+        ExpireMeta *prevem = NULL;
+        eItem curitem = ebGetListPtr(type, *eb);
+        while (curitem != NULL) {
+            if (curitem == item) {
+                if ((curitem = defragfn(curitem))) {
+                    if (prevem)
+                        prevem->next = curitem;
+                    else
+                        *eb = ebMarkAsList(curitem);
+                }
+                return curitem;
+            }
+
+            /* Move to the next item in the list. */
+            prevem = type->getExpireMeta(curitem);
+            curitem = prevem->next;
+        }
+    } else {
+        CommonSegHdr *currHdr;
+        ExpireMeta *mIter = type->getExpireMeta(item);
+        assert(mIter->trash != 1);
+        while (mIter->lastInSegment == 0)
+            mIter = type->getExpireMeta(mIter->next);
+
+        if (mIter->lastItemBucket)
+            currHdr = (CommonSegHdr *) mIter->next;
+        else
+            currHdr = (CommonSegHdr *) ((NextSegHdr *) mIter->next)->prevSeg;
+        /* If the item is the first in the segment, then update the segment header */
+        if (currHdr->head == item) {
+            if ((item = defragfn(item))) {
+                currHdr->head = item;
+            }
+            return item;
+        }
+
+        /* Iterate over all items in the segment until the next is 'item' */
+        ExpireMeta *mHead = type->getExpireMeta(currHdr->head);
+        mIter = mHead;
+        while (mIter->next != item)
+            mIter = type->getExpireMeta(mIter->next);
+        assert(mIter->next == item);
+
+        if ((item = defragfn(item))) {
+            mIter->next = item;
+        }
+        return item;
+    }
+    redis_unreachable();
+}
+
 /* Retrieves the expiration time associated with the given item. If associated
  * ExpireMeta is marked as trash, then return EB_EXPIRE_TIME_INVALID */
 uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
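Aside, not part of the patch: for list-encoded ebuckets the function above reduces to replacing one node of a singly linked list and re-pointing either the predecessor's next or the list head at the new address. The same idea with a plain Node type; relocate() stands in for the caller-supplied defrag callback:

    /* Illustrative sketch only: defragging one node of a singly linked list. */
    #include <stdlib.h>
    #include <string.h>

    typedef struct Node {
        struct Node *next;
        int payload;
    } Node;

    static Node *relocate(Node *n) {
        Node *copy = malloc(sizeof(*copy));
        if (!copy) return NULL;            /* treat failure as "not moved" */
        memcpy(copy, n, sizeof(*copy));
        free(n);
        return copy;
    }

    /* Returns the (possibly new) address of 'item', or NULL if it is not in the list. */
    static Node *defrag_list_item(Node **head, Node *item) {
        Node *prev = NULL, *cur = *head;
        while (cur && cur != item) {
            prev = cur;
            cur = cur->next;
        }
        if (!cur) return NULL;
        Node *moved = relocate(cur);
        if (!moved) return cur;            /* not moved: nothing to re-link */
        if (prev) prev->next = moved;      /* fix the predecessor's link... */
        else *head = moved;                /* ...or the head pointer */
        return moved;
    }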
@@ -1794,12 +1856,14 @@ uint64_t ebGetExpireTime(EbucketsType *type, eItem item) {
 #include <stdio.h>
 #include <stdlib.h>
 #include <assert.h>
+#include <string.h>
 #include "testhelp.h"
 
 #define TEST(name) printf("[TEST] >>> %s\n", name);
 #define TEST_COND(name, cond) printf("[%s] >>> %s\n", (cond) ? "TEST" : "BYPS", name); if (cond)
 
 typedef struct MyItem {
+    int index;
     ExpireMeta mexpire;
 } MyItem;
@@ -1976,6 +2040,14 @@ void distributeTest(int lowestTime,
 #define UNUSED(x) (void)(x)
 #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
 
+eItem defragCallback(const eItem item) {
+    size_t size = zmalloc_usable_size(item);
+    eItem newitem = zmalloc(size);
+    memcpy(newitem, item, size);
+    zfree(item);
+    return newitem;
+}
+
 int ebucketsTest(int argc, char **argv, int flags) {
     UNUSED(argc);
     UNUSED(argv);
@@ -2307,6 +2379,29 @@ int ebucketsTest(int argc, char **argv, int flags) {
 
     }
 
+    TEST("item defragmentation") {
+        for (int s = 1; s <= EB_LIST_MAX_ITEMS * 3; s++) {
+            ebuckets eb = NULL;
+            MyItem *items[s];
+            for (int i = 0; i < s; i++) {
+                items[i] = zmalloc(sizeof(MyItem));
+                items[i]->index = i;
+                ebAdd(&eb, &myEbucketsType, items[i], i);
+            }
+            assert((s <= EB_LIST_MAX_ITEMS) ? ebIsList(eb) : !ebIsList(eb));
+            /* Defrag all the items. */
+            for (int i = 0; i < s; i++) {
+                MyItem *newitem = ebDefragItem(&eb, &myEbucketsType, items[i], defragCallback);
+                if (newitem) items[i] = newitem;
+            }
+            /* Verify that the data is not corrupted. */
+            ebValidate(eb, &myEbucketsType);
+            for (int i = 0; i < s; i++)
+                assert(items[i]->index == i);
+            ebDestroy(&eb, &myEbucketsType, NULL);
+        }
+    }
+
     // TEST("segment - Add smaller item to full segment that all share same ebucket-key")
     // TEST("segment - Add item to full segment and make it extended-segment (all share same ebucket-key)")
     // TEST("ebuckets - Create rax tree with extended-segment and add item before")
diff --git a/src/ebuckets.h b/src/ebuckets.h
index 2349ebcd9163..f30337330610 100644
--- a/src/ebuckets.h
+++ b/src/ebuckets.h
@@ -283,6 +283,9 @@ int ebAdd(ebuckets *eb, EbucketsType *type, eItem item, uint64_t expireTime);
 
 uint64_t ebGetExpireTime(EbucketsType *type, eItem item);
 
+typedef eItem (ebDefragFunction)(const eItem item);
+eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *fn);
+
 static inline uint64_t ebGetMetaExpTime(ExpireMeta *expMeta) {
     return (((uint64_t)(expMeta)->expireTimeHi << 32) | (expMeta)->expireTimeLo);
 }
diff --git a/src/server.h b/src/server.h
index eb6e0405a18f..d21345f47fa7 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2482,7 +2482,7 @@ extern dictType keylistDictType;
 extern dict *modules;
 
 extern EbucketsType hashExpireBucketsType; /* global expires */
-extern EbucketsType hashFieldExpiresBucketType; /* local per hash */
+extern EbucketsType hashFieldExpireBucketsType; /* local per hash */
 
 /*-----------------------------------------------------------------------------
  * Functions prototypes
@@ -3197,12 +3197,16 @@ void hashTypeAddToExpires(redisDb *db, sds key, robj *hashObj, uint64_t expireTi
 void hashTypeFree(robj *o);
 int hashTypeIsExpired(const robj *o, uint64_t expireAt);
 unsigned char *hashTypeListpackGetLp(robj *o);
+uint64_t hashTypeGetMinExpire(robj *o);
+void hashTypeUpdateKeyRef(robj *o, sds newkey);
+ebuckets *hashTypeGetDictMetaHFE(dict *d);
 
 /* Hash-Field data type (of t_hash.c) */
 hfield hfieldNew(const void *field, size_t fieldlen, int withExpireMeta);
 hfield hfieldTryNew(const void *field, size_t fieldlen, int withExpireMeta);
 int hfieldIsExpireAttached(hfield field);
 int hfieldIsExpired(hfield field);
+uint64_t hfieldGetExpireTime(hfield field);
 static inline void hfieldFree(hfield field) { mstrFree(&mstrFieldKind, field); }
 static inline void *hfieldGetAllocPtr(hfield field) { return mstrGetAllocPtr(&mstrFieldKind, field); }
 static inline size_t hfieldlen(hfield field) { return mstrlen(field);}
diff --git a/src/t_hash.c b/src/t_hash.c
index 4c37943fdfa5..36240a646917 100644
--- a/src/t_hash.c
+++ b/src/t_hash.c
@@ -21,11 +21,8 @@ static ExpireMeta *hashGetExpireMeta(const eItem hash);
 static void hexpireGenericCommand(client *c, const char *cmd, long long basetime, int unit);
 static ExpireAction hashTypeActiveExpire(eItem hashObj, void *ctx);
 static void hfieldPersist(robj *hashObj, hfield field);
-static uint64_t hfieldGetExpireTime(hfield field);
 static void updateGlobalHfeDs(redisDb *db, robj *o, uint64_t minExpire, uint64_t minExpireFields);
 static uint64_t hashTypeGetNextTimeToExpire(robj *o);
-static uint64_t hashTypeGetMinExpire(robj *keyObj);
-
 
 /* hash dictType funcs */
 static int dictHfieldKeyCompare(dict *d, const void *key1, const void *key2);
@@ -1999,6 +1996,24 @@ void hashTypeFree(robj *o) {
     }
 }
 
+/* Attempts to update the reference to the new key. Currently it is only used by active defrag. */
+void hashTypeUpdateKeyRef(robj *o, sds newkey) {
+    if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
+        listpackEx *lpt = o->ptr;
+        lpt->key = newkey;
+    } else if (o->encoding == OBJ_ENCODING_HT && isDictWithMetaHFE(o->ptr)) {
+        dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *)dictMetadata((dict*)o->ptr);
+        dictExpireMeta->key = newkey;
+    } else {
+        /* Nothing to do. */
+    }
+}
+
+ebuckets *hashTypeGetDictMetaHFE(dict *d) {
+    dictExpireMetadata *dictExpireMeta = (dictExpireMetadata *) dictMetadata(d);
+    return &dictExpireMeta->hfe;
+}
+
 /*-----------------------------------------------------------------------------
  * Hash type commands
  *----------------------------------------------------------------------------*/
@@ -2635,7 +2650,7 @@ static ExpireMeta* hfieldGetExpireMeta(const eItem field) {
     return mstrMetaRef(field, &mstrFieldKind, (int) HFIELD_META_EXPIRE);
 }
 
-static uint64_t hfieldGetExpireTime(hfield field) {
+uint64_t hfieldGetExpireTime(hfield field) {
     if (!hfieldIsExpireAttached(field))
         return EB_EXPIRE_TIME_INVALID;
 
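Aside, not part of the patch: hashTypeUpdateKeyRef() exists because a hash value may cache a pointer back to its own key (listpackEx->key, or the key field in the dict's HFE metadata), so when defrag moves the key sds that cached copy must be re-pointed as well. A minimal illustration of the invariant, with hypothetical names rather than the Redis structures:

    /* Illustrative sketch only: a value object caches a pointer to the key
     * that owns it; whoever moves the key must also refresh the cached copy
     * or it dangles. */
    #include <stdlib.h>
    #include <string.h>
    #include <assert.h>

    typedef struct {
        char *key_ref;                  /* back-reference to the owning key */
    } Value;

    static char *move_key(char *key, Value *val) {
        char *newkey = strdup(key);     /* stand-in for the defrag allocator */
        free(key);
        val->key_ref = newkey;          /* keep the back-reference in sync */
        return newkey;
    }

    int main(void) {
        char *key = strdup("myhash");
        Value val = { .key_ref = key };
        key = move_key(key, &val);
        assert(key == val.key_ref);     /* invariant: both point at the same memory */
        free(key);
        return 0;
    }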
diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl
index e6ae34e7df09..d04f0de4c101 100644
--- a/tests/unit/memefficiency.tcl
+++ b/tests/unit/memefficiency.tcl
@@ -503,6 +503,105 @@ run_solo {defrag} {
             $rd_pubsub close
         }
 
+        test "Active defrag HFE: $type" {
+            r flushdb
+            r config resetstat
+            r config set hz 100
+            r config set activedefrag no
+            # TODO: Lower the threshold after defragging the ebuckets.
+            # Now just to ensure that the reference is updated correctly.
+            r config set active-defrag-threshold-lower 12
+            r config set active-defrag-cycle-min 65
+            r config set active-defrag-cycle-max 75
+            r config set active-defrag-ignore-bytes 1500kb
+            r config set maxmemory 0
+            r config set hash-max-listpack-value 512
+            r config set hash-max-listpack-entries 10
+
+            # Populate memory with interleaving hash fields of the same size
+            set n 3000
+            set fields 16 ;# make all the fields in an eblist.
+            set dummy_field "[string repeat x 400]"
+            set rd [redis_deferring_client]
+            for {set i 0} {$i < $n} {incr i} {
+                for {set j 0} {$j < $fields} {incr j} {
+                    $rd hset h$i f$j $dummy_field
+                    $rd hexpire h$i 9999999 1 f$j
+                    $rd set "k$i$j" $dummy_field
+                }
+            }
+            for {set j 0} {$j < [expr $n*$fields]} {incr j} {
+                $rd read ; # Discard hset replies
+                $rd read ; # Discard hexpire replies
+                $rd read ; # Discard set replies
+            }
+
+            # Coverage for listpackex.
+            r hset h_lpex f0 $dummy_field
+            r hexpire h_lpex 9999999 1 f0
+            assert_encoding listpackex h_lpex
+
+            after 120 ;# serverCron only updates the info once in 100ms
+            if {$::verbose} {
+                puts "used [s allocator_allocated]"
+                puts "rss [s allocator_active]"
+                puts "frag [s allocator_frag_ratio]"
+                puts "frag_bytes [s allocator_frag_bytes]"
+            }
+            assert_lessthan [s allocator_frag_ratio] 1.05
+
+            # Delete all the keys to create fragmentation
+            for {set i 0} {$i < $n} {incr i} {
+                for {set j 0} {$j < $fields} {incr j} {
+                    r del "k$i$j"
+                }
+            }
+            $rd close
+            after 120 ;# serverCron only updates the info once in 100ms
+            if {$::verbose} {
+                puts "used [s allocator_allocated]"
+                puts "rss [s allocator_active]"
+                puts "frag [s allocator_frag_ratio]"
+                puts "frag_bytes [s allocator_frag_bytes]"
+            }
+            assert_morethan [s allocator_frag_ratio] 1.35
+
+            catch {r config set activedefrag yes} e
+            if {[r config get activedefrag] eq "activedefrag yes"} {
+
+                # wait for the active defrag to start working (decision once a second)
+                wait_for_condition 50 100 {
+                    [s total_active_defrag_time] ne 0
+                } else {
+                    after 120 ;# serverCron only updates the info once in 100ms
+                    puts [r info memory]
+                    puts [r info stats]
+                    puts [r memory malloc-stats]
+                    fail "defrag not started."
+                }
+
+                # wait for the active defrag to stop working
+                wait_for_condition 500 100 {
+                    [s active_defrag_running] eq 0
+                } else {
+                    after 120 ;# serverCron only updates the info once in 100ms
+                    puts [r info memory]
+                    puts [r memory malloc-stats]
+                    fail "defrag didn't stop."
+                }
+
+                # test the fragmentation is lower
+                after 120 ;# serverCron only updates the info once in 100ms
+                if {$::verbose} {
+                    puts "used [s allocator_allocated]"
+                    puts "rss [s allocator_active]"
+                    puts "frag [s allocator_frag_ratio]"
+                    puts "frag_bytes [s allocator_frag_bytes]"
+                }
+                assert_lessthan_equal [s allocator_frag_ratio] 1.5
+            }
+        }
+
         if {$type eq "standalone"} { ;# skip in cluster mode
         test "Active defrag big list: $type" {
             r flushdb