Skip to content

Commit

Permalink
refactor fork/exec logic for the command runners (#20944)
Browse files Browse the repository at this point in the history
Move the exclusive spawn logic out of
`process_execution::local::CommandRunner` into a helper module in
advance of using that logic in the forthcoming "workspace" command
runner to be introduced by
#20772. (This PR was extracted
from #20772.)
  • Loading branch information
tdyas committed May 21, 2024
1 parent aa06bea commit c8cdca0
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 63 deletions.
3 changes: 3 additions & 0 deletions src/rust/engine/process_execution/src/cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
use std::convert::TryInto;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;

use cache::PersistentCache;
use sharded_lmdb::DEFAULT_LEASE_TIME;
use store::{ImmutableInputs, Store};
use tempfile::TempDir;
use testutil::data::TestData;
use testutil::relative_paths;
use tokio::sync::RwLock;
use workunit_store::{RunningWorkunit, WorkunitStore};

use crate::{
Expand All @@ -35,6 +37,7 @@ fn create_local_runner() -> (Box<dyn CommandRunnerTrait>, Store, TempDir) {
NamedCaches::new_local(named_cache_dir),
ImmutableInputs::new(store.clone(), base_dir.path()).unwrap(),
KeepSandboxes::Never,
Arc::new(RwLock::new(())),
));
(runner, store, base_dir)
}
Expand Down
75 changes: 75 additions & 0 deletions src/rust/engine/process_execution/src/fork_exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::io;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::RwLock;

use crate::ManagedChild;

/// Spawn a subprocess safely given that binaries may be written by this process.
pub async fn spawn_process(
spawn_lock: Arc<RwLock<()>>,
exclusive: bool,
mut fork_exec: impl FnMut() -> io::Result<ManagedChild>,
) -> Result<ManagedChild, String> {
// See the documentation of the `CapturedWorkdir::run_in_workdir` method, but `exclusive_spawn`
// indicates the binary we're spawning was written out by the current thread, and, as such,
// there may be open file handles against it. This will occur whenever a concurrent call of this
// method proceeds through its fork point
// (https://pubs.opengroup.org/onlinepubs/009695399/functions/fork.html) while the current
// thread is in the middle of writing the binary and thus captures a clone of the open file
// handle, but that concurrent call has not yet gotten to its exec point
// (https://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html) where the operating
// system will close the cloned file handle (via O_CLOEXEC being set on all files opened by
// Rust). To prevent a race like this holding this thread's binary open leading to an ETXTBSY
// (https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html) error, we
// maintain RwLock that allows non-`exclusive_spawn` binaries to spawn concurrently but ensures
// all such concurrent spawns have completed (and thus closed any cloned file handles) before
// proceeding to spawn the `exclusive_spawn` binary this thread has written.
//
// See: https://github.com/golang/go/issues/22315 for an excellent description of this generic
// unix problem.

if exclusive {
let _write_locked = spawn_lock.write().await;

// Despite the mitigations taken against racing our own forks, forks can happen in our
// process but outside of our control (in libraries). As such, we back-stop by sleeping and
// trying again for a while if we do hit one of these fork races we do not control.
const MAX_ETXTBSY_WAIT: Duration = Duration::from_millis(100);
let mut retries: u32 = 0;
let mut sleep_millis = 1;

let start_time = std::time::Instant::now();
loop {
match fork_exec() {
Err(e) => {
if e.raw_os_error() == Some(libc::ETXTBSY)
&& start_time.elapsed() < MAX_ETXTBSY_WAIT
{
tokio::time::sleep(std::time::Duration::from_millis(sleep_millis)).await;
retries += 1;
sleep_millis *= 2;
continue;
} else if retries > 0 {
break Err(format!(
"Error launching process after {} {} for ETXTBSY. Final error was: {:?}",
retries,
if retries == 1 { "retry" } else { "retries" },
e
));
} else {
break Err(format!("Error launching process: {e:?}"));
}
}
Ok(child) => break Ok(child),
}
}
} else {
let _read_locked = spawn_lock.read().await;
fork_exec().map_err(|e| format!("Error launching process: {e:?}"))
}
}
2 changes: 2 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub mod named_caches;
#[cfg(test)]
pub mod named_caches_tests;

pub(crate) mod fork_exec;

extern crate uname;

pub use crate::children::ManagedChild;
Expand Down
72 changes: 9 additions & 63 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ use task_executor::Executor;
use tempfile::TempDir;
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::{timeout, Duration};
use tokio::time::timeout;
use tokio_util::codec::{BytesCodec, FramedRead};
use workunit_store::{in_workunit, Level, Metric, RunningWorkunit};

use crate::fork_exec::spawn_process;
use crate::{
Context, FallibleProcessResultWithPlatform, ManagedChild, NamedCaches, Process, ProcessError,
ProcessResultMetadata, ProcessResultSource,
Expand All @@ -57,7 +58,7 @@ pub struct CommandRunner {
named_caches: NamedCaches,
immutable_inputs: ImmutableInputs,
keep_sandboxes: KeepSandboxes,
spawn_lock: RwLock<()>,
spawn_lock: Arc<RwLock<()>>,
}

impl CommandRunner {
Expand All @@ -68,6 +69,7 @@ impl CommandRunner {
named_caches: NamedCaches,
immutable_inputs: ImmutableInputs,
keep_sandboxes: KeepSandboxes,
spawn_lock: Arc<RwLock<()>>,
) -> CommandRunner {
CommandRunner {
store,
Expand All @@ -76,7 +78,7 @@ impl CommandRunner {
named_caches,
immutable_inputs,
keep_sandboxes,
spawn_lock: RwLock::new(()),
spawn_lock,
}
}

Expand Down Expand Up @@ -307,66 +309,10 @@ impl CapturedWorkdir for CommandRunner {
.stdout(Stdio::piped())
.stderr(Stdio::piped());

// See the documentation of the `CapturedWorkdir::run_in_workdir` method, but `exclusive_spawn`
// indicates the binary we're spawning was written out by the current thread, and, as such,
// there may be open file handles against it. This will occur whenever a concurrent call of this
// method proceeds through its fork point
// (https://pubs.opengroup.org/onlinepubs/009695399/functions/fork.html) while the current
// thread is in the middle of writing the binary and thus captures a clone of the open file
// handle, but that concurrent call has not yet gotten to its exec point
// (https://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html) where the operating
// system will close the cloned file handle (via O_CLOEXEC being set on all files opened by
// Rust). To prevent a race like this holding this thread's binary open leading to an ETXTBSY
// (https://pubs.opengroup.org/onlinepubs/9699919799/functions/V2_chap02.html) error, we
// maintain RwLock that allows non-`exclusive_spawn` binaries to spawn concurrently but ensures
// all such concurrent spawns have completed (and thus closed any cloned file handles) before
// proceeding to spawn the `exclusive_spawn` binary this thread has written.
//
// See: https://github.com/golang/go/issues/22315 for an excellent description of this generic
// unix problem.
let mut fork_exec = move || ManagedChild::spawn(&mut command, None);
let mut child = {
if exclusive_spawn {
let _write_locked = self.spawn_lock.write().await;

// Despite the mitigations taken against racing our own forks, forks can happen in our
// process but outside of our control (in libraries). As such, we back-stop by sleeping and
// trying again for a while if we do hit one of these fork races we do not control.
const MAX_ETXTBSY_WAIT: Duration = Duration::from_millis(100);
let mut retries: u32 = 0;
let mut sleep_millis = 1;

let start_time = std::time::Instant::now();
loop {
match fork_exec() {
Err(e) => {
if e.raw_os_error() == Some(libc::ETXTBSY)
&& start_time.elapsed() < MAX_ETXTBSY_WAIT
{
tokio::time::sleep(std::time::Duration::from_millis(sleep_millis))
.await;
retries += 1;
sleep_millis *= 2;
continue;
} else if retries > 0 {
break Err(format!(
"Error launching process after {} {} for ETXTBSY. Final error was: {:?}",
retries,
if retries == 1 { "retry" } else { "retries" },
e
));
} else {
break Err(format!("Error launching process: {e:?}"));
}
}
Ok(child) => break Ok(child),
}
}
} else {
let _read_locked = self.spawn_lock.read().await;
fork_exec().map_err(|e| format!("Error launching process: {e:?}"))
}
}?;
let mut child = spawn_process(self.spawn_lock.clone(), exclusive_spawn, move || {
ManagedChild::spawn(&mut command, None)
})
.await?;

debug!("spawned local process as {:?} for {:?}", child.id(), req);
let stdout_stream = FramedRead::new(child.stdout.take().unwrap(), BytesCodec::new())
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/process_execution/src/local_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::path::PathBuf;
use std::str;
use std::sync::Arc;
use std::time::Duration;

use maplit::hashset;
Expand All @@ -14,6 +15,7 @@ use store::{ImmutableInputs, Store};
use testutil::data::{TestData, TestDirectory};
use testutil::path::{find_bash, which};
use testutil::{owned_string_vec, relative_paths};
use tokio::sync::RwLock;
use workunit_store::{RunningWorkunit, WorkunitStore};

use crate::{
Expand Down Expand Up @@ -794,6 +796,7 @@ async fn run_command_locally_in_dir(
named_caches,
immutable_inputs,
cleanup,
Arc::new(RwLock::new(())),
);
let original = runner.run(Context::default(), workunit, req).await?;
let stdout_bytes = store
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use protos::gen::buildbarn::cas::UncachedActionResult;
use protos::require_digest;
use remote::remote_cache::RemoteCacheRunnerOptions;
use store::{ImmutableInputs, RemoteProvider, RemoteStoreOptions, Store};
use tokio::sync::RwLock;
use workunit_store::{in_workunit, Level, WorkunitStore};

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -393,6 +394,7 @@ async fn main() {
),
ImmutableInputs::new(store.clone(), &workdir).unwrap(),
KeepSandboxes::Never,
Arc::new(RwLock::new(())),
)) as Box<dyn process_execution::CommandRunner>,
};

Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use remote::{self, remote_cache};
use rule_graph::RuleGraph;
use store::{self, ImmutableInputs, RemoteProvider, RemoteStoreOptions, Store};
use task_executor::Executor;
use tokio::sync::RwLock;
use watch::{Invalidatable, InvalidateCaller, InvalidationWatcher};
use workunit_store::{Metric, RunningWorkunit};

Expand Down Expand Up @@ -210,13 +211,18 @@ impl Core {
exec_strategy_opts: &ExecutionStrategyOptions,
remoting_opts: &RemotingOptions,
) -> Result<Arc<dyn CommandRunner>, String> {
// Lock shared between local command runner (and in future work) other "local" command runners
// for spawning processes.
let spawn_lock = Arc::new(RwLock::new(()));

let local_command_runner = local::CommandRunner::new(
local_runner_store.clone(),
executor.clone(),
local_execution_root_dir.to_path_buf(),
named_caches.clone(),
immutable_inputs.clone(),
exec_strategy_opts.local_keep_sandboxes,
spawn_lock.clone(),
);

let runner: Box<dyn CommandRunner> = if exec_strategy_opts.local_enable_nailgun {
Expand Down

0 comments on commit c8cdca0

Please sign in to comment.