Skip to content

Commit

Permalink
refactor client body (#1462)
Browse files Browse the repository at this point in the history
* add kind field to debug impl for body

Signed-off-by: tottoto <tottotodev@gmail.com>

* simplify body implementation

Signed-off-by: tottoto <tottotodev@gmail.com>

* simplify Body::into_data_stream method signature

Signed-off-by: tottoto <tottotodev@gmail.com>

* enhance client body document

Signed-off-by: tottoto <tottotodev@gmail.com>

---------

Signed-off-by: tottoto <tottotodev@gmail.com>
  • Loading branch information
tottoto committed Apr 7, 2024
1 parent dac48d9 commit 2fb575c
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions kube-client/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ pub struct Body {

impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Body").finish()
let mut builder = f.debug_struct("Body");
match self.kind {
Kind::Once(_) => builder.field("kind", &"Once"),
Kind::Wrap(_) => builder.field("kind", &"Wrap"),
};
builder.finish()
}
}

Expand All @@ -36,6 +41,7 @@ impl Body {
Self::new(Kind::Once(None))
}

// Create a body from an existing body
pub(crate) fn wrap_body<B>(body: B) -> Self
where
B: HttpBody<Data = Bytes> + Send + 'static,
Expand All @@ -44,14 +50,13 @@ impl Body {
Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync()))
}

/// Collect all the data frames and trailers of the request body
/// Collect all the data frames and trailers of this request body and return the data frame
pub async fn collect_bytes(self) -> Result<Bytes, crate::Error> {
Ok(self.collect().await?.to_bytes())
}

pub(crate) fn into_data_stream(
self,
) -> impl Stream<Item = Result<<Self as HttpBody>::Data, <Self as HttpBody>::Error>> {
// Convert this body into `Stream` which iterates only data frame skipping the other kind of frame
pub(crate) fn into_data_stream(self) -> impl Stream<Item = Result<Bytes, crate::Error>> {
Box::pin(BodyStream::new(self).try_filter_map(|frame| async { Ok(frame.into_data().ok()) }))
}
}
Expand Down Expand Up @@ -79,17 +84,11 @@ impl HttpBody for Body {
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
match self.kind {
Kind::Once(ref mut val) => {
if let Some(data) = val.take() {
Poll::Ready(Some(Ok(Frame::data(data))))
} else {
Poll::Ready(None)
}
}
Kind::Wrap(ref mut stream) => Poll::Ready(
ready!(Pin::new(stream).poll_frame(cx))
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut self.kind {
Kind::Once(val) => Poll::Ready(val.take().map(|bytes| Ok(Frame::data(bytes)))),
Kind::Wrap(body) => Poll::Ready(
ready!(Pin::new(body).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::Error::Service)),
),
}
Expand Down

0 comments on commit 2fb575c

Please sign in to comment.