Skip to content

Commit

Permalink
Optimize client memory usage tracking operation while client eviction…
Browse files Browse the repository at this point in the history
… is disabled (redis#11348)

## Issue
During the client input/output buffer processing, the memory usage is
incrementally updated to keep track of clients going beyond a certain
threshold `maxmemory-clients` to be evicted. However, this additional
tracking activity leads to unnecessary CPU cycles wasted when no
client-eviction is required. It is applicable in two cases.

* `maxmemory-clients` is set to `0` which equates to no client eviction
  (applicable to all clients)
* `CLIENT NO-EVICT` flag is set to `ON` which equates to a particular
  client not applicable for eviction.

## Solution
* Disable client memory usage tracking during the read/write flow when
  `maxmemory-clients` is set to `0` or `client no-evict` is `on`.
  The memory usage is tracked only during the `clientCron` i.e. it gets
  periodically updated.
* Cleanup the clients from the memory usage bucket when client eviction
  is disabled.
* When the maxmemory-clients config is enabled or disabled at runtime,
  we immediately update the memory usage buckets for all clients (tested
  scanning 80000 took some 20ms)

Benchmark shown that this can improve performance by about 5% in
certain situations.

Co-authored-by: Oran Agra <oran@redislabs.com>
(cherry picked from commit c0267b3)
  • Loading branch information
hpatro authored and oranagra committed Dec 7, 2022
1 parent 1d433c8 commit b74d78c
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 61 deletions.
33 changes: 32 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2932,6 +2932,37 @@ void rewriteConfigLatencyTrackingInfoPercentilesOutputOption(standardConfig *con
rewriteConfigRewriteLine(state,name,line,1);
}

static int applyClientMaxMemoryUsage(const char **err) {
UNUSED(err);
listIter li;
listNode *ln;

/* server.client_mem_usage_buckets is an indication that the previous config
* was non-zero, in which case we can exit and no apply is needed. */
if(server.maxmemory_clients !=0 && server.client_mem_usage_buckets)
return 1;
if (server.maxmemory_clients != 0)
initServerClientMemUsageBuckets();

/* When client eviction is enabled update memory buckets for all clients.
* When disabled, clear that data structure. */
listRewind(server.clients, &li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (server.maxmemory_clients == 0) {
/* Remove client from memory usage bucket. */
removeClientFromMemUsageBucket(c, 0);
} else {
/* Update each client(s) memory usage and add to appropriate bucket. */
updateClientMemUsageAndBucket(c);
}
}

if (server.maxmemory_clients == 0)
freeServerClientMemUsageBuckets();
return 1;
}

standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
Expand Down Expand Up @@ -3098,7 +3129,7 @@ standardConfig static_configs[] = {
createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */
createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL),
createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, applyClientMaxMemoryUsage),

/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
Expand Down
4 changes: 4 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,10 @@ NULL
else
addReply(c, shared.ok);
} else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) {
if (!server.client_mem_usage_buckets) {
addReplyError(c,"maxmemory-clients is disabled.");
return;
}
sds bucket_info = sdsempty();
for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
if (j == 0)
Expand Down
14 changes: 9 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2020,7 +2020,7 @@ int writeToClient(client *c, int handler_installed) {
* Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
* handleClientsWithPendingWritesUsingThreads(). */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);
return C_OK;
}

Expand Down Expand Up @@ -2470,7 +2470,7 @@ int processCommandAndResetClient(client *c) {
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);
}

if (server.current_client == NULL) deadclient = 1;
Expand Down Expand Up @@ -2607,7 +2607,7 @@ int processInputBuffer(client *c) {
* important in case the query buffer is big and wasn't drained during
* the above loop (because of partially sent big commands). */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);

return C_OK;
}
Expand Down Expand Up @@ -3045,9 +3045,11 @@ NULL
/* CLIENT NO-EVICT ON|OFF */
if (!strcasecmp(c->argv[2]->ptr,"on")) {
c->flags |= CLIENT_NO_EVICT;
removeClientFromMemUsageBucket(c, 0);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
c->flags &= ~CLIENT_NO_EVICT;
updateClientMemUsageAndBucket(c);
addReply(c,shared.ok);
} else {
addReplyErrorObject(c,shared.syntaxerr);
Expand Down Expand Up @@ -4276,7 +4278,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
client *c = listNodeValue(ln);

/* Update the client in the mem usage after we're done processing it in the io-threads */
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);

/* Install the write handler if there are pending writes in some
* of the clients. */
Expand Down Expand Up @@ -4383,7 +4385,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}

/* Once io-threads are idle we can update the client in the mem usage */
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);

if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, we avoid
Expand Down Expand Up @@ -4430,6 +4432,8 @@ size_t getClientEvictionLimit(void) {
}

void evictClients(void) {
if (!server.client_mem_usage_buckets)
return;
/* Start eviction from topmost bucket (largest clients) */
int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
listIter bucket_iter;
Expand Down
2 changes: 1 addition & 1 deletion src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {

/* Computing the memory used by the clients would be O(N) if done
* here online. We use our values computed incrementally by
* updateClientMemUsage(). */
* updateClientMemoryUsage(). */
mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
Expand Down
4 changes: 2 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message,*type.messageBulk);
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);
receivers++;
}
}
Expand All @@ -491,7 +491,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);
receivers++;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
updateClientMemUsage(c);
updateClientMemUsageAndBucket(c);
}
decrRefCount(cmdobj);
}
Expand Down
135 changes: 91 additions & 44 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -815,37 +815,44 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
return &server.client_mem_usage_buckets[bucket_idx];
}

/* This is called both on explicit clients when something changed their buffers,
* so we can track clients' memory and enforce clients' maxmemory in real time,
* and also from the clientsCron. We call it from the cron so we have updated
* stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
* change requires us to evict a non-active client.
/*
* This method updates the client memory usage and update the
* server stats for client type.
*
* This also adds the client to the correct memory usage bucket. Each bucket contains
* all clients with roughly the same amount of memory. This way we group
* together clients consuming about the same amount of memory and can quickly
* free them in case we reach maxmemory-clients (client eviction).
* This method is called from the clientsCron to have updated
* stats for non CLIENT_TYPE_NORMAL/PUBSUB clients to accurately
* provide information around clients memory usage.
*
* It is also used in updateClientMemUsageAndBucket to have latest
* client memory usage information to place it into appropriate client memory
* usage bucket.
*/
int updateClientMemUsage(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
void updateClientMemoryUsage(client *c) {
size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
/* Now that we have the memory used by the client, remove the old
* value from the old category, and add it back. */
server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
server.stat_clients_type_memory[type] += mem;
/* Remember what we added and where, to remove it next time. */
c->last_memory_type = type;
c->last_memory_usage = mem;
}

/* Remove the old value of the memory used by the client from the old
* category, and add it back. */
if (type != c->last_memory_type) {
server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
server.stat_clients_type_memory[type] += mem;
c->last_memory_type = type;
} else {
server.stat_clients_type_memory[type] += mem - c->last_memory_usage;
int clientEvictionAllowed(client *c) {
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
return 0;
}
int type = getClientType(c);
return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB);
}

int allow_eviction =
(type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) &&
!(c->flags & CLIENT_NO_EVICT);

/* Update the client in the mem usage buckets */
/* This function is used to cleanup the client's previously tracked memory usage.
* This is called during incremental client memory usage tracking as well as
* used to reset when client to bucket allocation is not required when
* client eviction is disabled. */
void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
if (c->mem_usage_bucket) {
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
/* If this client can't be evicted then remove it from the mem usage
Expand All @@ -856,23 +863,42 @@ int updateClientMemUsage(client *c) {
c->mem_usage_bucket_node = NULL;
}
}
if (allow_eviction) {
clientMemUsageBucket *bucket = getMemUsageBucket(mem);
bucket->mem_usage_sum += mem;
if (bucket != c->mem_usage_bucket) {
if (c->mem_usage_bucket)
listDelNode(c->mem_usage_bucket->clients,
c->mem_usage_bucket_node);
c->mem_usage_bucket = bucket;
listAddNodeTail(bucket->clients, c);
c->mem_usage_bucket_node = listLast(bucket->clients);
}
}

/* This is called only if explicit clients when something changed their buffers,
* so we can track clients' memory and enforce clients' maxmemory in real time.
*
* This also adds the client to the correct memory usage bucket. Each bucket contains
* all clients with roughly the same amount of memory. This way we group
* together clients consuming about the same amount of memory and can quickly
* free them in case we reach maxmemory-clients (client eviction).
*
* returns 1 if client eviction for this client is allowed, 0 otherwise.
*/
int updateClientMemUsageAndBucket(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
int allow_eviction = clientEvictionAllowed(c);
removeClientFromMemUsageBucket(c, allow_eviction);

if (!allow_eviction) {
return 0;
}

/* Remember what we added, to remove it next time. */
c->last_memory_usage = mem;
/* Update client memory usage. */
updateClientMemoryUsage(c);

return 0;
/* Update the client in the mem usage buckets */
clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
bucket->mem_usage_sum += c->last_memory_usage;
if (bucket != c->mem_usage_bucket) {
if (c->mem_usage_bucket)
listDelNode(c->mem_usage_bucket->clients,
c->mem_usage_bucket_node);
c->mem_usage_bucket = bucket;
listAddNodeTail(bucket->clients, c);
c->mem_usage_bucket_node = listLast(bucket->clients);
}
return 1;
}

/* Return the max samples in the memory usage of clients tracked by
Expand Down Expand Up @@ -960,8 +986,11 @@ void clientsCron(void) {
* in turn would make the INFO command too slow. So we perform this
* computation incrementally and track the (not instantaneous but updated
* to the second) total memory used by clients using clientsCron() in
* a more incremental way (depending on server.hz). */
if (updateClientMemUsage(c)) continue;
* a more incremental way (depending on server.hz).
* If client eviction is enabled, update the bucket as well. */
if (!updateClientMemUsageAndBucket(c))
updateClientMemoryUsage(c);

if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
}
Expand Down Expand Up @@ -1842,6 +1871,25 @@ void createSharedObjects(void) {
shared.maxstring = sdsnew("maxstring");
}

void initServerClientMemUsageBuckets() {
if (server.client_mem_usage_buckets)
return;
server.client_mem_usage_buckets = zmalloc(sizeof(clientMemUsageBucket)*CLIENT_MEM_USAGE_BUCKETS);
for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
server.client_mem_usage_buckets[j].mem_usage_sum = 0;
server.client_mem_usage_buckets[j].clients = listCreate();
}
}

void freeServerClientMemUsageBuckets() {
if (!server.client_mem_usage_buckets)
return;
for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++)
listRelease(server.client_mem_usage_buckets[j].clients);
zfree(server.client_mem_usage_buckets);
server.client_mem_usage_buckets = NULL;
}

void initServerConfig(void) {
int j;
char *default_bindaddr[CONFIG_DEFAULT_BINDADDR_COUNT] = CONFIG_DEFAULT_BINDADDR;
Expand Down Expand Up @@ -2439,6 +2487,7 @@ void initServer(void) {
server.cluster_drop_packet_filter = -1;
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
resetReplicationBuffer();

if ((server.tls_port || server.tls_replication || server.tls_cluster)
Expand All @@ -2447,11 +2496,6 @@ void initServer(void) {
exit(1);
}

for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
server.client_mem_usage_buckets[j].mem_usage_sum = 0;
server.client_mem_usage_buckets[j].clients = listCreate();
}

createSharedObjects();
adjustOpenFilesLimit();
const char *clk_msg = monotonicInit();
Expand Down Expand Up @@ -2624,6 +2668,9 @@ void initServer(void) {
ACLUpdateDefaultUserPassword(server.requirepass);

applyWatchdogPeriod();

if (server.maxmemory_clients != 0)
initServerClientMemUsageBuckets();
}

/* Some steps in server initialization need to be done last (after modules
Expand Down
10 changes: 6 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ typedef struct client {
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* In updateClientMemUsage() we track the memory usage of
/* In updateClientMemoryUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which category the client was, in order to remove it
Expand Down Expand Up @@ -1524,7 +1524,7 @@ struct redisServer {
client *current_client; /* Current client executing the command. */

/* Stuff for client mem eviction */
clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS];
clientMemUsageBucket* client_mem_usage_buckets;

rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
Expand Down Expand Up @@ -2536,8 +2536,8 @@ int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
int islocalClient(client *c);
int updateClientMemUsage(client *c);
void updateClientMemUsageBucket(client *c);
int updateClientMemUsageAndBucket(client *c);
void removeClientFromMemUsageBucket(client *c, int allow_eviction);
void unlinkClient(client *c);
int writeToClient(client *c, int handler_installed);
void linkClient(client *c);
Expand Down Expand Up @@ -3064,6 +3064,8 @@ void initConfigValues();
void removeConfig(sds name);
sds getConfigDebugInfo();
int allowProtectedAction(int config, client *c);
void initServerClientMemUsageBuckets();
void freeServerClientMemUsageBuckets();

/* Module Configuration */
typedef struct ModuleConfig ModuleConfig;
Expand Down
Loading

0 comments on commit b74d78c

Please sign in to comment.