Handle errors even when out of retries.
Async cluster connections can now handle request
errors even when the request shouldn't be retried.
Before this change, topology refreshes and
reconnects happened only on retries; this change
ensures that they happen regardless of retries.
nihohit committed May 16, 2024
1 parent 523b4eb commit 6d3b258
Showing 2 changed files with 228 additions and 40 deletions.
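In user-visible terms: with retries disabled, the first request that hits a stale topology or a dropped connection still returns an error, but the refresh or reconnect is now scheduled anyway, so a follow-up request succeeds. Below is a minimal sketch of that scenario, assuming the crate's public ClusterClient builder API as also used in the tests below (the URL and key are placeholders; get_async_connection requires the cluster-async feature):

use redis::cluster::ClusterClient;
use redis::AsyncCommands;

async fn zero_retry_demo() -> redis::RedisResult<()> {
    // retries(0): failed requests are not retried, errors go straight to the caller.
    let client = ClusterClient::builder(vec!["redis://127.0.0.1:6379/"])
        .retries(0)
        .build()?;
    let mut conn = client.get_async_connection().await?;

    // The first GET may fail (e.g. with MOVED after a slot migration), but with
    // this commit the client schedules a topology refresh regardless...
    let first: redis::RedisResult<Option<i32>> = conn.get("test").await;
    // ...so a second attempt is routed to the right node and succeeds.
    let second: Option<i32> = conn.get("test").await?;
    println!("first: {first:?}, second: {second:?}");
    Ok(())
}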
89 changes: 61 additions & 28 deletions redis/src/cluster_async/mod.rs
@@ -425,15 +425,18 @@ enum Next<C> {
         request: PendingRequest<C>,
     },
     Reconnect {
-        request: PendingRequest<C>,
+        // if not set, then a reconnect should happen without sending a request afterwards
+        request: Option<PendingRequest<C>>,
         target: String,
     },
     RefreshSlots {
-        request: PendingRequest<C>,
+        // if not set, then a slot refresh should happen without sending a request afterwards
+        request: Option<PendingRequest<C>>,
         sleep_duration: Option<Duration>,
     },
     ReconnectToInitialNodes {
-        request: PendingRequest<C>,
+        // if not set, then a reconnect should happen without sending a request afterwards
+        request: Option<PendingRequest<C>>,
     },
     Done,
 }
@@ -467,15 +470,39 @@ impl<C> Future for Request<C> {
                 trace!("Request error {}", err);
 
                 let request = this.request.as_mut().unwrap();
+                // TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
                 if request.retry >= this.retry_params.number_of_retries {
+                    let next = if err.kind() == ErrorKind::ClusterConnectionNotFound {
+                        Next::ReconnectToInitialNodes { request: None }.into()
+                    } else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
+                        || matches!(target, OperationTarget::NotFound)
+                    {
+                        Next::RefreshSlots {
+                            request: None,
+                            sleep_duration: None,
+                        }
+                        .into()
+                    } else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
+                        if let OperationTarget::Node { address } = target {
+                            Next::Reconnect {
+                                request: None,
+                                target: address,
+                            }
+                            .into()
+                        } else {
+                            Next::Done.into()
+                        }
+                    } else {
+                        Next::Done.into()
+                    };
                     self.respond(Err(err));
-                    return Next::Done.into();
+                    return next;
                 }
                 request.retry = request.retry.saturating_add(1);
 
                 if err.kind() == ErrorKind::ClusterConnectionNotFound {
                     return Next::ReconnectToInitialNodes {
-                        request: this.request.take().unwrap(),
+                        request: Some(this.request.take().unwrap()),
                     }
                     .into();
                 }
@@ -494,7 +521,7 @@ impl<C> Future for Request<C> {
                     let mut request = this.request.take().unwrap();
                     request.info.reset_redirect();
                     return Next::RefreshSlots {
-                        request,
+                        request: Some(request),
                         sleep_duration: Some(sleep_duration),
                     }
                     .into();
@@ -517,7 +544,7 @@
                             .map(|(node, _slot)| Redirect::Moved(node.to_string())),
                     );
                     Next::RefreshSlots {
-                        request,
+                        request: Some(request),
                         sleep_duration: None,
                     }
                     .into()
@@ -534,7 +561,7 @@
                     // TODO should we reset the redirect here?
                     request.info.reset_redirect();
                     Next::Reconnect {
-                        request,
+                        request: Some(request),
                         target: address,
                     }
                 }
@@ -1148,36 +1175,42 @@ where
                 } => {
                     poll_flush_action =
                         poll_flush_action.change_state(PollFlushAction::RebuildSlots);
-                    let future: RequestState<
-                        Pin<Box<dyn Future<Output = OperationResult> + Send>>,
-                    > = match sleep_duration {
-                        Some(sleep_duration) => RequestState::Sleep {
-                            sleep: boxed_sleep(sleep_duration),
-                        },
-                        None => RequestState::Future {
-                            future: Box::pin(Self::try_request(
-                                request.info.clone(),
-                                self.inner.clone(),
-                            )),
-                        },
-                    };
-                    self.in_flight_requests.push(Box::pin(Request {
-                        retry_params: self.inner.cluster_params.retry_params.clone(),
-                        request: Some(request),
-                        future,
-                    }));
+                    if let Some(request) = request {
+                        let future: RequestState<
+                            Pin<Box<dyn Future<Output = OperationResult> + Send>>,
+                        > = match sleep_duration {
+                            Some(sleep_duration) => RequestState::Sleep {
+                                sleep: boxed_sleep(sleep_duration),
+                            },
+                            None => RequestState::Future {
+                                future: Box::pin(Self::try_request(
+                                    request.info.clone(),
+                                    self.inner.clone(),
+                                )),
+                            },
+                        };
+                        self.in_flight_requests.push(Box::pin(Request {
+                            retry_params: self.inner.cluster_params.retry_params.clone(),
+                            request: Some(request),
+                            future,
+                        }));
+                    }
                 }
                 Next::Reconnect {
                     request, target, ..
                 } => {
                     poll_flush_action =
                         poll_flush_action.change_state(PollFlushAction::Reconnect(vec![target]));
-                    self.inner.pending_requests.lock().unwrap().push(request);
+                    if let Some(request) = request {
+                        self.inner.pending_requests.lock().unwrap().push(request);
+                    }
                 }
                 Next::ReconnectToInitialNodes { request } => {
                     poll_flush_action = poll_flush_action
                         .change_state(PollFlushAction::ReconnectFromInitialConnections);
-                    self.inner.pending_requests.lock().unwrap().push(request);
+                    if let Some(request) = request {
+                        self.inner.pending_requests.lock().unwrap().push(request);
+                    }
                 }
             }
         }
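The heart of the change is the new out-of-retries branch in poll above: the error is still classified, and the matching recovery action is scheduled before the error is handed back to the caller. Below is a simplified, self-contained sketch of that decision; Recovery and RetryMethod are hypothetical stand-ins for the crate-internal Next<C>, ErrorKind::ClusterConnectionNotFound, and crate::types::RetryMethod, and the OperationTarget::NotFound case is omitted for brevity:

// Stand-in for the crate's `Next<C>`, with the request payload omitted.
#[derive(Debug, PartialEq)]
enum Recovery {
    ReconnectToInitialNodes,
    RefreshSlots,
    Reconnect { target: String },
    Done,
}

// Stand-in for `crate::types::RetryMethod`.
#[allow(dead_code)]
enum RetryMethod {
    MovedRedirect,
    Reconnect,
    NoRetry,
}

// Mirrors the new branch: even though the request itself will not be retried
// (`request: None` in the real code), a recovery action is still chosen so the
// connection heals for subsequent requests.
fn recovery_when_out_of_retries(
    connection_not_found: bool,
    retry_method: RetryMethod,
    node_address: Option<String>,
) -> Recovery {
    if connection_not_found {
        Recovery::ReconnectToInitialNodes
    } else {
        match (retry_method, node_address) {
            (RetryMethod::MovedRedirect, _) => Recovery::RefreshSlots,
            (RetryMethod::Reconnect, Some(target)) => Recovery::Reconnect { target },
            _ => Recovery::Done,
        }
    }
}

fn main() {
    let action =
        recovery_when_out_of_retries(false, RetryMethod::Reconnect, Some("node:6380".into()));
    assert_eq!(action, Recovery::Reconnect { target: "node:6380".into() });
}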
179 changes: 167 additions & 12 deletions redis/tests/test_cluster_async.rs
@@ -25,6 +25,13 @@ mod cluster_async {
 
     use crate::support::*;
 
+    fn broken_pipe_error() -> RedisError {
+        RedisError::from(std::io::Error::new(
+            std::io::ErrorKind::BrokenPipe,
+            "mock-io-error",
+        ))
+    }
+
     #[test]
     fn test_async_cluster_basic_cmd() {
         let cluster = TestClusterContext::new();
@@ -695,6 +702,163 @@ mod cluster_async {
         assert_eq!(value, Ok(Some(123)));
     }
 
+    #[test]
+    fn test_async_cluster_refresh_topology_even_with_zero_retries() {
+        let name = "test_async_cluster_refresh_topology_even_with_zero_retries";
+
+        let should_refresh = atomic::AtomicBool::new(false);
+
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            handler: _handler,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0),
+            name,
+            move |cmd: &[u8], port| {
+                if !should_refresh.load(atomic::Ordering::SeqCst) {
+                    respond_startup(name, cmd)?;
+                }
+
+                if contains_slice(cmd, b"PING") {
+                    return Err(Ok(Value::SimpleString("OK".into())));
+                }
+
+                if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
+                    return Err(Ok(Value::Array(vec![
+                        Value::Array(vec![
+                            Value::Int(0),
+                            Value::Int(1),
+                            Value::Array(vec![
+                                Value::BulkString(name.as_bytes().to_vec()),
+                                Value::Int(6379),
+                            ]),
+                        ]),
+                        Value::Array(vec![
+                            Value::Int(2),
+                            Value::Int(16383),
+                            Value::Array(vec![
+                                Value::BulkString(name.as_bytes().to_vec()),
+                                Value::Int(6380),
+                            ]),
+                        ]),
+                    ])));
+                }
+
+                if contains_slice(cmd, b"GET") {
+                    let get_response = Err(Ok(Value::BulkString(b"123".to_vec())));
+                    match port {
+                        6380 => get_response,
+                        // Respond that the key exists on a node that does not yet have a connection:
+                        _ => {
+                            // Should not attempt to refresh slots more than once:
+                            assert!(!should_refresh.swap(true, Ordering::SeqCst));
+                            Err(parse_redis_value(
+                                format!("-MOVED 123 {name}:6380\r\n").as_bytes(),
+                            ))
+                        }
+                    }
+                } else {
+                    panic!("unexpected command {cmd:?}")
+                }
+            },
+        );
+
+        let value = runtime.block_on(
+            cmd("GET")
+                .arg("test")
+                .query_async::<_, Option<i32>>(&mut connection),
+        );
+
+        // The user should receive an initial error, because there are no retries and the first request failed.
+        assert_eq!(
+            value,
+            Err(RedisError::from((
+                ErrorKind::Moved,
+                "An error was signalled by the server",
+                "test_async_cluster_refresh_topology_even_with_zero_retries:6380".to_string()
+            )))
+        );
+
+        let value = runtime.block_on(
+            cmd("GET")
+                .arg("test")
+                .query_async::<_, Option<i32>>(&mut connection),
+        );
+
+        assert_eq!(value, Ok(Some(123)));
+    }
+
+    #[test]
+    fn test_async_cluster_reconnect_even_with_zero_retries() {
+        let name = "test_async_cluster_reconnect_even_with_zero_retries";
+
+        let should_reconnect = atomic::AtomicBool::new(true);
+        let connection_count = Arc::new(atomic::AtomicU16::new(0));
+        let connection_count_clone = connection_count.clone();
+
+        let MockEnv {
+            runtime,
+            async_connection: mut connection,
+            handler: _handler,
+            ..
+        } = MockEnv::with_client_builder(
+            ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0),
+            name,
+            move |cmd: &[u8], port| {
+                match respond_startup(name, cmd) {
+                    Ok(_) => {}
+                    Err(err) => {
+                        connection_count.fetch_add(1, Ordering::Relaxed);
+                        return Err(err);
+                    }
+                }
+
+                if contains_slice(cmd, b"ECHO") && port == 6379 {
+                    // Should not attempt to refresh slots more than once:
+                    if should_reconnect.swap(false, Ordering::SeqCst) {
+                        Err(Err(broken_pipe_error()))
+                    } else {
+                        Err(Ok(Value::BulkString(b"PONG".to_vec())))
+                    }
+                } else {
+                    panic!("unexpected command {cmd:?}")
+                }
+            },
+        );
+
+        // 4 - MockEnv creates a sync & async connections, each calling CLUSTER SLOTS once & PING per node.
+        // If we add more nodes or more setup calls, this number should increase.
+        assert_eq!(connection_count_clone.load(Ordering::Relaxed), 4);
+
+        let value = runtime.block_on(connection.route_command(
+            &cmd("ECHO"),
+            RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
+                host: name.to_string(),
+                port: 6379,
+            }),
+        ));
+
+        // The user should receive an initial error, because there are no retries and the first request failed.
+        assert_eq!(
+            value.unwrap_err().to_string(),
+            broken_pipe_error().to_string()
+        );
+
+        let value = runtime.block_on(connection.route_command(
+            &cmd("ECHO"),
+            RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
+                host: name.to_string(),
+                port: 6379,
+            }),
+        ));
+
+        assert_eq!(value, Ok(Value::BulkString(b"PONG".to_vec())));
+        // 5 - because of the 4 above, and then another PING for new connections.
+        assert_eq!(connection_count_clone.load(Ordering::Relaxed), 5);
+    }
+
     #[test]
     fn test_async_cluster_ask_redirect() {
         let name = "test_async_cluster_ask_redirect";
@@ -801,10 +965,7 @@
             ..
         } = MockEnv::new(name, move |cmd: &[u8], port| {
             if port != 6379 && port != 6380 {
-                return Err(Err(RedisError::from(std::io::Error::new(
-                    std::io::ErrorKind::BrokenPipe,
-                    "mock-io-error",
-                ))));
+                return Err(Err(broken_pipe_error()));
             }
             respond_startup_two_nodes(name, cmd)?;
             let count = completed.fetch_add(1, Ordering::SeqCst);
@@ -1827,21 +1988,15 @@
                     if connect_attempt > 5 {
                         panic!("Too many pings!");
                     }
-                    Err(Err(RedisError::from(std::io::Error::new(
-                        std::io::ErrorKind::BrokenPipe,
-                        "mock-io-error",
-                    ))))
+                    Err(Err(broken_pipe_error()))
                 } else {
                     respond_startup_two_nodes(name, cmd)?;
                     let past_get_attempts = get_attempts.fetch_add(1, Ordering::Relaxed);
                     // we fail the initial GET request, and after that we'll fail the first reconnect attempt, in the `refresh_connections` attempt.
                     if past_get_attempts == 0 {
                         // Error once with io-error, ensure connection is reestablished w/out calling
                         // other node (i.e., not doing a full slot rebuild)
-                        Err(Err(RedisError::from(std::io::Error::new(
-                            std::io::ErrorKind::BrokenPipe,
-                            "mock-io-error",
-                        ))))
+                        Err(Err(broken_pipe_error()))
                     } else {
                         Err(Ok(Value::BulkString(b"123".to_vec())))
                     }
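A note on the mock-handler convention in these tests, as it can be read off the closures above: returning Ok(()) lets the handler fall through to further checks, while the outer Err(..) short-circuits the mock transport. Err(Ok(value)) makes the client observe value as the server's reply, so Err(Ok(Value::BulkString(b"PONG".to_vec()))) is a successful ECHO response, and Err(Err(error)) fails the request with error, which is how broken_pipe_error() simulates a dropped connection.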
