From 7c5639cf749ed2cf3d06965118b99c955cea064d Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 27 Sep 2017 09:14:51 -0700 Subject: [PATCH] Add jobserver support to sccache This commit alters the main sccache server to operate and orchestrate its own GNU make style jobserver. This is primarily intended for interoperation with rustc itself. The Rust compiler currently has a multithreaded mode where it will execute code generation and optimization on the LLVM side of things in parallel. This parallelism, however, can overload a machine quickly if not properly accounted for (e.g. if 10 rustcs all spawn 10 threads...). The usage of a GNU make style jobserver is intended to arbitrate and rate limit all these rustc instances to ensure that one build's maximal parallelism never exceeds a particular amount. Currently for Rust Cargo is the primary driver for setting up a jobserver. Cargo will create this and manage this per compilation, ensuring that any one `cargo build` invocation never exceeds a maximal parallelism. When sccache enters the picture, however, the story gets slightly more odd. The jobserver implementation on Unix relies on inheritance of file descriptors in spawned processes. With sccache, however, there's no inheritance as the actual rustc invocation is spawned by the server, not the client. In this case the env vars used to configure the jobsever are usually incorrect. To handle this problem this commit bakes a jobserver directly into sccache itself. The jobserver then overrides whatever jobserver the client has configured in its own env vars to ensure correct operation. The settings of each jobserver may be misconfigured (there's no way to configure sccache's jobserver right now), but hopefully that's not too much of a problem for the forseeable future. The implementation here was to provide a thin wrapper around the `jobserver` crate with a futures-based interface. This interface was then hooked into the mock command infrastructure to automatically acquire a jobserver token when spawning a process and automatically drop the token when the process exits. Additionally, all spawned processes will now automatically receive a configured jobserver. cc rust-lang/rust#42867, the original motivation for this commit --- Cargo.lock | 12 +++ Cargo.toml | 2 + src/cache/disk.rs | 9 +-- src/commands.rs | 4 +- src/compiler/compiler.rs | 44 ++++------ src/jobserver.rs | 71 ++++++++++++++++ src/main.rs | 2 + src/mock_command.rs | 162 +++++++++++++++++++++++-------------- src/server.rs | 15 +++- src/simples3/credential.rs | 4 +- src/test/tests.rs | 8 +- src/test/utils.rs | 10 ++- src/util.rs | 6 +- 13 files changed, 238 insertions(+), 111 deletions(-) create mode 100644 src/jobserver.rs diff --git a/Cargo.lock b/Cargo.lock index ad60376e6..6761f252b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,6 +19,7 @@ dependencies = [ "hyper 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "hyper-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "jsonwebtoken 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", @@ -27,6 +28,7 @@ dependencies = [ "lru-disk-cache 0.1.0", "mio-named-pipes 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "native-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "openssl 0.9.13 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -414,6 +416,15 @@ name = "itoa" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "jobserver" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "jsonwebtoken" version = "2.0.1" @@ -1385,6 +1396,7 @@ dependencies = [ "checksum iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6e8b9c2247fcf6c6a1151f1156932be5606c9fd6f55a2d7f9fc1cb29386b2f7" "checksum itertools 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "772a0928a97246167d59a2a4729df5871f1327ab8b36fd24c4224b229cb47b99" "checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" +"checksum jobserver 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "443ae8bc0af6c106e6e8b77e04684faecc1a5ce94e058f4c2b0a037b0ea1b133" "checksum jsonwebtoken 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ded1c69eb0de78a0abce9f7987dc832613abb168029868271e8cba843f45a3b3" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" diff --git a/Cargo.toml b/Cargo.toml index 2369b65d9..c91df8b49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,14 @@ futures = "0.1.11" futures-cpupool = "0.1" hyper = { version = "0.11", optional = true } hyper-tls = { version = "0.1", optional = true } +jobserver = "0.1" jsonwebtoken = { version = "2.0", optional = true } libc = "0.2.10" local-encoding = "0.2.0" log = "0.3.6" lru-disk-cache = { path = "lru-disk-cache", version = "0.1.0" } native-tls = "0.1" +num_cpus = "1.0" number_prefix = "0.2.5" openssl = { version = "0.9", optional = true } redis = { version = "0.8.0", optional = true } diff --git a/src/cache/disk.rs b/src/cache/disk.rs index b5ee2de3b..a71c68b51 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -18,7 +18,6 @@ use cache::{ CacheWrite, Storage, }; -use futures::Future; use futures_cpupool::CpuPool; use lru_disk_cache::LruDiskCache; use lru_disk_cache::Error as LruError; @@ -62,7 +61,7 @@ impl Storage for DiskCache { let path = make_key_path(key); let lru = self.lru.clone(); let key = key.to_owned(); - self.pool.spawn_fn(move || { + Box::new(self.pool.spawn_fn(move || { let mut lru = lru.lock().unwrap(); let f = match lru.get(&path) { Ok(f) => f, @@ -78,7 +77,7 @@ impl Storage for DiskCache { }; let hit = CacheRead::from(f)?; Ok(Cache::Hit(hit)) - }).boxed() + })) } fn put(&self, key: &str, entry: CacheWrite) -> SFuture { @@ -87,12 +86,12 @@ impl Storage for DiskCache { trace!("DiskCache::finish_put({})", key); let lru = self.lru.clone(); let key = make_key_path(key); - self.pool.spawn_fn(move || { + Box::new(self.pool.spawn_fn(move || { let start = Instant::now(); let v = entry.finish()?; lru.lock().unwrap().insert_bytes(key, &v)?; Ok(start.elapsed()) - }).boxed() + })) } fn location(&self) -> String { diff --git a/src/commands.rs b/src/commands.rs index d5d5e08ed..23ba817df 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -18,6 +18,7 @@ use client::{ ServerConnection, }; use cmdline::{Command, StatsFormat}; +use jobserver::Client; use log::LogLevel::Trace; use mock_command::{ CommandCreatorSync, @@ -601,9 +602,10 @@ pub fn run_command(cmd: Command) -> Result { } Command::Compile { exe, cmdline, cwd, env_vars } => { trace!("Command::Compile {{ {:?}, {:?}, {:?} }}", exe, cmdline, cwd); + let jobserver = unsafe { Client::new() }; let conn = connect_or_start_server(get_port())?; let mut core = Core::new()?; - let res = do_compile(ProcessCommandCreator::new(&core.handle()), + let res = do_compile(ProcessCommandCreator::new(&core.handle(), &jobserver), &mut core, conn, exe.as_ref(), diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs index 72c2c34ff..730049e41 100644 --- a/src/compiler/compiler.rs +++ b/src/compiler/compiler.rs @@ -470,14 +470,13 @@ fn detect_compiler(creator: &T, executable: &Path, pool: &CpuPool) }; let is_rustc = if filename.to_string_lossy().to_lowercase() == "rustc" { // Sanity check that it's really rustc. + let executable = executable.to_path_buf(); let child = creator.clone().new_command_sync(&executable) .stdout(Stdio::piped()) .stderr(Stdio::null()) .args(&["--version"]) - .spawn().chain_err(|| { - format!("failed to execute {:?}", executable) - }); - let output = child.into_future().and_then(move |child| { + .spawn(); + let output = child.and_then(move |child| { child.wait_with_output() .chain_err(|| "failed to read child output") }); @@ -530,10 +529,7 @@ gcc let output = write.and_then(move |(tempdir, src)| { cmd.arg("-E").arg(src); trace!("compiler {:?}", cmd); - let child = cmd.spawn().chain_err(|| { - format!("failed to execute {:?}", cmd) - }); - child.into_future().and_then(|child| { + cmd.spawn().and_then(|child| { child.wait_with_output().chain_err(|| "failed to read child output") }).map(|e| { drop(tempdir); @@ -724,11 +720,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -805,11 +799,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -887,11 +879,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); let cwd = f.tempdir.path(); let arguments = ovec!["-c", "foo.c", "-o", "foo.o"]; @@ -954,11 +944,9 @@ mod test { let o = obj.clone(); next_command_calls(&creator, move |_| { // Pretend to compile something. - match File::create(&o) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&o)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), COMPILER_STDOUT, COMPILER_STDERR)) }); } let cwd = f.tempdir.path(); diff --git a/src/jobserver.rs b/src/jobserver.rs new file mode 100644 index 000000000..b3ce0ac55 --- /dev/null +++ b/src/jobserver.rs @@ -0,0 +1,71 @@ +extern crate jobserver; + +use std::io; +use std::process::Command; +use std::sync::Arc; + +use futures::prelude::*; +use futures::sync::mpsc; +use futures::sync::oneshot; +use num_cpus; + +use errors::*; + +pub use self::jobserver::Acquired; + +#[derive(Clone)] +pub struct Client { + helper: Arc, + inner: jobserver::Client, + tx: mpsc::UnboundedSender>> +} + +impl Client { + // unsafe because `from_env` is unsafe (can use the wrong fds) + pub unsafe fn new() -> Client { + match jobserver::Client::from_env() { + Some(c) => Client::_new(c), + None => Client::new_num(num_cpus::get()), + } + } + + pub fn new_num(num: usize) -> Client { + let inner = jobserver::Client::new(num) + .expect("failed to create jobserver"); + Client::_new(inner) + } + + fn _new(inner: jobserver::Client) -> Client { + let (tx, rx) = mpsc::unbounded::>(); + let mut rx = rx.wait(); + let helper = inner.clone().into_helper_thread(move |token| { + if let Some(Ok(sender)) = rx.next() { + drop(sender.send(token)); + } + }).expect("failed to spawn helper thread"); + + Client { + inner: inner, + helper: Arc::new(helper), + tx: tx, + } + } + + /// Configures this jobserver to be inherited by the specified command + pub fn configure(&self, cmd: &mut Command) { + self.inner.configure(cmd) + } + + /// Returns a future that represents an acquired jobserver token. + /// + /// This should be invoked before any "work" is spawend (for whatever the + /// defnition of "work" is) to ensure that the system is properly + /// rate-limiting itself. + pub fn acquire(&self) -> SFuture { + let (tx, rx) = oneshot::channel(); + self.helper.request_token(); + self.tx.unbounded_send(tx).unwrap(); + Box::new(rx.chain_err(|| "jobserver helper panicked") + .and_then(|t| t.chain_err(|| "failed to acquire jobserver token"))) + } +} diff --git a/src/main.rs b/src/main.rs index e26d65e5a..1d780c77a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,7 @@ extern crate libc; #[cfg(windows)] extern crate mio_named_pipes; extern crate native_tls; +extern crate num_cpus; extern crate number_prefix; #[cfg(feature = "openssl")] extern crate openssl; @@ -93,6 +94,7 @@ mod client; mod cmdline; mod commands; mod compiler; +mod jobserver; mod mock_command; mod protocol; mod server; diff --git a/src/mock_command.rs b/src/mock_command.rs index a131594c5..b282424fa 100644 --- a/src/mock_command.rs +++ b/src/mock_command.rs @@ -47,7 +47,9 @@ #[cfg(unix)] use libc; +use errors::*; use futures::future::{self, Future}; +use jobserver::{Acquired, Client}; use std::boxed::Box; use std::ffi::{OsStr, OsString}; use std::fmt; @@ -61,7 +63,7 @@ use std::process::{ }; use std::sync::{Arc,Mutex}; use tokio_process::{ - Child, + self, ChildStderr, ChildStdin, ChildStdout, @@ -120,7 +122,7 @@ pub trait RunCommand: fmt::Debug { /// Set the process' stderr from `cfg`. fn stderr(&mut self, cfg: Stdio) -> &mut Self; /// Execute the process and return a process object. - fn spawn(&mut self) -> io::Result; + fn spawn(&mut self) -> SFuture; } /// A trait that provides a means to create objects implementing `RunCommand`. @@ -133,7 +135,7 @@ pub trait CommandCreator { type Cmd: RunCommand; /// Create a new instance of this type. - fn new(handle: &Handle) -> Self; + fn new(handle: &Handle, client: &Client) -> Self; /// Create a new object that implements `RunCommand` that can be used /// to create a new process. fn new_command>(&mut self, program: S) -> Self::Cmd; @@ -143,42 +145,63 @@ pub trait CommandCreator { pub trait CommandCreatorSync: Clone + 'static { type Cmd: RunCommand; - fn new(handle: &Handle) -> Self; + fn new(handle: &Handle, client: &Client) -> Self; fn new_command_sync>(&mut self, program: S) -> Self::Cmd; } +pub struct Child { + inner: tokio_process::Child, + token: Acquired, +} + /// Trivial implementation of `CommandChild` for `std::process::Child`. impl CommandChild for Child { type I = ChildStdin; type O = ChildStdout; type E = ChildStderr; - fn take_stdin(&mut self) -> Option { self.stdin().take() } - fn take_stdout(&mut self) -> Option { self.stdout().take() } - fn take_stderr(&mut self) -> Option { self.stderr().take() } + fn take_stdin(&mut self) -> Option { self.inner.stdin().take() } + fn take_stdout(&mut self) -> Option { self.inner.stdout().take() } + fn take_stderr(&mut self) -> Option { self.inner.stderr().take() } fn wait(self) -> Box> { - Box::new(self) + let Child { inner, token } = self; + Box::new(inner.map(|ret| { + drop(token); + ret + })) } fn wait_with_output(self) -> Box> { - Box::new(self.wait_with_output()) + let Child { inner, token } = self; + Box::new(inner.wait_with_output().map(|ret| { + drop(token); + ret + })) } } pub struct AsyncCommand { - inner: Command, + inner: Option, handle: Handle, + jobserver: Client, } impl AsyncCommand { - pub fn new>(program: S, handle: Handle) -> AsyncCommand { + pub fn new>(program: S, + handle: Handle, + jobserver: Client) -> AsyncCommand { AsyncCommand { - inner: Command::new(program), + inner: Some(Command::new(program)), handle: handle, + jobserver: jobserver, } } + + fn inner(&mut self) -> &mut Command { + self.inner.as_mut().expect("can't reuse commands") + } } /// Trivial implementation of `RunCommand` for `std::process::Command`. @@ -186,18 +209,18 @@ impl RunCommand for AsyncCommand { type C = Child; fn arg>(&mut self, arg: S) -> &mut AsyncCommand { - self.inner.arg(arg); + self.inner().arg(arg); self } fn args>(&mut self, args: &[S]) -> &mut AsyncCommand { - self.inner.args(args); + self.inner().args(args); self } fn env(&mut self, key: K, val: V) -> &mut AsyncCommand where K: AsRef, V: AsRef, { - self.inner.env(key, val); + self.inner().env(key, val); self } fn envs(&mut self, vars: I) -> &mut Self @@ -206,16 +229,16 @@ impl RunCommand for AsyncCommand { //TODO: when Command::envs stabilizes, use that: // https://github.com/rust-lang/rust/issues/38526 for (k, v) in vars { - self.inner.env(k, v); + self.inner().env(k, v); } self } fn env_clear(&mut self) -> &mut AsyncCommand { - self.inner.env_clear(); + self.inner().env_clear(); self } fn current_dir>(&mut self, dir: P) -> &mut AsyncCommand { - self.inner.current_dir(dir); + self.inner().current_dir(dir); self } @@ -223,7 +246,7 @@ impl RunCommand for AsyncCommand { fn no_console(&mut self) -> &mut AsyncCommand { use std::os::windows::process::CommandExt; const CREATE_NO_WINDOW: u32 = 0x08000000; - self.inner.creation_flags(CREATE_NO_WINDOW); + self.inner().creation_flags(CREATE_NO_WINDOW); self } @@ -233,19 +256,30 @@ impl RunCommand for AsyncCommand { } fn stdin(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stdin(cfg); + self.inner().stdin(cfg); self } fn stdout(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stdout(cfg); + self.inner().stdout(cfg); self } fn stderr(&mut self, cfg: Stdio) -> &mut AsyncCommand { - self.inner.stderr(cfg); + self.inner().stderr(cfg); self } - fn spawn(&mut self) -> io::Result { - self.inner.spawn_async(&self.handle) + fn spawn(&mut self) -> SFuture { + let mut inner = self.inner.take().unwrap(); + inner.env_remove("MAKEFLAGS"); + inner.env_remove("MFLAGS"); + inner.env_remove("CARGO_MAKEFLAGS"); + self.jobserver.configure(&mut inner); + let handle = self.handle.clone(); + Box::new(self.jobserver.acquire().and_then(move |token| { + let child = inner.spawn_async(&handle).chain_err(|| { + format!("failed to spawn {:?}", inner) + })?; + Ok(Child { inner: child, token: token }) + })) } } @@ -259,20 +293,22 @@ impl fmt::Debug for AsyncCommand { #[derive(Clone)] pub struct ProcessCommandCreator { handle: Handle, + jobserver: Client, } /// Trivial implementation of `CommandCreator` for `ProcessCommandCreator`. impl CommandCreator for ProcessCommandCreator { type Cmd = AsyncCommand; - fn new(handle: &Handle) -> ProcessCommandCreator { + fn new(handle: &Handle, client: &Client) -> ProcessCommandCreator { ProcessCommandCreator { handle: handle.clone(), + jobserver: client.clone(), } } fn new_command>(&mut self, program: S) -> AsyncCommand { - AsyncCommand::new(program, self.handle.clone()) + AsyncCommand::new(program, self.handle.clone(), self.jobserver.clone()) } } @@ -280,8 +316,8 @@ impl CommandCreator for ProcessCommandCreator { impl CommandCreatorSync for ProcessCommandCreator { type Cmd = AsyncCommand; - fn new(handle: &Handle) -> ProcessCommandCreator { - CommandCreator::new(handle) + fn new(handle: &Handle, client: &Client) -> ProcessCommandCreator { + CommandCreator::new(handle, client) } fn new_command_sync>(&mut self, program: S) -> AsyncCommand { @@ -357,7 +393,7 @@ impl CommandChild for MockChild { fn take_stderr(&mut self) -> Option>> { self.stderr.take() } fn wait(mut self) -> Box> { - future::result(self.wait_result.take().unwrap()).boxed() + Box::new(future::result(self.wait_result.take().unwrap())) } @@ -370,13 +406,13 @@ impl CommandChild for MockChild { stderr: stderr.map(|c| c.into_inner()).unwrap_or(vec!()), }) }); - future::result(result).boxed() + Box::new(future::result(result)) } } pub enum ChildOrCall { - Child(io::Result), - Call(Box io::Result + Send>), + Child(Result), + Call(Box Result + Send>), } impl fmt::Debug for ChildOrCall { @@ -437,10 +473,10 @@ impl RunCommand for MockCommand { fn stderr(&mut self, _cfg: Stdio) -> &mut MockCommand { self } - fn spawn(&mut self) -> io::Result { + fn spawn(&mut self) -> SFuture { match self.child.take().unwrap() { - ChildOrCall::Child(c) => c, - ChildOrCall::Call(f) => f(&self.args), + ChildOrCall::Child(c) => Box::new(future::result(c)), + ChildOrCall::Call(f) => Box::new(future::result(f(&self.args))), } } } @@ -455,7 +491,7 @@ pub struct MockCommandCreator { impl MockCommandCreator { /// The next `MockCommand` created will return `child` from `RunCommand::spawn`. #[allow(dead_code)] - pub fn next_command_spawns(&mut self, child: io::Result) { + pub fn next_command_spawns(&mut self, child: Result) { self.children.push(ChildOrCall::Child(child)); } @@ -463,7 +499,7 @@ impl MockCommandCreator { /// arguments passed to the command. #[allow(dead_code)] pub fn next_command_calls(&mut self, call: C) - where C: Fn(&[OsString]) -> io::Result + Send + 'static, + where C: Fn(&[OsString]) -> Result + Send + 'static, { self.children.push(ChildOrCall::Call(Box::new(call))); } @@ -472,7 +508,7 @@ impl MockCommandCreator { impl CommandCreator for MockCommandCreator { type Cmd = MockCommand; - fn new(_handle: &Handle) -> MockCommandCreator { + fn new(_handle: &Handle, _client: &Client) -> MockCommandCreator { MockCommandCreator { children: Vec::new(), } @@ -492,8 +528,8 @@ impl CommandCreator for MockCommandCreator { impl CommandCreatorSync for Arc> { type Cmd = T::Cmd; - fn new(handle: &Handle) -> Arc> { - Arc::new(Mutex::new(T::new(handle))) + fn new(handle: &Handle, client: &Client) -> Arc> { + Arc::new(Mutex::new(T::new(handle, client))) } fn new_command_sync>(&mut self, program: S) -> T::Cmd { @@ -507,6 +543,7 @@ mod test { use std::error::Error; use std::ffi::OsStr; use std::io; + use jobserver::Client; use futures::Future; use std::process::{ ExitStatus, @@ -517,24 +554,24 @@ mod test { use test::utils::*; use tokio_core::reactor::Core; - fn spawn_command>(creator : &mut T, program: S) -> io::Result<<::Cmd as RunCommand>::C> { - creator.new_command(program).spawn() + fn spawn_command>(creator : &mut T, program: S) -> Result<<::Cmd as RunCommand>::C> { + creator.new_command(program).spawn().wait() } - fn spawn_wait_command>(creator : &mut T, program: S) -> io::Result { - spawn_command(creator, program).and_then(|c| c.wait().wait()) + fn spawn_wait_command>(creator : &mut T, program: S) -> Result { + Ok(spawn_command(creator, program)?.wait().wait()?) } - fn spawn_output_command>(creator : &mut T, program: S) -> io::Result { - spawn_command(creator, program).and_then(|c| c.wait_with_output().wait()) + fn spawn_output_command>(creator : &mut T, program: S) -> Result { + Ok(spawn_command(creator, program)?.wait_with_output().wait()?) } fn spawn_on_thread(mut t : T, really : bool) -> ExitStatus { thread::spawn(move || { if really { - t.new_command_sync("foo").spawn().and_then(|c| { - c.wait().wait() - }).unwrap() + t.new_command_sync("foo") + .spawn().wait().unwrap() + .wait().wait().unwrap() } else { exit_status(1) } @@ -544,7 +581,8 @@ mod test { #[test] fn test_mock_command_wait() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::new(exit_status(0), "hello", "error"))); assert_eq!(0, spawn_wait_command(&mut creator, "foo").unwrap().code().unwrap()); } @@ -555,14 +593,16 @@ mod test { // If next_command_spawns hasn't been called enough times, // new_command should panic. let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); - creator.new_command("foo").spawn().unwrap(); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); + creator.new_command("foo").spawn().wait().unwrap(); } #[test] fn test_mock_command_output() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::new(exit_status(0), "hello", "error"))); let output = spawn_output_command(&mut creator, "foo").unwrap(); assert_eq!(0, output.status.code().unwrap()); @@ -573,7 +613,8 @@ mod test { #[test] fn test_mock_command_calls() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_calls(|_| { Ok(MockChild::new(exit_status(0), "hello", "error")) }); @@ -586,27 +627,28 @@ mod test { #[test] fn test_mock_spawn_error() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); - creator.next_command_spawns(Err(io::Error::new(io::ErrorKind::Other, "error"))); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); + creator.next_command_spawns(Err("error".into())); let e = spawn_command(&mut creator, "foo").err().unwrap(); - assert_eq!(io::ErrorKind::Other, e.kind()); assert_eq!("error", e.description()); } #[test] fn test_mock_wait_error() { let core = Core::new().unwrap(); - let mut creator = MockCommandCreator::new(&core.handle()); + let client = Client::new_num(1); + let mut creator = MockCommandCreator::new(&core.handle(), &client); creator.next_command_spawns(Ok(MockChild::with_error(io::Error::new(io::ErrorKind::Other, "error")))); let e = spawn_wait_command(&mut creator, "foo").err().unwrap(); - assert_eq!(io::ErrorKind::Other, e.kind()); assert_eq!("error", e.description()); } #[test] fn test_mock_command_sync() { let core = Core::new().unwrap(); - let creator = Arc::new(Mutex::new(MockCommandCreator::new(&core.handle()))); + let client = Client::new_num(1); + let creator = Arc::new(Mutex::new(MockCommandCreator::new(&core.handle(), &client))); next_command(&creator, Ok(MockChild::new(exit_status(0), "hello", "error"))); assert_eq!(exit_status(0), spawn_on_thread(creator.clone(), true)); } diff --git a/src/server.rs b/src/server.rs index e85ecac1e..f70ff62e7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -31,6 +31,7 @@ use futures::sync::mpsc; use futures::task::{self, Task}; use futures::{Stream, Sink, Async, AsyncSink, Poll, StartSend, Future}; use futures_cpupool::CpuPool; +use jobserver::Client; use mock_command::{ CommandCreatorSync, ProcessCommandCreator, @@ -121,10 +122,11 @@ fn get_signal(_status: ExitStatus) -> i32 { /// requests a shutdown. pub fn start_server(port: u16) -> Result<()> { trace!("start_server"); + let client = unsafe { Client::new() }; let core = Core::new()?; let pool = CpuPool::new(20); let storage = storage_from_environment(&pool, &core.handle()); - let res = SccacheServer::::new(port, pool, core, storage); + let res = SccacheServer::::new(port, pool, core, client, storage); let notify = env::var_os("SCCACHE_STARTUP_NOTIFY"); match res { Ok(srv) => { @@ -152,6 +154,7 @@ impl SccacheServer { pub fn new(port: u16, pool: CpuPool, core: Core, + client: Client, storage: Arc) -> Result> { let handle = core.handle(); let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port); @@ -161,7 +164,12 @@ impl SccacheServer { // connections. let (tx, rx) = mpsc::channel(1); let (wait, info) = WaitUntilZero::new(); - let service = SccacheService::new(storage, core.handle(), pool, tx, info); + let service = SccacheService::new(storage, + core.handle(), + &client, + pool, + tx, + info); Ok(SccacheServer { core: core, @@ -389,6 +397,7 @@ impl SccacheService { pub fn new(storage: Arc, handle: Handle, + client: &Client, pool: CpuPool, tx: mpsc::Sender, info: ActiveInfo) -> SccacheService { @@ -397,7 +406,7 @@ impl SccacheService storage: storage, compilers: Rc::new(RefCell::new(HashMap::new())), pool: pool, - creator: C::new(&handle), + creator: C::new(&handle, client), handle: handle, tx: tx, info: info, diff --git a/src/simples3/credential.rs b/src/simples3/credential.rs index d8e3c5f22..9aebb30a8 100644 --- a/src/simples3/credential.rs +++ b/src/simples3/credential.rs @@ -87,7 +87,7 @@ pub struct EnvironmentProvider; impl ProvideAwsCredentials for EnvironmentProvider { fn credentials(&self) -> SFuture { - future::result(credentials_from_environment()).boxed() + Box::new(future::result(credentials_from_environment())) } } @@ -189,7 +189,7 @@ impl ProvideAwsCredentials for ProfileProvider { let result = result.and_then(|mut profiles| { profiles.remove(self.profile()).ok_or("profile not found".into()) }); - future::result(result).boxed() + Box::new(future::result(result)) } } diff --git a/src/test/tests.rs b/src/test/tests.rs index 8f7009b6a..4c0e9a02f 100644 --- a/src/test/tests.rs +++ b/src/test/tests.rs @@ -202,11 +202,9 @@ fn test_server_compile() { let obj = f.tempdir.path().join("file.o"); c.next_command_calls(move |_| { // Pretend to compile something. - match File::create(&obj) - .and_then(|mut f| f.write_all(b"file contents")) { - Ok(_) => Ok(MockChild::new(exit_status(0), STDOUT, STDERR)), - Err(e) => Err(e), - } + let mut f = File::create(&obj)?; + f.write_all(b"file contents")?; + Ok(MockChild::new(exit_status(0), STDOUT, STDERR)) }); } // Ask the server to compile something. diff --git a/src/test/utils.rs b/src/test/utils.rs index 33d3ef172..122289daf 100644 --- a/src/test/utils.rs +++ b/src/test/utils.rs @@ -26,6 +26,9 @@ use std::sync::{Arc,Mutex}; use tempdir::TempDir; use tokio_core::reactor::Core; +use jobserver::Client; +use errors::*; + /// Return a `Vec` with each listed entry converted to an owned `String`. macro_rules! stringvec { ( $( $x:expr ),* ) => { @@ -69,15 +72,16 @@ macro_rules! assert_map_contains { pub fn new_creator() -> Arc> { let core = Core::new().unwrap(); - Arc::new(Mutex::new(MockCommandCreator::new(&core.handle()))) + let client = unsafe { Client::new() }; + Arc::new(Mutex::new(MockCommandCreator::new(&core.handle(), &client))) } pub fn next_command(creator : &Arc>, - child: io::Result) { + child: Result) { creator.lock().unwrap().next_command_spawns(child); } -pub fn next_command_calls io::Result + Send + 'static>(creator: &Arc>, call: C) { +pub fn next_command_calls Result + Send + 'static>(creator: &Arc>, call: C) { creator.lock().unwrap().next_command_calls(call); } diff --git a/src/util.rs b/src/util.rs index d19e830d1..f1c1039be 100644 --- a/src/util.rs +++ b/src/util.rs @@ -13,7 +13,6 @@ // limitations under the License. use futures::Future; -use futures::future; use futures_cpupool::CpuPool; use mock_command::{CommandChild, RunCommand}; use ring::digest::{SHA512, Context}; @@ -139,10 +138,9 @@ pub fn run_input_output(mut command: C, input: Option>) .stdin(if input.is_some() { Stdio::piped() } else { Stdio::inherit() }) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn() - .chain_err(|| "failed to spawn child"); + .spawn(); - Box::new(future::result(child) + Box::new(child .and_then(|child| { wait_with_input_output(child, input).and_then(|output| { if output.status.success() {