Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove master and slave from source code #591

Merged
merged 11 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
- name: Check for formatting changes
if: ${{ steps.clang-format.outputs.diff }}
run: |
echo "Code is not formatted correctly. Here is the diff:"
echo "ERROR: Code is not formatted correctly. Here is the diff:"
# Decode the Base64 diff to display it
echo "${{ steps.clang-format.outputs.diff }}" | base64 --decode
exit 1
Expand Down
23 changes: 12 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -904,12 +904,12 @@ int aofFsyncInProgress(void) {
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
bioCreateFsyncJob(fd, server.master_repl_offset, 1);
bioCreateFsyncJob(fd, server.primary_repl_offset, 1);
}

/* Close the fd on the basis of aof_background_fsync. */
void aof_background_fsync_and_close(int fd) {
bioCreateCloseAofJob(fd, server.master_repl_offset, 1);
bioCreateCloseAofJob(fd, server.primary_repl_offset, 1);
}

/* Kills an AOFRW child process if exists */
Expand Down Expand Up @@ -1069,11 +1069,12 @@ void flushAppendOnlyFile(int force) {
} else {
/* All data is fsync'd already: Update fsynced_reploff_pending just in case.
* This is needed to avoid a WAITAOF hang in case a module used RM_Call with the NO_AOF flag,
* in which case master_repl_offset will increase but fsynced_reploff_pending won't be updated
* in which case primary_repl_offset will increase but fsynced_reploff_pending won't be updated
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice in this pr, you update comment as well, pls update the above line master_repl_offset to primary_repl_offset

* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset,
memory_order_relaxed);
return;
}
}
Expand Down Expand Up @@ -1243,7 +1244,7 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-fsync-always", latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
Expand Down Expand Up @@ -1355,7 +1356,7 @@ struct client *createAOFClient(void) {
c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */

/*
* The AOF client should never be blocked (unlike master
* The AOF client should never be blocked (unlike primary
* replication connection).
* This is because blocking the AOF client might cause
* deadlock (because potentially no one will unblock it).
Expand All @@ -1365,9 +1366,9 @@ struct client *createAOFClient(void) {
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls update line 1358 "master"

c->flags = CLIENT_DENY_BLOCKING;

/* We set the fake client as a slave waiting for the synchronization
/* We set the fake client as a replica waiting for the synchronization
* so that the server will not try to send replies to this client. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
return c;
}

Expand Down Expand Up @@ -2320,7 +2321,7 @@ int rewriteAppendOnlyFile(char *filename) {

if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(SLAVE_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
errno = error;
goto werr;
}
Expand Down Expand Up @@ -2403,12 +2404,12 @@ int rewriteAppendOnlyFileBackground(void) {
* between updates to `fsynced_reploff_pending` of the worker thread, belonging
* to the previous AOF, and the new one. This concern is specific for a full
* sync scenario where we don't wanna risk the ACKed replication offset
* jumping backwards or forward when switching to a different master. */
* jumping backwards or forward when switching to a different primary. */
bioDrainWorker(BIO_AOF_FSYNC);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 2406

/* Set the initial repl_offset, which will be applied to fsynced_reploff
* when AOFRW finishes (after possibly being updated by a bio thread) */
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, server.primary_repl_offset, memory_order_relaxed);
server.fsynced_reploff = 0;
}

Expand Down
6 changes: 3 additions & 3 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void initClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Master client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(c->flags & CLIENT_PRIMARY && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));

c->flags |= CLIENT_BLOCKED;
c->bstate.btype = btype;
Expand Down Expand Up @@ -265,8 +265,8 @@ void replyToClientsBlockedOnShutdown(void) {

/* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function
* is called when a master turns into a slave.
* in an instance which turns from master to replica is unsafe, so this function
* is called when a master turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
Expand Down
55 changes: 27 additions & 28 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,6 @@ void clusterCommand(client *c) {
}
kvstoreReleaseDictIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER SLAVES <NODE ID> */
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
int j;
Expand All @@ -911,15 +910,15 @@ void clusterCommand(client *c) {
return;
}

if (clusterNodeIsSlave(n)) {
if (clusterNodeIsReplica(n)) {
addReplyError(c, "The specified node is not a master");
return;
}

/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyArrayLen(c, clusterNodeNumSlaves(n));
for (j = 0; j < clusterNodeNumSlaves(n); j++) {
sds ni = clusterGenNodeDescription(c, clusterNodeGetSlave(n, j), shouldReturnTlsInfo());
addReplyArrayLen(c, clusterNodeNumReplicas(n));
for (j = 0; j < clusterNodeNumReplicas(n); j++) {
sds ni = clusterGenNodeDescription(c, clusterNodeGetReplica(n, j), shouldReturnTlsInfo());
addReplyBulkCString(c, ni);
sdsfree(ni);
}
Expand Down Expand Up @@ -1048,8 +1047,8 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (clusterNodeIsMaster(myself) || c->flags & CLIENT_READONLY) {
if (n == clusterNodeGetMaster(myself) && getMigratingSlotDest(slot) != NULL) {
if (clusterNodeIsPrimary(myself) || c->flags & CLIENT_READONLY) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(slot) != NULL) {
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
importing_slot = 1;
Expand Down Expand Up @@ -1122,7 +1121,7 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
/* MIGRATE always works in the context of the local node if the slot
* is open (migrating or importing state). We need to be able to freely
* move keys among instances in this case. */
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand && clusterNodeIsMaster(myself)) {
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand && clusterNodeIsPrimary(myself)) {
return myself;
}

Expand Down Expand Up @@ -1152,13 +1151,13 @@ getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int
}
}

/* Handle the read-only client case reading from a slave: if this
* node is a slave and the request is about a hash slot our master
/* Handle the read-only client case reading from a replica: if this
* node is a replica and the request is about a hash slot our primary
* is serving, we can reply without redirection. */
int is_write_command =
(cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsSlave(myself) &&
clusterNodeGetMaster(myself) == n) {
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == n) {
return myself;
}

Expand Down Expand Up @@ -1204,7 +1203,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* to detect timeouts, in order to handle the following case:
*
* 1) A client blocks with BLPOP or similar blocking operation.
* 2) The master migrates the hash slot elsewhere or turns into a slave.
* 2) The primary migrates the hash slot elsewhere or turns into a replica.
* 3) The client may remain blocked forever (or up to the max timeout time)
* waiting for a key change that will never happen.
*
Expand Down Expand Up @@ -1240,8 +1239,8 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {

/* if the client is read-only and attempting to access key that our
* replica can handle, allow it. */
if ((c->flags & CLIENT_READONLY) && !(c->lastcmd->flags & CMD_WRITE) && clusterNodeIsSlave(myself) &&
clusterNodeGetMaster(myself) == node) {
if ((c->flags & CLIENT_READONLY) && !(c->lastcmd->flags & CMD_WRITE) && clusterNodeIsReplica(myself) &&
clusterNodeGetPrimary(myself) == node) {
node = myself;
}

Expand Down Expand Up @@ -1331,9 +1330,9 @@ int isNodeAvailable(clusterNode *node) {
}

void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
int i, nested_elements = 3; /* slots (2) + primary addr (1) */
for (i = 0; i < clusterNodeNumReplicas(node); i++) {
if (!isNodeAvailable(clusterNodeGetReplica(node, i))) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
Expand All @@ -1342,11 +1341,11 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
addNodeToNodeReply(c, node);

/* Remaining nodes in reply are replicas for slot range */
for (i = 0; i < clusterNodeNumSlaves(node); i++) {
for (i = 0; i < clusterNodeNumReplicas(node); i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
if (!isNodeAvailable(clusterNodeGetReplica(node, i))) continue;
addNodeToNodeReply(c, clusterNodeGetReplica(node, i));
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
Expand All @@ -1364,7 +1363,7 @@ void clearCachedClusterSlotsResponse(void) {
sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
clusterNode *n = NULL;
int num_masters = 0, start = -1;
int num_primaries = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(recording_client);

for (int i = 0; i <= CLUSTER_SLOTS; i++) {
Expand All @@ -1380,13 +1379,13 @@ sds generateClusterSlotResponse(void) {
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(recording_client, n, start, i - 1);
num_masters++;
num_primaries++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
setDeferredArrayLen(recording_client, slot_replylen, num_primaries);
sds cluster_slot_response = aggregateClientOutputBuffer(recording_client);
deleteCachedResponseClient(recording_client);
return cluster_slot_response;
Expand All @@ -1405,8 +1404,8 @@ int verifyCachedClusterSlotsResponse(sds cached_response) {
void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) 1) primary IP
* 2) primary port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
Expand Down Expand Up @@ -1446,8 +1445,8 @@ void askingCommand(client *c) {
}

/* The READONLY command is used by clients to enter the read-only mode.
* In this mode slaves will not redirect clients as long as clients access
* with read-only commands to keys that are served by the slave's master. */
* In this mode replica will not redirect clients as long as clients access
* with read-only commands to keys that are served by the replica's primary. */
void readonlyCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
Expand Down
12 changes: 6 additions & 6 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int clusterCommandSpecial(client *c);
const char **clusterCommandExtendedHelp(void);

int clusterAllowFailoverCmd(client *c);
void clusterPromoteSelfToMaster(void);
void clusterPromoteSelfToPrimary(void);
int clusterManualFailoverTimeLimit(void);

void clusterCommandSlots(client *c);
Expand All @@ -83,18 +83,18 @@ int getClusterSize(void);
int getMyShardSlotCount(void);
int handleDebugClusterCommand(client *c);
int clusterNodePending(clusterNode *node);
int clusterNodeIsMaster(clusterNode *n);
int clusterNodeIsPrimary(clusterNode *n);
char **getClusterNodesList(size_t *numnodes);
char *clusterNodeIp(clusterNode *node);
int clusterNodeIsSlave(clusterNode *node);
clusterNode *clusterNodeGetMaster(clusterNode *node);
int clusterNodeIsReplica(clusterNode *node);
clusterNode *clusterNodeGetPrimary(clusterNode *node);
char *clusterNodeGetName(clusterNode *node);
int clusterNodeTimedOut(clusterNode *node);
int clusterNodeIsFailing(clusterNode *node);
int clusterNodeIsNoFailover(clusterNode *node);
char *clusterNodeGetShardId(clusterNode *node);
int clusterNodeNumSlaves(clusterNode *node);
clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx);
int clusterNodeNumReplicas(clusterNode *node);
clusterNode *clusterNodeGetReplica(clusterNode *node, int slave_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
Expand Down
Loading
Loading