Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PSYNC2: make partial sync possible after master reboot #8015

Merged
merged 7 commits into from Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 21 additions & 7 deletions src/rdb.c
Expand Up @@ -2343,6 +2343,8 @@ void startLoading(size_t size, int rdbflags) {
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
server.loading_rdb_used_mem = 0;
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
blockingOperationStarts();

/* Fire the loading modules start event. */
Expand Down Expand Up @@ -2428,12 +2430,12 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid;
uint64_t dbid = 0;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];
int error;
long long empty_keys_skipped = 0, expired_keys_skipped = 0, keys_loaded = 0;
long long empty_keys_skipped = 0;

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
Expand Down Expand Up @@ -2662,16 +2664,28 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
!(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
expiretime != -1 && expiretime < now)
{
if (rdbflags & RDBFLAGS_FEED_REPL) {
/* Caller should have created replication backlog,
* and now this path only works when rebooting,
* so we don't have replicas yet. */
serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0);
robj keyobj;
initStaticStringObject(keyobj,key);
robj *argv[2];
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
argv[1] = &keyobj;
replicationFeedSlaves(server.slaves,dbid,argv,2);
}
sdsfree(key);
decrRefCount(val);
expired_keys_skipped++;
server.rdb_last_load_keys_expired++;
} else {
robj keyobj;
initStaticStringObject(keyobj,key);

/* Add the new object in the hash table */
int added = dbAddRDBLoad(db,key,val);
keys_loaded++;
server.rdb_last_load_keys_loaded++;
if (!added) {
if (rdbflags & RDBFLAGS_ALLOW_DUP) {
/* This flag is useful for DEBUG RELOAD special modes.
Expand Down Expand Up @@ -2732,11 +2746,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if (empty_keys_skipped) {
serverLog(LL_WARNING,
"Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.",
keys_loaded, expired_keys_skipped, empty_keys_skipped);
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped);
} else {
serverLog(LL_WARNING,
serverLog(LL_NOTICE,
"Done loading RDB, keys loaded: %lld, keys expired: %lld.",
keys_loaded, expired_keys_skipped);
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
}
return C_OK;

Expand Down
1 change: 1 addition & 0 deletions src/rdb.h
Expand Up @@ -127,6 +127,7 @@
#define RDBFLAGS_AOF_PREAMBLE (1<<0) /* Load/save the RDB as AOF preamble. */
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1<<3) /* Feed replication stream when loading.*/

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
47 changes: 35 additions & 12 deletions src/server.c
Expand Up @@ -3281,6 +3281,8 @@ void initServer(void) {
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
Expand Down Expand Up @@ -4901,6 +4903,8 @@ sds genRedisInfoString(const char *section) {
"rdb_last_bgsave_time_sec:%jd\r\n"
"rdb_current_bgsave_time_sec:%jd\r\n"
"rdb_last_cow_size:%zu\r\n"
"rdb_last_load_keys_expired:%lld\r\n"
"rdb_last_load_keys_loaded:%lld\r\n"
"aof_enabled:%d\r\n"
"aof_rewrite_in_progress:%d\r\n"
"aof_rewrite_scheduled:%d\r\n"
Expand All @@ -4926,6 +4930,8 @@ sds genRedisInfoString(const char *section) {
(intmax_t)((server.child_type != CHILD_TYPE_RDB) ?
-1 : time(NULL)-server.rdb_save_time_start),
server.stat_rdb_cow_bytes,
server.rdb_last_load_keys_expired,
server.rdb_last_load_keys_loaded,
server.aof_state != AOF_OFF,
server.child_type == CHILD_TYPE_AOF,
server.aof_rewrite_scheduled,
Expand Down Expand Up @@ -6037,28 +6043,45 @@ void loadDataFromDisk(void) {
} else {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
errno = 0; /* Prevent a stale value from affecting error checking */
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
int rdb_flags = RDBFLAGS_NONE;
if (iAmMaster()) {
/* Master may delete expired keys when loading, we should
* propagate expire to replication backlog. */
createReplicationBacklog();
rdb_flags |= RDBFLAGS_FEED_REPL;
}
if (rdbLoad(server.rdb_filename,&rsi,rdb_flags) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);

/* Restore the replication ID / offset from the RDB file. */
if ((server.masterhost ||
(server.cluster_enabled &&
nodeIsSlave(server.cluster->myself))) &&
rsi.repl_id_is_set &&
if (rsi.repl_id_is_set &&
rsi.repl_offset != -1 &&
/* Note that older implementations may save a repl_stream_db
* of -1 inside the RDB file in a wrong way, see more
* information in function rdbPopulateSaveInfo. */
rsi.repl_stream_db != -1)
{
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If we are a slave, create a cached master from this
* information, in order to allow partial resynchronization
* with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
if (!iAmMaster()) {
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If this is a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
} else {
/* If this is a master, we can save the replication info
* as secondary ID and offset, in order to allow replicas
* to partial resynchronizations with masters. */
memcpy(server.replid2,rsi.repl_id,sizeof(server.replid));
server.second_replid_offset = rsi.repl_offset+1;
/* Rebase master_repl_offset from rsi.repl_offset. */
server.master_repl_offset += rsi.repl_offset;
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
server.repl_no_slaves_since = time(NULL);
}
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Expand Up @@ -1432,6 +1432,8 @@ struct redisServer {
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */
long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
Expand Down Expand Up @@ -2096,7 +2098,7 @@ long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void chopReplicationBacklog(void);
void createReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
void showLatestBacklog(void);
Expand Down
159 changes: 159 additions & 0 deletions tests/integration/psync2-master-restart.tcl
@@ -0,0 +1,159 @@
start_server {tags {"psync2 external:skip"}} {
start_server {} {
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]

set replica [srv -1 client]
set replica_host [srv -1 host]
set replica_port [srv -1 port]

set sub_replica [srv -2 client]

# Build replication chain
$replica replicaof $master_host $master_port
$sub_replica replicaof $replica_host $replica_port

wait_for_condition 50 100 {
[status $replica master_link_status] eq {up} &&
[status $sub_replica master_link_status] eq {up}
} else {
fail "Replication not started."
}

# Avoid PINGs
$master config set repl-ping-replica-period 3600
$master config rewrite

# Generate some data
createComplexDataset $master 1000

test "PSYNC2: Partial resync after Master restart using RDB aux fields" {
wait_for_condition 500 100 {
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
} else {
fail "Replicas and master offsets were unable to match *exactly*."
}

set replid [status $master master_replid]
set offset [status $master master_repl_offset]
$replica config resetstat

catch {
restart_server 0 true false
set master [srv 0 client]
}
wait_for_condition 50 1000 {
[status $replica master_link_status] eq {up} &&
[status $sub_replica master_link_status] eq {up}
} else {
fail "Replicas didn't sync after master restart"
}

# Make sure master restore replication info correctly
assert {[status $master master_replid] != $replid}
assert {[status $master master_repl_offset] == $offset}
assert {[status $master master_replid2] eq $replid}
assert {[status $master second_repl_offset] == [expr $offset+1]}

# Make sure master set replication backlog correctly
assert {[status $master repl_backlog_active] == 1}
assert {[status $master repl_backlog_first_byte_offset] == [expr $offset+1]}
assert {[status $master repl_backlog_histlen] == 0}

# Partial resync after Master restart
assert {[status $master sync_partial_ok] == 1}
assert {[status $replica sync_partial_ok] == 1}
}

test "PSYNC2: Partial resync after Master restart using RDB aux fields with expire" {
$master debug set-active-expire 0
for {set j 0} {$j < 1024} {incr j} {
$master select [expr $j%16]
$master set $j somevalue px 10
}

after 20

wait_for_condition 500 100 {
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
} else {
show_cluster_status
fail "Replicas and master offsets were unable to match *exactly*."
}

set offset [status $master master_repl_offset]
$replica config resetstat

catch {
restart_server 0 true false
set master [srv 0 client]
}
wait_for_condition 50 1000 {
[status $replica master_link_status] eq {up} &&
[status $sub_replica master_link_status] eq {up}
} else {
fail "Replicas didn't sync after master restart"
}

set expired_offset [status $master repl_backlog_histlen]
# Stale keys expired and master_repl_offset grows correctly
assert {[status $master rdb_last_load_keys_expired] == 1024}
assert {[status $master master_repl_offset] == [expr $offset+$expired_offset]}

# Partial resync after Master restart
assert {[status $master sync_partial_ok] == 1}
assert {[status $replica sync_partial_ok] == 1}

set digest [$master debug digest]
assert {$digest eq [$replica debug digest]}
assert {$digest eq [$sub_replica debug digest]}
}

test "PSYNC2: Full resync after Master restart when too many key expired" {
$master config set repl-backlog-size 16384
$master config rewrite

$master debug set-active-expire 0
for {set j 0} {$j < 1024} {incr j} {
$master select [expr $j%16]
$master set $j somevalue px 10
}

after 20

wait_for_condition 500 100 {
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
} else {
fail "Replicas and master offsets were unable to match *exactly*."
}

$replica config resetstat

catch {
restart_server 0 true false
set master [srv 0 client]
}
wait_for_condition 50 1000 {
[status $replica master_link_status] eq {up} &&
[status $sub_replica master_link_status] eq {up}
} else {
fail "Replicas didn't sync after master restart"
}

# Replication backlog is full
assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]}
assert {[status $master sync_partial_ok] == 0}
assert {[status $master sync_full] == 1}
assert {[status $master rdb_last_load_keys_expired] == 1024}
assert {[status $replica sync_full] == 1}

set digest [$master debug digest]
assert {$digest eq [$replica debug digest]}
assert {$digest eq [$sub_replica debug digest]}
}
}}}
24 changes: 24 additions & 0 deletions tests/integration/rdb.tcl
Expand Up @@ -202,6 +202,30 @@ test {client freed during loading} {
}
}

start_server {} {
test {Test RDB load info} {
r debug populate 1000
r save
restart_server 0 true false
wait_done_loading r
assert {[s rdb_last_load_keys_expired] == 0}
assert {[s rdb_last_load_keys_loaded] == 1000}

r debug set-active-expire 0
for {set j 0} {$j < 1024} {incr j} {
r select [expr $j%16]
r set $j somevalue px 10
}
after 20

r save
restart_server 0 true false
wait_done_loading r
assert {[s rdb_last_load_keys_expired] == 1024}
assert {[s rdb_last_load_keys_loaded] == 1000}
}
}

# Our COW metrics (Private_Dirty) work only on Linux
set system_name [string tolower [exec uname -s]]
if {$system_name eq {linux}} {
Expand Down
1 change: 1 addition & 0 deletions tests/test_helper.tcl
Expand Up @@ -53,6 +53,7 @@ set ::all_tests {
integration/psync2
integration/psync2-reg
integration/psync2-pingoff
integration/psync2-master-restart
integration/failover
integration/redis-cli
integration/redis-benchmark
Expand Down