Skip to content

Commit

Permalink
Immediately print job output for fresh jobs
Browse files Browse the repository at this point in the history
This prevents a deadlock in which the message queue fills up with output
messages but is never drained, because the job that produces the messages
runs on the same thread that processes them.
  • Loading branch information
bjorn3 committed Nov 10, 2020
1 parent 49b63b7 commit 0583081
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
13 changes: 7 additions & 6 deletions src/cargo/core/compiler/custom_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn emit_build_output(
output: &BuildOutput,
out_dir: &Path,
package_id: PackageId,
) {
) -> CargoResult<()> {
let library_paths = output
.library_paths
.iter()
Expand All @@ -144,7 +144,8 @@ fn emit_build_output(
out_dir,
}
.to_json_string();
state.stdout(msg);
state.stdout(msg)?;
Ok(())
}

fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
Expand Down Expand Up @@ -353,13 +354,13 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
warnings_in_case_of_panic.push(warning.to_owned());
}
if extra_verbose {
state.stdout(format!("{}{}", prefix, stdout));
state.stdout(format!("{}{}", prefix, stdout))?;
}
Ok(())
},
&mut |stderr| {
if extra_verbose {
state.stderr(format!("{}{}", prefix, stderr));
state.stderr(format!("{}{}", prefix, stderr))?;
}
Ok(())
},
Expand Down Expand Up @@ -396,7 +397,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
BuildOutput::parse(&output.stdout, &pkg_name, &script_out_dir, &script_out_dir)?;

if json_messages {
emit_build_output(state, &parsed_output, script_out_dir.as_path(), id);
emit_build_output(state, &parsed_output, script_out_dir.as_path(), id)?;
}
build_script_outputs
.lock()
Expand All @@ -421,7 +422,7 @@ fn build_work(cx: &mut Context<'_, '_>, unit: &Unit) -> CargoResult<Job> {
};

if json_messages {
emit_build_output(state, &output, script_out_dir.as_path(), id);
emit_build_output(state, &output, script_out_dir.as_path(), id)?;
}

build_script_outputs
Expand Down
63 changes: 38 additions & 25 deletions src/cargo/core/compiler/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,11 @@ pub struct JobState<'a> {
/// Channel back to the main thread to coordinate messages and such.
messages: Arc<Queue<Message>>,

/// Normally messages are handled in a bounded way. When the job is fresh
/// however we need to immediately return to prevent a deadlock as the messages
/// are processed on the same thread as they are sent from.
messages_bounded: bool,
/// Normally output is sent to the job queue with backpressure. When the job is fresh
/// however we need to immediately display the output to prevent a deadlock as the
/// output messages are processed on the same thread as they are sent from. `output`
/// defines where to output in this case.
output: Option<&'a Config>,

/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
Expand Down Expand Up @@ -236,20 +237,24 @@ impl<'a> JobState<'a> {
.push(Message::BuildPlanMsg(module_name, cmd, filenames));
}

pub fn stdout(&self, stdout: String) {
if self.messages_bounded {
self.messages.push_bounded(Message::Stdout(stdout));
pub fn stdout(&self, stdout: String) -> CargoResult<()> {
if let Some(config) = self.output {
writeln!(config.shell().out(), "{}", stdout)?;
} else {
self.messages.push(Message::Stdout(stdout));
self.messages.push_bounded(Message::Stdout(stdout));
}
Ok(())
}

pub fn stderr(&self, stderr: String) {
if self.messages_bounded {
self.messages.push_bounded(Message::Stderr(stderr));
pub fn stderr(&self, stderr: String) -> CargoResult<()> {
if let Some(config) = self.output {
let mut shell = config.shell();
shell.print_ansi(stderr.as_bytes())?;
shell.err().write_all(b"\n")?;
} else {
self.messages.push(Message::Stderr(stderr));
self.messages.push_bounded(Message::Stderr(stderr));
}
Ok(())
}

/// A method used to signal to the coordinator thread that the rmeta file
Expand Down Expand Up @@ -839,17 +844,9 @@ impl<'cfg> DrainState<'cfg> {
self.note_working_on(cx.bcx.config, unit, fresh)?;
}

let doit = move || {
let state = JobState {
id,
messages: messages.clone(),
messages_bounded: job.freshness() == Freshness::Dirty,
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
};

let doit = move |state: JobState<'_>| {
let mut sender = FinishOnDrop {
messages: &messages,
messages: &state.messages,
id,
result: None,
};
Expand All @@ -868,7 +865,9 @@ impl<'cfg> DrainState<'cfg> {
// we need to make sure that the metadata is flagged as produced so
// send a synthetic message here.
if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
state
.messages
.push(Message::Finish(state.id, Artifact::Metadata, Ok(())));
}

// Use a helper struct with a `Drop` implementation to guarantee
Expand Down Expand Up @@ -898,11 +897,25 @@ impl<'cfg> DrainState<'cfg> {
self.timings.add_fresh();
// Running a fresh job on the same thread is often much faster than spawning a new
// thread to run the job.
doit();
doit(JobState {
id,
messages: messages.clone(),
output: Some(cx.bcx.config),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
});
}
Freshness::Dirty => {
self.timings.add_dirty();
scope.spawn(move |_| doit());
scope.spawn(move |_| {
doit(JobState {
id,
messages: messages.clone(),
output: None,
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
})
});
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/cargo/core/compiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ fn link_targets(cx: &mut Context<'_, '_>, unit: &Unit, fresh: bool) -> CargoResu
fresh,
}
.to_json_string();
state.stdout(msg);
state.stdout(msg)?;
}
Ok(())
}))
Expand Down Expand Up @@ -1139,7 +1139,7 @@ fn on_stdout_line(
_package_id: PackageId,
_target: &Target,
) -> CargoResult<()> {
state.stdout(line.to_string());
state.stdout(line.to_string())?;
Ok(())
}

Expand Down Expand Up @@ -1177,7 +1177,7 @@ fn on_stderr_line_inner(
// something like that), so skip over everything that doesn't look like a
// JSON message.
if !line.starts_with('{') {
state.stderr(line.to_string());
state.stderr(line.to_string())?;
return Ok(true);
}

Expand All @@ -1189,7 +1189,7 @@ fn on_stderr_line_inner(
// to stderr.
Err(e) => {
debug!("failed to parse json: {:?}", e);
state.stderr(line.to_string());
state.stderr(line.to_string())?;
return Ok(true);
}
};
Expand Down Expand Up @@ -1225,7 +1225,7 @@ fn on_stderr_line_inner(
.map(|v| String::from_utf8(v).expect("utf8"))
.expect("strip should never fail")
};
state.stderr(rendered);
state.stderr(rendered)?;
return Ok(true);
}
}
Expand Down Expand Up @@ -1316,7 +1316,7 @@ fn on_stderr_line_inner(
// Switch json lines from rustc/rustdoc that appear on stderr to stdout
// instead. We want the stdout of Cargo to always be machine parseable as
// stderr has our colorized human-readable messages.
state.stdout(msg);
state.stdout(msg)?;
Ok(true)
}

Expand Down

0 comments on commit 0583081

Please sign in to comment.