Skip to content

Commit

Permalink
Always compact nodes in stream listpacks after creating new nodes (re…
Browse files Browse the repository at this point in the history
…dis#11885)

This change attempts to alleviate a minor memory usage degradation for Redis 6.2 and onwards when using rather large objects (~2k) in streams. Introduced in redis#6281, we pre-allocate the head nodes of a stream to be 4kb, to limit the amount of unnecessary initial reallocations that are done. However, if we only ever allocate one object because 2 objects exceeds the max_stream_entry_size, we never actually shrink it to fit the single item. This can lead to a lot of excessive memory usage. For smaller item sizes this becomes less of an issue, as the overhead decreases as the items become smaller in size.

This commit also changes the MEMORY USAGE of streams, since it was reporting the lpBytes instead of the allocated size. This introduced an observability issue when diagnosing the memory issue, since Redis reported the same amount of used bytes pre and post change, even though the new implementation allocated more memory.

(cherry picked from commit 2bb29e4)
  • Loading branch information
madolson authored and oranagra committed Mar 20, 2023
1 parent b6e2b14 commit d19ef7e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
6 changes: 4 additions & 2 deletions src/object.c
Expand Up @@ -1071,7 +1071,8 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
size_t lpsize = 0, samples = 0;
while(samples < sample_size && raxNext(&ri)) {
unsigned char *lp = ri.data;
lpsize += lpBytes(lp);
/* Use the allocated size, since we overprovision the node initially. */
lpsize += zmalloc_size(lp);
samples++;
}
if (s->rax->numele <= samples) {
Expand All @@ -1083,7 +1084,8 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
* if there are a few elements in the radix tree. */
raxSeek(&ri,"$",NULL,0);
raxNext(&ri);
asize += lpBytes(ri.data);
/* Use the allocated size, since we overprovision the node initially. */
asize += zmalloc_size(ri.data);
}
raxStop(&ri);

Expand Down
19 changes: 11 additions & 8 deletions src/t_stream.c
Expand Up @@ -528,22 +528,25 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
* if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */
if (lp != NULL) {
int new_node = 0;
size_t node_max_bytes = server.stream_node_max_bytes;
if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
if (lp_bytes + totelelen >= node_max_bytes) {
lp = NULL;
new_node = 1;
} else if (server.stream_node_max_entries) {
unsigned char *lp_ele = lpFirst(lp);
/* Count both live entries and deleted ones. */
int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
if (count >= server.stream_node_max_entries) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
if (count >= server.stream_node_max_entries) new_node = 1;
}

if (new_node) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
}

Expand Down

0 comments on commit d19ef7e

Please sign in to comment.