From ff6a23c1790f41f08cfa63116ffda86fd8211e4a Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Thu, 18 Jan 2024 15:40:39 +0000 Subject: [PATCH] Save reconnected connections during retries. This change ensures that reconnect attempts that happen during retries, or new connections that happen after MOVED/ASKING errors, will be saved instead of constantly reconnecting until slots are refreshed. --- redis/src/cluster_async/mod.rs | 22 +++++- redis/tests/test_cluster_async.rs | 117 ++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 2 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index a26b8b8bc..f0ae636c2 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -989,7 +989,7 @@ where let addr_conn_option = match conn { Some((addr, Some(conn))) => Some((addr, conn.await)), - Some((addr, None)) => connect_and_check(&addr, core.cluster_params.clone()) + Some((addr, None)) => connect_check_and_add(core.clone(), addr.clone()) .await .ok() .map(|conn| (addr, conn)), @@ -1030,7 +1030,7 @@ where drop(read_guard); let mut conn = match conn { Some(conn) => conn.await, - None => connect_and_check(&addr, core.cluster_params.clone()).await?, + None => connect_check_and_add(core.clone(), addr.clone()).await?, }; if asking { let _ = conn.req_packed_command(&crate::cmd::cmd("ASKING")).await; @@ -1415,6 +1415,24 @@ impl Connect for MultiplexedConnection { } } +async fn connect_check_and_add(core: Core, addr: String) -> RedisResult +where + C: ConnectionLike + Connect + Send + Clone + 'static, +{ + match connect_and_check::(&addr, core.cluster_params.clone()).await { + Ok(conn) => { + let conn_clone = conn.clone(); + core.conn_lock + .write() + .await + .0 + .insert(addr, async { conn_clone }.boxed().shared()); + Ok(conn) + } + Err(err) => Err(err), + } +} + async fn connect_and_check(node: &str, params: ClusterParams) -> RedisResult where C: ConnectionLike + Connect + Send + 'static, diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index bba2c6ee1..68bb82532 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -633,6 +633,48 @@ fn test_async_cluster_ask_redirect() { assert_eq!(value, Ok(Some(123))); } +#[test] +fn test_async_cluster_ask_save_new_connection() { + let name = "node"; + let ping_attempts = Arc::new(AtomicI32::new(0)); + let ping_attempts_clone = ping_attempts.clone(); + let MockEnv { + async_connection: mut connection, + handler: _handler, + runtime, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]), + name, + { + move |cmd: &[u8], port| { + if port != 6391 { + respond_startup_two_nodes(name, cmd)?; + return Err(parse_redis_value(b"-ASK 14000 node:6391\r\n")); + } + + if contains_slice(cmd, b"PING") { + ping_attempts_clone.fetch_add(1, Ordering::Relaxed); + } + respond_startup_two_nodes(name, cmd)?; + Err(Ok(Value::Okay)) + } + }, + ); + + for _ in 0..4 { + runtime + .block_on( + cmd("GET") + .arg("test") + .query_async::<_, Value>(&mut connection), + ) + .unwrap(); + } + + assert_eq!(ping_attempts.load(Ordering::Relaxed), 1); +} + #[test] fn test_async_cluster_reset_routing_if_redirect_fails() { let name = "test_async_cluster_reset_routing_if_redirect_fails"; @@ -1593,6 +1635,81 @@ fn test_async_cluster_reconnect_after_complete_server_disconnect() { .unwrap(); } +#[test] +fn test_async_cluster_saves_reconnected_connection() { + let name = "test_async_cluster_saves_reconnected_connection"; + let ping_attempts = Arc::new(AtomicI32::new(0)); + let ping_attempts_clone = ping_attempts.clone(); + let get_attempts = AtomicI32::new(0); + + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(1), + name, + move |cmd: &[u8], port| { + if port == 6380 { + respond_startup_two_nodes(name, cmd)?; + return Err(parse_redis_value( + format!("-MOVED 123 {name}:6379\r\n").as_bytes(), + )); + } + + if contains_slice(cmd, b"PING") { + let connect_attempt = ping_attempts_clone.fetch_add(1, Ordering::Relaxed); + let past_get_attempts = get_attempts.load(Ordering::Relaxed); + // We want connection checks to fail after the first GET attempt, until it retries. Hence, we wait for 5 PINGs - + // 1. initial connection, + // 2. refresh slots on client creation, + // 3. refresh_connections `check_connection` after first GET failed, + // 4. refresh_connections `connect_and_check` after first GET failed, + // 5. reconnect on 2nd GET attempt. + // more than 5 attempts mean that the server reconnects more than once, which is the behavior we're testing against. + if past_get_attempts != 1 || connect_attempt > 3 { + respond_startup_two_nodes(name, cmd)?; + } + if connect_attempt > 5 { + panic!("Too many pings!"); + } + Err(Err(RedisError::from(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "mock-io-error", + )))) + } else { + respond_startup_two_nodes(name, cmd)?; + let past_get_attempts = get_attempts.fetch_add(1, Ordering::Relaxed); + // we fail the initial GET request, and after that we'll fail the first reconnect attempt, in the `refresh_connections` attempt. + if past_get_attempts == 0 { + // Error once with io-error, ensure connection is reestablished w/out calling + // other node (i.e., not doing a full slot rebuild) + Err(Err(RedisError::from(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "mock-io-error", + )))) + } else { + Err(Ok(Value::Data(b"123".to_vec()))) + } + } + }, + ); + + for _ in 0..4 { + let value = runtime.block_on( + cmd("GET") + .arg("test") + .query_async::<_, Option>(&mut connection), + ); + + assert_eq!(value, Ok(Some(123))); + } + // If you need to change the number here due to a change in the cluster, you probably also need to adjust the test. + // See the PING counts above to explain why 5 is the target number. + assert_eq!(ping_attempts.load(Ordering::Acquire), 5); +} + #[cfg(feature = "tls-rustls")] mod mtls_test { use crate::support::mtls_test::create_cluster_client_from_cluster;