diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index b3c806878..00b7fb2ad 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -31,7 +31,7 @@ use crate::{ use base64::engine::general_purpose::STANDARD; use base64::engine::Engine; use bytes::Bytes; -use futures::{future::BoxFuture, TryFutureExt}; +use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::json; use time::{serde::rfc3339, OffsetDateTime}; @@ -1696,60 +1696,64 @@ impl futures::Stream for ConsumerNames<'_> { fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.page_request.as_mut() { - Some(page) => match page.try_poll_unpin(cx) { - std::task::Poll::Ready(page) => { - self.page_request = None; - let page = page.map_err(|err| { - ConsumerNamesError::with_source(ConsumerNamesErrorKind::Other, err) + ) -> Poll> { + let this = self.as_mut().get_mut(); + + loop { + if let Some(consumer) = this.consumers.pop() { + return Poll::Ready(Some(Ok(consumer))); + } + + if this.done { + return Poll::Ready(None); + } + + let page_request = this.page_request.get_or_insert_with(|| { + let context = this.context.clone(); + let offset = this.offset; + let stream = this.stream.clone(); + + Box::pin(async move { + match context + .request( + format!("CONSUMER.NAMES.{stream}").into(), + &json!({ + "offset": offset, + }), + ) + .await? + { + Response::Err { error } => Err(RequestError::with_source( + ConsumerNamesErrorKind::Other, + error, + )), + Response::Ok(page) => Ok(page), + } + }) + }); + + match page_request.poll_unpin(cx) { + Poll::Ready(page_result) => { + this.page_request = None; + + let page = page_result.map_err(|err| { + ConsumersError::with_source(ConsumerNamesErrorKind::Other, err) })?; + match page.consumers { + Some(consumers) => { + this.consumers = consumers; - if let Some(consumers) = page.consumers { - self.offset += consumers.len(); - self.consumers = consumers; - if self.offset >= page.total { - self.done = true; + this.offset += this.consumers.len(); + this.done = this.offset >= page.total; + continue; } - match self.consumers.pop() { - Some(stream) => Poll::Ready(Some(Ok(stream))), - None => Poll::Ready(None), + None => { + this.done = true; + return Poll::Ready(None); } - } else { - Poll::Ready(None) } } - std::task::Poll::Pending => std::task::Poll::Pending, - }, - None => { - if let Some(stream) = self.consumers.pop() { - Poll::Ready(Some(Ok(stream))) - } else { - if self.done { - return Poll::Ready(None); - } - let context = self.context.clone(); - let offset = self.offset; - let stream = self.stream.clone(); - self.page_request = Some(Box::pin(async move { - match context - .request( - format!("CONSUMER.NAMES.{stream}").into(), - &json!({ - "offset": offset, - }), - ) - .await? - { - Response::Err { error } => Err(RequestError::with_source( - super::context::RequestErrorKind::Other, - error, - )), - Response::Ok(page) => Ok(page), - } - })); - self.poll_next(cx) - } + Poll::Pending => return Poll::Pending, } } } @@ -1774,59 +1778,64 @@ impl futures::Stream for Consumers<'_> { fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.page_request.as_mut() { - Some(page) => match page.try_poll_unpin(cx) { - std::task::Poll::Ready(page) => { - self.page_request = None; - let page = page.map_err(|err| { + ) -> Poll> { + let this = self.as_mut().get_mut(); + + loop { + if let Some(consumer) = this.consumers.pop() { + return Poll::Ready(Some(Ok(consumer))); + } + + if this.done { + return Poll::Ready(None); + } + + let page_request = this.page_request.get_or_insert_with(|| { + let context = this.context.clone(); + let offset = this.offset; + let stream = this.stream.clone(); + + Box::pin(async move { + match context + .request( + format!("CONSUMER.LIST.{stream}").into(), + &json!({ + "offset": offset, + }), + ) + .await? + { + Response::Err { error } => Err(RequestError::with_source( + super::context::RequestErrorKind::Other, + error, + )), + Response::Ok(page) => Ok(page), + } + }) + }); + + match page_request.poll_unpin(cx) { + Poll::Ready(page_result) => { + this.page_request = None; + + let page = page_result.map_err(|err| { ConsumersError::with_source(ConsumersErrorKind::Other, err) })?; - if let Some(consumers) = page.consumers { - self.offset += consumers.len(); - self.consumers = consumers; - if self.offset >= page.total { - self.done = true; + match page.consumers { + Some(consumers) => { + this.consumers = consumers; + + this.offset += this.consumers.len(); + this.done = this.offset >= page.total; + continue; } - match self.consumers.pop() { - Some(consumer) => Poll::Ready(Some(Ok(consumer))), - None => Poll::Ready(None), + None => { + this.done = true; + return Poll::Ready(None); } - } else { - Poll::Ready(None) - } - } - std::task::Poll::Pending => std::task::Poll::Pending, - }, - None => { - if let Some(stream) = self.consumers.pop() { - Poll::Ready(Some(Ok(stream))) - } else { - if self.done { - return Poll::Ready(None); } - let context = self.context.clone(); - let offset = self.offset; - let stream = self.stream.clone(); - self.page_request = Some(Box::pin(async move { - match context - .request( - format!("CONSUMER.LIST.{stream}").into(), - &json!({ - "offset": offset, - }), - ) - .await? - { - Response::Err { error } => Err(RequestError::with_source( - super::context::RequestErrorKind::Other, - error, - )), - Response::Ok(page) => Ok(page), - } - })); - self.poll_next(cx) } + Poll::Pending => return Poll::Pending, } } }