diff --git a/src/cargo/core/compiler/context/mod.rs b/src/cargo/core/compiler/context/mod.rs index 8df9d799ccd..652acaba869 100644 --- a/src/cargo/core/compiler/context/mod.rs +++ b/src/cargo/core/compiler/context/mod.rs @@ -69,11 +69,6 @@ pub struct Context<'a, 'cfg> { /// metadata files in addition to the rlib itself. This is only filled in /// when `pipelining` above is enabled. rmeta_required: HashSet>, - - /// When we're in jobserver-per-rustc process mode, this keeps those - /// jobserver clients for each Unit (which eventually becomes a rustc - /// process). - pub rustc_clients: HashMap, Client>, } impl<'a, 'cfg> Context<'a, 'cfg> { @@ -117,7 +112,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> { unit_dependencies, files: None, rmeta_required: HashSet::new(), - rustc_clients: HashMap::new(), pipelining, }) } @@ -497,21 +491,4 @@ impl<'a, 'cfg> Context<'a, 'cfg> { pub fn rmeta_required(&self, unit: &Unit<'a>) -> bool { self.rmeta_required.contains(unit) || self.bcx.config.cli_unstable().timings.is_some() } - - pub fn new_jobserver(&mut self) -> CargoResult { - let tokens = self.bcx.build_config.jobs as usize; - let client = Client::new(tokens).chain_err(|| "failed to create jobserver")?; - - // Drain the client fully - for i in 0..tokens { - client.acquire_raw().chain_err(|| { - format!( - "failed to fully drain {}/{} token from jobserver at startup", - i, tokens, - ) - })?; - } - - Ok(client) - } } diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 55832cb906a..bdde1297237 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -1,56 +1,5 @@ -//! This module implements the job queue which determines the ordering in which -//! rustc is spawned off. It also manages the allocation of jobserver tokens to -//! rustc beyond the implicit token each rustc owns (i.e., the ones used for -//! parallel LLVM work and parallel rustc threads). -//! -//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each -//! other, which is due to scaling issues with sharing a single jobserver -//! amongst what is potentially hundreds of threads of work on many-cored -//! systems on (at least) linux, and likely other platforms as well. -//! -//! The details of this algorithm are (also) written out in -//! src/librustc_jobserver/lib.rs. What follows is a description focusing on the -//! Cargo side of things. -//! -//! Cargo wants to complete the build as quickly as possible, fully saturating -//! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn -//! more than N threads of work: the total amount of tokens we have floating -//! around must always be limited to N. -//! -//! It is not really possible to optimally choose which crate should build first -//! or last; nor is it possible to decide whether to give an additional token to -//! rustc first or rather spawn a new crate of work. For now, the algorithm we -//! implement prioritizes spawning as many crates (i.e., rustc processes) as -//! possible, and then filling each rustc with tokens on demand. -//! -//! The primary loop is in `drain_the_queue` below. -//! -//! We integrate with the jobserver, originating from GNU make, to make sure -//! that build scripts which use make to build C code can cooperate with us on -//! the number of used tokens and avoid overfilling the system we're on. -//! -//! The jobserver is unfortunately a very simple protocol, so we enhance it a -//! little when we know that there is a rustc on the other end. Via the stderr -//! pipe we have to rustc, we get messages such as "NeedsToken" and -//! "ReleaseToken" from rustc. -//! -//! "NeedsToken" indicates that a rustc is interested in acquiring a token, but -//! never that it would be impossible to make progress without one (i.e., it -//! would be incorrect for rustc to not terminate due to a unfulfilled -//! NeedsToken request); we do not usually fulfill all NeedsToken requests for a -//! given rustc. -//! -//! "ReleaseToken" indicates that a rustc is done with one of its tokens and is -//! ready for us to re-acquire ownership -- we will either release that token -//! back into the general pool or reuse it ourselves. Note that rustc will -//! inform us that it is releasing a token even if it itself is also requesting -//! tokens; is is up to us whether to return the token to that same rustc. -//! -//! The current scheduling algorithm is relatively primitive and could likely be -//! improved. - use std::cell::Cell; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::io; use std::marker; use std::mem; @@ -60,7 +9,7 @@ use std::time::Duration; use anyhow::format_err; use crossbeam_utils::thread::Scope; -use jobserver::{Acquired, Client, HelperThread}; +use jobserver::{Acquired, HelperThread}; use log::{debug, info, trace}; use super::context::OutputFile; @@ -78,70 +27,22 @@ use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder use crate::util::{Config, DependencyQueue}; use crate::util::{Progress, ProgressStyle}; -/// This structure is backed by the `DependencyQueue` type and manages the -/// queueing of compilation steps for each package. Packages enqueue units of -/// work and then later on the entire graph is converted to DrainState and -/// executed. -pub struct JobQueue<'a, 'cfg> { - queue: DependencyQueue, Artifact, Job>, - counts: HashMap, - timings: Timings<'a, 'cfg>, -} - +/// A management structure of the entire dependency graph to compile. +/// /// This structure is backed by the `DependencyQueue` type and manages the /// actual compilation step of each package. Packages enqueue units of work and /// then later on the entire graph is processed and compiled. -/// -/// It is created from JobQueue when we have fully assembled the crate graph -/// (i.e., all package dependencies are known). -struct DrainState<'a, 'cfg> { - // This is the length of the DependencyQueue when starting out - total_units: usize, - +pub struct JobQueue<'a, 'cfg> { queue: DependencyQueue, Artifact, Job>, tx: Sender, rx: Receiver, - active: HashMap>, + active: HashMap>, compiled: HashSet, documented: HashSet, counts: HashMap, progress: Progress<'cfg>, next_id: u32, timings: Timings<'a, 'cfg>, - - /// Tokens that are currently owned by this Cargo, and may be "associated" - /// with a rustc process. They may also be unused, though if so will be - /// dropped on the next loop iteration. - /// - /// Note that the length of this may be zero, but we will still spawn work, - /// as we share the implicit token given to this Cargo process with a - /// single rustc process. - tokens: Vec, - - /// rustc per-thread tokens, when in jobserver-per-rustc mode. - rustc_tokens: HashMap>, - - /// This represents the list of rustc jobs (processes) and associated - /// clients that are interested in receiving a token. - to_send_clients: BTreeMap>, - - /// The list of jobs that we have not yet started executing, but have - /// retrieved from the `queue`. We eagerly pull jobs off the main queue to - /// allow us to request jobserver tokens pretty early. - pending_queue: Vec<(Unit<'a>, Job)>, - print: DiagnosticPrinter<'cfg>, - - // How many jobs we've finished - finished: usize, -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct JobId(pub u32); - -impl std::fmt::Display for JobId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } } pub struct JobState<'a> { @@ -150,7 +51,7 @@ pub struct JobState<'a> { /// The job id that this state is associated with, used when sending /// messages back to the main thread. - id: JobId, + id: u32, /// Whether or not we're expected to have a call to `rmeta_produced`. Once /// that method is called this is dynamically set to `false` to prevent @@ -183,19 +84,13 @@ enum Artifact { } enum Message { - Run(JobId, String), + Run(u32, String), BuildPlanMsg(String, ProcessBuilder, Arc>), Stdout(String), Stderr(String), FixDiagnostic(diagnostic_server::Message), Token(io::Result), - Finish(JobId, Artifact, CargoResult<()>), - - // This client should get release_raw called on it with one of our tokens - NeedsToken(JobId), - - // A token previously passed to a NeedsToken client is being released. - ReleaseToken(JobId), + Finish(u32, Artifact, CargoResult<()>), } impl<'a> JobState<'a> { @@ -233,30 +128,24 @@ impl<'a> JobState<'a> { .tx .send(Message::Finish(self.id, Artifact::Metadata, Ok(()))); } - - /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) - /// on the passed client. - /// - /// This should arrange for the associated client to eventually get a token via - /// `client.release_raw()`. - pub fn will_acquire(&self) { - let _ = self.tx.send(Message::NeedsToken(self.id)); - } - - /// The rustc underlying this Job is informing us that it is done with a jobserver token. - /// - /// Note that it does *not* write that token back anywhere. - pub fn release_token(&self) { - let _ = self.tx.send(Message::ReleaseToken(self.id)); - } } impl<'a, 'cfg> JobQueue<'a, 'cfg> { pub fn new(bcx: &BuildContext<'a, 'cfg>, root_units: &[Unit<'a>]) -> JobQueue<'a, 'cfg> { + let (tx, rx) = channel(); + let progress = Progress::with_style("Building", ProgressStyle::Ratio, bcx.config); + let timings = Timings::new(bcx, root_units); JobQueue { queue: DependencyQueue::new(), + tx, + rx, + active: HashMap::new(), + compiled: HashSet::new(), + documented: HashSet::new(), counts: HashMap::new(), - timings: Timings::new(bcx, root_units), + progress, + next_id: 0, + timings, } } @@ -337,35 +226,12 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { /// This function will spawn off `config.jobs()` workers to build all of the /// necessary dependencies, in order. Freshness is propagated as far as /// possible along each dependency chain. - pub fn execute(mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> { + pub fn execute(&mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> { let _p = profile::start("executing the job graph"); self.queue.queue_finished(); - let (tx, rx) = channel(); - let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config); - let state = DrainState { - total_units: self.queue.len(), - queue: self.queue, - tx, - rx, - active: HashMap::new(), - compiled: HashSet::new(), - documented: HashSet::new(), - counts: self.counts, - progress, - next_id: 0, - timings: self.timings, - - tokens: Vec::new(), - rustc_tokens: HashMap::new(), - to_send_clients: BTreeMap::new(), - pending_queue: Vec::new(), - print: DiagnosticPrinter::new(cx.bcx.config), - finished: 0, - }; - // Create a helper thread for acquiring jobserver tokens - let tx = state.tx.clone(); + let tx = self.tx.clone(); let helper = cx .jobserver .clone() @@ -376,7 +242,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Create a helper thread to manage the diagnostics for rustfix if // necessary. - let tx = state.tx.clone(); + let tx = self.tx.clone(); let _diagnostic_server = cx .bcx .build_config @@ -385,241 +251,26 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { .take() .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg))))); - crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper)) + // Use `crossbeam` to create a scope in which we can execute scoped + // threads. Note that this isn't currently required by Cargo but it was + // historically required. This is left in for now in case we need the + // `'a` ability for child threads in the near future, but if this + // comment has been sitting here for a long time feel free to refactor + // away crossbeam. + crossbeam_utils::thread::scope(|scope| self.drain_the_queue(cx, plan, scope, &helper)) .expect("child threads shouldn't panic") } -} - -impl<'a, 'cfg> DrainState<'a, 'cfg> { - fn spawn_work_if_possible( - &mut self, - cx: &mut Context<'a, '_>, - jobserver_helper: &HelperThread, - scope: &Scope<'_>, - has_errored: bool, - ) -> CargoResult<()> { - // Dequeue as much work as we can, learning about everything - // possible that can run. Note that this is also the point where we - // start requesting job tokens. Each job after the first needs to - // request a token. - while let Some((unit, job)) = self.queue.dequeue() { - self.pending_queue.push((unit, job)); - if self.active.len() + self.pending_queue.len() > 1 { - jobserver_helper.request_token(); - } - } - - // Do not actually spawn the new work if we've errored out - if has_errored { - return Ok(()); - } - - // Now that we've learned of all possible work that we can execute - // try to spawn it so long as we've got a jobserver token which says - // we're able to perform some parallel work. - while self.has_extra_tokens() && !self.pending_queue.is_empty() { - let (unit, job) = self.pending_queue.remove(0); - self.run(&unit, job, cx, scope)?; - } - - Ok(()) - } - - fn has_extra_tokens(&self) -> bool { - self.active.len() < self.tokens.len() + 1 - } - - // The oldest job (i.e., least job ID) is the one we grant tokens to first. - fn pop_waiting_client(&mut self) -> (JobId, Client) { - // FIXME: replace this with BTreeMap::first_entry when that stabilizes. - let key = *self - .to_send_clients - .keys() - .next() - .expect("at least one waiter"); - let clients = self.to_send_clients.get_mut(&key).unwrap(); - let client = clients.pop().unwrap(); - if clients.is_empty() { - self.to_send_clients.remove(&key); - } - (key, client) - } - - // If we managed to acquire some extra tokens, send them off to a waiting rustc. - fn grant_rustc_token_requests(&mut self) -> CargoResult<()> { - while !self.to_send_clients.is_empty() && self.has_extra_tokens() { - let (id, client) = self.pop_waiting_client(); - // This unwrap is guaranteed to succeed. `active` must be at least - // length 1, as otherwise there can't be a client waiting to be sent - // on, so tokens.len() must also be at least one. - let token = self.tokens.pop().unwrap(); - self.rustc_tokens - .entry(id) - .or_insert_with(Vec::new) - .push(token); - client - .release_raw() - .chain_err(|| "failed to release jobserver token")?; - } - - Ok(()) - } - - fn handle_event( - &mut self, - cx: &mut Context<'a, '_>, - jobserver_helper: &HelperThread, - plan: &mut BuildPlan, - event: Message, - ) -> CargoResult> { - match event { - Message::Run(id, cmd) => { - cx.bcx - .config - .shell() - .verbose(|c| c.status("Running", &cmd))?; - self.timings.unit_start(id, self.active[&id]); - } - Message::BuildPlanMsg(module_name, cmd, filenames) => { - plan.update(&module_name, &cmd, &filenames)?; - } - Message::Stdout(out) => { - cx.bcx.config.shell().stdout_println(out); - } - Message::Stderr(err) => { - let mut shell = cx.bcx.config.shell(); - shell.print_ansi(err.as_bytes())?; - shell.err().write_all(b"\n")?; - } - Message::FixDiagnostic(msg) => { - self.print.print(&msg)?; - } - Message::Finish(id, artifact, result) => { - let unit = match artifact { - // If `id` has completely finished we remove it - // from the `active` map ... - Artifact::All => { - info!("end: {:?}", id); - self.finished += 1; - if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) { - // This puts back the tokens that this rustc - // acquired into our primary token list. - // - // This represents a rustc bug: it did not - // release all of its thread tokens but finished - // completely. But we want to make Cargo resilient - // to such rustc bugs, as they're generally not - // fatal in nature (i.e., Cargo can make progress - // still, and the build might not even fail). - self.tokens.extend(rustc_tokens); - } - self.to_send_clients.remove(&id); - self.active.remove(&id).unwrap() - } - // ... otherwise if it hasn't finished we leave it - // in there as we'll get another `Finish` later on. - Artifact::Metadata => { - info!("end (meta): {:?}", id); - self.active[&id] - } - }; - info!("end ({:?}): {:?}", unit, result); - match result { - Ok(()) => self.finish(id, &unit, artifact, cx)?, - Err(e) => { - let msg = "The following warnings were emitted during compilation:"; - self.emit_warnings(Some(msg), &unit, cx)?; - - if !self.active.is_empty() { - handle_error(&e, &mut *cx.bcx.config.shell()); - cx.bcx.config.shell().warn( - "build failed, waiting for other \ - jobs to finish...", - )?; - return Ok(Some(anyhow::format_err!("build failed"))); - } else { - return Ok(Some(e)); - } - } - } - } - Message::Token(acquired_token) => { - let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?; - self.tokens.push(token); - } - Message::NeedsToken(id) => { - log::info!("queue token request"); - jobserver_helper.request_token(); - let client = cx.rustc_clients[&self.active[&id]].clone(); - self.to_send_clients - .entry(id) - .or_insert_with(Vec::new) - .push(client); - } - Message::ReleaseToken(id) => { - // Note that this pops off potentially a completely - // different token, but all tokens of the same job are - // conceptually the same so that's fine. - // - // self.tokens is a "pool" -- the order doesn't matter -- and - // this transfers ownership of the token into that pool. If we - // end up using it on the next go around, then this token will - // be truncated, same as tokens obtained through Message::Token. - let rustc_tokens = self - .rustc_tokens - .get_mut(&id) - .expect("no tokens associated"); - self.tokens - .push(rustc_tokens.pop().expect("rustc releases token it has")); - } - } - - Ok(None) - } - - // This will also tick the progress bar as appropriate - fn wait_for_events(&mut self) -> Vec { - // Drain all events at once to avoid displaying the progress bar - // unnecessarily. If there's no events we actually block waiting for - // an event, but we keep a "heartbeat" going to allow `record_cpu` - // to run above to calculate CPU usage over time. To do this we - // listen for a message with a timeout, and on timeout we run the - // previous parts of the loop again. - let events: Vec<_> = self.rx.try_iter().collect(); - info!( - "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})", - self.tokens.len(), - self.rustc_tokens - .iter() - .map(|(k, j)| (k, j.len())) - .collect::>(), - self.to_send_clients - .iter() - .map(|(k, j)| (k, j.len())) - .collect::>(), - events.len(), - ); - if events.is_empty() { - loop { - self.tick_progress(); - self.tokens.truncate(self.active.len() - 1); - match self.rx.recv_timeout(Duration::from_millis(500)) { - Ok(message) => break vec![message], - Err(_) => continue, - } - } - } else { - events - } - } fn drain_the_queue( - mut self, + &mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan, scope: &Scope<'a>, jobserver_helper: &HelperThread, ) -> CargoResult<()> { + let mut tokens = Vec::new(); + let mut queue = Vec::new(); + let mut print = DiagnosticPrinter::new(cx.bcx.config); trace!("queue: {:#?}", self.queue); // Iteratively execute the entire dependency graph. Each turn of the @@ -633,8 +284,27 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // successful and otherwise wait for pending work to finish if it failed // and then immediately return. let mut error = None; + let total = self.queue.len(); + let mut finished = 0; loop { - self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?; + // Dequeue as much work as we can, learning about everything + // possible that can run. Note that this is also the point where we + // start requesting job tokens. Each job after the first needs to + // request a token. + while let Some((unit, job)) = self.queue.dequeue() { + queue.push((unit, job)); + if self.active.len() + queue.len() > 1 { + jobserver_helper.request_token(); + } + } + + // Now that we've learned of all possible work that we can execute + // try to spawn it so long as we've got a jobserver token which says + // we're able to perform some parallel work. + while error.is_none() && self.active.len() < tokens.len() + 1 && !queue.is_empty() { + let (unit, job) = queue.remove(0); + self.run(&unit, job, cx, scope)?; + } // If after all that we're not actually running anything then we're // done! @@ -642,16 +312,101 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { break; } - self.grant_rustc_token_requests()?; - // And finally, before we block waiting for the next event, drop any // excess tokens we may have accidentally acquired. Due to how our // jobserver interface is architected we may acquire a token that we // don't actually use, and if this happens just relinquish it back // to the jobserver itself. - for event in self.wait_for_events() { - if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? { - error = Some(err); + tokens.truncate(self.active.len() - 1); + + // Record some timing information if `-Ztimings` is enabled, and + // this'll end up being a noop if we're not recording this + // information. + self.timings + .mark_concurrency(self.active.len(), queue.len(), self.queue.len()); + self.timings.record_cpu(); + + // Drain all events at once to avoid displaying the progress bar + // unnecessarily. If there's no events we actually block waiting for + // an event, but we keep a "heartbeat" going to allow `record_cpu` + // to run above to calculate CPU usage over time. To do this we + // listen for a message with a timeout, and on timeout we run the + // previous parts of the loop again. + let events: Vec<_> = self.rx.try_iter().collect(); + let events = if events.is_empty() { + self.show_progress(finished, total); + match self.rx.recv_timeout(Duration::from_millis(500)) { + Ok(message) => vec![message], + Err(_) => continue, + } + } else { + events + }; + + for event in events { + match event { + Message::Run(id, cmd) => { + cx.bcx + .config + .shell() + .verbose(|c| c.status("Running", &cmd))?; + self.timings.unit_start(id, self.active[&id]); + } + Message::BuildPlanMsg(module_name, cmd, filenames) => { + plan.update(&module_name, &cmd, &filenames)?; + } + Message::Stdout(out) => { + cx.bcx.config.shell().stdout_println(out); + } + Message::Stderr(err) => { + let mut shell = cx.bcx.config.shell(); + shell.print_ansi(err.as_bytes())?; + shell.err().write_all(b"\n")?; + } + Message::FixDiagnostic(msg) => { + print.print(&msg)?; + } + Message::Finish(id, artifact, result) => { + let unit = match artifact { + // If `id` has completely finished we remove it + // from the `active` map ... + Artifact::All => { + info!("end: {:?}", id); + finished += 1; + self.active.remove(&id).unwrap() + } + // ... otherwise if it hasn't finished we leave it + // in there as we'll get another `Finish` later on. + Artifact::Metadata => { + info!("end (meta): {:?}", id); + self.active[&id] + } + }; + info!("end ({:?}): {:?}", unit, result); + match result { + Ok(()) => self.finish(id, &unit, artifact, cx)?, + Err(e) => { + let msg = "The following warnings were emitted during compilation:"; + self.emit_warnings(Some(msg), &unit, cx)?; + + if !self.active.is_empty() { + error = Some(anyhow::format_err!("build failed")); + handle_error(&e, &mut *cx.bcx.config.shell()); + cx.bcx.config.shell().warn( + "build failed, waiting for other \ + jobs to finish...", + )?; + } else { + error = Some(e); + } + } + } + } + Message::Token(acquired_token) => { + tokens.push( + acquired_token.chain_err(|| "failed to acquire jobserver token")?, + ); + } } } } @@ -679,7 +434,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { if let Some(e) = error { Err(e) - } else if self.queue.is_empty() && self.pending_queue.is_empty() { + } else if self.queue.is_empty() && queue.is_empty() { let message = format!( "{} [{}] target(s) in {}", profile_name, opt_type, time_elapsed @@ -695,31 +450,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { } } - // This also records CPU usage and marks concurrency; we roughly want to do - // this as often as we spin on the events receiver (at least every 500ms or - // so). - fn tick_progress(&mut self) { - // Record some timing information if `-Ztimings` is enabled, and - // this'll end up being a noop if we're not recording this - // information. - self.timings.mark_concurrency( - self.active.len(), - self.pending_queue.len(), - self.queue.len(), - self.rustc_tokens.len(), - ); - self.timings.record_cpu(); - + fn show_progress(&mut self, count: usize, total: usize) { let active_names = self .active .values() .map(|u| self.name_for_progress(u)) .collect::>(); - drop(self.progress.tick_now( - self.finished, - self.total_units, - &format!(": {}", active_names.join(", ")), - )); + drop( + self.progress + .tick_now(count, total, &format!(": {}", active_names.join(", "))), + ); } fn name_for_progress(&self, unit: &Unit<'_>) -> String { @@ -741,16 +481,17 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { } } - /// Executes a job, pushing the spawned thread's handled onto `threads`. + /// Executes a job in the `scope` given, pushing the spawned thread's + /// handled onto `threads`. fn run( &mut self, unit: &Unit<'a>, job: Job, cx: &Context<'a, '_>, - scope: &Scope<'_>, + scope: &Scope<'a>, ) -> CargoResult<()> { - let id = JobId(self.next_id); - self.next_id = self.next_id.checked_add(1).unwrap(); + let id = self.next_id; + self.next_id = id.checked_add(1).unwrap(); info!("start {}: {:?}", id, unit); @@ -760,12 +501,6 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { let my_tx = self.tx.clone(); let fresh = job.freshness(); let rmeta_required = cx.rmeta_required(unit); - - if !cx.bcx.build_config.build_plan { - // Print out some nice progress information. - self.note_working_on(cx.bcx.config, unit, fresh)?; - } - let doit = move || { let state = JobState { id, @@ -805,7 +540,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // to make sure nothing hangs by accident. struct FinishOnDrop<'a> { tx: &'a Sender, - id: JobId, + id: u32, result: CargoResult<()>, } @@ -817,10 +552,15 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { } }; + if !cx.bcx.build_config.build_plan { + // Print out some nice progress information. + self.note_working_on(cx.bcx.config, unit, fresh)?; + } + match fresh { Freshness::Fresh => { self.timings.add_fresh(); - doit(); + doit() } Freshness::Dirty => { self.timings.add_dirty(); @@ -861,7 +601,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { fn finish( &mut self, - id: JobId, + id: u32, unit: &Unit<'a>, artifact: Artifact, cx: &mut Context<'a, '_>, diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 29805bb3e0e..67c5496e9ba 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -538,14 +538,7 @@ fn prepare_rustc<'a, 'cfg>( let is_primary = cx.is_primary_package(unit); let mut base = cx.compilation.rustc_process(unit.pkg, is_primary)?; - if cx.bcx.config.cli_unstable().jobserver_per_rustc { - let client = cx.new_jobserver()?; - base.inherit_jobserver(&client); - base.arg("-Zjobserver-token-requests"); - assert!(cx.rustc_clients.insert(*unit, client).is_none()); - } else { - base.inherit_jobserver(&cx.jobserver); - } + base.inherit_jobserver(&cx.jobserver); build_base_args(cx, &mut base, unit, crate_types)?; build_deps_args(&mut base, cx, unit)?; Ok(base) @@ -1217,31 +1210,6 @@ fn on_stderr_line_inner( } } - #[derive(serde::Deserialize)] - struct JobserverNotification { - jobserver_event: Event, - } - - #[derive(Debug, serde::Deserialize)] - enum Event { - WillAcquire, - Release, - } - - if let Ok(JobserverNotification { jobserver_event }) = - serde_json::from_str::(compiler_message.get()) - { - log::info!( - "found jobserver directive from rustc: `{:?}`", - jobserver_event - ); - match jobserver_event { - Event::WillAcquire => state.will_acquire(), - Event::Release => state.release_token(), - } - return Ok(false); - } - // And failing all that above we should have a legitimate JSON diagnostic // from the compiler, so wrap it in an external Cargo JSON message // indicating which package it came from and then emit it. diff --git a/src/cargo/core/compiler/timings.rs b/src/cargo/core/compiler/timings.rs index ff24e77b10a..a205b658642 100644 --- a/src/cargo/core/compiler/timings.rs +++ b/src/cargo/core/compiler/timings.rs @@ -3,7 +3,6 @@ //! This module implements some simple tracking information for timing of how //! long it takes for different units to compile. use super::{CompileMode, Unit}; -use crate::core::compiler::job_queue::JobId; use crate::core::compiler::BuildContext; use crate::core::PackageId; use crate::util::cpu::State; @@ -42,7 +41,7 @@ pub struct Timings<'a, 'cfg> { unit_times: Vec>, /// Units that are in the process of being built. /// When they finished, they are moved to `unit_times`. - active: HashMap>, + active: HashMap>, /// Concurrency-tracking information. This is periodically updated while /// compilation progresses. concurrency: Vec, @@ -85,10 +84,6 @@ struct Concurrency { /// Number of units that are not yet ready, because they are waiting for /// dependencies to finish. inactive: usize, - /// Number of rustc "extra" threads -- i.e., how many tokens have been - /// provided across all current rustc instances that are not the main thread - /// tokens. - rustc_parallelism: usize, } impl<'a, 'cfg> Timings<'a, 'cfg> { @@ -145,7 +140,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// Mark that a unit has started running. - pub fn unit_start(&mut self, id: JobId, unit: Unit<'a>) { + pub fn unit_start(&mut self, id: u32, unit: Unit<'a>) { if !self.enabled { return; } @@ -179,7 +174,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// Mark that the `.rmeta` file as generated. - pub fn unit_rmeta_finished(&mut self, id: JobId, unlocked: Vec<&Unit<'a>>) { + pub fn unit_rmeta_finished(&mut self, id: u32, unlocked: Vec<&Unit<'a>>) { if !self.enabled { return; } @@ -197,7 +192,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// Mark that a unit has finished running. - pub fn unit_finished(&mut self, id: JobId, unlocked: Vec<&Unit<'a>>) { + pub fn unit_finished(&mut self, id: u32, unlocked: Vec<&Unit<'a>>) { if !self.enabled { return; } @@ -237,13 +232,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { } /// This is called periodically to mark the concurrency of internal structures. - pub fn mark_concurrency( - &mut self, - active: usize, - waiting: usize, - inactive: usize, - rustc_parallelism: usize, - ) { + pub fn mark_concurrency(&mut self, active: usize, waiting: usize, inactive: usize) { if !self.enabled { return; } @@ -252,7 +241,6 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { active, waiting, inactive, - rustc_parallelism, }; self.concurrency.push(c); } @@ -297,7 +285,7 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { if !self.enabled { return Ok(()); } - self.mark_concurrency(0, 0, 0, 0); + self.mark_concurrency(0, 0, 0); self.unit_times .sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap()); if self.report_html { @@ -373,12 +361,6 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { }; let total_time = format!("{:.1}s{}", duration, time_human); let max_concurrency = self.concurrency.iter().map(|c| c.active).max().unwrap(); - let max_rustc_concurrency = self - .concurrency - .iter() - .map(|c| c.rustc_parallelism) - .max() - .unwrap(); let rustc_info = render_rustc_info(bcx); write!( f, @@ -411,9 +393,6 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { rustc:{} - - Max (global) rustc threads concurrency:{} - "#, @@ -428,7 +407,6 @@ impl<'a, 'cfg> Timings<'a, 'cfg> { self.start_str, total_time, rustc_info, - max_rustc_concurrency, )?; Ok(()) } diff --git a/src/cargo/core/features.rs b/src/cargo/core/features.rs index c2ea6622fa8..c34fe07880d 100644 --- a/src/cargo/core/features.rs +++ b/src/cargo/core/features.rs @@ -341,7 +341,6 @@ pub struct CliUnstable { pub timings: Option>, pub doctest_xcompile: bool, pub panic_abort_tests: bool, - pub jobserver_per_rustc: bool, } impl CliUnstable { @@ -410,7 +409,6 @@ impl CliUnstable { "timings" => self.timings = Some(parse_timings(v)), "doctest-xcompile" => self.doctest_xcompile = parse_empty(k, v)?, "panic-abort-tests" => self.panic_abort_tests = parse_empty(k, v)?, - "jobserver-per-rustc" => self.jobserver_per_rustc = parse_empty(k, v)?, _ => bail!("unknown `-Z` flag specified: {}", k), } diff --git a/src/cargo/util/progress.rs b/src/cargo/util/progress.rs index d62600379cb..05715e4fe32 100644 --- a/src/cargo/util/progress.rs +++ b/src/cargo/util/progress.rs @@ -224,7 +224,6 @@ impl<'cfg> State<'cfg> { impl Format { fn progress(&self, cur: usize, max: usize) -> Option { - assert!(cur <= max); // Render the percentage at the far right and then figure how long the // progress bar is let pct = (cur as f64) / (max as f64);