Skip to content

Commit

Permalink
make the ServiceNotReady retry configurable
Browse files Browse the repository at this point in the history
retry indefinitely by default, since we are just waiting for the server to finish
moving stuff around
  • Loading branch information
Geal committed Feb 24, 2021
1 parent 6dfe4e8 commit 0afe6e6
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 59 deletions.
24 changes: 20 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::sync::Arc;
use futures::channel::{mpsc, oneshot};

use crate::connection::Authentication;
use crate::connection_manager::{ConnectionRetryOptions, BrokerAddress, ConnectionManager, TlsOptions};
use crate::connection_manager::{ConnectionRetryOptions, OperationRetryOptions,
BrokerAddress, ConnectionManager, TlsOptions};
use crate::consumer::ConsumerBuilder;
use crate::error::Error;
use crate::executor::Executor;
Expand Down Expand Up @@ -137,6 +138,7 @@ pub struct Pulsar<Exe: Executor> {
// run_producer, then fill in the producer field afterwards in the
// main Pulsar instance
producer: Option<mpsc::UnboundedSender<SendMessage>>,
pub(crate) operation_retry_options: OperationRetryOptions,
pub(crate) executor: Arc<Exe>,
}

Expand All @@ -145,14 +147,18 @@ impl<Exe: Executor> Pulsar<Exe> {
pub(crate) async fn new<S: Into<String>>(
url: S,
auth: Option<Authentication>,
retry_parameters: Option<ConnectionRetryOptions>,
connection_retry_parameters: Option<ConnectionRetryOptions>,
operation_retry_parameters: Option<OperationRetryOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
) -> Result<Self, Error> {
let url: String = url.into();
let executor = Arc::new(executor);
let operation_retry_options = operation_retry_parameters.unwrap_or_default();
let manager =
ConnectionManager::new(url, auth, retry_parameters, tls_options, executor.clone())
ConnectionManager::new(url, auth, connection_retry_parameters,
operation_retry_options.clone(), tls_options,
executor.clone())
.await?;
let manager = Arc::new(manager);
let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
Expand All @@ -162,6 +168,7 @@ impl<Exe: Executor> Pulsar<Exe> {
manager,
service_discovery,
producer: None,
operation_retry_options,
executor,
};

Expand All @@ -178,6 +185,7 @@ impl<Exe: Executor> Pulsar<Exe> {
url: url.into(),
auth: None,
connection_retry_options: None,
operation_retry_options: None,
tls_options: None,
executor,
}
Expand Down Expand Up @@ -305,6 +313,7 @@ pub struct PulsarBuilder<Exe: Executor> {
url: String,
auth: Option<Authentication>,
connection_retry_options: Option<ConnectionRetryOptions>,
operation_retry_options: Option<OperationRetryOptions>,
tls_options: Option<TlsOptions>,
executor: Exe,
}
Expand All @@ -322,6 +331,12 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
self
}

/// Retry parameters for Pulsar operations
pub fn with_operation_retry_options(mut self, operation_retry_options: OperationRetryOptions) -> Self {
self.operation_retry_options = Some(operation_retry_options);
self
}

/// add a custom certificate chain to authenticate the server in TLS connections
pub fn with_certificate_chain(mut self, certificate_chain: Vec<u8>) -> Self {
self.tls_options = Some(TlsOptions {
Expand Down Expand Up @@ -350,10 +365,11 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
url,
auth,
connection_retry_options,
operation_retry_options,
tls_options,
executor,
} = self;
Pulsar::new(url, auth, connection_retry_options, tls_options, executor).await
Pulsar::new(url, auth, connection_retry_options, operation_retry_options, tls_options, executor).await
}
}

Expand Down
31 changes: 26 additions & 5 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ pub struct BrokerAddress {
/// configuration for reconnection exponential back off
#[derive(Debug, Clone)]
pub struct ConnectionRetryOptions {
/// minimum time between connection retries
/// minimum delay between connection retries
pub min_backoff: Duration,
/// maximum time between connection retries
/// maximum delay between connection retries
pub max_backoff: Duration,
/// maximum number of connection retries
pub max_retries: u32,
/// time limit to establish a connection
pub connection_timeout: Duration,
/// time limit to receive an answer to a Pulsar operation
pub operation_timeout: Duration,
}

impl std::default::Default for ConnectionRetryOptions {
Expand All @@ -44,7 +42,27 @@ impl std::default::Default for ConnectionRetryOptions {
max_backoff: Duration::from_secs(30),
max_retries: 12u32,
connection_timeout: Duration::from_secs(10),
}
}
}

/// Retry configuration for Pulsar operations.
///
/// The [`Default`] implementation uses a 30 second operation timeout, a
/// 500 millisecond pause between retries, and no limit on the number of
/// retries.
#[derive(Debug, Clone)]
pub struct OperationRetryOptions {
    /// Time limit to receive an answer to a Pulsar operation.
    pub operation_timeout: Duration,
    /// Delay between operation retries after a `ServiceNotReady` error.
    pub retry_delay: Duration,
    /// Maximum number of operation retries; `None` means retry indefinitely.
    pub max_retries: Option<u32>,
}

impl Default for OperationRetryOptions {
    /// Builds the default configuration: 30 s timeout, 500 ms retry delay,
    /// unlimited retries.
    fn default() -> Self {
        let operation_timeout = Duration::from_secs(30);
        let retry_delay = Duration::from_millis(500);

        Self {
            operation_timeout,
            retry_delay,
            // No upper bound on retries by default.
            max_retries: None,
        }
    }
}
Expand Down Expand Up @@ -72,6 +90,7 @@ pub struct ConnectionManager<Exe: Executor> {
pub(crate) executor: Arc<Exe>,
connections: Arc<Mutex<HashMap<BrokerAddress, ConnectionStatus<Exe>>>>,
connection_retry_options: ConnectionRetryOptions,
pub(crate) operation_retry_options: OperationRetryOptions,
tls_options: TlsOptions,
certificate_chain: Vec<native_tls::Certificate>,
}
Expand All @@ -81,6 +100,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
url: String,
auth: Option<Authentication>,
connection_retry: Option<ConnectionRetryOptions>,
operation_retry_options: OperationRetryOptions,
tls: Option<TlsOptions>,
executor: Arc<Exe>,
) -> Result<Self, ConnectionError> {
Expand Down Expand Up @@ -119,6 +139,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
executor,
connections: Arc::new(Mutex::new(HashMap::new())),
connection_retry_options,
operation_retry_options,
tls_options,
certificate_chain,
};
Expand Down Expand Up @@ -241,7 +262,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
proxy_url.clone(),
&self.certificate_chain,
self.connection_retry_options.connection_timeout,
self.connection_retry_options.operation_timeout,
self.operation_retry_options.operation_timeout,
self.executor.clone(),
)
.await
Expand Down
24 changes: 13 additions & 11 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,10 @@ impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
let batch_size = batch_size.unwrap_or(1000);

let mut connection = client.manager.get_connection(&addr).await?;
let mut max_retries = 20u8;
let mut retried = false;
let mut current_retries = 0u32;
let start = std::time::Instant::now();
let operation_retry_options = client.operation_retry_options.clone();

loop {
match connection
.sender()
Expand All @@ -357,12 +358,12 @@ impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
.await
{
Ok(_) => {
if retried {
if current_retries > 0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"subscribe({}) success after {} retries over {} seconds",
topic,
20 - max_retries,
current_retries +1,
dur
);
}
Expand All @@ -372,13 +373,14 @@ impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
Some(proto::ServerError::ServiceNotReady),
text,
)) => {
if max_retries > 0 {
error!("subscribe({}) answered ServiceNotReady, retrying request after 500ms (max_retries = {}): {}",
topic, max_retries, text.unwrap_or_else(String::new));

max_retries -= 1;
retried = true;
client.executor.delay(Duration::from_millis(500)).await;
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
error!("subscribe({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries, text.unwrap_or_else(String::new));

current_retries += 1;
client.executor.delay(operation_retry_options.retry_delay).await;

// we need to look up again the topic's address
let prev = addr;
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ extern crate serde;

pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage};
pub use connection::Authentication;
pub use connection_manager::{ConnectionRetryOptions, BrokerAddress, TlsOptions};
pub use connection_manager::{ConnectionRetryOptions, OperationRetryOptions,
BrokerAddress, TlsOptions};
pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions};
pub use error::Error;
#[cfg(feature = "async-std-runtime")]
Expand Down
21 changes: 12 additions & 9 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ impl<Exe: Executor> TopicProducer<Exe> {
};

let producer_name: ProducerName;
let mut max_retries = 20u8;
let mut retried = false;
let mut current_retries = 0u32;
let start = std::time::Instant::now();
let operation_retry_options = client.operation_retry_options.clone();

loop {
match connection
Expand All @@ -405,12 +405,12 @@ impl<Exe: Executor> TopicProducer<Exe> {
Ok(success) => {
producer_name = success.producer_name;

if retried {
if current_retries > 0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"subscribe({}) success after {} retries over {} seconds",
topic,
20 - max_retries,
current_retries + 1,
dur
);
}
Expand All @@ -420,13 +420,16 @@ impl<Exe: Executor> TopicProducer<Exe> {
Some(proto::ServerError::ServiceNotReady),
text,
)) => {
if max_retries > 0 {
error!("create_producer({}) answered ServiceNotReady, retrying request after 500ms (max_retries = {}): {}", topic, max_retries, text.unwrap_or_else(String::new));
max_retries -= 1;
retried = true;
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
error!("create_producer({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries, text.unwrap_or_else(String::new));

current_retries += 1;
client
.executor
.delay(std::time::Duration::from_millis(500))
.delay(operation_retry_options.retry_delay)
.await;

let addr = client.lookup_topic(&topic).await?;
Expand Down
61 changes: 32 additions & 29 deletions src/service_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
let mut is_authoritative = false;
let mut broker_address = self.manager.get_base_address();

let mut max_retries = 20u8;
let mut retried = false;
let mut current_retries = 0u32;
let start = std::time::Instant::now();
let operation_retry_options = self.manager.operation_retry_options.clone();

loop {
let response = match conn
Expand All @@ -62,32 +62,32 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
== Some(command_lookup_topic_response::LookupType::Failed as i32)
{
let error = response.error.and_then(crate::error::server_error);
if error == Some(crate::message::proto::ServerError::ServiceNotReady)
&& max_retries > 0
{
error!("lookup({}) answered ServiceNotReady, retrying request after 500ms (max_retries = {})", topic, max_retries);
max_retries -= 1;
retried = true;
self.manager
.executor
.delay(Duration::from_millis(500))
.await;
continue;
} else if max_retries == 0 {
error!("lookup({}) reached max retries", topic);
if error == Some(crate::message::proto::ServerError::ServiceNotReady) {
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
error!("lookup({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})", topic, operation_retry_options.retry_delay.as_millis(), operation_retry_options.max_retries);
current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!("lookup({}) reached max retries", topic);
}
}
return Err(ServiceDiscoveryError::Query(
error,
response.message.clone(),
));
}

if retried {
if current_retries >0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"lookup({}) success after {} retries over {} seconds",
topic,
20 - max_retries,
current_retries + 1,
dur
);
}
Expand Down Expand Up @@ -151,9 +151,9 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
let mut connection = self.manager.get_base_connection().await?;
let topic = topic.into();

let mut max_retries = 20u8;
let mut retried = false;
let mut current_retries = 0u32;
let start = std::time::Instant::now();
let operation_retry_options = self.manager.operation_retry_options.clone();

let response = loop {
let response = match connection.sender().lookup_partitioned_topic(&topic).await {
Expand All @@ -171,23 +171,26 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
== Some(command_partitioned_topic_metadata_response::LookupType::Failed as i32)
{
let error = response.error.and_then(crate::error::server_error);
if error == Some(crate::message::proto::ServerError::ServiceNotReady)
&& max_retries > 0
{
error!("lookup_partitioned_topic_number({}) answered ServiceNotReady, retrying request after 500ms (max_retries = {})", topic, max_retries);
max_retries -= 1;
retried = true;
if error == Some(crate::message::proto::ServerError::ServiceNotReady) {
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
error!("lookup_partitioned_topic_number({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries);

current_retries += 1;
self.manager
.executor
.delay(Duration::from_millis(500))
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else if max_retries == 0 {
} else {
error!(
"lookup_partitioned_topic_number({}) reached max retries",
topic
);
}
}
return Err(ServiceDiscoveryError::Query(
error,
response.message.clone(),
Expand All @@ -197,12 +200,12 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
break response;
};

if retried {
if current_retries > 0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"lookup_partitioned_topic_number({}) success after {} retries over {} seconds",
topic,
20 - max_retries,
current_retries + 1,
dur
);
}
Expand Down

0 comments on commit 0afe6e6

Please sign in to comment.