Skip to content

Commit

Permalink
Simplify concurrency-limit initialization (#799)
Browse files Browse the repository at this point in the history
This change replaces the `concurrency_limit::Layer` type with
`ConcurrencyLimit::layer` helper. It also removes the
`push_concurrency_limit` stack helper, since the layer type is no longer
nameable.
  • Loading branch information
olix0r committed Dec 28, 2020
1 parent 665230c commit 88b11f1
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,7 @@ name = "linkerd2-concurrency-limit"
version = "0.1.0"
dependencies = [
"futures 0.3.5",
"linkerd2-stack",
"pin-project 0.4.22",
"tokio 0.2.23",
"tower",
Expand Down
13 changes: 1 addition & 12 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub use crate::proxy::http;
use crate::transport::Connect;
use crate::{cache, Error};
pub use linkerd2_buffer as buffer;
use linkerd2_concurrency_limit as concurrency_limit;
pub use linkerd2_concurrency_limit::ConcurrencyLimit;
pub use linkerd2_stack::{self as stack, layer, NewRouter, NewService};
pub use linkerd2_stack_tracing::{InstrumentMake, InstrumentMakeLayer};
pub use linkerd2_timeout as timeout;
Expand Down Expand Up @@ -87,10 +87,6 @@ impl<L> Layers<L> {
self.push(stack::OnResponseLayer::new(layer))
}

pub fn push_concurrency_limit(self, max: usize) -> Layers<Pair<L, concurrency_limit::Layer>> {
self.push(concurrency_limit::Layer::new(max))
}

pub fn push_make_ready<Req>(self) -> Layers<Pair<L, stack::MakeReadyLayer<Req>>> {
self.push(stack::MakeReadyLayer::default())
}
Expand Down Expand Up @@ -199,13 +195,6 @@ impl<S> Stack<S> {
self.push(stack::OnResponseLayer::new(layer))
}

pub fn push_concurrency_limit(
self,
max: usize,
) -> Stack<concurrency_limit::ConcurrencyLimit<S>> {
self.push(concurrency_limit::Layer::new(max))
}

pub fn push_timeout(self, timeout: Duration) -> Stack<tower::timeout::Timeout<S>> {
self.push(tower::timeout::TimeoutLayer::new(timeout))
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl Config {
// Downgrades the protocol if upgraded by an outbound proxy.
.push(orig_proto::Downgrade::layer())
// Limits the number of in-flight requests.
.push_concurrency_limit(max_in_flight_requests)
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests))
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push_failfast(dispatch_timeout)
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
svc::layers()
.box_http_request()
// Limits the number of in-flight requests.
.push_concurrency_limit(max_in_flight_requests)
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests))
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push_failfast(dispatch_timeout)
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ where
svc::layers()
.box_http_request()
// Limits the number of in-flight requests.
.push_concurrency_limit(max_in_flight_requests)
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests))
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push_failfast(dispatch_timeout)
Expand Down
1 change: 1 addition & 0 deletions linkerd/concurrency-limit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false

[dependencies]
futures = "0.3"
linkerd2-stack = { path = "../stack" }
tokio = { version = "0.2.21", features = ["sync"] }
tower = { version = "0.4", default-features = false }
tracing = "0.1.22"
Expand Down
33 changes: 7 additions & 26 deletions linkerd/concurrency-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#![deny(warnings, rust_2018_idioms)]

use linkerd2_stack::layer;
use pin_project::pin_project;
use std::{
fmt,
Expand All @@ -19,11 +20,6 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tower::Service;
use tracing::trace;

#[derive(Clone, Debug)]
pub struct Layer {
semaphore: Arc<Semaphore>,
}

/// Enforces a limit on the number of concurrent requests to the inner service.
#[derive(Debug)]
pub struct ConcurrencyLimit<T> {
Expand All @@ -48,29 +44,14 @@ pub struct ResponseFuture<T> {
permit: Option<OwnedSemaphorePermit>,
}

impl From<Arc<Semaphore>> for Layer {
fn from(semaphore: Arc<Semaphore>) -> Self {
Self { semaphore }
}
}

impl Layer {
pub fn new(max: usize) -> Self {
Arc::new(Semaphore::new(max)).into()
impl<S> ConcurrencyLimit<S> {
/// Create a new concurrency-limiting layer.
pub fn layer(limit: usize) -> impl layer::Layer<S, Service = Self> + Clone {
let semaphore = Arc::new(Semaphore::new(limit));
layer::mk(move |inner| Self::new(inner, semaphore.clone()))
}
}

impl<S> tower::layer::Layer<S> for Layer {
type Service = ConcurrencyLimit<S>;

fn layer(&self, inner: S) -> Self::Service {
ConcurrencyLimit::new(inner, self.semaphore.clone())
}
}

impl<T> ConcurrencyLimit<T> {
/// Create a new concurrency limiter.
pub fn new(inner: T, semaphore: Arc<Semaphore>) -> Self {
fn new(inner: S, semaphore: Arc<Semaphore>) -> Self {
ConcurrencyLimit {
inner,
semaphore,
Expand Down

0 comments on commit 88b11f1

Please sign in to comment.