From 0c5f6f012ec90e037d01b50ea3c45ad06b4b7e48 Mon Sep 17 00:00:00 2001 From: bjorn3 Date: Sat, 7 Nov 2020 14:19:14 +0100 Subject: [PATCH 1/5] Don't spawn a new thread for fresh jobs The overhead in doing so is often much higher than the actual time it takes to execute the job --- src/cargo/core/compiler/job_queue.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index b8b22ae5a2f..54db3a49bee 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -880,10 +880,17 @@ impl<'cfg> DrainState<'cfg> { }; match fresh { - Freshness::Fresh => self.timings.add_fresh(), - Freshness::Dirty => self.timings.add_dirty(), + Freshness::Fresh => { + self.timings.add_fresh(); + // Running a fresh job on the same thread is often much faster than spawning a new + // thread to run the job. + doit(); + } + Freshness::Dirty => { + self.timings.add_dirty(); + scope.spawn(move |_| doit()); + } } - scope.spawn(move |_| doit()); Ok(()) } From 3836dfb32de7ed1e394b6734792a68ab94e518dd Mon Sep 17 00:00:00 2001 From: bjorn3 Date: Sat, 7 Nov 2020 16:08:02 +0100 Subject: [PATCH 2/5] Split Job::new into Job::new_fresh and Job::new_dirty --- src/cargo/core/compiler/custom_build.rs | 2 +- src/cargo/core/compiler/fingerprint.rs | 9 +++------ src/cargo/core/compiler/job.rs | 15 +++++++++++++-- src/cargo/core/compiler/mod.rs | 4 ++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/cargo/core/compiler/custom_build.rs b/src/cargo/core/compiler/custom_build.rs index 7e02008c8a1..17c86920714 100644 --- a/src/cargo/core/compiler/custom_build.rs +++ b/src/cargo/core/compiler/custom_build.rs @@ -432,7 +432,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult { }); let mut job = if cx.bcx.build_config.build_plan { - Job::new(Work::noop(), Freshness::Dirty) + Job::new_dirty(Work::noop()) } else { fingerprint::prepare_target(cx, unit, false)? }; diff --git a/src/cargo/core/compiler/fingerprint.rs b/src/cargo/core/compiler/fingerprint.rs index bc3e43b624c..e81855213cd 100644 --- a/src/cargo/core/compiler/fingerprint.rs +++ b/src/cargo/core/compiler/fingerprint.rs @@ -337,10 +337,7 @@ use crate::util::paths; use crate::util::{internal, profile, ProcessBuilder}; use super::custom_build::BuildDeps; -use super::job::{ - Freshness::{Dirty, Fresh}, - Job, Work, -}; +use super::job::{Job, Work}; use super::{BuildContext, Context, FileFlavor, Unit}; /// Determines if a `unit` is up-to-date, and if not prepares necessary work to @@ -396,7 +393,7 @@ pub fn prepare_target(cx: &mut Context<'_, '_>, unit: &Unit, force: bool) -> Car } if compare.is_ok() && !force { - return Ok(Job::new(Work::noop(), Fresh)); + return Ok(Job::new_fresh()); } // Clear out the old fingerprint file if it exists. This protects when @@ -469,7 +466,7 @@ pub fn prepare_target(cx: &mut Context<'_, '_>, unit: &Unit, force: bool) -> Car Work::new(move |_| write_fingerprint(&loc, &fingerprint)) }; - Ok(Job::new(write_fingerprint, Dirty)) + Ok(Job::new_dirty(write_fingerprint)) } /// Dependency edge information for fingerprints. This is generated for each diff --git a/src/cargo/core/compiler/job.rs b/src/cargo/core/compiler/job.rs index 7d589ee7986..acf551c7d6d 100644 --- a/src/cargo/core/compiler/job.rs +++ b/src/cargo/core/compiler/job.rs @@ -40,9 +40,20 @@ impl Work { } impl Job { + /// Creates a new job that does nothing. + pub fn new_fresh() -> Job { + Job { + work: Work::noop(), + fresh: Freshness::Fresh, + } + } + /// Creates a new job representing a unit of work. - pub fn new(work: Work, fresh: Freshness) -> Job { - Job { work, fresh } + pub fn new_dirty(work: Work) -> Job { + Job { + work, + fresh: Freshness::Dirty, + } } /// Consumes this job by running it, returning the result of the diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 53849e300e1..fef4c5099c3 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -130,9 +130,9 @@ fn compile<'cfg>( custom_build::prepare(cx, unit)? } else if unit.mode.is_doc_test() { // We run these targets later, so this is just a no-op for now. - Job::new(Work::noop(), Freshness::Fresh) + Job::new_fresh() } else if build_plan { - Job::new(rustc(cx, unit, &exec.clone())?, Freshness::Dirty) + Job::new_dirty(rustc(cx, unit, &exec.clone())?) } else { let force = exec.force_rebuild(unit) || force_rebuild; let mut job = fingerprint::prepare_target(cx, unit, force)?; From 49b63b7caaedb1d27c92598861b872577612c11b Mon Sep 17 00:00:00 2001 From: bjorn3 Date: Sat, 7 Nov 2020 16:52:17 +0100 Subject: [PATCH 3/5] [HACK] Fix deadlock by avoiding backpressure for fresh jobs --- src/cargo/core/compiler/job_queue.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 54db3a49bee..40bafb7f76b 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -166,6 +166,11 @@ pub struct JobState<'a> { /// Channel back to the main thread to coordinate messages and such. messages: Arc>, + /// Normally messages are handled in a bounded way. When the job is fresh + /// however we need to immediately return to prevent a deadlock as the messages + /// are processed on the same thread as they are sent from. + messages_bounded: bool, + /// The job id that this state is associated with, used when sending /// messages back to the main thread. id: JobId, @@ -232,11 +237,19 @@ impl<'a> JobState<'a> { } pub fn stdout(&self, stdout: String) { - self.messages.push_bounded(Message::Stdout(stdout)); + if self.messages_bounded { + self.messages.push_bounded(Message::Stdout(stdout)); + } else { + self.messages.push(Message::Stdout(stdout)); + } } pub fn stderr(&self, stderr: String) { - self.messages.push_bounded(Message::Stderr(stderr)); + if self.messages_bounded { + self.messages.push_bounded(Message::Stderr(stderr)); + } else { + self.messages.push(Message::Stderr(stderr)); + } } /// A method used to signal to the coordinator thread that the rmeta file @@ -830,6 +843,7 @@ impl<'cfg> DrainState<'cfg> { let state = JobState { id, messages: messages.clone(), + messages_bounded: job.freshness() == Freshness::Dirty, rmeta_required: Cell::new(rmeta_required), _marker: marker::PhantomData, }; From 0583081d2a1c5a1d1e321ba834bb1ca317e1a5f1 Mon Sep 17 00:00:00 2001 From: bjorn3 Date: Sat, 7 Nov 2020 18:12:11 +0100 Subject: [PATCH 4/5] Immediately print job output for fresh jobs This prevents a deadlock where the message queue is filled with output messages but not emptied as the job producing the messages runs on the same thread as the message processing. --- src/cargo/core/compiler/custom_build.rs | 13 ++--- src/cargo/core/compiler/job_queue.rs | 63 +++++++++++++++---------- src/cargo/core/compiler/mod.rs | 12 ++--- 3 files changed, 51 insertions(+), 37 deletions(-) diff --git a/src/cargo/core/compiler/custom_build.rs b/src/cargo/core/compiler/custom_build.rs index 17c86920714..7930a896047 100644 --- a/src/cargo/core/compiler/custom_build.rs +++ b/src/cargo/core/compiler/custom_build.rs @@ -128,7 +128,7 @@ fn emit_build_output( output: &BuildOutput, out_dir: &Path, package_id: PackageId, -) { +) -> CargoResult<()> { let library_paths = output .library_paths .iter() @@ -144,7 +144,8 @@ fn emit_build_output( out_dir, } .to_json_string(); - state.stdout(msg); + state.stdout(msg)?; + Ok(()) } fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult { @@ -353,13 +354,13 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult { warnings_in_case_of_panic.push(warning.to_owned()); } if extra_verbose { - state.stdout(format!("{}{}", prefix, stdout)); + state.stdout(format!("{}{}", prefix, stdout))?; } Ok(()) }, &mut |stderr| { if extra_verbose { - state.stderr(format!("{}{}", prefix, stderr)); + state.stderr(format!("{}{}", prefix, stderr))?; } Ok(()) }, @@ -396,7 +397,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult { BuildOutput::parse(&output.stdout, &pkg_name, &script_out_dir, &script_out_dir)?; if json_messages { - emit_build_output(state, &parsed_output, script_out_dir.as_path(), id); + emit_build_output(state, &parsed_output, script_out_dir.as_path(), id)?; } build_script_outputs .lock() @@ -421,7 +422,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult { }; if json_messages { - emit_build_output(state, &output, script_out_dir.as_path(), id); + emit_build_output(state, &output, script_out_dir.as_path(), id)?; } build_script_outputs diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 40bafb7f76b..af68722bb50 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -166,10 +166,11 @@ pub struct JobState<'a> { /// Channel back to the main thread to coordinate messages and such. messages: Arc>, - /// Normally messages are handled in a bounded way. When the job is fresh - /// however we need to immediately return to prevent a deadlock as the messages - /// are processed on the same thread as they are sent from. - messages_bounded: bool, + /// Normally output is sent to the job queue with backpressure. When the job is fresh + /// however we need to immediately display the output to prevent a deadlock as the + /// output messages are processed on the same thread as they are sent from. `output` + /// defines where to output in this case. + output: Option<&'a Config>, /// The job id that this state is associated with, used when sending /// messages back to the main thread. @@ -236,20 +237,24 @@ impl<'a> JobState<'a> { .push(Message::BuildPlanMsg(module_name, cmd, filenames)); } - pub fn stdout(&self, stdout: String) { - if self.messages_bounded { - self.messages.push_bounded(Message::Stdout(stdout)); + pub fn stdout(&self, stdout: String) -> CargoResult<()> { + if let Some(config) = self.output { + writeln!(config.shell().out(), "{}", stdout)?; } else { - self.messages.push(Message::Stdout(stdout)); + self.messages.push_bounded(Message::Stdout(stdout)); } + Ok(()) } - pub fn stderr(&self, stderr: String) { - if self.messages_bounded { - self.messages.push_bounded(Message::Stderr(stderr)); + pub fn stderr(&self, stderr: String) -> CargoResult<()> { + if let Some(config) = self.output { + let mut shell = config.shell(); + shell.print_ansi(stderr.as_bytes())?; + shell.err().write_all(b"\n")?; } else { - self.messages.push(Message::Stderr(stderr)); + self.messages.push_bounded(Message::Stderr(stderr)); } + Ok(()) } /// A method used to signal to the coordinator thread that the rmeta file @@ -839,17 +844,9 @@ impl<'cfg> DrainState<'cfg> { self.note_working_on(cx.bcx.config, unit, fresh)?; } - let doit = move || { - let state = JobState { - id, - messages: messages.clone(), - messages_bounded: job.freshness() == Freshness::Dirty, - rmeta_required: Cell::new(rmeta_required), - _marker: marker::PhantomData, - }; - + let doit = move |state: JobState<'_>| { let mut sender = FinishOnDrop { - messages: &messages, + messages: &state.messages, id, result: None, }; @@ -868,7 +865,9 @@ impl<'cfg> DrainState<'cfg> { // we need to make sure that the metadata is flagged as produced so // send a synthetic message here. if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() { - messages.push(Message::Finish(id, Artifact::Metadata, Ok(()))); + state + .messages + .push(Message::Finish(state.id, Artifact::Metadata, Ok(()))); } // Use a helper struct with a `Drop` implementation to guarantee @@ -898,11 +897,25 @@ impl<'cfg> DrainState<'cfg> { self.timings.add_fresh(); // Running a fresh job on the same thread is often much faster than spawning a new // thread to run the job. - doit(); + doit(JobState { + id, + messages: messages.clone(), + output: Some(cx.bcx.config), + rmeta_required: Cell::new(rmeta_required), + _marker: marker::PhantomData, + }); } Freshness::Dirty => { self.timings.add_dirty(); - scope.spawn(move |_| doit()); + scope.spawn(move |_| { + doit(JobState { + id, + messages: messages.clone(), + output: None, + rmeta_required: Cell::new(rmeta_required), + _marker: marker::PhantomData, + }) + }); } } diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index fef4c5099c3..addf5ec7c3f 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -448,7 +448,7 @@ fn link_targets(cx: &mut Context<'_, '_>, unit: &Unit, fresh: bool) -> CargoResu fresh, } .to_json_string(); - state.stdout(msg); + state.stdout(msg)?; } Ok(()) })) @@ -1139,7 +1139,7 @@ fn on_stdout_line( _package_id: PackageId, _target: &Target, ) -> CargoResult<()> { - state.stdout(line.to_string()); + state.stdout(line.to_string())?; Ok(()) } @@ -1177,7 +1177,7 @@ fn on_stderr_line_inner( // something like that), so skip over everything that doesn't look like a // JSON message. if !line.starts_with('{') { - state.stderr(line.to_string()); + state.stderr(line.to_string())?; return Ok(true); } @@ -1189,7 +1189,7 @@ fn on_stderr_line_inner( // to stderr. Err(e) => { debug!("failed to parse json: {:?}", e); - state.stderr(line.to_string()); + state.stderr(line.to_string())?; return Ok(true); } }; @@ -1225,7 +1225,7 @@ fn on_stderr_line_inner( .map(|v| String::from_utf8(v).expect("utf8")) .expect("strip should never fail") }; - state.stderr(rendered); + state.stderr(rendered)?; return Ok(true); } } @@ -1316,7 +1316,7 @@ fn on_stderr_line_inner( // Switch json lines from rustc/rustdoc that appear on stderr to stdout // instead. We want the stdout of Cargo to always be machine parseable as // stderr has our colorized human-readable messages. - state.stdout(msg); + state.stdout(msg)?; Ok(true) } From 50c0221ca38cd8469d8875a68482e93bb5753e51 Mon Sep 17 00:00:00 2001 From: bjorn3 Date: Tue, 10 Nov 2020 10:31:39 +0100 Subject: [PATCH 5/5] Add some comments --- src/cargo/core/compiler/job_queue.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index af68722bb50..e883ad3e601 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -162,14 +162,28 @@ impl std::fmt::Display for JobId { } } +/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything +/// necessary to communicate between the main thread and the execution of the job. +/// +/// The job may execute on either a dedicated thread or the main thread. If the job executes on the +/// main thread, the `output` field must be set to prevent a deadlock. pub struct JobState<'a> { /// Channel back to the main thread to coordinate messages and such. + /// + /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on + /// the message queue to prevent a deadlock. messages: Arc>, /// Normally output is sent to the job queue with backpressure. When the job is fresh /// however we need to immediately display the output to prevent a deadlock as the /// output messages are processed on the same thread as they are sent from. `output` /// defines where to output in this case. + /// + /// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed + /// between threads. This means that it isn't possible for multiple output messages to be + /// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case + /// interleaving is still prevented as the lock would be held for the whole printing of an + /// output message. output: Option<&'a Config>, /// The job id that this state is associated with, used when sending