diff --git a/controller/src/lib.rs b/controller/src/lib.rs index 0dac1e42c..f6417643e 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -1573,7 +1573,7 @@ mod tests { // Set up a local actor. let local_proc_id = world_id.proc_id(rank); let (local_proc_addr, local_proc_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Local), "mock_proc_actor").unwrap(); let local_proc_mbox = Mailbox::new_detached( local_proc_id.actor_id(format!("test_dummy_proc{}", idx).to_string(), 0), ); diff --git a/hyperactor/benches/main.rs b/hyperactor/benches/main.rs index bae362d65..dc15eec8c 100644 --- a/hyperactor/benches/main.rs +++ b/hyperactor/benches/main.rs @@ -87,7 +87,7 @@ fn bench_message_sizes(c: &mut Criterion) { assert!(!socket_addr.ip().is_loopback()); } - let (listen_addr, mut rx) = serve::(addr).unwrap(); + let (listen_addr, mut rx) = serve::(addr, "bench").unwrap(); let tx = dial::(listen_addr).unwrap(); let msg = Message::new(0, size); let start = Instant::now(); @@ -127,7 +127,7 @@ fn bench_message_rates(c: &mut Criterion) { b.iter_custom(|iters| async move { let total_msgs = iters * rate; let addr = ChannelAddr::any(transport.clone()); - let (listen_addr, mut rx) = serve::(addr).unwrap(); + let (listen_addr, mut rx) = serve::(addr, "bench").unwrap(); tokio::spawn(async move { let mut received_count = 0; @@ -212,9 +212,9 @@ async fn channel_ping_pong( struct Message(Part); let (client_addr, mut client_rx) = - channel::serve::(ChannelAddr::any(transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(transport.clone()), "ping_pong_client").unwrap(); let (server_addr, mut server_rx) = - channel::serve::(ChannelAddr::any(transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(transport.clone()), "ping_pong_server").unwrap(); let _server_handle: tokio::task::JoinHandle> = tokio::spawn(async move { diff --git a/hyperactor/example/channel.rs b/hyperactor/example/channel.rs index 97f3d2955..32872d176 100644 --- a/hyperactor/example/channel.rs +++ b/hyperactor/example/channel.rs @@ -64,8 +64,11 @@ async fn client( ) -> anyhow::Result<()> { let server_tx = channel::dial(server_addr)?; - let (client_addr, mut client_rx) = - channel::serve::(ChannelAddr::any(server_tx.addr().transport().clone())).unwrap(); + let (client_addr, mut client_rx) = channel::serve::( + ChannelAddr::any(server_tx.addr().transport().clone()), + "example", + ) + .unwrap(); server_tx.post(Message::Hello(client_addr)); @@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> { match args.command { Some(Commands::Server) => { let (server_addr, server_rx) = - channel::serve::(ChannelAddr::any(args.transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(args.transport.clone()), "example") + .unwrap(); eprintln!("server listening on {}", server_addr); server(server_rx).await?; } @@ -176,7 +180,8 @@ async fn main() -> Result<(), anyhow::Error> { // No command: run a self-contained benchmark. None => { let (server_addr, server_rx) = - channel::serve::(ChannelAddr::any(args.transport.clone())).unwrap(); + channel::serve::(ChannelAddr::any(args.transport.clone()), "example") + .unwrap(); let _server_handle = tokio::spawn(server(server_rx)); let client_handle = tokio::spawn(client(server_addr, args.message_size, args.num_iter)); diff --git a/hyperactor/src/channel.rs b/hyperactor/src/channel.rs index 075ac75cf..65533713b 100644 --- a/hyperactor/src/channel.rs +++ b/hyperactor/src/channel.rs @@ -14,7 +14,6 @@ use core::net::SocketAddr; use std::fmt; use std::net::IpAddr; -use std::net::Ipv4Addr; use std::net::Ipv6Addr; #[cfg(target_os = "linux")] use std::os::linux::net::SocketAddrExt; @@ -35,7 +34,6 @@ use crate::Named; use crate::RemoteMessage; use crate::attrs::AttrValue; use crate::channel::sim::SimAddr; -use crate::config; use crate::simnet::SimNetError; pub(crate) mod local; @@ -842,8 +840,8 @@ pub fn dial(addr: ChannelAddr) -> Result, Channel #[crate::instrument] pub fn serve( addr: ChannelAddr, + reason: &str, ) -> Result<(ChannelAddr, ChannelRx), ChannelError> { - tracing::debug!(name = "serve", "serving channel address {}", addr); match addr { ChannelAddr::Tcp(addr) => { let (addr, rx) = net::tcp::serve::(addr)?; @@ -870,7 +868,15 @@ pub fn serve( a ))), } - .map(|(addr, inner)| (addr, ChannelRx { inner })) + .map(|(addr, inner)| { + tracing::debug!( + name = "serve", + "serving channel address {} for {}", + addr, + reason + ); + (addr, ChannelRx { inner }) + }) } /// Serve on the local address. The server is turned down @@ -900,6 +906,7 @@ mod tests { use super::*; use crate::clock::Clock; use crate::clock::RealClock; + use crate::config; #[test] fn test_channel_addr() { @@ -1044,7 +1051,7 @@ mod tests { #[tokio::test] async fn test_multiple_connections() { for addr in ChannelTransport::all().map(ChannelAddr::any) { - let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); let mut sends: JoinSet<()> = JoinSet::new(); for message in 0u64..100u64 { @@ -1083,7 +1090,7 @@ mod tests { continue; } - let (listen_addr, rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, rx) = crate::channel::serve::(addr, "test").unwrap(); let tx = dial::(listen_addr).unwrap(); tx.try_post(123, oneshot::channel().0).unwrap(); @@ -1132,7 +1139,7 @@ mod tests { #[cfg_attr(not(feature = "fb"), ignore)] async fn test_dial_serve() { for addr in addrs() { - let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); let tx = crate::channel::dial(listen_addr).unwrap(); tx.try_post(123, oneshot::channel().0).unwrap(); assert_eq!(rx.recv().await.unwrap(), 123); @@ -1152,7 +1159,7 @@ mod tests { ); let _guard2 = config.override_key(crate::config::MESSAGE_ACK_EVERY_N_MESSAGES, 1); for addr in addrs() { - let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); let tx = crate::channel::dial(listen_addr).unwrap(); tx.send(123).await.unwrap(); assert_eq!(rx.recv().await.unwrap(), 123); diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index a6fe1900f..e8bf9bfa9 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -64,7 +64,6 @@ use crate::Proc; use crate::ProcId; use crate::actor::Binds; use crate::actor::Referable; -use crate::attrs::Attrs; use crate::channel; use crate::channel::ChannelAddr; use crate::channel::ChannelError; @@ -135,7 +134,7 @@ impl Host { manager: M, addr: ChannelAddr, ) -> Result<(Self, MailboxServerHandle), HostError> { - let (frontend_addr, frontend_rx) = channel::serve(addr)?; + let (frontend_addr, frontend_rx) = channel::serve(addr, "host frontend")?; // We set up a cascade of routers: first, the outer router supports // sending to the the system proc, while the dial router manages dialed @@ -144,7 +143,8 @@ impl Host { // Establish a backend channel on the preferred transport. We currently simply // serve the same router on both. - let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?; + let (backend_addr, backend_rx) = + channel::serve(ChannelAddr::any(manager.transport()), "host backend")?; // Set up a system proc. This is often used to manage the host itself. let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string()); @@ -865,7 +865,10 @@ where proc_id.clone(), MailboxClient::dial(forwarder_addr)?.into_boxed(), ); - let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?; + let (proc_addr, rx) = channel::serve( + ChannelAddr::any(transport), + &format!("LocalProcManager spawning: {}", &proc_id), + )?; self.procs .lock() .await @@ -1036,8 +1039,10 @@ where forwarder_addr: ChannelAddr, _config: (), ) -> Result { - let (callback_addr, mut callback_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix))?; + let (callback_addr, mut callback_rx) = channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + &format!("ProcessProcManager spawning: {}", &proc_id), + )?; let mut cmd = Command::new(&self.program); cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string()); @@ -1144,11 +1149,14 @@ where let agent_handle = spawn(proc.clone()) .await - .map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?; + .map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?; // Finally serve the proc on the same transport as the backend address, // and call back. - let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?; + let (proc_addr, proc_rx) = channel::serve( + ChannelAddr::any(backend_transport), + &format!("proc addr of: {}", &proc_id), + )?; proc.clone().serve(proc_rx); channel::dial(callback_addr)? .send((proc_addr, agent_handle.bind::())) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index 2b31b8124..7503a8498 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1051,10 +1051,16 @@ pub trait MailboxServer: MailboxSender + Clone + Sized + 'static { } } result = stopped_rx.changed(), if !detached => { - tracing::debug!( - "the mailbox server is stopped" - ); detached = result.is_err(); + if detached { + tracing::debug!( + "the mailbox server is detached for Rx {}", rx.addr() + ); + } else { + tracing::debug!( + "the mailbox server is stopped for Rx {}", rx.addr() + ); + } } } } @@ -1160,7 +1166,7 @@ impl MailboxSender for MailboxClient { envelope: MessageEnvelope, return_handle: PortHandle>, ) { - tracing::event!(target:"messages", tracing::Level::DEBUG, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message"); + tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message"); if let Err(mpsc::error::SendError((envelope, return_handle))) = self.buffer.send((envelope, return_handle)) { @@ -1509,7 +1515,14 @@ impl MailboxSender for Mailbox { match self.inner.ports.entry(envelope.dest().index()) { Entry::Vacant(_) => { - let err = DeliveryError::Unroutable("port not bound in mailbox".to_string()); + let err = DeliveryError::Unroutable(format!( + "port not bound in mailbox; port id: {}; message type: {}", + envelope.dest().index(), + envelope.data().typename().map_or_else( + || format!("unregistered type hash {}", envelope.data().typehash()), + |s| s.to_string(), + ) + )); envelope.undeliverable(err, return_handle); } @@ -2849,7 +2862,7 @@ mod tests { .unwrap(), ); - let (_, rx) = serve::(ChannelAddr::Sim(dst_addr.clone())).unwrap(); + let (_, rx) = serve::(ChannelAddr::Sim(dst_addr.clone()), "test").unwrap(); let tx = dial::(src_to_dst).unwrap(); let mbox = Mailbox::new_detached(id!(test[0].actor0)); let serve_handle = mbox.clone().serve(rx); @@ -2978,7 +2991,8 @@ mod tests { let mut handles = Vec::new(); // hold on to handles, or channels get closed for mbox in mailboxes.iter() { - let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap(); + let (addr, rx) = + channel::serve(ChannelAddr::any(ChannelTransport::Local), "test").unwrap(); let handle = (*mbox).clone().serve(rx); handles.push(handle); diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 67f002e51..5c2c8668b 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -334,7 +334,7 @@ impl Proc { /// Create a new direct-addressed proc. pub async fn direct(addr: ChannelAddr, name: String) -> Result { - let (addr, rx) = channel::serve(addr)?; + let (addr, rx) = channel::serve(addr, &format!("creating Proc::direct: {}", name))?; let proc_id = ProcId::Direct(addr, name); let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed()); proc.clone().serve(rx); @@ -347,7 +347,10 @@ impl Proc { name: String, default: BoxedMailboxSender, ) -> Result { - let (addr, rx) = channel::serve(addr)?; + let (addr, rx) = channel::serve( + addr, + &format!("creating Proc::direct_with_default: {}", name), + )?; let proc_id = ProcId::Direct(addr, name); let proc = Self::new( proc_id, diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 4ea59a057..140771578 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -1730,7 +1730,7 @@ mod tests { let config = hyperactor::config::global::lock(); let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2); - let (_, mut rx) = serve::(addr).unwrap(); + let (_, mut rx) = serve::(addr, "test").unwrap(); let expected_ranks = selection .eval( diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index f66097b87..2eb7d45d2 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -517,6 +517,7 @@ impl AllocAssignedAddr { pub(crate) fn serve_with_config( self, + reason: &str, ) -> anyhow::Result<(ChannelAddr, ChannelRx)> { fn set_as_inaddr_any(original: &mut SocketAddr) { let inaddr_any: IpAddr = match &original { @@ -551,7 +552,7 @@ impl AllocAssignedAddr { } }; - let (mut bound, rx) = channel::serve(bind_to)?; + let (mut bound, rx) = channel::serve(bind_to, reason)?; // Restore the original IP address if we used INADDR_ANY. match &mut bound { @@ -836,13 +837,14 @@ pub(crate) mod testing { transport: ChannelTransport, ) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) { let (router_channel_addr, router_rx) = - channel::serve(ChannelAddr::any(transport.clone())).unwrap(); + channel::serve(ChannelAddr::any(transport.clone()), "test").unwrap(); let router = DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed()); router.clone().serve(router_rx); let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0); - let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap(); + let (client_proc_addr, client_rx) = + channel::serve(ChannelAddr::any(transport), "test").unwrap(); let client_proc = Proc::new( client_proc_id.clone(), BoxedMailboxSender::new(router.clone()), diff --git a/hyperactor_mesh/src/alloc/local.rs b/hyperactor_mesh/src/alloc/local.rs index 039442cae..87bfd8429 100644 --- a/hyperactor_mesh/src/alloc/local.rs +++ b/hyperactor_mesh/src/alloc/local.rs @@ -147,7 +147,10 @@ impl Alloc for LocalAlloc { match self.todo_rx.recv().await? { Action::Start(rank) => { let (addr, proc_rx) = loop { - match channel::serve(ChannelAddr::any(self.transport())) { + match channel::serve( + ChannelAddr::any(self.transport()), + "LocalAlloc next proc addr", + ) { Ok(addr_and_proc_rx) => break addr_and_proc_rx, Err(err) => { tracing::error!( diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index 94d68b365..65c918737 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -89,8 +89,11 @@ impl Allocator for ProcessAllocator { #[hyperactor::instrument(fields(name = "process_allocate", monarch_client_trace_id = spec.constraints.match_labels.get(CLIENT_TRACE_ID_LABEL).cloned().unwrap_or_else(|| "".to_string())))] async fn allocate(&mut self, spec: AllocSpec) -> Result { - let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)) - .map_err(anyhow::Error::from)?; + let (bootstrap_addr, rx) = channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "ProcessAllocator allocate bootstrap_addr", + ) + .map_err(anyhow::Error::from)?; if spec.transport == ChannelTransport::Local { return Err(AllocatorError::Other(anyhow::anyhow!( diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 7bb52165f..96e8addb7 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -184,7 +184,8 @@ impl RemoteProcessAllocator { ::Alloc: Sync, { tracing::info!("starting remote allocator on: {}", serve_addr); - let (_, mut rx) = channel::serve(serve_addr.clone()).map_err(anyhow::Error::from)?; + let (_, mut rx) = channel::serve(serve_addr.clone(), "RemoteProcessAllocator serve addr") + .map_err(anyhow::Error::from)?; struct ActiveAllocation { handle: JoinHandle<()>, @@ -319,13 +320,14 @@ impl RemoteProcessAllocator { ) { tracing::info!("handle allocation request, bootstrap_addr: {bootstrap_addr}"); // start proc message forwarder - let (forwarder_addr, forwarder_rx) = match forwarder_addr.serve_with_config() { - Ok(v) => v, - Err(e) => { - tracing::error!("failed to to bootstrap forwarder actor: {}", e); - return; - } - }; + let (forwarder_addr, forwarder_rx) = + match forwarder_addr.serve_with_config("handle_allocation_request: forwarder_addr") { + Ok(v) => v, + Err(e) => { + tracing::error!("failed to to bootstrap forwarder actor: {}", e); + return; + } + }; let router = DialMailboxRouter::new(); let mailbox_handle = router.clone().serve(forwarder_rx); tracing::info!("started forwarder on: {}", forwarder_addr); @@ -627,7 +629,7 @@ impl RemoteProcessAlloc { None => AllocAssignedAddr::new(ChannelAddr::any(spec.transport.clone())), }; - let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config()?; + let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config("alloc bootstrap_addr")?; tracing::info!( "starting alloc for {} on: {}", @@ -1324,7 +1326,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1480,7 +1482,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1561,7 +1563,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1700,7 +1702,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1790,7 +1792,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1892,7 +1894,7 @@ mod test { hyperactor_telemetry::initialize_logging(ClockKind::default()); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 1); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1973,7 +1975,7 @@ mod test { hyperactor_telemetry::initialize_logging(ClockKind::default()); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); let extent = extent!(host = 1, gpu = 1); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -2453,7 +2455,7 @@ mod test_alloc { let hosts_per_proc_mesh = 5; let pid_addr = ChannelAddr::any(ChannelTransport::Unix); - let (pid_addr, mut pid_rx) = channel::serve::(pid_addr).unwrap(); + let (pid_addr, mut pid_rx) = channel::serve::(pid_addr, "test").unwrap(); let addresses = (0..(num_proc_meshes * hosts_per_proc_mesh)) .map(|_| ChannelAddr::any(ChannelTransport::Unix).to_string()) @@ -2475,7 +2477,7 @@ mod test_alloc { let done_allocating_addr = ChannelAddr::any(ChannelTransport::Unix); let (done_allocating_addr, mut done_allocating_rx) = - channel::serve::<()>(done_allocating_addr).unwrap(); + channel::serve::<()>(done_allocating_addr, "test").unwrap(); let mut remote_process_alloc = Command::new(crate::testresource::get( "monarch/hyperactor_mesh/remote_process_alloc", )) diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 8484bbd0d..be6a681d2 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -1663,8 +1663,10 @@ impl ProcManager for BootstrapProcManager { backend_addr: ChannelAddr, config: BootstrapProcConfig, ) -> Result { - let (callback_addr, mut callback_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix))?; + let (callback_addr, mut callback_rx) = channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + &format!("BootstrapProcManager::spawn callback_addr: {}", &proc_id), + )?; let mode = Bootstrap::Proc { proc_id: proc_id.clone(), @@ -1925,7 +1927,8 @@ async fn bootstrap_v0_proc_mesh() -> anyhow::Error { .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))? .parse()?; let listen_addr = ChannelAddr::any(bootstrap_addr.transport()); - let (serve_addr, mut rx) = channel::serve(listen_addr)?; + let (serve_addr, mut rx) = + channel::serve(listen_addr, "bootstrap_v0_proc_mesh listen_addr")?; let tx = channel::dial(bootstrap_addr.clone())?; let (rtx, mut return_channel) = oneshot::channel(); @@ -1957,7 +1960,10 @@ async fn bootstrap_v0_proc_mesh() -> anyhow::Error { match the_msg? { Allocator2Process::StartProc(proc_id, listen_transport) => { let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone()).await?; - let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?; + let (proc_addr, proc_rx) = channel::serve( + ChannelAddr::any(listen_transport), + &format!("bootstrap_v0_proc_mesh proc_addr: {}", &proc_id,), + )?; let handle = proc.clone().serve(proc_rx); drop(handle); // linter appeasement; it is safe to drop this future tx.send(Process2Allocator( @@ -2120,7 +2126,6 @@ mod tests { use ndslice::Extent; use ndslice::ViewExt; use ndslice::extent; - use tokio::io::AsyncReadExt; use tokio::process::Command; use super::*; @@ -2352,7 +2357,7 @@ mod tests { let router = DialMailboxRouter::new(); let (proc_addr, proc_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone())); proc.clone().serve(proc_rx); router.bind(id!(client[0]).into(), proc_addr.clone()); @@ -3192,7 +3197,8 @@ mod tests { // Serve a Unix channel as the "backend_addr" and hook it into // this test proc. - let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + let (backend_addr, rx) = + channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); // Route messages arriving on backend_addr into this test // proc's mailbox so the bootstrap child can reach the host diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index 264c28579..8aef2f821 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -47,7 +47,6 @@ use hyperactor::data::Serialized; use hyperactor::declare_attrs; use hyperactor_telemetry::env; use hyperactor_telemetry::log_file_path; -use notify::Event; use notify::Watcher; use serde::Deserialize; use serde::Serialize; @@ -58,14 +57,10 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncSeek; use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; use tokio::io::SeekFrom; use tokio::sync::Mutex; use tokio::sync::Notify; use tokio::sync::RwLock; -use tokio::sync::mpsc; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::Receiver; use tokio::task::JoinHandle; @@ -477,14 +472,16 @@ impl FileAppender { return None; } }; - let (stdout_addr, stdout_rx) = - match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) { - Ok((addr, rx)) => (addr, rx), - Err(e) => { - tracing::warn!("failed to serve stdout channel: {}", e); - return None; - } - }; + let (stdout_addr, stdout_rx) = match channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "FileAppender stdout_addr", + ) { + Ok((addr, rx)) => (addr, rx), + Err(e) => { + tracing::warn!("failed to serve stdout channel: {}", e); + return None; + } + }; let stdout_stop = stop.clone(); let stdout_task = tokio::spawn(file_monitor_task( stdout_rx, @@ -502,14 +499,16 @@ impl FileAppender { return None; } }; - let (stderr_addr, stderr_rx) = - match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) { - Ok((addr, rx)) => (addr, rx), - Err(e) => { - tracing::warn!("failed to serve stderr channel: {}", e); - return None; - } - }; + let (stderr_addr, stderr_rx) = match channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "FileAppender stderr_addr", + ) { + Ok((addr, rx)) => (addr, rx), + Err(e) => { + tracing::warn!("failed to serve stderr channel: {}", e); + return None; + } + }; let stderr_stop = stop.clone(); let stderr_task = tokio::spawn(file_monitor_task( stderr_rx, @@ -1116,7 +1115,7 @@ impl Actor for LogForwardActor { log_channel ); - let rx = match channel::serve(log_channel.clone()) { + let rx = match channel::serve(log_channel.clone(), "LogForwardActor") { Ok((_, rx)) => rx, Err(err) => { // This can happen if we are not spanwed on a separate process like local. @@ -1126,7 +1125,11 @@ impl Actor for LogForwardActor { log_channel, err ); - channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1 + channel::serve( + ChannelAddr::any(ChannelTransport::Unix), + "LogForwardActor Unix fallback", + )? + .1 } }; @@ -1573,7 +1576,7 @@ mod tests { // Setup the basics let router = DialMailboxRouter::new(); let (proc_addr, client_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone())); proc.clone().serve(client_rx); router.bind(id!(client[0]).into(), proc_addr.clone()); @@ -1787,7 +1790,7 @@ mod tests { let (mut writer, reader) = tokio::io::duplex(1024); let (log_channel, mut rx) = - channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); // Create a temporary file for testing the writer let temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -1920,7 +1923,7 @@ mod tests { #[tokio::test] async fn test_local_log_sender_inactive_status() { let (log_channel, _) = - channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); let mut sender = LocalLogSender::new(log_channel, 12345).unwrap(); // This test verifies that the sender handles inactive status gracefully diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index a0fb4e79d..9c855316e 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -314,8 +314,11 @@ impl ProcMesh { // everything else, so now the whole mesh should be able to communicate. let client_proc_id = ProcId::Ranked(WorldId(format!("{}_client", alloc.world_id().name())), 0); - let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport())) - .map_err(|err| AllocatorError::Other(err.into()))?; + let (client_proc_addr, client_rx) = channel::serve( + ChannelAddr::any(alloc.transport()), + &format!("client_proc_addr: {}", &client_proc_id), + ) + .map_err(|err| AllocatorError::Other(err.into()))?; tracing::info!( name = "ProcMesh::Allocate::ChannelServe", alloc_id = alloc_id, @@ -380,7 +383,7 @@ impl ProcMesh { // Ensure that the router is served so that agents may reach us. let (router_channel_addr, router_rx) = alloc .client_router_addr() - .serve_with_config() + .serve_with_config("client_router_addr") .map_err(AllocatorError::Other)?; router.serve(router_rx); tracing::info!("router channel started listening on addr: {router_channel_addr}"); diff --git a/hyperactor_mesh/src/router.rs b/hyperactor_mesh/src/router.rs index 947f4818c..3fab6b931 100644 --- a/hyperactor_mesh/src/router.rs +++ b/hyperactor_mesh/src/router.rs @@ -63,7 +63,7 @@ impl Router { return Ok(addr.clone()); } - let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()))?; + let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()), "Router::serve")?; self.router.clone().serve(rx); servers.insert(transport.clone(), addr.clone()); Ok(addr) diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 79829e699..ed1cd0b83 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -854,7 +854,10 @@ impl HostMeshRef { } else { // Timeout error, stop reading from the receiver and send back what we have so far, // padding with failed states. - tracing::warn!("Timeout waiting for response from host mesh agent for proc_states"); + tracing::warn!( + "Timeout waiting for response from host mesh agent for proc_states after {:?}", + timeout + ); let all_ranks = (0..num_ranks).collect::>(); let completed_ranks = states.iter().map(|(rank, _)| *rank).collect::>(); let mut leftover_ranks = all_ranks.difference(&completed_ranks).collect::>(); diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 8e901a17f..e2c2b8764 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -296,7 +296,10 @@ impl ProcMesh { let proc = cx.instance().proc(); // First make sure we can serve the proc: - let (proc_channel_addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?; + let (proc_channel_addr, rx) = channel::serve( + ChannelAddr::any(alloc.transport()), + &format!("proc_channel_addr for {}", proc.proc_id()), + )?; proc.clone().serve(rx); let bind_allocated_procs = |router: &DialMailboxRouter| { @@ -646,7 +649,8 @@ impl ProcMeshRef { } } else { tracing::error!( - "timeout waiting for a message from proc mesh agent in mesh: {}", + "timeout waiting for a message after {:?} from proc mesh agent in mesh {}", + timeout, agent_mesh ); // Timeout error, stop reading from the receiver and send back what we have so far, diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 3685484e3..972b5e335 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -34,7 +34,6 @@ use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; use hyperactor::channel::ChannelAddr; -use hyperactor::channel::TcpMode; use hyperactor::clock::Clock; use hyperactor::clock::ClockKind; use hyperactor::context; @@ -418,7 +417,7 @@ impl ProcActor { labels: HashMap, lifecycle_mode: ProcLifecycleMode, ) -> Result { - let (local_addr, rx) = channel::serve(listen_addr)?; + let (local_addr, rx) = channel::serve(listen_addr, "bootstrap_for_proc")?; let mailbox_handle = proc.clone().serve(rx); let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin); @@ -877,6 +876,7 @@ mod tests { use hyperactor::channel; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; + use hyperactor::channel::TcpMode; use hyperactor::clock::Clock; use hyperactor::clock::RealClock; use hyperactor::forward; diff --git a/hyperactor_multiprocess/src/system.rs b/hyperactor_multiprocess/src/system.rs index 79654662b..ad9dc622b 100644 --- a/hyperactor_multiprocess/src/system.rs +++ b/hyperactor_multiprocess/src/system.rs @@ -55,7 +55,7 @@ impl System { let (actor_handle, system_proc) = SystemActor::bootstrap_with_clock(params, clock).await?; actor_handle.bind::(); - let (local_addr, rx) = channel::serve(addr)?; + let (local_addr, rx) = channel::serve(addr, "System::serve")?; let mailbox_handle = system_proc.clone().serve(rx); Ok(ServerHandle { @@ -90,7 +90,8 @@ impl System { BoxedMailboxSender::new(self.sender().await?), ); - let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(self.addr.transport())).unwrap(); + let (proc_addr, proc_rx) = + channel::serve(ChannelAddr::any(self.addr.transport()), "system").unwrap(); let _proc_serve_handle: MailboxServerHandle = proc.clone().serve(proc_rx); diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index 51bdfc05f..b1e6c6b80 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -1878,7 +1878,8 @@ mod tests { host_id, ); let (local_proc_addr, local_proc_rx) = - channel::serve::(ChannelAddr::any(ChannelTransport::Local)).unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Local), "test") + .unwrap(); let local_proc_mbox = Mailbox::new_detached(local_proc_id.actor_id("test".to_string(), 0)); let (local_proc_message_port, local_proc_message_receiver) = local_proc_mbox.open_port(); let _local_proc_serve_handle = local_proc_mbox.clone().serve(local_proc_rx); @@ -2384,7 +2385,7 @@ mod tests { let src_id = id!(proc[0].actor); let src_addr = ChannelAddr::Sim(SimAddr::new("unix!@src".parse().unwrap()).unwrap()); let dst_addr = ChannelAddr::Sim(SimAddr::new("unix!@dst".parse().unwrap()).unwrap()); - let (_, mut rx) = channel::serve::(src_addr.clone()).unwrap(); + let (_, mut rx) = channel::serve::(src_addr.clone(), "test").unwrap(); let router = ReportingRouter::new();