diff --git a/crates/flycheck/src/command.rs b/crates/flycheck/src/command.rs index 091146a0010a..8ba7018316a1 100644 --- a/crates/flycheck/src/command.rs +++ b/crates/flycheck/src/command.rs @@ -4,12 +4,13 @@ use std::{ ffi::OsString, fmt, io, + marker::PhantomData, path::PathBuf, process::{ChildStderr, ChildStdout, Command, Stdio}, }; use command_group::{CommandGroup, GroupChild}; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::Sender; use stdx::process::streaming_output; /// Cargo output is structured as a one JSON per line. This trait abstracts parsing one line of @@ -99,10 +100,10 @@ pub(crate) struct CommandHandle { /// a read syscall dropping and therefore terminating the process is our best option. child: JodGroupChild, thread: stdx::thread::JoinHandle>, - pub(crate) receiver: Receiver, program: OsString, arguments: Vec, current_dir: Option, + _phantom: PhantomData, } impl fmt::Debug for CommandHandle { @@ -116,7 +117,7 @@ impl fmt::Debug for CommandHandle { } impl CommandHandle { - pub(crate) fn spawn(mut command: Command) -> std::io::Result { + pub(crate) fn spawn(mut command: Command, sender: Sender) -> std::io::Result { command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); let mut child = command.group_spawn().map(JodGroupChild)?; @@ -127,13 +128,12 @@ impl CommandHandle { let stdout = child.0.inner().stdout.take().unwrap(); let stderr = child.0.inner().stderr.take().unwrap(); - let (sender, receiver) = unbounded(); let actor = CargoActor::::new(sender, stdout, stderr); let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .name("CommandHandle".to_owned()) .spawn(move || actor.run()) .expect("failed to spawn thread"); - Ok(CommandHandle { program, arguments, current_dir, child, thread, receiver }) + Ok(CommandHandle { program, arguments, current_dir, child, thread, _phantom: PhantomData }) } pub(crate) fn cancel(mut self) { diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs index f935ec32d51a..5dfaaf774209 100644 --- a/crates/flycheck/src/lib.rs +++ b/crates/flycheck/src/lib.rs @@ -215,6 +215,8 @@ struct FlycheckActor { /// have to wrap sub-processes output handling in a thread and pass messages /// back over a channel. command_handle: Option>, + /// The receiver side of the channel mentioned above. + command_receiver: Option>, } enum Event { @@ -240,6 +242,7 @@ impl FlycheckActor { sysroot_root, root: workspace_root, command_handle: None, + command_receiver: None, } } @@ -248,14 +251,13 @@ impl FlycheckActor { } fn next_event(&self, inbox: &Receiver) -> Option { - let check_chan = self.command_handle.as_ref().map(|cargo| &cargo.receiver); if let Ok(msg) = inbox.try_recv() { // give restarts a preference so check outputs don't block a restart or stop return Some(Event::RequestStateChange(msg)); } select! { recv(inbox) -> msg => msg.ok().map(Event::RequestStateChange), - recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())), + recv(self.command_receiver.as_ref().unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())), } } @@ -284,10 +286,12 @@ impl FlycheckActor { let formatted_command = format!("{:?}", command); tracing::debug!(?command, "will restart flycheck"); - match CommandHandle::spawn(command) { + let (sender, receiver) = unbounded(); + match CommandHandle::spawn(command, sender) { Ok(command_handle) => { tracing::debug!(command = formatted_command, "did restart flycheck"); self.command_handle = Some(command_handle); + self.command_receiver = Some(receiver); self.report_progress(Progress::DidStart); } Err(error) => { @@ -303,6 +307,7 @@ impl FlycheckActor { // Watcher finished let command_handle = self.command_handle.take().unwrap(); + self.command_receiver.take(); let formatted_handle = format!("{:?}", command_handle); let res = command_handle.join(); diff --git a/crates/flycheck/src/test_runner.rs b/crates/flycheck/src/test_runner.rs index 94c05ebc7431..c136dd13664a 100644 --- a/crates/flycheck/src/test_runner.rs +++ b/crates/flycheck/src/test_runner.rs @@ -3,7 +3,8 @@ use std::process::Command; -use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; +use paths::AbsPath; use serde::Deserialize; use toolchain::Tool; @@ -54,20 +55,27 @@ impl ParseFromLine for CargoTestMessage { #[derive(Debug)] pub struct CargoTestHandle { - handle: CommandHandle, + _handle: CommandHandle, } // Example of a cargo test command: // cargo test --workspace --no-fail-fast -- module::func -Z unstable-options --format=json impl CargoTestHandle { - pub fn new(path: Option<&str>, options: CargoOptions) -> std::io::Result { + pub fn new( + path: Option<&str>, + options: CargoOptions, + root: &AbsPath, + sender: Sender, + ) -> std::io::Result { let mut cmd = Command::new(Tool::Cargo.path()); cmd.env("RUSTC_BOOTSTRAP", "1"); cmd.arg("test"); cmd.arg("--workspace"); // --no-fail-fast is needed to ensure that all requested tests will run cmd.arg("--no-fail-fast"); + cmd.arg("--manifest-path"); + cmd.arg(root.join("Cargo.toml")); options.apply_on_command(&mut cmd); cmd.arg("--"); if let Some(path) = path { @@ -75,10 +83,6 @@ impl CargoTestHandle { } cmd.args(["-Z", "unstable-options"]); cmd.arg("--format=json"); - Ok(Self { handle: CommandHandle::spawn(cmd)? }) - } - - pub fn receiver(&self) -> &Receiver { - &self.handle.receiver + Ok(Self { _handle: CommandHandle::spawn(cmd, sender)? }) } } diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs index 8516ffa0dfae..a0101af309b2 100644 --- a/crates/rust-analyzer/src/global_state.rs +++ b/crates/rust-analyzer/src/global_state.rs @@ -86,7 +86,10 @@ pub(crate) struct GlobalState { pub(crate) last_flycheck_error: Option, // Test explorer - pub(crate) test_run_session: Option, + pub(crate) test_run_session: Option>, + pub(crate) test_run_sender: Sender, + pub(crate) test_run_receiver: Receiver, + pub(crate) test_run_remaining_jobs: usize, // VFS pub(crate) loader: Handle, Receiver>, @@ -191,6 +194,7 @@ impl GlobalState { analysis_host.update_lru_capacities(capacities); } let (flycheck_sender, flycheck_receiver) = unbounded(); + let (test_run_sender, test_run_receiver) = unbounded(); let mut this = GlobalState { sender, req_queue: ReqQueue::default(), @@ -219,6 +223,9 @@ impl GlobalState { last_flycheck_error: None, test_run_session: None, + test_run_sender, + test_run_receiver, + test_run_remaining_jobs: 0, vfs: Arc::new(RwLock::new((vfs::Vfs::default(), IntMap::default()))), vfs_config_version: 0, diff --git a/crates/rust-analyzer/src/handlers/request.rs b/crates/rust-analyzer/src/handlers/request.rs index 77c4ad32cc95..e5c070bf77d9 100644 --- a/crates/rust-analyzer/src/handlers/request.rs +++ b/crates/rust-analyzer/src/handlers/request.rs @@ -219,14 +219,28 @@ pub(crate) fn handle_run_test( .unwrap_or_default(), None => "".to_owned(), }; - let handle = if lca.is_empty() { - flycheck::CargoTestHandle::new(None, state.config.cargo_test_options()) + let test_path = if lca.is_empty() { + None } else if let Some((_, path)) = lca.split_once("::") { - flycheck::CargoTestHandle::new(Some(path), state.config.cargo_test_options()) + Some(path) } else { - flycheck::CargoTestHandle::new(None, state.config.cargo_test_options()) + None }; - state.test_run_session = Some(handle?); + let mut handles = vec![]; + for ws in &*state.workspaces { + if let ProjectWorkspace::Cargo { cargo, .. } = ws { + let handle = flycheck::CargoTestHandle::new( + test_path, + state.config.cargo_test_options(), + cargo.workspace_root(), + state.test_run_sender.clone(), + )?; + handles.push(handle); + } + } + // Each process send finished signal twice, once for stdout and once for stderr + state.test_run_remaining_jobs = 2 * handles.len(); + state.test_run_session = Some(handles); Ok(()) } diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index 9459bd7c5d6c..ac270bc3c58c 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -7,7 +7,7 @@ use std::{ }; use always_assert::always; -use crossbeam_channel::{never, select, Receiver}; +use crossbeam_channel::{select, Receiver}; use ide_db::base_db::{SourceDatabase, SourceDatabaseExt, VfsPath}; use lsp_server::{Connection, Notification, Request}; use lsp_types::{notification::Notification as _, TextDocumentIdentifier}; @@ -220,7 +220,7 @@ impl GlobalState { recv(self.flycheck_receiver) -> task => Some(Event::Flycheck(task.unwrap())), - recv(self.test_run_session.as_ref().map(|s| s.receiver()).unwrap_or(&never())) -> task => + recv(self.test_run_receiver) -> task => Some(Event::TestResult(task.unwrap())), } @@ -337,9 +337,7 @@ impl GlobalState { .entered(); self.handle_cargo_test_msg(message); // Coalesce many test result event into a single loop turn - while let Some(message) = - self.test_run_session.as_ref().and_then(|r| r.receiver().try_recv().ok()) - { + while let Ok(message) = self.test_run_receiver.try_recv() { self.handle_cargo_test_msg(message); } } @@ -792,8 +790,11 @@ impl GlobalState { } flycheck::CargoTestMessage::Suite => (), flycheck::CargoTestMessage::Finished => { - self.send_notification::(()); - self.test_run_session = None; + self.test_run_remaining_jobs = self.test_run_remaining_jobs.saturating_sub(1); + if self.test_run_remaining_jobs == 0 { + self.send_notification::(()); + self.test_run_session = None; + } } flycheck::CargoTestMessage::Custom { text } => { self.send_notification::(text);