Skip to content

Commit

Permalink
Enable TLS for remoting in Python instead of Rust
Browse files Browse the repository at this point in the history
[ci skip-rust]
[ci skip-build-wheels]
  • Loading branch information
Eric-Arellano committed Feb 17, 2021
1 parent 9d02f99 commit 15227fa
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 66 deletions.
4 changes: 2 additions & 2 deletions src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ def new_scheduler(

remoting_options = PyRemotingOptions(
execution_enable=execution_options.remote_execution,
store_servers=execution_options.remote_store_server,
execution_server=execution_options.remote_execution_server,
store_addresses=execution_options.remote_store_addresses,
execution_address=execution_options.remote_execution_address,
execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
instance_name=execution_options.remote_instance_name,
root_ca_certs_path=execution_options.remote_ca_certs_path,
Expand Down
26 changes: 19 additions & 7 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class ExecutionOptions:
process_execution_use_local_cache: bool
process_execution_local_enable_nailgun: bool

remote_store_server: List[str]
remote_store_addresses: list[str]
remote_store_headers: Dict[str, str]
remote_store_thread_count: int
remote_store_chunk_bytes: Any
Expand All @@ -124,7 +124,7 @@ class ExecutionOptions:

remote_cache_eager_fetch: bool

remote_execution_server: Optional[str]
remote_execution_address: str | None
remote_execution_extra_platform_properties: List[str]
remote_execution_headers: Dict[str, str]
remote_execution_overall_deadline_secs: int
Expand Down Expand Up @@ -184,6 +184,16 @@ def from_options(cls, options: Options) -> ExecutionOptions:
remote_execution_headers = auth_plugin_result.execution_headers
remote_store_headers = auth_plugin_result.store_headers

# Prefix the remote servers with a scheme.
remote_address_scheme = "https://" if bootstrap_options.remote_ca_certs_path else "http://"
remote_execution_address = (
f"{remote_address_scheme}{bootstrap_options.remote_execution_server}"
if bootstrap_options.remote_execution_server else None
)
remote_store_addresses = [
f"{remote_address_scheme}{addr}" for addr in bootstrap_options.remote_store_server
]

return cls(
# Remote execution strategy.
remote_execution=remote_execution,
Expand All @@ -200,7 +210,7 @@ def from_options(cls, options: Options) -> ExecutionOptions:
process_execution_cache_namespace=bootstrap_options.process_execution_cache_namespace,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
# Remote store setup.
remote_store_server=bootstrap_options.remote_store_server,
remote_store_addresses=remote_store_addresses,
remote_store_headers=remote_store_headers,
remote_store_thread_count=bootstrap_options.remote_store_thread_count,
remote_store_chunk_bytes=bootstrap_options.remote_store_chunk_bytes,
Expand All @@ -213,7 +223,7 @@ def from_options(cls, options: Options) -> ExecutionOptions:
# Remote cache setup.
remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch,
# Remote execution setup.
remote_execution_server=bootstrap_options.remote_execution_server,
remote_execution_address=remote_execution_address,
remote_execution_extra_platform_properties=bootstrap_options.remote_execution_extra_platform_properties,
remote_execution_headers=remote_execution_headers,
remote_execution_overall_deadline_secs=bootstrap_options.remote_execution_overall_deadline_secs,
Expand Down Expand Up @@ -241,7 +251,7 @@ def from_options(cls, options: Options) -> ExecutionOptions:
process_execution_use_local_cache=True,
process_execution_local_enable_nailgun=False,
# Remote store setup.
remote_store_server=[],
remote_store_addresses=[],
remote_store_headers={},
remote_store_thread_count=1,
remote_store_chunk_bytes=1024 * 1024,
Expand All @@ -254,7 +264,7 @@ def from_options(cls, options: Options) -> ExecutionOptions:
# Remote cache setup.
remote_cache_eager_fetch=True,
# Remote execution setup.
remote_execution_server=None,
remote_execution_address=None,
remote_execution_extra_platform_properties=[],
remote_execution_headers={},
remote_execution_overall_deadline_secs=60 * 60, # one hour
Expand Down Expand Up @@ -867,7 +877,7 @@ def register_bootstrap_options(cls, register):
"--remote-store-server",
advanced=True,
type=list,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_server,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_addresses,
help="host:port of grpc server to use as remote execution file store.",
)
register(
Expand Down Expand Up @@ -953,6 +963,8 @@ def register_bootstrap_options(cls, register):
register(
"--remote-execution-server",
advanced=True,
type=str,
default=DEFAULT_EXECUTION_OPTIONS.remote_execution_address,
help="host:port of grpc server to use as remote execution scheduler.",
)
register(
Expand Down
12 changes: 1 addition & 11 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,7 @@ impl ByteStore {
None => None,
};

let scheme = if tls_client_config.is_some() {
"https"
} else {
"http"
};
let cas_addresses_with_scheme: Vec<_> = cas_addresses
.iter()
.map(|addr| format!("{}://{}", scheme, addr))
.collect();

let (endpoints, errors): (Vec<Endpoint>, Vec<String>) = cas_addresses_with_scheme
let (endpoints, errors): (Vec<Endpoint>, Vec<String>) = cas_addresses
.iter()
.map(|addr| grpc_util::create_endpoint(addr, tls_client_config.as_ref()))
.partition_map(|result| match result {
Expand Down
29 changes: 8 additions & 21 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl CommandRunner {
/// Construct a new CommandRunner
pub fn new(
address: &str,
store_servers: Vec<String>,
store_addresses: Vec<String>,
metadata: ProcessMetadata,
root_ca_certs: Option<Vec<u8>>,
headers: BTreeMap<String, String>,
Expand All @@ -133,39 +133,26 @@ impl CommandRunner {
_ => None,
};

let scheme = if tls_client_config.is_some() {
"https"
} else {
"http"
};
let address_with_scheme = format!("{}://{}", scheme, address);

let interceptor = if headers.is_empty() {
None
} else {
Some(Interceptor::new(headers_to_interceptor_fn(&headers)?))
};

let endpoint = grpc_util::create_endpoint(&address_with_scheme, tls_client_config.as_ref())?;
let endpoint = grpc_util::create_endpoint(&address, tls_client_config.as_ref())?;
let channel = tonic::transport::Channel::balance_list(vec![endpoint].into_iter());
let execution_client = Arc::new(match interceptor.as_ref() {
Some(interceptor) => ExecutionClient::with_interceptor(channel.clone(), interceptor.clone()),
None => ExecutionClient::new(channel.clone()),
});

let store_servers_with_scheme: Vec<_> = store_servers
let (store_endpoints, store_endpoints_errors): (Vec<Endpoint>, Vec<String>) = store_addresses
.iter()
.map(|addr| format!("{}://{}", scheme, addr))
.collect();

let (store_endpoints, store_endpoints_errors): (Vec<Endpoint>, Vec<String>) =
store_servers_with_scheme
.iter()
.map(|addr| grpc_util::create_endpoint(addr.as_str(), tls_client_config.as_ref()))
.partition_map(|result| match result {
Ok(endpoint) => Either::Left(endpoint),
Err(err) => Either::Right(err),
});
.map(|addr| grpc_util::create_endpoint(addr.as_str(), tls_client_config.as_ref()))
.partition_map(|result| match result {
Ok(endpoint) => Either::Left(endpoint),
Err(err) => Either::Right(err),
});

if !store_endpoints_errors.is_empty() {
return Err(format!(
Expand Down
9 changes: 1 addition & 8 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,7 @@ impl CommandRunner {
_ => None,
};

let scheme = if tls_client_config.is_some() {
"https"
} else {
"http"
};
let address_with_scheme = format!("{}://{}", scheme, action_cache_address);

let endpoint = grpc_util::create_endpoint(&address_with_scheme, tls_client_config.as_ref())?;
let endpoint = grpc_util::create_endpoint(&action_cache_address, tls_client_config.as_ref())?;
let channel = tonic::transport::Channel::balance_list(vec![endpoint].into_iter());
let action_cache_client = Arc::new(if headers.is_empty() {
ActionCacheClient::new(channel)
Expand Down
26 changes: 13 additions & 13 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub struct Core {
#[derive(Clone, Debug)]
pub struct RemotingOptions {
pub execution_enable: bool,
pub store_servers: Vec<String>,
pub execution_server: Option<String>,
pub store_addresses: Vec<String>,
pub execution_address: Option<String>,
pub execution_process_cache_namespace: Option<String>,
pub instance_name: Option<String>,
pub root_ca_certs_path: Option<PathBuf>,
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Core {
local_store_dir: &Path,
enable_remote: bool,
remoting_opts: &RemotingOptions,
remote_store_servers: &[String],
remote_store_addresses: &[String],
root_ca_certs: &Option<Vec<u8>>,
) -> Result<Store, String> {
if enable_remote {
Expand All @@ -118,7 +118,7 @@ impl Core {
Store::with_remote(
executor.clone(),
local_store_dir,
remote_store_servers.to_vec(),
remote_store_addresses.to_vec(),
remoting_opts.instance_name.clone(),
root_ca_certs.clone(),
remoting_opts.store_headers.clone(),
Expand Down Expand Up @@ -178,8 +178,8 @@ impl Core {
// No problem unwrapping here because the global options validation
// requires the remoting_opts.execution_server be present when
// remoting_opts.execution_enable is set.
&remoting_opts.execution_server.clone().unwrap(),
remoting_opts.store_servers.clone(),
&remoting_opts.execution_address.clone().unwrap(),
remoting_opts.store_addresses.clone(),
process_execution_metadata.clone(),
root_ca_certs.clone(),
remoting_opts.execution_headers.clone(),
Expand All @@ -194,7 +194,7 @@ impl Core {

fn make_command_runner(
full_store: &Store,
remote_store_servers: &[String],
remote_store_addresses: &[String],
executor: &Executor,
local_execution_root_dir: &Path,
named_caches_dir: &Path,
Expand Down Expand Up @@ -239,7 +239,7 @@ impl Core {
exec_strategy_opts.remote_parallelism,
))
} else if remote_caching_used {
let action_cache_address = remote_store_servers
let action_cache_address = remote_store_addresses
.first()
.ok_or_else(|| "At least one remote store must be specified".to_owned())?;
Box::new(process_execution::remote_cache::CommandRunner::new(
Expand Down Expand Up @@ -330,8 +330,8 @@ impl Core {
exec_strategy_opts: ExecutionStrategyOptions,
) -> Result<Core, String> {
// Randomize CAS address order to avoid thundering herds from common config.
let mut remote_store_servers = remoting_opts.store_servers.clone();
remote_store_servers.shuffle(&mut rand::thread_rng());
let mut remote_store_addresses = remoting_opts.store_addresses.clone();
remote_store_addresses.shuffle(&mut rand::thread_rng());

// We re-use these certs for both the execution and store service; they're generally tied together.
let root_ca_certs = if let Some(ref path) = remoting_opts.root_ca_certs_path {
Expand All @@ -346,7 +346,7 @@ impl Core {
let need_remote_store = remoting_opts.execution_enable
|| exec_strategy_opts.remote_cache_read
|| exec_strategy_opts.remote_cache_write;
if need_remote_store && remote_store_servers.is_empty() {
if need_remote_store && remote_store_addresses.is_empty() {
return Err("Remote store required but none provided".into());
}

Expand All @@ -357,7 +357,7 @@ impl Core {
&local_store_dir,
need_remote_store,
&remoting_opts,
&remote_store_servers,
&remote_store_addresses,
&root_ca_certs,
)
.map_err(|e| format!("Could not initialize Store: {:?}", e))?;
Expand All @@ -382,7 +382,7 @@ impl Core {

let command_runner = Self::make_command_runner(
&full_store,
&remote_store_servers,
&remote_store_addresses,
&executor,
&local_execution_root_dir,
&named_caches_dir,
Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ py_class!(class PyRemotingOptions |py| {
def __new__(
_cls,
execution_enable: bool,
store_servers: Vec<String>,
execution_server: Option<String>,
store_addresses: Vec<String>,
execution_address: Option<String>,
execution_process_cache_namespace: Option<String>,
instance_name: Option<String>,
root_ca_certs_path: Option<String>,
Expand All @@ -565,8 +565,8 @@ py_class!(class PyRemotingOptions |py| {
Self::create_instance(py,
RemotingOptions {
execution_enable,
store_servers,
execution_server,
store_addresses,
execution_address,
execution_process_cache_namespace,
instance_name,
root_ca_certs_path: root_ca_certs_path.map(PathBuf::from),
Expand Down

0 comments on commit 15227fa

Please sign in to comment.