Skip to content

Commit

Permalink
Remove topology argument and cleanup related code changes
Browse files Browse the repository at this point in the history
Remove topology argument and cleanup related code changes.

Signed-off-by: Ram Prasad Voleti <ramvolet@amazon.com>
  • Loading branch information
Ram Prasad Voleti committed May 17, 2024
1 parent 715816a commit 22fc336
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 91 deletions.
16 changes: 3 additions & 13 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,19 +873,9 @@ void clusterCommand(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
/* CLUSTER SLOTS */
clusterCommandSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr,"shards") &&
(c->argc == 2 || c->argc == 3))
{
/* CLUSTER SHARDS [TOPOLOGY] */
int topology = 1;
if (c->argc == 3 && (strcasecmp(c->argv[2]->ptr,"topology"))) {
addReplyErrorObject(c,shared.syntaxerr);
return;
} else if (c->argc == 2) {
topology = 0;
}

clusterCommandShards(c, topology);
} else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
/* CLUSTER SHARDS */
clusterCommandShards(c);
} else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
/* CLUSTER INFO */

Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ int clusterManualFailoverTimeLimit(void);
void clusterCommandSlots(client * c);
void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c);
void clusterCommandShards(client *c, int topology);
void clusterCommandShards(client *c);
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);

int clusterNodeCoversSlot(clusterNode *n, int slot);
Expand Down
98 changes: 50 additions & 48 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ void clusterUpdateState(void);
int clusterNodeCoversSlot(clusterNode *n, int slot);
list *clusterGetNodesInMyShard(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterNodeAddMaster(clusterNode *master);
int clusterNodeRemoveMaster(clusterNode *master);
int clusterNodeAddToMasters(clusterNode *master);
int clusterNodeRemoveFromMasters(clusterNode *master);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
Expand Down Expand Up @@ -228,7 +228,7 @@ int auxShardIdSetter(clusterNode *n, void *value, int length) {
}
/* Initially, during the load, add every node as master until the respective
* role is assigned with the persisted shard ID. */
clusterNodeAddMaster(n);
clusterNodeAddToMasters(n);
return C_OK;
}

Expand Down Expand Up @@ -595,15 +595,15 @@ int clusterLoadConfig(char *filename) {
/* Since the role of node is decided as replica, remove it from
* master list which was added initially during the load and continue
* maintain the persisted master list */
clusterNodeRemoveMaster(n);
clusterNodeRemoveFromMasters(n);
n->slaveof = master;
clusterNodeAddSlave(master,n);
} else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) {
/* n is a primary but it does not have a persisted shard_id.
* This happens if we are loading a nodes.conf generated by
* an older version of the server. We should manually update the
* shard membership in this case */
clusterNodeAddMaster(n);
clusterNodeAddToMasters(n);
}

/* Set ping sent / pong received timestamps */
Expand Down Expand Up @@ -1016,7 +1016,7 @@ void clusterInit(void) {
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
clusterNodeAddMaster(myself);
clusterNodeAddToMasters(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1);
Expand Down Expand Up @@ -1136,7 +1136,7 @@ void clusterReset(int hard) {
}

/* Re-populate masters */
clusterNodeAddMaster(myself);
clusterNodeAddToMasters(myself);

/* Make sure to persist the new config and update the state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
Expand Down Expand Up @@ -1485,7 +1485,7 @@ int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
return C_ERR;
}

int clusterNodeRemoveMaster(clusterNode *master) {
int clusterNodeRemoveFromMasters(clusterNode *master) {
for (int j = 0; j < server.cluster->nummasters; j++) {
if (server.cluster->masters[j] == master) {
if ((j+1) < server.cluster->nummasters) {
Expand Down Expand Up @@ -1516,7 +1516,7 @@ int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
return C_OK;
}

int clusterNodeAddMaster(clusterNode *master) {
int clusterNodeAddToMasters(clusterNode *master) {
/* If it's already a master, don't add it again. */
for (int j = 0; j < server.cluster->nummasters; j++)
if (server.cluster->masters[j] == master) return C_ERR;
Expand Down Expand Up @@ -1549,7 +1549,7 @@ void freeClusterNode(clusterNode *n) {

/* Remove this node from the list of slaves or masters. */
if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
else clusterNodeRemoveMaster(n);
else clusterNodeRemoveFromMasters(n);

/* Unlink from the set of nodes. */
nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -1582,8 +1582,7 @@ void clusterAddNode(clusterNode *node) {
* 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes.
* 3) Remove the node from the owning shard
* 4) Free the node with freeClusterNode() that will in turn remove it
* 3) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of slaves of its master, if
* it is a slave node.
*/
Expand Down Expand Up @@ -1612,7 +1611,7 @@ void clusterDelNode(clusterNode *delnode) {
}
dictReleaseIterator(di);

/* 4) Free the node, unlinking it from the cluster. */
/* 3) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
}

Expand Down Expand Up @@ -1656,7 +1655,7 @@ void clusterRenameNode(clusterNode *node, char *newname) {
serverAssert(retval == DICT_OK);
memcpy(node->name, newname, CLUSTER_NAMELEN);
clusterAddNode(node);
clusterNodeAddMaster(node);
clusterNodeAddToMasters(node);
}

/* -----------------------------------------------------------------------------
Expand Down Expand Up @@ -2220,7 +2219,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
node->tls_port = msg_tls_port;
node->cport = ntohs(g->cport);
clusterAddNode(node);
clusterNodeAddMaster(node);
clusterNodeAddToMasters(node);
}
}

Expand Down Expand Up @@ -2313,7 +2312,7 @@ void clusterSetNodeAsMaster(clusterNode *n) {
n->flags &= ~CLUSTER_NODE_SLAVE;
n->slaveof = NULL;

clusterNodeAddMaster(n);
clusterNodeAddToMasters(n);

/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
Expand Down Expand Up @@ -3066,9 +3065,15 @@ int clusterProcessPacket(clusterLink *link) {
if (sender->slaveof)
clusterNodeRemoveSlave(sender->slaveof,sender);
else
clusterNodeRemoveMaster(sender);

clusterNodeAddSlave(master, sender);
clusterNodeRemoveFromMasters(sender);
serverLog(LL_NOTICE,
"Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
sender->name,
sender->human_nodename,
master->name,
master->human_nodename,
sender->shard_id);
clusterNodeAddSlave(master,sender);
sender->slaveof = master;

/* Update the shard_id when a replica is connected to its
Expand Down Expand Up @@ -5246,7 +5251,7 @@ void clusterSetMaster(clusterNode *n) {
serverAssert(myself->numslots == 0);

if (clusterNodeIsMaster(myself)) {
clusterNodeRemoveMaster(myself);
clusterNodeRemoveFromMasters(myself);
myself->flags |= CLUSTER_NODE_SLAVE;
clusterCloseAllSlots();
} else {
Expand Down Expand Up @@ -5627,7 +5632,7 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node, int topology) {
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
void *node_replylen = addReplyDeferredLen(c);
addReplyBulkCString(c, "id");
Expand Down Expand Up @@ -5671,51 +5676,49 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node, int topology) {
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
reply_count++;

if (!topology) {
addReplyBulkCString(c, "replication-offset");
addReplyLongLong(c, node_offset);
reply_count++;
addReplyBulkCString(c, "replication-offset");
addReplyLongLong(c, node_offset);
reply_count++;

addReplyBulkCString(c, "health");
const char *health_msg = NULL;
if (nodeFailed(node)) {
health_msg = "fail";
} else if (nodeIsSlave(node) && node_offset == 0) {
health_msg = "loading";
} else {
health_msg = "online";
}
addReplyBulkCString(c, health_msg);
reply_count++;
addReplyBulkCString(c, "health");
const char *health_msg = NULL;
if (nodeFailed(node)) {
health_msg = "fail";
} else if (nodeIsSlave(node) && node_offset == 0) {
health_msg = "loading";
} else {
health_msg = "online";
}
addReplyBulkCString(c, health_msg);
reply_count++;

setDeferredMapLen(c, node_replylen, reply_count);
}

/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, clusterNode* n, int topology) {
void addShardReplyForClusterShards(client *c, clusterNode* primary) {
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");

if (n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, n->slot_info_pairs_count);
for (int i = 0; i < n->slot_info_pairs_count; i++)
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
if (primary->slot_info_pairs != NULL) {
serverAssert((primary->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, primary->slot_info_pairs_count);
for (int i = 0; i < primary->slot_info_pairs_count; i++)
addReplyLongLong(c, (unsigned long)primary->slot_info_pairs[i]);
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}

list *nodes = clusterGetNodesInMyShard(n);
list *nodes = clusterGetNodesInMyShard(primary);

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes));
listIter li;
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n, topology);
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
}
listRelease(nodes);
Expand All @@ -5724,13 +5727,12 @@ void addShardReplyForClusterShards(client *c, clusterNode* n, int topology) {
/* Add to the output buffer of the given client, an array of slot (start, end)
* pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */
void clusterCommandShards(client *c, int topology) {
serverAssert(server.cluster->nummasters > 0);
/* CLUSTER SHARDS: reply to the client with one array entry per primary,
 * each entry describing the slot ranges owned by that shard together with
 * the set of nodes (primary and replicas) that belong to it. */
void clusterCommandShards(client *c) {
    addReplyArrayLen(c, server.cluster->nummasters);
    /* Attach slot_info_pairs to every node before emitting the per-shard
     * replies; addShardReplyForClusterShards() consumes (and frees) them. */
    clusterGenNodesSlotsInfo(0);
    int shard_idx = 0;
    while (shard_idx < server.cluster->nummasters) {
        addShardReplyForClusterShards(c, server.cluster->masters[shard_idx]);
        shard_idx++;
    }
}

Expand Down
7 changes: 1 addition & 6 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -895,11 +895,6 @@ struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = {
#define CLUSTER_SHARDS_Keyspecs NULL
#endif

/* CLUSTER SHARDS argument table */
struct COMMAND_ARG CLUSTER_SHARDS_Args[] = {
{MAKE_ARG("topology",ARG_TYPE_PURE_TOKEN,-1,"TOPOLOGY",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
};

/********** CLUSTER SLAVES ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -973,7 +968,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,-2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,1),.args=CLUSTER_SHARDS_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0}
Expand Down
16 changes: 3 additions & 13 deletions src/commands/cluster-shards.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,13 @@
"complexity": "O(N) where N is the total number of cluster nodes",
"group": "cluster",
"since": "7.0.0",
"arity": -2,
"arity": 2,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"LOADING",
"STALE"
],
"arguments":[
{
"name": "topology",
"type": "pure-token",
"token": "TOPOLOGY",
"optional": true
}
],
"reply_schema": {
"description": "A nested list of a map of hash ranges and shard nodes describing individual shards.",
"type": "array",
Expand Down Expand Up @@ -69,8 +61,7 @@
]
},
"replication-offset": {
"type": "integer",
"optional": true
"type": "integer"
},
"health": {
"oneOf": [
Expand All @@ -83,8 +74,7 @@
{
"const": "online"
}
],
"optional": true
]
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions tests/cluster/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,10 @@ proc cluster_config_consistent {} {
}

if {$j == 0} {
set base_slots_cfg [R $j cluster slots]
set base_shards_cfg [R $j CLUSTER SHARDS TOPOLOGY]
set base_cfg [R $j cluster slots]
} else {
set slots_cfg [R $j cluster slots]
set shards_cfg [R $j CLUSTER SHARDS TOPOLOGY]
if {$slots_cfg != $base_slots_cfg || $shards_cfg != $base_shards_cfg} {
set cfg [R $j cluster slots]
if {$cfg != $base_cfg} {
return 0
}
}
Expand Down
21 changes: 21 additions & 0 deletions tests/cluster/tests/28-cluster-shards.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,24 @@ test "CLUSTER MYSHARDID reports same shard id after cluster restart" {
assert_equal [dict get $node_ids $i] [R $i cluster myshardid]
}
}

test "Deterministic order of CLUSTER SHARDS response" {
    # Every node in the cluster must report the shards (and the nodes
    # inside each shard) in exactly the same order. Node 0's response is
    # taken as the reference ordering.
    set reference_ids {}
    for {set node_idx 0} {$node_idx < 8} {incr node_idx} {
        set shards_reply [R $node_idx CLUSTER SHARDS]
        set pos 0
        foreach shard_entry $shards_reply {
            foreach member [dict get $shard_entry nodes] {
                if {$node_idx == 0} {
                    # First responder: record the id seen at this position.
                    dict set reference_ids $pos [dict get $member id]
                } else {
                    # Subsequent responders must match the reference order.
                    assert_equal [dict get $member id] [dict get $reference_ids $pos]
                }
                incr pos
            }
        }
    }
}

0 comments on commit 22fc336

Please sign in to comment.