Skip to content

Commit

Permalink
Add a timeout mechanism for replicas stuck in fullsync (redis#8762)
Browse files Browse the repository at this point in the history
Starting redis 6.0 (part of the TLS feature), diskless master uses pipe from the fork
child so that the parent is the one sending data to the replicas.
This mechanism has an issue in which a hung replica will cause the master to wait
for it to read the data sent to it forever, thus preventing the fork child from terminating
and preventing the creations of any other forks.

This PR adds a timeout mechanism, much like the ACK-based timeout,
we disconnect replicas that aren't reading the RDB file fast enough.
  • Loading branch information
guybe7 committed Apr 15, 2021
1 parent 0a2621c commit d63d026
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ client *createClient(connection *conn) {
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->repl_last_partial_write = 0;
c->slave_listening_port = 0;
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
Expand Down
34 changes: 26 additions & 8 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,8 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
if (!connHasWriteHandler(conn))
return;
connSetWriteHandler(conn, NULL);
client *slave = connGetPrivateData(conn);
slave->repl_last_partial_write = 0;
server.rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */
if (server.rdb_pipe_numconns_writing == 0) {
Expand Down Expand Up @@ -1180,8 +1182,10 @@ void rdbPipeWriteHandler(struct connection *conn) {
} else {
slave->repldboff += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen)
if (slave->repldboff < server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
}
}
rdbPipeWriteHandlerConnRemoved(conn);
}
Expand Down Expand Up @@ -1262,6 +1266,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
if (nwritten != server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
server.rdb_pipe_numconns_writing++;
connSetWriteHandler(conn, rdbPipeWriteHandler);
}
Expand Down Expand Up @@ -3390,13 +3395,26 @@ void replicationCron(void) {
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
replicationGetSlaveName(slave));
freeClient(slave);
if (slave->replstate == SLAVE_STATE_ONLINE) {
if (slave->flags & CLIENT_PRE_PSYNC)
continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
/* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
* by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
* from terminating. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
if (slave->repl_last_partial_write != 0 &&
(server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ typedef struct client {
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
Expand Down
18 changes: 16 additions & 2 deletions tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ start_server {tags {"repl"}} {
# If running on Linux, we also measure utime/stime to detect possible I/O handling issues
set os [catch {exec unamee}]
set measure_time [expr {$os == "Linux"} ? 1 : 0]
foreach all_drop {no slow fast all} {
foreach all_drop {no slow fast all timeout} {
test "diskless $all_drop replicas drop during rdb pipe" {
set replicas {}
set replicas_alive {}
Expand Down Expand Up @@ -647,6 +647,12 @@ start_server {tags {"repl"}} {
exec kill [srv -1 pid]
set replicas_alive [lreplace $replicas_alive 0 0]
}
if {$all_drop == "timeout"} {
$master config set repl-timeout 1
# we want this replica to hang on a key for very long so it'll reach repl-timeout
exec kill -SIGSTOP [srv -1 pid]
after 3000
}

# wait for rdb child to exit
wait_for_condition 500 100 {
Expand All @@ -665,6 +671,14 @@ start_server {tags {"repl"}} {
if {$all_drop == "slow" || $all_drop == "fast"} {
wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1
}
if {$all_drop == "timeout"} {
wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1
wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1
# master disconnected the slow replica, remove from array
set replicas_alive [lreplace $replicas_alive 0 0]
# release it
exec kill -SIGCONT [srv -1 pid]
}

# make sure we don't have a busy loop going thought epoll_wait
if {$measure_time} {
Expand All @@ -678,7 +692,7 @@ start_server {tags {"repl"}} {
puts "master utime: $master_utime"
puts "master stime: $master_stime"
}
if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow")} {
if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow" || $all_drop == "timeout")} {
assert {$master_utime < 70}
assert {$master_stime < 70}
}
Expand Down

0 comments on commit d63d026

Please sign in to comment.