Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,7 @@ mod tests {
// Set up a local actor.
let local_proc_id = world_id.proc_id(rank);
let (local_proc_addr, local_proc_rx) =
channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
channel::serve(ChannelAddr::any(ChannelTransport::Local), "mock_proc_actor").unwrap();
let local_proc_mbox = Mailbox::new_detached(
local_proc_id.actor_id(format!("test_dummy_proc{}", idx).to_string(), 0),
);
Expand Down
8 changes: 4 additions & 4 deletions hyperactor/benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ fn bench_message_sizes(c: &mut Criterion) {
assert!(!socket_addr.ip().is_loopback());
}

let (listen_addr, mut rx) = serve::<Message>(addr).unwrap();
let (listen_addr, mut rx) = serve::<Message>(addr, "bench").unwrap();
let tx = dial::<Message>(listen_addr).unwrap();
let msg = Message::new(0, size);
let start = Instant::now();
Expand Down Expand Up @@ -127,7 +127,7 @@ fn bench_message_rates(c: &mut Criterion) {
b.iter_custom(|iters| async move {
let total_msgs = iters * rate;
let addr = ChannelAddr::any(transport.clone());
let (listen_addr, mut rx) = serve::<Message>(addr).unwrap();
let (listen_addr, mut rx) = serve::<Message>(addr, "bench").unwrap();
tokio::spawn(async move {
let mut received_count = 0;

Expand Down Expand Up @@ -212,9 +212,9 @@ async fn channel_ping_pong(
struct Message(Part);

let (client_addr, mut client_rx) =
channel::serve::<Message>(ChannelAddr::any(transport.clone())).unwrap();
channel::serve::<Message>(ChannelAddr::any(transport.clone()), "ping_pong_client").unwrap();
let (server_addr, mut server_rx) =
channel::serve::<Message>(ChannelAddr::any(transport.clone())).unwrap();
channel::serve::<Message>(ChannelAddr::any(transport.clone()), "ping_pong_server").unwrap();

let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
tokio::spawn(async move {
Expand Down
13 changes: 9 additions & 4 deletions hyperactor/example/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ async fn client(
) -> anyhow::Result<()> {
let server_tx = channel::dial(server_addr)?;

let (client_addr, mut client_rx) =
channel::serve::<Message>(ChannelAddr::any(server_tx.addr().transport().clone())).unwrap();
let (client_addr, mut client_rx) = channel::serve::<Message>(
ChannelAddr::any(server_tx.addr().transport().clone()),
"example",
)
.unwrap();

server_tx.post(Message::Hello(client_addr));

Expand Down Expand Up @@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> {
match args.command {
Some(Commands::Server) => {
let (server_addr, server_rx) =
channel::serve::<Message>(ChannelAddr::any(args.transport.clone())).unwrap();
channel::serve::<Message>(ChannelAddr::any(args.transport.clone()), "example")
.unwrap();
eprintln!("server listening on {}", server_addr);
server(server_rx).await?;
}
Expand All @@ -176,7 +180,8 @@ async fn main() -> Result<(), anyhow::Error> {
// No command: run a self-contained benchmark.
None => {
let (server_addr, server_rx) =
channel::serve::<Message>(ChannelAddr::any(args.transport.clone())).unwrap();
channel::serve::<Message>(ChannelAddr::any(args.transport.clone()), "example")
.unwrap();
let _server_handle = tokio::spawn(server(server_rx));
let client_handle = tokio::spawn(client(server_addr, args.message_size, args.num_iter));

Expand Down
23 changes: 15 additions & 8 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use core::net::SocketAddr;
use std::fmt;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
#[cfg(target_os = "linux")]
use std::os::linux::net::SocketAddrExt;
Expand All @@ -35,7 +34,6 @@ use crate::Named;
use crate::RemoteMessage;
use crate::attrs::AttrValue;
use crate::channel::sim::SimAddr;
use crate::config;
use crate::simnet::SimNetError;

pub(crate) mod local;
Expand Down Expand Up @@ -842,8 +840,8 @@ pub fn dial<M: RemoteMessage>(addr: ChannelAddr) -> Result<ChannelTx<M>, Channel
#[crate::instrument]
pub fn serve<M: RemoteMessage>(
addr: ChannelAddr,
reason: &str,
) -> Result<(ChannelAddr, ChannelRx<M>), ChannelError> {
tracing::debug!(name = "serve", "serving channel address {}", addr);
match addr {
ChannelAddr::Tcp(addr) => {
let (addr, rx) = net::tcp::serve::<M>(addr)?;
Expand All @@ -870,7 +868,15 @@ pub fn serve<M: RemoteMessage>(
a
))),
}
.map(|(addr, inner)| (addr, ChannelRx { inner }))
.map(|(addr, inner)| {
tracing::debug!(
name = "serve",
"serving channel address {} for {}",
addr,
reason
);
(addr, ChannelRx { inner })
})
}

/// Serve on the local address. The server is turned down
Expand Down Expand Up @@ -900,6 +906,7 @@ mod tests {
use super::*;
use crate::clock::Clock;
use crate::clock::RealClock;
use crate::config;

#[test]
fn test_channel_addr() {
Expand Down Expand Up @@ -1044,7 +1051,7 @@ mod tests {
#[tokio::test]
async fn test_multiple_connections() {
for addr in ChannelTransport::all().map(ChannelAddr::any) {
let (listen_addr, mut rx) = crate::channel::serve::<u64>(addr).unwrap();
let (listen_addr, mut rx) = crate::channel::serve::<u64>(addr, "test").unwrap();

let mut sends: JoinSet<()> = JoinSet::new();
for message in 0u64..100u64 {
Expand Down Expand Up @@ -1083,7 +1090,7 @@ mod tests {
continue;
}

let (listen_addr, rx) = crate::channel::serve::<u64>(addr).unwrap();
let (listen_addr, rx) = crate::channel::serve::<u64>(addr, "test").unwrap();

let tx = dial::<u64>(listen_addr).unwrap();
tx.try_post(123, oneshot::channel().0).unwrap();
Expand Down Expand Up @@ -1132,7 +1139,7 @@ mod tests {
#[cfg_attr(not(feature = "fb"), ignore)]
async fn test_dial_serve() {
for addr in addrs() {
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr, "test").unwrap();
let tx = crate::channel::dial(listen_addr).unwrap();
tx.try_post(123, oneshot::channel().0).unwrap();
assert_eq!(rx.recv().await.unwrap(), 123);
Expand All @@ -1152,7 +1159,7 @@ mod tests {
);
let _guard2 = config.override_key(crate::config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
for addr in addrs() {
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr, "test").unwrap();
let tx = crate::channel::dial(listen_addr).unwrap();
tx.send(123).await.unwrap();
assert_eq!(rx.recv().await.unwrap(), 123);
Expand Down
24 changes: 16 additions & 8 deletions hyperactor/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ use crate::Proc;
use crate::ProcId;
use crate::actor::Binds;
use crate::actor::Referable;
use crate::attrs::Attrs;
use crate::channel;
use crate::channel::ChannelAddr;
use crate::channel::ChannelError;
Expand Down Expand Up @@ -135,7 +134,7 @@ impl<M: ProcManager> Host<M> {
manager: M,
addr: ChannelAddr,
) -> Result<(Self, MailboxServerHandle), HostError> {
let (frontend_addr, frontend_rx) = channel::serve(addr)?;
let (frontend_addr, frontend_rx) = channel::serve(addr, "host frontend")?;

// We set up a cascade of routers: first, the outer router supports
// sending to the system proc, while the dial router manages dialed
Expand All @@ -144,7 +143,8 @@ impl<M: ProcManager> Host<M> {

// Establish a backend channel on the preferred transport. We currently simply
// serve the same router on both.
let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
let (backend_addr, backend_rx) =
channel::serve(ChannelAddr::any(manager.transport()), "host backend")?;

// Set up a system proc. This is often used to manage the host itself.
let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string());
Expand Down Expand Up @@ -865,7 +865,10 @@ where
proc_id.clone(),
MailboxClient::dial(forwarder_addr)?.into_boxed(),
);
let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
let (proc_addr, rx) = channel::serve(
ChannelAddr::any(transport),
&format!("LocalProcManager spawning: {}", &proc_id),
)?;
self.procs
.lock()
.await
Expand Down Expand Up @@ -1036,8 +1039,10 @@ where
forwarder_addr: ChannelAddr,
_config: (),
) -> Result<Self::Handle, HostError> {
let (callback_addr, mut callback_rx) =
channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
let (callback_addr, mut callback_rx) = channel::serve(
ChannelAddr::any(ChannelTransport::Unix),
&format!("ProcessProcManager spawning: {}", &proc_id),
)?;

let mut cmd = Command::new(&self.program);
cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
Expand Down Expand Up @@ -1144,11 +1149,14 @@ where

let agent_handle = spawn(proc.clone())
.await
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
.map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;

// Finally serve the proc on the same transport as the backend address,
// and call back.
let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
let (proc_addr, proc_rx) = channel::serve(
ChannelAddr::any(backend_transport),
&format!("proc addr of: {}", &proc_id),
)?;
proc.clone().serve(proc_rx);
channel::dial(callback_addr)?
.send((proc_addr, agent_handle.bind::<A>()))
Expand Down
28 changes: 21 additions & 7 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,16 @@ pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
}
}
result = stopped_rx.changed(), if !detached => {
tracing::debug!(
"the mailbox server is stopped"
);
detached = result.is_err();
if detached {
tracing::debug!(
"the mailbox server is detached for Rx {}", rx.addr()
);
} else {
tracing::debug!(
"the mailbox server is stopped for Rx {}", rx.addr()
);
}
}
}
}
Expand Down Expand Up @@ -1160,7 +1166,7 @@ impl MailboxSender for MailboxClient {
envelope: MessageEnvelope,
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
) {
tracing::event!(target:"messages", tracing::Level::DEBUG, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.0, "port"= envelope.dest.1, "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
if let Err(mpsc::error::SendError((envelope, return_handle))) =
self.buffer.send((envelope, return_handle))
{
Expand Down Expand Up @@ -1509,7 +1515,14 @@ impl MailboxSender for Mailbox {

match self.inner.ports.entry(envelope.dest().index()) {
Entry::Vacant(_) => {
let err = DeliveryError::Unroutable("port not bound in mailbox".to_string());
let err = DeliveryError::Unroutable(format!(
"port not bound in mailbox; port id: {}; message type: {}",
envelope.dest().index(),
envelope.data().typename().map_or_else(
|| format!("unregistered type hash {}", envelope.data().typehash()),
|s| s.to_string(),
)
));

envelope.undeliverable(err, return_handle);
}
Expand Down Expand Up @@ -2849,7 +2862,7 @@ mod tests {
.unwrap(),
);

let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone()), "test").unwrap();
let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
let mbox = Mailbox::new_detached(id!(test[0].actor0));
let serve_handle = mbox.clone().serve(rx);
Expand Down Expand Up @@ -2978,7 +2991,8 @@ mod tests {

let mut handles = Vec::new(); // hold on to handles, or channels get closed
for mbox in mailboxes.iter() {
let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
let (addr, rx) =
channel::serve(ChannelAddr::any(ChannelTransport::Local), "test").unwrap();
let handle = (*mbox).clone().serve(rx);
handles.push(handle);

Expand Down
7 changes: 5 additions & 2 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl Proc {

/// Create a new direct-addressed proc.
pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
let (addr, rx) = channel::serve(addr)?;
let (addr, rx) = channel::serve(addr, &format!("creating Proc::direct: {}", name))?;
let proc_id = ProcId::Direct(addr, name);
let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
proc.clone().serve(rx);
Expand All @@ -347,7 +347,10 @@ impl Proc {
name: String,
default: BoxedMailboxSender,
) -> Result<Self, ChannelError> {
let (addr, rx) = channel::serve(addr)?;
let (addr, rx) = channel::serve(
addr,
&format!("creating Proc::direct_with_default: {}", name),
)?;
let proc_id = ProcId::Direct(addr, name);
let proc = Self::new(
proc_id,
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,7 @@ mod tests {
let config = hyperactor::config::global::lock();
let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2);

let (_, mut rx) = serve::<usize>(addr).unwrap();
let (_, mut rx) = serve::<usize>(addr, "test").unwrap();

let expected_ranks = selection
.eval(
Expand Down
8 changes: 5 additions & 3 deletions hyperactor_mesh/src/alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ impl AllocAssignedAddr {

pub(crate) fn serve_with_config<M: RemoteMessage>(
self,
reason: &str,
) -> anyhow::Result<(ChannelAddr, ChannelRx<M>)> {
fn set_as_inaddr_any(original: &mut SocketAddr) {
let inaddr_any: IpAddr = match &original {
Expand Down Expand Up @@ -551,7 +552,7 @@ impl AllocAssignedAddr {
}
};

let (mut bound, rx) = channel::serve(bind_to)?;
let (mut bound, rx) = channel::serve(bind_to, reason)?;

// Restore the original IP address if we used INADDR_ANY.
match &mut bound {
Expand Down Expand Up @@ -836,13 +837,14 @@ pub(crate) mod testing {
transport: ChannelTransport,
) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) {
let (router_channel_addr, router_rx) =
channel::serve(ChannelAddr::any(transport.clone())).unwrap();
channel::serve(ChannelAddr::any(transport.clone()), "test").unwrap();
let router =
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
router.clone().serve(router_rx);

let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0);
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap();
let (client_proc_addr, client_rx) =
channel::serve(ChannelAddr::any(transport), "test").unwrap();
let client_proc = Proc::new(
client_proc_id.clone(),
BoxedMailboxSender::new(router.clone()),
Expand Down
5 changes: 4 additions & 1 deletion hyperactor_mesh/src/alloc/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ impl Alloc for LocalAlloc {
match self.todo_rx.recv().await? {
Action::Start(rank) => {
let (addr, proc_rx) = loop {
match channel::serve(ChannelAddr::any(self.transport())) {
match channel::serve(
ChannelAddr::any(self.transport()),
"LocalAlloc next proc addr",
) {
Ok(addr_and_proc_rx) => break addr_and_proc_rx,
Err(err) => {
tracing::error!(
Expand Down
7 changes: 5 additions & 2 deletions hyperactor_mesh/src/alloc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ impl Allocator for ProcessAllocator {

#[hyperactor::instrument(fields(name = "process_allocate", monarch_client_trace_id = spec.constraints.match_labels.get(CLIENT_TRACE_ID_LABEL).cloned().unwrap_or_else(|| "".to_string())))]
async fn allocate(&mut self, spec: AllocSpec) -> Result<ProcessAlloc, AllocatorError> {
let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix))
.map_err(anyhow::Error::from)?;
let (bootstrap_addr, rx) = channel::serve(
ChannelAddr::any(ChannelTransport::Unix),
"ProcessAllocator allocate bootstrap_addr",
)
.map_err(anyhow::Error::from)?;

if spec.transport == ChannelTransport::Local {
return Err(AllocatorError::Other(anyhow::anyhow!(
Expand Down
Loading