Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balance: Log and fail stuck discovery streams. #2484

Merged
merged 2 commits into from Oct 17, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 47 additions & 18 deletions linkerd/proxy/balance/src/discover/buffer.rs
Expand Up @@ -2,7 +2,7 @@ use futures_util::future::poll_fn;
use linkerd_error::Error;
use tokio::sync::mpsc;
use tower::discover;
use tracing::{debug, instrument::Instrument, trace};
use tracing::{debug, debug_span, instrument::Instrument, trace, warn};

pub type Result<K, S> = std::result::Result<discover::Change<K, S>, Error>;
pub type Buffer<K, S> = tokio_stream::wrappers::ReceiverStream<Result<K, S>>;
Expand All @@ -16,48 +16,77 @@ where
{
let (tx, rx) = mpsc::channel(capacity);

let send = |tx: &mpsc::Sender<_>, change| {
olix0r marked this conversation as resolved.
Show resolved Hide resolved
if let Err(e) = tx.try_send(change) {
match e {
// The balancer has been dropped (and will never be used again).
mpsc::error::TrySendError::Closed(_) => {
debug!("Discovery receiver dropped");
}

// The balancer is stalled and we can't continue to buffer
// updates for it.
mpsc::error::TrySendError::Full(_) => {
warn!("The balancer is not processing discovery updates; aborting discovery stream");
}
}
return false;
}
true
olix0r marked this conversation as resolved.
Show resolved Hide resolved
};

debug!(%capacity, "Spawning discovery buffer");
tokio::spawn(
async move {
tokio::pin!(inner);

loop {
let res = tokio::select! {
_ = tx.closed() => break,
biased;

_ = tx.closed() => {
debug!("Discovery receiver dropped");
return;
}

res = poll_fn(|cx| inner.as_mut().poll_discover(cx)) => res,
};

let change = match res {
match res {
Some(Ok(change)) => {
trace!("Changed");
change
if !send(&tx, Ok(change)) {
// XXX(ver) We don't actually have a way to "blow
// up" the balancer in this situation. My
// understanding is that this will cause the
// balancer to get cut off from further updates,
// should it ever become available again. That needs
// to be fixed.
//
// One option would be to drop the discovery stream
// and rebuild it if the balancer ever becomes
// unblocked.
//
// Ultimately we need to track down how we're
// getting into this blocked/idle state.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
Some(Err(e)) => {
let error = e.into();
debug!(%error);
let _ = tx.send(Err(error)).await;
send(&tx, Err(error));
return;
}
None => {
debug!("Discovery stream closed");
return;
}
};
olix0r marked this conversation as resolved.
Show resolved Hide resolved

tokio::select! {
_ = tx.closed() => break,
res = tx.send(Ok(change)) => {
if res.is_err() {
break;
}
trace!("Change sent");
}
}
}

debug!("Discovery receiver dropped");
}
.in_current_span(),
.in_current_span()
.instrument(debug_span!("discover")),
);

Buffer::new(rx)
Expand Down