Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ impl MessageEnvelope {
self.errors.push(error)
}

/// Overwrite the sender recorded on this envelope with `sender`.
///
/// Intended for correcting an envelope whose sender was set incorrectly.
/// This should only be used by CommActor, since it forwards messages on
/// behalf of another sender.
pub fn update_sender(&mut self, sender: ActorId) {
// The previous sender value is discarded; no other field is modified here.
self.sender = sender;
}

/// The message has been determined to be undeliverable with the
/// provided error. Mark the envelope with the error and return to
/// sender.
Expand Down
62 changes: 60 additions & 2 deletions monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ use hyperactor::Handler;
use hyperactor::Instance;
use hyperactor::Named;
use hyperactor::OncePortHandle;
use hyperactor::ProcId;
use hyperactor::mailbox::MessageEnvelope;
use hyperactor::mailbox::Undeliverable;
use hyperactor::message::Bind;
use hyperactor::message::Bindings;
use hyperactor::message::Unbind;
use hyperactor::reference::WorldId;
use hyperactor_mesh::actor_mesh::CAST_ACTOR_MESH_ID;
use hyperactor_mesh::comm::multicast::CAST_ORIGINATING_SENDER;
use hyperactor_mesh::comm::multicast::CastInfo;
use hyperactor_mesh::reference::ActorMeshId;
use monarch_types::PickledPyObject;
use monarch_types::SerializablePyErr;
use pyo3::IntoPyObjectExt;
Expand Down Expand Up @@ -493,6 +498,47 @@ impl PythonActor {
}
}

/// Rewrite the sender of an undeliverable envelope that went through casting.
///
/// A cast message may come back with the comm actor recorded as its sender
/// instead of the actor that originated the cast. The envelope headers are
/// consulted, in this order, to pick the sender to restore:
///
/// * `CAST_ORIGINATING_SENDER` is set (v1 casting): the sender becomes the
///   actor id stored in that header.
/// * `CAST_ACTOR_MESH_ID` is set to an `ActorMeshId::V0` (v0 casting): the
///   sender becomes an `ActorId` built from
///   `ProcId::Ranked(WorldId(<proc mesh name>), 0)`, the mesh's actor name,
///   and index `0`.
/// * `CAST_ACTOR_MESH_ID` is set to an `ActorMeshId::V1`, or neither header
///   is set: the sender is left as it was.
///
/// The (possibly updated) envelope is returned to the caller.
fn update_undeliverable_envelope_for_casting(
    mut envelope: Undeliverable<MessageEnvelope>,
) -> Undeliverable<MessageEnvelope> {
    // Remember who the envelope currently claims as its sender, for logging.
    let previous_sender = envelope.0.sender().clone();

    // v1 casting: the header carries the originating actor id directly.
    let originating_sender = envelope.0.headers().get(CAST_ORIGINATING_SENDER).cloned();
    if let Some(originating_sender) = originating_sender {
        tracing::debug!(
            actor_id = %previous_sender,
            "PythonActor::handle_undeliverable_message: remapped comm-actor id to id from CAST_ORIGINATING_SENDER {}", originating_sender
        );
        envelope.0.update_sender(originating_sender);
        return envelope;
    }

    // v0 casting: derive the replacement sender from the actor mesh id, if
    // the header is there. The header borrow ends with this `match`, which
    // frees the envelope for mutation below.
    let mesh_sender = match envelope.0.headers().get(CAST_ACTOR_MESH_ID) {
        Some(ActorMeshId::V0(proc_mesh_id, actor_name)) => Some(ActorId(
            ProcId::Ranked(WorldId(proc_mesh_id.0.clone()), 0),
            actor_name.clone(),
            0,
        )),
        Some(ActorMeshId::V1(_)) => {
            tracing::debug!(
                "PythonActor::handle_undeliverable_message: headers present but V1 ActorMeshId; leaving actor_id unchanged"
            );
            None
        }
        // No casting headers at all: the message did not come from a comm actor.
        None => None,
    };

    if let Some(mesh_sender) = mesh_sender {
        tracing::debug!(
            actor_id = %previous_sender,
            "PythonActor::handle_undeliverable_message: remapped comm-actor id to mesh id from CAST_ACTOR_MESH_ID {}", mesh_sender
        );
        envelope.0.update_sender(mesh_sender);
    }

    envelope
}

#[async_trait]
impl Actor for PythonActor {
type Params = PickledPyObject;
Expand Down Expand Up @@ -537,9 +583,21 @@ impl Actor for PythonActor {
async fn handle_undeliverable_message(
&mut self,
ins: &Instance<Self>,
envelope: Undeliverable<MessageEnvelope>,
mut envelope: Undeliverable<MessageEnvelope>,
) -> Result<(), anyhow::Error> {
assert_eq!(envelope.0.sender(), ins.self_id());
if envelope.0.sender() != ins.self_id() {
// This can happen if the sender is comm. Update the envelope.
envelope = update_undeliverable_envelope_for_casting(envelope);
}
assert_eq!(
envelope.0.sender(),
ins.self_id(),
"undeliverable message was returned to the wrong actor. \
Return address = {}, src actor = {}, dest actor port = {}",
envelope.0.sender(),
ins.self_id(),
envelope.0.dest()
);

let cx = Context::new(ins, envelope.0.headers().clone());

Expand Down