From 883c54f7b98d28dbe76183ff9812b5ee934e6359 Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Tue, 9 Oct 2018 07:05:27 -0700
Subject: [PATCH] Consolidate Resettable instances (#6604)

### Problem

Because we always reset the entire `Store` and `CommandRunner` at once, the usage of `Resettable` inside those modules is overly fine-grained.

### Solution

Move the `Store` and `CommandRunner` into a single external `Resettable` instance, and then remove all internal usage of `Resettable`.

### Result

Simpler code, and possibly slightly better performance.
---
 src/rust/engine/Cargo.lock                      |   2 -
 src/rust/engine/fs/Cargo.toml                   |   1 -
 src/rust/engine/fs/brfs/src/main.rs             |   2 +-
 src/rust/engine/fs/fs_util/src/main.rs          |   2 +-
 src/rust/engine/fs/src/lib.rs                   |   1 -
 src/rust/engine/fs/src/store.rs                 | 169 +++++-------------
 src/rust/engine/process_execution/Cargo.toml    |   1 -
 src/rust/engine/process_execution/src/lib.rs    |  21 +--
 .../engine/process_execution/src/local.rs       |   8 -
 .../engine/process_execution/src/remote.rs      |  78 +++-----
 src/rust/engine/process_executor/src/main.rs    |   4 +-
 src/rust/engine/src/context.rs                  | 104 +++++------
 src/rust/engine/src/lib.rs                      |   8 +-
 src/rust/engine/src/nodes.rs                    |  10 +-
 src/rust/engine/src/scheduler.rs                |   2 +-
 15 files changed, 142 insertions(+), 271 deletions(-)

diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock
index faec7da37be..180f2434c98 100644
--- a/src/rust/engine/Cargo.lock
+++ b/src/rust/engine/Cargo.lock
@@ -363,7 +363,6 @@ dependencies = [
  "mock 0.0.1",
  "parking_lot 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "protobuf 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "resettable 0.0.1",
  "serde 1.0.75 (registry+https://github.com/rust-lang/crates.io-index)",
  "serde_derive 1.0.75 (registry+https://github.com/rust-lang/crates.io-index)",
  "sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -869,7 +868,6 @@ dependencies = [
  "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "mock 0.0.1",
  "protobuf 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "resettable 0.0.1",
  "sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "tempfile 3.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "testutil 0.0.1",
diff --git a/src/rust/engine/fs/Cargo.toml b/src/rust/engine/fs/Cargo.toml
index 6a394ad4961..04b1ff000cf 100644
--- a/src/rust/engine/fs/Cargo.toml
+++ b/src/rust/engine/fs/Cargo.toml
@@ -24,7 +24,6 @@ lmdb = "0.8"
 log = "0.4"
 parking_lot = "0.6"
 protobuf = { version = "2.0.4", features = ["with-bytes"] }
-resettable = { path = "../resettable" }
 sha2 = "0.7"
 serde = "1.0"
 serde_derive = "1.0"
diff --git a/src/rust/engine/fs/brfs/src/main.rs b/src/rust/engine/fs/brfs/src/main.rs
index a698be8a39c..80318bce7ff 100644
--- a/src/rust/engine/fs/brfs/src/main.rs
+++ b/src/rust/engine/fs/brfs/src/main.rs
@@ -679,7 +679,7 @@ fn main() {
     Some(address) => fs::Store::with_remote(
       &store_path,
       pool,
-      address.to_owned(),
+      address,
       args.value_of("remote-instance-name").map(str::to_owned),
       root_ca_certs,
       oauth_bearer_token,
diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs
index aa27b72dc26..faec9f283c0 100644
--- a/src/rust/engine/fs/fs_util/src/main.rs
+++ b/src/rust/engine/fs/fs_util/src/main.rs
@@ -264,7 +264,7 @@ fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
       Store::with_remote(
         &store_dir,
         pool.clone(),
-        cas_address.to_owned(),
+        cas_address,
         top_match
           .value_of("remote-instance_name")
          .map(str::to_owned),
diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs
index d543cafd0a9..3482d553e58 100644
--- a/src/rust/engine/fs/src/lib.rs
+++ b/src/rust/engine/fs/src/lib.rs
@@ -65,7 +65,6 @@ extern crate log;
 extern crate mock;
 extern crate parking_lot;
 extern crate protobuf;
-extern crate resettable;
 extern crate serde;
 extern crate sha2;
 #[cfg(test)]
diff --git a/src/rust/engine/fs/src/store.rs b/src/rust/engine/fs/src/store.rs
index a7e26342d21..f1ed6b69cab 100644
--- a/src/rust/engine/fs/src/store.rs
+++ b/src/rust/engine/fs/src/store.rs
@@ -79,7 +79,7 @@ impl Store {
   pub fn with_remote<P: AsRef<Path>>(
     path: P,
     pool: Arc<ResettablePool>,
-    cas_address: String,
+    cas_address: &str,
     instance_name: Option<String>,
     root_ca_certs: Option<Vec<u8>>,
     oauth_bearer_token: Option<String>,
@@ -108,27 +108,6 @@ impl Store {
     }
   }

-  ///
-  /// LMDB Environments aren't safe to be re-used after forking, so we need to drop them before
-  /// forking and re-create them afterwards.
-  ///
-  /// I haven't delved into the exact details as to what's fork-unsafe about LMDB, but if two pants
-  /// processes run using the same daemon, one takes out some kind of lock which the other cannot
-  /// ever acquire, so lmdb returns EAGAIN whenever a transaction is created in the second process.
-  ///
-  pub fn with_reset<F>(&self, f: F)
-  where
-    F: FnOnce() -> (),
-  {
-    self.local.with_reset(|| {
-      if let Some(ref remote) = self.remote {
-        remote.with_reset(f)
-      } else {
-        f()
-      }
-    })
-  }
-
   ///
   /// Store a file locally.
   ///
@@ -632,7 +611,6 @@ mod local {
     self, Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, RwTransaction,
     Transaction, WriteFlags,
   };
-  use resettable::Resettable;
   use sha2::Sha256;
   use std::collections::{BinaryHeap, HashMap};
   use std::fmt;
@@ -654,8 +632,8 @@ mod local {
     // Store directories separately from files because:
    //  1. They may have different lifetimes.
    //  2. It's nice to know whether we should be able to parse something as a proto.
-    file_dbs: Resettable<Result<Arc<ShardedLmdb>, String>>,
-    directory_dbs: Resettable<Result<Arc<ShardedLmdb>, String>>,
+    file_dbs: Result<Arc<ShardedLmdb>, String>,
+    directory_dbs: Result<Arc<ShardedLmdb>, String>,
   }

   impl ByteStore {

       Ok(ByteStore {
         inner: Arc::new(InnerStore {
           pool: pool,
-          file_dbs: Resettable::new(move || ShardedLmdb::new(&files_root).map(Arc::new)),
-          directory_dbs: Resettable::new(move || ShardedLmdb::new(&directories_root).map(Arc::new)),
+          file_dbs: ShardedLmdb::new(&files_root).map(Arc::new),
+          directory_dbs: ShardedLmdb::new(&directories_root).map(Arc::new),
         }),
       })
     }

-    pub fn with_reset<F>(&self, f: F)
-    where
-      F: FnOnce() -> (),
-    {
-      self
-        .inner
-        .file_dbs
-        .with_reset(|| self.inner.directory_dbs.with_reset(f))
-    }
-
     // Note: This performs IO on the calling thread. Hopefully the IO is small enough not to matter.
    pub fn entry_type(&self, fingerprint: &Fingerprint) -> Result<Option<EntryType>, String> {
       {
-        let (env, directory_database, _) = self.inner.directory_dbs.get()?.get(fingerprint);
+        let (env, directory_database, _) = self.inner.directory_dbs.clone()?.get(fingerprint);
         let txn = env
           .begin_ro_txn()
           .map_err(|err| format!("Failed to begin read transaction: {:?}", err))?;
@@ -700,7 +668,7 @@ mod local {
           }
         };
       }
-      let (env, file_database, _) = self.inner.file_dbs.get()?.get(fingerprint);
+      let (env, file_database, _) = self.inner.file_dbs.clone()?.get(fingerprint);
       let txn = env
         .begin_ro_txn()
         .map_err(|err| format!("Failed to begin read transaction: {}", err))?;
@@ -723,7 +691,7 @@ mod local {
     ) -> Result<(), String> {
       let until = Self::default_lease_until_secs_since_epoch();
       for digest in digests {
-        let (env, _, lease_database) = self.inner.file_dbs.get()?.get(&digest.0);
+        let (env, _, lease_database) = self.inner.file_dbs.clone()?.get(&digest.0);
         env
           .begin_rw_txn()
           .and_then(|mut txn| self.lease(lease_database, &digest.0, until, &mut txn))
@@ -787,7 +755,7 @@ mod local {
         EntryType::File => self.inner.file_dbs.clone(),
         EntryType::Directory => self.inner.directory_dbs.clone(),
       };
-      let (env, database, lease_database) = lmdbs.get()?.get(&aged_fingerprint.fingerprint);
+      let (env, database, lease_database) = lmdbs.clone()?.get(&aged_fingerprint.fingerprint);
       {
         env
           .begin_rw_txn()
@@ -819,7 +787,7 @@ mod local {
         EntryType::Directory => self.inner.directory_dbs.clone(),
       };

-      for &(ref env, ref database, ref lease_database) in &database.get()?.all_lmdbs() {
+      for &(ref env, ref database, ref lease_database) in &database?.all_lmdbs() {
         let txn = env
           .begin_ro_txn()
           .map_err(|err| format!("Error beginning transaction to garbage collect: {}", err))?;
@@ -884,7 +852,7 @@ mod local {
       };
       let digest = Digest(fingerprint, bytes.len());

-      let (env, content_database, lease_database) = dbs.get()?.get(&fingerprint);
+      let (env, content_database, lease_database) = dbs.clone()?.get(&fingerprint);
       let put_res = env.begin_rw_txn().and_then(|mut txn| {
         txn.put(
           content_database,
@@ -933,7 +901,7 @@ mod local {
         .inner
         .pool
         .spawn_fn(move || {
-          let (env, db, _) = dbs.get()?.get(&digest.0);
+          let (env, db, _) = dbs.clone()?.get(&digest.0);
           let ro_txn = env
             .begin_ro_txn()
             .map_err(|err| format!("Failed to begin read transaction: {}", err));
@@ -1543,7 +1511,6 @@ mod remote {
   use futures::{self, future, Future, Sink, Stream};
   use grpcio;
   use hashing::{Digest, Fingerprint};
-  use resettable::Resettable;
   use sha2::Sha256;
   use std::cmp::min;
   use std::collections::HashSet;

 #[derive(Clone)]
 pub struct ByteStore {
-  byte_stream_client: Resettable<Arc<bazel_protos::bytestream_grpc::ByteStreamClient>>,
-  cas_client:
-    Resettable<Arc<bazel_protos::remote_execution_grpc::ContentAddressableStorageClient>>,
+  byte_stream_client: Arc<bazel_protos::bytestream_grpc::ByteStreamClient>,
+  cas_client: Arc<bazel_protos::remote_execution_grpc::ContentAddressableStorageClient>,
   instance_name: Option<String>,
   chunk_size_bytes: usize,
   upload_timeout: Duration,
-  env: Resettable<Arc<grpcio::Environment>>,
-  channel: Resettable<grpcio::Channel>,
+  env: Arc<grpcio::Environment>,
+  channel: grpcio::Channel,
   authorization_header: Option<String>,
 }

 impl ByteStore {
   pub fn new(
-    cas_address: String,
+    cas_address: &str,
     instance_name: Option<String>,
     root_ca_certs: Option<Vec<u8>>,
     oauth_bearer_token: Option<String>,
     thread_count: usize,
     chunk_size_bytes: usize,
     upload_timeout: Duration,
   ) -> ByteStore {
-    let env = Resettable::new(move || Arc::new(grpcio::Environment::new(thread_count)));
-    let env2 = env.clone();
-    let channel = Resettable::new(move || {
-      let builder = grpcio::ChannelBuilder::new(env2.get());
-      if let Some(ref root_ca_certs) = root_ca_certs {
+    let env = Arc::new(grpcio::Environment::new(thread_count));
+    let channel = {
+      let builder = grpcio::ChannelBuilder::new(env.clone());
+      if let Some(root_ca_certs) = root_ca_certs {
         let creds = grpcio::ChannelCredentialsBuilder::new()
-          .root_cert(root_ca_certs.clone())
+          .root_cert(root_ca_certs)
           .build();
-        builder.secure_connect(&cas_address, creds)
+        builder.secure_connect(cas_address, creds)
       } else {
-        builder.connect(&cas_address)
+        builder.connect(cas_address)
       }
-    });
-    let channel2 = channel.clone();
-    let channel3 = channel.clone();
-    let byte_stream_client = Resettable::new(move || {
-      Arc::new(bazel_protos::bytestream_grpc::ByteStreamClient::new(
-        channel2.get(),
-      ))
-    });
-    let cas_client = Resettable::new(move || {
-      Arc::new(
-        bazel_protos::remote_execution_grpc::ContentAddressableStorageClient::new(channel3.get()),
-      )
-    });
+    };
+    let byte_stream_client = Arc::new(bazel_protos::bytestream_grpc::ByteStreamClient::new(
+      channel.clone(),
+    ));
+    let cas_client = Arc::new(
+      bazel_protos::remote_execution_grpc::ContentAddressableStorageClient::new(channel.clone()),
+    );

     ByteStore {
       byte_stream_client,
@@ -1612,17 +1571,6 @@ mod remote {
     }
   }

-  pub fn with_reset<F>(&self, f: F)
-  where
-    F: FnOnce() -> (),
-  {
-    self.cas_client.with_reset(|| {
-      self
-        .byte_stream_client
-        .with_reset(|| self.channel.with_reset(|| self.env.with_reset(f)))
-    })
-  }
-
   fn call_option(&self) -> grpcio::CallOption {
     let mut call_option = grpcio::CallOption::default();
     if let Some(ref authorization_header) = self.authorization_header {
@@ -1649,7 +1597,6 @@ mod remote {
     );
     match self
       .byte_stream_client
-      .get()
       .write_opt(self.call_option().timeout(self.upload_timeout))
     {
       Err(err) => future::err(format!(
@@ -1679,7 +1626,7 @@ mod remote {
         },
       );

-      future::ok(self.byte_stream_client.get())
+      future::ok(self.byte_stream_client.clone())
         .join(sender.send_all(stream).map_err(move |e| {
           format!(
             "Error attempting to upload fingerprint {}: {:?}",
@@ -1714,7 +1661,7 @@ mod remote {
     digest: Digest,
     f: F,
   ) -> BoxFuture<Option<T>, String> {
-    match self.byte_stream_client.get().read_opt(
+    match self.byte_stream_client.read_opt(
       &{
         let mut req = bazel_protos::bytestream::ReadRequest::new();
         req.set_resource_name(format!(
@@ -1733,7 +1680,7 @@ mod remote {
       Ok(stream) => {
         // We shouldn't have to pass around the client here, it's a workaround for
         // https://github.com/pingcap/grpc-rs/issues/123
-        future::ok(self.byte_stream_client.get())
+        future::ok(self.byte_stream_client.clone())
           .join(
             stream.fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
               bytes.extend_from_slice(&r.data);
@@ -1776,7 +1723,6 @@ mod remote {
     }
     self
       .cas_client
-      .get()
       .find_missing_blobs_opt(&request, self.call_option())
       .map_err(|err| {
         format!(
@@ -1942,7 +1888,7 @@ mod tests {
     let cas = StubCAS::empty();

     let store = ByteStore::new(
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -2017,7 +1963,7 @@ mod tests {
   #[test]
   fn write_connection_error() {
     let store = ByteStore::new(
-      "doesnotexist.example".to_owned(),
+      "doesnotexist.example",
       None,
       None,
       None,
@@ -2080,7 +2026,7 @@ mod tests {
   fn new_byte_store(cas: &StubCAS) -> ByteStore {
     ByteStore::new(
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -2201,7 +2147,7 @@ mod tests {
     Store::with_remote(
       dir,
       Arc::new(ResettablePool::new("test-pool-".to_string())),
-      cas_address,
+      &cas_address,
       None,
       None,
       None,
@@ -2904,7 +2850,7 @@ mod tests {
     let store_with_remote = Store::with_remote(
       dir.path(),
       Arc::new(ResettablePool::new("test-pool-".to_string())),
-      cas.address(),
+      &cas.address(),
       Some("dark-tower".to_owned()),
       None,
       None,
@@ -2930,7 +2876,7 @@ mod tests {
     let store_with_remote = Store::with_remote(
       dir.path(),
       Arc::new(ResettablePool::new("test-pool-".to_string())),
-      cas.address(),
+      &cas.address(),
       Some("dark-tower".to_owned()),
       None,
       None,
@@ -2973,7 +2919,7 @@ mod tests {
     let store_with_remote = Store::with_remote(
       dir.path(),
       Arc::new(ResettablePool::new("test-pool-".to_string())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       Some("Armory.Key".to_owned()),
@@ -2999,7 +2945,7 @@ mod tests {
     let store_with_remote = Store::with_remote(
       dir.path(),
       Arc::new(ResettablePool::new("test-pool-".to_string())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       Some("Armory.Key".to_owned()),
@@ -3171,37 +3117,6 @@ mod tests {
     assert!(!is_executable(&materialize_dir.path().join("food")));
   }

-  #[test]
-  fn works_after_reset() {
-    let dir = TempDir::new().unwrap();
-    let cas = new_cas(1024);
-
-    let testdata = TestData::roland();
-    let testdir = TestDirectory::containing_roland();
-
-    let store = new_store(dir.path(), cas.address());
-
-    // Fetches from remote, so initialises both the local and remote ByteStores:
-    assert_eq!(
-      store.load_file_bytes_with(testdata.digest(), |b| b).wait(),
-      Ok(Some(testdata.bytes()))
-    );
-
-    store.with_reset(|| {});
-
-    // Already exists in local store:
-    assert_eq!(
-      store.load_file_bytes_with(testdata.digest(), |b| b).wait(),
-      Ok(Some(testdata.bytes()))
-    );
-
-    // Requires an RPC:
-    assert_eq!(
-      store.load_directory(testdir.digest()).wait(),
-      Ok(Some(testdir.directory()))
-    );
-  }
-
   #[test]
   fn contents_for_directory_empty() {
     let store_dir = TempDir::new().unwrap();
diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml
index 3fd0db21e7a..07863fcb831 100644
--- a/src/rust/engine/process_execution/Cargo.toml
+++ b/src/rust/engine/process_execution/Cargo.toml
@@ -16,7 +16,6 @@ grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355d
 hashing = { path = "../hashing" }
 log = "0.4"
 protobuf = { version = "2.0.4", features = ["with-bytes"] }
-resettable = { path = "../resettable" }
 sha2 = "0.7"
 tempfile = "3"
 futures-timer = "0.1"
diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs
index cae70656153..1dfe1e2f3a8 100644
--- a/src/rust/engine/process_execution/src/lib.rs
+++ b/src/rust/engine/process_execution/src/lib.rs
@@ -43,7 +43,6 @@ extern crate log;
 #[cfg(test)]
 extern crate mock;
 extern crate protobuf;
-extern crate resettable;
 extern crate sha2;
 #[cfg(test)]
 extern crate tempfile;
@@ -122,28 +121,20 @@ pub struct FallibleExecuteProcessResult {
 pub trait CommandRunner: Send + Sync {
   fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String>;
-
-  ///
-  /// NB: Unlike other `with_shutdown` methods in the codebase, this method takes a Fn reference,
-  /// because in order to be "object safe", `CommandRunner` must not have functions with generic
-  /// parameters.
-  ///
-  fn with_shutdown(&self, f: &mut FnMut() -> ());
 }

 ///
 /// A CommandRunner wrapper that limits the number of concurrent requests.
 ///
+#[derive(Clone)]
 pub struct BoundedCommandRunner {
-  inner: Arc<Box<CommandRunner>>,
-  sema: AsyncSemaphore,
+  inner: Arc<(Box<CommandRunner>, AsyncSemaphore)>,
 }

 impl BoundedCommandRunner {
   pub fn new(inner: Box<CommandRunner>, bound: usize) -> BoundedCommandRunner {
     BoundedCommandRunner {
-      inner: Arc::new(inner),
-      sema: AsyncSemaphore::new(bound),
+      inner: Arc::new((inner, AsyncSemaphore::new(bound))),
     }
   }
 }
@@ -151,10 +142,6 @@ impl BoundedCommandRunner {
 impl CommandRunner for BoundedCommandRunner {
   fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String> {
     let inner = self.inner.clone();
-    self.sema.with_acquired(move || inner.run(req))
-  }
-
-  fn with_shutdown(&self, f: &mut FnMut() -> ()) {
-    self.inner.with_shutdown(f);
+    self.inner.1.with_acquired(move || inner.0.run(req))
   }
 }
diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs
index 74fda41c3d4..3aa9927afae 100644
--- a/src/rust/engine/process_execution/src/local.rs
+++ b/src/rust/engine/process_execution/src/local.rs
@@ -298,14 +298,6 @@ impl super::CommandRunner for CommandRunner {
       })
       .to_boxed()
   }
-
-  fn with_shutdown(&self, f: &mut FnMut() -> ()) {
-    // TODO: Although we have a Resettable, we do not shut it down, because our caller
-    // will (and attempting to shut things down twice guarantees a deadlock because Resettable is
-    // not reentrant). This is fragile, and it would be nice to have type safety to prevent that
-    // case.
-    self.store.with_reset(f)
-  }
 }

 #[cfg(test)]
diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs
index 3947df76445..13665632b61 100644
--- a/src/rust/engine/process_execution/src/remote.rs
+++ b/src/rust/engine/process_execution/src/remote.rs
@@ -14,7 +14,6 @@ use futures_timer::Delay;
 use grpcio;
 use hashing::{Digest, Fingerprint};
 use protobuf::{self, Message, ProtobufEnum};
-use resettable::Resettable;
 use sha2::Sha256;

 use super::{ExecuteProcessRequest, FallibleExecuteProcessResult};
@@ -30,10 +29,10 @@ enum OperationOrStatus {
 pub struct CommandRunner {
   instance_name: Option<String>,
   authorization_header: Option<String>,
-  channel: Resettable<grpcio::Channel>,
-  env: Resettable<Arc<grpcio::Environment>>,
-  execution_client: Resettable<Arc<bazel_protos::remote_execution_grpc::ExecutionClient>>,
-  operations_client: Resettable<Arc<bazel_protos::operations_grpc::OperationsClient>>,
+  channel: grpcio::Channel,
+  env: Arc<grpcio::Environment>,
+  execution_client: Arc<bazel_protos::remote_execution_grpc::ExecutionClient>,
+  operations_client: Arc<bazel_protos::operations_grpc::OperationsClient>,
   store: Store,
 }
@@ -62,7 +61,6 @@ impl CommandRunner {
     let stream = try_future!(
       self
         .execution_client
-        .get()
         .execute_opt(&execute_request, self.call_option())
         .map_err(rpcerror_to_string)
     );
@@ -202,7 +200,6 @@ impl super::CommandRunner for CommandRunner {
           .and_then(move |_| {
             future::done(
               operations_client
-                .get()
                 .get_operation_opt(&operation_request, command_runner3.call_option())
                 .or_else(move |err| {
                   rpcerror_recover_cancelled(operation_request.take_name(), err)
@@ -226,16 +223,6 @@ impl super::CommandRunner for CommandRunner {
       Err(err) => future::err(err).to_boxed(),
     }
   }
-
-  fn with_shutdown(&self, f: &mut FnMut() -> ()) {
-    self.channel.with_reset(|| {
-      self.env.with_reset(|| {
-        self
-          .execution_client
-          .with_reset(|| self.operations_client.with_reset(f))
-      })
-    })
-  }
 }

 impl CommandRunner {
@@ -243,38 +230,31 @@ impl CommandRunner {
   const BACKOFF_MAX_WAIT_MILLIS: u64 = 5000;

   pub fn new(
-    address: String,
+    address: &str,
     instance_name: Option<String>,
     root_ca_certs: Option<Vec<u8>>,
     oauth_bearer_token: Option<String>,
     thread_count: usize,
     store: Store,
   ) -> CommandRunner {
-    let env = Resettable::new(move || Arc::new(grpcio::Environment::new(thread_count)));
-    let env2 = env.clone();
-    let channel = Resettable::new(move || {
-      let builder = grpcio::ChannelBuilder::new(env2.get());
-      if let Some(ref root_ca_certs) = root_ca_certs {
+    let env = Arc::new(grpcio::Environment::new(thread_count));
+    let channel = {
+      let builder = grpcio::ChannelBuilder::new(env.clone());
+      if let Some(root_ca_certs) = root_ca_certs {
         let creds = grpcio::ChannelCredentialsBuilder::new()
-          .root_cert(root_ca_certs.clone())
+          .root_cert(root_ca_certs)
           .build();
-        builder.secure_connect(&address, creds)
+        builder.secure_connect(address, creds)
       } else {
-        builder.connect(&address)
+        builder.connect(address)
       }
-    });
-    let channel2 = channel.clone();
-    let channel3 = channel.clone();
-    let execution_client = Resettable::new(move || {
-      Arc::new(bazel_protos::remote_execution_grpc::ExecutionClient::new(
-        channel2.get(),
-      ))
-    });
-    let operations_client = Resettable::new(move || {
-      Arc::new(bazel_protos::operations_grpc::OperationsClient::new(
-        channel3.get(),
-      ))
-    });
+    };
+    let execution_client = Arc::new(bazel_protos::remote_execution_grpc::ExecutionClient::new(
+      channel.clone(),
+    ));
+    let operations_client = Arc::new(bazel_protos::operations_grpc::OperationsClient::new(
+      channel.clone(),
+    ));

     CommandRunner {
       instance_name,
@@ -1159,7 +1139,7 @@ mod tests {
     let store = fs::Store::with_remote(
       &store_dir_path,
       Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -1168,7 +1148,7 @@ mod tests {
       Duration::from_secs(1),
     ).expect("Failed to make store");

-    let cmd_runner = CommandRunner::new(mock_server.address(), None, None, None, 1, store);
+    let cmd_runner = CommandRunner::new(&mock_server.address(), None, None, None, 1, store);
     let result = cmd_runner.run(echo_roland_request()).wait();
     assert_eq!(
       result,
@@ -1509,7 +1489,7 @@ mod tests {
     let store = fs::Store::with_remote(
       store_dir,
       Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -1522,7 +1502,7 @@ mod tests {
       .wait()
       .expect("Saving file bytes to store");

-    let result = CommandRunner::new(mock_server.address(), None, None, None, 1, store)
+    let result = CommandRunner::new(&mock_server.address(), None, None, None, 1, store)
       .run(cat_roland_request())
       .wait();
     assert_eq!(
@@ -1588,7 +1568,7 @@ mod tests {
     let store = fs::Store::with_remote(
       store_dir,
       Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -1601,7 +1581,7 @@ mod tests {
       .wait()
       .expect("Saving file bytes to store");

-    let result = CommandRunner::new(mock_server.address(), None, None, None, 1, store)
+    let result = CommandRunner::new(&mock_server.address(), None, None, None, 1, store)
       .run(cat_roland_request())
       .wait();
     assert_eq!(
@@ -1648,7 +1628,7 @@ mod tests {
     let store = fs::Store::with_remote(
       store_dir,
       Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -1657,7 +1637,7 @@ mod tests {
       Duration::from_secs(1),
     ).expect("Failed to make store");

-    let error = CommandRunner::new(mock_server.address(), None, None, None, 1, store)
+    let error = CommandRunner::new(&mock_server.address(), None, None, None, 1, store)
       .run(cat_roland_request())
       .wait()
       .expect_err("Want error");
@@ -2214,7 +2194,7 @@ mod tests {
     let store = fs::Store::with_remote(
       store_dir,
       Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
-      cas.address(),
+      &cas.address(),
       None,
       None,
       None,
@@ -2223,7 +2203,7 @@ mod tests {
       Duration::from_secs(1),
     ).expect("Failed to make store");

-    CommandRunner::new(address, None, None, None, 1, store)
+    CommandRunner::new(&address, None, None, None, 1, store)
   }

   fn extract_execute_response(
diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs
index 41fb59d6331..a058d3edb8d 100644
--- a/src/rust/engine/process_executor/src/main.rs
+++ b/src/rust/engine/process_executor/src/main.rs
@@ -202,7 +202,7 @@ fn main() {
       fs::Store::with_remote(
         local_store_path,
         pool.clone(),
-        cas_server.to_owned(),
+        cas_server,
         remote_instance_arg.clone(),
         root_ca_certs,
         oauth_bearer_token,
@@ -253,7 +253,7 @@ fn main() {
      };

       Box::new(process_execution::remote::CommandRunner::new(
-        address.to_owned(),
+        address,
         remote_instance_arg,
         root_ca_certs,
         oauth_bearer_token,
diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs
index a773c7191e7..46e989f5716 100644
--- a/src/rust/engine/src/context.rs
+++ b/src/rust/engine/src/context.rs
@@ -38,9 +38,8 @@ pub struct Core {
   pub types: Types,
   pub fs_pool: Arc<ResettablePool>,
   pub runtime: Resettable<Arc<Runtime>>,
-  pub store: Store,
+  store_and_command_runner: Resettable<(Store, BoundedCommandRunner)>,
   pub vfs: PosixFS,
-  pub command_runner: BoundedCommandRunner,
 }

 impl Core {
@@ -67,7 +66,6 @@ impl Core {
     let runtime = Resettable::new(|| {
       Arc::new(Runtime::new().unwrap_or_else(|e| panic!("Could not initialize Runtime: {:?}", e)))
     });
-
     // We re-use these certs for both the execution and store service; they're generally tied together.
     let root_ca_certs = if let Some(path) = remote_root_ca_certs_path {
       Some(
@@ -88,45 +86,50 @@ impl Core {
       None
     };

-    let store_path = Store::default_path();
-
-    let store = safe_create_dir_all_ioerror(&store_path)
-      .map_err(|e| format!("Error making directory {:?}: {:?}", store_path, e))
-      .and_then(|()| match remote_store_server {
-        Some(address) => Store::with_remote(
-          store_path,
-          fs_pool.clone(),
+    let fs_pool2 = fs_pool.clone();
+    let store_and_command_runner = Resettable::new(move || {
+      let store_path = Store::default_path();
+
+      let store = safe_create_dir_all_ioerror(&store_path)
+        .map_err(|e| format!("Error making directory {:?}: {:?}", store_path, e))
+        .and_then(|()| match &remote_store_server {
+          Some(ref address) => Store::with_remote(
+            store_path,
+            fs_pool2.clone(),
+            address,
+            remote_instance_name.clone(),
+            root_ca_certs.clone(),
+            oauth_bearer_token.clone(),
+            remote_store_thread_count,
+            remote_store_chunk_bytes,
+            remote_store_chunk_upload_timeout,
+          ),
+          None => Store::local_only(store_path, fs_pool2.clone()),
+        }).unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e));
+
+      let underlying_command_runner: Box<CommandRunner> = match &remote_execution_server {
+        Some(ref address) => Box::new(process_execution::remote::CommandRunner::new(
           address,
           remote_instance_name.clone(),
           root_ca_certs.clone(),
           oauth_bearer_token.clone(),
-          remote_store_thread_count,
-          remote_store_chunk_bytes,
-          remote_store_chunk_upload_timeout,
-        ),
-        None => Store::local_only(store_path, fs_pool.clone()),
-      }).unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e));
-
-    let underlying_command_runner: Box<CommandRunner> = match remote_execution_server {
-      Some(address) => Box::new(process_execution::remote::CommandRunner::new(
-        address,
-        remote_instance_name,
-        root_ca_certs,
-        oauth_bearer_token,
-        // Allow for some overhead for bookkeeping threads (if any).
-        process_execution_parallelism + 2,
-        store.clone(),
-      )),
-      None => Box::new(process_execution::local::CommandRunner::new(
-        store.clone(),
-        fs_pool.clone(),
-        work_dir,
-        process_execution_cleanup_local_dirs,
-      )),
-    };
-
-    let command_runner =
-      BoundedCommandRunner::new(underlying_command_runner, process_execution_parallelism);
+          // Allow for some overhead for bookkeeping threads (if any).
+          process_execution_parallelism + 2,
+          store.clone(),
+        )),
+        None => Box::new(process_execution::local::CommandRunner::new(
+          store.clone(),
+          fs_pool2.clone(),
+          work_dir.clone(),
+          process_execution_cleanup_local_dirs,
+        )),
+      };
+
+      let command_runner =
+        BoundedCommandRunner::new(underlying_command_runner, process_execution_parallelism);
+
+      (store, command_runner)
+    });

     let rule_graph = RuleGraph::new(&tasks, root_subject_types);

@@ -137,13 +140,12 @@ impl Core {
       types: types,
       fs_pool: fs_pool.clone(),
       runtime: runtime,
-      store: store,
+      store_and_command_runner: store_and_command_runner,
       // TODO: Errors in initialization should definitely be exposed as python
       // exceptions, rather than as panics.
       vfs: PosixFS::new(build_root, fs_pool, &ignore_patterns).unwrap_or_else(|e| {
         panic!("Could not initialize VFS: {:?}", e);
       }),
-      command_runner: command_runner,
     }
   }

@@ -166,17 +168,9 @@ impl Core {
     }
     let t = self.runtime.with_reset(|| {
       self.graph.with_exclusive(|| {
-        self.fs_pool.with_shutdown(|| {
-          // TODO: In order for `CommandRunner` to be "object safe" (which it must be in order to
-          // be `Box`ed for use in the `Core` struct without generic parameters), it cannot have
-          // a generic return type. Rather than giving it a return type like `void*`, we set a
-          // mutable field here.
-          let mut res: Option<T> = None;
-          self.command_runner.with_shutdown(&mut || {
-            res = Some(f());
-          });
-          res.expect("with_shutdown method did not call its argument function.")
-        })
+        self
+          .fs_pool
+          .with_shutdown(|| self.store_and_command_runner.with_reset(f))
       })
     });
     self
       .
       .expect("Multiple callers should not be in the fork context at once.");
     t
   }
+
+  pub fn store(&self) -> Store {
+    self.store_and_command_runner.get().0
+  }
+
+  pub fn command_runner(&self) -> BoundedCommandRunner {
+    self.store_and_command_runner.get().1
+  }
 }

 #[derive(Clone)]
diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs
index 1f244942706..6b13ad35265 100644
--- a/src/rust/engine/src/lib.rs
+++ b/src/rust/engine/src/lib.rs
@@ -643,7 +643,7 @@ pub extern "C" fn set_panic_handler() {
 #[no_mangle]
 pub extern "C" fn garbage_collect_store(scheduler_ptr: *mut Scheduler) {
   with_scheduler(scheduler_ptr, |scheduler| {
-    match scheduler.core.store.garbage_collect() {
+    match scheduler.core.store().garbage_collect() {
       Ok(_) => {}
       Err(err) => error!("{}", err),
     }
@@ -654,7 +654,7 @@ pub extern "C" fn lease_files_in_graph(scheduler_ptr: *mut Scheduler) {
   with_scheduler(scheduler_ptr, |scheduler| {
     let digests = scheduler.core.graph.all_digests();
-    match scheduler.core.store.lease_all(digests.iter()) {
+    match scheduler.core.store().lease_all(digests.iter()) {
       Ok(_) => {}
       Err(err) => error!("{}", &err),
     }
@@ -720,7 +720,7 @@ pub extern "C" fn merge_directories(
   };

   with_scheduler(scheduler_ptr, |scheduler| {
-    fs::Snapshot::merge_directories(scheduler.core.store.clone(), digests)
+    fs::Snapshot::merge_directories(scheduler.core.store(), digests)
       .wait()
       .map(|dir| nodes::Snapshot::store_directory(&scheduler.core, &dir))
       .into()
@@ -754,7 +754,7 @@ pub extern "C" fn materialize_directories(
     futures::future::join_all(
       dir_and_digests
         .into_iter()
-        .map(|(dir, digest)| scheduler.core.store.materialize_directory(dir, digest))
+        .map(|(dir, digest)| scheduler.core.store().materialize_directory(dir, digest))
         .collect::<Vec<_>>(),
     )
   }).map(|_| ())
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index f1c17cf58dc..1b00ac2cdb7 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -235,10 +235,10 @@ impl Select {
           .and_then(|directory_digest_val| {
             lift_digest(&directory_digest_val).map_err(|str| throw(&str))
           }).and_then(move |digest| {
-            let store = context.core.store.clone();
+            let store = context.core.store();
             context
               .core
-              .store
+              .store()
               .load_directory(digest)
               .map_err(|str| throw(&str))
               .and_then(move |maybe_directory| {
@@ -407,7 +407,7 @@ impl WrappedNode for ExecuteProcess {

     context
       .core
-      .command_runner
+      .command_runner()
       .run(request)
       .map(ProcessResult)
       .map_err(|e| throw(&format!("Failed to execute process: {}", e)))
@@ -470,7 +470,7 @@ impl WrappedNode for DigestFile {
       .and_then(move |c| {
         context
           .core
-          .store
+          .store()
           .store_file_bytes(c.content, true)
           .map_err(|e| throw(&e))
       }).to_boxed()
@@ -527,7 +527,7 @@ impl Snapshot {
       .expand(path_globs)
       .map_err(|e| format!("PathGlobs expansion failed: {:?}", e))
       .and_then(move |path_stats| {
-        fs::Snapshot::from_path_stats(context.core.store.clone(), &context, path_stats)
+        fs::Snapshot::from_path_stats(context.core.store(), &context, path_stats)
           .map_err(move |e| format!("Snapshot failed: {}", e))
       }).map_err(|e| throw(&e))
       .to_boxed()
diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs
index 6c0f4d25fd4..966f64a68df 100644
--- a/src/rust/engine/src/scheduler.rs
+++ b/src/rust/engine/src/scheduler.rs
@@ -292,7 +292,7 @@ impl Scheduler {
       self.core.fs_pool.clone(),
       &[]
     )));
-    let store = self.core.store.clone();
+    let store = self.core.store();

     posix_fs
       .expand(path_globs)
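
### Appendix: the consolidation pattern in miniature

The sketch below illustrates the pattern described under "Solution"; it is not code from this patch. `Resettable` here is a hypothetical stand-in for the engine's internal `resettable` crate (lazy construction from a factory, plus a drop-and-rebuild hook used around `fork()`), and `Store`/`BoundedCommandRunner` are empty placeholders. Only the shape is the point: one coarse-grained `Resettable<(Store, BoundedCommandRunner)>` replacing many nested fine-grained instances.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the engine's `Resettable`: lazily builds a value
// from a factory, and can drop it around a critical section (such as a fork),
// re-creating it on the next use.
struct Resettable<T> {
  factory: Box<dyn Fn() -> T + Send>,
  value: Mutex<Option<Arc<T>>>,
}

impl<T> Resettable<T> {
  fn new<F: Fn() -> T + Send + 'static>(factory: F) -> Resettable<T> {
    Resettable {
      factory: Box::new(factory),
      value: Mutex::new(None),
    }
  }

  // Get (or lazily create) the current value.
  fn get(&self) -> Arc<T> {
    let mut value = self.value.lock().unwrap();
    value
      .get_or_insert_with(|| Arc::new((self.factory)()))
      .clone()
  }

  // Drop the value, run `f`, and let `get` rebuild the value lazily afterwards.
  fn with_reset<F: FnOnce()>(&self, f: F) {
    *self.value.lock().unwrap() = None;
    f();
  }
}

// Placeholders for the two components this patch consolidates.
struct Store;
struct BoundedCommandRunner;

fn main() {
  // After the patch: a single external Resettable owns both components, so one
  // with_reset tears down and rebuilds everything that is fork-unsafe, instead
  // of each component nesting with_reset calls through its own fields.
  let store_and_command_runner: Resettable<(Store, BoundedCommandRunner)> =
    Resettable::new(|| (Store, BoundedCommandRunner));

  let _ = store_and_command_runner.get(); // built lazily on first use
  store_and_command_runner.with_reset(|| {
    // fork() would happen here, while no LMDB or gRPC handles are live.
  });
  let _ = store_and_command_runner.get(); // rebuilt after the reset
}
```

The consequence of this design is visible throughout the diff: `Store` and `CommandRunner` lose their `with_reset`/`with_shutdown` plumbing entirely, and `Core` exposes the pair through the `store()` and `command_runner()` accessors that read from the single `Resettable` instance.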