Skip to content

Commit

Permalink
feat(body): remove stream cargo feature
Browse files Browse the repository at this point in the history
remove stream cargo feature and any usage of stream, as it isn't stable and shouldn't be depended on

closes issue hyperium#2855
  • Loading branch information
oddgrd committed Jun 17, 2022
1 parent 5fa113e commit 31a8be1
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 410 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:

- name: Test
# Can't enable tcp feature since Miri does not support the tokio runtime
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,stream,nightly
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,nightly

features:
name: features
Expand Down
7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
http-body = { git = "https://github.com/hyperium/http-body", branch = "master" }
http-body-util = { git = "https://github.com/hyperium/http-body", branch = "master" }
httpdate = "1.0"
httparse = "1.6"
h2 = { version = "0.3.9", optional = true }
Expand Down Expand Up @@ -80,7 +81,6 @@ full = [
"http1",
"http2",
"server",
"stream",
"runtime",
]

Expand All @@ -92,9 +92,6 @@ http2 = ["h2"]
client = []
server = []

# `impl Stream` for things
stream = []

# Tokio support
runtime = [
"tcp",
Expand Down
5 changes: 3 additions & 2 deletions benches/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate test;
use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
use hyper::body::Body;
use http_body_util::StreamBody;

macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
Expand All @@ -20,9 +20,10 @@ macro_rules! bench_stream {

$bencher.iter(|| {
rt.block_on(async {
let $body_pat = Body::wrap_stream(
let $body_pat = StreamBody::new(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
);

$block;
});
});
Expand Down
7 changes: 4 additions & 3 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use std::sync::mpsc;
use std::time::Duration;

use futures_util::{stream, StreamExt};
use http_body_util::StreamBody;
use tokio::sync::oneshot;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};
use hyper::{Response, Server};

macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => {{
Expand Down Expand Up @@ -101,7 +102,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}

Expand All @@ -123,7 +124,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}

Expand Down
20 changes: 10 additions & 10 deletions examples/echo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![deny(warnings)]

use futures_util::TryStreamExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};

Expand All @@ -16,16 +15,17 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),

// TODO: Fix this, broken in PR #2896
// Convert to uppercase before sending back to client using a stream.
(&Method::POST, "/echo/uppercase") => {
let chunk_stream = req.into_body().map_ok(|chunk| {
chunk
.iter()
.map(|byte| byte.to_ascii_uppercase())
.collect::<Vec<u8>>()
});
Ok(Response::new(Body::wrap_stream(chunk_stream)))
}
// (&Method::POST, "/echo/uppercase") => {
// let chunk_stream = req.into_body().map_ok(|chunk| {
// chunk
// .iter()
// .map(|byte| byte.to_ascii_uppercase())
// .collect::<Vec<u8>>()
// });
// Ok(Response::new(Body::wrap_stream(chunk_stream)))
// }

// Reverse the entire body before sending back to the client.
//
Expand Down
11 changes: 2 additions & 9 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#![deny(warnings)]

use tokio::fs::File;

use tokio_util::codec::{BytesCodec, FramedRead};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};

Expand Down Expand Up @@ -48,11 +44,8 @@ fn not_found() -> Response<Body> {
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
if let Ok(contents) = tokio::fs::read(filename).await {
let body = contents.into();
return Ok(Response::new(body));
}

Expand Down
15 changes: 3 additions & 12 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(warnings)]

use bytes::Buf;
use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
Expand All @@ -24,18 +23,10 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
.unwrap();

let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let before = stream::once(async {
Ok(format!(
"<b>POST request body</b>: {}<br><b>Response</b>: ",
POST_DATA,
)
.into())
});
let after = web_res.into_body();
let body = Body::wrap_stream(before.chain(after));

Ok(Response::new(body))
let res_body = web_res.into_body();

Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
Expand Down
82 changes: 0 additions & 82 deletions src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError;
use std::fmt;

use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
Expand Down Expand Up @@ -56,12 +50,6 @@ enum Kind {
},
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
#[cfg(feature = "stream")]
Wrapped(
SyncWrapper<
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
}

struct Extra {
Expand Down Expand Up @@ -164,39 +152,6 @@ impl Body {
(tx, rx)
}

/// Wrap a futures `Stream` in a box inside `Body`.
///
/// # Example
///
/// ```
/// # use hyper::Body;
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("hello"),
/// Ok(" "),
/// Ok("world"),
/// ];
///
/// let stream = futures_util::stream::iter(chunks);
///
/// let body = Body::wrap_stream(stream);
/// ```
///
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

fn new(kind: Kind) -> Body {
Body { kind, extra: None }
}
Expand Down Expand Up @@ -329,12 +284,6 @@ impl Body {

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
}
}

Expand Down Expand Up @@ -405,8 +354,6 @@ impl HttpBody for Body {
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => false,
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
}
}

Expand All @@ -426,8 +373,6 @@ impl HttpBody for Body {
match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len!(content_length),
Expand Down Expand Up @@ -457,33 +402,6 @@ impl fmt::Debug for Body {
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
type Item = crate::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
HttpBody::poll_data(self, cx)
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
#[inline]
fn from(
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
) -> Body {
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}

impl From<Bytes> for Body {
#[inline]
fn from(chunk: Bytes) -> Body {
Expand Down
5 changes: 1 addition & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ pub(crate) mod io;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(any(
feature = "stream",
all(feature = "client", any(feature = "http1", feature = "http2"))
))]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;
Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) enum Kind {
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
HeaderTimeout,
/// Error while reading a body from connection.
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
#[cfg(any(feature = "http1", feature = "http2"))]
Body,
/// Error while writing a body to connection.
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Error {
Error::new(Kind::ChannelClosed)
}

#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) fn new_body<E: Into<Cause>>(cause: E) -> Error {
Error::new(Kind::Body).with(cause)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ impl Error {
Kind::Accept => "error accepting connection",
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
Kind::HeaderTimeout => "read header from client timeout",
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::Body => "error reading a body from connection",
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::BodyWrite => "error writing a body to connection",
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
//! - `runtime`: Enables convenient integration with `tokio`, providing
//! connectors and acceptors for TCP, and a default executor.
//! - `tcp`: Enables convenient implementations over TCP (using tokio).
//! - `stream`: Provides `futures::Stream` capabilities.
//!
//! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section

Expand Down

0 comments on commit 31a8be1

Please sign in to comment.