Skip to content

Commit

Permalink
Added cloning of Senders & Callers, send_later memory leak fix, abort…
Browse files Browse the repository at this point in the history
…handle return & version bumps
  • Loading branch information
jracollins committed Feb 17, 2022
1 parent 8bdc08e commit 5f4006b
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 138 deletions.
21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "xactor"
version = "0.7.11"
authors = ["sunli <scott_s829@163.com>"]
description = "Xactor is a rust actors framework based on async-std"
edition = "2018"
edition = "2021"
publish = true
license = "MIT"
documentation = "https://docs.rs/xactor/"
Expand All @@ -14,16 +14,17 @@ categories = ["network-programming", "asynchronous"]
readme = "README.md"

[dependencies]
futures = "0.3.8"
async-trait = "0.1.42"
async-std = { version = "1.8.0", features = ["attributes"], optional = true }
tokio = { version = "1.0.1", features = ["rt-multi-thread", "macros", "time"], optional = true }
once_cell = "1.5.2"
xactor-derive = { path = "xactor-derive", version = "0.7"}
futures = "0.3.21"
async-trait = "0.1.52"
async-std = { version = "1.10.0", features = ["attributes"], optional = true }
tokio = { version = "1.17.0", features = ["rt-multi-thread", "macros", "time"], optional = true }
once_cell = "1.9.0"
xactor-derive = { path = "xactor-derive", version = "0.7" }
fnv = "1.0.7"
slab = "0.4.2"
anyhow = { version = "1.0.37", optional = true }
eyre = { version = "0.6.5", optional = true }
slab = "0.4.5"
anyhow = { version = "1.0.53", optional = true }
eyre = { version = "0.6.6", optional = true }
dyn-clone = "1.0.4"

[workspace]
members = [
Expand Down
31 changes: 31 additions & 0 deletions examples/intervals.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::time::Duration;
use xactor::*;

#[message]
struct IntervalMsg;

struct MyActor;

#[async_trait::async_trait]
impl Actor for MyActor {
async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
// Send the IntervalMsg message 3 seconds later
ctx.send_later(IntervalMsg, Duration::from_millis(500));
Ok(())
}
}

#[async_trait::async_trait]
impl Handler<IntervalMsg> for MyActor {
async fn handle(&mut self, ctx: &mut Context<Self>, _msg: IntervalMsg) {
ctx.send_later(IntervalMsg, Duration::from_millis(500));
}
}

#[xactor::main]
async fn main() -> Result<()> {
// Exit the program after 3 seconds
let addr = MyActor.start().await?;
addr.wait_for_stop().await;
Ok(())
}
16 changes: 14 additions & 2 deletions examples/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,21 @@ impl Actor for Subscriber {
// Send subscription request message to the Message Producer
println!("Child Subscriber Started - id {:?}", self.id);
let self_sender = ctx.address().sender();

let _ = self.message_producer_addr.send(SubscribeToProducer {
sender: self_sender,
sender: self_sender.clone(),
});

let _ = self.message_producer_addr.send(SubscribeToProducer {
sender: self_sender.clone(),
});
let _ = self.message_producer_addr.send(SubscribeToProducer {
sender: self_sender.clone(),
});
let _ = self.message_producer_addr.send(SubscribeToProducer {
sender: self_sender.clone(),
});

Ok(())
}
}
Expand Down Expand Up @@ -169,7 +181,7 @@ impl Handler<Broadcast> for MessageProducer {
#[async_trait::async_trait]
impl Handler<SubscribeToProducer> for MessageProducer {
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: SubscribeToProducer) {
println!("Recieved Subscription Request");
println!("Recieved Subscription Request {:}", msg.sender.actor_id);
self.subscribers.push(msg.sender);
}
}
8 changes: 5 additions & 3 deletions src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::addr::ActorEvent;
use crate::error::Result;
use crate::runtime::spawn;
use crate::{Addr, Context};
use crate::error::Result;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::{FutureExt, StreamExt};
Expand Down Expand Up @@ -158,8 +158,10 @@ impl<A: Actor> ActorManager<A> {
ActorEvent::Exec(f) => f(&mut actor, &mut ctx).await,
ActorEvent::Stop(_err) => break,
ActorEvent::RemoveStream(id) => {
if ctx.streams.contains(id) {
ctx.streams.remove(id);
let mut streams = ctx.streams.lock().unwrap();

if streams.contains(id) {
streams.remove(id);
}
}
}
Expand Down
99 changes: 54 additions & 45 deletions src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::future::Shared;
use futures::Future;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Weak};

type ExecFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;

Expand Down Expand Up @@ -53,6 +53,8 @@ impl<A> PartialEq for Addr<A> {
}
}

impl<A> Eq for Addr<A> {}

impl<A> Hash for Addr<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state)
Expand Down Expand Up @@ -111,29 +113,33 @@ impl<A: Actor> Addr<A> {
{
let weak_tx = Arc::downgrade(&self.tx);

Caller {
actor_id: self.actor_id.clone(),
caller_fn: Mutex::new(Box::new(move |msg| {
let weak_tx_option = weak_tx.upgrade();
Box::pin(async move {
match weak_tx_option {
Some(tx) => {
let (oneshot_tx, oneshot_rx) = oneshot::channel();

mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(
Box::new(move |actor, ctx| {
Box::pin(async move {
let res = Handler::handle(&mut *actor, ctx, msg).await;
let _ = oneshot_tx.send(res);
})
}),
))?;
Ok(oneshot_rx.await?)
}
None => Err(crate::error::anyhow!("Actor Dropped")),
let closure = move |msg: T| {
let weak_tx_option = weak_tx.upgrade();
Box::pin(async move {
match weak_tx_option {
Some(tx) => {
let (oneshot_tx, oneshot_rx) = oneshot::channel();

mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(
Box::new(move |actor, ctx| {
Box::pin(async move {
let res = Handler::handle(&mut *actor, ctx, msg).await;
let _ = oneshot_tx.send(res);
})
}),
))?;

let result = oneshot_rx.await?;
Ok(result)
}
})
})),
None => Err(crate::error::anyhow!("Actor Dropped")),
}
}) as Pin<Box<dyn Future<Output = Result<T::Result>>>>
};

Caller {
actor_id: self.actor_id,
caller_fn: Box::new(closure),
}
}

Expand All @@ -143,21 +149,25 @@ impl<A: Actor> Addr<A> {
A: Handler<T>,
{
let weak_tx = Arc::downgrade(&self.tx);

let closure = move |msg| match weak_tx.upgrade() {
Some(tx) => {
mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new(
move |actor, ctx| {
Box::pin(async move {
Handler::handle(&mut *actor, ctx, msg).await;
})
},
)))?;
Ok(())
}
None => Ok(()),
};

let sender_fn = Box::new(closure);
Sender {
actor_id: self.actor_id.clone(),
sender_fn: Box::new(move |msg| match weak_tx.upgrade() {
Some(tx) => {
mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new(
move |actor, ctx| {
Box::pin(async move {
Handler::handle(&mut *actor, ctx, msg).await;
})
},
)))?;
Ok(())
}
None => Ok(()),
}),
actor_id: self.actor_id,
sender_fn,
}
}

Expand All @@ -183,6 +193,8 @@ impl<A> PartialEq for WeakAddr<A> {
}
}

impl<A> Eq for WeakAddr<A> {}

impl<A> Hash for WeakAddr<A> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state)
Expand All @@ -191,14 +203,11 @@ impl<A> Hash for WeakAddr<A> {

impl<A> WeakAddr<A> {
pub fn upgrade(&self) -> Option<Addr<A>> {
match self.tx.upgrade() {
Some(tx) => Some(Addr {
actor_id: self.actor_id,
tx,
rx_exit: self.rx_exit.clone(),
}),
None => None,
}
self.tx.upgrade().map(|tx| Addr {
actor_id: self.actor_id,
tx,
rx_exit: self.rx_exit.clone(),
})
}
}

Expand Down
44 changes: 14 additions & 30 deletions src/broker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use crate::{Actor, Addr, Context, Handler, Message, Result, Sender, Service};
use crate::{Actor, ActorId, Addr, Context, Handler, Message, Result, Sender, Service};
use fnv::FnvHasher;
use std::any::Any;
use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;

type SubscriptionId = u64;
use std::{collections::HashMap, hash::BuildHasherDefault};

pub(crate) struct Subscribe<T: Message<Result = ()>> {
pub(crate) id: SubscriptionId,
pub(crate) actor_id: ActorId,
pub(crate) sender: Sender<T>,
}

Expand All @@ -17,19 +12,12 @@ impl<T: Message<Result = ()>> Message for Subscribe<T> {
}

pub(crate) struct Unsubscribe {
pub(crate) id: SubscriptionId,
pub(crate) actor_id: ActorId,
}

impl Message for Unsubscribe {
type Result = ();
}

struct Publish<T: Message<Result = ()> + Clone>(T);

impl<T: Message<Result = ()> + Clone> Message for Publish<T> {
type Result = ();
}

/// Message broker is used to support publishing and subscribing to messages.
///
/// # Examples
Expand Down Expand Up @@ -86,15 +74,13 @@ impl<T: Message<Result = ()> + Clone> Message for Publish<T> {
/// }
/// ```
pub struct Broker<T: Message<Result = ()>> {
subscribes: HashMap<SubscriptionId, Box<dyn Any + Send>, BuildHasherDefault<FnvHasher>>,
mark: PhantomData<T>,
subscribers: HashMap<ActorId, Sender<T>, BuildHasherDefault<FnvHasher>>,
}

impl<T: Message<Result = ()>> Default for Broker<T> {
fn default() -> Self {
Self {
subscribes: Default::default(),
mark: PhantomData,
subscribers: Default::default(),
}
}
}
Expand All @@ -106,31 +92,29 @@ impl<T: Message<Result = ()>> Service for Broker<T> {}
#[async_trait::async_trait]
impl<T: Message<Result = ()>> Handler<Subscribe<T>> for Broker<T> {
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Subscribe<T>) {
self.subscribes.insert(msg.id, Box::new(msg.sender));
self.subscribers.insert(msg.actor_id, msg.sender);
}
}

#[async_trait::async_trait]
impl<T: Message<Result = ()>> Handler<Unsubscribe> for Broker<T> {
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Unsubscribe) {
self.subscribes.remove(&msg.id);
self.subscribers.remove(&msg.actor_id);
}
}

#[async_trait::async_trait]
impl<T: Message<Result = ()> + Clone> Handler<Publish<T>> for Broker<T> {
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Publish<T>) {
for sender in self.subscribes.values_mut() {
if let Some(sender) = sender.downcast_mut::<Sender<T>>() {
sender.send(msg.0.clone()).ok();
}
}
impl<T: Message<Result = ()> + Clone> Handler<T> for Broker<T> {
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: T) {
// Broadcast to all subscribers and remove any senders that return an error (most likely because reciever dropped because actor already stopped)
self.subscribers
.retain(|_actor_id, sender| sender.send(msg.clone()).is_ok())
}
}

impl<T: Message<Result = ()> + Clone> Addr<Broker<T>> {
/// Publishes a message of the specified type.
pub fn publish(&mut self, msg: T) -> Result<()> {
self.send(Publish(msg))
self.send(msg)
}
}
Loading

0 comments on commit 5f4006b

Please sign in to comment.