diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 4c227b0e37d..e472a6c230d 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -205,6 +205,7 @@ Buffer, Buffer, Buffer, + Buffer, uint64_t, uint64_t, uint64_t, @@ -855,6 +856,7 @@ def tc(constraint): self.context.utf8_buf(execution_options.remote_store_server or ""), self.context.utf8_buf(execution_options.remote_execution_server or ""), self.context.utf8_buf(execution_options.remote_instance_name or ""), + self.context.utf8_buf(execution_options.remote_ca_certs_path or ""), execution_options.remote_store_thread_count, execution_options.remote_store_chunk_bytes, execution_options.remote_store_chunk_upload_timeout_seconds, diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 46a15de1ee7..b06ab31f5d1 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -38,6 +38,7 @@ class ExecutionOptions(datatype([ 'process_execution_parallelism', 'process_execution_cleanup_local_dirs', 'remote_instance_name', + 'remote_ca_certs_path', ])): """A collection of all options related to (remote) execution of processes. @@ -56,6 +57,7 @@ def from_bootstrap_options(cls, bootstrap_options): process_execution_parallelism=bootstrap_options.process_execution_parallelism, process_execution_cleanup_local_dirs=bootstrap_options.process_execution_cleanup_local_dirs, remote_instance_name=bootstrap_options.remote_instance_name, + remote_ca_certs_path=bootstrap_options.remote_ca_certs_path, ) @@ -68,6 +70,7 @@ def from_bootstrap_options(cls, bootstrap_options): process_execution_parallelism=multiprocessing.cpu_count()*2, process_execution_cleanup_local_dirs=True, remote_instance_name=None, + remote_ca_certs_path=None, ) @@ -284,6 +287,10 @@ def register_bootstrap_options(cls, register): register('--remote-instance-name', advanced=True, help='Name of the remote execution instance to use. Used for routing within ' '--remote-execution-server and --remote-store-server.') + register('--remote-ca-certs-path', advanced=True, + help='Path to a PEM file containing CA certificates used for verifying secure ' + 'connections to --remote-execution-server and --remote-store-server. ' + 'If not specified, TLS will not be used.') # This should eventually deprecate the RunTracker worker count, which is used for legacy cache # lookups via CacheSetup in TaskBase. diff --git a/src/rust/engine/fs/brfs/src/main.rs b/src/rust/engine/fs/brfs/src/main.rs index 398d16f5735..fc0dda3197b 100644 --- a/src/rust/engine/fs/brfs/src/main.rs +++ b/src/rust/engine/fs/brfs/src/main.rs @@ -630,6 +630,12 @@ fn main() { .takes_value(true) .long("remote-instance-name") .required(false), + ).arg( + clap::Arg::with_name("root-ca-cert-file") + .help("Path to file containing root certificate authority certificates. If not set, TLS will not be used when connecting to the remote.") + .takes_value(true) + .long("root-ca-cert-file") + .required(false) ).arg( clap::Arg::with_name("mount-path") .required(true) @@ -651,6 +657,12 @@ fn main() { } } + let root_ca_certs = if let Some(path) = args.value_of("root-ca-cert-file") { + Some(std::fs::read(path).expect("Error reading root CA certs file")) + } else { + None + }; + let pool = Arc::new(fs::ResettablePool::new("brfs-".to_owned())); let store = match args.value_of("server-address") { Some(address) => fs::Store::with_remote( @@ -658,6 +670,7 @@ fn main() { pool, address.to_owned(), args.value_of("remote-instance-name").map(str::to_owned), + root_ca_certs, 1, 4 * 1024 * 1024, std::time::Duration::from_secs(5 * 60), diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index f4b37f926be..8770e2f8a19 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -194,6 +194,13 @@ to this directory.", .long("server-address") .required(false) ) + .arg( + Arg::with_name("root-ca-cert-file") + .help("Path to file containing root certificate authority certificates. If not set, TLS will not be used when connecting to the remote.") + .takes_value(true) + .long("root-ca-cert-file") + .required(false) + ) .arg(Arg::with_name("remote-instance-name") .takes_value(true) .long("remote-instance-name") @@ -227,6 +234,16 @@ fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { Some(cas_address) => { let chunk_size = value_t!(top_match.value_of("chunk-bytes"), usize).expect("Bad chunk-bytes flag"); + + let root_ca_certs = if let Some(path) = top_match.value_of("root-ca-cert-file") { + Some( + std::fs::read(path) + .map_err(|err| format!("Error reading root CA certs file {}: {}", path, err))?, + ) + } else { + None + }; + ( Store::with_remote( &store_dir, @@ -235,6 +252,7 @@ fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { top_match .value_of("remote-instance_name") .map(str::to_owned), + root_ca_certs, 1, chunk_size, // This deadline is really only in place because otherwise DNS failures diff --git a/src/rust/engine/fs/src/store.rs b/src/rust/engine/fs/src/store.rs index c743e1a601a..33a523fb0e5 100644 --- a/src/rust/engine/fs/src/store.rs +++ b/src/rust/engine/fs/src/store.rs @@ -75,11 +75,13 @@ impl Store { /// Make a store which uses local storage, and if it is missing a value which it tries to load, /// will attempt to back-fill its local storage from a remote CAS. /// + #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn with_remote>( path: P, pool: Arc, cas_address: String, instance_name: Option, + root_ca_certs: Option>, thread_count: usize, chunk_size_bytes: usize, timeout: Duration, @@ -89,6 +91,7 @@ impl Store { remote: Some(remote::ByteStore::new( cas_address, instance_name, + root_ca_certs, thread_count, chunk_size_bytes, timeout, @@ -1611,14 +1614,24 @@ mod remote { pub fn new( cas_address: String, instance_name: Option, + root_ca_certs: Option>, thread_count: usize, chunk_size_bytes: usize, upload_timeout: Duration, ) -> ByteStore { let env = Resettable::new(move || Arc::new(grpcio::Environment::new(thread_count))); let env2 = env.clone(); - let channel = - Resettable::new(move || grpcio::ChannelBuilder::new(env2.get()).connect(&cas_address)); + let channel = Resettable::new(move || { + let builder = grpcio::ChannelBuilder::new(env2.get()); + if let Some(ref root_ca_certs) = root_ca_certs { + let creds = grpcio::ChannelCredentialsBuilder::new() + .root_cert(root_ca_certs.clone()) + .build(); + builder.secure_connect(&cas_address, creds) + } else { + builder.connect(&cas_address) + } + }); let channel2 = channel.clone(); let channel3 = channel.clone(); let byte_stream_client = Resettable::new(move || { @@ -1956,7 +1969,14 @@ mod remote { fn write_file_multiple_chunks() { let cas = StubCAS::empty(); - let store = ByteStore::new(cas.address(), None, 1, 10 * 1024, Duration::from_secs(5)); + let store = ByteStore::new( + cas.address(), + None, + None, + 1, + 10 * 1024, + Duration::from_secs(5), + ); let all_the_henries = big_file_bytes(); @@ -2026,6 +2046,7 @@ mod remote { let store = ByteStore::new( "doesnotexist.example".to_owned(), None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), @@ -2087,6 +2108,7 @@ mod remote { ByteStore::new( cas.address(), None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), @@ -2206,6 +2228,7 @@ mod tests { Arc::new(ResettablePool::new("test-pool-".to_string())), cas_address, None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), @@ -2883,6 +2906,7 @@ mod tests { Arc::new(ResettablePool::new("test-pool-".to_string())), cas.address(), Some("dark-tower".to_owned()), + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), @@ -2907,6 +2931,7 @@ mod tests { Arc::new(ResettablePool::new("test-pool-".to_string())), cas.address(), Some("dark-tower".to_owned()), + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index a70fd02a694..86c6fc2f59f 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -225,13 +225,23 @@ impl CommandRunner { pub fn new( address: String, instance_name: Option, + root_ca_certs: Option>, thread_count: usize, store: Store, ) -> CommandRunner { let env = Resettable::new(move || Arc::new(grpcio::Environment::new(thread_count))); let env2 = env.clone(); - let channel = - Resettable::new(move || grpcio::ChannelBuilder::new(env2.get()).connect(&address)); + let channel = Resettable::new(move || { + let builder = grpcio::ChannelBuilder::new(env2.get()); + if let Some(ref root_ca_certs) = root_ca_certs { + let creds = grpcio::ChannelCredentialsBuilder::new() + .root_cert(root_ca_certs.clone()) + .build(); + builder.secure_connect(&address, creds) + } else { + builder.connect(&address) + } + }); let channel2 = channel.clone(); let channel3 = channel.clone(); let execution_client = Resettable::new(move || { @@ -1079,12 +1089,13 @@ mod tests { Arc::new(fs::ResettablePool::new("test-pool-".to_owned())), cas.address(), None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), ).expect("Failed to make store"); - let cmd_runner = CommandRunner::new(mock_server.address(), None, 1, store); + let cmd_runner = CommandRunner::new(mock_server.address(), None, None, 1, store); let result = cmd_runner.run(echo_roland_request()).wait(); assert_eq!( result, @@ -1427,6 +1438,7 @@ mod tests { Arc::new(fs::ResettablePool::new("test-pool-".to_owned())), cas.address(), None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), @@ -1436,7 +1448,7 @@ mod tests { .wait() .expect("Saving file bytes to store"); - let result = CommandRunner::new(mock_server.address(), None, 1, store) + let result = CommandRunner::new(mock_server.address(), None, None, 1, store) .run(cat_roland_request()) .wait(); assert_eq!( @@ -1485,12 +1497,13 @@ mod tests { Arc::new(fs::ResettablePool::new("test-pool-".to_owned())), cas.address(), None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), ).expect("Failed to make store"); - let error = CommandRunner::new(mock_server.address(), None, 1, store) + let error = CommandRunner::new(mock_server.address(), None, None, 1, store) .run(cat_roland_request()) .wait() .expect_err("Want error"); @@ -2030,12 +2043,13 @@ mod tests { Arc::new(fs::ResettablePool::new("test-pool-".to_owned())), cas.address(), None, + None, 1, 10 * 1024 * 1024, Duration::from_secs(1), ).expect("Failed to make store"); - CommandRunner::new(address, None, 1, store) + CommandRunner::new(address, None, None, 1, store) } fn extract_execute_response( diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index fa7ec33610a..a62e96a0f80 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -86,12 +86,26 @@ fn main() { If unspecified, local execution will be performed.", ), ) - .arg( + .arg( + Arg::with_name("execution-root-ca-cert-file") + .help("Path to file containing root certificate authority certificates for the execution server. If not set, TLS will not be used when connecting to the execution server.") + .takes_value(true) + .long("execution-root-ca-cert-file") + .required(false) + ) + .arg( Arg::with_name("cas-server") .long("cas-server") .takes_value(true) .help("The host:port of the gRPC CAS server to connect to."), ) + .arg( + Arg::with_name("cas-root-ca-cert-file") + .help("Path to file containing root certificate authority certificates for the CAS server. If not set, TLS will not be used when connecting to the CAS server.") + .takes_value(true) + .long("cas-root-ca-cert-file") + .required(false) + ) .arg(Arg::with_name("remote-instance-name") .takes_value(true) .long("remote-instance-name") @@ -159,11 +173,18 @@ fn main() { let chunk_size = value_t!(args.value_of("upload-chunk-bytes"), usize).expect("Bad upload-chunk-bytes flag"); + let root_ca_certs = if let Some(path) = args.value_of("cas-root-ca-cert-file") { + Some(std::fs::read(path).expect("Error reading root CA certs file")) + } else { + None + }; + fs::Store::with_remote( local_store_path, pool.clone(), cas_server.to_owned(), remote_instance_arg.clone(), + root_ca_certs, 1, chunk_size, Duration::from_secs(30), @@ -196,12 +217,21 @@ fn main() { }; let runner: Box = match server_arg { - Some(address) => Box::new(process_execution::remote::CommandRunner::new( - address.to_owned(), - remote_instance_arg, - 1, - store, - )), + Some(address) => { + let root_ca_certs = if let Some(path) = args.value_of("execution-root-ca-cert-file") { + Some(std::fs::read(path).expect("Error reading root CA certs file")) + } else { + None + }; + + Box::new(process_execution::remote::CommandRunner::new( + address.to_owned(), + remote_instance_arg, + root_ca_certs, + 1, + store, + )) + } None => Box::new(process_execution::local::CommandRunner::new( store, pool, work_dir, true, )), diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 7ec5d65105f..26cc56485a6 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -1,6 +1,8 @@ // Copyright 2017 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). +use std::fs::File; +use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread; @@ -54,6 +56,7 @@ impl Core { remote_store_server: Option, remote_execution_server: Option, remote_instance_name: Option, + remote_root_ca_certs_path: Option, remote_store_thread_count: usize, remote_store_chunk_bytes: usize, remote_store_chunk_upload_timeout: Duration, @@ -65,6 +68,17 @@ impl Core { Arc::new(Runtime::new().unwrap_or_else(|e| panic!("Could not initialize Runtime: {:?}", e))) }); + // We re-use these certs for both the execution and store service; they're generally tied together. + let root_ca_certs = if let Some(path) = remote_root_ca_certs_path { + Some( + File::open(&path) + .and_then(|f| f.bytes().collect::, _>>()) + .unwrap_or_else(|err| panic!("Error reading root CA certs file {:?}: {}", path, err)), + ) + } else { + None + }; + let store_path = Store::default_path(); let store = safe_create_dir_all_ioerror(&store_path) @@ -75,6 +89,7 @@ impl Core { fs_pool.clone(), address, remote_instance_name.clone(), + root_ca_certs.clone(), remote_store_thread_count, remote_store_chunk_bytes, remote_store_chunk_upload_timeout, @@ -86,6 +101,7 @@ impl Core { Some(address) => Box::new(process_execution::remote::CommandRunner::new( address, remote_instance_name, + root_ca_certs, // Allow for some overhead for bookkeeping threads (if any). process_execution_parallelism + 2, store.clone(), diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 35d6847d7bb..c933122b9b0 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -254,6 +254,7 @@ pub extern "C" fn scheduler_create( remote_store_server: Buffer, remote_execution_server: Buffer, remote_instance_name: Buffer, + remote_root_ca_certs_path_buffer: Buffer, remote_store_thread_count: u64, remote_store_chunk_bytes: u64, remote_store_chunk_upload_timeout_seconds: u64, @@ -300,6 +301,16 @@ pub extern "C" fn scheduler_create( let remote_instance_name_string = remote_instance_name .to_string() .expect("remote_instance_name was not valid UTF8"); + + let remote_root_ca_certs_path = { + let path = remote_root_ca_certs_path_buffer.to_os_string(); + if path.is_empty() { + None + } else { + Some(PathBuf::from(path)) + } + }; + Box::into_raw(Box::new(Scheduler::new(Core::new( root_type_ids.clone(), tasks, @@ -322,6 +333,7 @@ pub extern "C" fn scheduler_create( } else { Some(remote_instance_name_string) }, + remote_root_ca_certs_path, remote_store_thread_count as usize, remote_store_chunk_bytes as usize, Duration::from_secs(remote_store_chunk_upload_timeout_seconds),