Skip to content

Commit

Permalink
Add with_config to able to add timeouts when using sentinel client
Browse files Browse the repository at this point in the history
  • Loading branch information
Johan Rylander authored and jrylander committed May 23, 2024
1 parent e567df0 commit 5a95822
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 29 deletions.
112 changes: 86 additions & 26 deletions redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@ impl Client {
}
}

/// Options for creation of async connection
pub struct AsyncConnectionConfig {
/// Maximum time to wait for a response from the server
response_timeout: Option<std::time::Duration>,
/// Maximum time to wait for a connection to be established
connection_timeout: Option<std::time::Duration>,
}

impl AsyncConnectionConfig {
/// Creates a new instance of the options with nothing set
pub fn new() -> Self {
Self {
response_timeout: None,
connection_timeout: None,
}
}

/// Sets the connection timeout
pub fn with_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
self.connection_timeout = Some(connection_timeout);
self
}

/// Sets the response timeout
pub fn with_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
self.response_timeout = Some(response_timeout);
self
}
}

impl Default for AsyncConnectionConfig {
fn default() -> Self {
Self::new()
}
}

/// To enable async support you need to chose one of the supported runtimes and active its
/// corresponding feature: `tokio-comp` or `async-std-comp`
#[cfg(feature = "aio")]
Expand Down Expand Up @@ -135,18 +171,8 @@ impl Client {
pub async fn get_multiplexed_async_connection(
&self,
) -> RedisResult<crate::aio::MultiplexedConnection> {
match Runtime::locate() {
#[cfg(feature = "tokio-comp")]
Runtime::Tokio => {
self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(None)
.await
}
#[cfg(feature = "async-std-comp")]
Runtime::AsyncStd => {
self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(None)
.await
}
}
self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
.await
}

/// Returns an async connection from the client.
Expand All @@ -159,27 +185,61 @@ impl Client {
&self,
response_timeout: std::time::Duration,
connection_timeout: std::time::Duration,
) -> RedisResult<crate::aio::MultiplexedConnection> {
self.get_multiplexed_async_connection_with_config(
&AsyncConnectionConfig::new()
.with_connection_timeout(connection_timeout)
.with_response_timeout(response_timeout),
)
.await
}

/// Returns an async connection from the client.
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
#[cfg_attr(
docsrs,
doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
)]
pub async fn get_multiplexed_async_connection_with_config(
&self,
config: &AsyncConnectionConfig,
) -> RedisResult<crate::aio::MultiplexedConnection> {
let result = match Runtime::locate() {
#[cfg(feature = "tokio-comp")]
rt @ Runtime::Tokio => {
rt.timeout(
connection_timeout,
self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(Some(
response_timeout,
)),
)
.await
if let Some(connection_timeout) = config.connection_timeout {
rt.timeout(
connection_timeout,
self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
config.response_timeout,
),
)
.await
} else {
Ok(self
.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
config.response_timeout,
)
.await)
}
}
#[cfg(feature = "async-std-comp")]
rt @ Runtime::AsyncStd => {
rt.timeout(
connection_timeout,
self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
Some(response_timeout),
),
)
.await
if let Some(connection_timeout) = config.connection_timeout {
rt.timeout(
connection_timeout,
self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
config.response_timeout,
),
)
.await
} else {
Ok(self
.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
config.response_timeout,
)
.await)
}
}
};

Expand Down
1 change: 1 addition & 0 deletions redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
#![cfg_attr(docsrs, feature(doc_cfg))]

// public api
pub use crate::client::AsyncConnectionConfig;
pub use crate::client::Client;
pub use crate::cmd::{cmd, pack_command, pipe, Arg, Cmd, Iter};
pub use crate::commands::{
Expand Down
18 changes: 16 additions & 2 deletions redis/src/sentinel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ use rand::Rng;
#[cfg(feature = "aio")]
use crate::aio::MultiplexedConnection as AsyncConnection;

use crate::client::AsyncConnectionConfig;
use crate::{
connection::ConnectionInfo, types::RedisResult, Client, Cmd, Connection, ErrorKind,
FromRedisValue, IntoConnectionInfo, RedisConnectionInfo, TlsMode, Value,
Expand Down Expand Up @@ -766,7 +767,20 @@ impl SentinelClient {
/// `SentinelClient::get_connection`.
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
pub async fn get_async_connection(&mut self) -> RedisResult<AsyncConnection> {
let client = self.async_get_client().await?;
client.get_multiplexed_async_connection().await
self.get_async_connection_with_config(&AsyncConnectionConfig::new())
.await
}

/// Returns an async connection from the client with options, using the same logic from
/// `SentinelClient::get_connection`.
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
pub async fn get_async_connection_with_config(
&mut self,
config: &AsyncConnectionConfig,
) -> RedisResult<AsyncConnection> {
self.async_get_client()
.await?
.get_multiplexed_async_connection_with_config(config)
.await
}
}
165 changes: 164 additions & 1 deletion redis/tests/test_sentinel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub mod async_tests {
use redis::{
aio::MultiplexedConnection,
sentinel::{Sentinel, SentinelClient, SentinelNodeConnectionInfo},
Client, ConnectionAddr, RedisError,
AsyncConnectionConfig, Client, ConnectionAddr, RedisError,
};

use crate::{assert_is_master_role, assert_replica_role_and_master_addr, support::*};
Expand Down Expand Up @@ -486,4 +486,167 @@ pub mod async_tests {
})
.unwrap();
}

#[test]
fn test_sentinel_client_async_with_connection_timeout() {
let master_name = "master1";
let mut context = TestSentinelContext::new(2, 3, 3);
let mut master_client = SentinelClient::build(
context.sentinels_connection_info().clone(),
String::from(master_name),
Some(context.sentinel_node_connection_info()),
redis::sentinel::SentinelServerType::Master,
)
.unwrap();

let mut replica_client = SentinelClient::build(
context.sentinels_connection_info().clone(),
String::from(master_name),
Some(context.sentinel_node_connection_info()),
redis::sentinel::SentinelServerType::Replica,
)
.unwrap();

let connection_options =
AsyncConnectionConfig::new().with_connection_timeout(std::time::Duration::from_secs(1));

block_on_all(async move {
let mut master_con = master_client
.get_async_connection_with_config(&connection_options)
.await?;

async_assert_is_connection_to_master(&mut master_con).await;

let node_conn_info = context.sentinel_node_connection_info();
let sentinel = context.sentinel_mut();
let master_client = sentinel
.async_master_for(master_name, Some(&node_conn_info))
.await?;

// Read commands to the replica node
for _ in 0..20 {
let mut replica_con = replica_client
.get_async_connection_with_config(&connection_options)
.await?;

async_assert_connection_is_replica_of_correct_master(
&mut replica_con,
&master_client,
)
.await;
}

Ok::<(), RedisError>(())
})
.unwrap();
}

#[test]
fn test_sentinel_client_async_with_response_timeout() {
let master_name = "master1";
let mut context = TestSentinelContext::new(2, 3, 3);
let mut master_client = SentinelClient::build(
context.sentinels_connection_info().clone(),
String::from(master_name),
Some(context.sentinel_node_connection_info()),
redis::sentinel::SentinelServerType::Master,
)
.unwrap();

let mut replica_client = SentinelClient::build(
context.sentinels_connection_info().clone(),
String::from(master_name),
Some(context.sentinel_node_connection_info()),
redis::sentinel::SentinelServerType::Replica,
)
.unwrap();

let connection_options =
AsyncConnectionConfig::new().with_response_timeout(std::time::Duration::from_secs(1));

block_on_all(async move {
let mut master_con = master_client
.get_async_connection_with_config(&connection_options)
.await?;

async_assert_is_connection_to_master(&mut master_con).await;

let node_conn_info = context.sentinel_node_connection_info();
let sentinel = context.sentinel_mut();
let master_client = sentinel
.async_master_for(master_name, Some(&node_conn_info))
.await?;

// Read commands to the replica node
for _ in 0..20 {
let mut replica_con = replica_client
.get_async_connection_with_config(&connection_options)
.await?;

async_assert_connection_is_replica_of_correct_master(
&mut replica_con,
&master_client,
)
.await;
}

Ok::<(), RedisError>(())
})
.unwrap();
}

#[test]
fn test_sentinel_client_async_with_timeouts() {
let master_name = "master1";
let mut context = TestSentinelContext::new(2, 3, 3);
let mut master_client = SentinelClient::build(
context.sentinels_connection_info().clone(),
String::from(master_name),
Some(context.sentinel_node_connection_info()),
redis::sentinel::SentinelServerType::Master,
)
.unwrap();

let mut replica_client = SentinelClient::build(
context.sentinels_connection_info().clone(),
String::from(master_name),
Some(context.sentinel_node_connection_info()),
redis::sentinel::SentinelServerType::Replica,
)
.unwrap();

let connection_options = AsyncConnectionConfig::new()
.with_connection_timeout(std::time::Duration::from_secs(1))
.with_response_timeout(std::time::Duration::from_secs(1));

block_on_all(async move {
let mut master_con = master_client
.get_async_connection_with_config(&connection_options)
.await?;

async_assert_is_connection_to_master(&mut master_con).await;

let node_conn_info = context.sentinel_node_connection_info();
let sentinel = context.sentinel_mut();
let master_client = sentinel
.async_master_for(master_name, Some(&node_conn_info))
.await?;

// Read commands to the replica node
for _ in 0..20 {
let mut replica_con = replica_client
.get_async_connection_with_config(&connection_options)
.await?;

async_assert_connection_is_replica_of_correct_master(
&mut replica_con,
&master_client,
)
.await;
}

Ok::<(), RedisError>(())
})
.unwrap();
}
}

0 comments on commit 5a95822

Please sign in to comment.