Skip to content

Commit

Permalink
Avoid buffering large amounts of rustc output.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehuss committed Mar 8, 2020
1 parent 458138b commit e2b28f7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 25 deletions.
50 changes: 35 additions & 15 deletions src/cargo/core/compiler/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,28 @@ pub struct JobQueue<'a, 'cfg> {
///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
///
/// # Message queue
///
/// Each thread running a process uses the message queue to send messages back
/// to the main thread. The main thread coordinates everything, and handles
/// printing output.
///
/// It is important to be careful which messages use `push` vs `push_bounded`.
/// `push` is for priority messages (like tokens, or "finished") where the
/// sender shouldn't block. We want to handle those so real work can proceed
/// ASAP.
///
/// `push_bounded` is only for messages being printed to stdout/stderr. Being
/// bounded prevents a flood of messages causing a large amount of memory
/// being used.
///
/// `push` also avoids blocking which helps avoid deadlocks. For example, when
/// the diagnostic server thread is dropped, it waits for the thread to exit.
/// But if the thread is blocked on a full queue, and there is a critical
/// error, the drop will deadlock. This should be fixed at some point in the
/// future. The jobserver thread has a similar problem, though it will time
/// out after 1 second.
struct DrainState<'a, 'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,
Expand Down Expand Up @@ -212,11 +234,11 @@ impl<'a> JobState<'a> {
}

pub fn stdout(&self, stdout: String) {
self.messages.push(Message::Stdout(stdout));
self.messages.push_bounded(Message::Stdout(stdout));
}

pub fn stderr(&self, stderr: String) {
self.messages.push(Message::Stderr(stderr));
self.messages.push_bounded(Message::Stderr(stderr));
}

/// A method used to signal to the coordinator thread that the rmeta file
Expand Down Expand Up @@ -341,7 +363,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
messages: Arc::new(Queue::new()),
// 100 here is somewhat arbitrary. It is a few screenfulls of
// output, and hopefully at most a few megabytes of memory for
// typical messages.
messages: Arc::new(Queue::new(100)),
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
Expand Down Expand Up @@ -370,6 +395,9 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
let messages = state.messages.clone();
// It is important that this uses `push` instead of `push_bounded` for
// now. If someone wants to fix this to be bounded, the `drop`
// implementation needs to be changed to avoid possible deadlocks.
let _diagnostic_server = cx
.bcx
.build_config
Expand Down Expand Up @@ -578,10 +606,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
let mut events = Vec::new();
while let Some(event) = self.messages.try_pop() {
events.push(event);
}
let mut events = self.messages.try_pop_all();
info!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
Expand Down Expand Up @@ -815,15 +840,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
};

match fresh {
Freshness::Fresh => {
self.timings.add_fresh();
doit();
}
Freshness::Dirty => {
self.timings.add_dirty();
scope.spawn(move |_| doit());
}
Freshness::Fresh => self.timings.add_fresh(),
Freshness::Dirty => self.timings.add_dirty(),
}
scope.spawn(move |_| doit());

Ok(())
}
Expand Down
52 changes: 42 additions & 10 deletions src/cargo/util/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,51 @@ use std::time::{Duration, Instant};
/// A simple, threadsafe, queue of items of type `T`
///
/// This is a sort of channel where any thread can push to a queue and any
/// thread can pop from a queue. Currently queues have infinite capacity where
/// `push` will never block but `pop` will block.
/// thread can pop from a queue.
///
/// This supports both bounded and unbounded operations. `push` will never block,
/// and allows the queue to grow without bounds. `push_bounded` will block if the
/// queue is over capacity, and will resume once there is enough capacity.
pub struct Queue<T> {
state: Mutex<State<T>>,
condvar: Condvar,
popper_cv: Condvar,
bounded_cv: Condvar,
bound: usize,
}

struct State<T> {
items: VecDeque<T>,
}

impl<T> Queue<T> {
pub fn new() -> Queue<T> {
pub fn new(bound: usize) -> Queue<T> {
Queue {
state: Mutex::new(State {
items: VecDeque::new(),
}),
condvar: Condvar::new(),
popper_cv: Condvar::new(),
bounded_cv: Condvar::new(),
bound,
}
}

pub fn push(&self, item: T) {
self.state.lock().unwrap().items.push_back(item);
self.condvar.notify_one();
self.popper_cv.notify_one();
}

/// Pushes an item onto the queue, blocking if the queue is full.
pub fn push_bounded(&self, item: T) {
let mut state = self.state.lock().unwrap();
loop {
if state.items.len() >= self.bound {
state = self.bounded_cv.wait(state).unwrap();
} else {
state.items.push_back(item);
self.popper_cv.notify_one();
break;
}
}
}

pub fn pop(&self, timeout: Duration) -> Option<T> {
Expand All @@ -39,16 +60,27 @@ impl<T> Queue<T> {
if elapsed >= timeout {
break;
}
let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
let (lock, result) = self
.popper_cv
.wait_timeout(state, timeout - elapsed)
.unwrap();
state = lock;
if result.timed_out() {
break;
}
}
state.items.pop_front()
let value = state.items.pop_front()?;
if state.items.len() < self.bound {
// Assumes threads cannot be canceled.
self.bounded_cv.notify_one();
}
Some(value)
}

pub fn try_pop(&self) -> Option<T> {
self.state.lock().unwrap().items.pop_front()
pub fn try_pop_all(&self) -> Vec<T> {
let mut state = self.state.lock().unwrap();
let result = state.items.drain(..).collect();
self.bounded_cv.notify_all();
result
}
}

0 comments on commit e2b28f7

Please sign in to comment.