Skip to content
Permalink
1d13ff0b39
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
5776 lines (5164 sloc) 226 KB
/* Redis Cluster implementation.
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include "cluster.h"
#include "endianconv.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>
/* A global reference to myself is handy to make code more clear.
* Myself always points to server.cluster->myself, that is, the clusterNode
* that represents this node. */
clusterNode *myself = NULL;
/* Prototypes for functions used throughout this file. Several of them are
* referenced before the point where they are defined (for instance
* createClusterNode() and clusterAddNode() are called by the config loader
* below); others may be defined outside the visible part of this file. */
clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(connection *conn);
void clusterSendPing(clusterLink *link, int type);
void clusterSendFail(char *nodename);
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter);
clusterNode *clusterLookupNode(const char *name);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void);
void clusterHandleSlaveMigration(int max_slaves);
int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
void resetManualFailover(void);
void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
/* Load the cluster config from 'filename'.
*
* If the file does not exist or is zero-length (this may happen because
* when we lock the nodes.conf file, we create a zero-length one for the
* sake of locking if it does not already exist), C_ERR is returned.
* If the configuration was loaded from the file, C_OK is returned. */
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
struct stat sb;
char *line;
int maxline, j;
if (fp == NULL) {
if (errno == ENOENT) {
return C_ERR;
} else {
serverLog(LL_WARNING,
"Loading the cluster node config from %s: %s",
filename, strerror(errno));
exit(1);
}
}
/* Check if the file is zero-length: if so return C_ERR to signal
* we have to write the config. */
if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
fclose(fp);
return C_ERR;
}
/* Parse the file. Note that single lines of the cluster config file can
* be really long as they include all the hash slots of the node.
* This means in the worst possible case, half of the Redis slots will be
* present in a single line, possibly in importing or migrating state, so
* together with the node ID of the sender/receiver.
*
* To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
maxline = 1024+CLUSTER_SLOTS*128;
line = zmalloc(maxline);
while(fgets(line,maxline,fp) != NULL) {
int argc;
sds *argv;
clusterNode *n, *master;
char *p, *s;
/* Skip blank lines, they can be created either by users manually
* editing nodes.conf or by the config writing process if stopped
* before the truncate() call. */
if (line[0] == '\n' || line[0] == '\0') continue;
/* Split the line into arguments for processing. */
argv = sdssplitargs(line,&argc);
if (argv == NULL) goto fmterr;
/* Handle the special "vars" line. Don't pretend it is the last
* line even if it actually is when generated by Redis. */
if (strcasecmp(argv[0],"vars") == 0) {
if (!(argc % 2)) goto fmterr;
for (j = 1; j < argc; j += 2) {
if (strcasecmp(argv[j],"currentEpoch") == 0) {
server.cluster->currentEpoch =
strtoull(argv[j+1],NULL,10);
} else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
server.cluster->lastVoteEpoch =
strtoull(argv[j+1],NULL,10);
} else {
serverLog(LL_WARNING,
"Skipping unknown cluster config variable '%s'",
argv[j]);
}
}
sdsfreesplitres(argv,argc);
continue;
}
/* Regular config lines have at least eight fields */
if (argc < 8) {
sdsfreesplitres(argv,argc);
goto fmterr;
}
/* Create this node if it does not exist */
n = clusterLookupNode(argv[0]);
if (!n) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}
/* Address and port */
if ((p = strrchr(argv[1],':')) == NULL) {
sdsfreesplitres(argv,argc);
goto fmterr;
}
*p = '\0';
memcpy(n->ip,argv[1],strlen(argv[1])+1);
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
*busp = '\0';
busp++;
}
n->port = atoi(port);
/* In older versions of nodes.conf the "@busport" part is missing.
* In this case we set it to the default offset of 10000 from the
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
/* Parse flags */
p = s = argv[2];
while(p) {
p = strchr(s,',');
if (p) *p = '\0';
if (!strcasecmp(s,"myself")) {
serverAssert(server.cluster->myself == NULL);
myself = server.cluster->myself = n;
n->flags |= CLUSTER_NODE_MYSELF;
} else if (!strcasecmp(s,"master")) {
n->flags |= CLUSTER_NODE_MASTER;
} else if (!strcasecmp(s,"slave")) {
n->flags |= CLUSTER_NODE_SLAVE;
} else if (!strcasecmp(s,"fail?")) {
n->flags |= CLUSTER_NODE_PFAIL;
} else if (!strcasecmp(s,"fail")) {
n->flags |= CLUSTER_NODE_FAIL;
n->fail_time = mstime();
} else if (!strcasecmp(s,"handshake")) {
n->flags |= CLUSTER_NODE_HANDSHAKE;
} else if (!strcasecmp(s,"noaddr")) {
n->flags |= CLUSTER_NODE_NOADDR;
} else if (!strcasecmp(s,"nofailover")) {
n->flags |= CLUSTER_NODE_NOFAILOVER;
} else if (!strcasecmp(s,"noflags")) {
/* nothing to do */
} else {
serverPanic("Unknown flag in redis cluster config file");
}
if (p) s = p+1;
}
/* Get master if any. Set the master and populate master's
* slave list. */
if (argv[3][0] != '-') {
master = clusterLookupNode(argv[3]);
if (!master) {
master = createClusterNode(argv[3],0);
clusterAddNode(master);
}
n->slaveof = master;
clusterNodeAddSlave(master,n);
}
/* Set ping sent / pong received timestamps */
if (atoi(argv[4])) n->ping_sent = mstime();
if (atoi(argv[5])) n->pong_received = mstime();
/* Set configEpoch for this node. */
n->configEpoch = strtoull(argv[6],NULL,10);
/* Populate hash slots served by this instance. */
for (j = 8; j < argc; j++) {
int start, stop;
if (argv[j][0] == '[') {
/* Here we handle migrating / importing slots */
int slot;
char direction;
clusterNode *cn;
p = strchr(argv[j],'-');
serverAssert(p != NULL);
*p = '\0';
direction = p[1]; /* Either '>' or '<' */
slot = atoi(argv[j]+1);
if (slot < 0 || slot >= CLUSTER_SLOTS) {
sdsfreesplitres(argv,argc);
goto fmterr;
}
p += 3;
cn = clusterLookupNode(p);
if (!cn) {
cn = createClusterNode(p,0);
clusterAddNode(cn);
}
if (direction == '>') {
server.cluster->migrating_slots_to[slot] = cn;
} else {
server.cluster->importing_slots_from[slot] = cn;
}
continue;
} else if ((p = strchr(argv[j],'-')) != NULL) {
*p = '\0';
start = atoi(argv[j]);
stop = atoi(p+1);
} else {
start = stop = atoi(argv[j]);
}
if (start < 0 || start >= CLUSTER_SLOTS ||
stop < 0 || stop >= CLUSTER_SLOTS)
{
sdsfreesplitres(argv,argc);
goto fmterr;
}
while(start <= stop) clusterAddSlot(n, start++);
}
sdsfreesplitres(argv,argc);
}
/* Config sanity check */
if (server.cluster->myself == NULL) goto fmterr;
zfree(line);
fclose(fp);
serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
/* Something that should never happen: currentEpoch smaller than
* the max epoch found in the nodes configuration. However we handle this
* as some form of protection against manual editing of critical files. */
if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
server.cluster->currentEpoch = clusterGetMaxEpoch();
}
return C_OK;
fmterr:
serverLog(LL_WARNING,
"Unrecoverable error: corrupted cluster config file.");
zfree(line);
if (fp) fclose(fp);
exit(1);
}
/* Cluster node configuration is exactly the same as CLUSTER NODES output.
 *
 * This function writes the node config and returns 0, on error -1
 * is returned. When 'do_fsync' is non-zero the file is also flushed to
 * disk with fsync(), and a failed flush is reported as an error too:
 * the caller asked for durability, so claiming success would be wrong.
 *
 * Note: we need to write the file in an atomic way from the point of view
 * of the POSIX filesystem semantics, so that if the server is stopped
 * or crashes during the write, we'll end with either the old file or the
 * new one. Since we have the full payload to write available we can use
 * a single write to write the whole file. If the pre-existing file was
 * bigger we pad our payload with newlines that are anyway ignored and truncate
 * the file afterward. */
int clusterSaveConfig(int do_fsync) {
    sds ci;
    size_t content_size;
    struct stat sb;
    int fd;

    /* The save is being attempted now: clear the pending request. */
    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

    /* Get the nodes description and concatenate our "vars" directive to
     * save currentEpoch and lastVoteEpoch.
     * NOTE(review): CLUSTER_NODE_HANDSHAKE is passed as the 'filter'
     * argument, presumably to leave out nodes still in handshake; confirm
     * against clusterGenNodesDescription(). */
    ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
    ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
        (unsigned long long) server.cluster->currentEpoch,
        (unsigned long long) server.cluster->lastVoteEpoch);
    content_size = sdslen(ci);

    if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
        == -1) goto err;

    /* Pad the new payload if the existing file length is greater. */
    if (fstat(fd,&sb) != -1) {
        if (sb.st_size > (off_t)content_size) {
            ci = sdsgrowzero(ci,sb.st_size);
            memset(ci+content_size,'\n',sb.st_size-content_size);
        }
    }

    /* A short write is treated as a failure. */
    if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;

    if (do_fsync) {
        server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
        if (fsync(fd) == -1) goto err;
    }

    /* Truncate the file if needed to remove the final \n padding that
     * is just garbage. */
    if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
        /* ftruncate() failing is not a critical error. */
    }
    close(fd);
    sdsfree(ci);
    return 0;

err:
    if (fd != -1) close(fd);
    sdsfree(ci);
    return -1;
}
/* Save the cluster configuration via clusterSaveConfig(), forwarding
 * 'do_fsync' unchanged. If the save reports an error (-1) a warning is
 * logged and the process exits with status 1. */
void clusterSaveConfigOrDie(int do_fsync) {
    if (clusterSaveConfig(do_fsync) != -1) return;

    serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
    exit(1);
}
/* Take an exclusive, non-blocking flock() on the cluster config file and
 * deliberately leak the file descriptor, so the lock is held for as long
 * as the process lives.
 *
 * This is sound because nodes.conf is always updated in place: the file is
 * reopened, overwritten, and its length adjusted with ftruncate().
 *
 * Returns C_OK when the lock was acquired. Otherwise the reason is logged
 * and C_ERR is returned. */
int clusterLockConfig(char *filename) {
/* flock() does not exist on Solaris
 * and a fcntl-based solution won't help, as we constantly re-open that file,
 * which will release _all_ locks anyway
 */
#if !defined(__sun)
    /* Open with O_CREAT so the file exists by the time we lock it:
     * without that there would be a race with other processes. */
    int fd = open(filename,O_WRONLY|O_CREAT,0644);

    if (fd == -1) {
        serverLog(LL_WARNING,
            "Can't open %s in order to acquire a lock: %s",
            filename, strerror(errno));
        return C_ERR;
    }

    if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING,
                "Impossible to lock %s: %s", filename, strerror(errno));
        } else {
            /* Somebody else already holds the lock. */
            serverLog(LL_WARNING,
                "Sorry, the cluster configuration file %s is already used "
                "by a different Redis Cluster node. Please make sure that "
                "different nodes use different cluster configuration "
                "files.", filename);
        }
        close(fd);
        return C_ERR;
    }
    /* Success: 'fd' is intentionally not closed, see the function
     * comment. */
#endif /* __sun */

    return C_OK;
}
/* Re-derive the flags of the "myself" node that mirror the current server
 * configuration (today only NOFAILOVER), since that configuration may
 * change at runtime via CONFIG SET. When the resulting flags differ from
 * the previous ones, a config save and a state update are scheduled. */
void clusterUpdateMyselfFlags(void) {
    int before = myself->flags;

    if (server.cluster_slave_no_failover)
        myself->flags |= CLUSTER_NODE_NOFAILOVER;
    else
        myself->flags &= ~CLUSTER_NODE_NOFAILOVER;

    if (myself->flags == before) return;

    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE);
}
void clusterInit(void) {
int saveconf = 0;
server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = CLUSTER_FAIL;
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list =
dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0;
}
server.cluster->stats_pfail_nodes = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
/* Lock the cluster config file to make sure every node uses
* its own nodes.conf. */
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
exit(1);
/* Load or create a new nodes configuration. */
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
myself = server.cluster->myself =
createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1);
/* We need a listening TCP port for our cluster messaging needs. */
server.cfd_count = 0;
/* Port sanity check II
* The other handshake port check is triggered too late to stop
* us from trying to use a too-high cluster port number. */
int port = server.tls_cluster ? server.tls_port : server.port;
if (port > (65535-CLUSTER_PORT_INCR)) {
serverLog(LL_WARNING, "Redis port number too high. "
"Cluster communication port is 10,000 port "
"numbers higher than your Redis port. "
"Your Redis port number must be "
"lower than 55535.");
exit(1);
}
if (listenToPort(port+CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == C_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
serverPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
/* The slots -> keys map is a radix tree. Initialize it here. */
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));
/* Set myself->port / cport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
myself->port = port;
myself->cport = port+CLUSTER_PORT_INCR;
if (server.cluster_announce_port)
myself->port = server.cluster_announce_port;
if (server.cluster_announce_bus_port)
myself->cport = server.cluster_announce_bus_port;
server.cluster->mf_end = 0;
resetManualFailover();
clusterUpdateMyselfFlags();
}
/* Reset a node performing a soft or hard reset:
 *
 * 1) If the node is a slave, it turns into a master and the whole data set
 *    is flushed away.
 * 2) All the open slots are closed and the manual failover state is reset.
 * 3) All the assigned slots are released.
 * 4) All the other nodes are forgotten.
 * 5) Only for hard reset: currentEpoch, lastVoteEpoch and configEpoch are
 *    set to 0.
 * 6) Only for hard reset: a new Node ID is generated.
 * 7) A configuration save (with fsync) and a cluster state update are
 *    scheduled. */
void clusterReset(int hard) {
    dictIterator *iter;
    dictEntry *entry;
    int slot;

    /* Turn into master. */
    if (nodeIsSlave(myself)) {
        clusterSetNodeAsMaster(myself);
        replicationUnsetMaster();
        emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
    }

    /* Close slots, reset manual failover state. */
    clusterCloseAllSlots();
    resetManualFailover();

    /* Unassign all the slots. */
    for (slot = 0; slot < CLUSTER_SLOTS; slot++) {
        clusterDelSlot(slot);
    }

    /* Forget all the nodes, but myself. A safe iterator is used because
     * entries are removed while walking the table. */
    iter = dictGetSafeIterator(server.cluster->nodes);
    while((entry = dictNext(iter)) != NULL) {
        clusterNode *other = dictGetVal(entry);

        if (other != myself) clusterDelNode(other);
    }
    dictReleaseIterator(iter);

    /* Hard reset only: set epochs to 0, change node ID. */
    if (hard) {
        sds prevname;

        server.cluster->currentEpoch = 0;
        server.cluster->lastVoteEpoch = 0;
        myself->configEpoch = 0;
        serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

        /* To change the Node ID we need to remove the old name from the
         * nodes table, change the ID, and re-add back with new name. */
        prevname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
        dictDelete(server.cluster->nodes,prevname);
        sdsfree(prevname);
        getRandomHexChars(myself->name, CLUSTER_NAMELEN);
        clusterAddNode(myself);
        serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
    }

    /* Make sure to persist the new config and update the state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_UPDATE_STATE|
                         CLUSTER_TODO_FSYNC_CONFIG);
}
/* -----------------------------------------------------------------------------
* CLUSTER communication link
* -------------------------------------------------------------------------- */
/* Allocate a new cluster link associated with 'node' (which may be NULL
 * when the peer is not known yet). The link starts with empty send and
 * receive buffers, no connection, and its creation time set to now. */
clusterLink *createClusterLink(clusterNode *node) {
    clusterLink *newlink = zmalloc(sizeof(*newlink));

    newlink->node = node;
    newlink->conn = NULL;
    newlink->ctime = mstime();
    newlink->sndbuf = sdsempty();
    newlink->rcvbuf = sdsempty();
    return newlink;
}
/* Release a cluster link: close its connection (if any), free both
 * buffers and the link itself. The node the link refers to is NOT freed;
 * its 'link' field is simply reset to NULL so it no longer points to
 * released memory. */
void freeClusterLink(clusterLink *link) {
    clusterNode *owner = link->node;

    if (link->conn != NULL) {
        connClose(link->conn);
        link->conn = NULL;
    }
    sdsfree(link->sndbuf);
    sdsfree(link->rcvbuf);
    if (owner != NULL) owner->link = NULL;
    zfree(link);
}
/* Callback passed to connAccept() for incoming cluster bus connections.
 *
 * If the connection is established, a link object is created to handle it
 * and stored as the connection private data, so that it reaches the read
 * handler when data is available. The link->node pointer is initially
 * NULL since we don't know which node this is yet; the right node is
 * referenced once the node identity is known.
 *
 * If the connection is not in connected state, the error is logged and
 * the connection is closed. */
static void clusterConnAcceptHandler(connection *conn) {
    if (connGetState(conn) == CONN_STATE_CONNECTED) {
        clusterLink *newlink = createClusterLink(NULL);

        newlink->conn = conn;
        connSetPrivateData(conn, newlink);

        /* Register read handler */
        connSetReadHandler(conn, clusterReadHandler);
        return;
    }

    serverLog(LL_VERBOSE,
        "Error accepting cluster node connection: %s", connGetLastError(conn));
    connClose(conn);
}
/* Upper bound on the number of connections accepted by a single call of
 * clusterAcceptHandler(). */
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
/* Readable-event handler for the cluster bus listening sockets: accepts
 * pending connections on 'fd', up to MAX_CLUSTER_ACCEPTS_PER_CALL per
 * invocation. The handler returns as soon as accepting fails, whatever
 * the reason. */
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport, cfd;
    int accepted;

    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    for (accepted = 0; accepted < MAX_CLUSTER_ACCEPTS_PER_CALL; accepted++) {
        connection *conn;

        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* EWOULDBLOCK is not worth a log line. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }

        if (server.tls_cluster)
            conn = connCreateAcceptedTLS(cfd,1);
        else
            conn = connCreateAcceptedSocket(cfd);

        /* Use non-blocking I/O for cluster messages. */
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);

        serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);

        /* Accept the connection now. connAccept() may call our handler
         * directly or schedule it for later depending on connection
         * implementation. */
        if (connAccept(conn, clusterConnAcceptHandler) != C_ERR) continue;

        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_VERBOSE,
                    "Error accepting cluster node connection: %s",
                    connGetLastError(conn));
        connClose(conn);
        return;
    }
}
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* Map a key to one of the 16384 hash slots: the slot is the least
 * significant 14 bits of the crc16 of the key.
 *
 * If the key contains a "{...}" pattern with at least one character
 * between the first '{' and the first '}' following it, only that inner
 * part is hashed. This makes it possible to force certain keys into the
 * same slot. In every other case the whole key is hashed. */
unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0; /* index of the first '{' */
    int rbrace;     /* index of the first '}' after it */

    while (lbrace < keylen && key[lbrace] != '{') lbrace++;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (lbrace == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Look for the corresponding '}'. */
    rbrace = lbrace+1;
    while (rbrace < keylen && key[rbrace] != '}') rbrace++;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (rbrace == keylen || rbrace == lbrace+1)
        return crc16(key,keylen) & 0x3FFF;

    /* Both braces are there with something in the middle: hash only
     * what is between them. */
    return crc16(key+lbrace+1,rbrace-lbrace-1) & 0x3FFF;
}
/* -----------------------------------------------------------------------------
* CLUSTER node API
* -------------------------------------------------------------------------- */
/* Allocate and initialize a cluster node with the given 'flags'.
 *
 * When 'nodename' is not NULL its first CLUSTER_NAMELEN bytes become the
 * node name. When it is NULL this is considered a first handshake and a
 * random name is generated (it will be fixed later when we'll receive the
 * first pong).
 *
 * The new node is returned to the caller, but it is NOT added to the
 * nodes hash table. */
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *n = zmalloc(sizeof(*n));

    /* Identity. */
    if (nodename != NULL) {
        memcpy(n->name, nodename, CLUSTER_NAMELEN);
    } else {
        getRandomHexChars(n->name, CLUSTER_NAMELEN);
    }
    n->ctime = mstime();
    n->configEpoch = 0;
    n->flags = flags;

    /* Slots and replication topology. */
    memset(n->slots,0,sizeof(n->slots));
    n->numslots = 0;
    n->numslaves = 0;
    n->slaves = NULL;
    n->slaveof = NULL;

    /* Timers and failure detection state. */
    n->ping_sent = 0;
    n->pong_received = 0;
    n->fail_time = 0;

    /* Networking. */
    n->link = NULL;
    memset(n->ip,0,sizeof(n->ip));
    n->port = 0;
    n->cport = 0;

    /* Failure reports are heap allocated: let the list free them. */
    n->fail_reports = listCreate();
    listSetFreeMethod(n->fail_reports,zfree);

    n->voted_time = 0;
    n->orphaned_time = 0;
    n->repl_offset_time = 0;
    n->repl_offset = 0;
    return n;
}
/* Record that 'sender' reports 'failing' as being in failure state.
 *
 * If 'failing' already holds a report coming from 'sender', only the
 * timestamp of that report is refreshed and 0 is returned. Otherwise a new
 * report is appended to the fail_reports list and 1 is returned. */
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *reports = failing->fail_reports;
    clusterNodeFailReport *report;
    listNode *item;
    listIter it;

    /* Look for an existing report from the same sender. */
    listRewind(reports,&it);
    for (item = listNext(&it); item != NULL; item = listNext(&it)) {
        report = item->value;
        if (report->node != sender) continue;
        report->time = mstime();
        return 0;
    }

    /* First report from this sender: create it. */
    report = zmalloc(sizeof(*report));
    report->node = sender;
    report->time = mstime();
    listAddNodeTail(reports,report);
    return 1;
}
/* Drop the failure reports about 'node' that are too old, that is, older
 * than the node timeout multiplied by CLUSTER_FAIL_REPORT_VALIDITY_MULT.
 *
 * Note that anyway for a node to be flagged as FAIL we need to have a
 * local PFAIL state that is at least older than the global node timeout,
 * so we don't just trust the number of failure reports from other
 * nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    list *reports = node->fail_reports;
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();
    listNode *item;
    listIter it;

    listRewind(reports,&it);
    for (item = listNext(&it); item != NULL; item = listNext(&it)) {
        clusterNodeFailReport *report = item->value;
        mstime_t age = now - report->time;

        if (age > maxtime) listDelNode(reports,item);
    }
}
/* Remove the failure report that 'sender' filed about 'node', if any.
 * This function is called when a node informs us via gossip that a node
 * is OK from its point of view (no FAIL or PFAIL flags).
 *
 * Note that this function is called relatively often as it gets called
 * even when there are no nodes failing, and is O(N), however when the
 * cluster is fine the failure reports list is empty so the function runs
 * in constant time.
 *
 * Returns 1 if a report was found and removed (in that case the expired
 * reports are cleaned up as well), 0 if there was none. */
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
    list *reports = node->fail_reports;
    listNode *item;
    listIter it;

    listRewind(reports,&it);
    for (item = listNext(&it); item != NULL; item = listNext(&it)) {
        clusterNodeFailReport *report = item->value;

        if (report->node != sender) continue;

        /* Found it: remove the report and stop searching. */
        listDelNode(reports,item);
        clusterNodeCleanupFailureReports(node);
        return 1;
    }
    return 0; /* No failure report from this sender. */
}
/* Return how many external nodes currently believe 'node' is failing.
 * This node is not included, even if it may have a PFAIL or FAIL state
 * for 'node' as well. Expired reports are discarded before counting. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    list *reports = node->fail_reports;

    clusterNodeCleanupFailureReports(node);
    return listLength(reports);
}
/* Remove 'slave' from the slaves array of 'master'.
 *
 * Returns C_OK if the slave was found and removed, C_ERR if it was not in
 * the array. When the last slave is removed, the CLUSTER_NODE_MIGRATE_TO
 * flag of the master is cleared. */
int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
    int idx = 0;
    int after;

    /* Locate the slave. */
    while (idx < master->numslaves && master->slaves[idx] != slave) idx++;
    if (idx == master->numslaves) return C_ERR;

    /* Shift the entries following it one position to the left. */
    after = (master->numslaves - idx) - 1;
    if (after > 0) {
        memmove(master->slaves+idx,master->slaves+(idx+1),
                (sizeof(*master->slaves) * after));
    }

    master->numslaves--;
    if (master->numslaves == 0)
        master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
/* Append 'slave' to the slaves array of 'master' and set the
 * CLUSTER_NODE_MIGRATE_TO flag on the master.
 *
 * Returns C_OK on success, or C_ERR (changing nothing) if 'slave' is
 * already listed among the slaves of 'master'. */
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
    int idx = 0;
    int count = master->numslaves;

    /* If it's already a slave, don't add it again. */
    while (idx < count) {
        if (master->slaves[idx] == slave) return C_ERR;
        idx++;
    }

    /* Make room for one more pointer and store the new slave there. */
    master->slaves = zrealloc(master->slaves,
                              sizeof(clusterNode*)*(count+1));
    master->slaves[count] = slave;
    master->numslaves = count+1;
    master->flags |= CLUSTER_NODE_MIGRATE_TO;
    return C_OK;
}
/* Return the number of slaves of 'n' for which nodeFailed() is false. */
int clusterCountNonFailingSlaves(clusterNode *n) {
    int healthy = 0;
    int idx = 0;

    while (idx < n->numslaves) {
        if (!nodeFailed(n->slaves[idx])) healthy++;
        idx++;
    }
    return healthy;
}
/* Low level cleanup of the node structure. Only called by clusterDelNode().
 *
 * The node is detached from its slaves and from its master, removed from
 * the nodes table (asserting that it was there), and then its link, its
 * failure reports, its slaves array and the node itself are released. */
void freeClusterNode(clusterNode *n) {
    sds key;
    int idx, deleted;

    /* If the node has associated slaves, we have to set
     * all the slaves->slaveof fields to NULL (unknown). */
    for (idx = 0; idx < n->numslaves; idx++) {
        clusterNode *replica = n->slaves[idx];
        replica->slaveof = NULL;
    }

    /* Remove this node from the list of slaves of its master. */
    if (nodeIsSlave(n) && n->slaveof) {
        clusterNodeRemoveSlave(n->slaveof,n);
    }

    /* Unlink from the set of nodes. */
    key = sdsnewlen(n->name, CLUSTER_NAMELEN);
    deleted = dictDelete(server.cluster->nodes,key);
    serverAssert(deleted == DICT_OK);
    sdsfree(key);

    /* Release link and associated data structures. */
    if (n->link != NULL) freeClusterLink(n->link);
    listRelease(n->fail_reports);
    zfree(n->slaves);
    zfree(n);
}
/* Add a node to the nodes hash table, keyed by its CLUSTER_NAMELEN bytes
 * name.
 *
 * Returns C_OK if the node was added, C_ERR if dictAdd() refused it. In
 * the latter case the key created for the insertion is released here:
 * the table did not store it, so nobody else would free it. */
int clusterAddNode(clusterNode *node) {
    sds key = sdsnewlen(node->name,CLUSTER_NAMELEN);

    if (dictAdd(server.cluster->nodes,key,node) != DICT_OK) {
        sdsfree(key);
        return C_ERR;
    }
    return C_OK;
}
/* Remove a node from the cluster. The function performs the high level
 * cleanup, calling freeClusterNode() for the low level cleanup.
 * Here we do the following:
 *
 * 1) Clear every importing / migrating slot entry that refers to the node
 *    and mark all the slots handled by it as unassigned.
 * 2) Remove all the failure reports sent by this node and referenced by
 *    other nodes.
 * 3) Free the node with freeClusterNode() that will in turn remove it
 *    from the hash table and from the list of slaves of its master, if
 *    it is a slave node.
 */
void clusterDelNode(clusterNode *delnode) {
    clusterState *cluster = server.cluster;
    dictIterator *iter;
    dictEntry *entry;
    int slot;

    /* 1) Mark slots as unassigned. */
    for (slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (cluster->importing_slots_from[slot] == delnode)
            cluster->importing_slots_from[slot] = NULL;
        if (cluster->migrating_slots_to[slot] == delnode)
            cluster->migrating_slots_to[slot] = NULL;
        if (cluster->slots[slot] == delnode)
            clusterDelSlot(slot);
    }

    /* 2) Remove failure reports. */
    iter = dictGetSafeIterator(cluster->nodes);
    while((entry = dictNext(iter)) != NULL) {
        clusterNode *other = dictGetVal(entry);

        if (other != delnode) clusterNodeDelFailureReport(other,delnode);
    }
    dictReleaseIterator(iter);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}
/* Node lookup by name: the first CLUSTER_NAMELEN bytes of 'name' are used
 * as the key. Returns the matching node, or NULL if the nodes table has
 * no entry for that name. */
clusterNode *clusterLookupNode(const char *name) {
    sds key = sdsnewlen(name, CLUSTER_NAMELEN);
    dictEntry *entry = dictFind(server.cluster->nodes,key);

    sdsfree(key);
    return entry ? dictGetVal(entry) : NULL;
}
/* Change the name of 'node' to the first CLUSTER_NAMELEN bytes of
 * 'newname', re-indexing it in the nodes table under the new name.
 *
 * This is only used after the handshake. When we connect a given IP/PORT
 * as a result of CLUSTER MEET we don't have the node name yet, so we
 * pick a random one, and will fix it when we receive the PONG request
 * using this function. The node must currently be in the nodes table
 * (this is asserted). */
void clusterRenameNode(clusterNode *node, char *newname) {
    sds oldkey;
    int deleted;

    serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
        node->name, newname);

    /* Drop the entry indexed by the old name... */
    oldkey = sdsnewlen(node->name, CLUSTER_NAMELEN);
    deleted = dictDelete(server.cluster->nodes, oldkey);
    sdsfree(oldkey);
    serverAssert(deleted == DICT_OK);

    /* ...then rename the node and add it back. */
    memcpy(node->name, newname, CLUSTER_NAMELEN);
    clusterAddNode(node);
}
/* -----------------------------------------------------------------------------
* CLUSTER config epoch handling
* -------------------------------------------------------------------------- */
/* Return the greatest epoch known locally: the maximum between the cluster
 * currentEpoch and the configEpoch of every node in the nodes table. */
uint64_t clusterGetMaxEpoch(void) {
    /* Start from the current epoch and raise the value whenever a node
     * advertises a greater configEpoch. */
    uint64_t greatest = server.cluster->currentEpoch;
    dictIterator *iter = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *entry;

    while ((entry = dictNext(iter)) != NULL) {
        clusterNode *n = dictGetVal(entry);

        if (n->configEpoch > greatest) greatest = n->configEpoch;
    }
    dictReleaseIterator(iter);
    return greatest;
}
/* If this node epoch is zero or is not already the greatest across the
 * cluster (from the POV of the local configuration), this function will:
 *
 * 1) Generate a new config epoch, incrementing the current epoch.
 * 2) Assign the new epoch to this node, WITHOUT any consensus.
 * 3) Schedule the configuration to be saved and fsynced on disk before
 *    packets with the new configuration are sent.
 *
 * If the new config epoch is generated and assigned, C_OK is returned,
 * otherwise C_ERR is returned (since the node already has the greatest
 * configuration around) and no operation is performed.
 *
 * Important note: this function violates the principle that config epochs
 * should be generated with consensus and should be unique across the cluster.
 * However Redis Cluster uses these auto-generated config epochs in two
 * cases:
 *
 * 1) When slots are closed after importing. Otherwise resharding would be
 *    too expensive.
 * 2) When CLUSTER FAILOVER is called with options that force a slave to
 *    failover its master even if there is no master majority able to
 *    create a new configuration epoch.
 *
 * Redis Cluster will not explode using this function, even in the case of
 * a collision between this node and another node generating the same
 * configuration epoch unilaterally, because the config epoch conflict
 * resolution algorithm will eventually move colliding nodes to different
 * config epochs. However using this function may violate the "last failover
 * wins" rule, so it should only be used with care. */
int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    /* Already holding a non-zero epoch which is also the greatest one we
     * know about: there is nothing to bump. */
    if (myself->configEpoch != 0 && myself->configEpoch == maxEpoch)
        return C_ERR;

    myself->configEpoch = ++server.cluster->currentEpoch;
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                         CLUSTER_TODO_FSYNC_CONFIG);
    serverLog(LL_WARNING,
        "New configEpoch set to %llu",
        (unsigned long long) myself->configEpoch);
    return C_OK;
}
/* This function is called when this node is a master, and we receive from
 * another master a configuration epoch that is equal to our configuration
 * epoch.
 *
 * BACKGROUND
 *
 * It is not possible for different slaves to get the same config epoch
 * during a failover election, because the slaves need to get voted by a
 * majority. However when we perform a manual resharding of the cluster
 * the node will assign a configuration epoch to itself without asking
 * for agreement. Usually resharding happens when the cluster is working well
 * and is supervised by the sysadmin, however it is possible for a failover
 * to happen exactly while the node we are resharding a slot to assigns itself
 * a new configuration epoch, but before it is able to propagate it.
 *
 * So technically it is possible in this condition that two nodes end up with
 * the same configuration epoch.
 *
 * Another possibility is that there are bugs in the implementation causing
 * this to happen.
 *
 * Moreover when a new cluster is created, all the nodes start with the same
 * configEpoch. This collision resolution code allows nodes to automatically
 * end up with a different configEpoch at startup.
 *
 * In all the cases, we want a mechanism that resolves this issue automatically
 * as a safeguard. The same configuration epoch for masters serving different
 * sets of slots is not harmful, but it is if the nodes end up serving the
 * same slots for some reason (manual errors or software bugs) without a
 * proper failover procedure.
 *
 * In general we want a system that eventually always ends with different
 * masters having different configuration epochs whatever happened, since
 * nothing is worse than a split-brain condition in a distributed system.
 *
 * BEHAVIOR
 *
 * When this function gets called, what happens is that if this node
 * has the lexicographically smaller Node ID compared to the other node
 * with the conflicting epoch (the 'sender' node), it increments the
 * current epoch and assigns the resulting value to itself as its new
 * configuration epoch.
 *
 * This means that even if there are multiple nodes colliding, the node
 * with the greatest Node ID never moves forward, so eventually all the nodes
 * end up with a different configuration epoch.
 */
void clusterHandleConfigEpochCollision(clusterNode *sender) {
    int same_epoch = (sender->configEpoch == myself->configEpoch);
    int both_masters = nodeIsMaster(sender) && nodeIsMaster(myself);

    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (!same_epoch || !both_masters) return;

    /* Only the node with the smaller Node ID moves: do nothing when the
     * sender ID is not greater than ours. */
    if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;

    /* Get the next epoch available to the best of this node's knowledge. */
    myself->configEpoch = ++server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    serverLog(LL_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}
/* -----------------------------------------------------------------------------
* CLUSTER nodes blacklist
*
* The nodes blacklist is just a way to ensure that a given node with a given
* Node ID is not readded before some time elapsed (this time is specified
* in seconds in CLUSTER_BLACKLIST_TTL).
*
* This is useful when we want to remove a node from the cluster completely:
* when CLUSTER FORGET is called, it also puts the node into the blacklist so
* that even if we receive gossip messages from other nodes that still remember
* about the node we want to remove, we don't re-add it before some time.
*
* Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
* that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
* in the cluster without dealing with the problem of other nodes re-adding
* back the node to nodes we already sent the FORGET command to.
*
* The data structure used is a hash table with an sds string representing
* the node ID as key, and the time when it is ok to re-add the node as
* value.
* -------------------------------------------------------------------------- */
#define CLUSTER_BLACKLIST_TTL 60 /* Seconds a node ID stays blacklisted (1 minute). */
/* Remove the expired entries from the blacklist.
 *
 * This is always done before the add and exists operations. It is an O(N)
 * scan, but that is not a problem since add / exists are called very
 * infrequently and the hash table is supposed to contain very few elements
 * at most. Without this cleanup, entries could accumulate during long
 * uptimes with automated node add/removal procedures. */
void clusterBlacklistCleanup(void) {
    dictIterator *iter = dictGetSafeIterator(server.cluster->nodes_black_list);
    dictEntry *entry;

    while ((entry = dictNext(iter)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(entry);

        /* Entries whose expire time is not in the past are kept. */
        if (expire >= server.unixtime) continue;
        dictDelete(server.cluster->nodes_black_list,dictGetKey(entry));
    }
    dictReleaseIterator(iter);
}
/* Purge the expired blacklist entries, then blacklist the ID of 'node' for
 * CLUSTER_BLACKLIST_TTL seconds from now. If the ID is already in the
 * blacklist its expire time is simply refreshed. */
void clusterBlacklistAddNode(clusterNode *node) {
    sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
    dictEntry *entry;

    clusterBlacklistCleanup();

    /* When the insertion succeeds the key now belongs to the table, so we
     * continue with a private duplicate that is safe to release below.
     * When it fails (ID already present) we still own the original. */
    if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK)
        id = sdsdup(id);

    /* Either way the entry exists at this point: set its expire time. */
    entry = dictFind(server.cluster->nodes_black_list,id);
    dictSetUnsignedIntegerVal(entry,time(NULL)+CLUSTER_BLACKLIST_TTL);
    sdsfree(id);
}
/* Return non-zero if the specified node ID is currently blacklisted, zero
 * otherwise. Expired entries are purged before the lookup.
 *
 * 'nodeid' does not need to be an sds string: any pointer to
 * CLUSTER_NAMELEN readable bytes will work. */
int clusterBlacklistExists(char *nodeid) {
    sds key = sdsnewlen(nodeid,CLUSTER_NAMELEN);
    int found;

    clusterBlacklistCleanup();
    found = (dictFind(server.cluster->nodes_black_list,key) != NULL);
    sdsfree(key);
    return found;
}
/* -----------------------------------------------------------------------------
* CLUSTER messages exchange - PING/PONG and gossip
* -------------------------------------------------------------------------- */
/* This function checks if a given node should be marked as FAIL.
 * It happens if the following conditions are met:
 *
 * 1) We received enough failure reports from other master nodes via gossip.
 *    Enough means that the majority of the masters signaled the node is
 *    down recently.
 * 2) We believe this node is in PFAIL state.
 *
 * If a failure is detected and this node is a master, we also inform the
 * whole cluster about the event, trying to force every other node to set
 * the FAIL flag for the node.
 *
 * Note that the form of agreement used here is weak, as we collect the
 * majority of masters state during some time, and even if we force agreement
 * by propagating the FAIL message, because of partitions we may not reach
 * every node. However:
 *
 * 1) Either we reach the majority and eventually the FAIL state will
 *    propagate to all the cluster.
 * 2) Or there is no majority so no slave promotion will be authorized and the
 *    FAIL flag will be cleared after some time.
 */
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int needed_quorum = (server.cluster->size / 2) + 1;
    int failures;

    /* Nothing to do if we can reach the node, or if it is already FAIL. */
    if (!nodeTimedOut(node) || nodeFailed(node)) return;

    /* Count the reports we hold, plus our own vote if we are a master. */
    failures = clusterNodeFailureReportsCount(node);
    if (nodeIsMaster(myself)) failures += 1;

    /* No weak agreement from masters. */
    if (failures < needed_quorum) return;

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Promote the PFAIL state to FAIL and remember when it happened. */
    node->flags = (node->flags & ~CLUSTER_NODE_PFAIL) | CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
/* This function is called only if a node is marked as FAIL, but we are able
 * to reach it again. It checks whether the conditions to undo the FAIL
 * state are met, and clears the flag if so. The node must have the FAIL
 * flag set, otherwise the assertion below fails. */
void clearNodeFailureIfNeeded(clusterNode *node) {
    mstime_t now = mstime();

    serverAssert(nodeFailed(node));

    /* Slaves, and masters serving no slots, always get the FAIL flag
     * cleared as soon as we can contact them again. */
    if (nodeIsSlave(node) || node->numslots == 0) {
        const char *role = nodeIsSlave(node) ? "replica"
                                             : "master without slots";

        serverLog(LL_NOTICE,
            "Clear FAIL state for node %.40s: %s is reachable again.",
                node->name,
                role);
        node->flags &= ~CLUSTER_NODE_FAIL;
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
    }

    /* If it is a master and...
     * 1) The FAIL state is old enough.
     * 2) It is still serving slots from our point of view (not failed over).
     * Apparently no one is going to fix these slots, clear the FAIL flag. */
    if (nodeIsMaster(node) && node->numslots > 0) {
        if ((now - node->fail_time) >
            (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
        {
            serverLog(LL_NOTICE,
                "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
                    node->name);
            node->flags &= ~CLUSTER_NODE_FAIL;
            clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        }
    }
}
/* Return non-zero if the nodes table already contains a node in HANDSHAKE
 * state whose ip (compared case-insensitively), port and cluster bus port
 * all match the arguments, zero otherwise. This is used in order to avoid
 * adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
    dictIterator *iter = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *entry;
    int found = 0;

    /* Stop scanning as soon as a matching handshake node is seen. */
    while (!found && (entry = dictNext(iter)) != NULL) {
        clusterNode *candidate = dictGetVal(entry);

        if (!nodeInHandshake(candidate)) continue;
        if (strcasecmp(candidate->ip,ip) == 0 &&
            candidate->port == port &&
            candidate->cport == cport)
        {
            found = 1;
        }
    }
    dictReleaseIterator(iter);
    return found;
}
/* Start a handshake with the specified address if there is not one
 * already in progress.
 *
 * 'ip' is a textual IPv4 or IPv6 address, 'port' is the node port and
 * 'cport' the cluster bus port; both ports must be in the 1-65535 range.
 *
 * Returns non-zero if the handshake was actually started. On error zero is
 * returned and errno is set to one of the following values:
 *
 * EAGAIN - There is already a handshake in progress for this address.
 * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;

    /* IP sanity check. inet_pton() returns 1 only when the address was
     * parsed: 0 means the string is not a valid address, and -1 means the
     * address family is not supported. Anything other than 1 must be
     * treated as a failure, otherwise a -1 would be taken for a valid
     * address while nothing was stored in 'sa'. */
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)) == 1)
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)) == 1)
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    /* Port sanity check */
    if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
        errno = EINVAL;
        return 0;
    }

    /* Set norm_ip as the normalized string representation of the node
     * IP address, so that the same address always compares equal
     * regardless of how the caller spelled it. */
    memset(norm_ip,0,NET_IP_STR_LEN);
    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,NET_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,NET_IP_STR_LEN);

    if (clusterHandshakeInProgress(norm_ip,port,cport)) {
        errno = EAGAIN;
        return 0;
    }

    /* Add the node with a random name (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    n->cport = cport;
    clusterAddNode(n);
    return 1;
}
/* Process the gossip section of PING or PONG packets.
 *
 * 'hdr' is the received message and 'link' the link it arrived on. The
 * caller must have already validated the packet length against the gossip
 * count: this function trusts the length, but not the content of the
 * gossip entries.
 *
 * For every gossip entry about a node we know, failure reports, the last
 * pong time and the node address may be updated. For entries about unknown
 * nodes a handshake may be started. */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t remaining = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node;

    /* The link may not be bound to a node yet: fall back to the sender ID
     * found in the header (which may still be unknown to us). */
    if (sender == NULL) sender = clusterLookupNode(hdr->sender);

    for (; remaining > 0; remaining--, g++) {
        uint16_t flags = ntohs(g->flags);
        /* True when the gossip entry describes the node as FAIL or PFAIL. */
        int reported_down =
            (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) != 0;
        clusterNode *node;

        if (server.verbosity == LL_DEBUG) {
            sds ci = representClusterNodeFlags(sdsempty(), flags);

            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state accordingly to the gossip sections */
        node = clusterLookupNode(g->nodename);

        if (node == NULL) {
            /* If it's not in NOADDR state and we don't have it, we
             * start a handshake process against this IP/PORT pair.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
            }
            continue;
        }

        /* We already know this node.
         * Handle failure reports, only when the sender is a master. */
        if (sender && nodeIsMaster(sender) && node != myself) {
            if (reported_down) {
                if (clusterNodeAddFailureReport(node,sender)) {
                    serverLog(LL_VERBOSE,
                        "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                }
                markNodeAsFailingIfNeeded(node);
            } else if (clusterNodeDelFailureReport(node,sender)) {
                serverLog(LL_VERBOSE,
                    "Node %.40s reported node %.40s is back online.",
                        sender->name, node->name);
            }
        }

        /* If the gossip entry carries no failure flags for the node, we
         * have no pending ping for it, and we hold no failure reports
         * about it, update the last pong time with the one we see from
         * the other nodes. */
        if (!reported_down &&
            node->ping_sent == 0 &&
            clusterNodeFailureReportsCount(node) == 0)
        {
            mstime_t pongtime = ntohl(g->pong_received);

            pongtime *= 1000; /* Convert back to milliseconds. */

            /* Replace the pong time with the received one only if
             * it's greater than our view but is not in the future
             * (with 500 milliseconds tolerance) from the POV of our
             * clock. */
            if (pongtime > node->pong_received &&
                pongtime <= (server.mstime+500))
            {
                node->pong_received = pongtime;
            }
        }

        /* If we already know this node, but it is not reachable, and
         * we see a different address in the gossip section of a node that
         * can talk with this other node, update the address and disconnect
         * the old link if any, so that we'll attempt to connect with the
         * new address. Note that our own flags for the node are read here,
         * after the failure reports handling above may have changed them. */
        if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
            !(flags & CLUSTER_NODE_NOADDR) &&
            !reported_down &&
            (strcasecmp(node->ip,g->ip) ||
             node->port != ntohs(g->port) ||
             node->cport != ntohs(g->cport)))
        {
            if (node->link) freeClusterLink(node->link);
            memcpy(node->ip,g->ip,NET_IP_STR_LEN);
            node->port = ntohs(g->port);
            node->cport = ntohs(g->cport);
            node->flags &= ~CLUSTER_NODE_NOADDR;
        }
    }
}
/* IP -> string conversion.
 *
 * 'buf' must have room for at least NET_IP_STR_LEN bytes. If 'announced_ip'
 * is a non-empty string it is copied into 'buf' (and forcibly terminated),
 * otherwise the address is obtained from the peer of the link connection. */
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
    if (announced_ip[0] == '\0') {
        /* Nothing announced: ask the connection for the peer address. */
        connPeerToString(link->conn, buf, NET_IP_STR_LEN, NULL);
        return;
    }

    memcpy(buf,announced_ip,NET_IP_STR_LEN);
    buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
}
/* Update the node address to the IP address of the peer of 'link', or, if
 * hdr->myip is non empty, to the address the node is announcing to us.
 * The port and cluster bus port are taken from the packet header.
 *
 * If the address or ports changed, the node link is disconnected so that
 * we'll connect again to the new address, and if the node is our master
 * the replication target is updated as well.
 *
 * If the ip/port pair is already correct no operation is performed at
 * all.
 *
 * The function returns 0 if the node address is still the same,
 * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
                              clusterMsg *hdr)
{
    char ip[NET_IP_STR_LEN] = {0};
    int port = ntohs(hdr->port);
    int cport = ntohs(hdr->cport);
    int unchanged;

    /* We don't proceed if the link is the same as the node link, as this
     * function is designed to see if the node link is consistent with the
     * symmetric link that is used to receive PINGs from the node.
     *
     * As a side effect this function never frees the passed 'link', so
     * it is safe to call during packet processing. */
    if (link == node->link) return 0;

    nodeIp2String(ip,link,hdr->myip);
    unchanged = (node->port == port &&
                 node->cport == cport &&
                 strcmp(ip,node->ip) == 0);
    if (unchanged) return 0;

    /* IP / port is different, update it. */
    memcpy(node->ip,ip,sizeof(ip));
    node->port = port;
    node->cport = cport;
    if (node->link) freeClusterLink(node->link);
    node->flags &= ~CLUSTER_NODE_NOADDR;
    serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
        node->name, node->ip, node->port);

    /* Check if this is our master and we have to change the
     * replication target as well. */
    if (nodeIsSlave(myself) && myself->slaveof == node)
        replicationSetMaster(node->ip, node->port);
    return 1;
}
/* Reconfigure the specified node 'n' as a master. This function is called
 * when a node that we believed to be a slave is now acting as master, in
 * order to update the state of the node. It does nothing if the node is
 * already flagged as a master. */
void clusterSetNodeAsMaster(clusterNode *n) {
    if (!nodeIsMaster(n)) {
        /* Unlink the node from its former master, if any. Nodes other
         * than myself are also flagged MIGRATE_TO in that case. */
        if (n->slaveof != NULL) {
            clusterNodeRemoveSlave(n->slaveof,n);
            if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
        }

        /* Swap the role flags and forget the old master. */
        n->flags = (n->flags & ~CLUSTER_NODE_SLAVE) | CLUSTER_NODE_MASTER;
        n->slaveof = NULL;

        /* Update config and state. */
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE);
    }
}
/* This function is called when we receive a master configuration via a
 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
 * node, and the set of slots claimed under this configEpoch.
 *
 * What we do is to rebind the slots with newer configuration compared to our
 * local configuration, and if needed, we turn ourself into a replica of the
 * node (see the comments in the function body for more info).
 *
 * 'sender' is the node for which we received a configuration update.
 * Sometimes it is not actually the "sender" of the information, like in the
 * case we receive the info via an UPDATE packet.
 * 'senderConfigEpoch' is the configEpoch under which the slots are claimed.
 * 'slots' is the bitmap of claimed slots, tested with bitmapTestBit() for
 * every slot index below CLUSTER_SLOTS.
 *
 * Updates about myself are discarded. */
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster, *newmaster = NULL;
    /* The dirty slots list is a list of slots for which we lose the ownership
     * while having still keys inside. This usually happens after a failover
     * or after a manual cluster reconfiguration operated by the admin.
     *
     * If the update message is not able to demote a master to slave (in this
     * case we'll resync with the master updating the whole key space), we
     * need to delete all the keys in the slots we lost ownership. */
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* Here we set curmaster to this node or the node this node
     * replicates to if it's a slave. In the for loop we are
     * interested to check if slots are taken away from curmaster.
     * Note that curmaster is NULL if we are a slave without a known
     * master. */
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* The slot is in importing state, it should be modified only
             * manually via redis-trib (example: a resharding is in progress
             * and the migrating side slot was already closed and is advertising
             * a new config. We still want the slot to be closed manually). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* We rebind the slot to the new node claiming it if:
             * 1) The slot was unassigned or the new node claims it with a
             *    greater configEpoch.
             * 2) We are not currently importing the slot. */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* Was this slot mine, and still contains keys? Mark it as
                 * a dirty slot. The sender can't be myself here because of
                 * the early return above. */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j))
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                /* Only a slot actually owned by curmaster counts as taken
                 * away from it. Without the NULL check, an unassigned slot
                 * (NULL owner) would compare equal to a NULL curmaster, and
                 * curmaster would then be dereferenced below. */
                if (curmaster != NULL &&
                    server.cluster->slots[j] == curmaster)
                {
                    newmaster = sender;
                }
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }

    /* After updating the slots configuration, don't do any actual change
     * in the state of the server if a module disabled Redis Cluster
     * keys redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return;

    /* If at least one slot was reassigned from a node to another node
     * with a greater configEpoch, it is possible that:
     * 1) We are a master left without slots. This means that we were
     *    failed over and we should turn into a replica of the new
     *    master.
     * 2) We are a slave and our master is left without slots. We need
     *    to replicate to the new slots owner.
     *
     * newmaster is only set when curmaster is not NULL, so the dereference
     * here is safe. */
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        /* If we are here, we received an update message which removed
         * ownership for certain slots we still have keys about, but still
         * we are serving some slots, so this master node was not demoted to
         * a slave.
         *
         * In order to maintain a consistent state between keys and slots
         * we need to remove all the keys from the slots we lost. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}
/* When this function is called, there is a packet to process starting
 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
 * function should just handle the higher level stuff of processing the
 * packet, modifying the cluster state if needed.
 *
 * The function returns 1 if the link is still valid after the packet
 * was processed, otherwise 0 if the link was freed since the packet
 * processing led to some inconsistency error (for instance a PONG
 * received from the wrong sender ID). */
int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
if (type < CLUSTERMSG_TYPE_COUNT)
server.cluster->stats_bus_messages_received[type]++;
serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
type, (unsigned long) totlen);
/* Perform sanity checks */
if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
if (totlen > sdslen(link->rcvbuf)) return 1;
if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
/* Can't handle messages of different versions. */
return 1;
}
uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
clusterNode *sender;
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
uint16_t count = ntohs(hdr->count);
uint32_t explen; /* expected length of this packet */
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += (sizeof(clusterMsgDataGossip)*count);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAIL) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
8 +
ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART)
{
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_MODULE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
3 + ntohl(hdr->data.module.msg.len);
if (totlen != explen) return 1;
}
/* Check if the sender is a known node. */
sender = clusterLookupNode(hdr->sender);
if (sender && !nodeInHandshake(sender)) {
/* Update our currentEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = senderCurrentEpoch;
/* Update the sender configEpoch if it is publishing a newer one. */
if (senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
}
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(hdr->offset);
sender->repl_offset_time = mstime();
/* If we are a slave performing a manual failover and our master
* sent its offset while already paused, populate the MF state. */
if (server.cluster->mf_end &&
nodeIsSlave(myself) &&
myself->slaveof == sender &&
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
server.cluster->mf_master_offset == 0)
{
server.cluster->mf_master_offset = sender->repl_offset;
serverLog(LL_WARNING,
"Received replication offset for paused "
"master manual failover: %lld",
server.cluster->mf_master_offset);
}
}
/* Initial processing of PING and MEET requests replying with a PONG. */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
/* We use incoming MEET messages in order to set the address
* for 'myself', since only other cluster nodes will send us
* MEET messages on handshakes, when the cluster joins, or
* later if we changed address, and those nodes will use our
* official address to connect to us. So by obtaining this address
* from the socket is a simple way to discover / update our own
* address in the cluster without it being hardcoded in the config.
*
* However if we don't have an address at all, we update the address
* even with a normal PING packet. If it's wrong it will be fixed
* by MEET later. */
if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
server.cluster_announce_ip == NULL)
{
char ip[NET_IP_STR_LEN];
if (connSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,NET_IP_STR_LEN);
serverLog(LL_WARNING,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, slaveof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link,hdr->myip);
node->port = ntohs(hdr->port);
node->cport = ntohs(hdr->cport);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
if (!sender && type == CLUSTERMSG_TYPE_MEET)
clusterProcessGossipSection(hdr,link);
/* Anyway reply with a PONG */
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
/* PING, PONG, MEET: process config information. */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
serverLog(LL_DEBUG,"%s packet received: %p",
type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
(void*)link->node);
if (link->node) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
* IP/port of the node with the new one. */
if (sender) {
serverLog(LL_VERBOSE,
"Handshake: we already know node %.40s, "
"updating the address if needed.", sender->name);
if (nodeUpdateAddressIfNeeded(sender,link,hdr))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Free this node as we already have it. This will
* cause the link to be freed as well. */
clusterDelNode(link->node);
return 0;
}
/* First thing to do is replacing the random name with the
* right node name if this was a handshake stage. */
clusterRenameNode(link->node, hdr->sender);
serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} else if (memcmp(link->node->name,hdr->sender,
CLUSTER_NAMELEN) != 0)
{
/* If the reply has a non matching node ID we
* disconnect this node and set it as not having an associated
* address. */
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
link->node->name,
(int)(mstime()-(link->node->ctime)),
link->node->flags);
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
link->node->port = 0;
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0;
}
}
/* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
* announced. This is a dynamic flag that we receive from the
* sender, and the latest status must be trusted. We need it to
* be propagated because the slave ranking used to understand the
* delay of each slave in the voting process, needs to know
* what are the instances really competing. */
if (sender) {
int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
sender->flags |= nofailover;
}
/* Update the node address if it changed. */
if (sender && type == CLUSTERMSG_TYPE_PING &&
!nodeInHandshake(sender) &&
nodeUpdateAddressIfNeeded(sender,link,hdr))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Update our info about the node */
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = mstime();
link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external
* help if it is momentary (that is, if it does not
* turn into a FAIL state).
*
* The FAIL condition is also reversible under specific
* conditions detected by clearNodeFailureIfNeeded(). */
if (nodeTimedOut(link->node)) {
link->node->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
} else if (nodeFailed(link->node)) {
clearNodeFailureIfNeeded(link->node);
}
}
/* Check for role switch: slave -> master or master -> slave. */
if (sender) {
if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
sizeof(hdr->slaveof)))
{
/* Node is a master. */
clusterSetNodeAsMaster(sender);
} else {
/* Node is a slave. */
clusterNode *master = clusterLookupNode(hdr->slaveof);
if (nodeIsMaster(sender)) {
/* Master turned into a slave! Reconfigure the node. */
clusterDelNodeSlots(sender);
sender->flags &= ~(CLUSTER_NODE_MASTER|
CLUSTER_NODE_MIGRATE_TO);
sender->flags |= CLUSTER_NODE_SLAVE;
/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Master node changed for this slave? */
if (master && sender->slaveof != master) {
if (sender->slaveof)
clusterNodeRemoveSlave(sender->slaveof,sender);
clusterNodeAddSlave(master,sender);
sender->slaveof = master;
/* Update config. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
}
/* Update our info about served slots.
*
* Note: this MUST happen after we update the master/slave state
* so that CLUSTER_NODE_MASTER flag will be set. */
/* Many checks are only needed if the set of served slots this
* instance claims is different compared to the set of slots we have
* for it. Check this ASAP to avoid other computational expansive
* checks later. */
clusterNode *sender_master = NULL; /* Sender or its master if slave. */
int dirty_slots = 0; /* Sender claimed slots don't match my view? */
if (sender) {
sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
if (sender_master) {
dirty_slots = memcmp(sender_master->slots,
hdr->myslots,sizeof(hdr->myslots)) != 0;
}
}
/* 1) If the sender of the message is a master, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
if (sender && nodeIsMaster(sender) && dirty_slots)
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
/* 2) We also check for the reverse condition, that is, the sender
* claims to serve slots we know are served by a master with a
* greater configEpoch. If this happens we inform the sender.
*
* This is useful because sometimes after a partition heals, a
* reappearing master may be the last one to claim a given set of
* hash slots, but with a configuration that other instances know to
* be deprecated. Example:
*
* A and B are master and slave for slots 1,2,3.
* A is partitioned away, B gets promoted.
* B is partitioned away, and A returns available.
*
* Usually B would PING A publishing its set of served slots and its
* configEpoch, but because of the partition B can't inform A of the
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop to act as a master (or can try to
* failover if there are the conditions to win the election). */
if (sender && dirty_slots) {
int j;
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots,j)) {
if (server.cluster->slots[j] == sender ||
server.cluster->slots[j] == NULL) continue;
if (server.cluster->slots[j]->configEpoch >
senderConfigEpoch)
{
serverLog(LL_VERBOSE,
"Node %.40s has old slots configuration, sending "
"an UPDATE message about %.40s",
sender->name, server.cluster->slots[j]->name);
clusterSendUpdate(sender->link,
server.cluster->slots[j]);
/* TODO: instead of exiting the loop send every other
* UPDATE packet for other nodes that are the new owner
* of sender's slots. */
break;
}
}
}
}
/* If our config epoch collides with the sender's try to fix
* the problem. */
if (sender &&
nodeIsMaster(myself) && nodeIsMaster(sender) &&
senderConfigEpoch == myself->configEpoch)
{
clusterHandleConfigEpochCollision(sender);
}
/* Get info from the gossip section */
if (sender) clusterProcessGossipSection(hdr,link);
} else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;
if (sender) {
failing = clusterLookupNode(hdr->data.fail.about.nodename);
if (failing &&
!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
{
serverLog(LL_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
failing->flags |= CLUSTER_NODE_FAIL;
failing->fail_time = mstime();
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
} else {
serverLog(LL_NOTICE,
"Ignoring FAIL message from unknown node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
robj *channel, *message;
uint32_t channel_len, message_len;
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if (dictSize(server.pubsub_channels) ||
listLength(server.pubsub_patterns))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject(
(char*)hdr->data.publish.msg.bulk_data,channel_len);
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
pubsubPublishMessage(channel,message);
decrRefCount(channel);
decrRefCount(message);
}
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
clusterSendFailoverAuthIfNeeded(sender,hdr);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
if (!sender) return 1; /* We don't know that node. */
/* We consider this vote only if the sender is a master serving
* a non zero number of slots, and its currentEpoch is greater or
* equal to epoch where this node started the election. */
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch)
{
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
/* This message is acceptable only if I'm a master and the sender
* is one of my slaves. */
if (!sender || sender->slaveof != myself) return 1;
/* Manual failover requested from slaves. Initialize the state
* accordingly. */
resetManualFailover();
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
clusterNode *n; /* The node the update is about. */
uint64_t reportedConfigEpoch =
ntohu64(hdr->data.update.nodecfg.configEpoch);
if (!sender) return 1; /* We don't know the sender. */
n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
if (!n) return 1; /* We don't know the reported node. */
if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
/* If in our current config the node is a slave, set it as a master. */
if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
/* Update the node's configEpoch. */
n->configEpoch = reportedConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
/* Check the bitmap of served slots and update our
* config accordingly. */
clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
if (!sender) return 1; /* Protect the module from unknown nodes. */
/* We need to route this message back to the right module subscribed
* for the right message type. */
uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(hdr->data.module.msg.len);
uint8_t type = hdr->data.module.msg.type;
unsigned char *payload = hdr->data.module.msg.bulk_data;
moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
return 1;
}
/* This function is called when we detect that the link with a node is lost.
 *
 * The only action taken here is releasing the link with freeClusterLink().
 *
 * NOTE(review): the historical comment states that the node is consequently
 * left marked as not connected so that the cluster cron can detect it and
 * reconnect, and that temporary nodes used to accept a query are freed
 * completely on error. None of that logic is visible in this function;
 * confirm against freeClusterLink() and the cron before relying on it. */
void handleLinkIOError(clusterLink *link) {
freeClusterLink(link);
}
/* Write handler installed on a cluster bus connection.
 *
 * Pushes the pending bytes of the link send buffer to the connection with a
 * single connWrite() call and removes from the buffer whatever was accepted.
 * Once the buffer is drained the write handler is uninstalled. A write that
 * returns zero or a negative value is treated as a link failure.
 *
 * No effort is made to optimize for speed: this is a very low traffic
 * channel. */
void clusterWriteHandler(connection *conn) {
    clusterLink *link = connGetPrivateData(conn);
    ssize_t sent = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));

    if (sent <= 0) {
        serverLog(LL_DEBUG,"I/O error writing to node link: %s",
            (sent == -1) ? connGetLastError(conn) : "short write");
        handleLinkIOError(link);
        return;
    }

    /* Keep only the part of the buffer that still has to be written. */
    sdsrange(link->sndbuf,sent,-1);

    /* Nothing left to send: stop asking for writable events. */
    if (sdslen(link->sndbuf) == 0)
        connSetWriteHandler(link->conn, NULL);
}
/* Connect handler: called when an outgoing connection towards another node
 * either gets established or fails.
 *
 * On failure the link is released. On success the read handler is installed
 * and a first packet is queued immediately: a MEET if the node carries the
 * CLUSTER_NODE_MEET flag (to force the receiver to add us to its node
 * table), a PING otherwise. */
void clusterLinkConnectHandler(connection *conn) {
    clusterLink *link = connGetPrivateData(conn);
    clusterNode *node = link->node;

    /* Bail out if the connection attempt did not succeed. */
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
                node->name, node->ip, node->cport,
                connGetLastError(conn));
        freeClusterLink(link);
        return;
    }

    /* From now on we want to be notified about incoming data. */
    connSetReadHandler(conn, clusterReadHandler);

    /* Queue the first packet in the new connection ASAP: this is crucial
     * to avoid false positives in failure detection. */
    int first_type = CLUSTERMSG_TYPE_PING;
    if (node->flags & CLUSTER_NODE_MEET) first_type = CLUSTERMSG_TYPE_MEET;

    mstime_t prev_ping_sent = node->ping_sent;
    clusterSendPing(link, first_type);

    /* clusterSendPing() may overwrite ping_sent. If a ping was already
     * pending before the link got disconnected, keep its original time. */
    if (prev_ping_sent != 0) node->ping_sent = prev_ping_sent;

    /* The MEET flag can be cleared once the first packet is queued.
     * If we never receive a PONG, we will never send new packets to this
     * node. If instead a PONG arrives and we are no longer in meet/handshake
     * status, we want to send normal PING packets. */
    node->flags &= ~CLUSTER_NODE_MEET;

    serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
            node->name, node->ip, node->cport);
}
/* Read handler installed on a cluster bus connection.
 *
 * Incoming bytes are accumulated in link->rcvbuf. The first 8 bytes of a
 * message are read on their own, since they carry the signature and the
 * total message length; after that the rest of the message is read in
 * chunks of at most sizeof(clusterMsg) bytes. As soon as a whole message is
 * in the buffer it is handed to clusterProcessPacket(), the buffer is reset,
 * and the loop goes on with the next message.
 *
 * The function returns when no more data is ready, when an I/O error or a
 * malformed header forces the link to be dropped, or when
 * clusterProcessPacket() reports that the link is no longer valid. */
void clusterReadHandler(connection *conn) {
    clusterMsg buf[1];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = connGetPrivateData(conn);
    unsigned int readlen, rcvbuflen;

    for (;;) { /* Keep going as long as there is data to read. */
        rcvbuflen = sdslen(link->rcvbuf);

        if (rcvbuflen >= 8) {
            hdr = (clusterMsg*) link->rcvbuf;

            /* The very first time the 8 bytes prefix is complete, perform
             * some sanity check on the message signature and length. */
            if (rcvbuflen == 8 &&
                (memcmp(hdr->sig,"RCmb",4) != 0 ||
                 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN))
            {
                serverLog(LL_WARNING,
                    "Bad message length or signature received "
                    "from Cluster bus.");
                handleLinkIOError(link);
                return;
            }

            /* Read what is missing, bounded by the local buffer size. */
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        } else {
            /* Still collecting the 8 bytes needed to learn the full
             * message length. */
            readlen = 8 - rcvbuflen;
        }

        nread = connRead(conn,buf,readlen);

        /* -1 on a connection that is still connected: no more data ready. */
        if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return;

        if (nread <= 0) {
            /* I/O error or peer closed the connection. */
            serverLog(LL_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : connGetLastError(conn));
            handleLinkIOError(link);
            return;
        }

        /* Append the new data; sdscatlen() may return a different pointer,
         * so the header pointer has to be refreshed. */
        link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
        hdr = (clusterMsg*) link->rcvbuf;
        rcvbuflen += nread;

        /* Whole message in memory? Process it. */
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            if (!clusterProcessPacket(link)) return; /* Link no longer valid. */
            sdsfree(link->rcvbuf);
            link->rcvbuf = sdsempty();
        }
    }
}
/* Append 'msglen' bytes of 'msg' to the send buffer of 'link', installing
 * the write handler when the buffer goes from empty to non empty, and
 * account the message in the per-type "sent" statistics.
 *
 * It is guaranteed that this function will never have as a side effect
 * the link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    int buffer_was_empty = (sdslen(link->sndbuf) == 0);

    if (buffer_was_empty && msglen != 0)
        connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats, ignoring types we have no counter for. */
    uint16_t msgtype = ntohs(((clusterMsg*) msg)->type);
    if (msgtype < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[msgtype]++;
}
/* Queue the message 'buf' of 'len' bytes on the link of every known node
 * that has a link, skipping ourselves and the nodes in handshake state.
 *
 * It is guaranteed that this function will never have as a side effect
 * some node->link to be invalidated, so it is safe to call this function
 * from event handlers that will do stuff with node links later. */
void clusterBroadcastMessage(void *buf, size_t len) {
    const int excluded = CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE;
    dictIterator *iter = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *entry;

    while ((entry = dictNext(iter)) != NULL) {
        clusterNode *peer = dictGetVal(entry);

        if (peer->link != NULL && !(peer->flags & excluded))
            clusterSendMessage(peer->link,buf,len);
    }
    dictReleaseIterator(iter);
}
/* Fill the common header of a cluster bus message of the given 'type'.
 *
 * 'hdr' must point to a buffer of at least sizeof(clusterMsg) bytes: the
 * whole structure is zeroed before being populated.
 *
 * The total length field is computed here only for FAIL and UPDATE
 * messages. For every other type it is left to zero and fixing it is up to
 * the caller. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    /* If this node is a master, we advertise its own slots bitmap and
     * configEpoch. If this node is a slave we advertise the master's
     * information instead (the node is flagged as slave, so the receiver
     * knows that it is NOT really in charge of these slots). */
    clusterNode *owner = myself;
    if (nodeIsSlave(myself) && myself->slaveof) owner = myself->slaveof;

    /* Signature, protocol version, message type and sender name. */
    memset(hdr,0,sizeof(*hdr));
    memcpy(hdr->sig,"RCmb",4);
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->type = htons(type);
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* If cluster-announce-ip option is enabled, force the receivers of our
     * packets to use the specified address for this node. Otherwise if the
     * first byte is zero, they'll do auto discovery. */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Handle cluster-announce-port and cluster-announce-bus-port as well.
     * Note that the default bus port is derived from the real base port,
     * not from the announced one. */
    int base_port = server.tls_cluster ? server.tls_port : server.port;
    int announced_port = base_port;
    int announced_cport = base_port + CLUSTER_PORT_INCR;
    if (server.cluster_announce_port)
        announced_port = server.cluster_announce_port;
    if (server.cluster_announce_bus_port)
        announced_cport = server.cluster_announce_bus_port;
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);

    /* Slots and replication topology. */
    memcpy(hdr->myslots,owner->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);

    /* Node flags and cluster state as seen by this node. */
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(owner->configEpoch);

    /* Set the replication offset. */
    uint64_t offset = nodeIsSlave(myself) ? replicationGetSlaveOffset()
                                          : server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute the message length for the fixed size message types. For
     * PING, PONG, MEET and all the other types the length stays zero here
     * and must be set by the caller. */
    int totlen;
    switch (type) {
    case CLUSTERMSG_TYPE_FAIL:
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData)+
                 sizeof(clusterMsgDataFail);
        break;
    case CLUSTERMSG_TYPE_UPDATE:
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData)+
                 sizeof(clusterMsgDataUpdate);
        break;
    default:
        totlen = 0;
        break;
    }
    hdr->totlen = htonl(totlen);
}
/* Helper for clusterSendPing(): scan the first 'count' gossip entries of the
 * message pointed by 'hdr' looking for one whose node name equals the name
 * of 'n'. Return non zero if such an entry exists, zero otherwise. */
int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
    int pos = 0;

    /* Advance until either the entries are over or the name matches. */
    while (pos < count &&
           memcmp(hdr->data.ping.gossip[pos].nodename,n->name,
                  CLUSTER_NAMELEN) != 0)
    {
        pos++;
    }
    return pos != count;
}
/* Fill the i-th gossip entry of the message pointed by 'hdr' with what we
 * know about node 'n': name, address, ports, flags and the last ping/pong
 * times. The two times are divided by 1000 before being stored, and every
 * multi byte integer is stored in network byte order. */
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *entry = &hdr->data.ping.gossip[i];

    /* Identity and address. */
    memcpy(entry->nodename,n->name,CLUSTER_NAMELEN);
    memcpy(entry->ip,n->ip,sizeof(n->ip));
    entry->port = htons(n->port);
    entry->cport = htons(n->cport);

    /* Liveness information and flags. */
    entry->ping_sent = htonl(n->ping_sent/1000);
    entry->pong_received = htonl(n->pong_received/1000);
    entry->flags = htons(n->flags);
    entry->notused1 = 0;
}
/* Send a PING, PONG or MEET packet (according to 'type') on the specified
 * link, making sure to add enough gossip information.
 *
 * The packet is made of the common header followed by a number of gossip
 * entries: a random sample of the known nodes, plus every node currently in
 * PFAIL state. When 'type' is PING and the link has an associated node, the
 * node ping_sent time is set to the current time.
 *
 * The function works even when the node table holds just ourselves (for
 * instance when replying to a node we don't know yet): in that case the
 * number of wanted gossip entries is clamped to zero instead of being left
 * negative. */
void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;
    clusterMsg *hdr;
    int gossipcount = 0; /* Number of gossip sections added so far. */
    int wanted; /* Number of gossip sections we want to append if possible. */
    int totlen; /* Total packet length. */
    int nodes = (int) dictSize(server.cluster->nodes);
    /* freshnodes is the max number of nodes we can hope to append at all:
     * nodes available minus two (ourself and the node we are sending the
     * message to). However practically there may be less valid nodes since
     * nodes in handshake state, disconnected, are not considered. */
    int freshnodes = nodes-2;

    /* How many gossip sections we want to add? 1/10 of the number of nodes
     * and anyway at least 3. Why 1/10?
     *
     * If we have N masters, with N/10 entries, and we consider that in
     * node_timeout we exchange with each other node at least 4 packets
     * (we ping in the worst case in node_timeout/2 time, and we also
     * receive two pings from the host), we have a total of 8 packets
     * in the node_timeout*2 failure reports validity time. So we have
     * that, for a single PFAIL node, we can expect to receive the following
     * number of failure reports (in the specified window of time):
     *
     * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
     *
     * PROB = probability of being featured in a single gossip entry,
     *        which is 1 / NUM_OF_NODES.
     * ENTRIES = 10.
     * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
     *
     * If we assume we have just masters (so num of nodes and num of masters
     * is the same), with 1/10 we always get over the majority, and specifically
     * 80% of the number of nodes, to account for many masters failing at the
     * same time.
     *
     * Since we have non-voting slaves that lower the probability of an entry
     * to feature our node, we set the number of entries per packet as
     * 10% of the total nodes we have. */
    wanted = nodes/10; /* Integer division: already rounds down. */
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;
    /* With less than two known nodes freshnodes is negative: never let a
     * negative count reach the buffer size computation below, where it
     * would be multiplied by an unsigned size. */
    if (wanted < 0) wanted = 0;

    /* Include all the nodes in PFAIL state, so that failure reports are
     * faster to propagate to go from PFAIL to FAIL state. */
    int pfail_wanted = server.cluster->stats_pfail_nodes;

    /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
     * later according to the number of gossip sections we really were able
     * to put inside the packet. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
    /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
     * sizeof(clusterMsg) or more. */
    if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
    buf = zcalloc(totlen);
    hdr = (clusterMsg*) buf;

    /* Populate the header. */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields with a random sample of nodes. The number
     * of attempts is bounded since random picks may repeatedly select nodes
     * that can't be used. */
    int maxiterations = wanted*3;
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* Don't include this node: the whole packet header is about us
         * already, so we just gossip about other nodes. */
        if (this == myself) continue;

        /* PFAIL nodes will be added later. */
        if (this->flags & CLUSTER_NODE_PFAIL) continue;

        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 2) Nodes with the NOADDR flag set.
         * 3) Disconnected nodes if they don't have configured slots.
         */
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Technically not correct, but saves CPU. */
            continue;
        }

        /* Do not add a node we already have. */
        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;

        /* Add it */
        clusterSetGossipEntry(hdr,gossipcount,this);
        freshnodes--;
        gossipcount++;
    }

    /* If there are PFAIL nodes, add them at the end. */
    if (pfail_wanted) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *node = dictGetVal(de);
            if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
            if (node->flags & CLUSTER_NODE_NOADDR) continue;
            if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
            clusterSetGossipEntry(hdr,gossipcount,node);
            freshnodes--;
            gossipcount++;
            /* We take the count of the slots we allocated, since the
             * PFAIL stats may not match perfectly with the current number
             * of PFAIL nodes. */
            pfail_wanted--;
        }
        dictReleaseIterator(di);
    }

    /* Ready to send... fix the totlen field and queue the message in the
     * output buffer. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}
/* Send a PONG packet to every node for which we have a link and that is
 * neither ourselves nor in handshake state.
 *
 * In Redis Cluster pongs are not used just for failure detection, but also
 * to carry important configuration information. So broadcasting a pong is
 * useful when something changes in the configuration and we want to make
 * the cluster aware ASAP (for instance after a slave promotion).
 *
 * The 'target' argument restricts the set of receivers:
 *
 * CLUSTER_BROADCAST_ALL          -> No further restriction.
 * CLUSTER_BROADCAST_LOCAL_SLAVES -> Only slaves in my master-slaves ring,
 *                                   that is, slaves whose master is either
 *                                   this node or this node's own master.
 */
#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
void clusterBroadcastPong(int target) {
    dictIterator *iter = dictGetSafeIterator(server.cluster->nodes);
    dictEntry *entry;

    while ((entry = dictNext(iter)) != NULL) {
        clusterNode *peer = dictGetVal(entry);

        /* Not reachable, or not a valid receiver at all. */
        if (peer->link == NULL) continue;
        if (peer == myself || nodeInHandshake(peer)) continue;

        if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
            /* Must be a slave with a known master... */
            if (!nodeIsSlave(peer) || !peer->slaveof) continue;
            /* ...and that master must be me or my own master. */
            if (peer->slaveof != myself &&
                peer->slaveof != myself->slaveof) continue;
        }
        clusterSendPing(peer->link,CLUSTERMSG_TYPE_PONG);
    }
    dictReleaseIterator(iter);
}
/* Send a PUBLISH message carrying 'channel' and 'message'.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster,
 * otherwise it is queued on the specified link only.
 *
 * The packet is built in a clusterMsg sized buffer on the stack when it
 * fits, otherwise in a heap buffer that is released before returning. */
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
    clusterMsg stackbuf[1];
    clusterMsg *hdr = stackbuf;
    unsigned char *payload = (unsigned char*) stackbuf;

    /* Work on decoded objects so that ->ptr can be used as an sds string.
     * The references taken here are dropped at the end. */
    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    size_t channel_bytes = sdslen(channel->ptr);
    size_t message_bytes = sdslen(message->ptr);
    uint32_t channel_len = channel_bytes;
    uint32_t message_len = message_bytes;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);

    /* Header, plus the publish section where the 8 bytes placeholder of the
     * bulk data is replaced by the actual channel and message. */
    uint32_t totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* The local buffer is used only when the packet is strictly smaller
     * than it; otherwise switch to a heap buffer, carrying over the header
     * built so far. */
    if (totlen >= sizeof(stackbuf)) {
        payload = zmalloc(totlen);
        memcpy(payload,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) payload;
    }

    /* Bulk data is the channel immediately followed by the message. */
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,channel_bytes);
    memcpy(hdr->data.publish.msg.bulk_data+channel_bytes,
        message->ptr,message_bytes);

    if (link != NULL)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != (unsigned char*)stackbuf) zfree(payload);
}
/* Broadcast a FAIL message about the node named 'nodename' to all the nodes
 * we are able to contact.
 *
 * The FAIL message is sent when we detect that a node is failing
 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
 * nodes to do the same ASAP.
 *
 * 'nodename' must point to at least CLUSTER_NAMELEN bytes. */
void clusterSendFail(char *nodename) {
    clusterMsg msg[1];

    /* The header builder already sets the total length for FAIL. */
    clusterBuildMessageHdr(msg,CLUSTERMSG_TYPE_FAIL);
    memcpy(msg->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    clusterBroadcastMessage(msg,ntohl(msg->totlen));
}
/* Queue on 'link' an UPDATE message describing the slots configuration of
 * 'node': its name, its configEpoch and its slots bitmap.
 *
 * Nothing is done if 'link' is NULL. */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
    clusterMsg msg[1];

    if (link == NULL) return;

    /* The header builder already sets the total length for UPDATE. */
    clusterBuildMessageHdr(msg,CLUSTERMSG_TYPE_UPDATE);
    hdr_fill: ;
    memcpy(msg->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
    memcpy(msg->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
    msg->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);

    clusterSendMessage(link,(unsigned char*)msg,ntohl(msg->totlen));
    (void)&&hdr_fill;
}
/* Send a MODULE message of the given module 'type', carrying 'len' bytes of
 * 'payload' on behalf of the module identified by 'module_id'.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster,
 * otherwise it is queued on the specified link only.
 *
 * The packet is built in a clusterMsg sized buffer on the stack when it
 * fits, otherwise in a heap buffer that is released before returning. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
                       unsigned char *payload, uint32_t len) {
    clusterMsg stackbuf[1];
    clusterMsg *hdr = stackbuf;
    unsigned char *packet = (unsigned char*) stackbuf;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);

    /* Header, plus the module section where the 3 bytes placeholder of the
     * bulk data is replaced by the actual payload. */
    uint32_t totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += sizeof(clusterMsgModule) - 3 + len;

    hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
    hdr->data.module.msg.type = type;
    hdr->data.module.msg.len = htonl(len);
    hdr->totlen = htonl(totlen);

    /* The local buffer is used only when the packet is strictly smaller
     * than it; otherwise switch to a heap buffer, carrying over the header
     * built so far. */
    if (totlen >= sizeof(stackbuf)) {
        packet = zmalloc(totlen);
        memcpy(packet,hdr,sizeof(*hdr));
        hdr = (clusterMsg*) packet;
    }
    memcpy(hdr->data.module.msg.bulk_data,payload,len);

    if (link != NULL)
        clusterSendMessage(link,packet,totlen);
    else
        clusterBroadcastMessage(packet,totlen);

    if (packet != (unsigned char*)stackbuf) zfree(packet);
}
/* This function gets a cluster node ID string as target, the same way the nodes
* addresses are represented in the modules side, resolves the node, and sends
* the message. If the target is NULL the message is broadcasted.
*
* The function returns C_OK if the target is valid, otherwise C_ERR is
* returned. */
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
clusterNode *node = NULL;
if (target != NULL) {
node = clusterLookupNode(target);
if (node == NULL || node->link == NULL) return C_ERR;
}
clusterSendModule(target ? node->link : NULL,
module_id, type, payload, len);
return C_OK;
}
/* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub support
*
* For now we do very little, just propagating PUBLISH messages across the whole
* cluster. In the future we'll try to get smarter and avoid propagating those
* messages to hosts without receivers for a given channel.
* -------------------------------------------------------------------------- */
/* Propagate a PUBLISH of 'message' on 'channel' to the other nodes: this is
 * just clusterSendPublish() called with a NULL link, which that function
 * handles by broadcasting the message instead of using a single link. */
void clusterPropagatePublish(robj *channel, robj *message) {
clusterSendPublish(NULL, channel, message);
}
/* -----------------------------------------------------------------------------
* SLAVE node specific functions
* -------------------------------------------------------------------------- */
/* Broadcast a FAILOVER_AUTH_REQUEST message in order to see if there is the
 * quorum for this slave instance to failover its failing master.
 *
 * Note that we send the failover request to everybody, master and slave
 * nodes, but only the masters are supposed to reply to our query.
 *
 * The message is made of the header alone, without any data section. */
void clusterRequestFailoverAuth(void) {
    clusterMsg msg[1];
    uint32_t totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

    clusterBuildMessageHdr(msg,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);

    /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
     * in the header to tell the nodes receiving the message that they
     * should authorize the failover even if the master is working. */
    if (server.cluster->mf_end)
        msg->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;

    msg->totlen = htonl(totlen);
    clusterBroadcastMessage(msg,totlen);
}
/* Send a FAILOVER_AUTH_ACK message to the specified node. The message is
 * made of the header alone. Nothing is sent if the node has no link. */
void clusterSendFailoverAuth(clusterNode *node) {
    clusterMsg msg[1];
    uint32_t hdrlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

    if (node->link == NULL) return;

    clusterBuildMessageHdr(msg,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
    msg->totlen = htonl(hdrlen);
    clusterSendMessage(node->link,(unsigned char*)msg,hdrlen);
}
/* Send a MFSTART message to the specified node. The message is made of the
 * header alone. Nothing is sent if the node has no link. */
void clusterSendMFStart(clusterNode *node) {
    clusterMsg msg[1];
    uint32_t hdrlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

    if (node->link == NULL) return;

    clusterBuildMessageHdr(msg,CLUSTERMSG_TYPE_MFSTART);
    msg->totlen = htonl(hdrlen);
    clusterSendMessage(node->link,(unsigned char*)msg,hdrlen);
}
/* Grant our vote to 'node' (the sender of the FAILOVER_AUTH_REQUEST packet
 * 'request') if all the conditions are met, otherwise log why the vote was
 * denied and return without replying.
 *
 * The vote is granted only when:
 *
 * 1) We are a master serving at least one slot.
 * 2) The epoch of the request is not older than our currentEpoch.
 * 3) We did not already vote during the current epoch.
 * 4) The sender is a slave with a known master, and that master is failing,
 *    unless the request carries CLUSTERMSG_FLAG0_FORCEACK.
 * 5) We did not vote about a slave of the same master in the last
 *    NODE_TIMEOUT*2 milliseconds.
 * 6) For every slot claimed in the request, the current owner in our view
 *    does not have a configEpoch greater than the one in the request.
 *
 * When the vote is granted, lastVoteEpoch and the master voted_time are
 * updated, a config save + fsync is scheduled, and a FAILOVER_AUTH_ACK is
 * sent to the requesting node. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t req_current_epoch = ntohu64(request->currentEpoch);
    uint64_t req_config_epoch = ntohu64(request->configEpoch);
    unsigned char *slots_claimed = request->myslots;
    int forced = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int slot;

    /* Only masters serving at least one slot have the right to vote: the
     * cluster size is the number of masters serving at least one slot, and
     * the quorum is computed as a majority of that size. */
    if (nodeIsSlave(myself)) return;
    if (myself->numslots == 0) return;

    /* The request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (req_current_epoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) req_current_epoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* One vote per epoch: if we already voted in this one, stop here. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: already voted for epoch %llu",
            node->name,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* The requester must be a slave... */
    if (nodeIsMaster(node)) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: it is a master node",
            node->name);
        return;
    }

    /* ...of a master we know about... */
    if (master == NULL) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: I don't know its master",
            node->name);
        return;
    }

    /* ...and that master must be down, unless the request is flagged with
     * CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (!nodeFailed(master) && !forced) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: its master is up",
            node->name);
        return;
    }

    /* Don't vote again about the same master before two times the node
     * timeout elapsed since our last vote for one of its slaves. This is
     * not strictly needed for correctness of the algorithm but makes the
     * base case more linear. */
    if (mstime() - master->voted_time < server.cluster_node_timeout * 2) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: "
            "can't vote about this master before %lld milliseconds",
            node->name,
            (long long) ((server.cluster_node_timeout*2)-
                         (mstime() - master->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (slot = 0; slot < CLUSTER_SLOTS; slot++) {
        clusterNode *owner;

        if (!bitmapTestBit(slots_claimed, slot)) continue;

        owner = server.cluster->slots[slot];
        if (owner != NULL && owner->configEpoch > req_config_epoch) {
            /* In our view this slot is served by a master with a greater
             * configEpoch than the one claimed by the requesting slave:
             * refuse to vote for it. */
            serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, slot,
                (unsigned long long) owner->configEpoch,
                (unsigned long long) req_config_epoch);
            return;
        }
    }

    /* All the checks passed: record the vote, schedule the config to be
     * saved and fsynced, then send the acknowledgement. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    master->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}
/* This function returns the "rank" of this instance, a slave, in the context
* of its master-slaves ring. The rank of the slave is given by the number of
* other slaves for the same master that have a better replication offset
* compared to the local one (better means, greater, so they claim more data).
*
* A slave with rank 0 is the one with the greatest (most up to date)
* replication offset, and so forth. Note that because how the rank is computed
* multiple slaves may have the same rank, in case they have the same offset.
*
* The slave rank is used to add a delay to start an election in order to
* get voted and replace a failing master. Slaves with better replication
* offsets are more likely to win. */
int clusterGetSlaveRank(void) {
long long myoffset;
int j, rank = 0;
clusterNode *master;
serverAssert(nodeIsSlave(myself));
master = myself->slaveof;
if (master == NULL) return 0; /* Never called by slaves without master. */
myoffset = replicationGetSlaveOffset();
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] != myself &&
!nodeCantFailover(master->slaves[j]) &&
master->slaves[j]->repl_offset > myoffset) rank++;
return rank;
}
/* This function is called by clusterHandleSlaveFailover() in order to
* let the slave log why it is not able to failover. Sometimes there are
* not the conditions, but since the failover function is called again and
* again, we can't log the same things continuously.
*
* This function works by logging only if a given set of conditions are
* true:
*
* 1) The reason for which the failover can't be initiated changed.
* The reasons also include a NONE reason we reset the state to
* when the slave finds that its master is fine (no FAIL flag).
* 2) Also, the log is emitted again if the master is still down and
* the reason for not failing over is still the same, but more than
* CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
* 3) Finally, the function only logs if the slave is down for more than
* five seconds + NODE_TIMEOUT. This way nothing is logged when a
* failover starts in a reasonable time.
*
* The function is called with the reason why the slave can't failover
* which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
*
* The function is guaranteed to be called only if 'myself' is a slave. */
void clusterLogCantFailover(int reason) {
char *msg;
static time_t lastlog_time = 0;
mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
/* Don't log if we have the same reason for some time. */
if (reason == server.cluster->cant_failover_reason &&
time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
return;
server.cluster->cant_failover_reason = reason;
/* We also don't emit any log if the master failed no long ago, the
* goal of this function is to log slaves in a stalled condition for
* a long time. */
if (myself->slaveof &&
nodeFailed(myself->slaveof) &&
(mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
switch(reason) {
case CLUSTER_CANT_FAILOVER_DATA_AGE:
msg = "Disconnected from master for longer than allowed. "
"Please check the 'cluster-replica-validity-factor' configuration "
"option.";
break;
case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
msg = "Waiting the delay before I can start a new failover.";
break;
case CLUSTER_CANT_FAILOVER_EXPIRED:
msg = "Failover attempt expired.";
break;
case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
msg = "Waiting for votes, but majority still not reached.";
break;
default:
msg = "Unknown reason code.";
break;
}
lastlog_time = time(NULL);
serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
}
/* Final step of both automatic and manual failovers: this slave turns
 * itself into a master, takes over all the hash slots of its old master
 * and propagates the new configuration to the other nodes.
 *
 * The function does nothing if this node is already a master or has no
 * master set.
 *
 * Note that it's up to the caller to be sure that the node got a new
 * configuration epoch already. */
void clusterFailoverReplaceYourMaster(void) {
    clusterNode *former_master = myself->slaveof;
    int slot;

    if (nodeIsMaster(myself)) return;
    if (former_master == NULL) return;

    /* 1) Switch role: we are a master from now on, and we stop
     *    replicating. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Move every slot owned by the old master to ourselves. */
    for (slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (!clusterNodeGetSlotBit(former_master, slot)) continue;
        clusterDelSlot(slot);
        clusterAddSlot(myself, slot);
    }

    /* 3) Recompute the cluster state and save the configuration. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) Clear the manual failover state, if any was in progress. */
    resetManualFailover();
}
/* This function is called if we are a slave node and our master serving
* a non-zero amount of hash slots is in FAIL state.
*
* The goal of this function is:
* 1) To check if we are able to perform a failover, is our data updated?
* 2) Try to get elected by masters.
* 3) Perform the failover informing all the other nodes.
*
* The function takes no arguments and returns nothing. It is called again
* and again, and every call either returns early or moves the election one
* step forward: schedule the election, wait for the scheduled time, ask for
* votes, and finally check whether the quorum was reached. The state of the
* election is kept in server.cluster (failover_auth_time,
* failover_auth_count, failover_auth_sent, failover_auth_rank,
* failover_auth_epoch).
*/
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
/* Milliseconds elapsed since the scheduled start of the current election.
* It is negative while the scheduled start is still in the future. */
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
/* Number of votes needed to win: a majority of the cluster size. */
int needed_quorum = (server.cluster->size / 2) + 1;
/* True only if a manual failover is in progress (mf_end is set) and it
* already got the green light to start (mf_can_start). */
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;
/* Clear the HANDLE_FAILOVER bit from the before-sleep todo flags. */
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
*
* Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout*2;
/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
* 1) We are a slave.
* 2) Our master is flagged as FAIL, or this is a manual failover.
* 3) We don't have the no failover configuration set, or this is
* a manual failover.
* 4) Our master is serving slots. */
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
(server.cluster_slave_no_failover && !manual_failover) ||
myself->slaveof->numslots == 0)
{
/* There are no reasons to failover, so we set the reason why we
* are returning without failing over to NONE. */
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
/* Set data_age to the number of milliseconds we are disconnected from
* the master. The difference is multiplied by 1000, so the timestamps
* used here are presumably expressed in seconds. */
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
/* Check if our data is recent enough according to the slave validity
* factor configured by the user. A validity factor of zero disables
* the check.
*
* Check bypassed for manual failovers. */
if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}
/* If the previous failover attempt timed out and the retry time has
* elapsed, we can setup a new one. The election is only scheduled here:
* the function returns and the next calls will carry it on. */
if (auth_age > auth_retry_time) {
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank();
/* We add another delay that is proportional to the slave rank.
* Specifically 1 second * rank. This way slaves that have a probably
* less updated replication offset, are penalized. */
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* However if this is a manual failover, no delay is needed: start
* now, with rank 0, and ask to be called again before sleeping. */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
/* Now that we have a scheduled election, broadcast our offset
* to all the other slaves so that they'll update their offsets
* if our offset is better. */
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
/* It is possible that we received more updated offsets from other
* slaves for the same master since we computed our election delay.
* Update the delay if our rank changed. The delay is only ever
* increased: a better (lower) rank does not anticipate the election.
*
* Not performed if this is a manual failover, or if the vote request
* was already sent. */
if (server.cluster->failover_auth_sent == 0 &&
server.cluster->mf_end == 0)
{
int newrank = clusterGetSlaveRank();
if (newrank > server.cluster->failover_auth_rank) {
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
serverLog(LL_WARNING,
"Replica rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
/* Return ASAP if we can't still start the election. */
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
/* Return ASAP if the election is too old to be valid. */
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}
/* Ask for votes if needed: bump the current epoch, bind the election to
* it, broadcast the request, and schedule the config to be saved. */
if (server.cluster->failover_auth_sent == 0) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
clusterRequestFailoverAuth();
server.cluster->failover_auth_sent = 1;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return; /* Wait for replies. */
}
/* Check if we reached the quorum. failover_auth_count is not incremented
* in this function: presumably it is updated elsewhere as the
* FAILOVER_AUTH_ACK replies are received. */
if (server.cluster->failover_auth_count >= needed_quorum) {
/* We have the quorum, we can finally failover the master. */
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
/* Update my configEpoch to the epoch of the election. */
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}
/* Take responsibility for the cluster slots. */
clusterFailoverReplaceYourMaster();
} else {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
}
}
/* -----------------------------------------------------------------------------
* CLUSTER slave migration
*
* Slave migration is the process that allows a slave of a master that is
* already covered by at least another slave, to "migrate" to a master that
* is orphaned, that is, left with no working slaves.
* ------------------------------------------------------------------------- */
/* Decide if this replica should migrate to a different (orphaned) master,
 * and perform the migration if so. It is called by the clusterCron()
 * function only if:
 *
 * 1) We are a slave node.
 * 2) It was detected that there is at least one orphaned master in
 *    the cluster.
 * 3) We are a slave of one of the masters with the greatest number of
 *    slaves.
 *
 * These checks are performed by the caller since they require iterating
 * the nodes anyway, so we spend time in clusterHandleSlaveMigration()
 * only if definitely needed.
 *
 * The function is called with a pre-computed 'max_slaves', that is the max
 * number of working (not in FAIL state) slaves for a single master.
 *
 * Additional conditions for migration are examined inside the function.
 * As a side effect, the orphaned_time field of every known node is
 * updated while scanning the nodes table. */
void clusterHandleSlaveMigration(int max_slaves) {
    clusterNode *mymaster = myself->slaveof;
    clusterNode *target = NULL;
    clusterNode *candidate = NULL;
    dictIterator *iter;
    dictEntry *entry;
    int j;
    int my_master_ok_slaves = 0;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     * 'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++) {
        clusterNode *sibling = mymaster->slaves[j];

        if (nodeFailed(sibling) || nodeTimedOut(sibling)) continue;
        my_master_ok_slaves++;
    }
    if (my_master_ok_slaves <= server.cluster_migration_barrier) return;

    /* Step 3: Identify a target for the migration, and check if among the
     * slaves of the masters with the greatest number of ok slaves, I'm the
     * one with the smallest node ID (the "candidate slave").
     *
     * Note: this means that eventually a replica migration will occur
     * since slaves that are reachable again always have their FAIL flag
     * cleared, so eventually there must be a candidate. At the same time
     * this does not mean that there are no race conditions possible (two
     * slaves migrating at the same time), but this is unlikely to
     * happen, and harmless when it happens. */
    candidate = myself;
    iter = dictGetSafeIterator(server.cluster->nodes);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        clusterNode *node = dictGetVal(entry);
        /* We want to migrate only to a master that is working and that
         * used to have slaves, or that failed over a master that had
         * slaves (MIGRATE_TO flag). This way we only migrate to instances
         * that were supposed to have replicas. */
        int eligible = !nodeIsSlave(node) && !nodeFailed(node) &&
                       (node->flags & CLUSTER_NODE_MIGRATE_TO) != 0;
        /* Number of working slaves: always zero for nodes that are not
         * masters. */
        int working = nodeIsMaster(node) ?
                      clusterCountNonFailingSlaves(node) : 0;
        int orphaned = eligible && working <= 0;

        if (orphaned) {
            /* The first orphaned master serving slots is our target. */
            if (target == NULL && node->numslots > 0) target = node;

            /* Track the starting time of the orphaned condition for this
             * master. */
            if (node->orphaned_time == 0) node->orphaned_time = mstime();
        } else {
            node->orphaned_time = 0;
        }

        /* Check if I'm the slave candidate for the migration: attached
         * to a master with the maximum number of slaves and with the
         * smallest node ID. */
        if (working == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                clusterNode *replica = node->slaves[j];

                if (memcmp(replica->name, candidate->name,
                           CLUSTER_NAMELEN) < 0)
                {
                    candidate = replica;
                }
            }
        }
    }
    dictReleaseIterator(iter);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate, but only if the master is continuously orphaned for a
     * couple of seconds, so that during failovers, we give some time to
     * the natural slaves of this instance to advertise their switch from
     * the old master to the new one. The migration is also skipped if the
     * CLUSTER_MODULE_FLAG_NO_FAILOVER module flag is set. */
    if (target == NULL || candidate != myself) return;
    if ((mstime()-target->orphaned_time) <= CLUSTER_SLAVE_MIGRATION_DELAY)
        return;
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)
        return;

    serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
        target->name);
    clusterSetMaster(target);
}
/* -----------------------------------------------------------------------------
* CLUSTER manual failover
*
* These are the important steps performed by slaves during a manual failover:
* 1) The user sends the CLUSTER FAILOVER command. The failover state is
* initialized setting mf_end to the millisecond unix time at which we'll
* abort the attempt.
* 2) Slave sends a MFSTART message to the master requesting to pause clients
* for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
* When master is paused for manual failover, it also starts to flag
* packets with CLUSTERMSG_FLAG0_PAUSED.
* 3) Slave waits for master to send its replication offset flagged as PAUSED.
* 4) If the slave received the offset from the master, and its own offset
* matches, mf_can_start is set to 1, and clusterHandleSlaveFailover() will
* perform the failover as usual, with the difference that the vote request
* will be modified to force masters to vote for a slave that has a
* working master.
*
* From the point of view of the master things are simpler: when an
* MFSTART packet (the request to pause clients) is received the master sets
* mf_end as well and the sender in mf_slave. During the time limit for the
* manual failover the master will just send PINGs more often to this slave,
* flagged with the PAUSED flag, so that the slave will set mf_master_offset
* when receiving a packet from the master with this flag set.
*
* The goal of the manual failover is to perform a fast failover without
* data loss due to the asynchronous master-slave replication.
* -------------------------------------------------------------------------- */
/* Reset the manual failover state. This works for both masters and slaves
 * as all the state about manual failover is cleared.
 *
 * If a manual failover was in progress and clients are currently paused,
 * the pause end time is zeroed first.
 *
 * The function can be used both to initialize the manual failover state at
 * startup or to abort a manual failover in progress. */
void resetManualFailover(void) {
    if (server.cluster->mf_end != 0) {
        if (clientsArePaused()) {
            server.clients_pause_end_time = 0;
            /* Called only for its side effect, presumably to let the
             * function notice that the pause time is now over. */
            clientsArePaused();
        }
    }

    server.cluster->mf_end = 0; /* No manual failover in progress. */
    server.cluster->mf_can_start = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_master_offset = 0;
}
/* Abort the manual failover in progress, if any, once its deadline
 * (mf_end, compared against the current time in milliseconds) is passed.
 * Nothing is done if no manual failover is in progress. */
void manualFailoverCheckTimeout(void) {
    /* No manual failover in progress. */
    if (server.cluster->mf_end == 0) return;

    /* Deadline not reached yet. */
    if (server.cluster->mf_end >= mstime()) return;

    serverLog(LL_WARNING,"Manual failover timed out.");
    resetManualFailover();
}
/* This function is called from the cluster cron function in order to go
* forward with a manual failover state machine. */
void clusterHandleManualFailover(void) {
/* Return ASAP if no manual failover is in progress. */
if (server.cluster->mf_end == 0) return;
/* If mf_can_start is non-zero, the failover was already triggered so the
* next steps are performed by clusterHandleSlaveFailover(). */
if (server.cluster->mf_can_start) return;
if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
/* Our replication offset matches the master replication offset
* announced after clients were paused. We can start the failover. */
server.cluster->mf_can_start = 1;
serverLog(LL_WARNING,
"All master replication stream processed, "
"manual failover can start.");