From e912885eefa14baf5f68cb79f0bf796c1e096061 Mon Sep 17 00:00:00 2001 From: Oscar Cowdery Lack Date: Thu, 30 Nov 2023 17:36:35 +1100 Subject: [PATCH] Make all BoxFutures 'static None of these futures are borrowing anything from outside their Box, so their lifetimes can be 'static. This avoids polluting a bunch of public types with unnecessary lifetime parameters. --- async-nats/src/jetstream/consumer/pull.rs | 16 ++++----- async-nats/src/jetstream/consumer/push.rs | 10 +++--- async-nats/src/jetstream/context.rs | 16 ++++----- async-nats/src/jetstream/kv/mod.rs | 28 +++++++-------- async-nats/src/jetstream/object_store/mod.rs | 36 ++++++++++---------- async-nats/src/jetstream/stream.rs | 16 ++++----- 6 files changed, 61 insertions(+), 61 deletions(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index fb7869f9a..9b552e3b0 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -420,15 +420,15 @@ impl futures::Stream for Batch { } } -pub struct Sequence<'a> { +pub struct Sequence { context: Context, subject: String, request: Bytes, pending_messages: usize, - next: Option>>, + next: Option>>, } -impl<'a> futures::Stream for Sequence<'a> { +impl futures::Stream for Sequence { type Item = Result; fn poll_next( @@ -490,7 +490,7 @@ impl<'a> futures::Stream for Sequence<'a> { } } -impl<'a> Consumer { +impl Consumer { /// Returns a stream of messages for Ordered Pull Consumer. /// /// Ordered consumers uses single replica ephemeral consumer, no matter the replication factor of the @@ -535,7 +535,7 @@ impl<'a> Consumer { /// Ok(()) /// # } /// ``` - pub async fn messages(self) -> Result, StreamError> { + pub async fn messages(self) -> Result { let config = Consumer { config: self.config.clone().into(), context: self.context.clone(), @@ -719,18 +719,18 @@ impl IntoConsumerConfig for OrderedConfig { } } -pub struct Ordered<'a> { +pub struct Ordered { context: Context, stream_name: String, consumer: OrderedConfig, consumer_name: String, stream: Option, - create_stream: Option>>, + create_stream: Option>>, consumer_sequence: u64, stream_sequence: u64, } -impl<'a> futures::Stream for Ordered<'a> { +impl futures::Stream for Ordered { type Item = Result; fn poll_next( diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 5ecc1988f..d569a6b0f 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -471,7 +471,7 @@ impl IntoConsumerConfig for OrderedConfig { } impl Consumer { - pub async fn messages<'a>(self) -> Result, StreamError> { + pub async fn messages<'a>(self) -> Result { let subscriber = self .context .client @@ -540,11 +540,11 @@ impl Consumer { } } -pub struct Ordered<'a> { +pub struct Ordered { context: Context, consumer: Consumer, subscriber: Option, - subscriber_future: Option>>, + subscriber_future: Option>>, stream_sequence: Arc, consumer_sequence: Arc, shutdown: tokio::sync::oneshot::Receiver, @@ -552,14 +552,14 @@ pub struct Ordered<'a> { heartbeat_sleep: Option>>, } -impl<'a> Drop for Ordered<'a> { +impl Drop for Ordered { fn drop(&mut self) { // Stop trying to recreate the consumer self.handle.abort() } } -impl<'a> futures::Stream for Ordered<'a> { +impl futures::Stream for Ordered { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 8c09821f0..a7c110aa5 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -1034,17 +1034,17 @@ struct StreamInfoPage { streams: Option>, } -type PageRequest<'a> = BoxFuture<'a, Result>; +type PageRequest = BoxFuture<'static, Result>; -pub struct StreamNames<'a> { +pub struct StreamNames { context: Context, offset: usize, - page_request: Option>, + page_request: Option, streams: Vec, done: bool, } -impl futures::Stream for StreamNames<'_> { +impl futures::Stream for StreamNames { type Item = Result; fn poll_next( @@ -1105,20 +1105,20 @@ impl futures::Stream for StreamNames<'_> { } } -type PageInfoRequest<'a> = BoxFuture<'a, Result>; +type PageInfoRequest = BoxFuture<'static, Result>; pub type StreamsErrorKind = RequestErrorKind; pub type StreamsError = RequestError; -pub struct Streams<'a> { +pub struct Streams { context: Context, offset: usize, - page_request: Option>, + page_request: Option, streams: Vec, done: bool, } -impl futures::Stream for Streams<'_> { +impl futures::Stream for Streams { type Item = Result; fn poll_next( diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index c01fb5a06..b14eb13af 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -427,7 +427,7 @@ impl Store { /// # Ok(()) /// # } /// ``` - pub async fn watch>(&self, key: T) -> Result, WatchError> { + pub async fn watch>(&self, key: T) -> Result { self.watch_with_deliver_policy(key, DeliverPolicy::New) .await } @@ -457,7 +457,7 @@ impl Store { /// # Ok(()) /// # } /// ``` - pub async fn watch_with_history>(&self, key: T) -> Result, WatchError> { + pub async fn watch_with_history>(&self, key: T) -> Result { self.watch_with_deliver_policy(key, DeliverPolicy::LastPerSubject) .await } @@ -466,7 +466,7 @@ impl Store { &self, key: T, deliver_policy: DeliverPolicy, - ) -> Result, WatchError> { + ) -> Result { let subject = format!("{}{}", self.prefix.as_str(), key.as_ref()); debug!("initial consumer creation"); @@ -527,7 +527,7 @@ impl Store { /// # Ok(()) /// # } /// ``` - pub async fn watch_all(&self) -> Result, WatchError> { + pub async fn watch_all(&self) -> Result { self.watch(ALL_KEYS).await } @@ -750,7 +750,7 @@ impl Store { /// # Ok(()) /// # } /// ``` - pub async fn history>(&self, key: T) -> Result, HistoryError> { + pub async fn history>(&self, key: T) -> Result { if !is_valid_key(key.as_ref()) { return Err(HistoryError::new(HistoryErrorKind::InvalidKey)); } @@ -851,13 +851,13 @@ impl Store { } /// A structure representing a watch on a key-value bucket, yielding values whenever there are changes. -pub struct Watch<'a> { - subscription: super::consumer::push::Ordered<'a>, +pub struct Watch { + subscription: super::consumer::push::Ordered, prefix: String, bucket: String, } -impl<'a> futures::Stream for Watch<'a> { +impl futures::Stream for Watch { type Item = Result; fn poll_next( @@ -905,14 +905,14 @@ impl<'a> futures::Stream for Watch<'a> { } /// A structure representing the history of a key-value bucket, yielding past values. -pub struct History<'a> { - subscription: super::consumer::push::Ordered<'a>, +pub struct History { + subscription: super::consumer::push::Ordered, done: bool, prefix: String, bucket: String, } -impl<'a> futures::Stream for History<'a> { +impl futures::Stream for History { type Item = Result; fn poll_next( @@ -965,11 +965,11 @@ impl<'a> futures::Stream for History<'a> { } } -pub struct Keys<'a> { - inner: History<'a>, +pub struct Keys { + inner: History, } -impl<'a> futures::Stream for Keys<'a> { +impl futures::Stream for Keys { type Item = Result; fn poll_next( diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index 9b8a3ea5f..614cf0a14 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -109,10 +109,10 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub fn get<'bucket, 'object, 'future, T>( + pub fn get<'bucket, 'future, T>( &'bucket self, object_name: T, - ) -> BoxFuture<'future, Result, GetError>> + ) -> BoxFuture<'future, Result> where T: AsRef + Send + 'future, 'bucket: 'future, @@ -424,13 +424,13 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn watch(&self) -> Result, WatchError> { + pub async fn watch(&self) -> Result { self.watch_with_deliver_policy(DeliverPolicy::New).await } /// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever /// there are changes for that key with as well as last value. - pub async fn watch_with_history(&self) -> Result, WatchError> { + pub async fn watch_with_history(&self) -> Result { self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject) .await } @@ -438,7 +438,7 @@ impl ObjectStore { async fn watch_with_deliver_policy( &self, deliver_policy: DeliverPolicy, - ) -> Result, WatchError> { + ) -> Result { let subject = format!("$O.{}.M.>", self.name); let ordered = self .stream @@ -474,7 +474,7 @@ impl ObjectStore { /// # Ok(()) /// # } /// ``` - pub async fn list(&self) -> Result, ListError> { + pub async fn list(&self) -> Result { trace!("starting Object List"); let subject = format!("$O.{}.M.>", self.name); let ordered = self @@ -828,11 +828,11 @@ async fn publish_meta(store: &ObjectStore, info: &ObjectInfo) -> Result<(), Publ Ok(()) } -pub struct Watch<'a> { - subscription: crate::jetstream::consumer::push::Ordered<'a>, +pub struct Watch { + subscription: crate::jetstream::consumer::push::Ordered, } -impl Stream for Watch<'_> { +impl Stream for Watch { type Item = Result; fn poll_next( @@ -858,12 +858,12 @@ impl Stream for Watch<'_> { } } -pub struct List<'a> { - subscription: Option>, +pub struct List { + subscription: Option, done: bool, } -impl Stream for List<'_> { +impl Stream for List { type Item = Result; fn poll_next( @@ -913,17 +913,17 @@ impl Stream for List<'_> { } /// Represents an object stored in a bucket. -pub struct Object<'a> { +pub struct Object { pub info: ObjectInfo, remaining_bytes: VecDeque, has_pending_messages: bool, digest: Option, - subscription: Option>, - subscription_future: Option, StreamError>>>, + subscription: Option, + subscription_future: Option>>, stream: crate::jetstream::stream::Stream, } -impl<'a> Object<'a> { +impl Object { pub(crate) fn new(info: ObjectInfo, stream: stream::Stream) -> Self { Object { subscription: None, @@ -942,7 +942,7 @@ impl<'a> Object<'a> { } } -impl tokio::io::AsyncRead for Object<'_> { +impl tokio::io::AsyncRead for Object { fn poll_read( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -1137,7 +1137,7 @@ pub trait AsObjectInfo { fn as_info(&self) -> &ObjectInfo; } -impl AsObjectInfo for &Object<'_> { +impl AsObjectInfo for &Object { fn as_info(&self) -> &ObjectInfo { &self.info } diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index f4df1d4ed..4832753a0 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -1664,18 +1664,18 @@ struct ConsumerInfoPage { type ConsumerNamesErrorKind = StreamsErrorKind; type ConsumerNamesError = StreamsError; -type PageRequest<'a> = BoxFuture<'a, Result>; +type PageRequest = BoxFuture<'static, Result>; -pub struct ConsumerNames<'a> { +pub struct ConsumerNames { context: Context, stream: String, offset: usize, - page_request: Option>, + page_request: Option, consumers: Vec, done: bool, } -impl futures::Stream for ConsumerNames<'_> { +impl futures::Stream for ConsumerNames { type Item = Result; fn poll_next( @@ -1742,18 +1742,18 @@ impl futures::Stream for ConsumerNames<'_> { pub type ConsumersErrorKind = StreamsErrorKind; pub type ConsumersError = StreamsError; -type PageInfoRequest<'a> = BoxFuture<'a, Result>; +type PageInfoRequest = BoxFuture<'static, Result>; -pub struct Consumers<'a> { +pub struct Consumers { context: Context, stream: String, offset: usize, - page_request: Option>, + page_request: Option, consumers: Vec, done: bool, } -impl futures::Stream for Consumers<'_> { +impl futures::Stream for Consumers { type Item = Result; fn poll_next(