Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 50 additions & 51 deletions hyperactor_mesh/examples/dining_philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
/// A naive implementation of the Dining Philosophers problem using Hyperactor.
/// https://en.wikipedia.org/wiki/Dining_philosophers_problem
use std::collections::HashMap;
use std::ops::Deref;
use std::process::ExitCode;

use anyhow::Result;
Expand All @@ -20,19 +21,14 @@ use hyperactor::Handler;
use hyperactor::Instance;
use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::Proc;
use hyperactor::Unbind;
use hyperactor::channel::ChannelTransport;
use hyperactor_mesh::ProcMesh;
use hyperactor_mesh::actor_mesh::ActorMesh;
use hyperactor_mesh::alloc::AllocSpec;
use hyperactor_mesh::alloc::Allocator;
use hyperactor_mesh::alloc::LocalAllocator;
use hyperactor::context;
use hyperactor_mesh::comm::multicast::CastInfo;
use hyperactor_mesh::extent;
use hyperactor_mesh::selection::dsl::all;
use hyperactor_mesh::selection::dsl::true_;
use ndslice::selection::selection_from;
use hyperactor_mesh::proc_mesh::global_root_client;
use hyperactor_mesh::v1::ActorMeshRef;
use hyperactor_mesh::v1::host_mesh::HostMesh;
use ndslice::ViewExt;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::OnceCell;
Expand Down Expand Up @@ -124,9 +120,11 @@ impl PhilosopherActor {

async fn release_chopsticks(&mut self, cx: &Instance<Self>) -> Result<()> {
let (left, right) = self.chopstick_indices();
eprintln!(
tracing::debug!(
"philosopher {} releasing chopsticks, {} and {}",
self.rank, left, right
self.rank,
left,
right
);
self.waiter
.get()
Expand All @@ -145,14 +143,16 @@ impl Handler<PhilosopherMessage> for PhilosopherActor {
message: PhilosopherMessage,
) -> Result<(), anyhow::Error> {
let point = cx.cast_point();
self.rank = point.rank();
match message {
PhilosopherMessage::Start(waiter) => {
self.waiter.set(waiter)?;
self.request_chopsticks(cx).await?;
// Start is always broadcasted to all philosophers; so this is
// our global rank.
self.rank = point.rank();
}
PhilosopherMessage::GrantChopstick(chopstick) => {
eprintln!("philosopher {} granted chopstick {}", self.rank, chopstick);
tracing::debug!("philosopher {} granted chopstick {}", self.rank, chopstick);
let (left, right) = self.chopstick_indices();
if left == chopstick {
self.chopsticks = (ChopstickStatus::Granted, self.chopsticks.1.clone());
Expand All @@ -162,7 +162,7 @@ impl Handler<PhilosopherMessage> for PhilosopherActor {
unreachable!("shouldn't be granted a chopstick that is not left or right");
}
if self.chopsticks == (ChopstickStatus::Granted, ChopstickStatus::Granted) {
eprintln!("philosopher {} starts dining", self.rank);
tracing::debug!("philosopher {} starts dining", self.rank);
self.release_chopsticks(cx).await?;
self.request_chopsticks(cx).await?;
}
Expand All @@ -172,20 +172,17 @@ impl Handler<PhilosopherMessage> for PhilosopherActor {
}
}

struct Waiter<A> {
struct Waiter {
/// A map from chopstick to the rank of the philosopher who holds it.
chopstick_assignments: HashMap<usize, usize>,
/// A map from chopstick to the rank of the philosopher who requested it.
chopstick_requests: HashMap<usize, usize>,
/// ActorMesh of the philosophers.
philosophers: A,
philosophers: ActorMeshRef<PhilosopherActor>,
}

impl<A> Waiter<A>
where
A: ActorMesh<Actor = PhilosopherActor>,
{
fn new(philosophers: A) -> Self {
impl Waiter {
fn new(philosophers: ActorMeshRef<PhilosopherActor>) -> Self {
Self {
chopstick_assignments: Default::default(),
chopstick_requests: Default::default(),
Expand All @@ -202,68 +199,70 @@ where
self.chopstick_assignments.insert(chopstick, rank);
}

fn handle_request_chopstick(&mut self, rank: usize, chopstick: usize) -> Result<()> {
fn handle_request_chopstick(
&mut self,
cx: &impl context::Actor,
rank: usize,
chopstick: usize,
) -> Result<()> {
if self.is_chopstick_available(chopstick) {
self.grant_chopstick(chopstick, rank);
self.philosophers.cast(
self.philosophers.proc_mesh().client(),
selection_from(self.philosophers.shape(), &[("replica", rank..rank + 1)])?,
PhilosopherMessage::GrantChopstick(chopstick),
)?
self.philosophers
.range("replica", rank)?
.cast(cx, PhilosopherMessage::GrantChopstick(chopstick))?
} else {
self.chopstick_requests.insert(chopstick, rank);
}
Ok(())
}

fn handle_release_chopstick(&mut self, chopstick: usize) -> Result<()> {
fn handle_release_chopstick(
&mut self,
cx: &impl context::Actor,
chopstick: usize,
) -> Result<()> {
self.chopstick_assignments.remove(&chopstick);
if let Some(rank) = self.chopstick_requests.remove(&chopstick) {
// now just handle the request again to grant the chopstick
self.handle_request_chopstick(rank, chopstick)?;
self.handle_request_chopstick(cx, rank, chopstick)?;
}
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<ExitCode> {
hyperactor_telemetry::initialize_logging_for_test();
let host_mesh = HostMesh::local().await?;

let group_size = 5;
let alloc = LocalAllocator
.allocate(AllocSpec {
extent: extent! {replica = group_size},
constraints: Default::default(),
proc_name: None,
transport: ChannelTransport::Local,
})
let instance = global_root_client();
let proc_mesh = host_mesh
.spawn(instance, "philosophers", extent!(replica = group_size))
.await?;

let (instance, _) = Proc::local().instance("client").unwrap();

let proc_mesh = ProcMesh::allocate(alloc).await?;
let params = PhilosopherActorParams { size: group_size };
let actor_mesh = proc_mesh
.spawn::<PhilosopherActor>(&instance, "philosopher", &params)
.await?;
let (dining_message_handle, mut dining_message_rx) = proc_mesh.client().open_port();
let (dining_message_handle, mut dining_message_rx) = instance.open_port();
actor_mesh
.cast(
proc_mesh.client(),
all(true_()),
instance,
PhilosopherMessage::Start(dining_message_handle.bind()),
)
.unwrap();
let mut waiter = Waiter::new(actor_mesh);
let mut waiter = Waiter::new(actor_mesh.deref().clone());
while let Ok(message) = dining_message_rx.recv().await {
eprintln!("waiter received message: {:?}", &message);
tracing::debug!("waiter received message: {:?}", &message);
match message {
WaiterMessage::RequestChopsticks((rank, left, right)) => {
waiter.handle_request_chopstick(rank, left)?;
waiter.handle_request_chopstick(rank, right)?;
waiter.handle_request_chopstick(instance, rank, left)?;
waiter.handle_request_chopstick(instance, rank, right)?;
}
WaiterMessage::ReleaseChopsticks((left, right)) => {
waiter.handle_release_chopstick(left)?;
waiter.handle_release_chopstick(right)?;
waiter.handle_release_chopstick(instance, left)?;
waiter.handle_release_chopstick(instance, right)?;
}
}
let mut sorted_chopstick_assignments = waiter
Expand All @@ -272,7 +271,7 @@ async fn main() -> Result<ExitCode> {
.map(|(k, v)| (*k, *v))
.collect::<Vec<_>>();
sorted_chopstick_assignments.sort();
eprintln!(
tracing::debug!(
"assignments [(CHO, PHI)]: {:?}",
sorted_chopstick_assignments
);
Expand All @@ -282,7 +281,7 @@ async fn main() -> Result<ExitCode> {
.map(|(k, v)| (*k, *v))
.collect::<Vec<_>>();
sorted_chopstick_requests.sort();
eprintln!(
tracing::debug!(
"pending requests [(CHO, PHI)]:: {:?}",
sorted_chopstick_requests
);
Expand Down
11 changes: 11 additions & 0 deletions hyperactor_mesh/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod testactor;
pub mod testing;
pub mod value_mesh;

use std::io;
use std::str::FromStr;

pub use actor_mesh::ActorMesh;
Expand All @@ -25,6 +26,7 @@ use enum_as_inner::EnumAsInner;
pub use host_mesh::HostMeshRef;
use hyperactor::ActorId;
use hyperactor::ActorRef;
use hyperactor::host::HostError;
use hyperactor::mailbox::MailboxSenderError;
use ndslice::view;
pub use proc_mesh::ProcMesh;
Expand Down Expand Up @@ -138,8 +140,17 @@ pub enum Error {
)]
ActorStopError { statuses: RankedValues<Status> },

#[error("error spawning actor: {0}")]
SingletonActorSpawnError(anyhow::Error),

#[error("error: {0} does not exist")]
NotExist(Name),

#[error(transparent)]
Io(#[from] io::Error),

#[error(transparent)]
Host(#[from] HostError),
}

/// Errors that occur during serialization and deserialization.
Expand Down
40 changes: 40 additions & 0 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use hyperactor::config;
use hyperactor::config::CONFIG;
use hyperactor::config::ConfigAttr;
use hyperactor::declare_attrs;
use hyperactor::host::Host;
use ndslice::view::CollectMeshExt;

pub mod mesh_agent;
Expand All @@ -39,8 +40,11 @@ use ndslice::view::RegionParseError;
use serde::Deserialize;
use serde::Serialize;

use crate::Bootstrap;
use crate::alloc::Alloc;
use crate::bootstrap::BootstrapCommand;
use crate::bootstrap::BootstrapProcManager;
use crate::proc_mesh::DEFAULT_TRANSPORT;
use crate::resource;
use crate::resource::CreateOrUpdateClient;
use crate::resource::GetRankStatus;
Expand All @@ -55,6 +59,7 @@ use crate::v1::ProcMesh;
use crate::v1::ProcMeshRef;
use crate::v1::StatusMesh;
use crate::v1::ValueMesh;
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
pub use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
use crate::v1::host_mesh::mesh_agent::HostMeshAgentProcMeshTrampoline;
use crate::v1::host_mesh::mesh_agent::ProcState;
Expand Down Expand Up @@ -211,6 +216,41 @@ enum HostMeshAllocation {
}

impl HostMesh {
/// Fork a new `HostMesh` from this process, returning the new `HostMesh`
/// to the parent (owning) process, while running forever in child processes
/// (i.e., individual procs).
///
/// All of the code preceding the call to `local` will run in each child proc;
/// thus it is important to call `local` early in the lifetime of the program,
/// and to ensure that it is reached unconditionally.
///
/// This is intended for testing, development, examples.
pub async fn local() -> v1::Result<HostMesh> {
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
let err = boot.bootstrap().await;
tracing::error!("failed to bootstrap local host mesh process: {}", err);
std::process::exit(1);
}

let addr = config::global::get_cloned(DEFAULT_TRANSPORT).any();

let manager = BootstrapProcManager::new(BootstrapCommand::current()?);
let (host, _handle) = Host::serve(manager, addr).await?;
let addr = host.addr().clone();
let host_mesh_agent = host
.system_proc()
.clone()
.spawn::<HostMeshAgent>("agent", HostAgentMode::Process(host))
.await
.map_err(v1::Error::SingletonActorSpawnError)?;
host_mesh_agent.bind::<HostMeshAgent>();

let host = HostRef(addr);
let host_mesh_ref =
HostMeshRef::new(Name::new("local"), extent!(hosts = 1).into(), vec![host])?;
Ok(HostMesh::take(host_mesh_ref))
}

/// Allocate a host mesh from an [`Alloc`]. This creates a HostMesh with the same extent
/// as the provided alloc. Allocs generate procs, and thus we define and run a Host for each
/// proc allocated by it.
Expand Down