diff --git a/linkerd/proxy/balance/src/discover/buffer.rs b/linkerd/proxy/balance/src/discover/buffer.rs index c17b477a4d..d46787b4a5 100644 --- a/linkerd/proxy/balance/src/discover/buffer.rs +++ b/linkerd/proxy/balance/src/discover/buffer.rs @@ -2,7 +2,7 @@ use futures_util::future::poll_fn; use linkerd_error::Error; use tokio::sync::mpsc; use tower::discover; -use tracing::{debug, instrument::Instrument, trace}; +use tracing::{debug, debug_span, instrument::Instrument, trace, warn}; pub type Result = std::result::Result, Error>; pub type Buffer = tokio_stream::wrappers::ReceiverStream>; @@ -16,6 +16,29 @@ where { let (tx, rx) = mpsc::channel(capacity); + // Attempts to send an update to the balancer, returning `true` if sending + // was successful and `false` otherwise. + let send = |tx: &mpsc::Sender<_>, up| { + match tx.try_send(up) { + Ok(()) => true, + + // The balancer has been dropped (and will never be used again). + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!("Discovery receiver dropped"); + false + } + + // The balancer is stalled and we can't continue to buffer + // updates for it. + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "The balancer is not processing discovery updates; aborting discovery stream" + ); + false + } + } + }; + debug!(%capacity, "Spawning discovery buffer"); tokio::spawn( async move { @@ -23,41 +46,51 @@ where loop { let res = tokio::select! { - _ = tx.closed() => break, + biased; + + _ = tx.closed() => { + debug!("Discovery receiver dropped"); + return; + } + res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res, }; - let change = match res { + match res { Some(Ok(change)) => { trace!("Changed"); - change + if !send(&tx, Ok(change)) { + // XXX(ver) We don't actually have a way to "blow + // up" the balancer in this situation. My + // understanding is that this will cause the + // balancer to get cut off from further updates, + // should it ever become available again. That needs + // to be fixed. + // + // One option would be to drop the discovery stream + // and rebuild it if the balancer ever becomes + // unblocked. + // + // Ultimately we need to track down how we're + // getting into this blocked/idle state + return; + } } Some(Err(e)) => { let error = e.into(); debug!(%error); - let _ = tx.send(Err(error)).await; + send(&tx, Err(error)); return; } None => { debug!("Discovery stream closed"); return; } - }; - - tokio::select! { - _ = tx.closed() => break, - res = tx.send(Ok(change)) => { - if res.is_err() { - break; - } - trace!("Change sent"); - } } } - - debug!("Discovery receiver dropped"); } - .in_current_span(), + .in_current_span() + .instrument(debug_span!("discover")), ); Buffer::new(rx)