From e2b28f7a478e642def39157d09a98e289c0f01a3 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Sat, 7 Mar 2020 19:58:37 -0800 Subject: [PATCH] Avoid buffering large amounts of rustc output. --- src/cargo/core/compiler/job_queue.rs | 50 ++++++++++++++++++-------- src/cargo/util/queue.rs | 52 ++++++++++++++++++++++------ 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 0b8855eea52..57e6b835b6b 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -93,6 +93,28 @@ pub struct JobQueue<'a, 'cfg> { /// /// It is created from JobQueue when we have fully assembled the crate graph /// (i.e., all package dependencies are known). +/// +/// # Message queue +/// +/// Each thread running a process uses the message queue to send messages back +/// to the main thread. The main thread coordinates everything, and handles +/// printing output. +/// +/// It is important to be careful which messages use `push` vs `push_bounded`. +/// `push` is for priority messages (like tokens, or "finished") where the +/// sender shouldn't block. We want to handle those so real work can proceed +/// ASAP. +/// +/// `push_bounded` is only for messages being printed to stdout/stderr. Being +/// bounded prevents a flood of messages causing a large amount of memory +/// being used. +/// +/// `push` also avoids blocking which helps avoid deadlocks. For example, when +/// the diagnostic server thread is dropped, it waits for the thread to exit. +/// But if the thread is blocked on a full queue, and there is a critical +/// error, the drop will deadlock. This should be fixed at some point in the +/// future. The jobserver thread has a similar problem, though it will time +/// out after 1 second. 
struct DrainState<'a, 'cfg> { // This is the length of the DependencyQueue when starting out total_units: usize, @@ -212,11 +234,11 @@ impl<'a> JobState<'a> { } pub fn stdout(&self, stdout: String) { - self.messages.push(Message::Stdout(stdout)); + self.messages.push_bounded(Message::Stdout(stdout)); } pub fn stderr(&self, stderr: String) { - self.messages.push(Message::Stderr(stderr)); + self.messages.push_bounded(Message::Stderr(stderr)); } /// A method used to signal to the coordinator thread that the rmeta file @@ -341,7 +363,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { let state = DrainState { total_units: self.queue.len(), queue: self.queue, - messages: Arc::new(Queue::new()), + // 100 here is somewhat arbitrary. It is a few screenfulls of + // output, and hopefully at most a few megabytes of memory for + // typical messages. + messages: Arc::new(Queue::new(100)), active: HashMap::new(), compiled: HashSet::new(), documented: HashSet::new(), @@ -370,6 +395,9 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Create a helper thread to manage the diagnostics for rustfix if // necessary. let messages = state.messages.clone(); + // It is important that this uses `push` instead of `push_bounded` for + // now. If someone wants to fix this to be bounded, the `drop` + // implementation needs to be changed to avoid possible deadlocks. let _diagnostic_server = cx .bcx .build_config @@ -578,10 +606,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // to run above to calculate CPU usage over time. To do this we // listen for a message with a timeout, and on timeout we run the // previous parts of the loop again. 
- let mut events = Vec::new(); - while let Some(event) = self.messages.try_pop() { - events.push(event); - } + let mut events = self.messages.try_pop_all(); info!( "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})", self.tokens.len(), @@ -815,15 +840,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { }; match fresh { - Freshness::Fresh => { - self.timings.add_fresh(); - doit(); - } - Freshness::Dirty => { - self.timings.add_dirty(); - scope.spawn(move |_| doit()); - } + Freshness::Fresh => self.timings.add_fresh(), + Freshness::Dirty => self.timings.add_dirty(), } + scope.spawn(move |_| doit()); Ok(()) } diff --git a/src/cargo/util/queue.rs b/src/cargo/util/queue.rs index d9aefcc3b1c..9adf1b88afe 100644 --- a/src/cargo/util/queue.rs +++ b/src/cargo/util/queue.rs @@ -5,11 +5,16 @@ use std::time::{Duration, Instant}; /// A simple, threadsafe, queue of items of type `T` /// /// This is a sort of channel where any thread can push to a queue and any -/// thread can pop from a queue. Currently queues have infinite capacity where -/// `push` will never block but `pop` will block. +/// thread can pop from a queue. +/// +/// This supports both bounded and unbounded operations. `push` will never block, +/// and allows the queue to grow without bounds. `push_bounded` will block if the +/// queue is over capacity, and will resume once there is enough capacity. 
 pub struct Queue<T> {
     state: Mutex<State<T>>,
-    condvar: Condvar,
+    popper_cv: Condvar,
+    bounded_cv: Condvar,
+    bound: usize,
 }
 
 struct State<T> {
@@ -17,18 +22,34 @@ struct State<T> {
 }
 
 impl<T> Queue<T> {
-    pub fn new() -> Queue<T> {
+    pub fn new(bound: usize) -> Queue<T> {
         Queue {
             state: Mutex::new(State {
                 items: VecDeque::new(),
             }),
-            condvar: Condvar::new(),
+            popper_cv: Condvar::new(),
+            bounded_cv: Condvar::new(),
+            bound,
         }
     }
 
     pub fn push(&self, item: T) {
         self.state.lock().unwrap().items.push_back(item);
-        self.condvar.notify_one();
+        self.popper_cv.notify_one();
+    }
+
+    /// Pushes an item onto the queue, blocking if the queue is full.
+    pub fn push_bounded(&self, item: T) {
+        let mut state = self.state.lock().unwrap();
+        loop {
+            if state.items.len() >= self.bound {
+                state = self.bounded_cv.wait(state).unwrap();
+            } else {
+                state.items.push_back(item);
+                self.popper_cv.notify_one();
+                break;
+            }
+        }
     }
 
     pub fn pop(&self, timeout: Duration) -> Option<T> {
@@ -39,16 +60,27 @@ impl<T> Queue<T> {
             if elapsed >= timeout {
                 break;
             }
-            let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
+            let (lock, result) = self
+                .popper_cv
+                .wait_timeout(state, timeout - elapsed)
+                .unwrap();
             state = lock;
             if result.timed_out() {
                 break;
             }
         }
-        state.items.pop_front()
+        let value = state.items.pop_front()?;
+        if state.items.len() < self.bound {
+            // Assumes threads cannot be canceled.
+            self.bounded_cv.notify_one();
+        }
+        Some(value)
     }
 
-    pub fn try_pop(&self) -> Option<T> {
-        self.state.lock().unwrap().items.pop_front()
+    pub fn try_pop_all(&self) -> Vec<T> {
+        let mut state = self.state.lock().unwrap();
+        let result = state.items.drain(..).collect();
+        self.bounded_cv.notify_all();
+        result
     }
 }