Skip to content

Commit

Permalink
Introduce Resettable (#5770)
Browse files Browse the repository at this point in the history
Because we fork without execing a lot, we often need to reset things
that have references to background threads. This provides a handy
wrapper for doing so.

If this looks good, I will also apply it to the
process_execution::remote::CommandRunner.

I can also apply it to the ResettablePool which we already have, for
re-use (it would add an additional Arc clone per CpuPool operation, but
this is probably fine because the CpuPool operations are already by
definition somewhat heavyweight).
  • Loading branch information
illicitonion committed May 2, 2018
1 parent c137fc8 commit 29bdaa6
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 29 deletions.
5 changes: 5 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.toml
Expand Up @@ -32,6 +32,7 @@ members = [
"process_execution",
"process_execution/bazel_protos",
"process_executor",
"resettable",
"testutil",
"testutil/mock",
]
Expand All @@ -53,6 +54,7 @@ default-members = [
"process_execution",
"process_execution/bazel_protos",
"process_executor",
"resettable",
"testutil",
"testutil/mock",
]
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/Cargo.toml
Expand Up @@ -22,6 +22,7 @@ lazy_static = "0.2.2"
lmdb = "0.7.2"
log = "0.4"
protobuf = { version = "1.4.1", features = ["with-bytes"] }
resettable = { path = "../resettable" }
sha2 = "0.6.0"
tempdir = "0.3.5"

Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/fs/brfs/src/main.rs
Expand Up @@ -612,7 +612,7 @@ fn main() {
Some(address) => fs::Store::with_remote(
&store_path,
pool,
address,
address.to_owned(),
1,
4 * 1024 * 1024,
std::time::Duration::from_secs(5 * 60),
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/fs/fs_util/src/main.rs
Expand Up @@ -166,7 +166,7 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
Store::with_remote(
store_dir,
pool.clone(),
cas_address,
cas_address.to_owned(),
1,
10 * 1024 * 1024,
Duration::from_secs(30),
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/src/lib.rs
Expand Up @@ -30,6 +30,7 @@ extern crate log;
#[cfg(test)]
extern crate mock;
extern crate protobuf;
extern crate resettable;
extern crate sha2;
extern crate tempdir;
#[cfg(test)]
Expand Down
102 changes: 80 additions & 22 deletions src/rust/engine/fs/src/store.rs
Expand Up @@ -63,7 +63,7 @@ impl Store {
pub fn with_remote<P: AsRef<Path>>(
path: P,
pool: Arc<ResettablePool>,
cas_address: &str,
cas_address: String,
thread_count: usize,
chunk_size_bytes: usize,
timeout: Duration,
Expand All @@ -87,8 +87,11 @@ impl Store {
/// processes run using the same daemon, one takes out some kind of lock which the other cannot
/// ever acquire, so lmdb returns EAGAIN whenever a transaction is created in the second process.
///
pub fn reset_lmdb_connections(&self) {
pub fn reset_prefork(&self) {
self.local.reset_lmdb_connections();
if let Some(ref remote) = self.remote {
remote.reset_threadpool();
}
}

pub fn store_file_bytes(&self, bytes: Bytes, initial_lease: bool) -> BoxFuture<Digest, String> {
Expand Down Expand Up @@ -1450,6 +1453,7 @@ mod remote {
use futures::{self, future, Future, Sink, Stream};
use hashing::{Digest, Fingerprint};
use grpcio;
use resettable::Resettable;
use sha2::Sha256;
use std::cmp::min;
use std::collections::HashSet;
Expand All @@ -1458,37 +1462,58 @@ mod remote {

#[derive(Clone)]
pub struct ByteStore {
byte_stream_client: Arc<bazel_protos::bytestream_grpc::ByteStreamClient>,
cas_client: Arc<bazel_protos::remote_execution_grpc::ContentAddressableStorageClient>,
env: Arc<grpcio::Environment>,
byte_stream_client: Resettable<Arc<bazel_protos::bytestream_grpc::ByteStreamClient>>,
cas_client:
Resettable<Arc<bazel_protos::remote_execution_grpc::ContentAddressableStorageClient>>,
chunk_size_bytes: usize,
upload_timeout: Duration,
env: Resettable<Arc<grpcio::Environment>>,
channel: Resettable<grpcio::Channel>,
}

impl ByteStore {
pub fn new(
cas_address: &str,
cas_address: String,
thread_count: usize,
chunk_size_bytes: usize,
upload_timeout: Duration,
) -> ByteStore {
let env = Arc::new(grpcio::Environment::new(thread_count));
let channel = grpcio::ChannelBuilder::new(env.clone()).connect(cas_address);
let byte_stream_client = Arc::new(bazel_protos::bytestream_grpc::ByteStreamClient::new(
channel.clone(),
));
let cas_client = Arc::new(
bazel_protos::remote_execution_grpc::ContentAddressableStorageClient::new(channel),
);
let env = Resettable::new(Arc::new(move || {
Arc::new(grpcio::Environment::new(thread_count))
}));
let env2 = env.clone();
let channel = Resettable::new(Arc::new(move || {
grpcio::ChannelBuilder::new(env2.get()).connect(&cas_address)
}));
let channel2 = channel.clone();
let channel3 = channel.clone();
let byte_stream_client = Resettable::new(Arc::new(move || {
Arc::new(bazel_protos::bytestream_grpc::ByteStreamClient::new(
channel2.get(),
))
}));
let cas_client = Resettable::new(Arc::new(move || {
Arc::new(
bazel_protos::remote_execution_grpc::ContentAddressableStorageClient::new(channel3.get()),
)
}));
ByteStore {
byte_stream_client,
cas_client,
env,
chunk_size_bytes,
upload_timeout,
env,
channel,
}
}

pub fn reset_threadpool(&self) {
self.channel.reset();
self.env.reset();
self.cas_client.reset();
self.byte_stream_client.reset();
}

pub fn store_bytes(&self, bytes: Bytes) -> BoxFuture<Digest, String> {
let mut hasher = Sha256::default();
hasher.input(&bytes);
Expand All @@ -1503,6 +1528,7 @@ mod remote {
);
match self
.byte_stream_client
.get()
.write_opt(grpcio::CallOption::default().timeout(self.upload_timeout))
{
Err(err) => future::err(format!(
Expand Down Expand Up @@ -1532,7 +1558,7 @@ mod remote {
},
);

future::ok(self.byte_stream_client.clone())
future::ok(self.byte_stream_client.get())
.join(sender.send_all(stream).map_err(move |e| {
format!(
"Error attempting to upload fingerprint {}: {:?}",
Expand Down Expand Up @@ -1570,7 +1596,7 @@ mod remote {
digest: Digest,
f: F,
) -> BoxFuture<Option<T>, String> {
match self.byte_stream_client.read(&{
match self.byte_stream_client.get().read(&{
let mut req = bazel_protos::bytestream::ReadRequest::new();
req.set_resource_name(format!("/blobs/{}/{}", digest.0, digest.1));
req.set_read_offset(0);
Expand All @@ -1581,7 +1607,7 @@ mod remote {
Ok(stream) => {
// We shouldn't have to pass around the client here, it's a workaround for
// https://github.com/pingcap/grpc-rs/issues/123
future::ok(self.byte_stream_client.clone())
future::ok(self.byte_stream_client.get())
.join(
stream.fold(BytesMut::with_capacity(digest.1), move |mut bytes, r| {
bytes.extend_from_slice(&r.data);
Expand Down Expand Up @@ -1619,6 +1645,7 @@ mod remote {
}
self
.cas_client
.get()
.find_missing_blobs(&request)
.map(|response| {
response
Expand Down Expand Up @@ -1787,7 +1814,7 @@ mod remote {
fn write_file_multiple_chunks() {
let cas = StubCAS::empty();

let store = ByteStore::new(&cas.address(), 1, 10 * 1024, Duration::from_secs(5));
let store = ByteStore::new(cas.address(), 1, 10 * 1024, Duration::from_secs(5));

let all_the_henries = big_file_bytes();

Expand Down Expand Up @@ -1855,7 +1882,7 @@ mod remote {
#[test]
fn write_connection_error() {
let store = ByteStore::new(
"doesnotexist.example",
"doesnotexist.example".to_owned(),
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -1914,7 +1941,7 @@ mod remote {
}

fn new_byte_store(cas: &StubCAS) -> ByteStore {
ByteStore::new(&cas.address(), 1, 10 * 1024 * 1024, Duration::from_secs(1))
ByteStore::new(cas.address(), 1, 10 * 1024 * 1024, Duration::from_secs(1))
}

pub fn load_file_bytes(store: &ByteStore, digest: Digest) -> Result<Option<Bytes>, String> {
Expand Down Expand Up @@ -2019,7 +2046,7 @@ mod tests {
Store::with_remote(
dir,
Arc::new(ResettablePool::new("test-pool-".to_string())),
&cas_address,
cas_address,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -2798,6 +2825,37 @@ mod tests {
assert!(!is_executable(&materialize_dir.path().join("food")));
}

#[test]
fn works_after_reset_prefork() {
let dir = TempDir::new("store").unwrap();
let cas = new_cas(1024);

let testdata = TestData::roland();
let testdir = TestDirectory::containing_roland();

let store = new_store(dir.path(), cas.address());

// Fetches from remote, so initialises both the local and remote ByteStores:
assert_eq!(
store.load_file_bytes_with(testdata.digest(), |b| b).wait(),
Ok(Some(testdata.bytes()))
);

store.reset_prefork();

// Already exists in local store:
assert_eq!(
store.load_file_bytes_with(testdata.digest(), |b| b).wait(),
Ok(Some(testdata.bytes()))
);

// Requires an RPC:
assert_eq!(
store.load_directory(testdir.digest()).wait(),
Ok(Some(testdir.directory()))
);
}

fn list_dir(path: &Path) -> Vec<String> {
let mut v: Vec<_> = std::fs::read_dir(path)
.expect("Listing dir")
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/process_execution/src/remote.rs
Expand Up @@ -795,7 +795,7 @@ mod tests {
let store = fs::Store::with_remote(
store_dir,
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
&cas.address(),
cas.address(),
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -848,7 +848,7 @@ mod tests {
let store = fs::Store::with_remote(
store_dir,
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
&cas.address(),
cas.address(),
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -1192,7 +1192,7 @@ mod tests {
let store = fs::Store::with_remote(
store_dir,
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
&cas.address(),
cas.address(),
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_executor/src/main.rs
Expand Up @@ -104,7 +104,7 @@ fn main() {
(Some(_server), Some(cas_server)) => fs::Store::with_remote(
local_store_path,
pool.clone(),
cas_server,
cas_server.to_owned(),
1,
10 * 1024 * 1024,
Duration::from_secs(30),
Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/resettable/Cargo.toml
@@ -0,0 +1,6 @@
[package]
name = "resettable"
version = "0.0.1"
authors = [ "Pants Build <pantsbuild@gmail.com>" ]

[dependencies]
56 changes: 56 additions & 0 deletions src/rust/engine/resettable/src/lib.rs
@@ -0,0 +1,56 @@
use std::sync::{Arc, RwLock};

///
/// Resettable is a lazily computed value which can be reset, so that it can be lazily computed
/// again next time it is needed.
///
/// This is useful because we fork without execing a lot in the engine, and before doing so, we need
/// to reset any references which hide background threads, so that forked processes don't inherit
/// pointers to threads from the parent process which will not exist in the forked process.
///
#[derive(Clone)]
pub struct Resettable<T> {
val: Arc<RwLock<Option<T>>>,
make: Arc<Fn() -> T>,
}

unsafe impl<T> Send for Resettable<T> {}
unsafe impl<T> Sync for Resettable<T> {}

impl<T> Resettable<T>
where
T: Clone + Send + Sync,
{
// Sadly there is no way to accept an Fn() -> T because it's not Sized, so we need to accept an
// Arc of one. This is not at all ergonomic, but at some point "impl trait" will come along and
// allow us to remove this monstrosity.
pub fn new(make: Arc<Fn() -> T>) -> Resettable<T> {
Resettable {
val: Arc::new(RwLock::new(None)),
make: make,
}
}

pub fn get(&self) -> T {
{
if let Some(ref val) = *self.val.read().unwrap() {
return val.clone();
}
}
{
let mut maybe_val = self.val.write().unwrap();
{
if let Some(ref val) = *maybe_val {
return val.clone();
}
}
let val = (self.make)();
*maybe_val = Some(val.clone());
val
}
}

pub fn reset(&self) {
*self.val.write().unwrap() = None
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/src/context.rs
Expand Up @@ -72,7 +72,7 @@ impl Core {

pub fn pre_fork(&self) {
self.pool.reset();
self.store.reset_lmdb_connections();
self.store.reset_prefork();
}
}

Expand Down

0 comments on commit 29bdaa6

Please sign in to comment.