Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal committed Mar 5, 2021
1 parent 5a5ca11 commit 33f913a
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 81 deletions.
39 changes: 29 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::sync::Arc;
use futures::channel::{mpsc, oneshot};

use crate::connection::Authentication;
use crate::connection_manager::{ConnectionRetryOptions, OperationRetryOptions,
BrokerAddress, ConnectionManager, TlsOptions};
use crate::connection_manager::{
BrokerAddress, ConnectionManager, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
};
use crate::consumer::ConsumerBuilder;
use crate::error::Error;
use crate::executor::Executor;
Expand Down Expand Up @@ -155,11 +156,15 @@ impl<Exe: Executor> Pulsar<Exe> {
let url: String = url.into();
let executor = Arc::new(executor);
let operation_retry_options = operation_retry_parameters.unwrap_or_default();
let manager =
ConnectionManager::new(url, auth, connection_retry_parameters,
operation_retry_options.clone(), tls_options,
executor.clone())
.await?;
let manager = ConnectionManager::new(
url,
auth,
connection_retry_parameters,
operation_retry_options.clone(),
tls_options,
executor.clone(),
)
.await?;
let manager = Arc::new(manager);
let service_discovery = Arc::new(ServiceDiscovery::with_manager(manager.clone()));
let (producer, producer_rx) = mpsc::unbounded();
Expand Down Expand Up @@ -326,13 +331,19 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
}

/// Exponential back off parameters for automatic reconnection
pub fn with_connection_retry_options(mut self, connection_retry_options: ConnectionRetryOptions) -> Self {
pub fn with_connection_retry_options(
mut self,
connection_retry_options: ConnectionRetryOptions,
) -> Self {
self.connection_retry_options = Some(connection_retry_options);
self
}

/// Retry parameters for Pulsar operations
pub fn with_operation_retry_options(mut self, operation_retry_options: OperationRetryOptions) -> Self {
pub fn with_operation_retry_options(
mut self,
operation_retry_options: OperationRetryOptions,
) -> Self {
self.operation_retry_options = Some(operation_retry_options);
self
}
Expand Down Expand Up @@ -369,7 +380,15 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
tls_options,
executor,
} = self;
Pulsar::new(url, auth, connection_retry_options, operation_retry_options, tls_options, executor).await
Pulsar::new(
url,
auth,
connection_retry_options,
operation_retry_options,
tls_options,
executor,
)
.await
}
}

Expand Down
83 changes: 56 additions & 27 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,14 +467,12 @@ impl<Exe: Executor> ConnectionSender<Exe> {

match select(response, delay_f).await {
Either::Left((res, _)) => res,
Either::Right(_) => {
Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout sending message to the Pulsar server",
)))
}
Either::Right(_) => Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout sending message to the Pulsar server",
))),
}
},
}
_ => Err(ConnectionError::Disconnected),
}
}
Expand Down Expand Up @@ -512,23 +510,26 @@ impl<Exe: Executor> Connection<Exe> {
};

let u = url.clone();
let address: SocketAddr = match executor.spawn_blocking(move || {
u.socket_addrs(|| match u.scheme() {
"pulsar" => Some(6650),
"pulsar+ssl" => Some(6651),
_ => None,
})
.map_err(|e| {
error!("could not look up address: {:?}", e);
e
})
.ok()
.and_then(|v| {
let mut rng = thread_rng();
let index: usize = rng.gen_range(0..v.len());
v.get(index).copied()
let address: SocketAddr = match executor
.spawn_blocking(move || {
u.socket_addrs(|| match u.scheme() {
"pulsar" => Some(6650),
"pulsar+ssl" => Some(6651),
_ => None,
})
.map_err(|e| {
error!("could not look up address: {:?}", e);
e
})
.ok()
.and_then(|v| {
let mut rng = thread_rng();
let index: usize = rng.gen_range(0..v.len());
v.get(index).copied()
})
})
}).await {
.await
{
Some(Some(address)) => address,
_ =>
//return Err(Error::Custom(format!("could not query address: {}", url))),
Expand Down Expand Up @@ -597,13 +598,27 @@ impl<Exe: Executor> Connection<Exe> {
.await
.map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;

Connection::connect(stream, auth_data, proxy_to_broker_url, executor, operation_timeout).await
Connection::connect(
stream,
auth_data,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
} else {
let stream = tokio::net::TcpStream::connect(&address)
.await
.map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;

Connection::connect(stream, auth_data, proxy_to_broker_url, executor, operation_timeout).await
Connection::connect(
stream,
auth_data,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
}
}
#[cfg(not(feature = "tokio-runtime"))]
Expand All @@ -623,13 +638,27 @@ impl<Exe: Executor> Connection<Exe> {
.await
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;

Connection::connect(stream, auth_data, proxy_to_broker_url, executor, operation_timeout).await
Connection::connect(
stream,
auth_data,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
} else {
let stream = async_std::net::TcpStream::connect(&address)
.await
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;

Connection::connect(stream, auth_data, proxy_to_broker_url, executor, operation_timeout).await
Connection::connect(
stream,
auth_data,
proxy_to_broker_url,
executor,
operation_timeout,
)
.await
}
}
#[cfg(not(feature = "async-std-runtime"))]
Expand Down
13 changes: 9 additions & 4 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ impl<Exe: Executor> ConnectionManager<Exe> {
}
}

async fn connect(&self, broker: BrokerAddress) -> Result<Arc<Connection<Exe>>, ConnectionError> {
async fn connect(
&self,
broker: BrokerAddress,
) -> Result<Arc<Connection<Exe>>, ConnectionError> {
debug!("ConnectionManager::connect({:?})", broker);

let rx = {
Expand Down Expand Up @@ -269,8 +272,9 @@ impl<Exe: Executor> ConnectionManager<Exe> {
{
Ok(c) => break c,
Err(ConnectionError::Io(e)) => {
if e.kind() != std::io::ErrorKind::ConnectionRefused ||
e.kind() != std::io::ErrorKind::TimedOut {
if e.kind() != std::io::ErrorKind::ConnectionRefused
|| e.kind() != std::io::ErrorKind::TimedOut
{
return Err(ConnectionError::Io(e));
}

Expand All @@ -280,7 +284,8 @@ impl<Exe: Executor> ConnectionManager<Exe> {

let jitter = rand::thread_rng().gen_range(0..10);
current_backoff = std::cmp::min(
self.connection_retry_options.min_backoff * 2u32.saturating_pow(current_retries),
self.connection_retry_options.min_backoff
* 2u32.saturating_pow(current_retries),
self.connection_retry_options.max_backoff,
) + self.connection_retry_options.min_backoff * jitter;
current_retries += 1;
Expand Down
17 changes: 12 additions & 5 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
log::info!(
"subscribe({}) success after {} retries over {} seconds",
topic,
current_retries +1,
current_retries + 1,
dur
);
}
Expand All @@ -373,14 +373,18 @@ impl<T: DeserializeMessage, Exe: Executor> TopicConsumer<T, Exe> {
Some(proto::ServerError::ServiceNotReady),
text,
)) => {
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
if operation_retry_options.max_retries.is_none()
|| operation_retry_options.max_retries.unwrap() > current_retries
{
error!("subscribe({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries, text.unwrap_or_else(String::new));

current_retries += 1;
client.executor.delay(operation_retry_options.retry_delay).await;
client
.executor
.delay(operation_retry_options.retry_delay)
.await;

// we need to look up again the topic's address
let prev = addr;
Expand Down Expand Up @@ -1509,7 +1513,10 @@ impl<T: DeserializeMessage, Exe: Executor> MultiTopicConsumer<T, Exe> {

self.new_consumers = Some(Box::pin(async move {
let topics = pulsar
.get_topics_of_namespace(namespace.clone(), proto::command_get_topics_of_namespace::Mode::All)
.get_topics_of_namespace(
namespace.clone(),
proto::command_get_topics_of_namespace::Mode::All,
)
.await?;
trace!("fetched topics {:?}", topics);

Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ extern crate serde;

pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage};
pub use connection::Authentication;
pub use connection_manager::{ConnectionRetryOptions, OperationRetryOptions,
BrokerAddress, TlsOptions};
pub use connection_manager::{
BrokerAddress, ConnectionRetryOptions, OperationRetryOptions, TlsOptions,
};
pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions};
pub use error::Error;
#[cfg(feature = "async-std-runtime")]
Expand Down
13 changes: 8 additions & 5 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,9 @@ impl<Exe: Executor> TopicProducer<Exe> {
Some(proto::ServerError::ServiceNotReady),
text,
)) => {
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
if operation_retry_options.max_retries.is_none()
|| operation_retry_options.max_retries.unwrap() > current_retries
{
error!("create_producer({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?}): {}",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries, text.unwrap_or_else(String::new));
Expand Down Expand Up @@ -691,9 +692,11 @@ impl<Exe: Executor> TopicProducer<Exe> {
{
Ok(receipt) => return Ok(receipt),
Err(ConnectionError::Disconnected) => {}
Err(ConnectionError::Io(e)) => if e.kind() != std::io::ErrorKind::TimedOut {
error!("send_inner got io error: {:?}", e);
return Err(ProducerError::Connection(ConnectionError::Io(e)).into());
Err(ConnectionError::Io(e)) => {
if e.kind() != std::io::ErrorKind::TimedOut {
error!("send_inner got io error: {:?}", e);
return Err(ProducerError::Connection(ConnectionError::Io(e)).into());
}
}
Err(e) => {
error!("send_inner got error: {:?}", e);
Expand Down
58 changes: 30 additions & 28 deletions src/service_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,27 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
{
let error = response.error.and_then(crate::error::server_error);
if error == Some(crate::message::proto::ServerError::ServiceNotReady) {
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
error!("lookup({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})", topic, operation_retry_options.retry_delay.as_millis(), operation_retry_options.max_retries);
current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!("lookup({}) reached max retries", topic);
}
if operation_retry_options.max_retries.is_none()
|| operation_retry_options.max_retries.unwrap() > current_retries
{
error!("lookup({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})", topic, operation_retry_options.retry_delay.as_millis(), operation_retry_options.max_retries);
current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!("lookup({}) reached max retries", topic);
}
}
return Err(ServiceDiscoveryError::Query(
error,
response.message.clone(),
));
}

if current_retries >0 {
if current_retries > 0 {
let dur = (std::time::Instant::now() - start).as_secs();
log::info!(
"lookup({}) success after {} retries over {} seconds",
Expand Down Expand Up @@ -172,25 +173,26 @@ impl<Exe: Executor> ServiceDiscovery<Exe> {
{
let error = response.error.and_then(crate::error::server_error);
if error == Some(crate::message::proto::ServerError::ServiceNotReady) {
if operation_retry_options.max_retries.is_none() ||
operation_retry_options.max_retries.unwrap() > current_retries {
error!("lookup_partitioned_topic_number({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})",
if operation_retry_options.max_retries.is_none()
|| operation_retry_options.max_retries.unwrap() > current_retries
{
error!("lookup_partitioned_topic_number({}) answered ServiceNotReady, retrying request after {}ms (max_retries = {:?})",
topic, operation_retry_options.retry_delay.as_millis(),
operation_retry_options.max_retries);

current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!(
"lookup_partitioned_topic_number({}) reached max retries",
topic
);
current_retries += 1;
self.manager
.executor
.delay(operation_retry_options.retry_delay)
.await;
continue;
} else {
error!(
"lookup_partitioned_topic_number({}) reached max retries",
topic
);
}
}
}
return Err(ServiceDiscoveryError::Query(
error,
response.message.clone(),
Expand Down

0 comments on commit 33f913a

Please sign in to comment.