Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion hyperactor_mesh/src/alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,23 @@ pub trait Alloc {
/// Stop this alloc and wait for all procs to stop. Call will
/// block until all ProcState events have been drained.
async fn stop_and_wait(&mut self) -> Result<(), AllocatorError> {
tracing::error!(
name = "AllocStatus",
alloc_name = %self.world_id(),
status = "StopAndWait",
);
self.stop().await?;
while let Some(event) = self.next().await {
tracing::debug!("drained event: {:?}", event);
tracing::debug!(
alloc_name = %self.world_id(),
"drained event: {event:?}"
);
}
tracing::error!(
name = "AllocStatus",
alloc_name = %self.world_id(),
status = "Stopped",
);
Ok(())
}

Expand Down
24 changes: 22 additions & 2 deletions hyperactor_mesh/src/alloc/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ impl Allocator for LocalAllocator {
type Alloc = LocalAlloc;

async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
Ok(LocalAlloc::new(spec))
let alloc = LocalAlloc::new(spec);
tracing::info!(
name = "LocalAllocStatus",
alloc_name = %alloc.world_id(),
status = "Allocated",
);
Ok(alloc)
}
}

Expand Down Expand Up @@ -259,12 +265,23 @@ impl Alloc for LocalAlloc {
}

async fn stop(&mut self) -> Result<(), AllocatorError> {
tracing::info!(
name = "LocalAllocStatus",
alloc_name = %self.world_id(),
status = "Stopping",
);
for rank in 0..self.size() {
self.todo_tx
.send(Action::Stop(rank, ProcStopReason::Stopped))
.unwrap();
}
self.todo_tx.send(Action::Stopped).unwrap();
tracing::info!(
name = "LocalAllocStatus",
alloc_name = %self.world_id(),
status = "Stop::Sent",
"Stop was sent to local procs; check their log to determine if it exited."
);
Ok(())
}

Expand All @@ -275,7 +292,10 @@ impl Alloc for LocalAlloc {

impl Drop for LocalAlloc {
fn drop(&mut self) {
tracing::debug!(
tracing::info!(
name = "LocalAllocStatus",
alloc_name = %self.world_id(),
status = "Dropped",
"dropping LocalAlloc of name: {}, world id: {}",
self.name,
self.world_id
Expand Down
25 changes: 23 additions & 2 deletions hyperactor_mesh/src/alloc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ impl Allocator for ProcessAllocator {
}

let name = ShortUuid::generate();
let world_id = WorldId(name.to_string());
tracing::info!(
name = "ProcessAllocStatus",
alloc_name = %world_id,
addr = %bootstrap_addr,
status = "Allocated",
);
Ok(ProcessAlloc {
name: name.clone(),
world_id: WorldId(name.to_string()),
world_id,
spec: spec.clone(),
bootstrap_addr,
rx,
Expand Down Expand Up @@ -666,6 +673,11 @@ impl Alloc for ProcessAlloc {
}

async fn stop(&mut self) -> Result<(), AllocatorError> {
tracing::info!(
name = "ProcessAllocStatus",
alloc_name = %self.world_id(),
status = "Stopping",
);
// We rely on the teardown here, and that the process should
// exit on its own. We should have a hard timeout here as well,
// so that we never rely on the system functioning correctly
Expand All @@ -676,13 +688,22 @@ impl Alloc for ProcessAlloc {
}

self.running = false;
tracing::info!(
name = "ProcessAllocStatus",
alloc_name = %self.world_id(),
status = "Stop::Sent",
"StopAndExit was sent to allocators; check their logs for the stop progress."
);
Ok(())
}
}

impl Drop for ProcessAlloc {
fn drop(&mut self) {
tracing::debug!(
tracing::info!(
name = "ProcessAllocStatus",
alloc_name = %self.world_id(),
status = "Dropped",
"dropping ProcessAlloc of name: {}, world id: {}",
self.name,
self.world_id
Expand Down
33 changes: 32 additions & 1 deletion hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ impl HostMeshRef {
pub(crate) async fn stop_proc_mesh(
&self,
cx: &impl hyperactor::context::Actor,
proc_mesh_name: &Name,
procs: impl IntoIterator<Item = ProcId>,
region: Region,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -940,8 +941,18 @@ impl HostMeshRef {
host.mesh_agent()
.get_rank_status(cx, proc_name, port.bind())
.await?;

tracing::info!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
%proc_id,
status = "Stop::Sent",
);
}
tracing::info!(
mesh_name = %self.name,
name = "HostMeshStatus",
status = "ProcMesh::Stop::Sent",
"Sending Stop to host mesh {} for {:?} procs",
self.name,
proc_names
Expand All @@ -960,11 +971,23 @@ impl HostMeshRef {
Ok(statuses) => {
let failed = statuses.values().any(|s| s.is_failure());
if failed {
tracing::error!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
status = "FailedToStop",
"failed to terminate proc mesh: {:?}",
statuses,
);
return Err(anyhow::anyhow!(
"failed to terminate proc mesh: {:?}",
statuses,
));
}
tracing::info!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
status = "Stopped",
);
}
Err(complete) => {
// Fill remaining ranks with a timeout status via the
Expand All @@ -975,8 +998,16 @@ impl HostMeshRef {
Status::is_not_exist,
num_ranks,
);
tracing::error!(
name = "ProcMeshStatus",
mesh_name = %proc_mesh_name,
status = "StoppingTimeout",
"failed to terminate proc mesh before timeout: {:?}",
legacy,
);
return Err(anyhow::anyhow!(
"failed to terminate proc mesh: {:?}",
"failed to terminate proc mesh {} before timeout: {:?}",
proc_mesh_name,
legacy
));
}
Expand Down
69 changes: 46 additions & 23 deletions hyperactor_mesh/src/v1/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,41 +435,52 @@ impl ProcMesh {

let stop = Arc::new(Notify::new());
let extent = alloc.extent().clone();
let alloc_name = alloc.world_id().to_string();

{
let stop = Arc::clone(&stop);
let name = name.clone();

tokio::spawn(async move {
loop {
tokio::select! {
_ = stop.notified() => {
// If we are explicitly stopped, the alloc is torn down.
if let Err(e) = alloc.stop_and_wait().await {
tracing::error!("alloc {}: failed to stop: {}", name, e);
}
break;
}
// We are mostly just using this to drive allocation events.
proc_state = alloc.next() => {
match proc_state {
// The alloc was stopped.
None => break,
Some(proc_state) => {
tracing::info!("unmonitored allocation event for {}: {}", name, proc_state);

tokio::spawn(
async move {
loop {
tokio::select! {
_ = stop.notified() => {
// If we are explicitly stopped, the alloc is torn down.
if let Err(error) = alloc.stop_and_wait().await {
tracing::error!(
name = "ProcMeshStatus",
alloc_name = %alloc.world_id(),
status = "FailedToStopAlloc",
%error,
);
}
break;
}
// We are mostly just using this to drive allocation events.
proc_state = alloc.next() => {
match proc_state {
// The alloc was stopped.
None => break,
Some(proc_state) => {
tracing::debug!(
alloc_name = %alloc.world_id(),
"unmonitored allocation event: {}", proc_state);
}
}

}
}
}
}
}.instrument(tracing::info_span!("alloc_monitor")));
.instrument(tracing::info_span!("alloc_monitor")),
);
}

let mesh = Self::create(
cx,
name,
ProcMeshAllocation::Allocated {
alloc_name,
stop,
extent,
ranks: Arc::new(ranks),
Expand Down Expand Up @@ -497,15 +508,24 @@ impl ProcMesh {
pub async fn stop(&mut self, cx: &impl context::Actor) -> anyhow::Result<()> {
let region = self.region.clone();
match &mut self.allocation {
ProcMeshAllocation::Allocated { stop, .. } => {
ProcMeshAllocation::Allocated {
stop, alloc_name, ..
} => {
stop.notify_one();
tracing::info!(
name = "ProcMeshStatus",
mesh_name = %self.name,
alloc_name,
status = "StoppingAlloc",
"sending stop to alloc {alloc_name}; check its log for stop status",
);
Ok(())
}
ProcMeshAllocation::Owned { hosts, .. } => {
let names = self.current_ref.proc_ids().collect::<Vec<ProcId>>();
let procs = self.current_ref.proc_ids().collect::<Vec<ProcId>>();
// We use the proc mesh region rather than the host mesh region
// because the host agent stores one entry per proc, not per host.
hosts.stop_proc_mesh(cx, names, region).await
hosts.stop_proc_mesh(cx, &self.name, procs, region).await
}
}
}
Expand Down Expand Up @@ -539,6 +559,9 @@ impl Drop for ProcMesh {
enum ProcMeshAllocation {
/// A mesh that has been allocated from an `Alloc`.
Allocated {
// The name of the alloc from which this mesh was allocated.
alloc_name: String,

// A cancellation token used to stop the task keeping the alloc alive.
stop: Arc<Notify>,

Expand Down