Avoid buffering large amounts of rustc output. #7838

Merged · 4 commits · Mar 12, 2020

Changes from 2 commits
1 change: 0 additions & 1 deletion Cargo.toml
@@ -23,7 +23,6 @@ atty = "0.2"
bytesize = "1.0"
cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" }
crates-io = { path = "crates/crates-io", version = "0.31" }
-crossbeam-channel = "0.4"
crossbeam-utils = "0.7"
crypto-hash = "0.3.1"
curl = { version = "0.4.23", features = ["http2"] }
108 changes: 63 additions & 45 deletions src/cargo/core/compiler/job_queue.rs
@@ -58,7 +58,6 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::format_err;
-use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_utils::thread::Scope;
use jobserver::{Acquired, Client, HelperThread};
use log::{debug, info, trace};
@@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::{PackageId, TargetKind};
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
+use crate::util::Queue;
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
use crate::util::{Config, DependencyQueue};
use crate::util::{Progress, ProgressStyle};
@@ -93,13 +93,34 @@ pub struct JobQueue<'a, 'cfg> {
///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so real work can proceed
+/// ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages from consuming a large amount of
+/// memory.
+///
+/// `push` also avoids blocking, which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
struct DrainState<'a, 'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,

queue: DependencyQueue<Unit<'a>, Artifact, Job>,
-tx: Sender<Message>,
-rx: Receiver<Message>,
+messages: Arc<Queue<Message>>,
active: HashMap<JobId, Unit<'a>>,
compiled: HashSet<PackageId>,
documented: HashSet<PackageId>,
@@ -145,7 +166,7 @@ impl std::fmt::Display for JobId {

pub struct JobState<'a> {
/// Channel back to the main thread to coordinate messages and such.
-tx: Sender<Message>,
+messages: Arc<Queue<Message>>,

/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
@@ -199,7 +220,7 @@ enum Message {

impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
-let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
+self.messages.push(Message::Run(self.id, cmd.to_string()));
}

pub fn build_plan(
@@ -208,17 +229,16 @@ impl<'a> JobState<'a> {
cmd: ProcessBuilder,
filenames: Arc<Vec<OutputFile>>,
) {
-let _ = self
-.tx
-.send(Message::BuildPlanMsg(module_name, cmd, filenames));
+self.messages
+.push(Message::BuildPlanMsg(module_name, cmd, filenames));
}

pub fn stdout(&self, stdout: String) {
-drop(self.tx.send(Message::Stdout(stdout)));
+self.messages.push_bounded(Message::Stdout(stdout));
}

pub fn stderr(&self, stderr: String) {
-drop(self.tx.send(Message::Stderr(stderr)));
+self.messages.push_bounded(Message::Stderr(stderr));
}

/// A method used to signal to the coordinator thread that the rmeta file
@@ -228,9 +248,8 @@ impl<'a> JobState<'a> {
/// produced once!
pub fn rmeta_produced(&self) {
self.rmeta_required.set(false);
-let _ = self
-.tx
-.send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
+self.messages
+.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}

/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -239,14 +258,14 @@ impl<'a> JobState<'a> {
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
-let _ = self.tx.send(Message::NeedsToken(self.id));
+self.messages.push(Message::NeedsToken(self.id));
}

/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
-let _ = self.tx.send(Message::ReleaseToken(self.id));
+self.messages.push(Message::ReleaseToken(self.id));
}
}

@@ -340,21 +359,21 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
let _p = profile::start("executing the job graph");
self.queue.queue_finished();

-let (tx, rx) = unbounded();
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
-tx,
-rx,
+// 100 here is somewhat arbitrary. It is a few screenfuls of
+// output, and hopefully at most a few megabytes of memory for
+// typical messages.
+messages: Arc::new(Queue::new(100)),
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
counts: self.counts,
progress,
next_id: 0,
timings: self.timings,

tokens: Vec::new(),
rustc_tokens: HashMap::new(),
to_send_clients: BTreeMap::new(),
@@ -364,25 +383,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
};

// Create a helper thread for acquiring jobserver tokens
-let tx = state.tx.clone();
+let messages = state.messages.clone();
let helper = cx
.jobserver
.clone()
.into_helper_thread(move |token| {
-drop(tx.send(Message::Token(token)));
+drop(messages.push(Message::Token(token)));
})
.chain_err(|| "failed to create helper thread for jobserver management")?;

// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
-let tx = state.tx.clone();
+let messages = state.messages.clone();
+// It is important that this uses `push` instead of `push_bounded` for
+// now. If someone wants to fix this to be bounded, the `drop`
+// implementation needs to be changed to avoid possible deadlocks.
let _diagnostic_server = cx
.bcx
.build_config
.rustfix_diagnostic_server
.borrow_mut()
.take()
-.map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+.map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));

crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
.expect("child threads shouldn't panic")
@@ -584,7 +606,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
-let events: Vec<_> = self.rx.try_iter().collect();
+let mut events = self.messages.try_pop_all();
info!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
@@ -602,14 +624,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
loop {
self.tick_progress();
self.tokens.truncate(self.active.len() - 1);
-match self.rx.recv_timeout(Duration::from_millis(500)) {
-Ok(message) => break vec![message],
-Err(_) => continue,
+match self.messages.pop(Duration::from_millis(500)) {
+Some(message) => {
+events.push(message);
+break;
+}
+None => continue,
}
}
-} else {
-events
}
+return events;
}

fn drain_the_queue(
@@ -756,7 +780,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
assert!(self.active.insert(id, *unit).is_none());
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;

-let my_tx = self.tx.clone();
+let messages = self.messages.clone();
let fresh = job.freshness();
let rmeta_required = cx.rmeta_required(unit);

@@ -768,13 +792,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
let doit = move || {
let state = JobState {
id,
-tx: my_tx.clone(),
+messages: messages.clone(),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
};

let mut sender = FinishOnDrop {
-tx: &my_tx,
+messages: &messages,
id,
result: Err(format_err!("worker panicked")),
};
@@ -793,39 +817,33 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// we need to make sure that the metadata is flagged as produced so
// send a synthetic message here.
if state.rmeta_required.get() && sender.result.is_ok() {
-my_tx
-.send(Message::Finish(id, Artifact::Metadata, Ok(())))
-.unwrap();
+messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
}

// Use a helper struct with a `Drop` implementation to guarantee
// that a `Finish` message is sent even if our job panics. We
// shouldn't panic unless there's a bug in Cargo, so we just need
// to make sure nothing hangs by accident.
struct FinishOnDrop<'a> {
-tx: &'a Sender<Message>,
+messages: &'a Queue<Message>,
id: JobId,
result: CargoResult<()>,
}

impl Drop for FinishOnDrop<'_> {
fn drop(&mut self) {
let msg = mem::replace(&mut self.result, Ok(()));
-drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
+self.messages
+.push(Message::Finish(self.id, Artifact::All, msg));
}
}
};

match fresh {
-Freshness::Fresh => {
-self.timings.add_fresh();
-doit();
-}
-Freshness::Dirty => {
-self.timings.add_dirty();
-scope.spawn(move |_| doit());
-}
+Freshness::Fresh => self.timings.add_fresh(),
+Freshness::Dirty => self.timings.add_dirty(),
}
+scope.spawn(move |_| doit());
Member: I think this change may no longer be necessary, but did you want to include it anyway here?

Contributor Author: It is necessary; otherwise the cached message playback would deadlock if there were more than 100 messages. The playback shouldn't happen on the main thread, since otherwise there is nothing to drain messages while they are added to the queue.

Member: Ah right, yeah, forgot about that!

Contributor Author: I added a test for message caching to check for deadlock.

Ok(())
}
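The new module docs and the review thread above explain why the cached message playback must run on its own thread: the main thread is the only consumer, so a bounded push issued from the main thread would never be drained. Below is a minimal, hypothetical repro of that failure mode (not part of the PR), assuming the `Queue` type added in src/cargo/util/queue.rs further down is in scope:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    // A deliberately tiny bound so the third `push_bounded` blocks.
    let queue: Arc<Queue<u32>> = Arc::new(Queue::new(2));
    let producer = {
        let q = Arc::clone(&queue);
        thread::spawn(move || {
            for i in 0..3 {
                // Blocks on i == 2 until the main thread pops an item.
                q.push_bounded(i);
            }
        })
    };
    // If this drain loop ran on the producer's thread, the queue would
    // never be emptied and the third push would block forever.
    while let Some(i) = queue.pop(Duration::from_millis(100)) {
        println!("drained {}", i);
    }
    producer.join().unwrap();
}
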
2 changes: 2 additions & 0 deletions src/cargo/util/mod.rs
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
pub use self::paths::{dylib_path_envvar, normalize_path};
pub use self::process_builder::{process, ProcessBuilder};
pub use self::progress::{Progress, ProgressStyle};
+pub use self::queue::Queue;
pub use self::read2::read2;
pub use self::restricted_names::validate_package_name;
pub use self::rustc::Rustc;
@@ -51,6 +52,7 @@ pub mod paths;
pub mod process_builder;
pub mod profile;
mod progress;
+mod queue;
mod read2;
pub mod restricted_names;
pub mod rustc;
86 changes: 86 additions & 0 deletions src/cargo/util/queue.rs
@@ -0,0 +1,86 @@
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// A simple, threadsafe, queue of items of type `T`
///
/// This is a sort of channel where any thread can push to a queue and any
/// thread can pop from a queue.
///
/// This supports both bounded and unbounded operations. `push` will never block,
/// and allows the queue to grow without bounds. `push_bounded` will block if the
/// queue is over capacity, and will resume once there is enough capacity.
pub struct Queue<T> {
state: Mutex<State<T>>,
popper_cv: Condvar,
bounded_cv: Condvar,
bound: usize,
}

struct State<T> {
items: VecDeque<T>,
}

impl<T> Queue<T> {
pub fn new(bound: usize) -> Queue<T> {
Queue {
state: Mutex::new(State {
items: VecDeque::new(),
}),
popper_cv: Condvar::new(),
bounded_cv: Condvar::new(),
bound,
}
}

pub fn push(&self, item: T) {
self.state.lock().unwrap().items.push_back(item);
self.popper_cv.notify_one();
}

/// Pushes an item onto the queue, blocking if the queue is full.
pub fn push_bounded(&self, item: T) {
let mut state = self.state.lock().unwrap();
loop {
Member: This might be able to make use of the nifty wait_until method:

let state = self.bounded_cv.wait_until(state, |s| s.items.len() < self.bound).unwrap();

Contributor Author: Didn't know that existed!

if state.items.len() >= self.bound {
state = self.bounded_cv.wait(state).unwrap();
} else {
state.items.push_back(item);
self.popper_cv.notify_one();
break;
}
}
}

pub fn pop(&self, timeout: Duration) -> Option<T> {
let mut state = self.state.lock().unwrap();
let now = Instant::now();
while state.items.is_empty() {
let elapsed = now.elapsed();
if elapsed >= timeout {
break;
}
let (lock, result) = self
.popper_cv
.wait_timeout(state, timeout - elapsed)
.unwrap();
state = lock;
if result.timed_out() {
break;
}
}
let value = state.items.pop_front()?;
if state.items.len() < self.bound {
// Assumes threads cannot be canceled.
self.bounded_cv.notify_one();
}
Some(value)
Member: This might actually also get cleaned up a good amount with wait_timeout_until:

let (mut state, result) = self.popper_cv.wait_timeout_until(
    self.state.lock().unwrap(),
    timeout,
    |s| s.items.len() > 0,
).unwrap();
if result.timed_out() {
    None
} else {
    // conditionally notify `bounded_cv`
    state.items.pop_front()
}

Contributor Author: Hm, after thinking about it some more, this subtly changes the semantics. If there are multiple poppers, and both are awoken, then one will get a value and the other won't. We don't use multiple poppers, but for the push_bounded case, it could result in pushing too many elements onto the queue. To guard against that, we would need to keep the loops, which ends up not simplifying at all.

In general, it probably doesn't matter, but I would prefer to keep the current semantics with the loop that "retries" after the thread is awakened.

Member: Hm, I'm not sure I follow: if the closure returns true, then that lock is persisted and returned, so we can't have two poppers simultaneously exit the wait-timeout loop, I believe? I think this is the same for the push case as well, where when we get a lock back after wait_until we're guaranteed that the condition evaluates true for the lock state we were returned.

Contributor Author: Ah. Somehow it didn't click that it was atomically locked.

Pushed a commit with the change. Since it is unstable until 1.42, it will need to wait until Thursday.

Member: Oh oops, sorry about that, I totally didn't realize it was still unstable... In any case, at least Thursday isn't that far off!

}

pub fn try_pop_all(&self) -> Vec<T> {
let mut state = self.state.lock().unwrap();
let result = state.items.drain(..).collect();
self.bounded_cv.notify_all();
result
}
}
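The follow-up commit mentioned in the threads above adopted the condvar helper methods; they were stabilized in Rust 1.42 under the names wait_while and wait_timeout_while (with the predicate inverted relative to the then-unstable wait_until/wait_timeout_until). Here is a sketch of the two methods in that style — an approximation under that assumption, not necessarily the exact code that landed:

use std::time::Duration;

impl<T> Queue<T> {
    /// Hypothetical `push_bounded` via `Condvar::wait_while`.
    pub fn push_bounded_v2(&self, item: T) {
        let guard = self.state.lock().unwrap();
        // The guard is re-acquired atomically with the predicate check,
        // so two pushers cannot both observe spare capacity at once.
        let mut state = self
            .bounded_cv
            .wait_while(guard, |s| s.items.len() >= self.bound)
            .unwrap();
        state.items.push_back(item);
        self.popper_cv.notify_one();
    }

    /// Hypothetical `pop` via `Condvar::wait_timeout_while`.
    pub fn pop_v2(&self, timeout: Duration) -> Option<T> {
        let (mut state, result) = self
            .popper_cv
            .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| {
                s.items.is_empty()
            })
            .unwrap();
        if result.timed_out() {
            return None;
        }
        let value = state.items.pop_front()?;
        if state.items.len() < self.bound {
            // Wake one blocked bounded pusher, as in the version above.
            self.bounded_cv.notify_one();
        }
        Some(value)
    }
}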
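And a hypothetical, self-contained driver (not from the PR) showing how the pieces fit together, mimicking the coordinator pattern in drain_the_queue and assuming the Queue<T> defined above is in scope:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let queue = Arc::new(Queue::new(100));
    let handles: Vec<_> = (0..2)
        .map(|id| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                // Process output is bounded: a job printing megabytes of
                // diagnostics blocks here instead of ballooning memory.
                q.push_bounded(format!("job {}: stderr line", id));
                // Control messages use the unbounded push and never block.
                q.push(format!("job {}: finished", id));
            })
        })
        .collect();

    // Grab everything that is ready, then fall back to a timed pop so a
    // progress bar could still tick between batches.
    let mut received = 0;
    while received < 4 {
        let mut events = queue.try_pop_all();
        if events.is_empty() {
            if let Some(msg) = queue.pop(Duration::from_millis(500)) {
                events.push(msg);
            }
        }
        for msg in events {
            println!("{}", msg);
            received += 1;
        }
    }
    for h in handles {
        h.join().unwrap();
    }
}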