Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max_delay for every reconnect #1192

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion redis/examples/async-await.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let client = redis::Client::open("redis://127.0.0.1/", None).unwrap();
let mut con = client.get_multiplexed_async_connection().await?;

con.set("key1", b"foo").await?;
Expand Down
2 changes: 1 addition & 1 deletion redis/examples/async-multiplexed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn test_cmd(con: &MultiplexedConnection, i: i32) -> RedisResult<()> {

#[tokio::main]
async fn main() {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let client = redis::Client::open("redis://127.0.0.1/", None).unwrap();

let con = client.get_multiplexed_tokio_connection().await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion redis/examples/async-pub-sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let client = redis::Client::open("redis://127.0.0.1/", None).unwrap();
let mut publish_conn = client.get_multiplexed_async_connection().await?;
let mut pubsub_conn = client.get_async_pubsub().await?;

Expand Down
2 changes: 1 addition & 1 deletion redis/examples/async-scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use redis::{AsyncCommands, AsyncIter};

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let client = redis::Client::open("redis://127.0.0.1/", None).unwrap();
let mut con = client.get_multiplexed_async_connection().await?;

con.set("async-key1", b"foo").await?;
Expand Down
2 changes: 1 addition & 1 deletion redis/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn do_atomic_increment(con: &mut redis::Connection) -> redis::RedisResult<()> {
/// Runs all the examples and propagates errors up.
fn do_redis_code(url: &str) -> redis::RedisResult<()> {
// general connection handling
let client = redis::Client::open(url)?;
let client = redis::Client::open(url, None)?;
let mut con = client.get_connection()?;

// read some config and print it.
Expand Down
2 changes: 1 addition & 1 deletion redis/examples/geospatial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn run() -> RedisResult<()> {
Err(..) => "redis://127.0.0.1/".to_string(),
};

let client = redis::Client::open(redis_url.as_str())?;
let client = redis::Client::open(redis_url.as_str(), None)?;
let mut con = client.get_connection()?;

// Add some members to the geospatial index.
Expand Down
2 changes: 1 addition & 1 deletion redis/examples/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const SLOWNESSES: &[u8] = &[2, 3, 4];
/// that demonstrates basic usage of both the XREAD and XREADGROUP
/// commands.
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("client");
let client = redis::Client::open("redis://127.0.0.1/", None).expect("client");

println!("Demonstrating XADD followed by XREAD, single threaded\n");

Expand Down
39 changes: 30 additions & 9 deletions redis/src/aio/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::RedisFuture;
use crate::cmd::Cmd;
use crate::connection::RetryStrategyInfo;
use crate::push_manager::PushManager;
use crate::types::{RedisError, RedisResult, Value};
use crate::{
aio::{ConnectionLike, MultiplexedConnection, Runtime},
aio::{Connection::RetryStrategyInfo, ConnectionLike, MultiplexedConnection, Runtime},
Client,
};
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
Expand Down Expand Up @@ -101,13 +102,23 @@ impl ConnectionManager {
/// This requires the `connection-manager` feature, which will also pull in
/// the Tokio executor.
pub async fn new(client: Client) -> RedisResult<Self> {
Self::new_with_backoff(
client,
Self::DEFAULT_CONNECTION_RETRY_EXPONENT_BASE,
Self::DEFAULT_CONNECTION_RETRY_FACTOR,
Self::DEFAULT_NUMBER_OF_CONNECTION_RETRIESE,
)
.await
let mut exponent_base: u64 = Self::DEFAULT_CONNECTION_RETRY_EXPONENT_BASE;
let mut factor: u64 = Self::DEFAULT_CONNECTION_RETRY_FACTOR;
let mut number_of_retries: usize = Self::DEFAULT_NUMBER_OF_CONNECTION_RETRIESE;
let mut max_delay: Option<u64> = None;

let mut retry_strategy_info: Option<RetryStrategyInfo> =
client.get_connection_info().clone().retry_strategy;
if let Some(retry_strategy_info) = retry_strategy_info {
exponent_base = retry_strategy_info.exponent_base.unwrap_or(exponent_base);
factor = retry_strategy_info.factor.unwrap_or(factor);
number_of_retries = retry_strategy_info
.number_of_retries
.unwrap_or(number_of_retries);
max_delay = retry_strategy_info.max_delay;
}

Self::new_with_backoff(client, exponent_base, factor, number_of_retries, max_delay).await
}

/// Connect to the server and store the connection inside the returned `ConnectionManager`.
Expand All @@ -123,12 +134,14 @@ impl ConnectionManager {
exponent_base: u64,
factor: u64,
number_of_retries: usize,
max_delay: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding another field to the function, which is a breaking change, you should add another function taking that field.
and in order to limit the number of functions, please follow the example in this PR - that is, add a configuration object, and a function using that configuration object, so that new fields won't break users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, i will update code with new PR

) -> RedisResult<Self> {
Self::new_with_backoff_and_timeouts(
client,
exponent_base,
factor,
number_of_retries,
max_delay,
std::time::Duration::MAX,
std::time::Duration::MAX,
)
Expand All @@ -151,13 +164,21 @@ impl ConnectionManager {
exponent_base: u64,
factor: u64,
number_of_retries: usize,
max_delay: Option<u64>,
response_timeout: std::time::Duration,
connection_timeout: std::time::Duration,
) -> RedisResult<Self> {
// Create a MultiplexedConnection and wait for it to be established
let push_manager = PushManager::default();
let runtime = Runtime::locate();
let retry_strategy = ExponentialBackoff::from_millis(exponent_base).factor(factor);

/// Apply a maximum delay. No retry delay will be longer than this `Duration`.
let mut retry_strategy = ExponentialBackoff::from_millis(exponent_base).factor(factor);
if let Some(max_retry_delay) = max_delay {
retry_strategy =
retry_strategy.max_delay(std::time::Duration::from_millis(max_retry_delay));
}

let mut connection = Self::new_connection(
client.clone(),
retry_strategy.clone(),
Expand Down
31 changes: 24 additions & 7 deletions redis/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::time::Duration;

use crate::{
connection::{connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo},
connection::{
connect, Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo, RetryStrategyInfo,
},
types::{RedisResult, Value},
};
#[cfg(feature = "aio")]
Expand Down Expand Up @@ -29,17 +31,24 @@ pub struct Client {
/// Example usage::
///
/// ```rust,no_run
/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
/// let client = redis::Client::open("redis://127.0.0.1/", None).unwrap();
/// let con = client.get_connection().unwrap();
/// ```
impl Client {
/// Connects to a redis server and returns a client. This does not
/// actually open a connection yet but it does perform some basic
/// checks on the URL that might make the operation fail.
pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
Ok(Client {
connection_info: params.into_connection_info()?,
})
pub fn open<T: IntoConnectionInfo>(
params: T,
retry_strategy_info: Option<RetryStrategyInfo>,
) -> RedisResult<Client> {
let mut connection_info = params.into_connection_info()?;

if retry_strategy_info.is_some() {
connection_info.retry_strategy = retry_strategy_info
}

Ok(Client { connection_info })
}

/// Instructs the client to actually connect to redis and returns a
Expand Down Expand Up @@ -418,11 +427,13 @@ impl Client {
exponent_base: u64,
factor: u64,
number_of_retries: usize,
max_delay: Option<u64>,
) -> RedisResult<crate::aio::ConnectionManager> {
self.get_connection_manager_with_backoff_and_timeouts(
exponent_base,
factor,
number_of_retries,
max_delay,
std::time::Duration::MAX,
std::time::Duration::MAX,
)
Expand Down Expand Up @@ -454,6 +465,7 @@ impl Client {
exponent_base: u64,
factor: u64,
number_of_retries: usize,
max_delay: Option<u64>,
response_timeout: std::time::Duration,
connection_timeout: std::time::Duration,
) -> RedisResult<crate::aio::ConnectionManager> {
Expand All @@ -462,6 +474,7 @@ impl Client {
exponent_base,
factor,
number_of_retries,
max_delay,
response_timeout,
connection_timeout,
)
Expand Down Expand Up @@ -492,6 +505,7 @@ impl Client {
exponent_base: u64,
factor: u64,
number_of_retries: usize,
max_delay: Option<u64>,
response_timeout: std::time::Duration,
connection_timeout: std::time::Duration,
) -> RedisResult<crate::aio::ConnectionManager> {
Expand All @@ -500,6 +514,7 @@ impl Client {
exponent_base,
factor,
number_of_retries,
max_delay,
response_timeout,
connection_timeout,
)
Expand Down Expand Up @@ -530,12 +545,14 @@ impl Client {
exponent_base: u64,
factor: u64,
number_of_retries: usize,
max_delay: Option<u64>,
) -> RedisResult<crate::aio::ConnectionManager> {
crate::aio::ConnectionManager::new_with_backoff(
self.clone(),
exponent_base,
factor,
number_of_retries,
max_delay,
)
.await
}
Expand Down Expand Up @@ -743,6 +760,6 @@ mod test {

#[test]
fn regression_293_parse_ipv6_with_interface() {
assert!(Client::open(("fe80::cafe:beef%eno1", 6379)).is_ok());
assert!(Client::open(("fe80::cafe:beef%eno1", 6379), None).is_ok());
}
}
32 changes: 30 additions & 2 deletions redis/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ pub struct ConnectionInfo {

/// A boxed connection address for where to connect to.
pub redis: RedisConnectionInfo,

/// In case of reconnection issues, the manager will retry reconnection
pub retry_strategy: Option<RetryStrategyInfo>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't the place for this - if only one connection type uses this, it should be in that type's config, not here.

}

/// Redis specific/connection independent information used to establish a connection to redis.
Expand All @@ -226,6 +229,21 @@ pub struct RedisConnectionInfo {
pub protocol: ProtocolVersion,
}

#[derive(Clone, Debug, Default)]
pub struct RetryStrategyInfo {
/// The resulting duration is calculated by taking the base to the `n`-th power,
/// where `n` denotes the number of past attempts.
pub exponent_base: Option<u64>,
/// A multiplicative factor that will be applied to the retry delay.
///
/// For example, using a factor of `1000` will make each delay in units of seconds.
pub factor: Option<u64>,
/// number_of_retries times, with an exponentially increasing delay
pub number_of_retries: Option<usize>,
/// Apply a maximum delay. No retry delay will be longer than this `Duration`.
pub max_delay: Option<u64>,
}

impl FromStr for ConnectionInfo {
type Err = RedisError;

Expand Down Expand Up @@ -273,6 +291,7 @@ where
Ok(ConnectionInfo {
addr: ConnectionAddr::Tcp(self.0.into(), self.1),
redis: RedisConnectionInfo::default(),
retry_strategy: None,
})
}
}
Expand Down Expand Up @@ -390,6 +409,7 @@ fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
_ => ProtocolVersion::RESP2,
},
},
retry_strategy: None,
})
}

Expand Down Expand Up @@ -421,6 +441,7 @@ fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
_ => ProtocolVersion::RESP2,
},
},
retry_strategy: None,
})
}

Expand Down Expand Up @@ -1443,7 +1464,7 @@ where
///
/// ```rust,no_run
/// # fn do_something() -> redis::RedisResult<()> {
/// let client = redis::Client::open("redis://127.0.0.1/")?;
/// let client = redis::Client::open("redis://127.0.0.1/", None)?;
/// let mut con = client.get_connection()?;
/// let mut pubsub = con.as_pubsub();
/// pubsub.subscribe("channel_1")?;
Expand Down Expand Up @@ -1673,7 +1694,7 @@ impl Msg {
/// ```rust,no_run
/// use redis::Commands;
/// # fn do_something() -> redis::RedisResult<()> {
/// # let client = redis::Client::open("redis://127.0.0.1/").unwrap();
/// # let client = redis::Client::open("redis://127.0.0.1/", None).unwrap();
/// # let mut con = client.get_connection().unwrap();
/// let key = "the_key";
/// let (new_val,) : (isize,) = redis::transaction(&mut con, &[key], |con, pipe| {
Expand Down Expand Up @@ -1791,13 +1812,15 @@ mod tests {
ConnectionInfo {
addr: ConnectionAddr::Tcp("127.0.0.1".to_string(), 6379),
redis: Default::default(),
retry_strategy: None,
},
),
(
url::Url::parse("redis://[::1]").unwrap(),
ConnectionInfo {
addr: ConnectionAddr::Tcp("::1".to_string(), 6379),
redis: Default::default(),
retry_strategy: None,
},
),
(
Expand All @@ -1810,6 +1833,7 @@ mod tests {
password: Some("#@<>$".to_string()),
..Default::default()
},
retry_strategy: None,
},
),
];
Expand Down Expand Up @@ -1877,6 +1901,7 @@ mod tests {
password: None,
protocol: ProtocolVersion::RESP2,
},
retry_strategy: None,
},
),
(
Expand All @@ -1887,6 +1912,7 @@ mod tests {
db: 1,
..Default::default()
},
retry_strategy: None,
},
),
(
Expand All @@ -1902,6 +1928,7 @@ mod tests {
password: Some("#@<>$".to_string()),
..Default::default()
},
retry_strategy: None,
},
),
(
Expand All @@ -1917,6 +1944,7 @@ mod tests {
password: Some("&?= *+".to_string()),
..Default::default()
},
retry_strategy: None,
},
),
];
Expand Down
Loading