diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index a3f0176ff0..4882c9d9bc 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -17,7 +17,7 @@ references = ["smithy-rs#3416"] meta = { "breaking" = false, "bug" = false, "tada" = true } author = "jackkleeman" -[[aws-sdk-rust]] +[[smithy-rs]] message = "Added aws-smithy-wasm crate to enable SDK use in WASI compliant environments" references = ["smithy-rs#2087", "smithy-rs#2520", "smithy-rs#3409", "aws-sdk-rust#59"] meta = { "breaking" = false, "tada" = true, "bug" = false } @@ -28,3 +28,9 @@ message = "Added aws-smithy-wasm crate to enable SDK use in WASI compliant envir references = ["smithy-rs#2087", "smithy-rs#2520", "smithy-rs#3409"] meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "client"} author = "landonxjames" + +[[smithy-rs]] +message = "[`SdkBody`](https://docs.rs/aws-smithy-types/latest/aws_smithy_types/body/struct.SdkBody.html) now implements the 1.0 version of the `http_body::Body` trait." +references = ["smithy-rs#3365", "aws-sdk-rust#1046"] +meta = { "breaking" = false, "tada" = true, "bug" = false, "target" = "all" } +author = "cayman-amzn" \ No newline at end of file diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index c9de55a5c8..6e5c64c240 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -13,7 +13,7 @@ repository = "https://github.com/smithy-lang/smithy-rs" [features] byte-stream-poll-next = [] http-body-0-4-x = ["dep:http-body-0-4"] -http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x"] +http-body-1-x = ["dep:http-body-1-0", "dep:http-body-util", "dep:http-body-0-4", "dep:http-1x", "dep:hyper-1-0"] hyper-0-14-x = ["dep:hyper-0-14"] rt-tokio = [ "dep:http-body-0-4", @@ -38,6 +38,7 @@ http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true } http-body-1-0 = { package = "http-body", version = "1", optional = true } http-body-util = { version = "0.1.0", optional = true } hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true } +hyper-1-0 = { package = "hyper", version = "1", optional = true } itoa = "1.0.0" num-integer = "0.1.44" pin-project-lite = "0.2.9" diff --git a/rust-runtime/aws-smithy-types/src/body.rs b/rust-runtime/aws-smithy-types/src/body.rs index 9798524387..4fe75dcf39 100644 --- a/rust-runtime/aws-smithy-types/src/body.rs +++ b/rust-runtime/aws-smithy-types/src/body.rs @@ -32,8 +32,6 @@ pin_project! { /// For handling responses, the type of the body will be controlled /// by the HTTP stack. /// - // TODO(naming): Consider renaming to simply `Body`, although I'm concerned about naming headaches - // between hyper::Body and our Body pub struct SdkBody { #[pin] inner: Inner, @@ -191,10 +189,10 @@ impl SdkBody { } } - #[cfg(feature = "http-body-0-4-x")] + #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))] pub(crate) fn poll_next_trailers( self: Pin<&mut Self>, - #[allow(unused)] cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll>, Error>> { let this = self.project(); match this.inner.project() { diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs index 6b3278d987..2f777f4207 100644 --- a/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs +++ b/rust-runtime/aws-smithy-types/src/body/http_body_1_x.rs @@ -23,6 +23,53 @@ impl SdkBody { { SdkBody::from_body_0_4_internal(Http1toHttp04::new(body.map_err(Into::into))) } + + pub(crate) fn poll_data_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>>> { + match ready!(self.as_mut().poll_next(cx)) { + // if there's no more data, try to return trailers + None => match ready!(self.poll_next_trailers(cx)) { + Ok(Some(trailers)) => Poll::Ready(Some(Ok(http_body_1_0::Frame::trailers( + convert_headers_0x_1x(trailers), + )))), + Ok(None) => Poll::Ready(None), + Err(e) => Poll::Ready(Some(Err(e))), + }, + Some(result) => match result { + Err(err) => Poll::Ready(Some(Err(err))), + Ok(bytes) => Poll::Ready(Some(Ok(http_body_1_0::Frame::data(bytes)))), + }, + } + } +} + +#[cfg(feature = "http-body-1-x")] +impl http_body_1_0::Body for SdkBody { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + self.poll_data_frame(cx) + } + + fn is_end_stream(&self) -> bool { + self.is_end_stream() + } + + fn size_hint(&self) -> http_body_1_0::SizeHint { + let mut hint = http_body_1_0::SizeHint::default(); + let (lower, upper) = self.bounds_on_remaining_length(); + hint.set_lower(lower); + if let Some(upper) = upper { + hint.set_upper(upper); + } + hint + } } pin_project! { @@ -83,7 +130,7 @@ where // already read everything let this = self.project(); match this.trailers.take() { - Some(headers) => Poll::Ready(Ok(Some(convert_header_map(headers)))), + Some(headers) => Poll::Ready(Ok(Some(convert_headers_1x_0x(headers)))), None => Poll::Ready(Ok(None)), } } @@ -107,7 +154,7 @@ where } } -fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap { +fn convert_headers_1x_0x(input: http_1x::HeaderMap) -> http::HeaderMap { let mut map = http::HeaderMap::with_capacity(input.capacity()); let mut mem: Option = None; for (k, v) in input.into_iter() { @@ -121,6 +168,20 @@ fn convert_header_map(input: http_1x::HeaderMap) -> http::HeaderMap { map } +fn convert_headers_0x_1x(input: http::HeaderMap) -> http_1x::HeaderMap { + let mut map = http_1x::HeaderMap::with_capacity(input.capacity()); + let mut mem: Option = None; + for (k, v) in input.into_iter() { + let name = k.or_else(|| mem.clone()).unwrap(); + map.append( + http_1x::HeaderName::from_bytes(name.as_str().as_bytes()).expect("already validated"), + http_1x::HeaderValue::from_bytes(v.as_bytes()).expect("already validated"), + ); + mem = Some(name); + } + map +} + #[cfg(test)] mod test { use std::collections::VecDeque; @@ -132,8 +193,9 @@ mod test { use http_1x::header::{CONTENT_LENGTH as CL1, CONTENT_TYPE as CT1}; use http_1x::{HeaderMap, HeaderName, HeaderValue}; use http_body_1_0::Frame; + use http_body_util::BodyExt; - use crate::body::http_body_1_x::convert_header_map; + use crate::body::http_body_1_x::{convert_headers_1x_0x, Http1toHttp04}; use crate::body::{Error, SdkBody}; use crate::byte_stream::ByteStream; @@ -215,10 +277,46 @@ mod test { while let Some(_data) = http_body_0_4::Body::data(&mut body).await {} assert_eq!( http_body_0_4::Body::trailers(&mut body).await.unwrap(), - Some(convert_header_map(trailers())) + Some(convert_headers_1x_0x(trailers())) ); } + #[tokio::test] + async fn test_read_trailers_as_1x() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let body = SdkBody::from_body_1_x(body); + + let collected = BodyExt::collect(body).await.expect("should succeed"); + assert_eq!(collected.trailers(), Some(&trailers())); + assert_eq!(collected.to_bytes().as_ref(), b"123456789"); + } + + #[tokio::test] + async fn test_trailers_04x_to_1x() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let body = SdkBody::from_body_0_4(Http1toHttp04::new(body)); + + let collected = BodyExt::collect(body).await.expect("should succeed"); + assert_eq!(collected.trailers(), Some(&trailers())); + assert_eq!(collected.to_bytes().as_ref(), b"123456789"); + } + #[tokio::test] async fn test_errors() { let body = TestBody { @@ -235,6 +333,7 @@ mod test { let body = ByteStream::new(body); body.collect().await.expect_err("body returned an error"); } + #[tokio::test] async fn test_no_trailers() { let body = TestBody { @@ -262,6 +361,21 @@ mod test { expect.insert(CL0, http::HeaderValue::from_static("1234")); - assert_eq!(convert_header_map(http1_headermap), expect); + assert_eq!(convert_headers_1x_0x(http1_headermap), expect); + } + + #[test] + fn sdkbody_debug_dyn() { + let body = TestBody { + chunks: vec![ + Chunk::Data("123"), + Chunk::Data("456"), + Chunk::Data("789"), + Chunk::Trailers(trailers()), + ] + .into(), + }; + let body = SdkBody::from_body_1_x(body); + assert!(format!("{:?}", body).contains("BoxBody")); } } diff --git a/rust-runtime/aws-smithy-types/src/byte_stream.rs b/rust-runtime/aws-smithy-types/src/byte_stream.rs index b6123ee2ff..108a8bc90c 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream.rs @@ -248,7 +248,7 @@ pin_project! { /// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly /// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable /// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable). - /// ```no_run + /// ```ignore /// # use hyper_0_14 as hyper; /// use aws_smithy_types::byte_stream::ByteStream; /// use aws_smithy_types::body::SdkBody;