Skip to content

Commit

Permalink
rename BackOffOptions to ConnectionRetryOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal committed Feb 24, 2021
1 parent 943cf0a commit 5adb842
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
26 changes: 13 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use futures::channel::{mpsc, oneshot};

use crate::connection::Authentication;
use crate::connection_manager::{BackOffOptions, BrokerAddress, ConnectionManager, TlsOptions};
use crate::connection_manager::{ConnectionRetryOptions, BrokerAddress, ConnectionManager, TlsOptions};
use crate::consumer::ConsumerBuilder;
use crate::error::Error;
use crate::executor::Executor;
Expand Down Expand Up @@ -105,12 +105,12 @@ impl<'a> SerializeMessage for &'a str {
/// ```rust,no_run
/// use pulsar::{Pulsar, TokioExecutor};
///
/// # async fn run(auth: pulsar::Authentication, backoff: pulsar::BackOffOptions) -> Result<(), pulsar::Error> {
/// # async fn run(auth: pulsar::Authentication, retry: pulsar::ConnectionRetryOptions) -> Result<(), pulsar::Error> {
/// let addr = "pulsar://127.0.0.1:6650";
/// // you can indicate which executor you use as the return type of client creation
/// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor)
/// .with_auth(auth)
/// .with_back_off_options(backoff)
/// .with_connection_retry_options(retry)
/// .build()
/// .await?;
///
Expand Down Expand Up @@ -145,14 +145,14 @@ impl<Exe: Executor> Pulsar<Exe> {
pub(crate) async fn new<S: Into<String>>(
url: S,
auth: Option<Authentication>,
backoff_parameters: Option<BackOffOptions>,
retry_parameters: Option<ConnectionRetryOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
) -> Result<Self, Error> {
let url: String = url.into();
let executor = Arc::new(executor);
let manager =
ConnectionManager::new(url, auth, backoff_parameters, tls_options, executor.clone())
ConnectionManager::new(url, auth, retry_parameters, tls_options, executor.clone())
.await?;
let manager = Arc::new(manager);
let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
Expand All @@ -177,7 +177,7 @@ impl<Exe: Executor> Pulsar<Exe> {
PulsarBuilder {
url: url.into(),
auth: None,
back_off_options: None,
connection_retry_options: None,
tls_options: None,
executor,
}
Expand Down Expand Up @@ -304,7 +304,7 @@ impl<Exe: Executor> Pulsar<Exe> {
pub struct PulsarBuilder<Exe: Executor> {
url: String,
auth: Option<Authentication>,
back_off_options: Option<BackOffOptions>,
connection_retry_options: Option<ConnectionRetryOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
}
Expand All @@ -315,18 +315,18 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
PulsarBuilder {
url: self.url,
auth: Some(auth),
back_off_options: self.back_off_options,
connection_retry_options: self.connection_retry_options,
tls_options: self.tls_options,
executor: self.executor,
}
}

/// Exponential back off parameters for automatic reconnection
pub fn with_back_off_options(self, back_off_options: BackOffOptions) -> Self {
pub fn with_connection_retry_options(self, connection_retry_options: ConnectionRetryOptions) -> Self {
PulsarBuilder {
url: self.url,
auth: self.auth,
back_off_options: Some(back_off_options),
connection_retry_options: Some(connection_retry_options),
tls_options: self.tls_options,
executor: self.executor,
}
Expand All @@ -337,7 +337,7 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
PulsarBuilder {
url: self.url,
auth: self.auth,
back_off_options: self.back_off_options,
connection_retry_options: self.connection_retry_options,
tls_options: Some(TlsOptions {
certificate_chain: Some(certificate_chain),
}),
Expand All @@ -364,11 +364,11 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
let PulsarBuilder {
url,
auth,
back_off_options,
connection_retry_options,
tls_options,
executor,
} = self;
Pulsar::new(url, auth, back_off_options, tls_options, executor).await
Pulsar::new(url, auth, connection_retry_options, tls_options, executor).await
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct BrokerAddress {

/// configuration for reconnection exponential back off
#[derive(Debug, Clone)]
pub struct BackOffOptions {
pub struct ConnectionRetryOptions {
/// minimum time between connection retries
pub min_backoff: Duration,
/// maximum time between rconnection etries
Expand All @@ -37,9 +37,9 @@ pub struct BackOffOptions {
pub operation_timeout: Duration,
}

impl std::default::Default for BackOffOptions {
impl std::default::Default for ConnectionRetryOptions {
fn default() -> Self {
BackOffOptions {
ConnectionRetryOptions {
min_backoff: Duration::from_millis(10),
max_backoff: Duration::from_secs(30),
max_retries: 12u32,
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct ConnectionManager<Exe: Executor> {
auth: Option<Authentication>,
pub(crate) executor: Arc<Exe>,
connections: Arc<Mutex<HashMap<BrokerAddress, ConnectionStatus<Exe>>>>,
back_off_options: BackOffOptions,
connection_retry_options: ConnectionRetryOptions,
tls_options: TlsOptions,
certificate_chain: Vec<native_tls::Certificate>,
}
Expand All @@ -80,11 +80,11 @@ impl<Exe: Executor> ConnectionManager<Exe> {
pub async fn new(
url: String,
auth: Option<Authentication>,
backoff: Option<BackOffOptions>,
connection_retry: Option<ConnectionRetryOptions>,
tls: Option<TlsOptions>,
executor: Arc<Exe>,
) -> Result<Self, ConnectionError> {
let back_off_options = backoff.unwrap_or_default();
let connection_retry_options = connection_retry.unwrap_or_default();
let tls_options = tls.unwrap_or_default();
let url = Url::parse(&url)
.map_err(|e| {
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
auth,
executor,
connections: Arc::new(Mutex::new(HashMap::new())),
back_off_options,
connection_retry_options,
tls_options,
certificate_chain,
};
Expand Down Expand Up @@ -240,8 +240,8 @@ impl<Exe: Executor> ConnectionManager<Exe> {
self.auth.clone(),
proxy_url.clone(),
&self.certificate_chain,
self.back_off_options.connection_timeout,
self.back_off_options.operation_timeout,
self.connection_retry_options.connection_timeout,
self.connection_retry_options.operation_timeout,
self.executor.clone(),
)
.await
Expand All @@ -253,15 +253,15 @@ impl<Exe: Executor> ConnectionManager<Exe> {
return Err(ConnectionError::Io(e));
}

if current_retries == self.back_off_options.max_retries {
if current_retries == self.connection_retry_options.max_retries {
return Err(ConnectionError::Io(e));
}

let jitter = rand::thread_rng().gen_range(0..10);
current_backoff = std::cmp::min(
self.back_off_options.min_backoff * 2u32.saturating_pow(current_retries),
self.back_off_options.max_backoff,
) + self.back_off_options.min_backoff * jitter;
self.connection_retry_options.min_backoff * 2u32.saturating_pow(current_retries),
self.connection_retry_options.max_backoff,
) + self.connection_retry_options.min_backoff * jitter;
current_retries += 1;

trace!(
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ extern crate serde;

pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage};
pub use connection::Authentication;
pub use connection_manager::{BackOffOptions, BrokerAddress, TlsOptions};
pub use connection_manager::{ConnectionRetryOptions, BrokerAddress, TlsOptions};
pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions};
pub use error::Error;
#[cfg(feature = "async-std-runtime")]
Expand Down

0 comments on commit 5adb842

Please sign in to comment.