From a25b15392c3f50909ab2dafbcbdb3794ada29620 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Wed, 22 May 2024 23:05:32 +0300 Subject: [PATCH] Improve performance of hfe listpack (#13279) This PR contains a few optimizations for hfe listpack. - Hfe fields are ordered by TTL in the listpack. There are two cases in which we want to search the listpack according to TTLs: - As part of active-expiry, we need to find the fields that are expired, e.g. find fields that have smaller TTLs than a given timestamp. - When we want to add a new field, we need to find the correct position to maintain the order by TTL, e.g. find the field that has a higher TTL than the one we want to insert. Iterating with lpNext() to compare TTLs has a performance cost, as lpNext() calls lpValidateIntegrity() for each entry. Instead, this PR adds `lpFindCb()` to the listpack, which accepts a comparator callback. It preserves the same validation logic as lpFind(), which is faster than searching with lpNext(). - We have a field name, value, and TTL for a single hfe field. Inserting these items one by one into the listpack is costly. In particular, as we place fields according to TTL, most additions will end up in the middle of the listpack. Each insert causes a realloc + memmove. This PR introduces `lpBatchInsert()` to add multiple items in one go. - For hsetf, if we are going to update the value and the TTL at the same time, we currently update the value first and update the TTL later (two distinct listpack operations). This PR improves this by doing both with a single update operation. 
--------- Co-authored-by: debing.sun --- src/listpack.c | 409 +++++++++++++++++++++++++++++++++++++++++++++---- src/listpack.h | 5 + src/t_hash.c | 283 +++++++++++++--------------------- 3 files changed, 492 insertions(+), 205 deletions(-) diff --git a/src/listpack.c b/src/listpack.c index 21b30eaee8b8..5d9028e13d0f 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -681,50 +681,35 @@ int lpGetIntegerValue(unsigned char *p, long long *lval) { return 0; } -/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries - * between every comparison. Returns NULL when the field could not be found. */ -unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, - uint32_t slen, unsigned int skip) { +/* Find pointer to the entry with a comparator callback. + * + * 'cmp' is a comparator callback. If it returns zero, current entry pointer + * will be returned. 'user' is passed to this callback. + * Skip 'skip' entries between every comparison. + * Returns NULL when the field could not be found. */ +unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, + void *user, lpCmp cmp, unsigned int skip) +{ int skipcnt = 0; - unsigned char vencoding = 0; unsigned char *value; - int64_t ll, vll; + int64_t ll; uint64_t entry_size = 123456789; /* initialized to avoid warning. */ uint32_t lp_bytes = lpBytes(lp); - assert(p); + if (!p) + p = lpFirst(lp); + while (p) { if (skipcnt == 0) { value = lpGetWithSize(p, &ll, NULL, &entry_size); if (value) { /* check the value doesn't reach outside the listpack before accessing it */ assert(p >= lp + LP_HDR_SIZE && p + entry_size < lp + lp_bytes); - if (slen == ll && memcmp(value, s, slen) == 0) { - return p; - } - } else { - /* Find out if the searched field can be encoded. Note that - * we do it only the first time, once done vencoding is set - * to non-zero and vll is set to the integer value. 
*/ - if (vencoding == 0) { - /* If the entry can be encoded as integer we set it to - * 1, else set it to UCHAR_MAX, so that we don't retry - * again the next time. */ - if (slen >= 32 || slen == 0 || !lpStringToInt64((const char*)s, slen, &vll)) { - vencoding = UCHAR_MAX; - } else { - vencoding = 1; - } - } - - /* Compare current entry with specified entry, do it only - * if vencoding != UCHAR_MAX because if there is no encoding - * possible for the field it can't be a valid integer. */ - if (vencoding != UCHAR_MAX && ll == vll) { - return p; - } } + if (cmp(lp, p, user, value, ll) == 0) + return p; + /* Reset skip count */ skipcnt = skip; p += entry_size; @@ -749,6 +734,62 @@ unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, return NULL; } +struct lpFindArg { + unsigned char *s; /* Item to search */ + uint32_t slen; /* Item len */ + int vencoding; + int64_t vll; +}; + +/* Comparator function to find item */ +static inline int lpFindCmp(const unsigned char *lp, unsigned char *p, + void *user, unsigned char *s, long long slen) { + (void) lp; + (void) p; + struct lpFindArg *arg = user; + + if (s) { + if (slen == arg->slen && memcmp(arg->s, s, slen) == 0) { + return 0; + } + } else { + /* Find out if the searched field can be encoded. Note that + * we do it only the first time, once done vencoding is set + * to non-zero and vll is set to the integer value. */ + if (arg->vencoding == 0) { + /* If the entry can be encoded as integer we set it to + * 1, else set it to UCHAR_MAX, so that we don't retry + * again the next time. */ + if (arg->slen >= 32 || arg->slen == 0 || !lpStringToInt64((const char*)arg->s, arg->slen, &arg->vll)) { + arg->vencoding = UCHAR_MAX; + } else { + arg->vencoding = 1; + } + } + + /* Compare current entry with specified entry, do it only + * if vencoding != UCHAR_MAX because if there is no encoding + * possible for the field it can't be a valid integer. 
*/ + if (arg->vencoding != UCHAR_MAX && slen == arg->vll) { + return 0; + } + } + + return 1; +} + +/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries + * between every comparison. Returns NULL when the field could not be found. */ +unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, + uint32_t slen, unsigned int skip) +{ + struct lpFindArg arg = { + .s = s, + .slen = slen + }; + return lpFindCb(lp, p, &arg, lpFindCmp, skip); +} + /* Insert, delete or replace the specified string element 'elestr' of length * 'size' or integer element 'eleint' at the specified position 'p', with 'p' * being a listpack element pointer obtained with lpFirst(), lpLast(), lpNext(), @@ -926,6 +967,140 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char return lp; } +/* Insert the specified elements with 'entries' and 'len' at the specified + * position 'p', with 'p' being a listpack element pointer obtained with + * lpFirst(), lpLast(), lpNext(), lpPrev() or lpSeek(). + * + * This is similar to lpInsert() but allows you to insert batch of entries in + * one call. This function is more efficient than inserting entries one by one + * as it does single realloc()/memmove() calls for all the entries. + * + * In each listpackEntry, if 'sval' is not null, it is assumed entry is string + * and 'sval' and 'slen' will be used. Otherwise, 'lval' will be used to append + * the integer entry. + * + * The elements are inserted before or after the element pointed by 'p' + * depending on the 'where' argument, that can be LP_BEFORE or LP_AFTER. + * + * If 'newp' is not NULL, at the end of a successful call '*newp' will be set + * to the address of the element just added, so that it will be possible to + * continue an interaction with lpNext() and lpPrev(). 
+ * + * Returns NULL on out of memory or when the listpack total length would exceed + * the max allowed size of 2^32-1, otherwise the new pointer to the listpack + * holding the new element is returned (and the old pointer passed is no longer + * considered valid). */ +unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where, + listpackEntry *entries, unsigned int len, + unsigned char **newp) +{ + assert(where == LP_BEFORE || where == LP_AFTER); + assert(entries != NULL && len > 0); + + struct listpackInsertEntry { + int enctype; + uint64_t enclen; + unsigned char intenc[LP_MAX_INT_ENCODING_LEN]; + unsigned char backlen[LP_MAX_BACKLEN_SIZE]; + unsigned long backlen_size; + }; + + uint64_t addedlen = 0; /* The encoded length of the added elements. */ + struct listpackInsertEntry tmp[3]; /* Encoded entries */ + struct listpackInsertEntry *enc = tmp; + + if (len > sizeof(tmp) / sizeof(struct listpackInsertEntry)) { + /* If 'len' is larger than local buffer size, allocate on heap. */ + enc = zmalloc(len * sizeof(struct listpackInsertEntry)); + } + + /* If we need to insert after the current element, we just jump to the + * next element (that could be the EOF one) and handle the case of + * inserting before. So the function will actually deal with just one + * case: LP_BEFORE. */ + if (where == LP_AFTER) { + p = lpSkip(p); + where = LP_BEFORE; + ASSERT_INTEGRITY(lp, p); + } + + for (unsigned int i = 0; i < len; i++) { + listpackEntry *e = &entries[i]; + if (e->sval) { + /* Calling lpEncodeGetType() results into the encoded version of the + * element to be stored into 'intenc' in case it is representable as + * an integer: in that case, the function returns LP_ENCODING_INT. + * Otherwise, if LP_ENCODING_STR is returned, we'll have to call + * lpEncodeString() to actually write the encoded string on place + * later. + * + * Whatever the returned encoding is, 'enclen' is populated with the + * length of the encoded element. 
*/ + enc[i].enctype = lpEncodeGetType(e->sval, e->slen, + enc[i].intenc, &enc[i].enclen); + } else { + enc[i].enctype = LP_ENCODING_INT; + lpEncodeIntegerGetType(e->lval, enc[i].intenc, &enc[i].enclen); + } + addedlen += enc[i].enclen; + + /* We need to also encode the backward-parsable length of the element + * and append it to the end: this allows to traverse the listpack from + * the end to the start. */ + enc[i].backlen_size = lpEncodeBacklen(enc[i].backlen, enc[i].enclen); + addedlen += enc[i].backlen_size; + } + + uint64_t old_listpack_bytes = lpGetTotalBytes(lp); + uint64_t new_listpack_bytes = old_listpack_bytes + addedlen; + if (new_listpack_bytes > UINT32_MAX) return NULL; + + /* Store the offset of the element 'p', so that we can obtain its + * address again after a reallocation. */ + unsigned long poff = p-lp; + unsigned char *dst = lp + poff; /* May be updated after reallocation. */ + + /* Realloc before: we need more room. */ + if (new_listpack_bytes > old_listpack_bytes && + new_listpack_bytes > lp_malloc_size(lp)) { + if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; + dst = lp + poff; + } + + /* Setup the listpack relocating the elements to make the exact room + * we need to store the new ones. */ + memmove(dst+addedlen,dst,old_listpack_bytes-poff); + + for (unsigned int i = 0; i < len; i++) { + listpackEntry *ent = &entries[i]; + + if (newp) + *newp = dst; + + if (enc[i].enctype == LP_ENCODING_INT) + memcpy(dst, enc[i].intenc, enc[i].enclen); + else + lpEncodeString(dst, ent->sval, ent->slen); + + dst += enc[i].enclen; + memcpy(dst, enc[i].backlen, enc[i].backlen_size); + dst += enc[i].backlen_size; + } + + /* Update header. 
*/ + uint32_t num_elements = lpGetNumElements(lp); + if (num_elements != LP_HDR_NUMELE_UNKNOWN) { + if ((int64_t) len > (int64_t) LP_HDR_NUMELE_UNKNOWN - (int64_t) num_elements) + lpSetNumElements(lp, LP_HDR_NUMELE_UNKNOWN); + else + lpSetNumElements(lp,num_elements + len); + } + lpSetTotalBytes(lp,new_listpack_bytes); + if (enc != tmp) lp_free(enc); + + return lp; +} + /* This is just a wrapper for lpInsert() to directly use a string. */ unsigned char *lpInsertString(unsigned char *lp, unsigned char *s, uint32_t slen, unsigned char *p, int where, unsigned char **newp) @@ -973,6 +1148,20 @@ unsigned char *lpAppendInteger(unsigned char *lp, long long lval) { return lpInsertInteger(lp, lval, eofptr, LP_BEFORE, NULL); } +/* Append batch of entries to the listpack. + * + * This call is more efficient than multiple lpAppend() calls as it only does + * a single realloc() for all the given entries. + * + * In each listpackEntry, if 'sval' is not null, it is assumed entry is string + * and 'sval' and 'slen' will be used. Otherwise, 'lval' will be used to append + * the integer entry. */ +unsigned char *lpBatchAppend(unsigned char *lp, listpackEntry *entries, unsigned long len) { + uint64_t listpack_bytes = lpGetTotalBytes(lp); + unsigned char *eofptr = lp + listpack_bytes - 1; + return lpBatchInsert(lp, eofptr, LP_BEFORE, entries, len, NULL); +} + /* This is just a wrapper for lpInsert() to directly use a string to replace * the current element. The function returns the new listpack as return * value, and also updates the current cursor by updating '*p'. */ @@ -1834,6 +2023,24 @@ static int lpValidation(unsigned char *p, unsigned int head_count, void *userdat return ret; } +static int lpFindCbCmp(const unsigned char *lp, unsigned char *p, void *user, unsigned char *s, long long slen) { + assert(lp); + assert(p); + + char *n = user; + + if (!s) { + int64_t sval; + if (lpStringToInt64((const char*)n, strlen(n), &sval)) + return slen == sval ? 
0 : 1; + } else { + if (strlen(n) == (size_t) slen && memcmp(n, s, slen) == 0) + return 0; + } + + return 1; +} + int listpackTest(int argc, char *argv[], int flags) { UNUSED(argc); UNUSED(argv); @@ -2078,6 +2285,111 @@ int listpackTest(int argc, char *argv[], int flags) { zfree(lp); } + TEST("Batch append") { + listpackEntry ent[6] = { + {.sval = (unsigned char*)mixlist[0], .slen = strlen(mixlist[0])}, + {.sval = (unsigned char*)mixlist[1], .slen = strlen(mixlist[1])}, + {.sval = (unsigned char*)mixlist[2], .slen = strlen(mixlist[2])}, + {.lval = 4294967296}, + {.sval = (unsigned char*)mixlist[3], .slen = strlen(mixlist[3])}, + {.lval = -100} + }; + + lp = lpNew(0); + lp = lpBatchAppend(lp, ent, 2); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + assert(lpLength(lp) == 2); + + lp = lpBatchAppend(lp, &ent[2], 1); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 3); + + lp = lpDeleteRange(lp, 1, 1); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 2); + + lp = lpBatchAppend(lp, &ent[3], 3); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 2), (unsigned char*) "4294967296", 10); + verifyEntry(lpSeek(lp, 3), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 4), (unsigned char*) "-100", 4); + assert(lpLength(lp) == 5); + + lp = lpDeleteRange(lp, 1, 3); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), (unsigned char*) "-100", 4); + assert(lpLength(lp) == 2); + + lpFree(lp); + } + + TEST("Batch insert") { + lp = lpNew(0); + listpackEntry ent[6] = { + {.sval = (unsigned char*)mixlist[0], .slen = strlen(mixlist[0])}, + {.sval = (unsigned char*)mixlist[1], 
.slen = strlen(mixlist[1])}, + {.sval = (unsigned char*)mixlist[2], .slen = strlen(mixlist[2])}, + {.lval = 4294967296}, + {.sval = (unsigned char*)mixlist[3], .slen = strlen(mixlist[3])}, + {.lval = -100} + }; + + lp = lpBatchAppend(lp, ent, 4); + assert(lpLength(lp) == 4); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 3), (unsigned char*)"4294967296", 10); + + /* Insert with LP_BEFORE */ + p = lpSeek(lp, 3); + lp = lpBatchInsert(lp, p, LP_BEFORE, &ent[4], 2, &p); + verifyEntry(p, (unsigned char*)"-100", 4); + assert(lpLength(lp) == 6); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 3), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 4), (unsigned char*)"-100", 4); + verifyEntry(lpSeek(lp, 5), (unsigned char*)"4294967296", 10); + + lp = lpDeleteRange(lp, 1, 2); + assert(lpLength(lp) == 4); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 2), (unsigned char*)"-100", 4); + verifyEntry(lpSeek(lp, 3), (unsigned char*)"4294967296", 10); + + /* Insert with LP_AFTER */ + p = lpSeek(lp, 0); + lp = lpBatchInsert(lp, p, LP_AFTER, &ent[1], 2, &p); + verifyEntry(p, ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 6); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + verifyEntry(lpSeek(lp, 3), ent[4].sval, ent[4].slen); + verifyEntry(lpSeek(lp, 4), (unsigned char*)"-100", 4); + verifyEntry(lpSeek(lp, 5), (unsigned char*)"4294967296", 10); + + lp = lpDeleteRange(lp, 2, 4); + assert(lpLength(lp) == 2); + p = lpSeek(lp, 1); + lp = lpBatchInsert(lp, p, LP_AFTER, &ent[2], 1, &p); + 
verifyEntry(p, ent[2].sval, ent[2].slen); + assert(lpLength(lp) == 3); + verifyEntry(lpSeek(lp, 0), ent[0].sval, ent[0].slen); + verifyEntry(lpSeek(lp, 1), ent[1].sval, ent[1].slen); + verifyEntry(lpSeek(lp, 2), ent[2].sval, ent[2].slen); + + lpFree(lp); + } + TEST("Batch delete") { unsigned char *lp = createList(); /* char *mixlist[] = {"hello", "foo", "quux", "1024"} */ assert(lpLength(lp) == 4); /* Pre-condition */ @@ -2614,6 +2926,21 @@ int listpackTest(int argc, char *argv[], int flags) { lpFree(lp); } + TEST("Test lpFindCb") { + lp = createList(); /* "hello", "foo", "quux", "1024" */ + assert(lpFindCb(lp, lpFirst(lp), "abc", lpFindCbCmp, 0) == NULL); + verifyEntry(lpFindCb(lp, NULL, "hello", lpFindCbCmp, 0), (unsigned char*)"hello", 5); + verifyEntry(lpFindCb(lp, NULL, "1024", lpFindCbCmp, 0), (unsigned char*)"1024", 4); + verifyEntry(lpFindCb(lp, NULL, "quux", lpFindCbCmp, 0), (unsigned char*)"quux", 4); + verifyEntry(lpFindCb(lp, NULL, "foo", lpFindCbCmp, 0), (unsigned char*)"foo", 3); + lpFree(lp); + + lp = lpNew(0); + assert(lpFindCb(lp, lpFirst(lp), "hello", lpFindCbCmp, 0) == NULL); + assert(lpFindCb(lp, lpFirst(lp), "1024", lpFindCbCmp, 0) == NULL); + lpFree(lp); + } + TEST("Test lpValidateIntegrity") { lp = createList(); long count = 0; @@ -2636,6 +2963,26 @@ int listpackTest(int argc, char *argv[], int flags) { lpFree(lp); } + TEST("Test number of elements exceeds LP_HDR_NUMELE_UNKNOWN with batch insert") { + listpackEntry ent[2] = { + {.sval = (unsigned char*)mixlist[0], .slen = strlen(mixlist[0])}, + {.sval = (unsigned char*)mixlist[1], .slen = strlen(mixlist[1])} + }; + + lp = lpNew(0); + for (int i = 0; i < (LP_HDR_NUMELE_UNKNOWN/2) + 1; i++) + lp = lpBatchAppend(lp, ent, 2); + + assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN); + assert(lpLength(lp) == LP_HDR_NUMELE_UNKNOWN+1); + + lp = lpDeleteRange(lp, -2, 2); + assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN); + assert(lpLength(lp) == LP_HDR_NUMELE_UNKNOWN-1); + 
assert(lpGetNumElements(lp) == LP_HDR_NUMELE_UNKNOWN-1); /* update length after lpLength */ + lpFree(lp); + } + TEST("Stress with random payloads of different encoding") { unsigned long long start = usec(); int i,j,len,where; diff --git a/src/listpack.h b/src/listpack.h index df492a44c9a5..c9fbc56241b5 100644 --- a/src/listpack.h +++ b/src/listpack.h @@ -49,6 +49,9 @@ unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **p, long long unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp); unsigned char *lpDeleteRangeWithEntry(unsigned char *lp, unsigned char **p, unsigned long num); unsigned char *lpDeleteRange(unsigned char *lp, long index, unsigned long num); +unsigned char *lpBatchAppend(unsigned char *lp, listpackEntry *entries, unsigned long len); +unsigned char *lpBatchInsert(unsigned char *lp, unsigned char *p, int where, + listpackEntry *entries, unsigned int len, unsigned char **newp); unsigned char *lpBatchDelete(unsigned char *lp, unsigned char **ps, unsigned long count); unsigned char *lpMerge(unsigned char **first, unsigned char **second); unsigned char *lpDup(unsigned char *lp); @@ -57,6 +60,8 @@ unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf); unsigned char *lpGetValue(unsigned char *p, unsigned int *slen, long long *lval); int lpGetIntegerValue(unsigned char *p, long long *lval); unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, uint32_t slen, unsigned int skip); +typedef int (*lpCmp)(const unsigned char *lp, unsigned char *p, void *user, unsigned char *s, long long slen); +unsigned char *lpFindCb(unsigned char *lp, unsigned char *p, void *user, lpCmp cmp, unsigned int skip); unsigned char *lpFirst(unsigned char *lp); unsigned char *lpLast(unsigned char *lp); unsigned char *lpNext(unsigned char *lp, unsigned char *p); diff --git a/src/t_hash.c b/src/t_hash.c index f39459178326..299bea8aa782 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -329,31 +329,59 
@@ static void listpackExFree(listpackEx *lpt) { zfree(lpt); } -/* Returns number of expired fields. */ -static uint64_t listpackExExpireDryRun(const robj *o) { - serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); +struct lpFingArgs { + uint64_t max_to_search; /* [in] Max number of tuples to search */ + uint64_t expire_time; /* [in] Find the tuple that has a TTL larger than expire_time */ + unsigned char *p; /* [out] First item of the tuple that has a TTL larger than expire_time */ + int expired; /* [out] Number of tuples that have TTLs less than expire_time */ + int index; /* Internally used */ + unsigned char *fptr; /* Internally used, temp ptr */ +}; - uint64_t expired = 0; - unsigned char *fptr; - listpackEx *lpt = o->ptr; +/* Callback for lpFindCb(). Used to find number of expired fields as part of + * active expiry or when trying to find the position for the new field according + * to its expiry time.*/ +static int cbFindInListpack(const unsigned char *lp, unsigned char *p, + void *user, unsigned char *s, long long slen) +{ + (void) lp; + struct lpFingArgs *r = user; - fptr = lpFirst(lpt->lp); - while (fptr != NULL) { - long long val; + r->index++; - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr); - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr && lpGetIntegerValue(fptr, &val)); + if (r->max_to_search == 0) + return 0; /* Break the loop and return */ - if (!hashTypeIsExpired(o, val)) - break; + if (r->index % 3 == 1) { + r->fptr = p; /* First item of the tuple. */ + } else if (r->index % 3 == 0) { + serverAssert(!s); - expired++; - fptr = lpNext(lpt->lp, fptr); + /* Third item of a tuple is expiry time */ + if (slen == HASH_LP_NO_TTL || (uint64_t) slen >= r->expire_time) { + r->p = r->fptr; + return 0; /* Break the loop and return */ + } + r->expired++; + r->max_to_search--; } - return expired; + return 1; +} + +/* Returns number of expired fields. 
*/ +static uint64_t listpackExExpireDryRun(const robj *o) { + serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); + + listpackEx *lpt = o->ptr; + + struct lpFingArgs r = { + .max_to_search = UINT64_MAX, + .expire_time = commandTimeSnapshot(), + }; + + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); + return r.expired; } /* Returns the expiration time of the item with the nearest expiration. */ @@ -382,74 +410,58 @@ static uint64_t listpackExGetMinExpire(robj *o) { void listpackExExpire(robj *o, ExpireInfo *info) { serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); uint64_t min = EB_EXPIRE_TIME_INVALID; - unsigned char *ptr, *field; listpackEx *lpt = o->ptr; - ptr = lpFirst(lpt->lp); - while (ptr != NULL && (info->itemsExpired < info->maxToExpire)) { - long long val; + struct lpFingArgs r = { + .max_to_search = info->maxToExpire, + .expire_time = info->now + }; - field = ptr; - ptr = lpNext(lpt->lp, ptr); - serverAssert(ptr); - ptr = lpNext(lpt->lp, ptr); - serverAssert(ptr && lpGetIntegerValue(ptr, &val)); + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); + info->itemsExpired += r.expired; - /* Fields are ordered by expiry time. If we reached to a non-expired - * field or a non-volatile field, we know rest is not yet expired. */ - if (val == HASH_LP_NO_TTL || (uint64_t) val > info->now) - break; - - lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &field, 3); - ptr = field; - info->itemsExpired++; - } + /* Delete all the expired fields in one go */ + if (r.expired > 0) + lpt->lp = lpDeleteRange(lpt->lp, 0, r.expired * 3); min = hashTypeGetNextTimeToExpire(o); info->nextExpireTime = (min != EB_EXPIRE_TIME_INVALID) ? min : 0; } -/* Remove TTL from the field. 
*/ -static void listpackExPersist(robj *o, sds field, unsigned char *fptr, - unsigned char *vptr) -{ - serverAssert(o->encoding == OBJ_ENCODING_LISTPACK_EX); - - unsigned char tmp[512]; - unsigned int slen; - long long val; - unsigned char *s; - sds p = NULL; +static void listpackExAddInternal(robj *o, listpackEntry ent[3]) { listpackEx *lpt = o->ptr; - /* To persist a field, we have to delete it first and append to the end as - * we want to maintain order by expiry time. Before deleting it, copy the - * value if it is stored as string. */ - s = lpGetValue(vptr, &slen, &val); - if (s) { - /* Normally, item length in the listpack is limited by - * 'hash-max-listpack-value' config. It is unlikely, but it might be - * larger than sizeof(tmp). */ - if (slen > sizeof(tmp)) - p = sdsnewlen(s, slen); - else - memcpy(tmp, s, slen); + /* Shortcut, just append at the end if this is a non-volatile field. */ + if (ent[2].lval == HASH_LP_NO_TTL) { + lpt->lp = lpBatchAppend(lpt->lp, ent, 3); + return; } - /* Delete field name, value and expiry time. */ - lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); + struct lpFingArgs r = { + .max_to_search = UINT64_MAX, + .expire_time = ent[2].lval, + }; - /* Append field to the end as it does not have expiry time. */ - lpt->lp = lpAppend(lpt->lp, (unsigned char*)field, sdslen(field)); + /* Check if there is a field with a larger TTL. */ + lpFindCb(lpt->lp, NULL, &r, cbFindInListpack, 0); - if (s) - lpt->lp = lpAppend(lpt->lp, p ? (unsigned char*) p : tmp, slen); + /* If list is empty or there is no field with a larger TTL, result will be + * NULL. Otherwise, just insert before the found item.*/ + if (r.p) + lpt->lp = lpBatchInsert(lpt->lp, r.p, LP_BEFORE, ent, 3, NULL); else - lpt->lp = lpAppendInteger(lpt->lp, val); + lpt->lp = lpBatchAppend(lpt->lp, ent, 3); +} - lpt->lp = lpAppendInteger(lpt->lp, HASH_LP_NO_TTL); +/* Add new field ordered by expire time. 
*/ +void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) { + listpackEntry ent[3] = { + {.sval = (unsigned char*) field, .slen = sdslen(field)}, + {.sval = (unsigned char*) value, .slen = sdslen(value)}, + {.lval = expireAt} + }; - sdsfree(p); + listpackExAddInternal(o, ent); } /* If expiry time is changed, this function will place field into the correct @@ -458,13 +470,13 @@ static void listpackExPersist(robj *o, sds field, unsigned char *fptr, static void listpackExUpdateExpiry(robj *o, sds field, unsigned char *fptr, unsigned char *vptr, - uint64_t expireAt) { - unsigned int slen; - long long val; + uint64_t expire_at) { + unsigned int slen = 0; + long long val = 0; unsigned char tmp[512] = {0}; - unsigned char *valstr, *elem; - listpackEx *lpt = o->ptr; + unsigned char *valstr; sds tmpval = NULL; + listpackEx *lpt = o->ptr; /* Copy value */ valstr = lpGetValue(vptr, &slen, &val); @@ -481,99 +493,23 @@ static void listpackExUpdateExpiry(robj *o, sds field, /* Delete field name, value and expiry time */ lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); - /* Insert to the listpack */ - fptr = lpFirst(lpt->lp); - while (fptr) { - long long currExpiry; - - elem = fptr; /* Keep a pointer to field name */ - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr); - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr && lpGetIntegerValue(fptr, &currExpiry)); - - if (currExpiry == HASH_LP_NO_TTL || (uint64_t) currExpiry >= expireAt) { - /* Found a field with no expiry time or with a higher expiry time. - * Insert new field just before it. */ - lpt->lp = lpInsertString(lpt->lp, (unsigned char*) field, - sdslen(field), elem, LP_BEFORE, &fptr); - - /* Insert value after field name */ - if (valstr) { - lpt->lp = lpInsertString(lpt->lp, - tmpval ? (unsigned char*) tmpval : tmp, - slen, fptr, LP_AFTER, &fptr); - } else { - lpt->lp = lpInsertInteger(lpt->lp, val, fptr, LP_AFTER, &fptr); - } + listpackEntry ent[3] = {{0}}; - /* Insert expiry time after value. 
*/ - lpt->lp = lpInsertInteger(lpt->lp, (long long) expireAt, fptr, - LP_AFTER, NULL); - goto out; - } + ent[0].sval = (unsigned char*) field; + ent[0].slen = sdslen(field); - fptr = lpNext(lpt->lp, fptr); + if (valstr) { + ent[1].sval = tmpval ? (unsigned char *) tmpval : tmp; + ent[1].slen = slen; + } else { + ent[1].lval = val; } + ent[2].lval = expire_at; - /* Listpack is empty, append new item */ - lpt->lp = lpAppend(lpt->lp, (unsigned char*)field, sdslen(field)); - if (valstr) - lpt->lp = lpAppend(lpt->lp, tmpval ? (unsigned char*) tmpval : tmp, slen); - else - lpt->lp = lpAppendInteger(lpt->lp, val); - - lpt->lp = lpAppendInteger(lpt->lp, (long long) expireAt); - -out: + listpackExAddInternal(o, ent); sdsfree(tmpval); } -/* Add new field ordered by expire time. */ -void listpackExAddNew(robj *o, sds field, sds value, uint64_t expireAt) { - unsigned char *fptr, *elem; - listpackEx *lpt = o->ptr; - - /* Shortcut, just append at the end if this is a non-volatile field. */ - if (expireAt == HASH_LP_NO_TTL) { - goto append; - } - - fptr = lpFirst(lpt->lp); - while (fptr) { - long long currExpiry; - - elem = fptr; /* Keep a pointer to field name */ - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr); - fptr = lpNext(lpt->lp, fptr); - serverAssert(fptr && lpGetIntegerValue(fptr, &currExpiry)); - - if (currExpiry == HASH_LP_NO_TTL || (uint64_t) currExpiry >= expireAt) { - /* Found a field with no expiry time or with a higher expiry time. - * Insert new field just before it. */ - lpt->lp = lpInsertString(lpt->lp, (unsigned char*) field, - sdslen(field), elem, LP_BEFORE, &fptr); - - lpt->lp = lpInsertString(lpt->lp,(unsigned char*) value, sdslen(value), - fptr, LP_AFTER, &fptr); - - /* Insert expiry time after value. 
*/ - lpt->lp = lpInsertInteger(lpt->lp, (long long) expireAt, fptr, - LP_AFTER, NULL); - return; - } - - fptr = lpNext(lpt->lp, fptr); - } - - /* Either listpack is empty or field expiry time is HASH_LP_NO_TTL */ -append: - lpt->lp = lpAppend(lpt->lp, (unsigned char*)field, sdslen(field)); - lpt->lp = lpAppend(lpt->lp, (unsigned char*)value, sdslen(value)); - lpt->lp = lpAppendInteger(lpt->lp, (long long) expireAt); -} - /* Update field expire time. */ SetExRes hashTypeSetExpiryListpack(HashTypeSetEx *ex, sds field, unsigned char *fptr, unsigned char *vptr, @@ -1209,7 +1145,7 @@ static SetExRes hashTypeSetExListpack(redisDb *db, robj *o, sds field, HashTypeS goto out; } else if (res == HSET_UPDATE && expireTime != HASH_LP_NO_TTL) { /* Clear TTL */ - listpackExPersist(o, field, fptr, vptr); + listpackExUpdateExpiry(o, field, fptr, vptr, HASH_LP_NO_TTL); } } } @@ -3058,7 +2994,7 @@ void hpersistCommand(client *c) { continue; } - listpackExPersist(hashObj, field, fptr, vptr); + listpackExUpdateExpiry(hashObj, field, fptr, vptr, HASH_LP_NO_TTL); addReplyLongLong(c, HFE_PERSIST_OK); changed = 1; } @@ -3473,10 +3409,8 @@ static int hgetfReplyValueAndSetExpiry(client *c, robj *o, sds field, int flag, ebAdd(&meta->hfe, &hashFieldExpireBucketsType, hf, expireAt); } } else { - if (flag & HFE_CMD_PERSIST) - listpackExPersist(o, field, fptr, vptr); - else - listpackExUpdateExpiry(o, field, fptr, vptr, expireAt); + uint64_t exp = flag & HFE_CMD_PERSIST ? HASH_LP_NO_TTL : expireAt; + listpackExUpdateExpiry(o, field, fptr, vptr, exp); } return 1; @@ -3659,25 +3593,26 @@ static int hsetfSetFieldAndReply(client *c, robj *o, sds field, sds value, } } } else { - lpt->lp = lpReplace(lpt->lp, &vptr, (unsigned char *) value, sdslen(value)); - fptr = lpPrev(lpt->lp, vptr); /* Update fptr as above line invalidates it. 
*/ - serverAssert(fptr != NULL); + if (ret != HSETF_FIELD_AND_TTL) { + /* We just set the field value without updating the TTL */ + lpt->lp = lpReplace(lpt->lp, &vptr, (unsigned char *) value, sdslen(value)); + } else { + /* We are going to update TTL. Delete the field first and then + * insert again according to new TTL if necessary. */ + lpt->lp = lpDeleteRangeWithEntry(lpt->lp, &fptr, 3); - if (ret == HSETF_FIELD_AND_TTL) { if (*minPrevExp > prevExpire) *minPrevExp = prevExpire; if (!(flag & HFE_CMD_EXPIRY_MASK)) { /* If none of EX,EXAT,PX,PXAT,KEEPTTL is specified, TTL is * discarded. */ - listpackExPersist(o, field, fptr, vptr); - } else if (checkAlreadyExpired(expireAt)) { - hashTypeDelete(o, field); - } else { + listpackExAddNew(o, field, value, HASH_LP_NO_TTL); + } else if (!checkAlreadyExpired(expireAt)){ if (*minPrevExp > expireAt) *minPrevExp = expireAt; - listpackExUpdateExpiry(o, field, fptr, vptr, expireAt); + listpackExAddNew(o, field, value, expireAt); } } }