diff --git a/Cargo.lock b/Cargo.lock index af66f9d348..0dd1fbc7cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -954,7 +954,6 @@ dependencies = [ "linkerd-error", "linkerd-stack", "pin-project", - "tower", ] [[package]] @@ -1297,6 +1296,7 @@ dependencies = [ "linkerd-addr", "linkerd-dns-name", "linkerd-error", + "linkerd-http-box", "linkerd-proxy-api-resolve", "linkerd-stack", "linkerd-tonic-watch", diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 4a6a206cc3..59c366aadc 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -115,14 +115,12 @@ impl Inbound { // request has not been received for `cache_max_idle_age`. http.clone() .check_new_service::>() - .push_on_response(http::BoxRequest::layer()) // The HTTP stack doesn't use the profile resolution, so drop it. .push_map_target(Logical::from) + .push_on_response(http::BoxResponse::layer()) .push(profiles::http::route_request::layer( svc::proxies() - // Sets the route as a request extension so that it can be used - // by tap. - .push_http_insert_target::() + .push_on_response(http::BoxRequest::layer()) // Records per-route metrics. .push( rt.metrics @@ -132,7 +130,9 @@ impl Inbound { // Sets the per-route response classifier as a request // extension. .push(classify::NewClassify::layer()) - .check_new_clone::() + // Sets the route as a request extension so that it can be used + // by tap. + .push_http_insert_target::() .push_map_target(|(route, logical): (profiles::http::Route, Profile)| { dst::Route { route, @@ -140,9 +140,9 @@ impl Inbound { direction: metrics::Direction::In, } }) + .push_on_response(http::BoxResponse::layer()) .into_inner(), )) - .push_on_response(http::BoxResponse::layer()) .push_switch( // If the profile was resolved to a logical (service) address, build a profile // stack to include route-level metrics, etc. Otherwise, skip this stack and use diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 38ac278fc1..2cae46716e 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -109,9 +109,11 @@ impl Outbound { .push_spawn_buffer(buffer_capacity), ) .push_cache(cache_max_idle_age) + .push_on_response(http::BoxResponse::layer()) // Note: routes can't exert backpressure. .push(profiles::http::route_request::layer( svc::proxies() + .push_on_response(http::BoxRequest::layer()) .push( rt.metrics .http_route_actual @@ -133,8 +135,10 @@ impl Outbound { // extension. .push(classify::NewClassify::layer()) .push_map_target(Logical::mk_route) + .push_on_response(http::BoxResponse::layer()) .into_inner(), )) + .push_on_response(http::BoxRequest::layer()) // Strips headers that may be set by this proxy and add an outbound // canonical-dst-header. The response body is boxed unify the profile // stack's response type with that of to endpoint stack. diff --git a/linkerd/http-box/Cargo.toml b/linkerd/http-box/Cargo.toml index 0189808e3e..079bbf2bbc 100644 --- a/linkerd/http-box/Cargo.toml +++ b/linkerd/http-box/Cargo.toml @@ -13,5 +13,4 @@ http = "0.2" http-body = "0.4" linkerd-error = { path = "../error" } linkerd-stack = { path = "../stack" } -tower = {version = "0.4", default-features = false } pin-project = "1" diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs index 4f122baf07..5f373167a7 100644 --- a/linkerd/http-box/src/erase_request.rs +++ b/linkerd/http-box/src/erase_request.rs @@ -2,14 +2,14 @@ use crate::BoxBody; use linkerd_error::Error; -use linkerd_stack::{layer, Proxy}; +use linkerd_stack::{layer, Proxy, Service}; use std::task::{Context, Poll}; /// Boxes request bodies, erasing the original type. /// /// This is *very* similar to the [`BoxRequest`](crate::request::BoxRequest) /// middleware. However, that middleware is generic over a specific body type -/// that is erased. A given instance of `EraseRequest` can only erase the type +/// that is erased. A given instance of `BoxRequest` can only erase the type /// of one particular `Body` type, while this middleware will erase bodies of /// *any* type. /// @@ -43,12 +43,12 @@ impl Clone for EraseRequest { } } -impl tower::Service> for EraseRequest +impl Service> for EraseRequest where B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into, - S: tower::Service>, + S: Service>, { type Response = S::Response; type Error = S::Error; @@ -70,7 +70,7 @@ where B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into, - S: tower::Service, + S: Service, P: Proxy, S>, { type Request = P::Request; diff --git a/linkerd/http-box/src/request.rs b/linkerd/http-box/src/request.rs index 714f2a206f..8ae598086e 100644 --- a/linkerd/http-box/src/request.rs +++ b/linkerd/http-box/src/request.rs @@ -2,16 +2,16 @@ use crate::{erase_request::EraseRequest, BoxBody}; use linkerd_error::Error; -use linkerd_stack::layer; +use linkerd_stack::{layer, Proxy, Service}; use std::{ marker::PhantomData, task::{Context, Poll}, }; #[derive(Debug)] -pub struct BoxRequest(S, PhantomData); +pub struct BoxRequest(S, PhantomData); -impl BoxRequest { +impl BoxRequest { pub fn layer() -> impl layer::Layer + Clone + Copy { layer::mk(|inner| BoxRequest(inner, PhantomData)) } @@ -24,18 +24,18 @@ impl BoxRequest { } } -impl Clone for BoxRequest { +impl Clone for BoxRequest { fn clone(&self) -> Self { BoxRequest(self.0.clone(), self.1) } } -impl tower::Service> for BoxRequest +impl Service> for BoxRequest where B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into, - S: tower::Service>, + S: Service>, { type Response = S::Response; type Error = S::Error; @@ -51,3 +51,22 @@ where self.0.call(req.map(BoxBody::new)) } } + +impl Proxy, S> for BoxRequest +where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: Service, + P: Proxy, S>, +{ + type Request = P::Request; + type Response = P::Response; + type Error = P::Error; + type Future = P::Future; + + #[inline] + fn proxy(&self, inner: &mut S, req: http::Request) -> Self::Future { + self.0.proxy(inner, req.map(BoxBody::new)) + } +} diff --git a/linkerd/http-box/src/response.rs b/linkerd/http-box/src/response.rs index 73e88e608a..90e86cf84c 100644 --- a/linkerd/http-box/src/response.rs +++ b/linkerd/http-box/src/response.rs @@ -3,7 +3,7 @@ use crate::BoxBody; use futures::{future, TryFutureExt}; use linkerd_error::Error; -use linkerd_stack::layer; +use linkerd_stack::{layer, Proxy, Service}; use std::task::{Context, Poll}; #[derive(Clone, Debug)] @@ -15,9 +15,9 @@ impl BoxResponse { } } -impl tower::Service for BoxResponse +impl Service for BoxResponse where - S: tower::Service>, + S: Service>, B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into + 'static, @@ -36,3 +36,22 @@ where self.0.call(req).map_ok(|rsp| rsp.map(BoxBody::new)) } } + +impl Proxy for BoxResponse

+where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: Service, + P: Proxy>, +{ + type Request = P::Request; + type Response = http::Response; + type Error = P::Error; + type Future = future::MapOk Self::Response>; + + #[inline] + fn proxy(&self, inner: &mut S, req: Req) -> Self::Future { + self.0.proxy(inner, req).map_ok(|rsp| rsp.map(BoxBody::new)) + } +} diff --git a/linkerd/service-profiles/Cargo.toml b/linkerd/service-profiles/Cargo.toml index 82a92fbfe7..1f45c2825a 100644 --- a/linkerd/service-profiles/Cargo.toml +++ b/linkerd/service-profiles/Cargo.toml @@ -21,10 +21,11 @@ indexmap = "1.7" linkerd-addr = { path = "../addr" } linkerd-dns-name = { path = "../dns/name" } linkerd-error = { path = "../error" } -linkerd2-proxy-api = { version = "0.2", features = ["destination", "client"] } +linkerd-http-box = { path = "../http-box" } linkerd-proxy-api-resolve = { path = "../proxy/api-resolve" } linkerd-stack = { path = "../stack" } linkerd-tonic-watch = { path = "../tonic-watch" } +linkerd2-proxy-api = { version = "0.2", features = ["destination", "client"] } rand = { version = "0.8", features = ["small_rng"] } regex = "1.5.4" tokio = { version = "1", features = ["macros", "rt", "sync", "time"] } diff --git a/linkerd/service-profiles/src/http/route_request.rs b/linkerd/service-profiles/src/http/route_request.rs index 5225c3a009..b6735f9960 100644 --- a/linkerd/service-profiles/src/http/route_request.rs +++ b/linkerd/service-profiles/src/http/route_request.rs @@ -1,7 +1,8 @@ use super::{RequestMatch, Route}; use crate::{Profile, Receiver, ReceiverStream}; -use futures::{future::ErrInto, prelude::*, ready}; +use futures::{future, prelude::*, ready}; use linkerd_error::Error; +use linkerd_http_box::BoxBody; use linkerd_stack::{layer, NewService, Param, Proxy}; use std::{ collections::HashMap, @@ -15,11 +16,9 @@ pub fn layer( ) -> impl layer::Layer> { // This is saved so that the same `Arc`s are used and cloned instead of // calling `Route::default()` every time. - let default = Route::default(); layer::mk(move |inner| NewRouteRequest { inner, new_route: new_route.clone(), - default: default.clone(), _route: PhantomData, }) } @@ -27,7 +26,6 @@ pub fn layer( pub struct NewRouteRequest { inner: M, new_route: N, - default: Route, _route: PhantomData, } @@ -38,7 +36,6 @@ pub struct RouteRequest { new_route: N, http_routes: Vec<(RequestMatch, Route)>, proxies: HashMap, - default: R, } impl Clone for NewRouteRequest { @@ -46,7 +43,6 @@ impl Clone for NewRouteRequest { Self { inner: self.inner.clone(), new_route: self.new_route.clone(), - default: self.default.clone(), _route: self._route, } } @@ -63,14 +59,10 @@ where fn new_service(&mut self, target: T) -> Self::Service { let rx = target.param(); let inner = self.inner.new_service(target.clone()); - let default = self - .new_route - .new_service((self.default.clone(), target.clone())); RouteRequest { rx: rx.into(), target, inner, - default, new_route: self.new_route.clone(), http_routes: Vec::new(), proxies: HashMap::new(), @@ -78,18 +70,23 @@ where } } -impl tower::Service> for RouteRequest +impl tower::Service> for RouteRequest where - B: Send + 'static, T: Clone, N: NewService<(Route, T), Service = R> + Clone, - R: Proxy, S>, - S: tower::Service, + R: Proxy< + http::Request, + S, + Request = http::Request, + Response = http::Response, + >, + S: tower::Service, Response = http::Response>, S::Error: Into, { - type Response = R::Response; + type Response = http::Response; type Error = Error; - type Future = ErrInto; + type Future = + future::Either, future::ErrInto>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut update = None; @@ -119,17 +116,19 @@ where Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into)) } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { for (ref condition, ref route) in &self.http_routes { if condition.is_match(&req) { trace!(?condition, "Using configured route"); - return self.proxies[route] - .proxy(&mut self.inner, req) - .err_into::(); + return future::Either::Left( + self.proxies[route] + .proxy(&mut self.inner, req) + .err_into::(), + ); } } - trace!("Using default route"); - self.default.proxy(&mut self.inner, req).err_into::() + trace!("No routes matched"); + future::Either::Right(self.inner.call(req).err_into::()) } }