From 2669fb8364c4c4080b7b75809ca94fc8022151de Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 9 Nov 2016 11:31:06 +0100 Subject: [PATCH] PSYNC2: different improvements to Redis replication. The gist of the changes is that now, partial resynchronizations between slaves and masters (without the need of a full resync with RDB transfer and so forth), work in a number of cases when it was impossible in the past. For instance: 1. When a slave is promoted to mastrer, the slaves of the old master can partially resynchronize with the new master. 2. Chained slalves (slaves of slaves) can be moved to replicate to other slaves or the master itsef, without requiring a full resync. 3. The master itself, after being turned into a slave, is able to partially resynchronize with the new master, when it joins replication again. In order to obtain this, the following main changes were operated: * Slaves also take a replication backlog, not just masters. * Same stream replication for all the slaves and sub slaves. The replication stream is identical from the top level master to its slaves and is also the same from the slaves to their sub-slaves and so forth. This means that if a slave is later promoted to master, it has the same replication backlong, and can partially resynchronize with its slaves (that were previously slaves of the old master). * A given replication history is no longer identified by the `runid` of a Redis node. There is instead a `replication ID` which changes every time the instance has a new history no longer coherent with the past one. So, for example, slaves publish the same replication history of their master, however when they are turned into masters, they publish a new replication ID, but still remember the old ID, so that they are able to partially resynchronize with slaves of the old master (up to a given offset). * The replication protocol was slightly modified so that a new extended +CONTINUE reply from the master is able to inform the slave of a replication ID change. * REPLCONF CAPA is used in order to notify masters that a slave is able to understand the new +CONTINUE reply. * The RDB file was extended with an auxiliary field that is able to select a given DB after loading in the slave, so that the slave can continue receiving the replication stream from the point it was disconnected without requiring the master to insert "SELECT" statements. This is useful in order to guarantee the "same stream" property, because the slave must be able to accumulate an identical backlog. * Slave pings to sub-slaves are now sent in a special form, when the top-level master is disconnected, in order to don't interfer with the replication stream. We just use out of band "\n" bytes as in other parts of the Redis protocol. An old design document is available here: https://gist.github.com/antirez/ae068f95c0d084891305 However the implementation is not identical to the description because during the work to implement it, different changes were needed in order to make things working well. --- redis.conf | 4 + src/aof.c | 4 +- src/db.c | 2 +- src/debug.c | 4 +- src/networking.c | 26 ++- src/rdb.c | 49 ++++-- src/rdb.h | 10 +- src/replication.c | 418 +++++++++++++++++++++++++++++++++++----------- src/server.c | 19 ++- src/server.h | 47 ++++-- 10 files changed, 440 insertions(+), 143 deletions(-) diff --git a/redis.conf b/redis.conf index a7b7f3a97cdd..bce5332e049d 100644 --- a/redis.conf +++ b/redis.conf @@ -402,6 +402,10 @@ repl-disable-tcp-nodelay no # need to elapse, starting from the time the last slave disconnected, for # the backlog buffer to be freed. # +# Note that slaves never free the backlog for timeout, since they may be +# promoted to masters later, and should be able to correctly "partially +# resynchronize" with the slaves: hence they should always accumulate backlog. +# # A value of 0 means to never release the backlog. # # repl-backlog-ttl 3600 diff --git a/src/aof.c b/src/aof.c index c75153cc7243..07d8561dace4 100644 --- a/src/aof.c +++ b/src/aof.c @@ -653,7 +653,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb) != C_OK) { + if (rdbLoadRio(&rdb,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -1152,7 +1152,7 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } diff --git a/src/db.c b/src/db.c index 268e7c384301..55ae663c2bcb 100644 --- a/src/db.c +++ b/src/db.c @@ -413,7 +413,7 @@ void flushallCommand(client *c) { /* Normally rdbSave() will reset dirty, but we don't want this here * as otherwise FLUSHALL will not be replicated nor put into the AOF. */ int saved_dirty = server.dirty; - rdbSave(server.rdb_filename); + rdbSave(server.rdb_filename,NULL); server.dirty = saved_dirty; } server.dirty++; diff --git a/src/debug.c b/src/debug.c index d48caedcc39b..f4689d53207b 100644 --- a/src/debug.c +++ b/src/debug.c @@ -320,12 +320,12 @@ void debugCommand(client *c) { if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]); serverAssertWithInfo(c,c->argv[0],1 == 2); } else if (!strcasecmp(c->argv[1]->ptr,"reload")) { - if (rdbSave(server.rdb_filename) != C_OK) { + if (rdbSave(server.rdb_filename,NULL) != C_OK) { addReply(c,shared.err); return; } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); - if (rdbLoad(server.rdb_filename) != C_OK) { + if (rdbLoad(server.rdb_filename,NULL) != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); return; } diff --git a/src/networking.c b/src/networking.c index 2be40ae15314..b2cec8631b23 100644 --- a/src/networking.c +++ b/src/networking.c @@ -352,6 +352,14 @@ void addReplySds(client *c, sds s) { } } +/* This low level function just adds whatever protocol you send it to the + * client buffer, trying the static buffer initially, and using the string + * of objects if not possible. + * + * It is efficient because does not create an SDS object nor an Redis object + * if not needed. The object will only be created by calling + * _addReplyStringToList() if we fail to extend the existing tail object + * in the list of objects. */ void addReplyString(client *c, const char *s, size_t len) { if (prepareClientToWrite(c) != C_OK) return; if (_addReplyToBuffer(c,s,len) != C_OK) @@ -1022,7 +1030,7 @@ int processInlineBuffer(client *c) { char *newline; int argc, j; sds *argv, aux; - size_t querylen; + size_t querylen, protolen; /* Search for end of line */ newline = strchr(c->querybuf,'\n'); @@ -1035,6 +1043,7 @@ int processInlineBuffer(client *c) { } return C_ERR; } + protolen = (newline - c->querybuf)+1; /* Total protocol bytes of command. */ /* Handle the \r\n case. */ if (newline && newline != c->querybuf && *(newline-1) == '\r') @@ -1057,6 +1066,15 @@ int processInlineBuffer(client *c) { if (querylen == 0 && c->flags & CLIENT_SLAVE) c->repl_ack_time = server.unixtime; + /* Newline from masters can be used to prevent timeouts, but should + * not affect the replication offset since they are always sent + * "out of band" directly writing to the socket and without passing + * from the output buffers. */ + if (querylen == 0 && c->flags & CLIENT_MASTER) { + c->reploff -= protolen; + while (protolen--) chopReplicationBacklog(); + } + /* Leave data after the first line of the query in the buffer */ sdsrange(c->querybuf,querylen+2,-1); @@ -1321,7 +1339,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; - if (c->flags & CLIENT_MASTER) c->reploff += nread; + if (c->flags & CLIENT_MASTER) { + c->reploff += nread; + replicationFeedSlavesFromMasterStream(server.slaves, + c->querybuf+qblen,nread); + } server.stat_net_input_bytes += nread; if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); diff --git a/src/rdb.c b/src/rdb.c index 29f880dac7fb..aa9c631de9d4 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -835,7 +835,7 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int flags) { +int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; @@ -844,6 +844,16 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) { if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; + + /* Handle saving options that generate aux fields. */ + if (rsi) { + if (rsi->repl_stream_db && + rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) + == -1) + { + return -1; + } + } if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; return 1; } @@ -856,7 +866,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int flags) { +int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; @@ -869,7 +879,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) { rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -945,7 +955,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) { * While the suffix is the 40 bytes hex string we announced in the prefix. * This way processes receiving the payload can understand when it ends * without doing any processing of the content. */ -int rdbSaveRioWithEOFMark(rio *rdb, int *error) { +int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); @@ -953,7 +963,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) { if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; return C_OK; @@ -964,7 +974,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) { } /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSave(char *filename) { +int rdbSave(char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; @@ -985,7 +995,7 @@ int rdbSave(char *filename) { } rioInitWithFile(&rdb,fp); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1023,7 +1033,7 @@ int rdbSave(char *filename) { return C_ERR; } -int rdbSaveBackground(char *filename) { +int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; long long start; @@ -1040,7 +1050,7 @@ int rdbSaveBackground(char *filename) { /* Child */ closeListeningSockets(0); redisSetProcTitle("redis-rdb-bgsave"); - retval = rdbSave(filename); + retval = rdbSave(filename,rsi); if (retval == C_OK) { size_t private_dirty = zmalloc_get_private_dirty(-1); @@ -1410,7 +1420,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ -int rdbLoadRio(rio *rdb) { +int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; @@ -1501,6 +1511,8 @@ int rdbLoadRio(rio *rdb) { serverLog(LL_NOTICE,"RDB '%s': %s", (char*)auxkey->ptr, (char*)auxval->ptr); + } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) { + if (rsi) rsi->repl_stream_db = atoi(auxval->ptr); } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -1559,8 +1571,11 @@ int rdbLoadRio(rio *rdb) { /* Like rdbLoadRio() but takes a filename instead of a rio stream. The * filename is open for reading and a rio stream object created in order * to do the actual loading. Moreover the ETA displayed in the INFO - * output is initialized and finalized. */ -int rdbLoad(char *filename) { + * output is initialized and finalized. + * + * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the + * loading code will fiil the information fields in the structure. */ +int rdbLoad(char *filename, rdbSaveInfo *rsi) { FILE *fp; rio rdb; int retval; @@ -1568,7 +1583,7 @@ int rdbLoad(char *filename) { if ((fp = fopen(filename,"r")) == NULL) return C_ERR; startLoading(fp); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb); + retval = rdbLoadRio(&rdb,rsi); fclose(fp); stopLoading(); return retval; @@ -1721,7 +1736,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { /* Spawn an RDB child that writes the RDB to the sockets of the slaves * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ -int rdbSaveToSlavesSockets(void) { +int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { int *fds; uint64_t *clientids; int numfds; @@ -1779,7 +1794,7 @@ int rdbSaveToSlavesSockets(void) { closeListeningSockets(0); redisSetProcTitle("redis-rdb-to-slaves"); - retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); if (retval == C_OK && rioFlush(&slave_sockets) == 0) retval = C_ERR; @@ -1884,7 +1899,7 @@ void saveCommand(client *c) { addReplyError(c,"Background save already in progress"); return; } - if (rdbSave(server.rdb_filename) == C_OK) { + if (rdbSave(server.rdb_filename,NULL) == C_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1918,7 +1933,7 @@ void bgsaveCommand(client *c) { "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver " "possible."); } - } else if (rdbSaveBackground(server.rdb_filename) == C_OK) { + } else if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) { addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); diff --git a/src/rdb.h b/src/rdb.h index 60c52a7c1af5..efe932255858 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -118,11 +118,11 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename); -int rdbSaveBackground(char *filename); -int rdbSaveToSlavesSockets(void); +int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); +int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); -int rdbSave(char *filename); +int rdbSave(char *filename, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj *o); size_t rdbSavedObjectLen(robj *o); robj *rdbLoadObject(int type, rio *rdb); @@ -136,6 +136,6 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb); +int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi); #endif diff --git a/src/replication.c b/src/replication.c index 67091dd0b425..a98d0d35e620 100644 --- a/src/replication.c +++ b/src/replication.c @@ -39,6 +39,7 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); +void replicationCacheMasterUsingMyself(void); void replicationSendAck(void); void putSlaveOnline(client *slave); int cancelReplicationHandshake(void); @@ -79,11 +80,6 @@ void createReplicationBacklog(void) { server.repl_backlog = zmalloc(server.repl_backlog_size); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; - /* When a new backlog buffer is created, we increment the replication - * offset by one to make sure we'll not be able to PSYNC with any - * previous slave. This is needed because we avoid incrementing the - * master_repl_offset if no backlog exists nor slaves are attached. */ - server.master_repl_offset++; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the @@ -153,6 +149,22 @@ void feedReplicationBacklog(void *ptr, size_t len) { server.repl_backlog_histlen + 1; } +/* Remove the last byte from the replication backlog. This + * is useful when we receive an out of band "\n" to keep the connection + * alive but don't want to count it as replication stream. + * + * As a side effect this function adjusts the master replication offset + * of this instance to account for the missing byte. */ +void chopReplicationBacklog(void) { + if (!server.repl_backlog || !server.repl_backlog_histlen) return; + if (server.repl_backlog_idx == 0) + server.repl_backlog_idx = server.repl_backlog_size-1; + else + server.repl_backlog_idx--; + server.master_repl_offset--; + server.repl_backlog_histlen--; +} + /* Wrapper for feedReplicationBacklog() that takes Redis string objects * as input. */ void feedReplicationBacklogWithObject(robj *o) { @@ -170,12 +182,24 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +/* Propagate write commands to slaves, and populate the replication backlog + * as well. This function is used if the instance is a master: we use + * the commands received by our clients in order to create the replication + * stream. Instead if the instance is a slave and has sub-slaves attached, + * we use replicationFeedSlavesFromMaster() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; + /* If the instance is not a top level master, return ASAP: we'll just proxy + * the stream of data we receive from our master instead, in order to + * propagate *identical* replication stream. In this way this slave can + * advertise the same replication ID as the master (since it shares the + * master replication history and has the same backlog and offsets). */ + if (server.masterhost != NULL) return; + /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; @@ -265,6 +289,32 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } } +/* This function is used in order to proxy what we receive from our master + * to our sub-slaves. */ +#include +void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { + listNode *ln; + listIter li; + + { + printf("%zu:",buflen); + for (size_t j = 0; j < buflen; j++) { + printf("%c", isprint(buf[j]) ? buf[j] : '.'); + } + printf("\n"); + } + + if (server.repl_backlog) feedReplicationBacklog(buf,buflen); + listRewind(slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + + /* Don't feed slaves that are still waiting for BGSAVE to start */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + addReplyString(slave,buf,buflen); + } +} + void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { listNode *ln; listIter li; @@ -329,7 +379,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) { skip = offset - server.repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actaully our + /* Point j to the oldest byte, that is actually our * server.repl_backlog_off byte. */ j = (server.repl_backlog_idx + (server.repl_backlog_size-server.repl_backlog_histlen)) % @@ -361,18 +411,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) { * the BGSAVE process started and before executing any other command * from clients. */ long long getPsyncInitialOffset(void) { - long long psync_offset = server.master_repl_offset; - /* Add 1 to psync_offset if it the replication backlog does not exists - * as when it will be created later we'll increment the offset by one. */ - if (server.repl_backlog == NULL) psync_offset++; - return psync_offset; + return server.master_repl_offset; } /* Send a FULLRESYNC reply in the specific case of a full resynchronization, * as a side effect setup the slave for a full sync in different ways: * - * 1) Remember, into the slave client structure, the offset we sent - * here, so that if new slaves will later attach to the same + * 1) Remember, into the slave client structure, the replication offset + * we sent here, so that if new slaves will later attach to the same * background RDB saving process (by duplicating this client output * buffer), we can get the right offset from this slave. * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that @@ -392,14 +438,14 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) { slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; /* We are going to accumulate the incremental changes for this * slave as well. Set slaveseldb to -1 in order to force to re-emit - * a SLEECT statement in the replication stream. */ + * a SELECT statement in the replication stream. */ server.slaveseldb = -1; /* Don't send this reply to slaves that approached us with * the old SYNC command. */ if (!(slave->flags & CLIENT_PRE_PSYNC)) { buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", - server.runid,offset); + server.replid,offset); if (write(slave->fd,buf,buflen) != buflen) { freeClientAsync(slave); return C_ERR; @@ -415,19 +461,32 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) { * with the usual full resync. */ int masterTryPartialResynchronization(client *c) { long long psync_offset, psync_len; - char *master_runid = c->argv[1]->ptr; + char *master_replid = c->argv[1]->ptr; char buf[128]; int buflen; - /* Is the runid of this master the same advertised by the wannabe slave - * via PSYNC? If runid changed this master is a different instance and - * there is no way to continue. */ - if (strcasecmp(master_runid, server.runid)) { + /* Parse the replication offset asked by the slave. Go to full sync + * on parse error: this should never happen but we try to handle + * it in a robust way compared to aborting. */ + if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != + C_OK) goto need_full_resync; + + /* Is the replication ID of this master the same advertised by the wannabe + * slave via PSYNC? If the replication ID changed this master has a + * different replication history, and there is no way to continue. + * + * Note that there are two potentially valid replication IDs: the ID1 + * and the ID2. The ID2 however is only valid up to a specific offset. */ + if (strcasecmp(master_replid, server.replid) && + (strcasecmp(master_replid, server.replid2) || + psync_offset > server.second_replid_offset)) + { /* Run id "?" is used by slaves that want to force a full resync. */ - if (master_runid[0] != '?') { + if (master_replid[0] != '?') { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " - "Runid mismatch (Client asked for runid '%s', my runid is '%s')", - master_runid, server.runid); + "Replication ID mismatch (Slave asked for '%s', my replication " + "ID is '%s')", + master_replid, server.replid); } else { serverLog(LL_NOTICE,"Full resync requested by slave %s", replicationGetSlaveName(c)); @@ -436,8 +495,6 @@ int masterTryPartialResynchronization(client *c) { } /* We still have the data our slave is asking for? */ - if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != - C_OK) goto need_full_resync; if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) @@ -463,7 +520,11 @@ int masterTryPartialResynchronization(client *c) { /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ - buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); + if (c->slave_capa & SLAVE_CAPA_PSYNC2) { + buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); + } else { + buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); + } if (write(c->fd,buf,buflen) != buflen) { freeClientAsync(c); return C_OK; @@ -515,10 +576,18 @@ int startBgsaveForReplication(int mincapa) { serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "slaves sockets" : "disk"); + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + /* If we are saving for a chained slave (that is, if we are, + * in turn, a slave of another instance), make sure after + * loadig the RDB, our slaves select the right DB: we'll just + * send the replication stream we receive from our master, so + * no way to send SELECT commands. */ + if (server.master) rsi.repl_stream_db = server.master->db->id; + if (socket_target) - retval = rdbSaveToSlavesSockets(); + retval = rdbSaveToSlavesSockets(&rsi); else - retval = rdbSaveBackground(server.rdb_filename); + retval = rdbSaveBackground(server.rdb_filename,&rsi); /* If we failed to BGSAVE, remove the slaves waiting for a full * resynchorinization from the list of salves, inform them with @@ -589,22 +658,22 @@ void syncCommand(client *c) { * when this happens masterTryPartialResynchronization() already * replied with: * - * +FULLRESYNC + * +FULLRESYNC * - * So the slave knows the new runid and offset to try a PSYNC later + * So the slave knows the new replid and offset to try a PSYNC later * if the connection with the master is lost. */ if (!strcasecmp(c->argv[0]->ptr,"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { - char *master_runid = c->argv[1]->ptr; + char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the - * runid is not "?", as this is used by slaves to force a full + * replid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ - if (master_runid[0] != '?') server.stat_sync_partial_err++; + if (master_replid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation @@ -625,6 +694,16 @@ void syncCommand(client *c) { c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); + /* Create the replication backlog if needed. */ + if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { + /* When we create the backlog from scratch, we always use a new + * replication ID and clear the ID2, since there is no valid + * past history. */ + changeReplicationId(); + clearReplicationId2(); + createReplicationBacklog(); + } + /* CASE 1: BGSAVE is in progress, with disk target. */ if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) @@ -685,9 +764,6 @@ void syncCommand(client *c) { } } } - - if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) - createReplicationBacklog(); return; } @@ -735,6 +811,8 @@ void replconfCommand(client *c) { /* Ignore capabilities not understood by this master. */ if (!strcasecmp(c->argv[j+1]->ptr,"eof")) c->slave_capa |= SLAVE_CAPA_EOF; + else if (!strcasecmp(c->argv[j+1]->ptr,"psync2")) + c->slave_capa |= SLAVE_CAPA_PSYNC2; } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { /* REPLCONF ACK is used by slave to inform the master the amount * of replication stream that it processed so far. It is an @@ -928,6 +1006,43 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { if (startbgsave) startBgsaveForReplication(mincapa); } +/* Change the current instance replication ID with a new, random one. + * This will prevent successful PSYNCs between this master and other + * slaves, so the command should be called when something happens that + * alters the current story of the dataset. */ +void changeReplicationId(void) { + getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE); + server.replid[CONFIG_RUN_ID_SIZE] = '\0'; +} + +/* Clear (invalidate) the secondary replication ID. This happens, for + * example, after a full resynchronization, when we start a new replication + * history. */ +void clearReplicationId2(void) { + memset(server.replid2,'0',sizeof(server.replid)); + server.replid2[CONFIG_RUN_ID_SIZE] = '\0'; + server.second_replid_offset = -1; +} + +/* Use the current replication ID / offset as secondary replication + * ID, and change the current one in order to start a new history. + * This should be used when an instance is switched from slave to master + * so that it can serve PSYNC requests performed using the master + * replication ID. */ +void shiftReplicationId(void) { + memcpy(server.replid2,server.replid,sizeof(server.replid)); + /* We set the second replid offset to the master offset + 1, since + * the slave will ask for the first byte it has not yet received, so + * we need to add one to the offset: for example if, as a slave, we are + * sure we have the same history as the master for 50 bytes, after we + * are turned into a master, we can accept a PSYNC request with offset + * 51, since the slave asking has the same history up to the 50th + * byte, and is asking for the new bytes starting at offset 51. */ + server.second_replid_offset = server.master_repl_offset+1; + changeReplicationId(); + serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid); +} + /* ----------------------------------- SLAVE -------------------------------- */ /* Returns 1 if the given replication state is a handshake state, @@ -965,18 +1080,18 @@ void replicationEmptyDbCallback(void *privdata) { /* Once we have a link with the master and the synchroniziation was * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. */ -void replicationCreateMasterClient(int fd) { +void replicationCreateMasterClient(int fd, int dbid) { server.master = createClient(fd); server.master->flags |= CLIENT_MASTER; server.master->authenticated = 1; - server.repl_state = REPL_STATE_CONNECTED; - server.master->reploff = server.repl_master_initial_offset; - memcpy(server.master->replrunid, server.repl_master_runid, - sizeof(server.repl_master_runid)); + server.master->reploff = server.master_initial_offset; + memcpy(server.master->replid, server.master_replid, + sizeof(server.master_replid)); /* If master offset is set to -1, this master is old and is not * PSYNC capable, so we flag it accordingly. */ if (server.master->reploff == -1) server.master->flags |= CLIENT_PRE_PSYNC; + if (dbid != -1) selectDb(server.master,dbid); } /* Asynchronously read the SYNC payload we receive from a master */ @@ -1137,7 +1252,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); - if (rdbLoad(server.rdb_filename) != C_OK) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); return; @@ -1145,7 +1261,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); - replicationCreateMasterClient(server.repl_transfer_s); + replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + /* After a full resynchroniziation we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(server.replid,server.master->replid,sizeof(server.replid)); + server.master_repl_offset = server.master->reploff; + clearReplicationId2(); + /* Let's create the replication backlog if needed. Slaves need to + * accumulate the backlog regardless of the fact they have sub-slaves + * or not, in order to behave correctly if they are promoted to + * masters after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending @@ -1270,7 +1399,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) { * * 1) As a side effect of the function call the function removes the readable * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. - * 2) server.repl_master_initial_offset is set to the right value according + * 2) server.master_initial_offset is set to the right value according * to the master reply. This will be used to populate the 'server.master' * structure replication offset. */ @@ -1281,31 +1410,31 @@ char *sendSynchronousCommand(int flags, int fd, ...) { #define PSYNC_FULLRESYNC 3 #define PSYNC_NOT_SUPPORTED 4 int slaveTryPartialResynchronization(int fd, int read_reply) { - char *psync_runid; + char *psync_replid; char psync_offset[32]; sds reply; /* Writing half */ if (!read_reply) { - /* Initially set repl_master_initial_offset to -1 to mark the current + /* Initially set master_initial_offset to -1 to mark the current * master run_id and offset as not valid. Later if we'll be able to do * a FULL resync using the PSYNC command we'll set the offset at the * right value, so that this information will be propagated to the * client structure representing the master into server.master. */ - server.repl_master_initial_offset = -1; + server.master_initial_offset = -1; if (server.cached_master) { - psync_runid = server.cached_master->replrunid; + psync_replid = server.cached_master->replid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); - serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); + serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); } else { serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); - psync_runid = "?"; + psync_replid = "?"; memcpy(psync_offset,"-1",3); } /* Issue the PSYNC command */ - reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); + reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL); if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); @@ -1327,31 +1456,31 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { aeDeleteFileEvent(server.el,fd,AE_READABLE); if (!strncmp(reply,"+FULLRESYNC",11)) { - char *runid = NULL, *offset = NULL; + char *replid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the run id * and the replication offset. */ - runid = strchr(reply,' '); - if (runid) { - runid++; - offset = strchr(runid,' '); + replid = strchr(reply,' '); + if (replid) { + replid++; + offset = strchr(replid,' '); if (offset) offset++; } - if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { + if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master - * runid to make sure next PSYNCs will fail. */ - memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); + * replid to make sure next PSYNCs will fail. */ + memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); } else { - memcpy(server.repl_master_runid, runid, offset-runid-1); - server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; - server.repl_master_initial_offset = strtoll(offset,NULL,10); + memcpy(server.master_replid, replid, offset-replid-1); + server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; + server.master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", - server.repl_master_runid, - server.repl_master_initial_offset); + server.master_replid, + server.master_initial_offset); } /* We are going to full resync, discard the cached master structure. */ replicationDiscardCachedMaster(); @@ -1360,9 +1489,40 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { } if (!strncmp(reply,"+CONTINUE",9)) { - /* Partial resync was accepted, set the replication state accordingly */ + /* Partial resync was accepted. */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); + + /* Check the new replication ID advertised by the master. If it + * changed, we need to set the new ID as primary ID, and set or + * secondary ID as the old master ID up to the current offset, so + * that our sub-slaves will be able to PSYNC with us after a + * disconnection. */ + char *start = reply+10; + char *end = reply+9; + while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++; + if (end-start == CONFIG_RUN_ID_SIZE) { + char new[CONFIG_RUN_ID_SIZE+1]; + memcpy(new,start,CONFIG_RUN_ID_SIZE); + new[CONFIG_RUN_ID_SIZE] = '\0'; + + if (strcmp(new,server.cached_master->replid)) { + /* Master ID changed. */ + serverLog(LL_WARNING,"Master replication ID changed to %s",new); + + /* Set the old ID as our ID2, up to the current offset+1. */ + memcpy(server.replid2,server.cached_master->replid, + sizeof(server.replid2)); + server.second_replid_offset = server.master_repl_offset+1; + + /* Update the cached master ID and our own primary ID to the + * new one. */ + memcpy(server.replid,new,sizeof(server.replid)); + memcpy(server.cached_master->replid,new,sizeof(server.replid)); + } + } + + /* Setup the replication to continue. */ sdsfree(reply); replicationResurrectCachedMaster(fd); return PSYNC_CONTINUE; @@ -1386,6 +1546,8 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { return PSYNC_NOT_SUPPORTED; } +/* This handler fires when the non blocking connect was able to + * establish a connection with the master. */ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err = NULL; int dfd, maxtries = 5; @@ -1402,7 +1564,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { return; } - /* Check for errors in the socket. */ + /* Check for errors in the socket: after a non blocking connect() we + * may find that the socket is in error state. */ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) sockerr = errno; if (sockerr) { @@ -1531,13 +1694,15 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { server.repl_state = REPL_STATE_SEND_CAPA; } - /* Inform the master of our capabilities. While we currently send - * just one capability, it is possible to chain new capabilities here - * in the form of REPLCONF capa X capa Y capa Z ... + /* Inform the master of our (slave) capabilities. + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * * The master will ignore capabilities it does not understand. */ if (server.repl_state == REPL_STATE_SEND_CAPA) { err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", - "capa","eof",NULL); + "capa","eof","capa","psync2",NULL); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA; @@ -1591,14 +1756,14 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* PSYNC failed or is not supported: we want our slaves to resync with us - * as well, if we have any (chained replication case). The mater may - * transfer us an entirely different data set and we have no way to - * incrementally feed our slaves after that. */ + * as well, if we have any sub-slaves. The mater may transfer us an + * entirely different data set and we have no way to incrementally feed + * our slaves after that. */ disconnectSlaves(); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC - * and the server.repl_master_runid and repl_master_initial_offset are + * and the server.master_replid and master_initial_offset are * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE,"Retrying with SYNC..."); @@ -1727,15 +1892,23 @@ int cancelReplicationHandshake(void) { /* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) { + int was_master = server.masterhost == NULL; + sdsfree(server.masterhost); server.masterhost = sdsnew(ip); server.masterport = port; - if (server.master) freeClient(server.master); + if (server.master) { + freeClient(server.master); + } disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ - disconnectSlaves(); /* Force our slaves to resync with us as well. */ - replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ - freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ + + /* Force our slaves to resync with us as well. They may hopefully be able + * to partially resync with us, but we can notify the replid change. */ + disconnectSlaves(); cancelReplicationHandshake(); + /* Before destroying our master state, create a cached master using + * our own parameters, to later PSYNC with the new master. */ + if (was_master) replicationCacheMasterUsingMyself(); server.repl_state = REPL_STATE_CONNECT; server.master_repl_offset = 0; server.repl_down_since = 0; @@ -1746,20 +1919,26 @@ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ sdsfree(server.masterhost); server.masterhost = NULL; - if (server.master) { - if (listLength(server.slaves) == 0) { - /* If this instance is turned into a master and there are no - * slaves, it inherits the replication offset from the master. - * Under certain conditions this makes replicas comparable by - * replication offset to understand what is the most updated. */ - server.master_repl_offset = server.master->reploff; - freeReplicationBacklog(); - } - freeClient(server.master); - } + /* When a slave is turned into a master, the current replication ID + * (that was inherited from the master at synchronization time) is + * used as secondary ID up to the current offset, and a new replication + * ID is created to continue with a new replication history. */ + shiftReplicationId(); + if (server.master) freeClient(server.master); replicationDiscardCachedMaster(); cancelReplicationHandshake(); + /* Disconnecting all the slaves is required: we need to inform slaves + * of the replication ID change (see shiftReplicationId() call). However + * the slaves will be able to partially resync with us, so it will be + * a very fast reconnection. */ + disconnectSlaves(); server.repl_state = REPL_STATE_NONE; + + /* We need to make sure the new master will start the replication stream + * with a SELECT statement. This is forced after a full resync, but + * with PSYNC version 2, there is no need for full resync after a + * master switch. */ + server.slaveseldb = -1; } /* This function is called when the slave lose the connection with the @@ -1931,6 +2110,31 @@ void replicationCacheMaster(client *c) { replicationHandleMasterDisconnection(); } +/* This function is called when a master is turend into a slave, in order to + * create from scratch a cached master for the new client, that will allow + * to PSYNC with the slave that was promoted as the new master after a + * failover. + * + * Assuming this instance was previously the master instance of the new master, + * the new master will accept its replication ID, and potentiall also the + * current offset if no data was lost during the failover. So we use our + * current replication ID and offset in order to synthesize a cached master. */ +void replicationCacheMasterUsingMyself(void) { + /* The master client we create can be set to any DBID, because + * the new master will start its replication stream with SELECT. */ + server.master_initial_offset = server.master_repl_offset; + replicationCreateMasterClient(-1,-1); + + /* Use our own ID / offset. */ + memcpy(server.master->replid, server.replid, sizeof(server.replid)); + + /* Set as cached master. */ + unlinkClient(server.master); + server.cached_master = server.master; + server.master = NULL; + serverLog(LL_NOTICE,"Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer."); +} + /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ void replicationDiscardCachedMaster(void) { @@ -2290,7 +2494,9 @@ void replicationCron(void) { robj *ping_argv[1]; /* First, send PING according to ping_slave_period. */ - if ((replication_cron_loops % server.repl_ping_slave_period) == 0) { + if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && + listLength(server.slaves)) + { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); @@ -2299,20 +2505,32 @@ void replicationCron(void) { /* Second, send a newline to all the slaves in pre-synchronization * stage, that is, slaves waiting for the master to create the RDB file. + * + * Also send the a newline to all the chained slaves we have, if we lost + * connection from our master, to keep the slaves aware that their + * master is online. This is needed since sub-slaves only receive proxied + * data from top-level masters, so there is no explicit pinging in order + * to avoid altering the replication offsets. This special out of band + * pings (newlines) can be sent, they will have no effect in the offset. + * * The newline will be ignored by the slave but will refresh the - * last-io timer preventing a timeout. In this case we ignore the + * last interaction timer preventing a timeout. In this case we ignore the * ping period and refresh the connection once per second since certain * timeouts are set at a few seconds (example: PSYNC response). */ listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || + int is_presync = + (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && - server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)) - { + server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)); + int is_subslave = server.masterhost && server.master == NULL && + slave->replstate == SLAVE_STATE_ONLINE; + + if (is_presync || is_subslave) { if (write(slave->fd, "\n", 1) == -1) { - /* Don't worry, it's just a ping. */ + /* Don't worry about socket errors, it's just a ping. */ } } } @@ -2337,10 +2555,14 @@ void replicationCron(void) { } } - /* If we have no attached slaves and there is a replication backlog - * using memory, free it after some (configured) time. */ + /* If this is a master without attached slaves and there is a replication + * backlog active, in order to reclaim memory we can free it after some + * (configured) time. Note that this cannot be done for slaves: slaves + * without sub-slaves attached should still accumulate data into the + * backlog, in order to reply to PSYNC queries if they are turned into + * masters after a failover. */ if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && - server.repl_backlog) + server.repl_backlog && server.masterhost == NULL) { time_t idle = server.unixtime - server.repl_no_slaves_since; diff --git a/src/server.c b/src/server.c index 7e9b962b3aaa..b94490a33a09 100644 --- a/src/server.c +++ b/src/server.c @@ -1079,7 +1079,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { { serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); - rdbSaveBackground(server.rdb_filename); + rdbSaveBackground(server.rdb_filename,NULL); break; } } @@ -1151,7 +1151,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { - if (rdbSaveBackground(server.rdb_filename) == C_OK) + if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) server.rdb_bgsave_scheduled = 0; } @@ -1309,10 +1309,11 @@ void initServerConfig(void) { int j; getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); + server.runid[CONFIG_RUN_ID_SIZE] = '\0'; + changeReplicationId(); server.configfile = NULL; server.executable = NULL; server.hz = CONFIG_DEFAULT_HZ; - server.runid[CONFIG_RUN_ID_SIZE] = '\0'; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = CONFIG_DEFAULT_SERVER_PORT; server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG; @@ -1409,7 +1410,7 @@ void initServerConfig(void) { server.masterport = 6379; server.master = NULL; server.cached_master = NULL; - server.repl_master_initial_offset = -1; + server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; @@ -2471,7 +2472,7 @@ int prepareForShutdown(int flags) { if ((server.saveparamslen > 0 && !nosave) || save) { serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting."); /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.rdb_filename) != C_OK) { + if (rdbSave(server.rdb_filename,NULL) != C_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background @@ -3135,12 +3136,18 @@ sds genRedisInfoString(char *section) { } } info = sdscatprintf(info, + "master_replid:%s\r\n" + "master_replid2:%s\r\n" "master_repl_offset:%lld\r\n" + "second_repl_offset:%lld\r\n" "repl_backlog_active:%d\r\n" "repl_backlog_size:%lld\r\n" "repl_backlog_first_byte_offset:%lld\r\n" "repl_backlog_histlen:%lld\r\n", + server.replid, + server.replid2, server.master_repl_offset, + server.second_replid_offset, server.repl_backlog != NULL, server.repl_backlog_size, server.repl_backlog_off, @@ -3416,7 +3423,7 @@ void loadDataFromDisk(void) { if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { - if (rdbLoad(server.rdb_filename) == C_OK) { + if (rdbLoad(server.rdb_filename,NULL) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { diff --git a/src/server.h b/src/server.h index b5dbaf0a53a9..8aa1d6fcb6a0 100644 --- a/src/server.h +++ b/src/server.h @@ -293,7 +293,8 @@ typedef long long mstime_t; /* millisecond time type. */ /* Slave capabilities. */ #define SLAVE_CAPA_NONE 0 -#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ +#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ +#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ /* Synchronous read timeout - slave side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 @@ -679,8 +680,8 @@ typedef struct client { long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ - char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */ - int slave_listening_port; /* As configured with: REPLCONF listening-port */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ + int slave_listening_port; /* As configured with: SLAVECONF listening-port */ char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ multiState mstate; /* MULTI/EXEC state */ @@ -803,6 +804,20 @@ struct redisMemOverhead { } *db; }; +/* This structure can be optionally passed to RDB save/load functions in + * order to implement additional functionalities, by storing and loading + * metadata to the RDB file. + * + * Currently the only use is to select a DB at load time, useful in + * replication in order to make sure that chained slaves (slaves of slaves) + * select the correct DB and are able to accept the stream coming from the + * top-level master. */ +typedef struct rdbSaveInfo { + int repl_stream_db; /* DB to select in server.master client. */ +} rdbSaveInfo; + +#define RDB_SAVE_INFO_INIT {-1} + /*----------------------------------------------------------------------------- * Global server state *----------------------------------------------------------------------------*/ @@ -988,15 +1003,19 @@ struct redisServer { char *syslog_ident; /* Syslog ident */ int syslog_facility; /* Syslog facility */ /* Replication (master) */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ + char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ + long long master_repl_offset; /* My current replication offset */ + long long second_replid_offset; /* Accept offsets up to this for replid2. */ int slaveseldb; /* Last SELECTed DB in replication output */ - long long master_repl_offset; /* Global replication offset */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ char *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ long long repl_backlog_histlen; /* Backlog actual data length */ - long long repl_backlog_idx; /* Backlog circular buffer current offset */ - long long repl_backlog_off; /* Replication offset of first byte in the - backlog buffer. */ + long long repl_backlog_idx; /* Backlog circular buffer current offset, + that is the next byte will'll write to.*/ + long long repl_backlog_off; /* Replication "master offset" of first + byte in the replication backlog buffer.*/ time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -1029,8 +1048,11 @@ struct redisServer { int slave_priority; /* Reported in INFO and used by Sentinel. */ int slave_announce_port; /* Give the master this listening port. */ char *slave_announce_ip; /* Give the master this ip address. */ - char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/ - long long repl_master_initial_offset; /* Master PSYNC offset. */ + /* The following two fields is where we store master PSYNC replid/offset + * while the PSYNC is in progress. At the end we'll copy the fields into + * the server->master client structure. */ + char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ + long long master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Replication script cache. */ dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ @@ -1259,6 +1281,7 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); +void addReplyString(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj *obj); void addReplyBulkCString(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); @@ -1393,6 +1416,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); @@ -1414,6 +1438,9 @@ long long replicationGetSlaveOffset(void); char *replicationGetSlaveName(client *c); long long getPsyncInitialOffset(void); int replicationSetupSlaveForFullResync(client *slave, long long offset); +void changeReplicationId(void); +void clearReplicationId2(void); +void chopReplicationBacklog(void); /* Generic persistence functions */ void startLoading(FILE *fp); @@ -1422,7 +1449,7 @@ void stopLoading(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags); +int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); /* AOF persistence */ void flushAppendOnlyFile(int force);