Skip to content

Commit

Permalink
Wire up environment variable logic to BoundedCommandRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
gshuflin committed Jul 8, 2020
1 parent f89e834 commit 1550a95
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def new_scheduler(
use_gitignore,
root_subject_types,
remoting_options,
execution_options.process_execution_slot_environment_variable,
execution_options.process_execution_local_parallelism,
execution_options.process_execution_remote_parallelism,
execution_options.process_execution_cleanup_local_dirs,
Expand Down
10 changes: 10 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class ExecutionOptions:
remote_store_chunk_upload_timeout_seconds: Any
remote_store_rpc_retries: Any
remote_store_connection_limit: Any
process_execution_slot_environment_variable: str
process_execution_local_parallelism: Any
process_execution_remote_parallelism: Any
process_execution_cleanup_local_dirs: Any
Expand Down Expand Up @@ -99,6 +100,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds,
remote_store_rpc_retries=bootstrap_options.remote_store_rpc_retries,
remote_store_connection_limit=bootstrap_options.remote_store_connection_limit,
process_execution_slot_environment_variable=bootstrap_options.process_execution_slot_environment_variable,
process_execution_local_parallelism=bootstrap_options.process_execution_local_parallelism,
process_execution_remote_parallelism=bootstrap_options.process_execution_remote_parallelism,
process_execution_cleanup_local_dirs=bootstrap_options.process_execution_cleanup_local_dirs,
Expand Down Expand Up @@ -126,6 +128,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_upload_timeout_seconds=60,
remote_store_rpc_retries=2,
remote_store_connection_limit=5,
process_execution_slot_environment_variable="",
process_execution_local_parallelism=multiprocessing.cpu_count() * 2,
process_execution_remote_parallelism=128,
process_execution_cleanup_local_dirs=True,
Expand Down Expand Up @@ -792,6 +795,13 @@ def register_bootstrap_options(cls, register):
advanced=True,
help="Overall timeout in seconds for each remote execution request from time of submission",
)
register(
"--process-execution-slot-environment-variable",
type=str,
default=DEFAULT_EXECUTION_OPTIONS.process_execution_slot_environment_variable,
advanced=True,
help="If a non-empty string, the process execution slot id (an integer) will be exposed to a running process under this environment variable.",
)
register(
"--process-execution-local-parallelism",
type=int,
Expand Down
27 changes: 25 additions & 2 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ impl MultiPlatformProcess {
pub fn workunit_name(&self) -> String {
"multi_platform_process".to_string()
}

fn add_env_variable(self, name: String, value: String) -> MultiPlatformProcess {
let mut inner = self.0;
for (_, process) in inner.iter_mut() {
process.env.insert(name.clone(), value.clone());
}
MultiPlatformProcess(inner)
}
}

impl From<Process> for MultiPlatformProcess {
Expand Down Expand Up @@ -493,12 +501,18 @@ pub fn digest(req: MultiPlatformProcess, metadata: &ProcessMetadata) -> Digest {
#[derive(Clone)]
pub struct BoundedCommandRunner {
inner: Arc<(Box<dyn CommandRunner>, AsyncSemaphore)>,
execution_slot_variable: Option<String>,
}

impl BoundedCommandRunner {
pub fn new(inner: Box<dyn CommandRunner>, bound: usize) -> BoundedCommandRunner {
pub fn new(
inner: Box<dyn CommandRunner>,
bound: usize,
execution_slot_variable: Option<String>,
) -> BoundedCommandRunner {
BoundedCommandRunner {
inner: Arc::new((inner, AsyncSemaphore::new(bound))),
execution_slot_variable,
}
}
}
Expand All @@ -518,6 +532,9 @@ impl CommandRunner for BoundedCommandRunner {
let desc = req
.user_facing_name()
.unwrap_or_else(|| "<Unnamed process>".to_string());

let execution_slot_variable = self.execution_slot_variable.clone();

let outer_metadata = WorkunitMetadata {
desc: Some(desc.clone()),
level: Level::Debug,
Expand All @@ -532,7 +549,7 @@ impl CommandRunner for BoundedCommandRunner {
let context = context.clone();
let name = format!("{}-running", req.workunit_name());

semaphore.with_acquired(move |_id| {
semaphore.with_acquired(move |concurrency_id| {
let metadata = WorkunitMetadata {
desc: Some(desc),
message: None,
Expand All @@ -556,6 +573,12 @@ impl CommandRunner for BoundedCommandRunner {
},
};

let req = if let Some(slot_variable) = execution_slot_variable {
req.add_env_variable(slot_variable, format!("{}", concurrency_id))
} else {
req
};

with_workunit(
context.workunit_store.clone(),
name,
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl Core {
local_execution_root_dir: PathBuf,
named_caches_dir: PathBuf,
remoting_opts: RemotingOptions,
process_execution_slot_environment_variable: Option<String>,
process_execution_local_parallelism: usize,
process_execution_remote_parallelism: usize,
process_execution_cleanup_local_dirs: bool,
Expand Down Expand Up @@ -183,6 +184,7 @@ impl Core {
Box::new(BoundedCommandRunner::new(
maybe_nailgunnable_local_command_runner,
process_execution_local_parallelism,
process_execution_slot_environment_variable.clone(),
));

if remoting_opts.execution_enable {
Expand Down Expand Up @@ -229,6 +231,7 @@ impl Core {
Box::new(BoundedCommandRunner::new(
command_runner,
process_execution_remote_parallelism,
process_execution_slot_environment_variable,
))
};
command_runner = match process_execution_speculation_strategy.as_ref() {
Expand Down
11 changes: 11 additions & 0 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ py_module_initializer!(native_engine, |py, m| {
use_gitignore: bool,
root_type_ids: Vec<PyType>,
remoting_options: PyRemotingOptions,
process_execution_slot_environment_variable: String,
process_execution_local_parallelism: u64,
process_execution_remote_parallelism: u64,
process_execution_cleanup_local_dirs: bool,
Expand Down Expand Up @@ -743,6 +744,7 @@ fn scheduler_create(
use_gitignore: bool,
root_type_ids: Vec<PyType>,
remoting_options: PyRemotingOptions,
process_execution_slot_environment_variable: String,
process_execution_local_parallelism: u64,
process_execution_remote_parallelism: u64,
process_execution_cleanup_local_dirs: bool,
Expand All @@ -755,6 +757,14 @@ fn scheduler_create(
Ok(msg) => debug!("{}", msg),
Err(e) => warn!("{}", e),
}

let process_execution_slot_environment_variable =
if process_execution_slot_environment_variable.is_empty() {
None
} else {
Some(process_execution_slot_environment_variable)
};

let core: Result<Core, String> = with_executor(py, executor_ptr, |executor| {
let types = types_ptr
.types(py)
Expand All @@ -778,6 +788,7 @@ fn scheduler_create(
PathBuf::from(local_execution_root_dir_buf),
PathBuf::from(named_caches_dir_buf),
remoting_options.options(py).clone(),
process_execution_slot_environment_variable,
process_execution_local_parallelism as usize,
process_execution_remote_parallelism as usize,
process_execution_cleanup_local_dirs,
Expand Down

0 comments on commit 1550a95

Please sign in to comment.