diff --git a/Cargo.toml b/Cargo.toml index 05979c3..3035bd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "xactor" version = "0.7.11" authors = ["sunli "] description = "Xactor is a rust actors framework based on async-std" -edition = "2018" +edition = "2021" publish = true license = "MIT" documentation = "https://docs.rs/xactor/" @@ -14,16 +14,17 @@ categories = ["network-programming", "asynchronous"] readme = "README.md" [dependencies] -futures = "0.3.8" -async-trait = "0.1.42" -async-std = { version = "1.8.0", features = ["attributes"], optional = true } -tokio = { version = "1.0.1", features = ["rt-multi-thread", "macros", "time"], optional = true } -once_cell = "1.5.2" -xactor-derive = { path = "xactor-derive", version = "0.7"} +futures = "0.3.21" +async-trait = "0.1.52" +async-std = { version = "1.10.0", features = ["attributes"], optional = true } +tokio = { version = "1.17.0", features = ["rt-multi-thread", "macros", "time"], optional = true } +once_cell = "1.9.0" +xactor-derive = { path = "xactor-derive", version = "0.7" } fnv = "1.0.7" -slab = "0.4.2" -anyhow = { version = "1.0.37", optional = true } -eyre = { version = "0.6.5", optional = true } +slab = "0.4.5" +anyhow = { version = "1.0.53", optional = true } +eyre = { version = "0.6.6", optional = true } +dyn-clone = "1.0.4" [workspace] members = [ diff --git a/examples/intervals.rs b/examples/intervals.rs new file mode 100644 index 0000000..f87cd31 --- /dev/null +++ b/examples/intervals.rs @@ -0,0 +1,31 @@ +use std::time::Duration; +use xactor::*; + +#[message] +struct IntervalMsg; + +struct MyActor; + +#[async_trait::async_trait] +impl Actor for MyActor { + async fn started(&mut self, ctx: &mut Context) -> Result<()> { + // Send the IntervalMsg message 3 seconds later + ctx.send_later(IntervalMsg, Duration::from_millis(500)); + Ok(()) + } +} + +#[async_trait::async_trait] +impl Handler for MyActor { + async fn handle(&mut self, ctx: &mut Context, _msg: IntervalMsg) { + ctx.send_later(IntervalMsg, Duration::from_millis(500)); + } +} + +#[xactor::main] +async fn main() -> Result<()> { + // Exit the program after 3 seconds + let addr = MyActor.start().await?; + addr.wait_for_stop().await; + Ok(()) +} diff --git a/examples/subscriber.rs b/examples/subscriber.rs index 7f4fda1..0b15adf 100644 --- a/examples/subscriber.rs +++ b/examples/subscriber.rs @@ -95,9 +95,21 @@ impl Actor for Subscriber { // Send subscription request message to the Message Producer println!("Child Subscriber Started - id {:?}", self.id); let self_sender = ctx.address().sender(); + let _ = self.message_producer_addr.send(SubscribeToProducer { - sender: self_sender, + sender: self_sender.clone(), }); + + let _ = self.message_producer_addr.send(SubscribeToProducer { + sender: self_sender.clone(), + }); + let _ = self.message_producer_addr.send(SubscribeToProducer { + sender: self_sender.clone(), + }); + let _ = self.message_producer_addr.send(SubscribeToProducer { + sender: self_sender.clone(), + }); + Ok(()) } } @@ -169,7 +181,7 @@ impl Handler for MessageProducer { #[async_trait::async_trait] impl Handler for MessageProducer { async fn handle(&mut self, _ctx: &mut Context, msg: SubscribeToProducer) { - println!("Recieved Subscription Request"); + println!("Recieved Subscription Request {:}", msg.sender.actor_id); self.subscribers.push(msg.sender); } } diff --git a/src/actor.rs b/src/actor.rs index 1e7b46c..2fd0629 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -1,7 +1,7 @@ use crate::addr::ActorEvent; +use crate::error::Result; use crate::runtime::spawn; use crate::{Addr, Context}; -use crate::error::Result; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; use futures::{FutureExt, StreamExt}; @@ -158,8 +158,10 @@ impl ActorManager { ActorEvent::Exec(f) => f(&mut actor, &mut ctx).await, ActorEvent::Stop(_err) => break, ActorEvent::RemoveStream(id) => { - if ctx.streams.contains(id) { - ctx.streams.remove(id); + let mut streams = ctx.streams.lock().unwrap(); + + if streams.contains(id) { + streams.remove(id); } } } diff --git a/src/addr.rs b/src/addr.rs index a81c249..aa200dd 100644 --- a/src/addr.rs +++ b/src/addr.rs @@ -4,7 +4,7 @@ use futures::future::Shared; use futures::Future; use std::hash::{Hash, Hasher}; use std::pin::Pin; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Weak}; type ExecFuture<'a> = Pin + Send + 'a>>; @@ -53,6 +53,8 @@ impl PartialEq for Addr { } } +impl Eq for Addr {} + impl Hash for Addr { fn hash(&self, state: &mut H) { self.actor_id.hash(state) @@ -111,29 +113,33 @@ impl Addr { { let weak_tx = Arc::downgrade(&self.tx); - Caller { - actor_id: self.actor_id.clone(), - caller_fn: Mutex::new(Box::new(move |msg| { - let weak_tx_option = weak_tx.upgrade(); - Box::pin(async move { - match weak_tx_option { - Some(tx) => { - let (oneshot_tx, oneshot_rx) = oneshot::channel(); - - mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec( - Box::new(move |actor, ctx| { - Box::pin(async move { - let res = Handler::handle(&mut *actor, ctx, msg).await; - let _ = oneshot_tx.send(res); - }) - }), - ))?; - Ok(oneshot_rx.await?) - } - None => Err(crate::error::anyhow!("Actor Dropped")), + let closure = move |msg: T| { + let weak_tx_option = weak_tx.upgrade(); + Box::pin(async move { + match weak_tx_option { + Some(tx) => { + let (oneshot_tx, oneshot_rx) = oneshot::channel(); + + mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec( + Box::new(move |actor, ctx| { + Box::pin(async move { + let res = Handler::handle(&mut *actor, ctx, msg).await; + let _ = oneshot_tx.send(res); + }) + }), + ))?; + + let result = oneshot_rx.await?; + Ok(result) } - }) - })), + None => Err(crate::error::anyhow!("Actor Dropped")), + } + }) as Pin>>> + }; + + Caller { + actor_id: self.actor_id, + caller_fn: Box::new(closure), } } @@ -143,21 +149,25 @@ impl Addr { A: Handler, { let weak_tx = Arc::downgrade(&self.tx); + + let closure = move |msg| match weak_tx.upgrade() { + Some(tx) => { + mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new( + move |actor, ctx| { + Box::pin(async move { + Handler::handle(&mut *actor, ctx, msg).await; + }) + }, + )))?; + Ok(()) + } + None => Ok(()), + }; + + let sender_fn = Box::new(closure); Sender { - actor_id: self.actor_id.clone(), - sender_fn: Box::new(move |msg| match weak_tx.upgrade() { - Some(tx) => { - mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new( - move |actor, ctx| { - Box::pin(async move { - Handler::handle(&mut *actor, ctx, msg).await; - }) - }, - )))?; - Ok(()) - } - None => Ok(()), - }), + actor_id: self.actor_id, + sender_fn, } } @@ -183,6 +193,8 @@ impl PartialEq for WeakAddr { } } +impl Eq for WeakAddr {} + impl Hash for WeakAddr { fn hash(&self, state: &mut H) { self.actor_id.hash(state) @@ -191,14 +203,11 @@ impl Hash for WeakAddr { impl WeakAddr { pub fn upgrade(&self) -> Option> { - match self.tx.upgrade() { - Some(tx) => Some(Addr { - actor_id: self.actor_id, - tx, - rx_exit: self.rx_exit.clone(), - }), - None => None, - } + self.tx.upgrade().map(|tx| Addr { + actor_id: self.actor_id, + tx, + rx_exit: self.rx_exit.clone(), + }) } } diff --git a/src/broker.rs b/src/broker.rs index 9b0b1ab..8298428 100644 --- a/src/broker.rs +++ b/src/broker.rs @@ -1,14 +1,9 @@ -use crate::{Actor, Addr, Context, Handler, Message, Result, Sender, Service}; +use crate::{Actor, ActorId, Addr, Context, Handler, Message, Result, Sender, Service}; use fnv::FnvHasher; -use std::any::Any; -use std::collections::HashMap; -use std::hash::BuildHasherDefault; -use std::marker::PhantomData; - -type SubscriptionId = u64; +use std::{collections::HashMap, hash::BuildHasherDefault}; pub(crate) struct Subscribe> { - pub(crate) id: SubscriptionId, + pub(crate) actor_id: ActorId, pub(crate) sender: Sender, } @@ -17,19 +12,12 @@ impl> Message for Subscribe { } pub(crate) struct Unsubscribe { - pub(crate) id: SubscriptionId, + pub(crate) actor_id: ActorId, } impl Message for Unsubscribe { type Result = (); } - -struct Publish + Clone>(T); - -impl + Clone> Message for Publish { - type Result = (); -} - /// Message broker is used to support publishing and subscribing to messages. /// /// # Examples @@ -86,15 +74,13 @@ impl + Clone> Message for Publish { /// } /// ``` pub struct Broker> { - subscribes: HashMap, BuildHasherDefault>, - mark: PhantomData, + subscribers: HashMap, BuildHasherDefault>, } impl> Default for Broker { fn default() -> Self { Self { - subscribes: Default::default(), - mark: PhantomData, + subscribers: Default::default(), } } } @@ -106,31 +92,29 @@ impl> Service for Broker {} #[async_trait::async_trait] impl> Handler> for Broker { async fn handle(&mut self, _ctx: &mut Context, msg: Subscribe) { - self.subscribes.insert(msg.id, Box::new(msg.sender)); + self.subscribers.insert(msg.actor_id, msg.sender); } } #[async_trait::async_trait] impl> Handler for Broker { async fn handle(&mut self, _ctx: &mut Context, msg: Unsubscribe) { - self.subscribes.remove(&msg.id); + self.subscribers.remove(&msg.actor_id); } } #[async_trait::async_trait] -impl + Clone> Handler> for Broker { - async fn handle(&mut self, _ctx: &mut Context, msg: Publish) { - for sender in self.subscribes.values_mut() { - if let Some(sender) = sender.downcast_mut::>() { - sender.send(msg.0.clone()).ok(); - } - } +impl + Clone> Handler for Broker { + async fn handle(&mut self, _ctx: &mut Context, msg: T) { + // Broadcast to all subscribers and remove any senders that return an error (most likely because reciever dropped because actor already stopped) + self.subscribers + .retain(|_actor_id, sender| sender.send(msg.clone()).is_ok()) } } impl + Clone> Addr> { /// Publishes a message of the specified type. pub fn publish(&mut self, msg: T) -> Result<()> { - self.send(Publish(msg)) + self.send(msg) } } diff --git a/src/caller.rs b/src/caller.rs index b70a03a..5142e98 100644 --- a/src/caller.rs +++ b/src/caller.rs @@ -2,55 +2,57 @@ use crate::{ActorId, Message, Result}; use std::future::Future; use std::hash::{Hash, Hasher}; use std::pin::Pin; -use std::sync::Mutex; - -pub(crate) type CallerFuture = - Pin::Result>> + Send + 'static>>; - -pub(crate) type CallerFn = Box CallerFuture + Send + 'static>; - -pub(crate) type SenderFn = Box Result<()> + 'static + Send>; /// Caller of a specific message type /// -/// Like `Sender, Caller has a weak reference to the recipient of the message type, and so will not prevent an actor from stopping if all Addr's have been dropped elsewhere. +/// Like `Sender`, Caller has a weak reference to the recipient of the message type, and so will not prevent an actor from stopping if all Addr's have been dropped elsewhere. +/// This takes a boxed closure with the message as a parameter with the mpsc channel of the actor inside and the type of actor abstracted away. pub struct Caller { pub actor_id: ActorId, - pub(crate) caller_fn: Mutex>, + pub(crate) caller_fn: Box>, } impl Caller { - pub fn call(&self, msg: T) -> CallerFuture { - (self.caller_fn.lock().unwrap())(msg) + pub async fn call(&self, msg: T) -> Result { + self.caller_fn.call(msg).await } } -impl> PartialEq for Caller { +impl PartialEq for Caller { fn eq(&self, other: &Self) -> bool { self.actor_id == other.actor_id } } -impl> Hash for Caller { +impl Hash for Caller { fn hash(&self, state: &mut H) { self.actor_id.hash(state) } } +impl Clone for Caller { + fn clone(&self) -> Caller { + Caller { + actor_id: self.actor_id, + caller_fn: dyn_clone::clone_box(&*self.caller_fn), + } + } +} + /// Sender of a specific message type /// -/// Like `Caller, Sender has a weak reference to the recipient of the message type, and so will not prevent an actor from stopping if all Addr's have been dropped elsewhere. +/// Like `Caller`, Sender has a weak reference to the recipient of the message type, and so will not prevent an actor from stopping if all Addr's have been dropped elsewhere. /// This allows it to be used in `send_later` `send_interval` actor functions, and not keep the actor alive indefinitely even after all references to it have been dropped (unless `ctx.stop()` is called from within) pub struct Sender { pub actor_id: ActorId, - pub(crate) sender_fn: SenderFn, + pub(crate) sender_fn: Box>, } impl> Sender { pub fn send(&self, msg: T) -> Result<()> { - (self.sender_fn)(msg) + self.sender_fn.send(msg) } } @@ -65,3 +67,50 @@ impl> Hash for Sender { self.actor_id.hash(state) } } + +impl> Clone for Sender { + fn clone(&self) -> Sender { + Sender { + actor_id: self.actor_id, + sender_fn: dyn_clone::clone_box(&*self.sender_fn), + } + } +} + +// https://stackoverflow.com/questions/63842261/how-to-derive-clone-for-structures-with-boxed-closure +// https://users.rust-lang.org/t/expected-opaque-type-found-a-different-opaque-type-when-trying-futures-join-all/40596/3 +use dyn_clone::DynClone; + +pub trait SenderFn: DynClone + 'static + Send + Sync +where + T: Message, +{ + fn send(&self, msg: T) -> Result<()>; +} + +impl SenderFn for F +where + F: Fn(T) -> Result<()> + 'static + Send + Sync + Clone, + T: Message, +{ + fn send(&self, msg: T) -> Result<()> { + self(msg) + } +} + +pub trait CallerFn: DynClone + 'static + Send + Sync +where + T: Message, +{ + fn call(&self, msg: T) -> Pin>>>; +} + +impl CallerFn for F +where + F: Fn(T) -> Pin>>> + 'static + Send + Sync + Clone, + T: Message, +{ + fn call(&self, msg: T) -> Pin>>> { + self(msg) + } +} diff --git a/src/context.rs b/src/context.rs index fa30be5..553353d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -7,7 +7,9 @@ use futures::future::{AbortHandle, Abortable, Shared}; use futures::{Stream, StreamExt}; use once_cell::sync::OnceCell; use slab::Slab; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -16,8 +18,16 @@ pub struct Context { actor_id: ActorId, tx: Weak>>, pub(crate) rx_exit: Option>>, - pub(crate) streams: Slab, - pub(crate) intervals: Slab, + pub(crate) streams: Arc>>, + pub(crate) intervals: Arc>>, +} + +impl fmt::Debug for Context { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Context") + .field("actor_id", &self.actor_id) + .finish() + } } impl Context { @@ -28,7 +38,7 @@ impl Context { mpsc::UnboundedReceiver>, Arc>>, ) { - static ACTOR_ID: OnceCell = OnceCell::new(); + static ACTOR_ID: OnceCell = OnceCell::new(); // Get an actor id let actor_id = ACTOR_ID @@ -76,14 +86,18 @@ impl Context { } pub fn abort_intervals(&mut self) { - for handle in self.intervals.drain() { - handle.abort() + if let Ok(mut intervals) = self.intervals.lock() { + for handle in intervals.drain() { + handle.abort() + } } } pub fn abort_streams(&mut self) { - for handle in self.streams.drain() { - handle.abort(); + if let Ok(mut intervals) = self.streams.lock() { + for handle in intervals.drain() { + handle.abort() + } } } @@ -149,7 +163,9 @@ impl Context { A: StreamHandler, { let tx = self.tx.clone(); - let entry = self.streams.vacant_entry(); + let mut streams = self.streams.lock().unwrap(); + + let entry = streams.vacant_entry(); let id = entry.key(); let (handle, registration) = futures::future::AbortHandle::new_pair(); entry.insert(handle); @@ -209,59 +225,78 @@ impl Context { /// /// We use `Sender` instead of `Addr` so that the interval doesn't keep reference to address and prevent the actor from being dropped and stopped - pub fn send_later(&mut self, msg: T, after: Duration) + pub fn send_later(&mut self, msg: T, after: Duration) -> AbortHandle where A: Handler, T: Message, { let sender = self.address().sender(); - let entry = self.intervals.vacant_entry(); + let intervals_clone = self.intervals.clone(); + + let mut intervals = self.intervals.lock().unwrap(); + + let entry = intervals.vacant_entry(); + let key = entry.key(); + let (handle, registration) = futures::future::AbortHandle::new_pair(); - entry.insert(handle); + entry.insert(handle.clone()); spawn(Abortable::new( async move { sleep(after).await; sender.send(msg).ok(); + // We have to remove the entry after the send has been completed or the slab will grow indefinitely + let mut intervals = intervals_clone.lock().unwrap(); + intervals.remove(key); }, registration, )); + handle } /// Sends the message to self, at a specified fixed interval. /// The message is created each time using a closure `f`. - pub fn send_interval_with(&mut self, f: F, dur: Duration) + pub fn send_interval_with(&mut self, f: F, dur: Duration) -> AbortHandle where A: Handler, F: Fn() -> T + Sync + Send + 'static, T: Message, { let sender = self.address().sender(); + let intervals_clone = self.intervals.clone(); + + let mut intervals = self.intervals.lock().unwrap(); + + let entry = intervals.vacant_entry(); + let key = entry.key(); - let entry = self.intervals.vacant_entry(); let (handle, registration) = futures::future::AbortHandle::new_pair(); - entry.insert(handle); + entry.insert(handle.clone()); spawn(Abortable::new( async move { loop { sleep(dur).await; if sender.send(f()).is_err() { + // Again, we have to remove the entry after the send has been completed or the slab will grow indefinitely + let mut intervals = intervals_clone.lock().unwrap(); + intervals.remove(key); break; } } }, registration, )); + handle } /// Sends the message `msg` to self, at a specified fixed interval. - pub fn send_interval(&mut self, msg: T, dur: Duration) + pub fn send_interval(&mut self, msg: T, dur: Duration) -> AbortHandle where A: Handler, T: Message + Clone + Sync, { - self.send_interval_with(move || msg.clone(), dur); + self.send_interval_with(move || msg.clone(), dur) } /// Subscribes to a message of a specified type. @@ -273,7 +308,7 @@ impl Context { let sender = self.address().sender(); broker .send(Subscribe { - id: self.actor_id, + actor_id: self.actor_id, sender, }) .ok(); @@ -283,6 +318,8 @@ impl Context { /// Unsubscribe to a message of a specified type. pub async fn unsubscribe>(&self) -> Result<()> { let broker = Broker::::from_registry().await?; - broker.send(Unsubscribe { id: self.actor_id }) + broker.send(Unsubscribe { + actor_id: self.actor_id, + }) } } diff --git a/src/lib.rs b/src/lib.rs index 26343e7..baf7abc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,15 +72,17 @@ mod service; mod supervisor; #[cfg(all(feature = "anyhow", feature = "eyre"))] -compile_error!(r#" +compile_error!( + r#" features `xactor/anyhow` and `xactor/eyre` are mutually exclusive. If you are trying to disable anyhow set `default-features = false`. -"#); +"# +); -#[cfg(feature="anyhow")] +#[cfg(feature = "anyhow")] pub use anyhow as error; -#[cfg(feature="eyre")] +#[cfg(feature = "eyre")] pub use eyre as error; /// Alias of error::Result @@ -89,7 +91,7 @@ pub type Result = error::Result; /// Alias of error::Error pub type Error = error::Error; -pub type ActorId = u64; +pub type ActorId = usize; pub use actor::{Actor, Handler, Message, StreamHandler}; pub use addr::{Addr, WeakAddr}; diff --git a/src/service.rs b/src/service.rs index bf2127d..fcac805 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,6 +1,6 @@ use crate::actor::ActorManager; -use crate::{Actor, Addr}; use crate::error::Result; +use crate::{Actor, Addr}; use fnv::FnvHasher; use futures::lock::Mutex; use once_cell::sync::OnceCell; diff --git a/src/supervisor.rs b/src/supervisor.rs index d220e70..4fa0f7e 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1,7 +1,7 @@ use crate::addr::ActorEvent; +use crate::error::Result; use crate::runtime::spawn; use crate::{Actor, Addr, Context}; -use crate::error::Result; use futures::StreamExt; /// Actor supervisor @@ -97,8 +97,10 @@ impl Supervisor { Some(ActorEvent::Stop(_err)) => break 'event_loop, Some(ActorEvent::Exec(f)) => f(&mut actor, &mut ctx).await, Some(ActorEvent::RemoveStream(id)) => { - if ctx.streams.contains(id) { - ctx.streams.remove(id); + let mut streams = ctx.streams.lock().unwrap(); + + if streams.contains(id) { + streams.remove(id); } } } diff --git a/xactor-derive/Cargo.toml b/xactor-derive/Cargo.toml index 35e6ca9..8de5d11 100644 --- a/xactor-derive/Cargo.toml +++ b/xactor-derive/Cargo.toml @@ -16,6 +16,6 @@ categories = ["network-programming", "asynchronous"] proc-macro = true [dependencies] -proc-macro2 = "1.0.6" -syn = { version = "1.0.13", features = ["full"] } -quote = "1.0.2" +proc-macro2 = "1.0.36" +syn = { version = "1.0.86", features = ["full"] } +quote = "1.0.15"