Skip to content

Commit

Permalink
Add a NewServeHttp::layer helper (#793)
Browse files Browse the repository at this point in the history
Now that `NewServeHttp` takes a single inner stack, we can instrument as
a simple layer.

This change replaces the module's constructor with a layer and
consolidates the inbound HTTP server stack.

Co-authored-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
olix0r and hawkw committed Dec 28, 2020
1 parent 0127463 commit ff2fdb2
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 70 deletions.
73 changes: 33 additions & 40 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,20 +349,29 @@ impl Config {
..
} = self.proxy.clone();

// Handles requests as they are initially received by the proxy.
let http_admit_request = svc::layers()
// Downgrades the protocol if upgraded by an outbound proxy.
.push(orig_proto::Downgrade::layer())
// Limits the number of in-flight requests.
.push_concurrency_limit(max_in_flight_requests)
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push_failfast(dispatch_timeout)
.push(metrics.http_errors)
// Synthesizes responses for proxy errors.
.push(errors::layer());
// When HTTP detection fails, forward the connection to the application
// as an opaque TCP stream.
let tcp = svc::stack(tcp_forward.clone())
.push_map_target(TcpEndpoint::from)
.push_switch(
prevent_loop.into(),
// If the connection targets the inbound port, try to detect an
// opaque transport header and rewrite the target port
// accordingly. If there was no opaque transport header, the
// forwarding will fail when the tcp connect stack applies loop
// prevention.
svc::stack(tcp_forward)
.push_map_target(TcpEndpoint::from)
.push(transport::NewDetectService::layer(
transport::detect::DetectTimeout::new(
self.proxy.detect_protocol_timeout,
DetectHeader::default(),
),
)),
)
.into_inner();

let http_server = svc::stack(http_router)
svc::stack(http_router)
// Removes the override header after it has been used to
// determine a reuquest target.
.push_on_response(strip_header::request::layer(DST_OVERRIDE_HEADER))
Expand All @@ -377,7 +386,16 @@ impl Config {
.push_http_insert_target()
.push_on_response(
svc::layers()
.push(http_admit_request)
// Downgrades the protocol if upgraded by an outbound proxy.
.push(orig_proto::Downgrade::layer())
// Limits the number of in-flight requests.
.push_concurrency_limit(max_in_flight_requests)
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push_failfast(dispatch_timeout)
.push(metrics.http_errors)
// Synthesizes responses for proxy errors.
.push(errors::layer())
.push(TraceContext::layer(span_sink.map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
Expand All @@ -389,32 +407,7 @@ impl Config {
.push_map_target(|(_, accept): (_, TcpAccept)| accept)
.instrument(|(v, _): &(http::Version, _)| debug_span!("http", %v))
.check_new_service::<(http::Version, TcpAccept), http::Request<_>>()
.into_inner();

// When HTTP detection fails, forward the connection to the application
// as an opaque TCP stream.
let tcp = svc::stack(tcp_forward.clone())
.push_map_target(TcpEndpoint::from)
.push_switch(
prevent_loop.into(),
// If the connection targets the inbound port, try to detect an
// opaque transport header and rewrite the target port
// accordingly. If there was no opaque transport header, the
// forwarding will fail when the tcp connect stack applies loop
// prevention.
svc::stack(tcp_forward)
.push_map_target(TcpEndpoint::from)
.push(transport::NewDetectService::layer(
transport::detect::DetectTimeout::new(
self.proxy.detect_protocol_timeout,
DetectHeader::default(),
),
)),
)
.push_on_response(drain::Retain::layer(drain.clone()))
.into_inner();

svc::stack(http::NewServeHttp::new(h2_settings, http_server, drain))
.push(http::NewServeHttp::layer(h2_settings, drain))
.push(svc::stack::NewOptional::layer(tcp))
.push_cache(cache_max_idle_age)
.push(transport::NewDetectService::layer(
Expand Down
20 changes: 9 additions & 11 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ where
},
} = config.clone();

let http = svc::stack(http)
let tcp = svc::stack(tcp)
.push_on_response(drain::Retain::layer(drain.clone()))
.push_map_target(tcp::Endpoint::from_accept(
tls::ReasonForNoPeerName::IngressNonHttp,
))
.into_inner();

svc::stack(http)
.check_new_service::<http::Logical, http::Request<_>>()
.push_map_target(http::Logical::from)
.push(profiles::discover::layer(
Expand Down Expand Up @@ -122,16 +129,7 @@ where
.instrument(|a: &http::Accept| debug_span!("http", v = %a.protocol))
.push_map_target(http::Accept::from)
.check_new_service::<(http::Version, tcp::Accept), http::Request<_>>()
.into_inner();

let tcp = svc::stack(tcp)
.push_on_response(drain::Retain::layer(drain.clone()))
.push_map_target(tcp::Endpoint::from_accept(
tls::ReasonForNoPeerName::IngressNonHttp,
))
.into_inner();

svc::stack(http::NewServeHttp::new(h2_settings, http, drain))
.push(http::NewServeHttp::layer(h2_settings, drain))
.push(svc::stack::NewOptional::layer(tcp))
.push_cache(cache_max_idle_age)
.push(transport::NewDetectService::layer(
Expand Down
29 changes: 12 additions & 17 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,17 @@ where
..
} = config.proxy.clone();

let http_server = svc::stack(http_router)
let tcp_forward = svc::stack(tcp_connect)
.push_make_thunk()
.push_on_response(tcp::Forward::layer())
.into_new_service()
.push_on_response(metrics.stack.layer(stack_labels("tcp", "forward")))
.push_map_target(tcp::Endpoint::from_logical(
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery,
))
.into_inner();

svc::stack(http_router)
.push_on_response(
svc::layers()
.box_http_request()
Expand All @@ -198,19 +208,7 @@ where
.push(http::NewNormalizeUri::layer())
.instrument(|l: &http::Logical| debug_span!("http", v = %l.protocol))
.push_map_target(http::Logical::from)
.into_inner();

let tcp_forward = svc::stack(tcp_connect)
.push_make_thunk()
.push_on_response(tcp::Forward::layer())
.into_new_service()
.push_on_response(metrics.stack.layer(stack_labels("tcp", "forward")))
.push_map_target(tcp::Endpoint::from_logical(
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery,
))
.into_inner();

let http = svc::stack(http::NewServeHttp::new(h2_settings, http_server, drain))
.push(http::NewServeHttp::layer(h2_settings, drain))
.push(svc::stack::NewOptional::layer(
// When an HTTP version cannot be detected, we fallback to a logical
// TCP stack. This service needs to be buffered so that it can be
Expand All @@ -235,9 +233,6 @@ where
http::DetectHttp::default(),
),
))
.into_inner();

svc::stack(http)
.push_switch(
SkipByProfile,
// When the profile marks the target as opaque, we skip HTTP
Expand Down
11 changes: 9 additions & 2 deletions linkerd/proxy/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
use linkerd2_drain as drain;
use linkerd2_error::Error;
use linkerd2_io::{self as io, PeerAddr, PrefixedIo};
use linkerd2_stack::NewService;
use linkerd2_stack::{layer, NewService};
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -37,8 +37,15 @@ pub struct ServeHttp<S> {
// === impl NewServeHttp ===

impl<N> NewServeHttp<N> {
pub fn layer(
h2: H2Settings,
drain: drain::Watch,
) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self::new(h2, inner, drain.clone()))
}

/// Creates a new `ServeHttp`.
pub fn new(h2: H2Settings, inner: N, drain: drain::Watch) -> Self {
fn new(h2: H2Settings, inner: N, drain: drain::Watch) -> Self {
let mut server = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
server
.http2_initial_stream_window_size(h2.initial_stream_window_size)
Expand Down

0 comments on commit ff2fdb2

Please sign in to comment.