Avoid buffering large amounts of rustc output. #7838

Merged · 4 commits · Mar 12, 2020
1 change: 0 additions & 1 deletion Cargo.toml
@@ -23,7 +23,6 @@ atty = "0.2"
bytesize = "1.0"
cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" }
crates-io = { path = "crates/crates-io", version = "0.31" }
-crossbeam-channel = "0.4"
crossbeam-utils = "0.7"
crypto-hash = "0.3.1"
curl = { version = "0.4.23", features = ["http2"] }
109 changes: 64 additions & 45 deletions src/cargo/core/compiler/job_queue.rs
@@ -58,7 +58,6 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::format_err;
-use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_utils::thread::Scope;
use jobserver::{Acquired, Client, HelperThread};
use log::{debug, info, trace};
@@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::{PackageId, TargetKind};
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
+use crate::util::Queue;
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
use crate::util::{Config, DependencyQueue};
use crate::util::{Progress, ProgressStyle};
@@ -93,13 +93,34 @@ pub struct JobQueue<'a, 'cfg> {
///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so real work can proceed
+/// ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages causing a large amount of memory
+/// being used.
+///
+/// `push` also avoids blocking which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
struct DrainState<'a, 'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,

queue: DependencyQueue<Unit<'a>, Artifact, Job>,
-tx: Sender<Message>,
-rx: Receiver<Message>,
+messages: Arc<Queue<Message>>,
active: HashMap<JobId, Unit<'a>>,
compiled: HashSet<PackageId>,
documented: HashSet<PackageId>,
@@ -145,7 +166,7 @@ impl std::fmt::Display for JobId {

pub struct JobState<'a> {
/// Channel back to the main thread to coordinate messages and such.
-tx: Sender<Message>,
+messages: Arc<Queue<Message>>,

/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
@@ -199,7 +220,7 @@ enum Message {

impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
-let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
+self.messages.push(Message::Run(self.id, cmd.to_string()));
}

pub fn build_plan(
@@ -208,17 +229,16 @@ impl<'a> JobState<'a> {
cmd: ProcessBuilder,
filenames: Arc<Vec<OutputFile>>,
) {
-let _ = self
-.tx
-.send(Message::BuildPlanMsg(module_name, cmd, filenames));
+self.messages
+.push(Message::BuildPlanMsg(module_name, cmd, filenames));
}

pub fn stdout(&self, stdout: String) {
-drop(self.tx.send(Message::Stdout(stdout)));
+self.messages.push_bounded(Message::Stdout(stdout));
}

pub fn stderr(&self, stderr: String) {
-drop(self.tx.send(Message::Stderr(stderr)));
+self.messages.push_bounded(Message::Stderr(stderr));
}

/// A method used to signal to the coordinator thread that the rmeta file
@@ -228,9 +248,8 @@ impl<'a> JobState<'a> {
/// produced once!
pub fn rmeta_produced(&self) {
self.rmeta_required.set(false);
-let _ = self
-.tx
-.send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
+self.messages
+.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}

/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -239,14 +258,14 @@ impl<'a> JobState<'a> {
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
-let _ = self.tx.send(Message::NeedsToken(self.id));
+self.messages.push(Message::NeedsToken(self.id));
}

/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
-let _ = self.tx.send(Message::ReleaseToken(self.id));
+self.messages.push(Message::ReleaseToken(self.id));
}
}

@@ -340,21 +359,22 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
let _p = profile::start("executing the job graph");
self.queue.queue_finished();

-let (tx, rx) = unbounded();
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
-tx,
-rx,
+// 100 here is somewhat arbitrary. It is a few screenfulls of
+// output, and hopefully at most a few megabytes of memory for
+// typical messages. If you change this, please update the test
+// caching_large_output, too.
+messages: Arc::new(Queue::new(100)),
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
counts: self.counts,
progress,
next_id: 0,
timings: self.timings,

tokens: Vec::new(),
rustc_tokens: HashMap::new(),
to_send_clients: BTreeMap::new(),
@@ -364,25 +384,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
};

// Create a helper thread for acquiring jobserver tokens
-let tx = state.tx.clone();
+let messages = state.messages.clone();
let helper = cx
.jobserver
.clone()
.into_helper_thread(move |token| {
-drop(tx.send(Message::Token(token)));
+drop(messages.push(Message::Token(token)));
})
.chain_err(|| "failed to create helper thread for jobserver management")?;

// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
-let tx = state.tx.clone();
+let messages = state.messages.clone();
+// It is important that this uses `push` instead of `push_bounded` for
+// now. If someone wants to fix this to be bounded, the `drop`
+// implementation needs to be changed to avoid possible deadlocks.
let _diagnostic_server = cx
.bcx
.build_config
.rustfix_diagnostic_server
.borrow_mut()
.take()
-.map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+.map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));

crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
.expect("child threads shouldn't panic")
@@ -584,7 +607,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
-let events: Vec<_> = self.rx.try_iter().collect();
+let mut events = self.messages.try_pop_all();
info!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
@@ -602,14 +625,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
loop {
self.tick_progress();
self.tokens.truncate(self.active.len() - 1);
-match self.rx.recv_timeout(Duration::from_millis(500)) {
-Ok(message) => break vec![message],
-Err(_) => continue,
+match self.messages.pop(Duration::from_millis(500)) {
+Some(message) => {
+events.push(message);
+break;
+}
+None => continue,
}
}
-} else {
-events
}
+return events;
}

fn drain_the_queue(
@@ -756,7 +781,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
assert!(self.active.insert(id, *unit).is_none());
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;

-let my_tx = self.tx.clone();
+let messages = self.messages.clone();
let fresh = job.freshness();
let rmeta_required = cx.rmeta_required(unit);

@@ -768,13 +793,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
let doit = move || {
let state = JobState {
id,
-tx: my_tx.clone(),
+messages: messages.clone(),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
};

let mut sender = FinishOnDrop {
-tx: &my_tx,
+messages: &messages,
id,
result: Err(format_err!("worker panicked")),
};
@@ -793,39 +818,33 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// we need to make sure that the metadata is flagged as produced so
// send a synthetic message here.
if state.rmeta_required.get() && sender.result.is_ok() {
-my_tx
-.send(Message::Finish(id, Artifact::Metadata, Ok(())))
-.unwrap();
+messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
}

// Use a helper struct with a `Drop` implementation to guarantee
// that a `Finish` message is sent even if our job panics. We
// shouldn't panic unless there's a bug in Cargo, so we just need
// to make sure nothing hangs by accident.
struct FinishOnDrop<'a> {
-tx: &'a Sender<Message>,
+messages: &'a Queue<Message>,
id: JobId,
result: CargoResult<()>,
}

impl Drop for FinishOnDrop<'_> {
fn drop(&mut self) {
let msg = mem::replace(&mut self.result, Ok(()));
-drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
+self.messages
+.push(Message::Finish(self.id, Artifact::All, msg));
}
}
};

match fresh {
-Freshness::Fresh => {
-self.timings.add_fresh();
-doit();
-}
-Freshness::Dirty => {
-self.timings.add_dirty();
-scope.spawn(move |_| doit());
-}
+Freshness::Fresh => self.timings.add_fresh(),
+Freshness::Dirty => self.timings.add_dirty(),
}
+scope.spawn(move |_| doit());
Member: I think this change may no longer be necessary, but did you want to include it anyway here?

Contributor (author): It is necessary, otherwise the cached message playback would deadlock if there were more than 100 messages. The playback shouldn't happen on the main thread, otherwise there is nothing to drain messages while they are added to the queue.

Member: Ah right yeah, forgot about that!

Contributor (author): I added a test for message caching to check for deadlock.

Ok(())
}
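To make the review thread above concrete, here is a minimal standalone sketch (not code from this PR) of why cached-message playback has to run on a worker thread: `push_bounded` blocks once the queue holds `bound` items, and only a draining thread can make room, so replaying more than 100 cached messages from the main thread would block with nobody left to pop. The sketch assumes the `Queue` type added in src/cargo/util/queue.rs below; the tiny bound and message strings are illustrative only.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use cargo::util::Queue; // inside Cargo itself this is `crate::util::Queue`

fn main() {
    // A tiny bound makes the hazard easy to see; the real queue uses 100.
    let queue: Arc<Queue<String>> = Arc::new(Queue::new(2));

    // If this replay loop ran on the main thread, the third `push_bounded`
    // would block forever: no other thread would be left to drain the queue.
    let producer = Arc::clone(&queue);
    let replay = thread::spawn(move || {
        for i in 0..10 {
            producer.push_bounded(format!("cached rustc output, line {}", i));
        }
    });

    // The main thread keeps popping, so the replay thread always progresses.
    let mut received = 0;
    while received < 10 {
        if let Some(line) = queue.pop(Duration::from_millis(100)) {
            println!("{}", line);
            received += 1;
        }
    }
    replay.join().unwrap();
}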
2 changes: 2 additions & 0 deletions src/cargo/util/mod.rs
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
pub use self::paths::{dylib_path_envvar, normalize_path};
pub use self::process_builder::{process, ProcessBuilder};
pub use self::progress::{Progress, ProgressStyle};
+pub use self::queue::Queue;
pub use self::read2::read2;
pub use self::restricted_names::validate_package_name;
pub use self::rustc::Rustc;
@@ -51,6 +52,7 @@ pub mod paths;
pub mod process_builder;
pub mod profile;
mod progress;
+mod queue;
mod read2;
pub mod restricted_names;
pub mod rustc;
75 changes: 75 additions & 0 deletions src/cargo/util/queue.rs
@@ -0,0 +1,75 @@
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// A simple, threadsafe, queue of items of type `T`
///
/// This is a sort of channel where any thread can push to a queue and any
/// thread can pop from a queue.
///
/// This supports both bounded and unbounded operations. `push` will never block,
/// and allows the queue to grow without bounds. `push_bounded` will block if the
/// queue is over capacity, and will resume once there is enough capacity.
pub struct Queue<T> {
    state: Mutex<State<T>>,
    /// Notified when an item is pushed; wakes a blocked `pop`.
    popper_cv: Condvar,
    /// Notified when capacity frees up; wakes a blocked `push_bounded`.
    bounded_cv: Condvar,
    bound: usize,
}

struct State<T> {
    items: VecDeque<T>,
}

impl<T> Queue<T> {
    pub fn new(bound: usize) -> Queue<T> {
        Queue {
            state: Mutex::new(State {
                items: VecDeque::new(),
            }),
            popper_cv: Condvar::new(),
            bounded_cv: Condvar::new(),
            bound,
        }
    }

    /// Pushes an item onto the queue without blocking, even if the queue is
    /// over capacity.
    pub fn push(&self, item: T) {
        self.state.lock().unwrap().items.push_back(item);
        self.popper_cv.notify_one();
    }

    /// Pushes an item onto the queue, blocking if the queue is full.
    pub fn push_bounded(&self, item: T) {
        let locked_state = self.state.lock().unwrap();
        let mut state = self
            .bounded_cv
            .wait_while(locked_state, |s| s.items.len() >= self.bound)
            .unwrap();
        state.items.push_back(item);
        self.popper_cv.notify_one();
    }

    /// Pops one item, blocking for at most `timeout`; returns `None` on
    /// timeout.
    pub fn pop(&self, timeout: Duration) -> Option<T> {
        let (mut state, result) = self
            .popper_cv
            .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
            .unwrap();
        if result.timed_out() {
            None
        } else {
            let value = state.items.pop_front()?;
            if state.items.len() < self.bound {
                // Assumes threads cannot be canceled.
                self.bounded_cv.notify_one();
            }
            Some(value)
        }
    }

    /// Drains all items currently in the queue without blocking.
    pub fn try_pop_all(&self) -> Vec<T> {
        let mut state = self.state.lock().unwrap();
        let result = state.items.drain(..).collect();
        self.bounded_cv.notify_all();
        result
    }
}
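For reference, a short usage sketch of the semantics described above: priority messages never block, output messages are throttled at the bound, and the coordinator drains in batches or waits with a timeout. This is an illustration only, not part of the PR; the assertion values are hypothetical.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use cargo::util::Queue; // inside Cargo itself this is `crate::util::Queue`

fn main() {
    let queue: Arc<Queue<&'static str>> = Arc::new(Queue::new(100));

    // Priority messages use `push`: it never blocks, even past the bound.
    queue.push("token");
    queue.push("finished");

    // Output messages use `push_bounded`: it blocks while the queue holds
    // `bound` (here 100) or more items.
    let q = Arc::clone(&queue);
    thread::spawn(move || q.push_bounded("a line of rustc stderr"))
        .join()
        .unwrap();

    // The coordinator drains whatever is ready in one non-blocking call...
    let batch = queue.try_pop_all();
    assert_eq!(batch, vec!["token", "finished", "a line of rustc stderr"]);

    // ...and otherwise waits with a timeout so it can update progress.
    assert!(queue.pop(Duration::from_millis(10)).is_none());
}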