Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor Core::new including command runner setup #10993

Merged
merged 3 commits into from
Oct 20, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
356 changes: 223 additions & 133 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,204 @@ pub struct ExecutionStrategyOptions {
}

impl Core {
fn make_store(
executor: &Executor,
local_store_dir: &Path,
enable_remote: bool,
remoting_opts: &RemotingOptions,
remote_store_servers: &[String],
root_ca_certs: &Option<Vec<u8>>,
oauth_bearer_token: &Option<String>,
) -> Result<Store, String> {
if enable_remote {
Store::with_remote(
executor.clone(),
local_store_dir,
remote_store_servers.to_vec(),
remoting_opts.instance_name.clone(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remoting_opts.store_thread_count,
remoting_opts.store_chunk_bytes,
remoting_opts.store_chunk_upload_timeout,
// TODO: Take a parameter
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10))
.unwrap(),
remoting_opts.store_rpc_retries,
remoting_opts.store_connection_limit,
)
} else {
Store::local_only(executor.clone(), local_store_dir.to_path_buf())
}
}

fn make_local_execution_runner(
store: &Store,
executor: &Executor,
local_execution_root_dir: &Path,
named_caches_dir: &Path,
process_execution_metadata: &ProcessMetadata,
exec_strategy_opts: &ExecutionStrategyOptions,
) -> Box<dyn CommandRunner> {
let local_command_runner = process_execution::local::CommandRunner::new(
store.clone(),
executor.clone(),
local_execution_root_dir.to_path_buf(),
NamedCaches::new(named_caches_dir.to_path_buf()),
exec_strategy_opts.cleanup_local_dirs,
);

let maybe_nailgunnable_local_command_runner: Box<dyn process_execution::CommandRunner> =
if exec_strategy_opts.local_enable_nailgun {
Box::new(process_execution::nailgun::CommandRunner::new(
local_command_runner,
process_execution_metadata.clone(),
local_execution_root_dir.to_path_buf(),
executor.clone(),
))
} else {
Box::new(local_command_runner)
};

Box::new(BoundedCommandRunner::new(
maybe_nailgunnable_local_command_runner,
exec_strategy_opts.local_parallelism,
))
}

fn make_remote_execution_runner(
store: &Store,
process_execution_metadata: &ProcessMetadata,
remoting_opts: &RemotingOptions,
root_ca_certs: &Option<Vec<u8>>,
oauth_bearer_token: &Option<String>,
) -> Result<Box<dyn CommandRunner>, String> {
Ok(Box::new(process_execution::remote::CommandRunner::new(
// No problem unwrapping here because the global options validation
// requires the remoting_opts.execution_server be present when
// remoting_opts.execution_enable is set.
&remoting_opts.execution_server.clone().unwrap(),
remoting_opts.store_servers.clone(),
process_execution_metadata.clone(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remoting_opts.execution_headers.clone(),
store.clone(),
// TODO if we ever want to configure the remote platform to be something else we
// need to take an option all the way down here and into the remote::CommandRunner struct.
Platform::Linux,
remoting_opts.execution_overall_deadline,
Duration::from_millis(100),
)?))
}

fn make_command_runner(
store: &Store,
executor: &Executor,
local_execution_root_dir: &Path,
named_caches_dir: &Path,
local_store_dir: &Path,
process_execution_metadata: &ProcessMetadata,
root_ca_certs: &Option<Vec<u8>>,
oauth_bearer_token: &Option<String>,
exec_strategy_opts: &ExecutionStrategyOptions,
remoting_opts: &RemotingOptions,
) -> Result<Box<dyn CommandRunner>, String> {
let local_command_runner = Core::make_local_execution_runner(
store,
executor,
local_execution_root_dir,
named_caches_dir,
process_execution_metadata,
&exec_strategy_opts,
);

let command_runner: Box<dyn CommandRunner> = if remoting_opts.execution_enable {
let remote_command_runner: Box<dyn process_execution::CommandRunner> = {
Box::new(BoundedCommandRunner::new(
Core::make_remote_execution_runner(
store,
process_execution_metadata,
&remoting_opts,
root_ca_certs,
oauth_bearer_token,
)?,
exec_strategy_opts.remote_parallelism,
))
};

match exec_strategy_opts.speculation_strategy.as_ref() {
"local_first" => Box::new(SpeculatingCommandRunner::new(
local_command_runner,
remote_command_runner,
exec_strategy_opts.speculation_delay,
)),
"remote_first" => Box::new(SpeculatingCommandRunner::new(
remote_command_runner,
local_command_runner,
exec_strategy_opts.speculation_delay,
)),
"none" => remote_command_runner,
_ => unreachable!(),
}
} else {
local_command_runner
};

let maybe_cached_command_runner = if exec_strategy_opts.use_local_cache {
let process_execution_store = ShardedLmdb::new(
local_store_dir.join("processes"),
5 * GIGABYTES,
executor.clone(),
DEFAULT_LEASE_TIME,
)
.map_err(|err| format!("Could not initialize store for process cache: {:?}", err))?;
Box::new(process_execution::cache::CommandRunner::new(
command_runner.into(),
process_execution_store,
store.clone(),
process_execution_metadata.clone(),
))
} else {
command_runner
};

Ok(maybe_cached_command_runner)
}

fn load_certificates(
ca_certs_path: Option<PathBuf>,
) -> Result<Vec<reqwest::Certificate>, String> {
let certs = match ca_certs_path {
Some(ref path) => {
let mut content = String::new();
std::fs::File::open(path)
.and_then(|mut f| f.read_to_string(&mut content))
.map_err(|err| {
format!(
"Error reading root CA certs file {}: {}",
path.display(),
err
)
})?;
let pem_re = Regex::new(PEM_RE_STR).unwrap();
let certs_res: Result<Vec<reqwest::Certificate>, _> = pem_re
.find_iter(&content)
.map(|mat| reqwest::Certificate::from_pem(mat.as_str().as_bytes()))
.collect();
certs_res.map_err(|err| {
format!(
"Error parsing PEM from root CA certs file {}: {}",
path.display(),
err
)
})?
}
None => Vec::new(),
};
Ok(certs)
}

pub fn new(
executor: Executor,
tasks: Tasks,
Expand Down Expand Up @@ -146,152 +344,44 @@ impl Core {
None
};

let local_store_dir2 = local_store_dir.clone();
let store = safe_create_dir_all_ioerror(&local_store_dir)
.map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e))
.and_then(|_| {
if !remoting_opts.execution_enable || remote_store_servers.is_empty() {
Store::local_only(executor.clone(), local_store_dir)
} else {
Store::with_remote(
executor.clone(),
local_store_dir,
remote_store_servers,
remoting_opts.instance_name.clone(),
root_ca_certs.clone(),
oauth_bearer_token.clone(),
remoting_opts.store_thread_count,
remoting_opts.store_chunk_bytes,
remoting_opts.store_chunk_upload_timeout,
// TODO: Take a parameter
store::BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10))
.unwrap(),
remoting_opts.store_rpc_retries,
remoting_opts.store_connection_limit,
)
}
Core::make_store(
&executor,
&local_store_dir,
remoting_opts.execution_enable && !remote_store_servers.is_empty(),
&remoting_opts,
&remote_store_servers,
&root_ca_certs,
&oauth_bearer_token,
)
})
.map_err(|e| format!("Could not initialize Store: {:?}", e))?;

let process_execution_metadata = ProcessMetadata {
instance_name: remoting_opts.instance_name,
cache_key_gen_version: remoting_opts.execution_process_cache_namespace,
platform_properties: remoting_opts.execution_extra_platform_properties,
instance_name: remoting_opts.instance_name.clone(),
cache_key_gen_version: remoting_opts.execution_process_cache_namespace.clone(),
platform_properties: remoting_opts.execution_extra_platform_properties.clone(),
};

let local_command_runner = process_execution::local::CommandRunner::new(
store.clone(),
executor.clone(),
local_execution_root_dir.clone(),
NamedCaches::new(named_caches_dir),
exec_strategy_opts.cleanup_local_dirs,
);

let maybe_nailgunnable_local_command_runner: Box<dyn process_execution::CommandRunner> =
if exec_strategy_opts.local_enable_nailgun {
Box::new(process_execution::nailgun::CommandRunner::new(
local_command_runner,
process_execution_metadata.clone(),
local_execution_root_dir,
executor.clone(),
))
} else {
Box::new(local_command_runner)
};

let mut command_runner: Box<dyn process_execution::CommandRunner> =
Box::new(BoundedCommandRunner::new(
maybe_nailgunnable_local_command_runner,
exec_strategy_opts.local_parallelism,
));

if remoting_opts.execution_enable {
let remote_command_runner: Box<dyn process_execution::CommandRunner> = {
let command_runner: Box<dyn CommandRunner> = {
Box::new(process_execution::remote::CommandRunner::new(
// No problem unwrapping here because the global options validation
// requires the remoting_opts.execution_server be present when
// remoting_opts.execution_enable is set.
&remoting_opts.execution_server.unwrap(),
remoting_opts.store_servers.clone(),
process_execution_metadata.clone(),
root_ca_certs,
oauth_bearer_token,
remoting_opts.execution_headers,
store.clone(),
// TODO if we ever want to configure the remote platform to be something else we
// need to take an option all the way down here and into the remote::CommandRunner struct.
Platform::Linux,
remoting_opts.execution_overall_deadline,
Duration::from_millis(100),
)?)
};
let command_runner = Core::make_command_runner(
&store,
&executor,
&local_execution_root_dir,
&named_caches_dir,
&local_store_dir,
&process_execution_metadata,
&root_ca_certs,
&oauth_bearer_token,
&exec_strategy_opts,
&remoting_opts,
)?;

Box::new(BoundedCommandRunner::new(
command_runner,
exec_strategy_opts.remote_parallelism,
))
};
command_runner = match exec_strategy_opts.speculation_strategy.as_ref() {
"local_first" => Box::new(SpeculatingCommandRunner::new(
command_runner,
remote_command_runner,
exec_strategy_opts.speculation_delay,
)),
"remote_first" => Box::new(SpeculatingCommandRunner::new(
remote_command_runner,
command_runner,
exec_strategy_opts.speculation_delay,
)),
"none" => remote_command_runner,
_ => unreachable!(),
};
}

if exec_strategy_opts.use_local_cache {
let process_execution_store = ShardedLmdb::new(
local_store_dir2.join("processes"),
5 * GIGABYTES,
executor.clone(),
DEFAULT_LEASE_TIME,
)
.map_err(|err| format!("Could not initialize store for process cache: {:?}", err))?;
command_runner = Box::new(process_execution::cache::CommandRunner::new(
command_runner.into(),
process_execution_store,
store.clone(),
process_execution_metadata,
));
}
let graph = Arc::new(InvalidatableGraph(Graph::new()));

// These certs are for downloads, not to be confused with the ones used for remoting.
let ca_certs = if let Some(ref path) = ca_certs_path {
let mut content = String::new();
std::fs::File::open(path)
.and_then(|mut f| f.read_to_string(&mut content))
.map_err(|err| {
format!(
"Error reading root CA certs file {}: {}",
path.display(),
err
)
})?;
let pem_re = Regex::new(PEM_RE_STR).unwrap();
let certs_res: Result<Vec<reqwest::Certificate>, _> = pem_re
.find_iter(&content)
.map(|mat| reqwest::Certificate::from_pem(mat.as_str().as_bytes()))
.collect();
certs_res.map_err(|err| {
format!(
"Error parsing PEM from root CA certs file {}: {}",
path.display(),
err
)
})?
} else {
Vec::new()
};
let ca_certs = Core::load_certificates(ca_certs_path)?;

let http_client_builder = ca_certs
.iter()
Expand Down