Skip to content

Commit

Permalink
Support secure grpc connections (#6584)
Browse files Browse the repository at this point in the history
  • Loading branch information
illicitonion committed Oct 3, 2018
1 parent 3e629b8 commit bfa45c8
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 16 deletions.
2 changes: 2 additions & 0 deletions src/python/pants/engine/native.py
Expand Up @@ -205,6 +205,7 @@
Buffer,
Buffer,
Buffer,
Buffer,
uint64_t,
uint64_t,
uint64_t,
Expand Down Expand Up @@ -855,6 +856,7 @@ def tc(constraint):
self.context.utf8_buf(execution_options.remote_store_server or ""),
self.context.utf8_buf(execution_options.remote_execution_server or ""),
self.context.utf8_buf(execution_options.remote_instance_name or ""),
self.context.utf8_buf(execution_options.remote_ca_certs_path or ""),
execution_options.remote_store_thread_count,
execution_options.remote_store_chunk_bytes,
execution_options.remote_store_chunk_upload_timeout_seconds,
Expand Down
7 changes: 7 additions & 0 deletions src/python/pants/option/global_options.py
Expand Up @@ -38,6 +38,7 @@ class ExecutionOptions(datatype([
'process_execution_parallelism',
'process_execution_cleanup_local_dirs',
'remote_instance_name',
'remote_ca_certs_path',
])):
"""A collection of all options related to (remote) execution of processes.
Expand All @@ -56,6 +57,7 @@ def from_bootstrap_options(cls, bootstrap_options):
process_execution_parallelism=bootstrap_options.process_execution_parallelism,
process_execution_cleanup_local_dirs=bootstrap_options.process_execution_cleanup_local_dirs,
remote_instance_name=bootstrap_options.remote_instance_name,
remote_ca_certs_path=bootstrap_options.remote_ca_certs_path,
)


Expand All @@ -68,6 +70,7 @@ def from_bootstrap_options(cls, bootstrap_options):
process_execution_parallelism=multiprocessing.cpu_count()*2,
process_execution_cleanup_local_dirs=True,
remote_instance_name=None,
remote_ca_certs_path=None,
)


Expand Down Expand Up @@ -284,6 +287,10 @@ def register_bootstrap_options(cls, register):
register('--remote-instance-name', advanced=True,
help='Name of the remote execution instance to use. Used for routing within '
'--remote-execution-server and --remote-store-server.')
register('--remote-ca-certs-path', advanced=True,
help='Path to a PEM file containing CA certificates used for verifying secure '
'connections to --remote-execution-server and --remote-store-server. '
'If not specified, TLS will not be used.')

# This should eventually deprecate the RunTracker worker count, which is used for legacy cache
# lookups via CacheSetup in TaskBase.
Expand Down
13 changes: 13 additions & 0 deletions src/rust/engine/fs/brfs/src/main.rs
Expand Up @@ -630,6 +630,12 @@ fn main() {
.takes_value(true)
.long("remote-instance-name")
.required(false),
).arg(
clap::Arg::with_name("root-ca-cert-file")
.help("Path to file containing root certificate authority certificates. If not set, TLS will not be used when connecting to the remote.")
.takes_value(true)
.long("root-ca-cert-file")
.required(false)
).arg(
clap::Arg::with_name("mount-path")
.required(true)
Expand All @@ -651,13 +657,20 @@ fn main() {
}
}

let root_ca_certs = if let Some(path) = args.value_of("root-ca-cert-file") {
Some(std::fs::read(path).expect("Error reading root CA certs file"))
} else {
None
};

let pool = Arc::new(fs::ResettablePool::new("brfs-".to_owned()));
let store = match args.value_of("server-address") {
Some(address) => fs::Store::with_remote(
&store_path,
pool,
address.to_owned(),
args.value_of("remote-instance-name").map(str::to_owned),
root_ca_certs,
1,
4 * 1024 * 1024,
std::time::Duration::from_secs(5 * 60),
Expand Down
18 changes: 18 additions & 0 deletions src/rust/engine/fs/fs_util/src/main.rs
Expand Up @@ -194,6 +194,13 @@ to this directory.",
.long("server-address")
.required(false)
)
.arg(
Arg::with_name("root-ca-cert-file")
.help("Path to file containing root certificate authority certificates. If not set, TLS will not be used when connecting to the remote.")
.takes_value(true)
.long("root-ca-cert-file")
.required(false)
)
.arg(Arg::with_name("remote-instance-name")
.takes_value(true)
.long("remote-instance-name")
Expand Down Expand Up @@ -227,6 +234,16 @@ fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
Some(cas_address) => {
let chunk_size =
value_t!(top_match.value_of("chunk-bytes"), usize).expect("Bad chunk-bytes flag");

let root_ca_certs = if let Some(path) = top_match.value_of("root-ca-cert-file") {
Some(
std::fs::read(path)
.map_err(|err| format!("Error reading root CA certs file {}: {}", path, err))?,
)
} else {
None
};

(
Store::with_remote(
&store_dir,
Expand All @@ -235,6 +252,7 @@ fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
top_match
.value_of("remote-instance_name")
.map(str::to_owned),
root_ca_certs,
1,
chunk_size,
// This deadline is really only in place because otherwise DNS failures
Expand Down
31 changes: 28 additions & 3 deletions src/rust/engine/fs/src/store.rs
Expand Up @@ -75,11 +75,13 @@ impl Store {
/// Make a store which uses local storage, and if it is missing a value which it tries to load,
/// will attempt to back-fill its local storage from a remote CAS.
///
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn with_remote<P: AsRef<Path>>(
path: P,
pool: Arc<ResettablePool>,
cas_address: String,
instance_name: Option<String>,
root_ca_certs: Option<Vec<u8>>,
thread_count: usize,
chunk_size_bytes: usize,
timeout: Duration,
Expand All @@ -89,6 +91,7 @@ impl Store {
remote: Some(remote::ByteStore::new(
cas_address,
instance_name,
root_ca_certs,
thread_count,
chunk_size_bytes,
timeout,
Expand Down Expand Up @@ -1611,14 +1614,24 @@ mod remote {
pub fn new(
cas_address: String,
instance_name: Option<String>,
root_ca_certs: Option<Vec<u8>>,
thread_count: usize,
chunk_size_bytes: usize,
upload_timeout: Duration,
) -> ByteStore {
let env = Resettable::new(move || Arc::new(grpcio::Environment::new(thread_count)));
let env2 = env.clone();
let channel =
Resettable::new(move || grpcio::ChannelBuilder::new(env2.get()).connect(&cas_address));
let channel = Resettable::new(move || {
let builder = grpcio::ChannelBuilder::new(env2.get());
if let Some(ref root_ca_certs) = root_ca_certs {
let creds = grpcio::ChannelCredentialsBuilder::new()
.root_cert(root_ca_certs.clone())
.build();
builder.secure_connect(&cas_address, creds)
} else {
builder.connect(&cas_address)
}
});
let channel2 = channel.clone();
let channel3 = channel.clone();
let byte_stream_client = Resettable::new(move || {
Expand Down Expand Up @@ -1956,7 +1969,14 @@ mod remote {
fn write_file_multiple_chunks() {
let cas = StubCAS::empty();

let store = ByteStore::new(cas.address(), None, 1, 10 * 1024, Duration::from_secs(5));
let store = ByteStore::new(
cas.address(),
None,
None,
1,
10 * 1024,
Duration::from_secs(5),
);

let all_the_henries = big_file_bytes();

Expand Down Expand Up @@ -2026,6 +2046,7 @@ mod remote {
let store = ByteStore::new(
"doesnotexist.example".to_owned(),
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -2087,6 +2108,7 @@ mod remote {
ByteStore::new(
cas.address(),
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -2206,6 +2228,7 @@ mod tests {
Arc::new(ResettablePool::new("test-pool-".to_string())),
cas_address,
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down Expand Up @@ -2883,6 +2906,7 @@ mod tests {
Arc::new(ResettablePool::new("test-pool-".to_string())),
cas.address(),
Some("dark-tower".to_owned()),
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand All @@ -2907,6 +2931,7 @@ mod tests {
Arc::new(ResettablePool::new("test-pool-".to_string())),
cas.address(),
Some("dark-tower".to_owned()),
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand Down
26 changes: 20 additions & 6 deletions src/rust/engine/process_execution/src/remote.rs
Expand Up @@ -225,13 +225,23 @@ impl CommandRunner {
pub fn new(
address: String,
instance_name: Option<String>,
root_ca_certs: Option<Vec<u8>>,
thread_count: usize,
store: Store,
) -> CommandRunner {
let env = Resettable::new(move || Arc::new(grpcio::Environment::new(thread_count)));
let env2 = env.clone();
let channel =
Resettable::new(move || grpcio::ChannelBuilder::new(env2.get()).connect(&address));
let channel = Resettable::new(move || {
let builder = grpcio::ChannelBuilder::new(env2.get());
if let Some(ref root_ca_certs) = root_ca_certs {
let creds = grpcio::ChannelCredentialsBuilder::new()
.root_cert(root_ca_certs.clone())
.build();
builder.secure_connect(&address, creds)
} else {
builder.connect(&address)
}
});
let channel2 = channel.clone();
let channel3 = channel.clone();
let execution_client = Resettable::new(move || {
Expand Down Expand Up @@ -1079,12 +1089,13 @@ mod tests {
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
cas.address(),
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
).expect("Failed to make store");

let cmd_runner = CommandRunner::new(mock_server.address(), None, 1, store);
let cmd_runner = CommandRunner::new(mock_server.address(), None, None, 1, store);
let result = cmd_runner.run(echo_roland_request()).wait();
assert_eq!(
result,
Expand Down Expand Up @@ -1427,6 +1438,7 @@ mod tests {
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
cas.address(),
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
Expand All @@ -1436,7 +1448,7 @@ mod tests {
.wait()
.expect("Saving file bytes to store");

let result = CommandRunner::new(mock_server.address(), None, 1, store)
let result = CommandRunner::new(mock_server.address(), None, None, 1, store)
.run(cat_roland_request())
.wait();
assert_eq!(
Expand Down Expand Up @@ -1485,12 +1497,13 @@ mod tests {
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
cas.address(),
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
).expect("Failed to make store");

let error = CommandRunner::new(mock_server.address(), None, 1, store)
let error = CommandRunner::new(mock_server.address(), None, None, 1, store)
.run(cat_roland_request())
.wait()
.expect_err("Want error");
Expand Down Expand Up @@ -2030,12 +2043,13 @@ mod tests {
Arc::new(fs::ResettablePool::new("test-pool-".to_owned())),
cas.address(),
None,
None,
1,
10 * 1024 * 1024,
Duration::from_secs(1),
).expect("Failed to make store");

CommandRunner::new(address, None, 1, store)
CommandRunner::new(address, None, None, 1, store)
}

fn extract_execute_response(
Expand Down
44 changes: 37 additions & 7 deletions src/rust/engine/process_executor/src/main.rs
Expand Up @@ -86,12 +86,26 @@ fn main() {
If unspecified, local execution will be performed.",
),
)
.arg(
.arg(
Arg::with_name("execution-root-ca-cert-file")
.help("Path to file containing root certificate authority certificates for the execution server. If not set, TLS will not be used when connecting to the execution server.")
.takes_value(true)
.long("execution-root-ca-cert-file")
.required(false)
)
.arg(
Arg::with_name("cas-server")
.long("cas-server")
.takes_value(true)
.help("The host:port of the gRPC CAS server to connect to."),
)
.arg(
Arg::with_name("cas-root-ca-cert-file")
.help("Path to file containing root certificate authority certificates for the CAS server. If not set, TLS will not be used when connecting to the CAS server.")
.takes_value(true)
.long("cas-root-ca-cert-file")
.required(false)
)
.arg(Arg::with_name("remote-instance-name")
.takes_value(true)
.long("remote-instance-name")
Expand Down Expand Up @@ -159,11 +173,18 @@ fn main() {
let chunk_size =
value_t!(args.value_of("upload-chunk-bytes"), usize).expect("Bad upload-chunk-bytes flag");

let root_ca_certs = if let Some(path) = args.value_of("cas-root-ca-cert-file") {
Some(std::fs::read(path).expect("Error reading root CA certs file"))
} else {
None
};

fs::Store::with_remote(
local_store_path,
pool.clone(),
cas_server.to_owned(),
remote_instance_arg.clone(),
root_ca_certs,
1,
chunk_size,
Duration::from_secs(30),
Expand Down Expand Up @@ -196,12 +217,21 @@ fn main() {
};

let runner: Box<process_execution::CommandRunner> = match server_arg {
Some(address) => Box::new(process_execution::remote::CommandRunner::new(
address.to_owned(),
remote_instance_arg,
1,
store,
)),
Some(address) => {
let root_ca_certs = if let Some(path) = args.value_of("execution-root-ca-cert-file") {
Some(std::fs::read(path).expect("Error reading root CA certs file"))
} else {
None
};

Box::new(process_execution::remote::CommandRunner::new(
address.to_owned(),
remote_instance_arg,
root_ca_certs,
1,
store,
))
}
None => Box::new(process_execution::local::CommandRunner::new(
store, pool, work_dir, true,
)),
Expand Down

0 comments on commit bfa45c8

Please sign in to comment.