diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs
index 4a622add8..2b31b8124 100644
--- a/hyperactor/src/mailbox.rs
+++ b/hyperactor/src/mailbox.rs
@@ -324,6 +324,13 @@ impl MessageEnvelope {
         self.errors.push(error)
     }
 
+    /// Overwrite the sender recorded on this envelope. Intended for messages
+    /// that were forwarded on behalf of another actor (e.g. by CommActor), so
+    /// the envelope can name the originating sender instead of the forwarder.
+    pub fn update_sender(&mut self, sender: ActorId) {
+        self.sender = sender;
+    }
+
     /// The message has been determined to be undeliverable with the
     /// provided error. Mark the envelope with the error and return to
     /// sender.
diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs
index 5256138d7..ab07af6a7 100644
--- a/monarch_hyperactor/src/actor.rs
+++ b/monarch_hyperactor/src/actor.rs
@@ -20,12 +20,17 @@ use hyperactor::Handler;
 use hyperactor::Instance;
 use hyperactor::Named;
 use hyperactor::OncePortHandle;
+use hyperactor::ProcId;
 use hyperactor::mailbox::MessageEnvelope;
 use hyperactor::mailbox::Undeliverable;
 use hyperactor::message::Bind;
 use hyperactor::message::Bindings;
 use hyperactor::message::Unbind;
+use hyperactor::reference::WorldId;
+use hyperactor_mesh::actor_mesh::CAST_ACTOR_MESH_ID;
+use hyperactor_mesh::comm::multicast::CAST_ORIGINATING_SENDER;
 use hyperactor_mesh::comm::multicast::CastInfo;
+use hyperactor_mesh::reference::ActorMeshId;
 use monarch_types::PickledPyObject;
 use monarch_types::SerializablePyErr;
 use pyo3::IntoPyObjectExt;
@@ -493,6 +498,47 @@ impl PythonActor {
     }
 }
 
+/// An undeliverable cast message may name the forwarding comm actor as its
+/// sender. Restore the sender from the cast headers (`CAST_ORIGINATING_SENDER`
+/// for v1 casting, `CAST_ACTOR_MESH_ID` for v0); otherwise leave it unchanged.
+fn update_undeliverable_envelope_for_casting(
+    mut envelope: Undeliverable<MessageEnvelope>,
+) -> Undeliverable<MessageEnvelope> {
+    let old_actor = envelope.0.sender().clone();
+    // v1 casting: the header carries the originating sender itself.
+    if let Some(actor_id) = envelope.0.headers().get(CAST_ORIGINATING_SENDER).cloned() {
+        tracing::debug!(
+            actor_id = %old_actor,
+            "PythonActor::handle_undeliverable_message: remapped comm-actor id to id from CAST_ORIGINATING_SENDER {}", actor_id
+        );
+        envelope.0.update_sender(actor_id);
+    // v0 casting: only the mesh id is known; rebuild the sender at rank 0.
+    } else if let Some(actor_mesh_id) = envelope.0.headers().get(CAST_ACTOR_MESH_ID) {
+        match actor_mesh_id {
+            ActorMeshId::V0(proc_mesh_id, actor_name) => {
+                let actor_id = ActorId(
+                    ProcId::Ranked(WorldId(proc_mesh_id.0.clone()), 0),
+                    actor_name.clone(),
+                    0,
+                );
+                tracing::debug!(
+                    actor_id = %old_actor,
+                    "PythonActor::handle_undeliverable_message: remapped comm-actor id to mesh id from CAST_ACTOR_MESH_ID {}", actor_id
+                );
+                envelope.0.update_sender(actor_id);
+            }
+            ActorMeshId::V1(_) => {
+                tracing::debug!(
+                    "PythonActor::handle_undeliverable_message: headers present but V1 ActorMeshId; leaving actor_id unchanged"
+                );
+            }
+        }
+    } else {
+        // No cast headers: the message was not forwarded by a comm actor.
+    }
+    envelope
+}
+
 #[async_trait]
 impl Actor for PythonActor {
     type Params = PickledPyObject;
@@ -537,9 +583,21 @@ impl Actor for PythonActor {
     async fn handle_undeliverable_message(
         &mut self,
         ins: &Instance<Self>,
-        envelope: Undeliverable<MessageEnvelope>,
+        mut envelope: Undeliverable<MessageEnvelope>,
     ) -> Result<(), anyhow::Error> {
-        assert_eq!(envelope.0.sender(), ins.self_id());
+        if envelope.0.sender() != ins.self_id() {
+            // This can happen if the sender is comm. Update the envelope.
+            envelope = update_undeliverable_envelope_for_casting(envelope);
+        }
+        assert_eq!(
+            envelope.0.sender(),
+            ins.self_id(),
+            "undeliverable message was returned to the wrong actor. \
+            Return address = {}, src actor = {}, dest actor port = {}",
+            envelope.0.sender(),
+            ins.self_id(),
+            envelope.0.dest()
+        );
         let cx = Context::new(ins, envelope.0.headers().clone());