Skip to content

Commit

Permalink
PSYNC2: make partial sync possible after master reboot
Browse files Browse the repository at this point in the history
The main idea is how to allow a master to load replication info
from RDB file when rebooting, if master can load replication
info it means that replicas may have the chance to psync with
master, it can save much traffic.

The key point is we need guarantee safety and consistency, so there
are two differences between master and replica:
1. master would load the replication info as secondary ID and
   offset, in case other masters have the same replid.
2. when master loading RDB, it would propagate expired keys as DEL
   command to replication backlog, then replica can receive these
   commands to delete stale keys.
   p.s. the expired keys when RDB loading is userful for users, so
   we show it as rdb_expired_keys_last_load in info persistence.

Moreover, after load replication info, master should update
no_replica_time in case loading RDB cost too long time.
  • Loading branch information
soloestoy committed Aug 27, 2021
1 parent 1d9c8d6 commit 1a39dc0
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 14 deletions.
16 changes: 15 additions & 1 deletion src/rdb.c
Expand Up @@ -2343,6 +2343,7 @@ void startLoading(size_t size, int rdbflags) {
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
server.loading_rdb_used_mem = 0;
server.rdb_expired_keys_last_load = 0;
blockingOperationStarts();

/* Fire the loading modules start event. */
Expand Down Expand Up @@ -2428,7 +2429,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
uint64_t dbid = 0;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
Expand Down Expand Up @@ -2662,9 +2663,22 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
!(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
expiretime != -1 && expiretime < now)
{
if (rdbflags & RDBFLAGS_FEED_REPL) {
/* Caller should have created replication backlog,
* and now this path only works when rebooting,
* so we don't have replicas yet. */
serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0);
robj keyobj;
initStaticStringObject(keyobj,key);
robj *argv[2];
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
argv[1] = &keyobj;
replicationFeedSlaves(server.slaves,dbid,argv,2);
}
sdsfree(key);
decrRefCount(val);
expired_keys_skipped++;
server.rdb_expired_keys_last_load++;
} else {
robj keyobj;
initStaticStringObject(keyobj,key);
Expand Down
1 change: 1 addition & 0 deletions src/rdb.h
Expand Up @@ -127,6 +127,7 @@
#define RDBFLAGS_AOF_PREAMBLE (1<<0) /* Load/save the RDB as AOF preamble. */
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1<<3) /* Feed replication stream when loading.*/

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
44 changes: 32 additions & 12 deletions src/server.c
Expand Up @@ -3281,6 +3281,7 @@ void initServer(void) {
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.rdb_expired_keys_last_load = 0;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
Expand Down Expand Up @@ -4901,6 +4902,7 @@ sds genRedisInfoString(const char *section) {
"rdb_last_bgsave_time_sec:%jd\r\n"
"rdb_current_bgsave_time_sec:%jd\r\n"
"rdb_last_cow_size:%zu\r\n"
"rdb_expired_keys_last_load:%lld\r\n"
"aof_enabled:%d\r\n"
"aof_rewrite_in_progress:%d\r\n"
"aof_rewrite_scheduled:%d\r\n"
Expand All @@ -4926,6 +4928,7 @@ sds genRedisInfoString(const char *section) {
(intmax_t)((server.child_type != CHILD_TYPE_RDB) ?
-1 : time(NULL)-server.rdb_save_time_start),
server.stat_rdb_cow_bytes,
server.rdb_expired_keys_last_load,
server.aof_state != AOF_OFF,
server.child_type == CHILD_TYPE_AOF,
server.aof_rewrite_scheduled,
Expand Down Expand Up @@ -6037,28 +6040,45 @@ void loadDataFromDisk(void) {
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
errno = 0; /* Prevent a stale value from affecting error checking */
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
int rdb_flags = RDBFLAGS_NONE;
if (iAmMaster()) {
/* Master may delete expired keys when loading, we should
* propagate expire to replication backlog. */
createReplicationBacklog();
rdb_flags |= RDBFLAGS_FEED_REPL;
}
if (rdbLoad(server.rdb_filename,&rsi,rdb_flags) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);

/* Restore the replication ID / offset from the RDB file. */
if ((server.masterhost ||
(server.cluster_enabled &&
nodeIsSlave(server.cluster->myself))) &&
rsi.repl_id_is_set &&
if (rsi.repl_id_is_set &&
rsi.repl_offset != -1 &&
/* Note that older implementations may save a repl_stream_db
* of -1 inside the RDB file in a wrong way, see more
* information in function rdbPopulateSaveInfo. */
rsi.repl_stream_db != -1)
{
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If we are a slave, create a cached master from this
* information, in order to allow partial resynchronization
* with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
if (!iAmMaster()) {
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If this is a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
} else {
/* If this is a master, we can save the replication info
* as secondary ID and offset, in order to allow replicas
* to partial resynchronizations with masters. */
memcpy(server.replid2,rsi.repl_id,sizeof(server.replid));
server.second_replid_offset = rsi.repl_offset+1;
/* Rebase master_repl_offset from rsi.repl_offset. */
server.master_repl_offset += rsi.repl_offset;
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
server.repl_no_slaves_since = time(NULL);
}
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Expand Up @@ -1432,6 +1432,7 @@ struct redisServer {
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
long long rdb_expired_keys_last_load; /* number of expired keys when loading RDB */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
Expand Down Expand Up @@ -2096,7 +2097,7 @@ long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void chopReplicationBacklog(void);
void createReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
void showLatestBacklog(void);
Expand Down

0 comments on commit 1a39dc0

Please sign in to comment.