Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ declare_attrs! {

/// Maximum buffer size for split port messages
pub attr SPLIT_MAX_BUFFER_SIZE: usize = 5;

/// Flag indicating if this is a managed subprocess
pub attr IS_MANAGED_SUBPROCESS: bool = false;
}

/// Load configuration from environment variables
Expand Down Expand Up @@ -84,8 +81,6 @@ pub fn from_env() -> Attrs {
}
}

config[IS_MANAGED_SUBPROCESS] = env::var("HYPERACTOR_MANAGED_SUBPROCESS").is_ok();

config
}

Expand Down Expand Up @@ -121,9 +116,6 @@ pub fn merge(config: &mut Attrs, other: &Attrs) {
if other.contains_key(SPLIT_MAX_BUFFER_SIZE) {
config[SPLIT_MAX_BUFFER_SIZE] = other[SPLIT_MAX_BUFFER_SIZE];
}
if other.contains_key(IS_MANAGED_SUBPROCESS) {
config[IS_MANAGED_SUBPROCESS] = other[IS_MANAGED_SUBPROCESS];
}
}

/// Global configuration functions
Expand Down Expand Up @@ -294,7 +286,6 @@ mod tests {
);
assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
assert!(!config[IS_MANAGED_SUBPROCESS]);
}

#[test]
Expand Down Expand Up @@ -375,15 +366,13 @@ mod tests {
);
assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
assert!(!config[IS_MANAGED_SUBPROCESS]);

// Verify the keys have defaults
assert!(CODEC_MAX_FRAME_LENGTH.has_default());
assert!(MESSAGE_DELIVERY_TIMEOUT.has_default());
assert!(MESSAGE_ACK_TIME_INTERVAL.has_default());
assert!(MESSAGE_ACK_EVERY_N_MESSAGES.has_default());
assert!(SPLIT_MAX_BUFFER_SIZE.has_default());
assert!(IS_MANAGED_SUBPROCESS.has_default());

// Verify we can get defaults directly from keys
assert_eq!(
Expand All @@ -400,7 +389,6 @@ mod tests {
);
assert_eq!(MESSAGE_ACK_EVERY_N_MESSAGES.default(), Some(&1000));
assert_eq!(SPLIT_MAX_BUFFER_SIZE.default(), Some(&5));
assert_eq!(IS_MANAGED_SUBPROCESS.default(), Some(&false));
}

#[test]
Expand Down
34 changes: 0 additions & 34 deletions hyperactor/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> =
/// Initialize the Hyperactor runtime. Specifically:
/// - Set up panic handling, so that we get consistent panic stack traces in Actors.
/// - Initialize logging defaults.
/// - On Linux, set up signal handlers to ensure that managed child processes are reliably
/// terminated when their parents die. This is indicated by the environment variable
/// `HYPERACTOR_MANAGED_SUBPROCESS`.
pub fn initialize() {
static INITIALIZED: OnceLock<()> = OnceLock::new();
INITIALIZED.get_or_init(|| {
Expand All @@ -38,15 +35,8 @@ pub fn initialize() {
#[cfg(target_os = "linux")]
mod linux {
use std::backtrace::Backtrace;
use std::process;

use libc::PR_SET_PDEATHSIG;
use nix::sys::signal::SIGUSR1;
use nix::sys::signal::SigHandler;
use nix::unistd::getpid;
use nix::unistd::getppid;
use tokio::signal::unix::SignalKind;
use tokio::signal::unix::signal;

pub(crate) fn initialize() {
// Safety: Because I want to
Expand All @@ -68,29 +58,5 @@ mod linux {
)
.expect("unable to register signal handler");
}

if !crate::config::global::get(crate::config::IS_MANAGED_SUBPROCESS) {
return;
}
super::RUNTIME.spawn(async {
match signal(SignalKind::user_defined1()) {
Ok(mut sigusr1) => {
// SAFETY: required for signal handling
unsafe {
libc::prctl(PR_SET_PDEATHSIG, SIGUSR1);
}
sigusr1.recv().await;
tracing::error!(
"hyperactor[{}]: parent process {} died; exiting",
getpid(),
getppid()
);
process::exit(1);
}
Err(err) => {
eprintln!("failed to set up SIGUSR1 signal handler: {:?}", err);
}
}
});
}
}
1 change: 0 additions & 1 deletion monarch_tensor_worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2381,7 +2381,6 @@ mod tests {
.arg(format!("--bootstrap-addr={system_addr}"))
.arg(format!("--world-id={world_id}"))
.arg(format!("--proc-id={proc_id}"))
.env("HYPERACTOR_MANAGED_SUBPROCESS", "1")
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.kill_on_drop(true)
Expand Down
1 change: 0 additions & 1 deletion python/monarch/mesh_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ def _initialize_env(worker_point: Point, proc_id: str) -> None:
num_worker_procs = len(worker_point.shape)
process_env = {
**worker_env,
"HYPERACTOR_MANAGED_SUBPROCESS": "1",
"CUDA_VISIBLE_DEVICES": str(local_rank),
"NCCL_HOSTID": f"{proc_id}_host_{worker_rank // gpus_per_host}",
# This is needed to avoid a hard failure in ncclx when we do not
Expand Down
2 changes: 0 additions & 2 deletions python/monarch/proc_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ async def proc_mesh_nonblocking(
env = env or {}
cmd, args, base_env = _get_bootstrap_args()
env.update(base_env)
env["HYPERACTOR_MANAGED_SUBPROCESS"] = "1"
allocator = monarch.ProcessAllocator(cmd, args, env)
alloc = await allocator.allocate(spec)
return await ProcMesh.from_alloc(alloc)
Expand All @@ -284,7 +283,6 @@ def proc_mesh_blocking(
env = env or {}
cmd, args, base_env = _get_bootstrap_args()
env.update(base_env)
env["HYPERACTOR_MANAGED_SUBPROCESS"] = "1"
allocator = monarch.ProcessAllocator(cmd, args, env)
alloc = allocator.allocate(spec).get()
return ProcMesh.from_alloc(alloc).get()
Expand Down
5 changes: 1 addition & 4 deletions python/monarch/rust_local_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ class ControllerParams(NamedTuple):
fail_on_worker_timeout: bool


_PROC_ENV = {
"HYPERACTOR_MANAGED_SUBPROCESS": str(1),
}
_PROC_ENV: dict[str, str] = {}


def get_controller_main() -> tuple[Path, dict[str, str]]:
Expand Down Expand Up @@ -988,7 +986,6 @@ def __init__(
raise ValueError(f"Unknown socket type: {socket_type}")

env = os.environ.copy()
env["HYPERACTOR_MANAGED_SUBPROCESS"] = "1"
self.env: dict[str, str] = env

# Launch a single system globally
Expand Down
1 change: 0 additions & 1 deletion python/monarch/sim_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ def __init__(
fake_call(lambda: 0)

env = os.environ.copy()
env["HYPERACTOR_MANAGED_SUBPROCESS"] = "1"
self.env: dict[str, str] = env

self._mesh_world_state: Dict[MeshWorld, Optional[DeviceMesh]] = mesh_world_state
Expand Down