From adaa4ef190f7de41715f1c08bbc764ac7eb4df23 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 27 Sep 2017 09:14:51 -0700 Subject: [PATCH] Add jobserver support to sccache This commit alters the main sccache server to operate and orchestrate its own GNU make style jobserver. This is primarily intended for interoperation with rustc itself. The Rust compiler currently has a multithreaded mode where it will execute code generation and optimization on the LLVM side of things in parallel. This parallelism, however, can overload a machine quickly if not properly accounted for (e.g. if 10 rustcs all spawn 10 threads...). The usage of a GNU make style jobserver is intended to arbitrate and rate limit all these rustc instances to ensure that one build's maximal parallelism never exceeds a particular amount. Currently for Rust Cargo is the primary driver for setting up a jobserver. Cargo will create this and manage this per compilation, ensuring that any one `cargo build` invocation never exceeds a maximal parallelism. When sccache enters the picture, however, the story gets slightly more odd. The jobserver implementation on Unix relies on inheritance of file descriptors in spawned processes. With sccache, however, there's no inheritance as the actual rustc invocation is spawned by the server, not the client. In this case the env vars used to configure the jobsever are usually incorrect. To handle this problem this commit bakes a jobserver directly into sccache itself. The jobserver then overrides whatever jobserver the client has configured in its own env vars to ensure correct operation. The settings of each jobserver may be misconfigured (there's no way to configure sccache's jobserver right now), but hopefully that's not too much of a problem for the forseeable future. The implementation here was to provide a thin wrapper around the `jobserver` crate with a futures-based interface. This interface was then hooked into the mock command infrastructure to automatically acquire a jobserver token when spawning a process and automatically drop the token when the process exits. Additionally, all spawned processes will now automatically receive a configured jobserver. cc rust-lang/rust#42867, the original motivation for this commit --- Cargo.lock | 12 +++ Cargo.toml | 2 + README.md | 10 +++ src/commands.rs | 4 +- src/compiler/compiler.rs | 44 ++++------- src/jobserver.rs | 71 ++++++++++++++++++ src/main.rs | 2 + src/mock_command.rs | 158 +++++++++++++++++++++++++-------------- src/server.rs | 15 +++- src/test/tests.rs | 12 +-- src/test/utils.rs | 10 ++- src/util.rs | 6 +- 12 files changed, 243 insertions(+), 103 deletions(-) create mode 100644 src/jobserver.rs diff --git a/Cargo.lock b/Cargo.lock index dcb06d044..ddeac2e1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -455,6 +455,15 @@ name = "itoa" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "jobserver" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "jsonwebtoken" version = "4.0.0" @@ -959,6 +968,7 @@ dependencies = [ "hyper 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)", + "jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "jsonwebtoken 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", @@ -967,6 +977,7 @@ dependencies = [ "lru-disk-cache 0.1.0", "mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "openssl 0.9.23 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1594,6 +1605,7 @@ dependencies = [ "checksum iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6e8b9c2247fcf6c6a1151f1156932be5606c9fd6f55a2d7f9fc1cb29386b2f7" "checksum itertools 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a3f195c9c60b8d747d73357694c3ebcaeb229ea574bd92999260954f851d59c2" "checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c" +"checksum jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "443ae8bc0af6c106e6e8b77e04684faecc1a5ce94e058f4c2b0a037b0ea1b133" "checksum jsonwebtoken 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "50ff506fbf25106dea8f97be2f9b43d67b7f02a4b5ced6b1ff3543dd08cecee0" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" diff --git a/Cargo.toml b/Cargo.toml index 308975c68..3d163f48c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,14 @@ futures = "0.1.11" futures-cpupool = "0.1" hyper = { version = "0.11", optional = true } hyper-tls = { version = "0.1", optional = true } +jobserver = "0.1" jsonwebtoken = { version = "4.0", optional = true } libc = "0.2.10" local-encoding = "0.2.0" log = "0.3.6" lru-disk-cache = { path = "lru-disk-cache", version = "0.1.0" } native-tls = "0.1" +num_cpus = "1.0" number_prefix = "0.2.5" openssl = { version = "0.9", optional = true } redis = { version = "0.8.0", optional = true } diff --git a/README.md b/README.md index ae0c0d7c0..e0939d99a 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Table of Contents (ToC) * [Usage](#usage) * [Storage Options](#storage-options) * [Debugging](#debugging) +* [Interaction with GNU `make` jobserver](#interaction-with-gnu-make-jobserver) * [Known Caveats](#known-caveats) --- @@ -94,6 +95,15 @@ You can set the `SCCACHE_ERROR_LOG` environment variable to a path to cause the --- +Interaction with GNU `make` jobserver +------------------------------------- + +Sccache provides support for a [GNU make jobserver](https://www.gnu.org/software/make/manual/html_node/Job-Slots.html). When the server is started from a process that provides a jobserver, sccache will use that jobserver and provide it to any processes it spawns. (If you are running sccache from a GNU make recipe, you will need to prefix the command with `+` to get this behavior.) If the sccache server is started without a jobserver present it will create its own with the number of slots equal to the number of available CPU cores. + +This is most useful when using sccache for Rust compilation, as rustc supports using a jobserver for parallel codegen, so this ensures that rustc will not overwhelm the system with codegen tasks. Cargo implements its own jobserver ([see the information on `NUM_JOBS` in the cargo documentation](https://doc.rust-lang.org/stable/cargo/reference/environment-variables.html#environment-variables-cargo-sets-for-build-scripts)) for rustc to use, so using sccache for Rust compilation in cargo via `RUSTC_WRAPPER` should do the right thing automatically. + +--- + Known caveats ------------- diff --git a/src/commands.rs b/src/commands.rs index d5d5e08ed..23ba817df 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -18,6 +18,7 @@ use client::{ ServerConnection, }; use cmdline::{Command, StatsFormat}; +use jobserver::Client; use log::LogLevel::Trace; use mock_command::{ CommandCreatorSync, @@ -601,9 +602,10 @@ pub fn run_command(cmd: Command) -> Result { } Command::Compile { exe, cmdline, cwd, env_vars } => { trace!("Command::Compile {{ {:?}, {:?}, {:?} }}", exe, cmdline, cwd); + let jobserver = unsafe { Client::new() }; let conn = connect_or_start_server(get_port())?; let mut core = Core::new()?; - let res = do_compile(ProcessCommandCreator::new(&core.handle()), + let res = do_compile(ProcessCommandCreator::new(&core.handle(), &jobserver), &mut core, conn, exe.as_ref(), diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs index 275d6cd76..c9d79aa7a 100644 --- a/src/compiler/compiler.rs +++ b/src/compiler/compiler.rs @@ -470,14 +470,13 @@ fn detect_compiler(creator: &T, executable: &Path, pool: &CpuPool) }; let is_rustc = if filename.to_string_lossy().to_lowercase() == "rustc" { // Sanity check that it's really rustc. + let executable = executable.to_path_buf(); let child = creator.clone().new_command_sync(&executable) .stdout(Stdio::piped()) .stderr(Stdio::null()) .args(&["--version"]) - .spawn().chain_err(|| { - format!("failed to execute {:?}", executable) - }); - let output = child.into_future().and_then(move |child| { + .spawn(); + let output = child.and_then(move |child| { child.wait_with_output() .chain_err(|| "failed to read child output") }); @@ -530,10 +529,7 @@ gcc let output = write.and_then(move |(tempdir, src)| { cmd.arg("-E").arg(src); trace!("compiler {:?}", cmd); - let child = cmd.spawn().chain_err(|| { - format!("failed to execute {:?}", cmd) - }); - child.into_future().and_then(|child| { + cmd.spawn().and_then(|child| { child.wait_with_output().chain_err(|| "failed to read child output") }).map(|e| { drop(tempdir); @@ -724,11 +720,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -805,11 +799,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -887,11 +879,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -954,11 +944,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); } let cwd = f.tempdir.path(); diff --git a/src/jobserver.rs b/src/jobserver.rs new file mode 100644 index 000000000..b3ce0ac55 --- /dev/null +++ b/src/jobserver.rs @@ -0,0 +1,71 @@ +extern crate jobserver; + +use std::io; +use std::process::Command; +use std::sync::Arc; + +use futures::prelude::*; +use futures::sync::mpsc; +use futures::sync::oneshot; +use num_cpus; + +use errors::*; + +pub use self::jobserver::Acquired; + +#[derive(Clone)] +pub struct Client { + helper: Arc, + inner: jobserver::Client, + tx: mpsc::UnboundedSender>> +} + +impl Client { + // unsafe because `from_env` is unsafe (can use the wrong fds) + pub unsafe fn new() -> Client { + match jobserver::Client::from_env() { + Some(c) => Client::_new(c), + None => Client::new_num(num_cpus::get()), + } + } + + pub fn new_num(num: usize) -> Client { + let inner = jobserver::Client::new(num) + .expect("failed to create jobserver"); + Client::_new(inner) + } + + fn _new(inner: jobserver::Client) -> Client { + let (tx, rx) = mpsc::unbounded::>(); + let mut rx = rx.wait(); + let helper = inner.clone().into_helper_thread(move |token| { + if let Some(Ok(sender)) = rx.next() { + drop(sender.send(token)); + } + }).expect("failed to spawn helper thread"); + + Client { + inner: inner, + helper: Arc::new(helper), + tx: tx, + } + } + + /// Configures this jobserver to be inherited by the specified command + pub fn configure(&self, cmd: &mut Command) { + self.inner.configure(cmd) + } + + /// Returns a future that represents an acquired jobserver token. + /// + /// This should be invoked before any "work" is spawend (for whatever the + /// defnition of "work" is) to ensure that the system is properly + /// rate-limiting itself. + pub fn acquire(&self) -> SFuture { + let (tx, rx) = oneshot::channel(); + self.helper.request_token(); + self.tx.unbounded_send(tx).unwrap(); + Box::new(rx.chain_err(|| "jobserver helper panicked") + .and_then(|t| t.chain_err(|| "failed to acquire jobserver token"))) + } +} diff --git a/src/main.rs b/src/main.rs index e26d65e5a..1d780c77a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,7 @@ extern crate libc; #[cfg(windows)] extern crate mio_named_pipes; extern crate native_tls; +extern crate num_cpus; extern crate number_prefix; #[cfg(feature = "openssl")] extern crate openssl; @@ -93,6 +94,7 @@ mod client; mod cmdline; mod commands; mod compiler; +mod jobserver; mod mock_command; mod protocol; mod server; diff --git a/src/mock_command.rs b/src/mock_command.rs index 9e898d235..b282424fa 100644 --- a/src/mock_command.rs +++ b/src/mock_command.rs @@ -47,7 +47,9 @@ #[cfg(unix)] use libc; +use errors::*; use futures::future::{self, Future}; +use jobserver::{Acquired, Client}; use std::boxed::Box; use std::ffi::{OsStr, OsString}; use std::fmt; @@ -61,7 +63,7 @@ use std::process::{ }; use std::sync::{Arc,Mutex}; use tokio_process::{ - Child, + self, ChildStderr, ChildStdin, ChildStdout, @@ -120,7 +122,7 @@ pub trait RunCommand: fmt::Debug { /// Set the process' stderr from `cfg`. fn stderr(&mut self, cfg: Stdio) -> &mut Self; /// Execute the process and return a process object. - fn spawn(&mut self) -> io::Result; + fn spawn(&mut self) -> SFuture; } /// A trait that provides a means to create objects implementing `RunCommand`. @@ -133,7 +135,7 @@ pub trait CommandCreator { type Cmd: RunCommand; /// Create a new instance of this type. - fn new(handle: &Handle) -> Self; + fn new(handle: &Handle, client: &Client) -> Self; /// Create a new object that implements `RunCommand` that can be used /// to create a new process. fn new_command>(&mut self, program: S) -> Self::Cmd; @@ -143,42 +145,63 @@ pub trait CommandCreator { pub trait CommandCreatorSync: Clone + 'static { type Cmd: RunCommand; - fn new(handle: &Handle) -> Self; + fn new(handle: &Handle, client: &Client) -> Self; fn new_command_sync>(&mut self, program: S) -> Self::Cmd; } +pub struct Child { + inner: tokio_process::Child, + token: Acquired, +} + /// Trivial implementation of `CommandChild` for `std::process::Child`. impl CommandChild for Child { type I = ChildStdin; type O = ChildStdout; type E = ChildStderr; - fn take_stdin(&mut self) -> Option { self.stdin().take() } - fn take_stdout(&mut self) -> Option { self.stdout().take() } - fn take_stderr(&mut self) -> Option { self.stderr().take() } + fn take_stdin(&mut self) -> Option { self.inner.stdin().take() } + fn take_stdout(&mut self) -> Option { self.inner.stdout().take() } + fn take_stderr(&mut self) -> Option { self.inner.stderr().take() } fn wait(self) -> Box> { - Box::new(self) + let Child { inner, token } = self; + Box::new(inner.map(|ret| { + drop(token); + ret + })) } fn wait_with_output(self) -> Box> { - Box::new(self.wait_with_output()) + let Child { inner, token } = self; + Box::new(inner.wait_with_output().map(|ret| { + drop(token); + ret + })) } } pub struct AsyncCommand { - inner: Command, + inner: Option, handle: Handle, + jobserver: Client, } impl AsyncCommand { - pub fn new>(program: S, handle: Handle) -> AsyncCommand { + pub fn new>(program: S, + handle: Handle, + jobserver: Client) -> AsyncCommand { AsyncCommand { - inner: Command::new(program), + inner: Some(Command::new(program)), handle: handle, + jobserver: jobserver, } } + + fn inner(&mut self) -> &mut Command { + self.inner.as_mut().expect("can't reuse commands") + } } /// Trivial implementation of `RunCommand` for `std::process::Command`. @@ -186,18 +209,18 @@ impl RunCommand for AsyncCommand { type C = Child; fn arg>(&mut self, arg: S) -> &mut AsyncCommand { - self.inner.arg(arg); + self.inner().arg(arg); self } fn args>(&mut self, args: &[S]) -> &mut AsyncCommand { - self.inner.args(args); + self.inner().args(args); self } fn env(&mut self, key: K, val: V) -> &mut AsyncCommand where K: AsRef, V: AsRef, { - self.inner.env(key, val); + self.inner().env(key, val); self } fn envs(&mut self, vars: I) -> &mut Self @@ -206,16 +229,16 @@ impl RunCommand for AsyncCommand { //TODO: when Command::envs stabilizes, use that: // https://github.com/rust-lang/rust/issues/38526 for (k, v) in vars { - self.inner.env(k, v); + self.inner().env(k, v); } self } fn env_clear(&mut self) -> &mut AsyncCommand { - self.inner.env_clear(); + self.inner().env_clear(); self } fn current_dir>(&mut self, dir: P) -> &mut AsyncCommand { - self.inner.current_dir(dir); + self.inner().current_dir(dir); self } @@ -223,7 +246,7 @@ impl RunCommand for AsyncCommand { fn no_console(&mut self) -> &mut AsyncCommand { use std::os::windows::process::CommandExt; const CREATE_NO_WINDOW: u32 = 0x08000000; - self.inner.creation_flags(CREATE_NO_WINDOW); + self.inner().creation_flags(CREATE_NO_WINDOW); self } @@ -233,19 +256,30 @@ impl RunCommand for AsyncCommand { } fn stdin(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stdin(cfg); + self.inner().stdin(cfg); self } fn stdout(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stdout(cfg); + self.inner().stdout(cfg); self } fn stderr(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stderr(cfg); + self.inner().stderr(cfg); self } - fn spawn(&mut self) -> io::Result { - self.inner.spawn_async(&self.handle) + fn spawn(&mut self) -> SFuture { + let mut inner = self.inner.take().unwrap(); + inner.env_remove("MAKEFLAGS"); + inner.env_remove("MFLAGS"); + inner.env_remove("CARGO_MAKEFLAGS"); + self.jobserver.configure(&mut inner); + let handle = self.handle.clone(); + Box::new(self.jobserver.acquire().and_then(move |token| { + let child = inner.spawn_async(&handle).chain_err(|| { + format!("failed to spawn {:?}", inner) + })?; + Ok(Child { inner: child, token: token }) + })) } } @@ -259,20 +293,22 @@ impl fmt::Debug for AsyncCommand { #[derive(Clone)] pub struct ProcessCommandCreator { handle: Handle, + jobserver: Client, } /// Trivial implementation of `CommandCreator` for `ProcessCommandCreator`. impl CommandCreator for ProcessCommandCreator { type Cmd = AsyncCommand; - fn new(handle: &Handle) -> ProcessCommandCreator { + fn new(handle: &Handle, client: &Client) -> ProcessCommandCreator { ProcessCommandCreator { handle: handle.clone(), + jobserver: client.clone(), } } fn new_command>(&mut self, program: S) -> AsyncCommand { - AsyncCommand::new(program, self.handle.clone()) + AsyncCommand::new(program, self.handle.clone(), self.jobserver.clone()) } } @@ -280,8 +316,8 @@ impl CommandCreator for ProcessCommandCreator { impl CommandCreatorSync for ProcessCommandCreator { type Cmd = AsyncCommand; - fn new(handle: &Handle) -> ProcessCommandCreator { - CommandCreator::new(handle) + fn new(handle: &Handle, client: &Client) -> ProcessCommandCreator { + CommandCreator::new(handle, client) } fn new_command_sync>(&mut self, program: S) -> AsyncCommand { @@ -375,8 +411,8 @@ impl CommandChild for MockChild { } pub enum ChildOrCall { - Child(io::Result), - Call(Box io::Result + Send>), + Child(Result), + Call(Box Result + Send>), } impl fmt::Debug for ChildOrCall { @@ -437,10 +473,10 @@ impl RunCommand for MockCommand { fn stderr(&mut self, _cfg: Stdio) -> &mut MockCommand { self } - fn spawn(&mut self) -> io::Result { + fn spawn(&mut self) -> SFuture { match self.child.take().unwrap() { - ChildOrCall::Child(c) => c, - ChildOrCall::Call(f) => f(&self.args), + ChildOrCall::Child(c) => Box::new(future::result(c)), + ChildOrCall::Call(f) => Box::new(future::result(f(&self.args))), } } } @@ -455,7 +491,7 @@ pub struct MockCommandCreator { impl MockCommandCreator { /// The next `MockCommand` created will return `child` from `RunCommand::spawn`. #[allow(dead_code)] - pub fn next_command_spawns(&mut self, child: io::Result) { + pub fn next_command_spawns(&mut self, child: Result) { self.children.push(ChildOrCall::Child(child)); } @@ -463,7 +499,7 @@ impl MockCommandCreator { /// arguments passed to the command. #[allow(dead_code)] pub fn next_command_calls(&mut self, call: C) - where C: Fn(&[OsString]) -> io::Result + Send + 'static, + where C: Fn(&[OsString]) -> Result + Send + 'static, { self.children.push(ChildOrCall::Call(Box::new(call))); } @@ -472,7 +508,7 @@ impl MockCommandCreator { impl CommandCreator for MockCommandCreator { type Cmd = MockCommand; - fn new(_handle: &Handle) -> MockCommandCreator { + fn new(_handle: &Handle, _client: &Client) -> MockCommandCreator { MockCommandCreator { children: Vec::new(), } @@ -492,8 +528,8 @@ impl CommandCreator for MockCommandCreator { impl CommandCreatorSync for Arc> { type Cmd = T::Cmd; - fn new(handle: &Handle) -> Arc> { - Arc::new(Mutex::new(T::new(handle))) + fn new(handle: &Handle, client: &Client) -> Arc> { + Arc::new(Mutex::new(T::new(handle, client))) } fn new_command_sync>(&mut self, program: S) -> T::Cmd { @@ -507,6 +543,7 @@ mod test { use std::error::Error; use std::ffi::OsStr; use std::io; + use jobserver::Client; use futures::Future; use std::process::{ ExitStatus, @@ -517,24 +554,24 @@ mod test { use test::utils::*; use tokio_core::reactor::Core; - fn spawn_command>(creator : &mut T, program: S) -> io::Result<<::Cmd as RunCommand>::C> { - creator.new_command(program).spawn() + fn spawn_command>(creator : &mut T, program: S) -> Result<<::Cmd as RunCommand>::C> { + creator.new_command(program).spawn().wait() } - fn spawn_wait_command>(creator : &mut T, program: S) -> io::Result { - spawn_command(creator, program).and_then(|c| c.wait().wait()) + fn spawn_wait_command>(creator : &mut T, program: S) -> Result { + Ok(spawn_command(creator, program)?.wait().wait()?) } - fn spawn_output_command>(creator : &mut T, program: S) -> io::Result { - spawn_command(creator, program).and_then(|c| c.wait_with_output().wait()) + fn spawn_output_command>(creator : &mut T, program: S) -> Result { + Ok(spawn_command(creator, program)?.wait_with_output().wait()?) } fn spawn_on_thread(mut t : T, really : bool) -> ExitStatus { thread::spawn(move || { if really { - t.new_command_sync("foo").spawn().and_then(|c| { - c.wait().wait() - }).unwrap() + t.new_command_sync("foo") + .spawn().wait().unwrap() + .wait().wait().unwrap() } else { exit_status(1) } @@ -544,7 +581,8 @@ mod test { #[test] fn test_mock_command_wait() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::new(exit_status(0), "hello", "error"))); assert_eq!(0, spawn_wait_command(&mut creator, "foo").unwrap().code().unwrap()); } @@ -555,14 +593,16 @@ mod test { // If next_command_spawns hasn't been called enough times, // new_command should panic. let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); - creator.new_command("foo").spawn().unwrap(); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); + creator.new_command("foo").spawn().wait().unwrap(); } #[test] fn test_mock_command_output() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::new(exit_status(0), "hello", "error"))); let output = spawn_output_command(&mut creator, "foo").unwrap(); assert_eq!(0, output.status.code().unwrap()); @@ -573,7 +613,8 @@ mod test { #[test] fn test_mock_command_calls() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_calls(|_| { Ok(MockChild::new(exit_status(0), "hello", "error")) }); @@ -586,27 +627,28 @@ mod test { #[test] fn test_mock_spawn_error() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); - creator.next_command_spawns(Err(io::Error::new(io::ErrorKind::Other, "error"))); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); + creator.next_command_spawns(Err("error".into())); let e = spawn_command(&mut creator, "foo").err().unwrap(); - assert_eq!(io::ErrorKind::Other, e.kind()); assert_eq!("error", e.description()); } #[test] fn test_mock_wait_error() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::with_error(io::Error::new(io::ErrorKind::Other, "error")))); let e = spawn_wait_command(&mut creator, "foo").err().unwrap(); - assert_eq!(io::ErrorKind::Other, e.kind()); assert_eq!("error", e.description()); } #[test] fn test_mock_command_sync() { let core = Core::new().unwrap(); - let creator = Arc::new(Mutex::new(MockCommandCreator::new(&core.handle()))); + let client = Client::new_num(1); + let creator = Arc::new(Mutex::new(MockCommandCreator::new(&core.handle(), &client))); next_command(&creator, Ok(MockChild::new(exit_status(0), "hello", "error"))); assert_eq!(exit_status(0), spawn_on_thread(creator.clone(), true)); } diff --git a/src/server.rs b/src/server.rs index 9b86eac23..816665c5b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -31,6 +31,7 @@ use futures::sync::mpsc; use futures::task::{self, Task}; use futures::{Stream, Sink, Async, AsyncSink, Poll, StartSend, Future}; use futures_cpupool::CpuPool; +use jobserver::Client; use mock_command::{ CommandCreatorSync, ProcessCommandCreator, @@ -121,10 +122,11 @@ fn get_signal(_status: ExitStatus) -> i32 { /// requests a shutdown. pub fn start_server(port: u16) -> Result<()> { trace!("start_server"); + let client = unsafe { Client::new() }; let core = Core::new()?; let pool = CpuPool::new(20); let storage = storage_from_environment(&pool, &core.handle()); - let res = SccacheServer::::new(port, pool, core, storage); + let res = SccacheServer::::new(port, pool, core, client, storage); let notify = env::var_os("SCCACHE_STARTUP_NOTIFY"); match res { Ok(srv) => { @@ -152,6 +154,7 @@ impl SccacheServer { pub fn new(port: u16, pool: CpuPool, core: Core, + client: Client, storage: Arc) -> Result> { let handle = core.handle(); let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port); @@ -161,7 +164,12 @@ impl SccacheServer { // connections. let (tx, rx) = mpsc::channel(1); let (wait, info) = WaitUntilZero::new(); - let service = SccacheService::new(storage, core.handle(), pool, tx, info); + let service = SccacheService::new(storage, + core.handle(), + &client, + pool, + tx, + info); Ok(SccacheServer { core: core, @@ -389,6 +397,7 @@ impl SccacheService { pub fn new(storage: Arc, handle: Handle, + client: &Client, pool: CpuPool, tx: mpsc::Sender, info: ActiveInfo) -> SccacheService { @@ -397,7 +406,7 @@ impl SccacheService storage: storage, compilers: Rc::new(RefCell::new(HashMap::new())), pool: pool, - creator: C::new(&handle), + creator: C::new(&handle, client), handle: handle, tx: tx, info: info, diff --git a/src/test/tests.rs b/src/test/tests.rs index 099644dad..0418b4dec 100644 --- a/src/test/tests.rs +++ b/src/test/tests.rs @@ -24,6 +24,7 @@ use ::commands::{ use env_logger; use futures::sync::oneshot::{self, Sender}; use futures_cpupool::CpuPool; +use jobserver::Client; use ::mock_command::*; use ::server::{ ServerMessage, @@ -80,7 +81,8 @@ fn run_server_thread(cache_dir: &Path, options: T) let (shutdown_tx, shutdown_rx) = oneshot::channel(); let handle = thread::spawn(move || { let core = Core::new().unwrap(); - let srv = SccacheServer::new(0, pool, core, storage).unwrap(); + let client = unsafe { Client::new() }; + let srv = SccacheServer::new(0, pool, core, client, storage).unwrap(); let mut srv: SccacheServer>> = srv; assert!(srv.port() > 0); if let Some(options) = options { @@ -202,11 +204,9 @@ fn test_server_compile() { let obj = f.tempdir.path().join("file.o"); c.next_command_calls(move |_| { // Pretend to compile something. - match File::create(&obj) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), STDOUT, STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&obj)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), STDOUT, STDERR)) }); } // Ask the server to compile something. diff --git a/src/test/utils.rs b/src/test/utils.rs index 33d3ef172..122289daf 100644 --- a/src/test/utils.rs +++ b/src/test/utils.rs @@ -26,6 +26,9 @@ use std::sync::{Arc,Mutex}; use tempdir::TempDir; use tokio_core::reactor::Core; +use jobserver::Client; +use errors::*; + /// Return a `Vec` with each listed entry converted to an owned `String`. macro_rules! stringvec { ( $( $x:expr ),* ) => { @@ -69,15 +72,16 @@ macro_rules! assert_map_contains { pub fn new_creator() -> Arc> { let core = Core::new().unwrap(); - Arc::new(Mutex::new(MockCommandCreator::new(&core.handle()))) + let client = unsafe { Client::new() }; + Arc::new(Mutex::new(MockCommandCreator::new(&core.handle(), &client))) } pub fn next_command(creator : &Arc>, - child: io::Result) { + child: Result) { creator.lock().unwrap().next_command_spawns(child); } -pub fn next_command_calls io::Result + Send + 'static>(creator: &Arc>, call: C) { +pub fn next_command_calls Result + Send + 'static>(creator: &Arc>, call: C) { creator.lock().unwrap().next_command_calls(call); } diff --git a/src/util.rs b/src/util.rs index 8f2a6cd4d..b028bd17a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -13,7 +13,6 @@ // limitations under the License. use futures::Future; -use futures::future; use futures_cpupool::CpuPool; use mock_command::{CommandChild, RunCommand}; use ring::digest::{SHA512, Context}; @@ -141,10 +140,9 @@ pub fn run_input_output(mut command: C, input: Option>) .stdin(if input.is_some() { Stdio::piped() } else { Stdio::inherit() }) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn() - .chain_err(|| "failed to spawn child"); + .spawn(); - Box::new(future::result(child) + Box::new(child .and_then(|child| { wait_with_input_output(child, input).and_then(|output| { if output.status.success() {