Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-out metric support for CLUSTER SLOT-STATS command (#20) #771

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"
#include "cluster_slot_stats.h"
#include "endianconv.h"
#include "connection.h"

Expand Down Expand Up @@ -4009,6 +4010,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(sdslen(channel->ptr) + sdslen(message->ptr));
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down
5 changes: 5 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ struct _clusterNode {
Update with updateAndCountChangedNodeHealth(). */
};

/* Per-slot usage statistics. The cluster state keeps one entry per slot
 * (see the slot_stats[CLUSTER_SLOTS] member of struct clusterState). */
typedef struct slotStat {
uint64_t network_bytes_out; /* Egress bytes accumulated for this slot. */
} slotStat;

struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
Expand Down Expand Up @@ -376,6 +380,7 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
slotStat slot_stats[CLUSTER_SLOTS];
};


Expand Down
82 changes: 78 additions & 4 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
NETWORK_BYTES_OUT,
INVALID,
} slotStatTypes;

Expand Down Expand Up @@ -88,9 +88,16 @@ static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyMapLen(c, (server.cluster_slot_stats_enabled) ? 2 : 1); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));

/* Any additional metrics aside from key-count come with a performance trade-off,
* and are aggregated and returned based on its server config. */
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

/* Adds reply for the SLOTSRANGE variant.
Expand All @@ -113,6 +120,71 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

/* Clears the resettable statistics of a single slot.
 * key-count is not stored in the slot statistics; it is obtained on demand
 * through countKeysInSlot(), so there is nothing to reset for it here. */
void clusterSlotStatReset(int slot) {
    uint64_t *bytes_out = &server.cluster->slot_stats[slot].network_bytes_out;
    *bytes_out = 0;
}

/* Zeroes the statistics of every slot in one pass.
 * Does nothing when server.cluster is NULL. */
void clusterSlotStatResetAll(void) {
    if (server.cluster != NULL) {
        memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
    }
}

/* Returns non-zero when egress bytes may be attributed to a slot for client c:
 * slot statistics tracking is enabled, the server runs in cluster mode, and the
 * client has a slot assigned (c->slot != -1). Returns 0 otherwise.
 * The client is only inspected once both server-level checks have passed. */
static int canAddNetworkBytesOut(client *c) {
    if (!server.cluster_slot_stats_enabled) return 0;
    if (!server.cluster_enabled) return 0;
    return c->slot != -1;
}

/* Attributes the bytes written to client c by its current command
 * (c->net_output_bytes_curr_cmd) to the network-bytes-out counter of c->slot.
 * No-op when canAddNetworkBytesOut() rejects the client. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
    }
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
void clusterSlotStatsAddNetworkBytesOutForReplication(size_t len) {

client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
Comment on lines +163 to +164
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clearly following this trick, and it seems dangerous. If we exit early on the following statement, we don't restore the value in c->slot. Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, this was not done intentionally. The command should stay idempotent, regardless of its early return condition.

Update has been reflected on the other PR, which replaces this one; #720

if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Upon SPUBLISH, two egress events are triggered:
 * 1) Internal propagation, for clients that are subscribed to the current node.
 * 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
 * This function covers the external propagation component: it adds len bytes to
 * the network-bytes-out counter of the slot of server.current_client.
 * No-op when there is no current client or canAddNetworkBytesOut() rejects it. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len) {
    client *c = server.current_client;
    if (c != NULL && canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].network_bytes_out += len;
    }
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
Expand Down Expand Up @@ -149,8 +221,10 @@ void clusterSlotStatsCommand(client *c) {
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out")) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
Expand Down
10 changes: 10 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#ifndef CLUSTER_SLOT_STATS_H
#define CLUSTER_SLOT_STATS_H

/* Per-slot usage statistics for cluster mode: reset helpers and the
 * network-bytes-out accumulation hooks. The include guard keeps repeated
 * inclusion (directly or through other headers) harmless. */

#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"

/* Resets the resettable statistics of a single slot. */
void clusterSlotStatReset(int slot);

/* Resets the statistics of all slots; does nothing when server.cluster is NULL. */
void clusterSlotStatResetAll(void);

/* Adds the bytes written to client c by its current command to the
 * network-bytes-out counter of c->slot. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);

/* Adds len multiplied by the number of replicas to the network-bytes-out
 * counter of the current client's slot.
 * NOTE(review): callers pass size_t lengths, and one caller passes a negated
 * length to cancel out a SELECT; consider widening this parameter to size_t
 * together with the definition. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);

/* SPUBLISH, internal propagation: accounts the bytes queued to subscriber c
 * under the given slot. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);

/* SPUBLISH, external propagation: adds len bytes to the network-bytes-out
 * counter of the current client's slot. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len);

#endif /* CLUSTER_SLOT_STATS_H */
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,7 @@ standardConfig static_configs[] = {
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
6 changes: 6 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
Expand Down Expand Up @@ -225,6 +226,7 @@ client *createClient(connection *conn) {
initClientMultiState(c);
c->net_input_bytes = 0;
c->net_output_bytes = 0;
c->net_output_bytes_curr_cmd = 0;
c->commands_processed = 0;
return c;
}
Expand Down Expand Up @@ -442,6 +444,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return;
}

c->net_output_bytes_curr_cmd += len;

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);
Expand Down Expand Up @@ -2443,6 +2447,7 @@ void resetClient(client *c) {
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down Expand Up @@ -2833,6 +2838,7 @@ int processCommandAndResetClient(client *c) {
client *old_client = server.current_client;
server.current_client = c;
if (processCommand(c) == C_OK) {
clusterSlotStatsAddNetworkBytesOutForUserClient(c);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also go into afterCommand ?

commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
Expand Down
6 changes: 4 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

/* Structure to hold the pubsub related metadata. Currently used
* for pubsub and pubsubshard feature. */
Expand Down Expand Up @@ -475,20 +476,21 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
int receivers = 0;
dictEntry *de;
dictIterator *di;
unsigned int slot = 0;
int slot = -1;

/* Send to clients listening for that channel */
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
}
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
if (de) {
dict *clients = dictGetVal(de);
dictEntry *entry;
dictIterator *iter = dictGetIterator(clients);
while ((entry = dictNext(iter)) != NULL) {
client *c = dictGetKey(entry);
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
updateClientMemUsageAndBucket(c);
receivers++;
}
Expand Down
8 changes: 8 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "bio.h"
#include "functions.h"
#include "connection.h"
Expand Down Expand Up @@ -320,6 +321,8 @@ void feedReplicationBuffer(char *s, size_t len) {

if (server.repl_backlog == NULL) return;

clusterSlotStatsAddNetworkBytesOutForReplication(len);

while (len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
Expand Down Expand Up @@ -470,6 +473,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {

feedReplicationBufferWithObject(selectcmd);

/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));

if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);

server.replicas_eldb = dictid;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "server.h"
#include "monotonic.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
Expand Down Expand Up @@ -2516,6 +2517,7 @@ void resetServerStats(void) {
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
clusterSlotStatResetAll();
}

/* Make the thread killable at any time, so that kill threads functions
Expand Down
3 changes: 3 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,8 @@ typedef struct client {
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
unsigned long long
net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */
} client;

/* ACL information */
Expand Down Expand Up @@ -2079,6 +2081,7 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
int cluster_slot_stats_enabled; /* Cluster wide slot usage statistics tracking enabled. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
Expand Down
Loading