cluster: attempt reconnect control conn more often
If the control connection is faulty and a metadata fetch therefore fails,
further attempts to reconnect and fetch should be made more frequently.
The motivation: if the control connection fails, the node may have
changed its IP, so we need to fetch new metadata ASAP to discover its
new address. Therefore, the ClusterWorker's sleep time is changed from
60 seconds to 1 second once a metadata fetch fails, and is reverted to
60 seconds only after a fetch succeeds.

This is still not good enough: if all nodes change their IPs at once,
we will discover them only after the next metadata fetch is issued,
which may be as late as 60 seconds after the previous successful fetch.
Hence, the next commit introduces immediate signalling that the control
connection is broken, so that ClusterWorker instantly begins its
attempt-every-second phase.
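
The interval selection described above can be sketched in isolation as follows. This is a minimal illustration, not the driver's code: it uses `std::time::Instant` instead of `tokio::time::Instant`, and the helper names `wait_after_refresh` and `next_wakeup` as well as the two constants are hypothetical, introduced only for this example. The 60-second and 1-second values are taken from the commit message.

```rust
use std::time::{Duration, Instant};

// Interval between routine metadata refreshes (value from the commit message).
const REFRESH_INTERVAL: Duration = Duration::from_secs(60);
// Interval between attempts while the control connection is considered broken.
const REPAIR_INTERVAL: Duration = Duration::from_secs(1);

// Hypothetical helper: how long to wait after the last refresh attempt.
fn wait_after_refresh(control_connection_works: bool) -> Duration {
    if control_connection_works {
        REFRESH_INTERVAL
    } else {
        REPAIR_INTERVAL
    }
}

// Hypothetical helper: the next wake-up instant, falling back to "now"
// if adding the interval would overflow.
fn next_wakeup(last_refresh_time: Instant, control_connection_works: bool) -> Instant {
    last_refresh_time
        .checked_add(wait_after_refresh(control_connection_works))
        .unwrap_or_else(Instant::now)
}

fn main() {
    let last = Instant::now();
    // Simulated outcomes of three consecutive fetches: failure, failure, success.
    let outcomes: [Result<(), &str>; 3] = [Err("fetch failed"), Err("fetch failed"), Ok(())];
    for res in outcomes.iter() {
        // Mirrors the commit: the flag simply tracks whether the last fetch succeeded.
        let works = res.is_ok();
        let wait = next_wakeup(last, works) - last;
        println!("fetch ok: {}, next attempt in {:?}", works, wait);
    }
}
```

Because the flag is updated only after a fetch completes, a connection that breaks right after a successful fetch is still noticed only at the next scheduled fetch, which is the limitation the second paragraph of the commit message points out.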
wprzytula committed Jul 31, 2023
1 parent 9461304 commit f0debd7
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion scylla/src/transport/cluster.rs
@@ -478,14 +478,20 @@ impl ClusterWorker {
         use tokio::time::Instant;

         let refresh_duration = Duration::from_secs(60); // Refresh topology every 60 seconds
+        let control_connection_repair_duration = Duration::from_secs(1); // Attempt control connection repair every second
         let mut last_refresh_time = Instant::now();
+        let mut control_connection_works = true;

         loop {
             let mut cur_request: Option<RefreshRequest> = None;

             // Wait until it's time for the next refresh
             let sleep_until: Instant = last_refresh_time
-                .checked_add(refresh_duration)
+                .checked_add(if control_connection_works {
+                    refresh_duration
+                } else {
+                    control_connection_repair_duration
+                })
                 .unwrap_or_else(Instant::now);

             let sleep_future = tokio::time::sleep_until(sleep_until);
@@ -543,6 +549,8 @@ impl ClusterWorker {
             last_refresh_time = Instant::now();
             let refresh_res = self.perform_refresh().await;

+            control_connection_works = refresh_res.is_ok();
+
             // Send refresh result if there was a request
             if let Some(request) = cur_request {
                 // We can ignore sending error - if no one waits for the response we can drop it
