From 399eba56a3927360d715afad212ff359f05b5203 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 27 Oct 2025 12:19:52 -0700 Subject: [PATCH 1/3] Downgrade message log level to TRACE (#1672) Summary: We found this log unhelpful when debugging S576170; moreover, it causes log spew and makes it hard to find other useful logs. Downgrade it to TRACE instead of deleting it, just in case we need it for future debugging (at that time we can manually bump its level). Reviewed By: shayne-fletcher Differential Revision: D85533659 --- hyperactor/src/mailbox.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index 2b31b8124..b0fd66e8d 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1160,7 +1160,7 @@ impl MailboxSender for MailboxClient { envelope: MessageEnvelope, return_handle: PortHandle>, ) { - tracing::event!(target:"messages", tracing::Level::DEBUG, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message"); + tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message"); if let Err(mpsc::error::SendError((envelope, return_handle))) = self.buffer.send((envelope, return_handle)) { From 1c4c510ab364ecba088dc5932ac160e18ff49705 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 27 Oct 2025 12:19:52 -0700 Subject: [PATCH 2/3] log improvement Summary: Two changes: 1. Include the timeout value in the log. 2. Include port id and message type name, or hash if not registered, in the undelivered message error. 
It will look like the following in a returned message log: > metatls:twshared13737.02.gtn2.facebook.com:46155,service,agent[0]: actor failure: serving metatls:twshared13737.02.gtn2.facebook.com:46155,service,agent[0]: processing error: a message from metatls:twshared13737.02.gtn2.facebook.com:46155,service,agent[0] to metatls:twshared13736.02.gtn2.facebook.com:40835,mesh_root_client_proc,client[0][430914] was undeliverable and returned: Some(**"address not routable: port not bound in mailbox; port id: 430914; message type: unregistered type hash 16038717096654025819**; broken link: message was undeliverable") Reviewed By: shayne-fletcher Differential Revision: D85537948 --- hyperactor/src/mailbox.rs | 9 ++++++++- hyperactor_mesh/src/v1/host_mesh.rs | 5 ++++- hyperactor_mesh/src/v1/proc_mesh.rs | 3 ++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index b0fd66e8d..f6b8b1cf5 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1509,7 +1509,14 @@ impl MailboxSender for Mailbox { match self.inner.ports.entry(envelope.dest().index()) { Entry::Vacant(_) => { - let err = DeliveryError::Unroutable("port not bound in mailbox".to_string()); + let err = DeliveryError::Unroutable(format!( + "port not bound in mailbox; port id: {}; message type: {}", + envelope.dest().index(), + envelope.data().typename().map_or_else( + || format!("unregistered type hash {}", envelope.data().typehash()), + |s| s.to_string(), + ) + )); envelope.undeliverable(err, return_handle); } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 79829e699..ed1cd0b83 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -854,7 +854,10 @@ impl HostMeshRef { } else { // Timeout error, stop reading from the receiver and send back what we have so far, // padding with failed states. 
- tracing::warn!("Timeout waiting for response from host mesh agent for proc_states"); + tracing::warn!( + "Timeout waiting for response from host mesh agent for proc_states after {:?}", + timeout + ); let all_ranks = (0..num_ranks).collect::>(); let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::>(); let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::>(); diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 8e901a17f..884f1db27 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -646,7 +646,8 @@ impl ProcMeshRef { } } else { tracing::error!( - "timeout waiting for a message from proc mesh agent in mesh: {}", + "timeout waiting for a message after {:?} from proc mesh agent in mesh {}", + timeout, agent_mesh ); // Timeout error, stop reading from the receiver and send back what we have so far, From 0230e1615e750f9b68ba3a0364208bb74e8192c7 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 27 Oct 2025 12:19:52 -0700 Subject: [PATCH 3/3] Add reason to channel::serve Summary: We need a way to associate a channel address with the reason it was served. This diff adds a reason parameter to `channel::serve` and logs it along with the address. We probably will not need this diff in the future if we include "reason" or something similar in the channel's name directly. Until then, this diff is required; otherwise we cannot debug channel errors in many cases. 
Reviewed By: shayne-fletcher Differential Revision: D85529357 --- controller/src/lib.rs | 2 +- hyperactor/benches/main.rs | 8 +-- hyperactor/example/channel.rs | 13 +++-- hyperactor/src/channel.rs | 23 ++++++--- hyperactor/src/host.rs | 24 ++++++--- hyperactor/src/mailbox.rs | 17 +++++-- hyperactor/src/proc.rs | 7 ++- hyperactor_mesh/src/actor_mesh.rs | 2 +- hyperactor_mesh/src/alloc.rs | 8 +-- hyperactor_mesh/src/alloc/local.rs | 5 +- hyperactor_mesh/src/alloc/process.rs | 7 ++- hyperactor_mesh/src/alloc/remoteprocess.rs | 38 +++++++------- hyperactor_mesh/src/bootstrap.rs | 20 +++++--- hyperactor_mesh/src/logging.rs | 55 +++++++++++---------- hyperactor_mesh/src/proc_mesh.rs | 9 ++-- hyperactor_mesh/src/router.rs | 2 +- hyperactor_mesh/src/v1/proc_mesh.rs | 5 +- hyperactor_multiprocess/src/proc_actor.rs | 4 +- hyperactor_multiprocess/src/system.rs | 5 +- hyperactor_multiprocess/src/system_actor.rs | 5 +- 20 files changed, 158 insertions(+), 101 deletions(-) diff --git a/controller/src/lib.rs b/controller/src/lib.rs index 0dac1e42c..f6417643e 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -1573,7 +1573,7 @@ mod tests { // Set up a local actor. 
let local_proc_id = world_id.proc_id(rank); let (local_proc_addr, local_proc_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Local), "mock_proc_actor").unwrap(); let local_proc_mbox = Mailbox::new_detached( local_proc_id.actor_id(format!("test_dummy_proc{}", idx).to_string(), 0), ); diff --git a/hyperactor/benches/main.rs b/hyperactor/benches/main.rs index bae362d65..dc15eec8c 100644 --- a/hyperactor/benches/main.rs +++ b/hyperactor/benches/main.rs @@ -87,7 +87,7 @@ fn bench_message_sizes(c: &mut Criterion) { assert!(!socket_addr.ip().is_loopback()); } - let (listen_addr, mut rx) = serve::(addr).unwrap(); + let (listen_addr, mut rx) = serve::(addr, "bench").unwrap(); let tx = dial::(listen_addr).unwrap(); let msg = Message::new(0, size); let start = Instant::now(); @@ -127,7 +127,7 @@ fn bench_message_rates(c: &mut Criterion) { b.iter_custom(|iters| async move { let total_msgs = iters * rate; let addr = ChannelAddr::any(transport.clone()); - let (listen_addr, mut rx) = serve::(addr).unwrap(); + let (listen_addr, mut rx) = serve::(addr, "bench").unwrap(); tokio::spawn(async move { let mut received_count = 0; @@ -212,9 +212,9 @@ async fn channel_ping_pong( struct Message(Part); let (client_addr, mut client_rx) = - channel::serve::(ChannelAddr::any(transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(transport.clone()), "ping_pong_client").unwrap(); let (server_addr, mut server_rx) = - channel::serve::(ChannelAddr::any(transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(transport.clone()), "ping_pong_server").unwrap(); let _server_handle: tokio::task::JoinHandle> = tokio::spawn(async move { diff --git a/hyperactor/example/channel.rs b/hyperactor/example/channel.rs index 97f3d2955..32872d176 100644 --- a/hyperactor/example/channel.rs +++ b/hyperactor/example/channel.rs @@ -64,8 +64,11 @@ async fn client( ) -> anyhow::Result<()> { let server_tx = 
channel::dial(server_addr)?; - let (client_addr, mut client_rx) = - channel::serve::(ChannelAddr::any(server_tx.addr().transport().clone())).unwrap(); + let (client_addr, mut client_rx) = channel::serve::( + ChannelAddr::any(server_tx.addr().transport().clone()), + "example", + ) + .unwrap(); server_tx.post(Message::Hello(client_addr)); @@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> { match args.command { Some(Commands::Server) => { let (server_addr, server_rx) = - channel::serve::(ChannelAddr::any(args.transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(args.transport.clone()), "example") + .unwrap(); eprintln!("server listening on {}", server_addr); server(server_rx).await?; } @@ -176,7 +180,8 @@ async fn main() -> Result<(), anyhow::Error> { // No command: run a self-contained benchmark. None => { let (server_addr, server_rx) = - channel::serve::(ChannelAddr::any(args.transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(args.transport.clone()), "example") + .unwrap(); let _server_handle = tokio::spawn(server(server_rx)); let client_handle = tokio::spawn(client(server_addr, args.message_size, args.num_iter)); diff --git a/hyperactor/src/channel.rs b/hyperactor/src/channel.rs index 075ac75cf..65533713b 100644 --- a/hyperactor/src/channel.rs +++ b/hyperactor/src/channel.rs @@ -14,7 +14,6 @@ use core::net::SocketAddr; use std::fmt; use std::net::IpAddr; -use std::net::Ipv4Addr; use std::net::Ipv6Addr; #[cfg(target_os = "linux")] use std::os::linux::net::SocketAddrExt; @@ -35,7 +34,6 @@ use crate::Named; use crate::RemoteMessage; use crate::attrs::AttrValue; use crate::channel::sim::SimAddr; -use crate::config; use crate::simnet::SimNetError; pub(crate) mod local; @@ -842,8 +840,8 @@ pub fn dial(addr: ChannelAddr) -> Result, Channel #[crate::instrument] pub fn serve( addr: ChannelAddr, + reason: &str, ) -> Result<(ChannelAddr, ChannelRx), ChannelError> { - tracing::debug!(name = "serve", "serving channel address {}", 
addr); match addr { ChannelAddr::Tcp(addr) => { let (addr, rx) = net::tcp::serve::(addr)?; @@ -870,7 +868,15 @@ pub fn serve( a ))), } - .map(|(addr, inner)| (addr, ChannelRx { inner })) + .map(|(addr, inner)| { + tracing::debug!( + name = "serve", + "serving channel address {} for {}", + addr, + reason + ); + (addr, ChannelRx { inner }) + }) } /// Serve on the local address. The server is turned down @@ -900,6 +906,7 @@ mod tests { use super::*; use crate::clock::Clock; use crate::clock::RealClock; + use crate::config; #[test] fn test_channel_addr() { @@ -1044,7 +1051,7 @@ mod tests { #[tokio::test] async fn test_multiple_connections() { for addr in ChannelTransport::all().map(ChannelAddr::any) { - let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); let mut sends: JoinSet<()> = JoinSet::new(); for message in 0u64..100u64 { @@ -1083,7 +1090,7 @@ mod tests { continue; } - let (listen_addr, rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, rx) = crate::channel::serve::(addr, "test").unwrap(); let tx = dial::(listen_addr).unwrap(); tx.try_post(123, oneshot::channel().0).unwrap(); @@ -1132,7 +1139,7 @@ mod tests { #[cfg_attr(not(feature = "fb"), ignore)] async fn test_dial_serve() { for addr in addrs() { - let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); let tx = crate::channel::dial(listen_addr).unwrap(); tx.try_post(123, oneshot::channel().0).unwrap(); assert_eq!(rx.recv().await.unwrap(), 123); @@ -1152,7 +1159,7 @@ mod tests { ); let _guard2 = config.override_key(crate::config::MESSAGE_ACK_EVERY_N_MESSAGES, 1); for addr in addrs() { - let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); let tx = crate::channel::dial(listen_addr).unwrap(); tx.send(123).await.unwrap(); 
assert_eq!(rx.recv().await.unwrap(), 123); diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index a6fe1900f..e8bf9bfa9 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -64,7 +64,6 @@ use crate::Proc; use crate::ProcId; use crate::actor::Binds; use crate::actor::Referable; -use crate::attrs::Attrs; use crate::channel; use crate::channel::ChannelAddr; use crate::channel::ChannelError; @@ -135,7 +134,7 @@ impl Host { manager: M, addr: ChannelAddr, ) -> Result<(Self, MailboxServerHandle), HostError> { - let (frontend_addr, frontend_rx) = channel::serve(addr)?; + let (frontend_addr, frontend_rx) = channel::serve(addr, "host frontend")?; // We set up a cascade of routers: first, the outer router supports // sending to the the system proc, while the dial router manages dialed @@ -144,7 +143,8 @@ impl Host { // Establish a backend channel on the preferred transport. We currently simply // serve the same router on both. - let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?; + let (backend_addr, backend_rx) = + channel::serve(ChannelAddr::any(manager.transport()), "host backend")?; // Set up a system proc. This is often used to manage the host itself. 
let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string()); @@ -865,7 +865,10 @@ where proc_id.clone(), MailboxClient::dial(forwarder_addr)?.into_boxed(), ); - let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?; + let (proc_addr, rx) = channel::serve( + ChannelAddr::any(transport), + &format!("LocalProcManager spawning: {}", &proc_id), + )?; self.procs .lock() .await @@ -1036,8 +1039,10 @@ where forwarder_addr: ChannelAddr, _config: (), ) -> Result { - let (callback_addr, mut callback_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix))?; + let (callback_addr, mut callback_rx) = channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + &format!("ProcessProcManager spawning: {}", &proc_id), + )?; let mut cmd = Command::new(&self.program); cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string()); @@ -1144,11 +1149,14 @@ where let agent_handle = spawn(proc.clone()) .await - .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?; + .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?; // Finally serve the proc on the same transport as the backend address, // and call back. - let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?; + let (proc_addr, proc_rx) = channel::serve( + ChannelAddr::any(backend_transport), + &format!("proc addr of: {}", &proc_id), + )?; proc.clone().serve(proc_rx); channel::dial(callback_addr)? 
.send((proc_addr, agent_handle.bind::())) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index f6b8b1cf5..7503a8498 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1051,10 +1051,16 @@ pub trait MailboxServer: MailboxSender + Clone + Sized + 'static { } } result = stopped_rx.changed(), if !detached => { - tracing::debug!( - "the mailbox server is stopped" - ); detached = result.is_err(); + if detached { + tracing::debug!( + "the mailbox server is detached for Rx {}", rx.addr() + ); + } else { + tracing::debug!( + "the mailbox server is stopped for Rx {}", rx.addr() + ); + } } } } @@ -2856,7 +2862,7 @@ mod tests { .unwrap(), ); - let (_, rx) = serve::(ChannelAddr::Sim(dst_addr.clone())).unwrap(); + let (_, rx) = serve::(ChannelAddr::Sim(dst_addr.clone()), "test").unwrap(); let tx = dial::(src_to_dst).unwrap(); let mbox = Mailbox::new_detached(id!(test[0].actor0)); let serve_handle = mbox.clone().serve(rx); @@ -2985,7 +2991,8 @@ mod tests { let mut handles = Vec::new(); // hold on to handles, or channels get closed for mbox in mailboxes.iter() { - let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap(); + let (addr, rx) = + channel::serve(ChannelAddr::any(ChannelTransport::Local), "test").unwrap(); let handle = (*mbox).clone().serve(rx); handles.push(handle); diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 67f002e51..5c2c8668b 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -334,7 +334,7 @@ impl Proc { /// Create a new direct-addressed proc. 
pub async fn direct(addr: ChannelAddr, name: String) -> Result { - let (addr, rx) = channel::serve(addr)?; + let (addr, rx) = channel::serve(addr, &format!("creating Proc::direct: {}", name))?; let proc_id = ProcId::Direct(addr, name); let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed()); proc.clone().serve(rx); @@ -347,7 +347,10 @@ impl Proc { name: String, default: BoxedMailboxSender, ) -> Result { - let (addr, rx) = channel::serve(addr)?; + let (addr, rx) = channel::serve( + addr, + &format!("creating Proc::direct_with_default: {}", name), + )?; let proc_id = ProcId::Direct(addr, name); let proc = Self::new( proc_id, diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 4ea59a057..140771578 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -1730,7 +1730,7 @@ mod tests { let config = hyperactor::config::global::lock(); let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2); - let (_, mut rx) = serve::(addr).unwrap(); + let (_, mut rx) = serve::(addr, "test").unwrap(); let expected_ranks = selection .eval( diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index f66097b87..2eb7d45d2 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -517,6 +517,7 @@ impl AllocAssignedAddr { pub(crate) fn serve_with_config( self, + reason: &str, ) -> anyhow::Result<(ChannelAddr, ChannelRx)> { fn set_as_inaddr_any(original: &mut SocketAddr) { let inaddr_any: IpAddr = match &original { @@ -551,7 +552,7 @@ impl AllocAssignedAddr { } }; - let (mut bound, rx) = channel::serve(bind_to)?; + let (mut bound, rx) = channel::serve(bind_to, reason)?; // Restore the original IP address if we used INADDR_ANY. 
match &mut bound { @@ -836,13 +837,14 @@ pub(crate) mod testing { transport: ChannelTransport, ) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) { let (router_channel_addr, router_rx) = - channel::serve(ChannelAddr::any(transport.clone())).unwrap(); + channel::serve(ChannelAddr::any(transport.clone()), "test").unwrap(); let router = DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed()); router.clone().serve(router_rx); let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0); - let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap(); + let (client_proc_addr, client_rx) = + channel::serve(ChannelAddr::any(transport), "test").unwrap(); let client_proc = Proc::new( client_proc_id.clone(), BoxedMailboxSender::new(router.clone()), diff --git a/hyperactor_mesh/src/alloc/local.rs b/hyperactor_mesh/src/alloc/local.rs index 039442cae..87bfd8429 100644 --- a/hyperactor_mesh/src/alloc/local.rs +++ b/hyperactor_mesh/src/alloc/local.rs @@ -147,7 +147,10 @@ impl Alloc for LocalAlloc { match self.todo_rx.recv().await? 
{ Action::Start(rank) => { let (addr, proc_rx) = loop { - match channel::serve(ChannelAddr::any(self.transport())) { + match channel::serve( + ChannelAddr::any(self.transport()), + "LocalAlloc next proc addr", + ) { Ok(addr_and_proc_rx) => break addr_and_proc_rx, Err(err) => { tracing::error!( diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index 94d68b365..65c918737 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -89,8 +89,11 @@ impl Allocator for ProcessAllocator { #[hyperactor::instrument(fields(name = "process_allocate", monarch_client_trace_id = spec.constraints.match_labels.get(CLIENT_TRACE_ID_LABEL).cloned().unwrap_or_else(|| "".to_string())))] async fn allocate(&mut self, spec: AllocSpec) -> Result { - let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)) - .map_err(anyhow::Error::from)?; + let (bootstrap_addr, rx) = channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "ProcessAllocator allocate bootstrap_addr", + ) + .map_err(anyhow::Error::from)?; if spec.transport == ChannelTransport::Local { return Err(AllocatorError::Other(anyhow::anyhow!( diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 7bb52165f..96e8addb7 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -184,7 +184,8 @@ impl RemoteProcessAllocator { ::Alloc: Sync, { tracing::info!("starting remote allocator on: {}", serve_addr); - let (_, mut rx) = channel::serve(serve_addr.clone()).map_err(anyhow::Error::from)?; + let (_, mut rx) = channel::serve(serve_addr.clone(), "RemoteProcessAllocator serve addr") + .map_err(anyhow::Error::from)?; struct ActiveAllocation { handle: JoinHandle<()>, @@ -319,13 +320,14 @@ impl RemoteProcessAllocator { ) { tracing::info!("handle allocation request, bootstrap_addr: {bootstrap_addr}"); // start proc message forwarder - let 
(forwarder_addr, forwarder_rx) = match forwarder_addr.serve_with_config() { - Ok(v) => v, - Err(e) => { - tracing::error!("failed to to bootstrap forwarder actor: {}", e); - return; - } - }; + let (forwarder_addr, forwarder_rx) = + match forwarder_addr.serve_with_config("handle_allocation_request: forwarder_addr") { + Ok(v) => v, + Err(e) => { + tracing::error!("failed to to bootstrap forwarder actor: {}", e); + return; + } + }; let router = DialMailboxRouter::new(); let mailbox_handle = router.clone().serve(forwarder_rx); tracing::info!("started forwarder on: {}", forwarder_addr); @@ -627,7 +629,7 @@ impl RemoteProcessAlloc { None => AllocAssignedAddr::new(ChannelAddr::any(spec.transport.clone())), }; - let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config()?; + let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config("alloc bootstrap_addr")?; tracing::info!( "starting alloc for {} on: {}", @@ -1324,7 +1326,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1480,7 +1482,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1561,7 +1563,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = 
ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1700,7 +1702,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1790,7 +1792,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1892,7 +1894,7 @@ mod test { hyperactor_telemetry::initialize_logging(ClockKind::default()); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 1); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1973,7 +1975,7 @@ mod test { hyperactor_telemetry::initialize_logging(ClockKind::default()); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 1); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -2453,7 +2455,7 @@ mod test_alloc { let hosts_per_proc_mesh 
= 5; let pid_addr = ChannelAddr::any(ChannelTransport::Unix); - let (pid_addr, mut pid_rx) = channel::serve::(pid_addr).unwrap(); + let (pid_addr, mut pid_rx) = channel::serve::(pid_addr, "test").unwrap(); let addresses = (0..(num_proc_meshes * hosts_per_proc_mesh)) .map(|_| ChannelAddr::any(ChannelTransport::Unix).to_string()) @@ -2475,7 +2477,7 @@ mod test_alloc { let done_allocating_addr = ChannelAddr::any(ChannelTransport::Unix); let (done_allocating_addr, mut done_allocating_rx) = - channel::serve::<()>(done_allocating_addr).unwrap(); + channel::serve::<()>(done_allocating_addr, "test").unwrap(); let mut remote_process_alloc = Command::new(crate::testresource::get( "monarch/hyperactor_mesh/remote_process_alloc", )) diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 8484bbd0d..be6a681d2 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -1663,8 +1663,10 @@ impl ProcManager for BootstrapProcManager { backend_addr: ChannelAddr, config: BootstrapProcConfig, ) -> Result { - let (callback_addr, mut callback_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix))?; + let (callback_addr, mut callback_rx) = channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + &format!("BootstrapProcManager::spawn callback_addr: {}", &proc_id), + )?; let mode = Bootstrap::Proc { proc_id: proc_id.clone(), @@ -1925,7 +1927,8 @@ async fn bootstrap_v0_proc_mesh() -> anyhow::Error { .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))? .parse()?; let listen_addr = ChannelAddr::any(bootstrap_addr.transport()); - let (serve_addr, mut rx) = channel::serve(listen_addr)?; + let (serve_addr, mut rx) = + channel::serve(listen_addr, "bootstrap_v0_proc_mesh listen_addr")?; let tx = channel::dial(bootstrap_addr.clone())?; let (rtx, mut return_channel) = oneshot::channel(); @@ -1957,7 +1960,10 @@ async fn bootstrap_v0_proc_mesh() -> anyhow::Error { match the_msg? 
{ Allocator2Process::StartProc(proc_id, listen_transport) => { let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone()).await?; - let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?; + let (proc_addr, proc_rx) = channel::serve( + ChannelAddr::any(listen_transport), + &format!("bootstrap_v0_proc_mesh proc_addr: {}", &proc_id,), + )?; let handle = proc.clone().serve(proc_rx); drop(handle); // linter appeasement; it is safe to drop this future tx.send(Process2Allocator( @@ -2120,7 +2126,6 @@ mod tests { use ndslice::Extent; use ndslice::ViewExt; use ndslice::extent; - use tokio::io::AsyncReadExt; use tokio::process::Command; use super::*; @@ -2352,7 +2357,7 @@ mod tests { let router = DialMailboxRouter::new(); let (proc_addr, proc_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone())); proc.clone().serve(proc_rx); router.bind(id!(client[0]).into(), proc_addr.clone()); @@ -3192,7 +3197,8 @@ mod tests { // Serve a Unix channel as the "backend_addr" and hook it into // this test proc. 
- let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + let (backend_addr, rx) = + channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); // Route messages arriving on backend_addr into this test // proc's mailbox so the bootstrap child can reach the host diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index 264c28579..8aef2f821 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -47,7 +47,6 @@ use hyperactor::data::Serialized; use hyperactor::declare_attrs; use hyperactor_telemetry::env; use hyperactor_telemetry::log_file_path; -use notify::Event; use notify::Watcher; use serde::Deserialize; use serde::Serialize; @@ -58,14 +57,10 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; use tokio::io::SeekFrom; use tokio::sync::Mutex; use tokio::sync::Notify; use tokio::sync::RwLock; -use tokio::sync::mpsc; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::Receiver; use tokio::task::JoinHandle; @@ -477,14 +472,16 @@ impl FileAppender { return None; } }; - let (stdout_addr, stdout_rx) = - match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) { - Ok((addr, rx)) => (addr, rx), - Err(e) => { - tracing::warn!("failed to serve stdout channel: {}", e); - return None; - } - }; + let (stdout_addr, stdout_rx) = match channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "FileAppender stdout_addr", + ) { + Ok((addr, rx)) => (addr, rx), + Err(e) => { + tracing::warn!("failed to serve stdout channel: {}", e); + return None; + } + }; let stdout_stop = stop.clone(); let stdout_task = tokio::spawn(file_monitor_task( stdout_rx, @@ -502,14 +499,16 @@ impl FileAppender { return None; } }; - let (stderr_addr, stderr_rx) = - match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) { - Ok((addr, rx)) => 
(addr, rx), - Err(e) => { - tracing::warn!("failed to serve stderr channel: {}", e); - return None; - } - }; + let (stderr_addr, stderr_rx) = match channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "FileAppender stderr_addr", + ) { + Ok((addr, rx)) => (addr, rx), + Err(e) => { + tracing::warn!("failed to serve stderr channel: {}", e); + return None; + } + }; let stderr_stop = stop.clone(); let stderr_task = tokio::spawn(file_monitor_task( stderr_rx, @@ -1116,7 +1115,7 @@ impl Actor for LogForwardActor { log_channel ); - let rx = match channel::serve(log_channel.clone()) { + let rx = match channel::serve(log_channel.clone(), "LogForwardActor") { Ok((_, rx)) => rx, Err(err) => { // This can happen if we are not spanwed on a separate process like local. @@ -1126,7 +1125,11 @@ impl Actor for LogForwardActor { log_channel, err ); - channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1 + channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "LogForwardActor Unix fallback", + )? 
+ .1 } }; @@ -1573,7 +1576,7 @@ mod tests { // Setup the basics let router = DialMailboxRouter::new(); let (proc_addr, client_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone())); proc.clone().serve(client_rx); router.bind(id!(client[0]).into(), proc_addr.clone()); @@ -1787,7 +1790,7 @@ mod tests { let (mut writer, reader) = tokio::io::duplex(1024); let (log_channel, mut rx) = - channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); // Create a temporary file for testing the writer let temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -1920,7 +1923,7 @@ mod tests { #[tokio::test] async fn test_local_log_sender_inactive_status() { let (log_channel, _) = - channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); let mut sender = LocalLogSender::new(log_channel, 12345).unwrap(); // This test verifies that the sender handles inactive status gracefully diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index a0fb4e79d..9c855316e 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -314,8 +314,11 @@ impl ProcMesh { // everything else, so now the whole mesh should be able to communicate. 
let client_proc_id = ProcId::Ranked(WorldId(format!("{}_client", alloc.world_id().name())), 0); - let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport())) - .map_err(|err| AllocatorError::Other(err.into()))?; + let (client_proc_addr, client_rx) = channel::serve( + ChannelAddr::any(alloc.transport()), + &format!("client_proc_addr: {}", &client_proc_id), + ) + .map_err(|err| AllocatorError::Other(err.into()))?; tracing::info!( name = "ProcMesh::Allocate::ChannelServe", alloc_id = alloc_id, @@ -380,7 +383,7 @@ impl ProcMesh { // Ensure that the router is served so that agents may reach us. let (router_channel_addr, router_rx) = alloc .client_router_addr() - .serve_with_config() + .serve_with_config("client_router_addr") .map_err(AllocatorError::Other)?; router.serve(router_rx); tracing::info!("router channel started listening on addr: {router_channel_addr}"); diff --git a/hyperactor_mesh/src/router.rs b/hyperactor_mesh/src/router.rs index 947f4818c..3fab6b931 100644 --- a/hyperactor_mesh/src/router.rs +++ b/hyperactor_mesh/src/router.rs @@ -63,7 +63,7 @@ impl Router { return Ok(addr.clone()); } - let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()))?; + let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()), "Router::serve")?; self.router.clone().serve(rx); servers.insert(transport.clone(), addr.clone()); Ok(addr) diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 884f1db27..e2c2b8764 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -296,7 +296,10 @@ impl ProcMesh { let proc = cx.instance().proc(); // First make sure we can serve the proc: - let (proc_channel_addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?; + let (proc_channel_addr, rx) = channel::serve( + ChannelAddr::any(alloc.transport()), + &format!("proc_channel_addr for {}", proc.proc_id()), + )?; proc.clone().serve(rx); let bind_allocated_procs = 
|router: &DialMailboxRouter| { diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 3685484e3..972b5e335 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -34,7 +34,6 @@ use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; use hyperactor::channel::ChannelAddr; -use hyperactor::channel::TcpMode; use hyperactor::clock::Clock; use hyperactor::clock::ClockKind; use hyperactor::context; @@ -418,7 +417,7 @@ impl ProcActor { labels: HashMap, lifecycle_mode: ProcLifecycleMode, ) -> Result { - let (local_addr, rx) = channel::serve(listen_addr)?; + let (local_addr, rx) = channel::serve(listen_addr, "bootstrap_for_proc")?; let mailbox_handle = proc.clone().serve(rx); let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin); @@ -877,6 +876,7 @@ mod tests { use hyperactor::channel; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; + use hyperactor::channel::TcpMode; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::forward; diff --git a/hyperactor_multiprocess/src/system.rs b/hyperactor_multiprocess/src/system.rs index 79654662b..ad9dc622b 100644 --- a/hyperactor_multiprocess/src/system.rs +++ b/hyperactor_multiprocess/src/system.rs @@ -55,7 +55,7 @@ impl System { let (actor_handle, system_proc) = SystemActor::bootstrap_with_clock(params, clock).await?; actor_handle.bind::(); - let (local_addr, rx) = channel::serve(addr)?; + let (local_addr, rx) = channel::serve(addr, "System::serve")?; let mailbox_handle = system_proc.clone().serve(rx); Ok(ServerHandle { @@ -90,7 +90,8 @@ impl System { BoxedMailboxSender::new(self.sender().await?), ); - let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(self.addr.transport())).unwrap(); + let (proc_addr, proc_rx) = + channel::serve(ChannelAddr::any(self.addr.transport()), "system").unwrap(); let 
_proc_serve_handle: MailboxServerHandle = proc.clone().serve(proc_rx); diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index 51bdfc05f..b1e6c6b80 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -1878,7 +1878,8 @@ mod tests { host_id, ); let (local_proc_addr, local_proc_rx) = - channel::serve::(ChannelAddr::any(ChannelTransport::Local)).unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Local), "test") + .unwrap(); let local_proc_mbox = Mailbox::new_detached(local_proc_id.actor_id("test".to_string(), 0)); let (local_proc_message_port, local_proc_message_receiver) = local_proc_mbox.open_port(); let _local_proc_serve_handle = local_proc_mbox.clone().serve(local_proc_rx); @@ -2384,7 +2385,7 @@ mod tests { let src_id = id!(proc[0].actor); let src_addr = ChannelAddr::Sim(SimAddr::new("unix!@src".parse().unwrap()).unwrap()); let dst_addr = ChannelAddr::Sim(SimAddr::new("unix!@dst".parse().unwrap()).unwrap()); - let (_, mut rx) = channel::serve::(src_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve::(src_addr.clone(), "test").unwrap(); let router = ReportingRouter::new();