Skip to content

Commit

Permalink
refactor Core::new including command runner setup (#10993)
Browse files Browse the repository at this point in the history
### Problem

Core::new is a very convoluted function and it is hard to read what is actually going on in it.

### Solution

Refactor the store and command runner setup into separate functions. (Also refactors the load of some certificates.) This refactor will make the subsequent introduction of remote caching in #10960 easier to comprehend.

### Result

Existing tests pass.
  • Loading branch information
Tom Dyas committed Oct 20, 2020
1 parent bf62b0a commit 73199ee
Showing 1 changed file with 223 additions and 133 deletions.
356 changes: 223 additions & 133 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,204 @@ pub struct ExecutionStrategyOptions {
}

impl Core {
fn make_store(
executor: &Executor,
local_store_dir: &Path,
enable_remote: bool,
remoting_opts: &RemotingOptions,
remote_store_servers: &[String],
root_ca_certs: &Option<Vec<u8>>,
oauth_bearer_token: &Option<String>,
) -> Result<Store, String> {
if enable_remote {
Store::with_remote(
executor.clone(),
local_store_dir,
remote_store_servers.to_vec(),
remoting_opts.instance_name.clone(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remoting_opts.store_thread_count,
remoting_opts.store_chunk_bytes,
remoting_opts.store_chunk_upload_timeout,
// TODO: Take a parameter
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10))
.unwrap(),
remoting_opts.store_rpc_retries,
remoting_opts.store_connection_limit,
)
} else {
Store::local_only(executor.clone(), local_store_dir.to_path_buf())
}
}

fn make_local_execution_runner(
store: &Store,
executor: &Executor,
local_execution_root_dir: &Path,
named_caches_dir: &Path,
process_execution_metadata: &ProcessMetadata,
exec_strategy_opts: &ExecutionStrategyOptions,
) -> Box<dyn CommandRunner> {
let local_command_runner = process_execution::local::CommandRunner::new(
store.clone(),
executor.clone(),
local_execution_root_dir.to_path_buf(),
NamedCaches::new(named_caches_dir.to_path_buf()),
exec_strategy_opts.cleanup_local_dirs,
);

let maybe_nailgunnable_local_command_runner: Box<dyn process_execution::CommandRunner> =
if exec_strategy_opts.local_enable_nailgun {
Box::new(process_execution::nailgun::CommandRunner::new(
local_command_runner,
process_execution_metadata.clone(),
local_execution_root_dir.to_path_buf(),
executor.clone(),
))
} else {
Box::new(local_command_runner)
};

Box::new(BoundedCommandRunner::new(
maybe_nailgunnable_local_command_runner,
exec_strategy_opts.local_parallelism,
))
}

fn make_remote_execution_runner(
store: &Store,
process_execution_metadata: &ProcessMetadata,
remoting_opts: &RemotingOptions,
root_ca_certs: &Option<Vec<u8>>,
oauth_bearer_token: &Option<String>,
) -> Result<Box<dyn CommandRunner>, String> {
Ok(Box::new(process_execution::remote::CommandRunner::new(
// No problem unwrapping here because the global options validation
// requires the remoting_opts.execution_server be present when
// remoting_opts.execution_enable is set.
&remoting_opts.execution_server.clone().unwrap(),
remoting_opts.store_servers.clone(),
process_execution_metadata.clone(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remoting_opts.execution_headers.clone(),
store.clone(),
// TODO if we ever want to configure the remote platform to be something else we
// need to take an option all the way down here and into the remote::CommandRunner struct.
Platform::Linux,
remoting_opts.execution_overall_deadline,
Duration::from_millis(100),
)?))
}

fn make_command_runner(
store: &Store,
executor: &Executor,
local_execution_root_dir: &Path,
named_caches_dir: &Path,
local_store_dir: &Path,
process_execution_metadata: &ProcessMetadata,
root_ca_certs: &Option<Vec<u8>>,
oauth_bearer_token: &Option<String>,
exec_strategy_opts: &ExecutionStrategyOptions,
remoting_opts: &RemotingOptions,
) -> Result<Box<dyn CommandRunner>, String> {
let local_command_runner = Core::make_local_execution_runner(
store,
executor,
local_execution_root_dir,
named_caches_dir,
process_execution_metadata,
&exec_strategy_opts,
);

let command_runner: Box<dyn CommandRunner> = if remoting_opts.execution_enable {
let remote_command_runner: Box<dyn process_execution::CommandRunner> = {
Box::new(BoundedCommandRunner::new(
Core::make_remote_execution_runner(
store,
process_execution_metadata,
&remoting_opts,
root_ca_certs,
oauth_bearer_token,
)?,
exec_strategy_opts.remote_parallelism,
))
};

match exec_strategy_opts.speculation_strategy.as_ref() {
"local_first" => Box::new(SpeculatingCommandRunner::new(
local_command_runner,
remote_command_runner,
exec_strategy_opts.speculation_delay,
)),
"remote_first" => Box::new(SpeculatingCommandRunner::new(
remote_command_runner,
local_command_runner,
exec_strategy_opts.speculation_delay,
)),
"none" => remote_command_runner,
_ => unreachable!(),
}
} else {
local_command_runner
};

let maybe_cached_command_runner = if exec_strategy_opts.use_local_cache {
let process_execution_store = ShardedLmdb::new(
local_store_dir.join("processes"),
5 * GIGABYTES,
executor.clone(),
DEFAULT_LEASE_TIME,
)
.map_err(|err| format!("Could not initialize store for process cache: {:?}", err))?;
Box::new(process_execution::cache::CommandRunner::new(
command_runner.into(),
process_execution_store,
store.clone(),
process_execution_metadata.clone(),
))
} else {
command_runner
};

Ok(maybe_cached_command_runner)
}

fn load_certificates(
ca_certs_path: Option<PathBuf>,
) -> Result<Vec<reqwest::Certificate>, String> {
let certs = match ca_certs_path {
Some(ref path) => {
let mut content = String::new();
std::fs::File::open(path)
.and_then(|mut f| f.read_to_string(&mut content))
.map_err(|err| {
format!(
"Error reading root CA certs file {}: {}",
path.display(),
err
)
})?;
let pem_re = Regex::new(PEM_RE_STR).unwrap();
let certs_res: Result<Vec<reqwest::Certificate>, _> = pem_re
.find_iter(&content)
.map(|mat| reqwest::Certificate::from_pem(mat.as_str().as_bytes()))
.collect();
certs_res.map_err(|err| {
format!(
"Error parsing PEM from root CA certs file {}: {}",
path.display(),
err
)
})?
}
None => Vec::new(),
};
Ok(certs)
}

pub fn new(
executor: Executor,
tasks: Tasks,
Expand Down Expand Up @@ -146,152 +344,44 @@ impl Core {
None
};

let local_store_dir2 = local_store_dir.clone();
let store = safe_create_dir_all_ioerror(&local_store_dir)
.map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e))
.and_then(|_| {
if !remoting_opts.execution_enable || remote_store_servers.is_empty() {
Store::local_only(executor.clone(), local_store_dir)
} else {
Store::with_remote(
executor.clone(),
local_store_dir,
remote_store_servers,
remoting_opts.instance_name.clone(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remoting_opts.store_thread_count,
remoting_opts.store_chunk_bytes,
remoting_opts.store_chunk_upload_timeout,
// TODO: Take a parameter
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10))
.unwrap(),
remoting_opts.store_rpc_retries,
remoting_opts.store_connection_limit,
)
}
Core::make_store(
&executor,
&local_store_dir,
remoting_opts.execution_enable && !remote_store_servers.is_empty(),
&remoting_opts,
&remote_store_servers,
&root_ca_certs,
&oauth_bearer_token,
)
})
.map_err(|e| format!("Could not initialize Store: {:?}", e))?;

let process_execution_metadata = ProcessMetadata {
instance_name: remoting_opts.instance_name,
cache_key_gen_version: remoting_opts.execution_process_cache_namespace,
platform_properties: remoting_opts.execution_extra_platform_properties,
instance_name: remoting_opts.instance_name.clone(),
cache_key_gen_version: remoting_opts.execution_process_cache_namespace.clone(),
platform_properties: remoting_opts.execution_extra_platform_properties.clone(),
};

let local_command_runner = process_execution::local::CommandRunner::new(
store.clone(),
executor.clone(),
local_execution_root_dir.clone(),
NamedCaches::new(named_caches_dir),
exec_strategy_opts.cleanup_local_dirs,
);

let maybe_nailgunnable_local_command_runner: Box<dyn process_execution::CommandRunner> =
if exec_strategy_opts.local_enable_nailgun {
Box::new(process_execution::nailgun::CommandRunner::new(
local_command_runner,
process_execution_metadata.clone(),
local_execution_root_dir,
executor.clone(),
))
} else {
Box::new(local_command_runner)
};

let mut command_runner: Box<dyn process_execution::CommandRunner> =
Box::new(BoundedCommandRunner::new(
maybe_nailgunnable_local_command_runner,
exec_strategy_opts.local_parallelism,
));

if remoting_opts.execution_enable {
let remote_command_runner: Box<dyn process_execution::CommandRunner> = {
let command_runner: Box<dyn CommandRunner> = {
Box::new(process_execution::remote::CommandRunner::new(
// No problem unwrapping here because the global options validation
// requires the remoting_opts.execution_server be present when
// remoting_opts.execution_enable is set.
&remoting_opts.execution_server.unwrap(),
remoting_opts.store_servers.clone(),
process_execution_metadata.clone(),
root_ca_certs,
oauth_bearer_token,
remoting_opts.execution_headers,
store.clone(),
// TODO if we ever want to configure the remote platform to be something else we
// need to take an option all the way down here and into the remote::CommandRunner struct.
Platform::Linux,
remoting_opts.execution_overall_deadline,
Duration::from_millis(100),
)?)
};
let command_runner = Core::make_command_runner(
&store,
&executor,
&local_execution_root_dir,
&named_caches_dir,
&local_store_dir,
&process_execution_metadata,
&root_ca_certs,
&oauth_bearer_token,
&exec_strategy_opts,
&remoting_opts,
)?;

Box::new(BoundedCommandRunner::new(
command_runner,
exec_strategy_opts.remote_parallelism,
))
};
command_runner = match exec_strategy_opts.speculation_strategy.as_ref() {
"local_first" => Box::new(SpeculatingCommandRunner::new(
command_runner,
remote_command_runner,
exec_strategy_opts.speculation_delay,
)),
"remote_first" => Box::new(SpeculatingCommandRunner::new(
remote_command_runner,
command_runner,
exec_strategy_opts.speculation_delay,
)),
"none" => remote_command_runner,
_ => unreachable!(),
};
}

if exec_strategy_opts.use_local_cache {
let process_execution_store = ShardedLmdb::new(
local_store_dir2.join("processes"),
5 * GIGABYTES,
executor.clone(),
DEFAULT_LEASE_TIME,
)
.map_err(|err| format!("Could not initialize store for process cache: {:?}", err))?;
command_runner = Box::new(process_execution::cache::CommandRunner::new(
command_runner.into(),
process_execution_store,
store.clone(),
process_execution_metadata,
));
}
let graph = Arc::new(InvalidatableGraph(Graph::new()));

// These certs are for downloads, not to be confused with the ones used for remoting.
let ca_certs = if let Some(ref path) = ca_certs_path {
let mut content = String::new();
std::fs::File::open(path)
.and_then(|mut f| f.read_to_string(&mut content))
.map_err(|err| {
format!(
"Error reading root CA certs file {}: {}",
path.display(),
err
)
})?;
let pem_re = Regex::new(PEM_RE_STR).unwrap();
let certs_res: Result<Vec<reqwest::Certificate>, _> = pem_re
.find_iter(&content)
.map(|mat| reqwest::Certificate::from_pem(mat.as_str().as_bytes()))
.collect();
certs_res.map_err(|err| {
format!(
"Error parsing PEM from root CA certs file {}: {}",
path.display(),
err
)
})?
} else {
Vec::new()
};
let ca_certs = Core::load_certificates(ca_certs_path)?;

let http_client_builder = ca_certs
.iter()
Expand Down

0 comments on commit 73199ee

Please sign in to comment.