New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix dropped output race during UI teardown #14093
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -27,12 +27,13 @@ | |||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
use std::future::Future; | ||||||||||||||||||||||||||||||||||||||
use std::pin::Pin; | ||||||||||||||||||||||||||||||||||||||
use std::sync::mpsc; | ||||||||||||||||||||||||||||||||||||||
use std::sync::Arc; | ||||||||||||||||||||||||||||||||||||||
use std::time::Duration; | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
use futures::future::{self, FutureExt, TryFutureExt}; | ||||||||||||||||||||||||||||||||||||||
use indexmap::IndexMap; | ||||||||||||||||||||||||||||||||||||||
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; | ||||||||||||||||||||||||||||||||||||||
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle, WeakProgressBar}; | ||||||||||||||||||||||||||||||||||||||
use parking_lot::Mutex; | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
use task_executor::Executor; | ||||||||||||||||||||||||||||||||||||||
use workunit_store::{format_workunit_duration, SpanId, WorkunitStore}; | ||||||||||||||||||||||||||||||||||||||
|
@@ -42,7 +43,6 @@ pub struct ConsoleUI { | |||||||||||||||||||||||||||||||||||||
local_parallelism: usize, | ||||||||||||||||||||||||||||||||||||||
// While the UI is running, there will be an Instance present. | ||||||||||||||||||||||||||||||||||||||
instance: Option<Instance>, | ||||||||||||||||||||||||||||||||||||||
teardown_mpsc: (mpsc::Sender<()>, mpsc::Receiver<()>), | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
impl ConsoleUI { | ||||||||||||||||||||||||||||||||||||||
|
@@ -51,7 +51,6 @@ impl ConsoleUI { | |||||||||||||||||||||||||||||||||||||
workunit_store, | ||||||||||||||||||||||||||||||||||||||
local_parallelism, | ||||||||||||||||||||||||||||||||||||||
instance: None, | ||||||||||||||||||||||||||||||||||||||
teardown_mpsc: mpsc::channel(), | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
@@ -78,18 +77,33 @@ impl ConsoleUI { | |||||||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||||||||
/// Setup progress bars, and return them along with a running task that will drive them. | ||||||||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||||||||
/// NB: This method must be very careful to avoid logging. Between the point where we have taken | ||||||||||||||||||||||||||||||||||||||
/// exclusive access to the Console and when the UI has actually been initialized, attempts to | ||||||||||||||||||||||||||||||||||||||
/// log from this method would deadlock (by causing the method to wait for _itself_ to finish). | ||||||||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||||||||
fn setup_bars( | ||||||||||||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||||||||||||
executor: Executor, | ||||||||||||||||||||||||||||||||||||||
) -> Result<(Vec<ProgressBar>, MultiProgressTask), String> { | ||||||||||||||||||||||||||||||||||||||
// Stderr is propagated across a channel to remove lock interleavings between stdio and the UI. | ||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After giving this some thought, I believe that the lock interleavings referred to here are no longer possible as the code exists today. The |
||||||||||||||||||||||||||||||||||||||
let (stderr_sender, stderr_receiver) = mpsc::sync_channel(0); | ||||||||||||||||||||||||||||||||||||||
let (term_read, _, term_stderr_write) = | ||||||||||||||||||||||||||||||||||||||
// We take exclusive access to stdio by registering a callback that is used to render stderr | ||||||||||||||||||||||||||||||||||||||
// while we're holding access. See the method doc. | ||||||||||||||||||||||||||||||||||||||
let stderr_dest_bar: Arc<Mutex<Option<WeakProgressBar>>> = Arc::new(Mutex::new(None)); | ||||||||||||||||||||||||||||||||||||||
let mut stderr_dest_bar_guard = stderr_dest_bar.lock(); | ||||||||||||||||||||||||||||||||||||||
stuhood marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||
let (term_read, _, term_stderr_write) = { | ||||||||||||||||||||||||||||||||||||||
let stderr_dest_bar = stderr_dest_bar.clone(); | ||||||||||||||||||||||||||||||||||||||
stdio::get_destination().exclusive_start(Box::new(move |msg: &str| { | ||||||||||||||||||||||||||||||||||||||
// If we fail to send, it's because the UI has shut down: we fail the callback to | ||||||||||||||||||||||||||||||||||||||
// have the logging module directly log to stderr at that point. | ||||||||||||||||||||||||||||||||||||||
stderr_sender.send(msg.to_owned()).map_err(|_| ()) | ||||||||||||||||||||||||||||||||||||||
}))?; | ||||||||||||||||||||||||||||||||||||||
// Acquire a handle to the destination bar in the UI. If we fail to upgrade, it's because | ||||||||||||||||||||||||||||||||||||||
// the UI has shut down: we fail the callback to have the logging module directly log to | ||||||||||||||||||||||||||||||||||||||
// stderr at that point. | ||||||||||||||||||||||||||||||||||||||
let dest_bar = { | ||||||||||||||||||||||||||||||||||||||
let stderr_dest_bar = stderr_dest_bar.lock(); | ||||||||||||||||||||||||||||||||||||||
// We can safely unwrap here because the Mutex is held until the bar is initialized. | ||||||||||||||||||||||||||||||||||||||
stderr_dest_bar.as_ref().unwrap().upgrade().ok_or(())? | ||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change appears to work, but I also never see a failure to upgrade the progress bar from weak; i.e.: I neve see a log::warn / retry. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My read of the stdio crate is that it should fall back to looging and that should include a warn level log lione I do not see: pants/src/rust/engine/stdio/src/lib.rs Lines 340 to 357 in 2b909b2
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm. Actually, you're right. When we fail to write to the handler and tear down So... things should probably work the way I was claiming above (an error in the stderr callback should clear exclusive access and re-enable the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only 1 hit and that was back on 12/22 - i.e.: not from this fix:
|
||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||
dest_bar.println(msg); | ||||||||||||||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||||||||||||
}))? | ||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
let stderr_use_color = term_stderr_write.use_color; | ||||||||||||||||||||||||||||||||||||||
let term = console::Term::read_write_pair_with_style( | ||||||||||||||||||||||||||||||||||||||
|
@@ -108,21 +122,12 @@ impl ConsoleUI { | |||||||||||||||||||||||||||||||||||||
multi_progress.add(ProgressBar::new(50).with_style(style)) | ||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||
.collect::<Vec<_>>(); | ||||||||||||||||||||||||||||||||||||||
let first_bar = bars[0].downgrade(); | ||||||||||||||||||||||||||||||||||||||
*stderr_dest_bar_guard = Some(bars[0].downgrade()); | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
// Spawn a task to drive the multi progress. | ||||||||||||||||||||||||||||||||||||||
let multi_progress_task = executor | ||||||||||||||||||||||||||||||||||||||
.spawn_blocking(move || multi_progress.join()) | ||||||||||||||||||||||||||||||||||||||
.boxed(); | ||||||||||||||||||||||||||||||||||||||
// And another to propagate stderr, which will exit automatically when the channel closes. | ||||||||||||||||||||||||||||||||||||||
let _stderr_task = executor.spawn_blocking(move || { | ||||||||||||||||||||||||||||||||||||||
while let Ok(stderr) = stderr_receiver.recv() { | ||||||||||||||||||||||||||||||||||||||
match first_bar.upgrade() { | ||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the bug was here. We've definitely got stderr to flush, but the WeakProgressBar may have evaporated already in which case we drop the stderr. |
||||||||||||||||||||||||||||||||||||||
Some(first_bar) => first_bar.println(stderr), | ||||||||||||||||||||||||||||||||||||||
None => break, | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
Ok((bars, multi_progress_task)) | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
@@ -207,16 +212,11 @@ impl ConsoleUI { | |||||||||||||||||||||||||||||||||||||
/// | ||||||||||||||||||||||||||||||||||||||
pub fn teardown(&mut self) -> impl Future<Output = Result<(), String>> { | ||||||||||||||||||||||||||||||||||||||
if let Some(instance) = self.instance.take() { | ||||||||||||||||||||||||||||||||||||||
let sender = self.teardown_mpsc.0.clone(); | ||||||||||||||||||||||||||||||||||||||
// When the MultiProgress completes, the Term(Destination) is dropped, which will restore | ||||||||||||||||||||||||||||||||||||||
// direct access to the Console. | ||||||||||||||||||||||||||||||||||||||
instance | ||||||||||||||||||||||||||||||||||||||
.multi_progress_task | ||||||||||||||||||||||||||||||||||||||
.map_err(|e| format!("Failed to render UI: {}", e)) | ||||||||||||||||||||||||||||||||||||||
.and_then(move |()| { | ||||||||||||||||||||||||||||||||||||||
sender.send(()).unwrap(); | ||||||||||||||||||||||||||||||||||||||
future::ok(()) | ||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||
.boxed() | ||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||
future::ok(()).boxed() | ||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW: This was already true before this change: because we were using a
sync_channel
, any attempt to log in this method before the second task was spawned (which couldn't be spawned until we were actually initialized) would have caused this method to deadlock.