Skip to content

Commit

Permalink
so it turns out that everything is always bytes
Browse files Browse the repository at this point in the history
Currently, the `http_box::BoxBody` type erases both the type of the
`http_body::Body` _and_ the `Body::Data` associated type (to `Box<dyn
Buf + Send>`). However, it turns out that erasing the `Data` type isn't
actually necessary: _all_ `Body` types in Linkerd use `Bytes` as the
`Bytes` type.

This branch changes `BoxBody` to just require that `B::Data` is `Bytes`,
and always produce `Bytes` as its data type. This will enable a much
more performant implementation of request body buffering for POST
retries (linkerd/linkerd2#6130). Rather than copying all the bytes from
a `dyn Buf` `Data` type into an additional buffer, we can just clone the
`Bytes`, increasing the reference count for that buffer, and return the
original `Bytes`. This means we don't need to perform additional memcpys
to buffer bodies for retries; we just hold onto the already-allocated
`Bytes` buffers.

The cloned `Bytes` can then be stored in a vector, and if we have to
write them out again for a retry, we can do one big vectored write :)
  • Loading branch information
hawkw committed May 20, 2021
1 parent 9abd27d commit 892d49b
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 20 deletions.
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl<E> Outbound<E> {
>,
>
where
B: http::HttpBody<Error = Error> + std::fmt::Debug + Default + Send + 'static,
B::Data: Send + 'static,
B: http::HttpBody<Data = bytes::Bytes, Error = Error>,
B: std::fmt::Debug + Default + Send + 'static,
E: svc::NewService<Endpoint, Service = ESvc> + Clone + Send + Sync + 'static,
ESvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Send
Expand Down
23 changes: 9 additions & 14 deletions linkerd/http-box/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use linkerd_error::Error;
Expand All @@ -6,7 +7,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

pub struct BoxBody {
inner: Pin<Box<dyn Body<Data = Data, Error = Error> + Send + 'static>>,
inner: Pin<Box<dyn Body<Data = Bytes, Error = Error> + Send + 'static>>,
}

#[pin_project]
Expand All @@ -16,7 +17,7 @@ pub struct Data {
}

#[pin_project]
struct Inner<B: Body>(#[pin] B);
struct Inner<B: Body<Data = Bytes>>(#[pin] B);

struct NoBody;

Expand All @@ -31,8 +32,7 @@ impl Default for BoxBody {
impl BoxBody {
pub fn new<B>(inner: B) -> Self
where
B: Body + Send + 'static,
B::Data: Send + 'static,
B: Body<Data = Bytes> + Send + 'static,
B::Error: Into<Error>,
{
Self {
Expand All @@ -42,7 +42,7 @@ impl BoxBody {
}

impl Body for BoxBody {
type Data = Data;
type Data = Bytes;
type Error = Error;

fn is_end_stream(&self) -> bool {
Expand Down Expand Up @@ -88,11 +88,10 @@ impl bytes::Buf for Data {

impl<B> Body for Inner<B>
where
B: Body,
B::Data: Send + 'static,
B: Body<Data = Bytes>,
B::Error: Into<Error>,
{
type Data = Data;
type Data = Bytes;
type Error = Error;

fn is_end_stream(&self) -> bool {
Expand All @@ -104,11 +103,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let opt = futures::ready!(self.project().0.poll_data(cx));
Poll::Ready(opt.map(|res| {
res.map_err(Into::into).map(|buf| Data {
inner: Box::new(buf),
})
}))
Poll::Ready(opt.map(|res| res.map_err(Into::into)))
}

fn poll_trailers(
Expand All @@ -130,7 +125,7 @@ impl std::fmt::Debug for BoxBody {
}

impl Body for NoBody {
type Data = Data;
type Data = Bytes;
type Error = Error;

fn is_end_stream(&self) -> bool {
Expand Down
3 changes: 1 addition & 2 deletions linkerd/http-box/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ impl<S: Clone, B> Clone for BoxRequest<S, B> {

impl<S, B> tower::Service<http::Request<B>> for BoxRequest<S, B>
where
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<Error>,
S: tower::Service<http::Request<BoxBody>>,
{
Expand Down
3 changes: 1 addition & 2 deletions linkerd/http-box/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ impl<S> BoxResponse<S> {
impl<S, Req, B> tower::Service<Req> for BoxResponse<S>
where
S: tower::Service<Req, Response = http::Response<B>>,
B: http_body::Body + Send + 'static,
B::Data: Send + 'static,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<Error> + 'static,
{
type Response = http::Response<BoxBody>;
Expand Down

0 comments on commit 892d49b

Please sign in to comment.