Skip to content

Commit

Permalink
Fix dropped output race during UI teardown (cherrypick of #14093) (#14690)
Browse files Browse the repository at this point in the history

As described in #13276 and #11626, a race condition can occur between releasing exclusive access to the Console and the dynamic UI shutting down. In #13276, the issue was triaged to a task accepting output without confirming that the UI was still valid, and then dropping that output if the UI was no longer valid.

To fix this, replace the extra task and channel (which could not be used "transactionally" to both accept stderr and validate that the UI was still valid) with an anonymous `Mutex` directly around the stderr destination.

Fixes #13276 and #11626.

[ci skip-build-wheels]
  • Loading branch information
stuhood committed Mar 3, 2022
1 parent d58ab1a commit c9c2b46
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/ui/Cargo.toml
Expand Up @@ -13,6 +13,7 @@ futures = "0.3"
indexmap = "1.4"
indicatif = "0.16.2"
logging = { path = "../logging" }
parking_lot = "0.11"
stdio = { path = "../stdio" }
task_executor = { path = "../task_executor" }
uuid = { version = "0.7", features = ["v4"] }
Expand Down
55 changes: 29 additions & 26 deletions src/rust/engine/ui/src/console_ui.rs
Expand Up @@ -27,12 +27,13 @@

use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;

use futures::future::{self, FutureExt, TryFutureExt};
use indexmap::IndexMap;
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle, WeakProgressBar};
use parking_lot::Mutex;

use task_executor::Executor;
use workunit_store::{format_workunit_duration, SpanId, WorkunitStore};
Expand All @@ -42,7 +43,6 @@ pub struct ConsoleUI {
local_parallelism: usize,
// While the UI is running, there will be an Instance present.
instance: Option<Instance>,
teardown_mpsc: (mpsc::Sender<()>, mpsc::Receiver<()>),
}

impl ConsoleUI {
Expand All @@ -51,7 +51,6 @@ impl ConsoleUI {
workunit_store,
local_parallelism,
instance: None,
teardown_mpsc: mpsc::channel(),
}
}

Expand All @@ -78,18 +77,36 @@ impl ConsoleUI {
///
/// Setup progress bars, and return them along with a running task that will drive them.
///
/// NB: This method must be very careful to avoid logging. Between the point where we have taken
/// exclusive access to the Console and when the UI has actually been initialized, attempts to
/// log from this method would deadlock (by causing the method to wait for _itself_ to finish).
///
fn setup_bars(
&self,
executor: Executor,
) -> Result<(Vec<ProgressBar>, MultiProgressTask), String> {
// Stderr is propagated across a channel to remove lock interleavings between stdio and the UI.
let (stderr_sender, stderr_receiver) = mpsc::sync_channel(0);
let (term_read, _, term_stderr_write) =
// We take exclusive access to stdio by registering a callback that is used to render stderr
// while we're holding access. See the method doc.
let stderr_dest_bar: Arc<Mutex<Option<WeakProgressBar>>> = Arc::new(Mutex::new(None));
// We acquire the lock before taking exclusive access, and don't release it until after
// initialization. That way, the exclusive callback can always assume that the destination
// is initialized (i.e., can `unwrap` it).
let mut stderr_dest_bar_guard = stderr_dest_bar.lock();
let (term_read, _, term_stderr_write) = {
let stderr_dest_bar = stderr_dest_bar.clone();
stdio::get_destination().exclusive_start(Box::new(move |msg: &str| {
// If we fail to send, it's because the UI has shut down: we fail the callback to
// have the logging module directly log to stderr at that point.
stderr_sender.send(msg.to_owned()).map_err(|_| ())
}))?;
// Acquire a handle to the destination bar in the UI. If we fail to upgrade, it's because
// the UI has shut down: we fail the callback to have the logging module directly log to
// stderr at that point.
let dest_bar = {
let stderr_dest_bar = stderr_dest_bar.lock();
// We can safely unwrap here because the Mutex is held until the bar is initialized.
stderr_dest_bar.as_ref().unwrap().upgrade().ok_or(())?
};
dest_bar.println(msg);
Ok(())
}))?
};

let stderr_use_color = term_stderr_write.use_color;
let term = console::Term::read_write_pair_with_style(
Expand All @@ -108,21 +125,12 @@ impl ConsoleUI {
multi_progress.add(ProgressBar::new(50).with_style(style))
})
.collect::<Vec<_>>();
let first_bar = bars[0].downgrade();
*stderr_dest_bar_guard = Some(bars[0].downgrade());

// Spawn a task to drive the multi progress.
let multi_progress_task = executor
.spawn_blocking(move || multi_progress.join())
.boxed();
// And another to propagate stderr, which will exit automatically when the channel closes.
let _stderr_task = executor.spawn_blocking(move || {
while let Ok(stderr) = stderr_receiver.recv() {
match first_bar.upgrade() {
Some(first_bar) => first_bar.println(stderr),
None => break,
}
}
});

Ok((bars, multi_progress_task))
}
Expand Down Expand Up @@ -207,16 +215,11 @@ impl ConsoleUI {
///
pub fn teardown(&mut self) -> impl Future<Output = Result<(), String>> {
if let Some(instance) = self.instance.take() {
let sender = self.teardown_mpsc.0.clone();
// When the MultiProgress completes, the Term(Destination) is dropped, which will restore
// direct access to the Console.
instance
.multi_progress_task
.map_err(|e| format!("Failed to render UI: {}", e))
.and_then(move |()| {
sender.send(()).unwrap();
future::ok(())
})
.boxed()
} else {
future::ok(()).boxed()
Expand Down

0 comments on commit c9c2b46

Please sign in to comment.