Skip to content

Commit

Permalink
feat: add zstd support (#1866)
Browse files Browse the repository at this point in the history
Closes #1463
  • Loading branch information
paolobarbolini committed Apr 15, 2024
1 parent 1af8945 commit 1073881
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 20 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,23 @@ jobs:
- name: windows / stable-x86_64-msvc
os: windows-latest
target: x86_64-pc-windows-msvc
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
- name: windows / stable-i686-msvc
os: windows-latest
target: i686-pc-windows-msvc
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
- name: windows / stable-x86_64-gnu
os: windows-latest
rust: stable-x86_64-pc-windows-gnu
target: x86_64-pc-windows-gnu
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
package_name: mingw-w64-x86_64-gcc
mingw64_path: "C:\\msys64\\mingw64\\bin"
- name: windows / stable-i686-gnu
os: windows-latest
rust: stable-i686-pc-windows-gnu
target: i686-pc-windows-gnu
features: "--features blocking,gzip,brotli,deflate,json,multipart,stream"
features: "--features blocking,gzip,brotli,zstd,deflate,json,multipart,stream"
package_name: mingw-w64-i686-gcc
mingw64_path: "C:\\msys64\\mingw32\\bin"

Expand All @@ -145,6 +145,8 @@ jobs:
features: "--features gzip,stream"
- name: "feat.: brotli"
features: "--features brotli,stream"
- name: "feat.: zstd"
features: "--features zstd,stream"
- name: "feat.: deflate"
features: "--features deflate,stream"
- name: "feat.: json"
Expand Down
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ gzip = ["dep:async-compression", "async-compression?/gzip", "dep:tokio-util"]

brotli = ["dep:async-compression", "async-compression?/brotli", "dep:tokio-util"]

zstd = ["dep:async-compression", "async-compression?/zstd", "dep:tokio-util"]

deflate = ["dep:async-compression", "async-compression?/zlib", "dep:tokio-util"]

json = ["dep:serde_json"]
Expand Down Expand Up @@ -167,6 +169,7 @@ hyper-util = { version = "0.1", features = ["http1", "http2", "client", "client-
serde = { version = "1.0", features = ["derive"] }
libflate = "1.0"
brotli_crate = { package = "brotli", version = "3.3.0" }
zstd_crate = { package = "zstd", version = "0.13" }
doc-comment = "0.3"
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }
futures-util = { version = "0.3.0", default-features = false, features = ["std", "alloc"] }
Expand Down Expand Up @@ -258,6 +261,11 @@ name = "brotli"
path = "tests/brotli.rs"
required-features = ["brotli", "stream"]

[[test]]
name = "zstd"
path = "tests/zstd.rs"
required-features = ["zstd", "stream"]

[[test]]
name = "deflate"
path = "tests/deflate.rs"
Expand Down
40 changes: 40 additions & 0 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,29 @@ impl ClientBuilder {
self
}

/// Enable auto zstd decompression by checking the `Content-Encoding` response header.
///
/// If auto zstd decompression is turned on:
///
/// - When sending a request and if the request's headers do not already contain
/// an `Accept-Encoding` **and** `Range` values, the `Accept-Encoding` header is set to `zstd`.
/// The request body is **not** automatically compressed.
/// - When receiving a response, if its headers contain a `Content-Encoding` value of
/// `zstd`, both `Content-Encoding` and `Content-Length` are removed from the
/// headers' set. The response body is automatically decompressed.
///
/// If the `zstd` feature is turned on, the default option is enabled.
///
/// # Optional
///
/// This requires the optional `zstd` feature to be enabled
#[cfg(feature = "zstd")]
#[cfg_attr(docsrs, doc(cfg(feature = "zstd")))]
pub fn zstd(mut self, enable: bool) -> ClientBuilder {
self.config.accepts.zstd = enable;
self
}

/// Enable auto deflate decompression by checking the `Content-Encoding` response header.
///
/// If auto deflate decompression is turned on:
Expand Down Expand Up @@ -968,6 +991,23 @@ impl ClientBuilder {
}
}

/// Disable auto response body zstd decompression.
///
/// This method exists even if the optional `zstd` feature is not enabled.
/// This can be used to ensure a `Client` doesn't use zstd decompression
/// even if another dependency were to enable the optional `zstd` feature.
pub fn no_zstd(self) -> ClientBuilder {
#[cfg(feature = "zstd")]
{
self.zstd(false)
}

#[cfg(not(feature = "zstd"))]
{
self
}
}

/// Disable auto response body deflate decompression.
///
/// This method exists even if the optional `deflate` feature is not enabled.
Expand Down
144 changes: 128 additions & 16 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use async_compression::tokio::bufread::GzipDecoder;
#[cfg(feature = "brotli")]
use async_compression::tokio::bufread::BrotliDecoder;

#[cfg(feature = "zstd")]
use async_compression::tokio::bufread::ZstdDecoder;

#[cfg(feature = "deflate")]
use async_compression::tokio::bufread::ZlibDecoder;

Expand All @@ -19,9 +22,19 @@ use http::HeaderMap;
use hyper::body::Body as HttpBody;
use hyper::body::Frame;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "brotli",
feature = "zstd",
feature = "deflate"
))]
use tokio_util::codec::{BytesCodec, FramedRead};
#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "brotli",
feature = "zstd",
feature = "deflate"
))]
use tokio_util::io::StreamReader;

use super::body::ResponseBody;
Expand All @@ -33,6 +46,8 @@ pub(super) struct Accepts {
pub(super) gzip: bool,
#[cfg(feature = "brotli")]
pub(super) brotli: bool,
#[cfg(feature = "zstd")]
pub(super) zstd: bool,
#[cfg(feature = "deflate")]
pub(super) deflate: bool,
}
Expand All @@ -44,6 +59,8 @@ impl Accepts {
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "zstd")]
zstd: false,
#[cfg(feature = "deflate")]
deflate: false,
}
Expand All @@ -59,7 +76,12 @@ pub(crate) struct Decoder {

type PeekableIoStream = Peekable<IoStream>;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
#[cfg(any(
feature = "gzip",
feature = "zstd",
feature = "brotli",
feature = "deflate"
))]
type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>;

enum Inner {
Expand All @@ -74,12 +96,21 @@ enum Inner {
#[cfg(feature = "brotli")]
Brotli(Pin<Box<FramedRead<BrotliDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A `Zstd` decoder will uncompress the zstd compressed response content before returning it.
#[cfg(feature = "zstd")]
Zstd(Pin<Box<FramedRead<ZstdDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A `Deflate` decoder will uncompress the deflated response content before returning it.
#[cfg(feature = "deflate")]
Deflate(Pin<Box<FramedRead<ZlibDecoder<PeekableIoStreamReader>, BytesCodec>>>),

/// A decoder that doesn't have a value yet.
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
Pending(Pin<Box<Pending>>),
}

Expand All @@ -93,6 +124,8 @@ enum DecoderType {
Gzip,
#[cfg(feature = "brotli")]
Brotli,
#[cfg(feature = "zstd")]
Zstd,
#[cfg(feature = "deflate")]
Deflate,
}
Expand Down Expand Up @@ -155,6 +188,21 @@ impl Decoder {
}
}

/// A zstd decoder.
///
/// This decoder will buffer and decompress chunks that are zstd compressed.
#[cfg(feature = "zstd")]
fn zstd(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
inner: Inner::Pending(Box::pin(Pending(
IoStream(body).peekable(),
DecoderType::Zstd,
))),
}
}

/// A deflate decoder.
///
/// This decoder will buffer and decompress chunks that are deflated.
Expand All @@ -170,7 +218,12 @@ impl Decoder {
}
}

#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool {
use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use log::warn;
Expand Down Expand Up @@ -225,6 +278,13 @@ impl Decoder {
}
}

#[cfg(feature = "zstd")]
{
if _accepts.zstd && Decoder::detect_encoding(_headers, "zstd") {
return Decoder::zstd(body);
}
}

#[cfg(feature = "deflate")]
{
if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") {
Expand All @@ -245,7 +305,12 @@ impl HttpBody for Decoder {
cx: &mut Context,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.inner {
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
Poll::Ready(Ok(inner)) => {
self.inner = inner;
Expand Down Expand Up @@ -277,6 +342,14 @@ impl HttpBody for Decoder {
None => Poll::Ready(None),
}
}
#[cfg(feature = "zstd")]
Inner::Zstd(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(Frame::data(bytes.freeze())))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
None => Poll::Ready(None),
}
}
#[cfg(feature = "deflate")]
Inner::Deflate(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Expand All @@ -292,7 +365,12 @@ impl HttpBody for Decoder {
match self.inner {
Inner::PlainText(ref body) => HttpBody::size_hint(body),
// the rest are "unknown", so default
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
#[cfg(any(
feature = "brotli",
feature = "zstd",
feature = "gzip",
feature = "deflate"
))]
_ => http_body::SizeHint::default(),
}
}
Expand Down Expand Up @@ -332,6 +410,11 @@ impl Future for Pending {
BrotliDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
#[cfg(feature = "zstd")]
DecoderType::Zstd => Poll::Ready(Ok(Inner::Zstd(Box::pin(FramedRead::new(
ZstdDecoder::new(StreamReader::new(_body)),
BytesCodec::new(),
))))),
#[cfg(feature = "gzip")]
DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new(
GzipDecoder::new(StreamReader::new(_body)),
Expand Down Expand Up @@ -381,22 +464,37 @@ impl Accepts {
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "zstd")]
zstd: false,
#[cfg(feature = "deflate")]
deflate: false,
}
}
*/

pub(super) fn as_str(&self) -> Option<&'static str> {
match (self.is_gzip(), self.is_brotli(), self.is_deflate()) {
(true, true, true) => Some("gzip, br, deflate"),
(true, true, false) => Some("gzip, br"),
(true, false, true) => Some("gzip, deflate"),
(false, true, true) => Some("br, deflate"),
(true, false, false) => Some("gzip"),
(false, true, false) => Some("br"),
(false, false, true) => Some("deflate"),
(false, false, false) => None,
match (
self.is_gzip(),
self.is_brotli(),
self.is_zstd(),
self.is_deflate(),
) {
(true, true, true, true) => Some("gzip, br, zstd, deflate"),
(true, true, false, true) => Some("gzip, br, deflate"),
(true, true, true, false) => Some("gzip, br, zstd"),
(true, true, false, false) => Some("gzip, br"),
(true, false, true, true) => Some("gzip, zstd, deflate"),
(true, false, false, true) => Some("gzip, zstd, deflate"),
(false, true, true, true) => Some("br, zstd, deflate"),
(false, true, false, true) => Some("br, zstd, deflate"),
(true, false, true, false) => Some("gzip, zstd"),
(true, false, false, false) => Some("gzip"),
(false, true, true, false) => Some("br, zstd"),
(false, true, false, false) => Some("br"),
(false, false, true, true) => Some("zstd, deflate"),
(false, false, true, false) => Some("zstd"),
(false, false, false, true) => Some("deflate"),
(false, false, false, false) => None,
}
}

Expand Down Expand Up @@ -424,6 +522,18 @@ impl Accepts {
}
}

fn is_zstd(&self) -> bool {
#[cfg(feature = "zstd")]
{
self.zstd
}

#[cfg(not(feature = "zstd"))]
{
false
}
}

fn is_deflate(&self) -> bool {
#[cfg(feature = "deflate")]
{
Expand All @@ -444,6 +554,8 @@ impl Default for Accepts {
gzip: true,
#[cfg(feature = "brotli")]
brotli: true,
#[cfg(feature = "zstd")]
zstd: true,
#[cfg(feature = "deflate")]
deflate: true,
}
Expand Down
Loading

0 comments on commit 1073881

Please sign in to comment.