Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[remoting] Move local process execution tempdirs into the workdir, add option to not delete them #6023

Merged
merged 14 commits into from
Jun 28, 2018
4 changes: 3 additions & 1 deletion src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@
uint64_t,
uint64_t,
uint64_t,
uint64_t);
uint64_t,
_Bool);
void scheduler_pre_fork(Scheduler*);
Value scheduler_metrics(Scheduler*, Session*);
RawNodes* scheduler_execute(Scheduler*, Session*, ExecutionRequest*);
Expand Down Expand Up @@ -793,6 +794,7 @@ def tc(constraint):
execution_options.remote_store_chunk_bytes,
execution_options.remote_store_chunk_upload_timeout_seconds,
execution_options.process_execution_parallelism,
execution_options.process_execution_cleanup_local_dirs
)
return self.gc(scheduler, self.lib.scheduler_destroy)

Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ExecutionOptions(datatype([
'remote_store_chunk_bytes',
'remote_store_chunk_upload_timeout_seconds',
'process_execution_parallelism',
'process_execution_cleanup_local_dirs',
])):
"""A collection of all options related to (remote) execution of processes.

Expand All @@ -83,6 +84,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_bytes=bootstrap_options.remote_store_chunk_bytes,
remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds,
process_execution_parallelism=bootstrap_options.process_execution_parallelism,
process_execution_cleanup_local_dirs=bootstrap_options.process_execution_cleanup_local_dirs,
)


Expand All @@ -93,6 +95,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_bytes=1024*1024,
remote_store_chunk_upload_timeout_seconds=60,
process_execution_parallelism=multiprocessing.cpu_count()*2,
process_execution_cleanup_local_dirs=True,
)


Expand Down Expand Up @@ -315,6 +318,9 @@ def register_bootstrap_options(cls, register):
register('--process-execution-parallelism', type=int, default=multiprocessing.cpu_count(),
advanced=True,
help='Number of concurrent processes that may be executed either locally and remotely.')
register('--process-execution-cleanup-local-dirs', type=bool, default=True,
help='Whether or not to cleanup directories used for local process execution '
'(primarily useful for e.g. debugging).')

@classmethod
def register_options(cls, register):
Expand Down
98 changes: 86 additions & 12 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
extern crate log;
extern crate tempfile;

use boxfuture::{BoxFuture, Boxable};
Expand All @@ -17,11 +18,23 @@ use bytes::Bytes;
/// Runs processes on the local machine, each inside its own temporary
/// directory created beneath `work_dir`.
pub struct CommandRunner {
/// Store used to materialize a request's input files into the per-process
/// directory before the process is executed.
store: fs::Store,
/// Shared resettable pool. NOTE(review): presumably used for filesystem work
/// while capturing outputs; its use is not visible in this hunk, so confirm.
fs_pool: Arc<fs::ResettablePool>,
/// Parent directory in which the per-process `process-execution*` temporary
/// directories are created.
work_dir: PathBuf,
/// When `false`, the per-process directory is kept on disk after the run and
/// its path is logged; when `true`, it is removed once the run's temporary
/// directory handle is dropped.
cleanup_local_dirs: bool,
}

impl CommandRunner {
pub fn new(store: fs::Store, fs_pool: Arc<fs::ResettablePool>) -> CommandRunner {
CommandRunner { store, fs_pool }
/// Creates a local `CommandRunner`; every argument is stored unchanged in the
/// field of the same name.
///
/// `work_dir` is the parent directory for the per-process temporary
/// directories. `cleanup_local_dirs` selects whether those directories are
/// deleted after a run (`true`) or preserved and logged (`false`).
pub fn new(
store: fs::Store,
fs_pool: Arc<fs::ResettablePool>,
work_dir: PathBuf,
cleanup_local_dirs: bool,
) -> CommandRunner {
CommandRunner {
store,
fs_pool,
work_dir,
cleanup_local_dirs,
}
}

fn construct_output_snapshot(
Expand Down Expand Up @@ -78,7 +91,7 @@ impl super::CommandRunner for CommandRunner {
let workdir = try_future!(
tempfile::Builder::new()
.prefix("process-execution")
.tempdir()
.tempdir_in(&self.work_dir)
.map_err(|err| {
format!(
"Error making tempdir for local process execution: {:?}",
Expand All @@ -92,7 +105,9 @@ impl super::CommandRunner for CommandRunner {
let env = req.env;
let output_file_paths = req.output_files;
let output_dir_paths = req.output_directories;
let cleanup_local_dirs = self.cleanup_local_dirs;
let argv = req.argv;
let req_description = req.description;
self
.store
.materialize_directory(workdir.path().to_owned(), req.input_files)
Expand All @@ -109,7 +124,7 @@ impl super::CommandRunner for CommandRunner {
.map_err(|e| format!("Error executing process: {:?}", e))
.map(|output| (output, workdir))
})
.and_then(|(output, workdir)| {
.and_then(move |(output, workdir)| {
let output_snapshot = if output_file_paths.is_empty() && output_dir_paths.is_empty() {
future::ok(fs::Snapshot::empty()).to_boxed()
} else {
Expand Down Expand Up @@ -137,8 +152,20 @@ impl super::CommandRunner for CommandRunner {
)
})
// Force workdir not to get dropped until after we've ingested the outputs
.map(|result| (result, workdir))
.map(|(result, _workdir)| result)
.map(move |result| (result, workdir) )
.map(move |(result, workdir)| {
if !cleanup_local_dirs {
// This consumes the `TempDir` without deleting the directory on the filesystem, meaning
// that the temporary directory will no longer be automatically deleted when dropped.
let preserved_path = workdir.into_path();
info!(
"preserved local process execution dir `{:?}` for {:?}",
preserved_path,
req_description
);
}
result
})
.to_boxed()
};

Expand Down Expand Up @@ -328,7 +355,7 @@ mod tests {

#[test]
fn output_files_one() {
let result = run_command_locally_in_dir(ExecuteProcessRequest {
let result = run_command_locally(ExecuteProcessRequest {
argv: vec![
find_bash(),
"-c".to_owned(),
Expand All @@ -355,7 +382,7 @@ mod tests {

#[test]
fn output_dirs() {
let result = run_command_locally_in_dir(ExecuteProcessRequest {
let result = run_command_locally(ExecuteProcessRequest {
argv: vec![
find_bash(),
"-c".to_owned(),
Expand Down Expand Up @@ -387,7 +414,7 @@ mod tests {

#[test]
fn output_files_many() {
let result = run_command_locally_in_dir(ExecuteProcessRequest {
let result = run_command_locally(ExecuteProcessRequest {
argv: vec![
find_bash(),
"-c".to_owned(),
Expand Down Expand Up @@ -420,7 +447,7 @@ mod tests {

#[test]
fn output_files_execution_failure() {
let result = run_command_locally_in_dir(ExecuteProcessRequest {
let result = run_command_locally(ExecuteProcessRequest {
argv: vec![
find_bash(),
"-c".to_owned(),
Expand Down Expand Up @@ -451,7 +478,7 @@ mod tests {

#[test]
fn output_files_partial_output() {
let result = run_command_locally_in_dir(ExecuteProcessRequest {
let result = run_command_locally(ExecuteProcessRequest {
argv: vec![
find_bash(),
"-c".to_owned(),
Expand All @@ -478,21 +505,68 @@ mod tests {
)
}

#[test]
fn test_directory_preservation() {
let preserved_work_tmpdir = TempDir::new().unwrap();
let preserved_work_root = preserved_work_tmpdir.path().to_owned();

let result = run_command_locally_in_dir(
ExecuteProcessRequest {
argv: vec![
find_bash(),
"-c".to_owned(),
format!("echo -n {} > {}", TestData::roland().string(), "roland"),
],
env: BTreeMap::new(),
input_files: fs::EMPTY_DIGEST,
output_files: vec![PathBuf::from("roland")].into_iter().collect(),
output_directories: BTreeSet::new(),
timeout: Duration::from_millis(1000),
description: "bash".to_string(),
},
preserved_work_root.clone(),
false,
);
result.unwrap();

assert_eq!(preserved_work_root.exists(), true);

// Collect all of the top level sub-dirs under our test workdir.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a testutil::file::list_dir which may be useful here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

let subdirs = testutil::file::list_dir(&preserved_work_root);
assert_eq!(subdirs.len(), 1);

// Then look for a file such as `/tmp/abc1234/process-execution7zt4pH/roland`.
let rolands_path = preserved_work_root.join(&subdirs[0]).join("roland");
assert_eq!(rolands_path.exists(), true);
}

fn run_command_locally(
req: ExecuteProcessRequest,
) -> Result<FallibleExecuteProcessResult, String> {
run_command_locally_in_dir(req)
let work_dir = TempDir::new().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is interesting now... work_dir, being a TempDir, will itself be rm -rf'd when this function returns, which vaguely subverts the cleanup flag.

For how this is used today, this is fine, but may be worth a comment for the future. Or maybe not :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, completely intentional for this variant of the helper based on how it's being used.

run_command_locally_in_dir_with_cleanup(req, work_dir.path().to_owned())
}

/// Runs `req` locally beneath `dir`, with cleanup of the per-process
/// execution directory enabled.
fn run_command_locally_in_dir_with_cleanup(
  req: ExecuteProcessRequest,
  dir: PathBuf,
) -> Result<FallibleExecuteProcessResult, String> {
  // Named so the call site says what the trailing flag means.
  let cleanup = true;
  run_command_locally_in_dir(req, dir, cleanup)
}

/// Runs `req` through a local `CommandRunner` whose work dir is `dir`, and
/// blocks until the resulting future completes.
///
/// A fresh temporary directory backs the local-only store for the duration of
/// the call. `cleanup` is forwarded as the runner's `cleanup_local_dirs` flag.
fn run_command_locally_in_dir(
  req: ExecuteProcessRequest,
  dir: PathBuf,
  cleanup: bool,
) -> Result<FallibleExecuteProcessResult, String> {
  let store_tmpdir = TempDir::new().unwrap();
  let io_pool = Arc::new(fs::ResettablePool::new("test-pool-".to_owned()));
  let local_store = fs::Store::local_only(store_tmpdir.path(), Arc::clone(&io_pool)).unwrap();
  // Keep the runner in a named local so it is dropped before `store_tmpdir`,
  // rather than living as a tail-expression temporary.
  let runner = super::CommandRunner::new(local_store, io_pool, dir, cleanup);
  runner.run(req).wait()
}
Expand Down
15 changes: 14 additions & 1 deletion src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::future::Future;
use hashing::{Digest, Fingerprint};
use std::collections::{BTreeMap, BTreeSet};
use std::iter::Iterator;
use std::path::PathBuf;
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -26,6 +27,12 @@ fn main() {
env_logger::init();

let args = App::new("process_executor")
.arg(
Arg::with_name("work-dir")
.long("work-dir")
.takes_value(true)
.help("Path to workdir"),
)
.arg(
Arg::with_name("local-store-path")
.long("local-store-path")
Expand Down Expand Up @@ -95,6 +102,10 @@ fn main() {
.collect(),
None => BTreeMap::new(),
};
let work_dir = args
.value_of("work-dir")
.map(PathBuf::from)
.unwrap_or_else(std::env::temp_dir);
let local_store_path = args.value_of("local-store-path").unwrap();
let pool = Arc::new(fs::ResettablePool::new("process-executor-".to_owned()));
let server_arg = args.value_of("server");
Expand Down Expand Up @@ -138,7 +149,9 @@ fn main() {
1,
store,
)),
None => Box::new(process_execution::local::CommandRunner::new(store, pool)),
None => Box::new(process_execution::local::CommandRunner::new(
store, pool, work_dir, true,
)),
};

let result = runner.run(request).wait().expect("Error executing");
Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ impl Core {
types: Types,
build_root: &Path,
ignore_patterns: Vec<String>,
work_dir: &Path,
work_dir: PathBuf,
remote_store_server: Option<String>,
remote_execution_server: Option<String>,
remote_store_thread_count: usize,
remote_store_chunk_bytes: usize,
remote_store_chunk_upload_timeout: Duration,
process_execution_parallelism: usize,
process_execution_cleanup_local_dirs: bool,
) -> Core {
let mut snapshots_dir = PathBuf::from(work_dir);
snapshots_dir.push("snapshots");

let fs_pool = Arc::new(ResettablePool::new("io-".to_string()));
let runtime = Resettable::new(|| {
Arc::new(Runtime::new().unwrap_or_else(|e| panic!("Could not initialize Runtime: {:?}", e)))
Expand Down Expand Up @@ -97,6 +95,8 @@ impl Core {
None => Box::new(process_execution::local::CommandRunner::new(
store.clone(),
fs_pool.clone(),
work_dir,
process_execution_cleanup_local_dirs,
)),
};

Expand Down
4 changes: 3 additions & 1 deletion src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ pub extern "C" fn scheduler_create(
remote_store_chunk_bytes: u64,
remote_store_chunk_upload_timeout_seconds: u64,
process_execution_parallelism: u64,
process_execution_cleanup_local_dirs: bool,
) -> *const Scheduler {
let root_type_ids = root_type_ids.to_vec();
let ignore_patterns = ignore_patterns_buf
Expand Down Expand Up @@ -269,7 +270,7 @@ pub extern "C" fn scheduler_create(
types,
build_root_buf.to_os_string().as_ref(),
ignore_patterns,
work_dir_buf.to_os_string().as_ref(),
PathBuf::from(work_dir_buf.to_os_string()),
if remote_store_server_string.is_empty() {
None
} else {
Expand All @@ -284,6 +285,7 @@ pub extern "C" fn scheduler_create(
remote_store_chunk_bytes as usize,
Duration::from_secs(remote_store_chunk_upload_timeout_seconds),
process_execution_parallelism as usize,
process_execution_cleanup_local_dirs as bool,
))))
}

Expand Down