Skip to content

Commit

Permalink
proxy: rebind services on connect errors (#952)
Browse files Browse the repository at this point in the history
Instead of having connect errors destroy all buffered requests,
this changes Bind to return a service that can rebind itself when
there is a connect error.

It won't try to establish the new connection itself, but waits for
the buffer to poll again. Combing this with changes in tower-buffer
to remove canceled requests from the buffer should mean that we
won't loop on connect errors for forever.

Signed-off-by: Sean McArthur <sean@seanmonstar.com>
  • Loading branch information
seanmonstar committed May 17, 2018
1 parent a1b5684 commit fb904f0
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 46 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tower-util = { git = "https://github.com/tower-rs/tower" }
libc = "0.2"

[dev-dependencies]
net2 = "0.2"
quickcheck = { version = "0.6", default-features = false }
conduit-proxy-controller-grpc = { path = "./controller-grpc" , features = ["arbitrary"] }
flate2 = { version = "1.0.1", default-features = false, features = ["rust_backend"] }
117 changes: 93 additions & 24 deletions proxy/src/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use futures::{Future, Poll, future};
use futures::{Async, Future, Poll, future, task};
use http::{self, uri};
use tokio_core::reactor::Handle;
use tower_service as tower;
use tower_h2;
use tower_reconnect::Reconnect;
use tower_reconnect::{Reconnect, Error as ReconnectError};

use control;
use control::destination::Endpoint;
Expand Down Expand Up @@ -40,19 +40,35 @@ pub struct BindProtocol<C, B> {
protocol: Protocol,
}

/// A type of service binding
/// A bound service that can re-bind itself on demand.
///
/// Reasons this would need to re-bind:
///
/// - `BindsPerRequest` can only service 1 request, and then needs to bind a
/// new service.
/// - If there is an error in the inner service (such as a connect error), we
/// need to throw it away and bind a new service.
pub struct BoundService<B: tower_h2::Body + 'static> {
bind: Bind<Arc<ctx::Proxy>, B>,
binding: Binding<B>,
/// Prevents logging repeated connect errors.
///
/// Set back to false after a connect succeeds, to log about future errors.
debounce_connect_error_log: bool,
endpoint: Endpoint,
protocol: Protocol,
}

/// A type of service binding.
///
/// Some services, for various reasons, may not be able to be used to serve multiple
/// requests. The `BindsPerRequest` binding ensures that a new stack is bound for each
/// request.
///
/// `Bound` serivces may be used to process an arbitrary number of requests.
pub enum Binding<B: tower_h2::Body + 'static> {
enum Binding<B: tower_h2::Body + 'static> {
Bound(Stack<B>),
BindsPerRequest {
endpoint: Endpoint,
protocol: Protocol,
bind: Bind<Arc<ctx::Proxy>, B>,
// When `poll_ready` is called, the _next_ service to be used may be bound
// ahead-of-time. This stack is used only to serve the next request to this
// service.
Expand Down Expand Up @@ -100,7 +116,7 @@ pub struct NormalizeUri<S> {
was_absolute_form: bool,
}

pub type Service<B> = Binding<B>;
pub type Service<B> = BoundService<B>;

pub type Stack<B> = Reconnect<NormalizeUri<NewHttp<B>>>;

Expand Down Expand Up @@ -227,16 +243,21 @@ where
Reconnect::new(proxy)
}

pub fn new_binding(&self, ep: &Endpoint, protocol: &Protocol) -> Binding<B> {
if protocol.can_reuse_clients() {
pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> BoundService<B> {
let binding = if protocol.can_reuse_clients() {
Binding::Bound(self.bind_stack(ep, protocol))
} else {
Binding::BindsPerRequest {
endpoint: ep.clone(),
protocol: protocol.clone(),
bind: self.clone(),
next: None
}
};

BoundService {
bind: self.clone(),
binding,
debounce_connect_error_log: false,
endpoint: ep.clone(),
protocol: protocol.clone(),
}
}
}
Expand Down Expand Up @@ -265,7 +286,7 @@ where
type BindError = ();

fn bind(&self, ep: &Endpoint) -> Result<Self::Service, Self::BindError> {
Ok(self.bind.new_binding(ep, &self.protocol))
Ok(self.bind.bind_service(ep, &self.protocol))
}
}

Expand Down Expand Up @@ -340,36 +361,84 @@ where
}
// ===== impl Binding =====

impl<B: tower_h2::Body + 'static> tower::Service for Binding<B> {
impl<B: tower_h2::Body + 'static> tower::Service for BoundService<B> {
type Request = <Stack<B> as tower::Service>::Request;
type Response = <Stack<B> as tower::Service>::Response;
type Error = <Stack<B> as tower::Service>::Error;
type Future = <Stack<B> as tower::Service>::Future;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
match *self {
let ready = match self.binding {
// A service is already bound, so poll its readiness.
Binding::Bound(ref mut svc) |
Binding::BindsPerRequest { next: Some(ref mut svc), .. } => svc.poll_ready(),
Binding::BindsPerRequest { next: Some(ref mut svc) } => svc.poll_ready(),

// If no stack has been bound, bind it now so that its readiness can be
// checked. Store it so it can be consumed to dispatch the next request.
Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => {
let mut svc = bind.bind_stack(endpoint, protocol);
let ready = svc.poll_ready()?;
Binding::BindsPerRequest { ref mut next } => {
let mut svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
let ready = svc.poll_ready();
*next = Some(svc);
Ok(ready)
ready
}
};

// If there was a connect error, don't terminate this BoundService
// completely. Instead, simply clean up the inner service, prepare to
// make a new one, and tell our caller that we could maybe be ready
// if they call `poll_ready` again.
//
// If they *don't* call `poll_ready` again, that's ok, we won't ever
// try to connect again.
match ready {
Err(ReconnectError::Connect(err)) => {
if !self.debounce_connect_error_log {
self.debounce_connect_error_log = true;
warn!("connect error to {:?}: {}", self.endpoint, err);
}
match self.binding {
Binding::Bound(ref mut svc) => {
*svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
},
Binding::BindsPerRequest { ref mut next } => {
next.take();
}
}

// So, this service isn't "ready" yet. Instead of trying to make
// it ready, schedule the task for notification so the caller can
// determine whether readiness is still necessary (i.e. whether
// there are still requests to be sent).
//
// But, to return NotReady, we must notify the task. So,
// this notifies the task immediately, and figures that
// whoever owns this service will call `poll_ready` if they
// are still interested.
task::current().notify();
Ok(Async::NotReady)
}
// don't debounce on NotReady...
Ok(Async::NotReady) => Ok(Async::NotReady),
other => {
self.debounce_connect_error_log = false;
other
},
}
}

fn call(&mut self, request: Self::Request) -> Self::Future {
match *self {
match self.binding {
Binding::Bound(ref mut svc) => svc.call(request),
Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => {
Binding::BindsPerRequest { ref mut next } => {
// If a service has already been bound in `poll_ready`, consume it.
// Otherwise, bind a new service on-the-spot.
let mut svc = next.take().unwrap_or_else(|| bind.bind_stack(endpoint, protocol));
let bind = &self.bind;
let endpoint = &self.endpoint;
let protocol = &self.protocol;
let mut svc = next.take()
.unwrap_or_else(|| {
bind.bind_stack(endpoint, protocol)
});
svc.call(request)
}
}
Expand Down
5 changes: 2 additions & 3 deletions proxy/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,13 @@ where
///
/// # TODO
///
/// Buffering is currently unbounded and does not apply timeouts. This must be
/// changed.
/// Buffering does not apply timeouts.
fn bind_service(&self, key: &Self::Key) -> Result<Self::Service, Self::RouteError> {
let &(ref addr, ref proto) = key;
debug!("building inbound {:?} client to {}", proto, addr);

let endpoint = (*addr).into();
let binding = self.bind.new_binding(&endpoint, proto);
let binding = self.bind.bind_service(&endpoint, proto);
Buffer::new(binding, self.bind.executor())
.map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
Expand Down
5 changes: 0 additions & 5 deletions proxy/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ where
///
/// Resolves the authority in service discovery and initializes a service that buffers
/// and load balances requests across.
///
/// # TODO
///
/// Buffering is currently unbounded and does not apply timeouts. This must be
/// changed.
fn bind_service(
&self,
key: &Self::Key,
Expand Down
3 changes: 2 additions & 1 deletion proxy/tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern crate futures;
extern crate h2;
pub extern crate http;
extern crate hyper;
pub extern crate net2;
extern crate prost;
extern crate tokio_connect;
extern crate tokio_core;
Expand All @@ -30,7 +31,7 @@ pub use std::time::Duration;
pub use self::bytes::Bytes;
pub use self::conduit_proxy::*;
pub use self::futures::*;
use self::futures::sync::oneshot;
pub use self::futures::sync::oneshot;
pub use self::http::{HeaderMap, Request, Response, StatusCode};
use self::tokio_connect::Connect;
use self::tokio_core::net::{TcpListener, TcpStream};
Expand Down
36 changes: 30 additions & 6 deletions proxy/tests/support/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,21 @@ impl Server {
self
}

pub fn delay_listen<F>(self, f: F) -> Listening
where
F: Future<Item=(), Error=()> + Send + 'static,
{
self.run_inner(Some(Box::new(f.then(|_| Ok(())))))
}

pub fn run(self) -> Listening {
self.run_inner(None)
}

fn run_inner(self, delay: Option<Box<Future<Item=(), Error=()> + Send>>) -> Listening {
let (tx, rx) = shutdown_signal();
let (addr_tx, addr_rx) = oneshot::channel();
let (listening_tx, listening_rx) = oneshot::channel();
let mut listening_tx = Some(listening_tx);
let conn_count = Arc::new(AtomicUsize::from(0));
let srv_conn_count = Arc::clone(&conn_count);
let version = self.version;
Expand All @@ -115,7 +127,19 @@ impl Server {
version,
thread_name(),
);

let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = net2::TcpBuilder::new_v4().expect("Tcp::new_v4");
listener.bind(addr).expect("Tcp::bind");
let addr = listener.local_addr().expect("Tcp::local_addr");

::std::thread::Builder::new().name(tname).spawn(move || {
if let Some(delay) = delay {
let _ = listening_tx.take().unwrap().send(());
delay.wait().expect("support server delay wait");
}
let listener = listener.listen(1024).expect("Tcp::listen");

let mut core = Core::new().unwrap();
let reactor = core.handle();

Expand Down Expand Up @@ -157,11 +181,11 @@ impl Server {
},
};

let addr = ([127, 0, 0, 1], 0).into();
let bind = TcpListener::bind(&addr, &reactor).expect("bind");
let bind = TcpListener::from_listener(listener, &addr, &reactor).expect("from_listener");

let local_addr = bind.local_addr().expect("local_addr");
let _ = addr_tx.send(local_addr);
if let Some(listening_tx) = listening_tx {
let _ = listening_tx.send(());
}

let serve = bind.incoming()
.fold((srv, reactor), move |(srv, reactor), (sock, _)| {
Expand All @@ -182,7 +206,7 @@ impl Server {
core.run(rx).unwrap();
}).unwrap();

let addr = addr_rx.wait().expect("addr");
listening_rx.wait().expect("listening_rx");

// printlns will show if the test fails...
println!(
Expand Down
Loading

0 comments on commit fb904f0

Please sign in to comment.