Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make all BoxFutures 'static #1166

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,15 @@ impl futures::Stream for Batch {
}
}

pub struct Sequence<'a> {
pub struct Sequence {
context: Context,
subject: String,
request: Bytes,
pending_messages: usize,
next: Option<BoxFuture<'a, Result<Batch, MessagesError>>>,
next: Option<BoxFuture<'static, Result<Batch, MessagesError>>>,
}

impl<'a> futures::Stream for Sequence<'a> {
impl futures::Stream for Sequence {
type Item = Result<Batch, MessagesError>;

fn poll_next(
Expand Down Expand Up @@ -490,7 +490,7 @@ impl<'a> futures::Stream for Sequence<'a> {
}
}

impl<'a> Consumer<OrderedConfig> {
impl Consumer<OrderedConfig> {
/// Returns a stream of messages for Ordered Pull Consumer.
///
/// Ordered consumers uses single replica ephemeral consumer, no matter the replication factor of the
Expand Down Expand Up @@ -535,7 +535,7 @@ impl<'a> Consumer<OrderedConfig> {
/// Ok(())
/// # }
/// ```
pub async fn messages(self) -> Result<Ordered<'a>, StreamError> {
pub async fn messages(self) -> Result<Ordered, StreamError> {
let config = Consumer {
config: self.config.clone().into(),
context: self.context.clone(),
Expand Down Expand Up @@ -719,18 +719,18 @@ impl IntoConsumerConfig for OrderedConfig {
}
}

pub struct Ordered<'a> {
pub struct Ordered {
context: Context,
stream_name: String,
consumer: OrderedConfig,
consumer_name: String,
stream: Option<Stream>,
create_stream: Option<BoxFuture<'a, Result<Stream, ConsumerRecreateError>>>,
create_stream: Option<BoxFuture<'static, Result<Stream, ConsumerRecreateError>>>,
consumer_sequence: u64,
stream_sequence: u64,
}

impl<'a> futures::Stream for Ordered<'a> {
impl futures::Stream for Ordered {
type Item = Result<jetstream::Message, OrderedError>;

fn poll_next(
Expand Down
10 changes: 5 additions & 5 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ impl IntoConsumerConfig for OrderedConfig {
}

impl Consumer<OrderedConfig> {
pub async fn messages<'a>(self) -> Result<Ordered<'a>, StreamError> {
pub async fn messages<'a>(self) -> Result<Ordered, StreamError> {
let subscriber = self
.context
.client
Expand Down Expand Up @@ -540,26 +540,26 @@ impl Consumer<OrderedConfig> {
}
}

pub struct Ordered<'a> {
pub struct Ordered {
context: Context,
consumer: Consumer<OrderedConfig>,
subscriber: Option<Subscriber>,
subscriber_future: Option<BoxFuture<'a, Result<Subscriber, ConsumerRecreateError>>>,
subscriber_future: Option<BoxFuture<'static, Result<Subscriber, ConsumerRecreateError>>>,
stream_sequence: Arc<AtomicU64>,
consumer_sequence: Arc<AtomicU64>,
shutdown: tokio::sync::oneshot::Receiver<ConsumerRecreateError>,
handle: JoinHandle<()>,
heartbeat_sleep: Option<Pin<Box<tokio::time::Sleep>>>,
}

impl<'a> Drop for Ordered<'a> {
impl Drop for Ordered {
fn drop(&mut self) {
// Stop trying to recreate the consumer
self.handle.abort()
}
}

impl<'a> futures::Stream for Ordered<'a> {
impl futures::Stream for Ordered {
type Item = Result<Message, OrderedError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
16 changes: 8 additions & 8 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,17 +1034,17 @@ struct StreamInfoPage {
streams: Option<Vec<super::stream::Info>>,
}

type PageRequest<'a> = BoxFuture<'a, Result<StreamPage, RequestError>>;
type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;

pub struct StreamNames<'a> {
pub struct StreamNames {
context: Context,
offset: usize,
page_request: Option<PageRequest<'a>>,
page_request: Option<PageRequest>,
streams: Vec<String>,
done: bool,
}

impl futures::Stream for StreamNames<'_> {
impl futures::Stream for StreamNames {
type Item = Result<String, StreamsError>;

fn poll_next(
Expand Down Expand Up @@ -1105,20 +1105,20 @@ impl futures::Stream for StreamNames<'_> {
}
}

type PageInfoRequest<'a> = BoxFuture<'a, Result<StreamInfoPage, RequestError>>;
type PageInfoRequest = BoxFuture<'static, Result<StreamInfoPage, RequestError>>;

pub type StreamsErrorKind = RequestErrorKind;
pub type StreamsError = RequestError;

pub struct Streams<'a> {
pub struct Streams {
context: Context,
offset: usize,
page_request: Option<PageInfoRequest<'a>>,
page_request: Option<PageInfoRequest>,
streams: Vec<super::stream::Info>,
done: bool,
}

impl futures::Stream for Streams<'_> {
impl futures::Stream for Streams {
type Item = Result<super::stream::Info, StreamsError>;

fn poll_next(
Expand Down
28 changes: 14 additions & 14 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl Store {
/// # Ok(())
/// # }
/// ```
pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch<'_>, WatchError> {
pub async fn watch<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(key, DeliverPolicy::New)
.await
}
Expand Down Expand Up @@ -457,7 +457,7 @@ impl Store {
/// # Ok(())
/// # }
/// ```
pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch<'_>, WatchError> {
pub async fn watch_with_history<T: AsRef<str>>(&self, key: T) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject)
.await
}
Expand All @@ -466,7 +466,7 @@ impl Store {
&self,
key: T,
deliver_policy: DeliverPolicy,
) -> Result<Watch<'_>, WatchError> {
) -> Result<Watch, WatchError> {
let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());

debug!("initial consumer creation");
Expand Down Expand Up @@ -527,7 +527,7 @@ impl Store {
/// # Ok(())
/// # }
/// ```
pub async fn watch_all(&self) -> Result<Watch<'_>, WatchError> {
pub async fn watch_all(&self) -> Result<Watch, WatchError> {
self.watch(ALL_KEYS).await
}

Expand Down Expand Up @@ -750,7 +750,7 @@ impl Store {
/// # Ok(())
/// # }
/// ```
pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History<'_>, HistoryError> {
pub async fn history<T: AsRef<str>>(&self, key: T) -> Result<History, HistoryError> {
if !is_valid_key(key.as_ref()) {
return Err(HistoryError::new(HistoryErrorKind::InvalidKey));
}
Expand Down Expand Up @@ -851,13 +851,13 @@ impl Store {
}

/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch<'a> {
subscription: super::consumer::push::Ordered<'a>,
pub struct Watch {
subscription: super::consumer::push::Ordered,
prefix: String,
bucket: String,
}

impl<'a> futures::Stream for Watch<'a> {
impl futures::Stream for Watch {
type Item = Result<Entry, WatcherError>;

fn poll_next(
Expand Down Expand Up @@ -905,14 +905,14 @@ impl<'a> futures::Stream for Watch<'a> {
}

/// A structure representing the history of a key-value bucket, yielding past values.
pub struct History<'a> {
subscription: super::consumer::push::Ordered<'a>,
pub struct History {
subscription: super::consumer::push::Ordered,
done: bool,
prefix: String,
bucket: String,
}

impl<'a> futures::Stream for History<'a> {
impl futures::Stream for History {
type Item = Result<Entry, WatcherError>;

fn poll_next(
Expand Down Expand Up @@ -965,11 +965,11 @@ impl<'a> futures::Stream for History<'a> {
}
}

pub struct Keys<'a> {
inner: History<'a>,
pub struct Keys {
inner: History,
}

impl<'a> futures::Stream for Keys<'a> {
impl futures::Stream for Keys {
type Item = Result<String, WatcherError>;

fn poll_next(
Expand Down
36 changes: 18 additions & 18 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ impl ObjectStore {
/// # Ok(())
/// # }
/// ```
pub fn get<'bucket, 'object, 'future, T>(
pub fn get<'bucket, 'future, T>(
&'bucket self,
object_name: T,
) -> BoxFuture<'future, Result<Object<'object>, GetError>>
) -> BoxFuture<'future, Result<Object, GetError>>
where
T: AsRef<str> + Send + 'future,
'bucket: 'future,
Expand Down Expand Up @@ -424,21 +424,21 @@ impl ObjectStore {
/// # Ok(())
/// # }
/// ```
pub async fn watch(&self) -> Result<Watch<'_>, WatchError> {
pub async fn watch(&self) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(DeliverPolicy::New).await
}

/// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
/// there are changes for that key with as well as last value.
pub async fn watch_with_history(&self) -> Result<Watch<'_>, WatchError> {
pub async fn watch_with_history(&self) -> Result<Watch, WatchError> {
self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
.await
}

async fn watch_with_deliver_policy(
&self,
deliver_policy: DeliverPolicy,
) -> Result<Watch<'_>, WatchError> {
) -> Result<Watch, WatchError> {
let subject = format!("$O.{}.M.>", self.name);
let ordered = self
.stream
Expand Down Expand Up @@ -474,7 +474,7 @@ impl ObjectStore {
/// # Ok(())
/// # }
/// ```
pub async fn list(&self) -> Result<List<'_>, ListError> {
pub async fn list(&self) -> Result<List, ListError> {
trace!("starting Object List");
let subject = format!("$O.{}.M.>", self.name);
let ordered = self
Expand Down Expand Up @@ -828,11 +828,11 @@ async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), Publ
Ok(())
}

pub struct Watch<'a> {
subscription: crate::jetstream::consumer::push::Ordered<'a>,
pub struct Watch {
subscription: crate::jetstream::consumer::push::Ordered,
}

impl Stream for Watch<'_> {
impl Stream for Watch {
type Item = Result<ObjectInfo, WatcherError>;

fn poll_next(
Expand All @@ -858,12 +858,12 @@ impl Stream for Watch<'_> {
}
}

pub struct List<'a> {
subscription: Option<crate::jetstream::consumer::push::Ordered<'a>>,
pub struct List {
subscription: Option<crate::jetstream::consumer::push::Ordered>,
done: bool,
}

impl Stream for List<'_> {
impl Stream for List {
type Item = Result<ObjectInfo, ListerError>;

fn poll_next(
Expand Down Expand Up @@ -913,17 +913,17 @@ impl Stream for List<'_> {
}

/// Represents an object stored in a bucket.
pub struct Object<'a> {
pub struct Object {
pub info: ObjectInfo,
remaining_bytes: VecDeque<u8>,
has_pending_messages: bool,
digest: Option<ring::digest::Context>,
subscription: Option<crate::jetstream::consumer::push::Ordered<'a>>,
subscription_future: Option<BoxFuture<'a, Result<Ordered<'a>, StreamError>>>,
subscription: Option<crate::jetstream::consumer::push::Ordered>,
subscription_future: Option<BoxFuture<'static, Result<Ordered, StreamError>>>,
stream: crate::jetstream::stream::Stream,
}

impl<'a> Object<'a> {
impl Object {
pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self {
Object {
subscription: None,
Expand All @@ -942,7 +942,7 @@ impl<'a> Object<'a> {
}
}

impl tokio::io::AsyncRead for Object<'_> {
impl tokio::io::AsyncRead for Object {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
Expand Down Expand Up @@ -1137,7 +1137,7 @@ pub trait AsObjectInfo {
fn as_info(&self) -> &ObjectInfo;
}

impl AsObjectInfo for &Object<'_> {
impl AsObjectInfo for &Object {
fn as_info(&self) -> &ObjectInfo {
&self.info
}
Expand Down