Skip to content

Commit

Permalink
Merge pull request #138 from wyyerd/2.0
Browse files Browse the repository at this point in the history
2.0
  • Loading branch information
Geal committed Feb 15, 2021
2 parents d778912 + 26b00c5 commit 09e3b9b
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 44 deletions.
17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ keywords = ["pulsar", "api", "client"]
travis-ci = {repository = "wyyerd/pulsar-rs"}

[dependencies]
bytes = "0.5.0"
bytes = "1.0.0"
crc = "1.0.0"
futures = "0.3"
nom = "6.0.0"
nom = { version="6.0.0", default-features=false, features=["alloc"] }
prost = "0.7.0"
prost-derive = "0.7.0"
rand = "0.8"
chrono = "0.4.6"
chrono = "0.4"
futures-timer = "3.0"
log = "0.4.6"
url = "2.1"
Expand All @@ -35,11 +35,11 @@ bit-vec = "0.6"
futures-io = "0.3"
native-tls = "0.2"
pem = "0.8"
tokio = { version = "0.2", features = ["rt-threaded", "blocking", "full"], optional = true }
tokio-util = { version = "0.3", features = ["codec"], optional = true }
tokio-native-tls = { version = "0.1", optional = true }
tokio = { version = "1.0", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "0.6", features = ["codec"], optional = true }
tokio-native-tls = { version = "0.3", optional = true }
async-std = {version = "1.9", features = [ "attributes", "unstable" ], optional = true }
futures_codec = { version = "0.4", optional = true }
asynchronous-codec = { version = "0.6", optional = true }
async-native-tls = { version = "0.3", optional = true }
lz4 = { version = "1.23", optional = true }
flate2 = { version = "1.0", optional = true }
Expand All @@ -50,6 +50,7 @@ snap = { version = "1.0", optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
env_logger = "0.8"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
prost-build = "0.7.0"
Expand All @@ -58,4 +59,4 @@ prost-build = "0.7.0"
default = [ "compression", "tokio-runtime", "async-std-runtime" ]
compression = [ "lz4", "flate2", "zstd", "snap" ]
tokio-runtime = [ "tokio", "tokio-util", "tokio-native-tls" ]
async-std-runtime = [ "async-std", "futures_codec", "async-native-tls" ]
async-std-runtime = [ "async-std", "asynchronous-codec", "async-native-tls" ]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() -> Result<(), pulsar::Error> {

counter += 1;
println!("{} messages", counter);
tokio::time::delay_for(std::time::Duration::from_millis(2000)).await;
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
}
}
```
Expand Down
2 changes: 1 addition & 1 deletion examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ async fn main() -> Result<(), pulsar::Error> {

counter += 1;
println!("{} messages", counter);
tokio::time::delay_for(std::time::Duration::from_millis(2000)).await;
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
}
}
65 changes: 40 additions & 25 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::Duration;

use futures::{
self,
channel::{mpsc, oneshot},
future::{select, Either},
pin_mut,
task::{Context, Poll},
Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
};
Expand Down Expand Up @@ -469,6 +472,7 @@ impl Connection {
auth_data: Option<Authentication>,
proxy_to_broker_url: Option<String>,
certificate_chain: &[Certificate],
connection_timeout: Duration,
executor: Arc<Exe>,
) -> Result<Connection, ConnectionError> {
if url.scheme() != "pulsar" && url.scheme() != "pulsar+ssl" {
Expand All @@ -487,26 +491,23 @@ impl Connection {
};

let u = url.clone();
let address: SocketAddr = match executor
.spawn_blocking(move || {
u.socket_addrs(|| match u.scheme() {
"pulsar" => Some(6650),
"pulsar+ssl" => Some(6651),
_ => None,
})
.map_err(|e| {
error!("could not look up address: {:?}", e);
e
})
.ok()
.and_then(|v| {
let mut rng = thread_rng();
let index: usize = rng.gen_range(0..v.len());
v.get(index).copied()
})
let address: SocketAddr = match executor.spawn_blocking(move || {
u.socket_addrs(|| match u.scheme() {
"pulsar" => Some(6650),
"pulsar+ssl" => Some(6651),
_ => None,
})
.await
{
.map_err(|e| {
error!("could not look up address: {:?}", e);
e
})
.ok()
.and_then(|v| {
let mut rng = thread_rng();
let index: usize = rng.gen_range(0..v.len());
v.get(index).copied()
})
}).await {
Some(Some(address)) => address,
_ =>
//return Err(Error::Custom(format!("could not query address: {}", url))),
Expand All @@ -518,16 +519,30 @@ impl Connection {
let hostname = hostname.unwrap_or_else(|| address.ip().to_string());

debug!("Connecting to {}: {}", url, address);
let sender = Connection::prepare_stream(
let sender_prepare = Connection::prepare_stream(
address,
hostname,
tls,
auth_data,
proxy_to_broker_url,
certificate_chain,
executor,
)
.await?;
executor.clone(),
);
let delay_f = executor.delay(connection_timeout);

pin_mut!(sender_prepare);
pin_mut!(delay_f);

let sender;
match select(sender_prepare, delay_f).await {
Either::Left((res, _)) => sender = res?,
Either::Right(_) => {
return Err(ConnectionError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout connecting to the Pulsar server",
)));
}
};

let id = rand::random();
Ok(Connection { id, url, sender })
Expand Down Expand Up @@ -583,13 +598,13 @@ impl Connection {
let stream = connector
.connect(&hostname, stream)
.await
.map(|stream| futures_codec::Framed::new(stream, Codec))?;
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;

Connection::connect(stream, auth_data, proxy_to_broker_url, executor).await
} else {
let stream = async_std::net::TcpStream::connect(&address)
.await
.map(|stream| futures_codec::Framed::new(stream, Codec))?;
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;

Connection::connect(stream, auth_data, proxy_to_broker_url, executor).await
}
Expand Down
3 changes: 3 additions & 0 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct BackOffOptions {
pub min_backoff: Duration,
pub max_backoff: Duration,
pub max_retries: u32,
pub connection_timeout: Duration,
}

impl std::default::Default for BackOffOptions {
Expand All @@ -36,6 +37,7 @@ impl std::default::Default for BackOffOptions {
min_backoff: Duration::from_millis(10),
max_backoff: Duration::from_secs(30),
max_retries: 12u32,
connection_timeout: Duration::from_secs(10),
}
}
}
Expand Down Expand Up @@ -231,6 +233,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
self.auth.clone(),
proxy_url.clone(),
&self.certificate_chain,
self.back_off_options.connection_timeout,
self.executor.clone(),
)
.await
Expand Down
4 changes: 2 additions & 2 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1851,7 +1851,7 @@ mod tests {

let message = TestData {
topic: std::iter::repeat(())
.map(|()| rand::thread_rng().sample(Alphanumeric))
.map(|()| rand::thread_rng().sample(Alphanumeric) as char)
.take(8)
.map(|c| c as char)
.collect(),
Expand Down Expand Up @@ -1912,7 +1912,7 @@ mod tests {
println!("created second consumer");

// the message has already been acked, so we should not receive anything
let res: Result<_, tokio::time::Elapsed> =
let res: Result<_, tokio::time::error::Elapsed> =
tokio::time::timeout(Duration::from_secs(1), consumer.next()).await;
let is_err = res.is_err();
if let Ok(val) = res {
Expand Down
8 changes: 4 additions & 4 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Executor for TokioExecutor {
}

fn delay(&self, duration: std::time::Duration) -> Delay {
Delay::Tokio(tokio::time::delay_for(duration))
Delay::Tokio(tokio::time::sleep(duration))
}

fn kind(&self) -> ExecutorKind {
Expand Down Expand Up @@ -170,9 +170,9 @@ impl Stream for Interval {
unsafe {
match Pin::get_unchecked_mut(self) {
#[cfg(feature = "tokio-runtime")]
Interval::Tokio(j) => match Pin::new_unchecked(j).poll_next(cx) {
Interval::Tokio(j) => match Pin::new_unchecked(j).poll_tick(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(v) => Poll::Ready(v.map(|_| ())),
Poll::Ready(_) => Poll::Ready(Some(())),
},
#[cfg(feature = "async-std-runtime")]
Interval::AsyncStd(j) => match Pin::new_unchecked(j).poll_next(cx) {
Expand All @@ -190,7 +190,7 @@ impl Stream for Interval {

pub enum Delay {
#[cfg(feature = "tokio-runtime")]
Tokio(tokio::time::Delay),
Tokio(tokio::time::Sleep),
#[cfg(feature = "async-std-runtime")]
AsyncStd(Pin<Box<dyn Future<Output = ()> + Send>>),
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
//!
//! counter += 1;
//! println!("{} messages", counter);
//! tokio::time::delay_for(std::time::Duration::from_millis(2000)).await;
//! tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
//! }
//! }
//! ```
Expand Down
4 changes: 2 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl tokio_util::codec::Decoder for Codec {
}

#[cfg(feature = "async-std-runtime")]
impl futures_codec::Encoder for Codec {
impl asynchronous_codec::Encoder for Codec {
type Item = Message;
type Error = ConnectionError;

Expand Down Expand Up @@ -350,7 +350,7 @@ impl futures_codec::Encoder for Codec {
}

#[cfg(feature = "async-std-runtime")]
impl futures_codec::Decoder for Codec {
impl asynchronous_codec::Decoder for Codec {
type Item = Message;
type Error = ConnectionError;

Expand Down

0 comments on commit 09e3b9b

Please sign in to comment.