Skip to content

Commit 5909cef

Browse files
committed
Use channel instead Recipient
1 parent 23cf871 commit 5909cef

File tree

6 files changed

+265
-193
lines changed

6 files changed

+265
-193
lines changed

src/bus.rs

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@ use core::mem::PinMut;
33
use core::num::NonZeroUsize;
44
use std::sync::atomic::{AtomicUsize, Ordering};
55

6-
use actix::prelude::*;
6+
use actix::{
7+
dev::ToEnvelope,
8+
fut,
9+
prelude::*,
10+
};
711
use futures::{compat::*, prelude::*};
812
use log::*;
13+
use tokio::prelude::{
14+
Async as Async01,
15+
Poll as Poll01,
16+
};
917
use typemap::{Key, SendMap};
1018
use pin_utils::unsafe_pinned;
1119

@@ -19,13 +27,15 @@ pub struct Bus {
1927
impl Bus {
2028
pub fn new_id() -> BusId { BusId::new() }
2129

22-
pub fn subscribe<M>(id: BusId, recipient: Recipient<M>) -> WaitSubscribe<M>
30+
pub fn subscribe<A, M>(id: BusId) -> WaitSubscribe<A, M>
2331
where
32+
A: Actor + Handler<M>,
33+
A::Context: AsyncContext<A> + ToEnvelope<A, M>,
2434
M: Message + Send + Clone + 'static,
2535
M::Result: Send,
2636
{
2737
let bus = Bus::from_registry();
28-
WaitSubscribe::new(&bus, id, recipient)
38+
WaitSubscribe::new(&bus, id)
2939
}
3040

3141
pub fn publish<M>(id: BusId, message: M) -> WaitPublish<M>
@@ -97,30 +107,31 @@ impl BusId {
97107
}
98108

99109
struct Bucket<M>(PhantomData<M>);
100-
impl<M: Message + Send + 'static> Key for Bucket<M> where M::Result: Send {
110+
impl<M: Send + 'static> Key for Bucket<M> {
101111
type Value = SubscriptionList<BusId, M>;
102112
}
103113

104114
#[derive(Message)]
105-
#[rtype(result = "()")]
106-
pub struct Subscribe<M: actix::Message + Send + 'static> where M::Result: Send {
115+
#[rtype(result = "crate::util::subscription::Receiver<M>")]
116+
pub struct Subscribe<M: 'static> {
107117
receiver: BusId,
108-
pub recipient: Recipient<M>,
118+
_msg: PhantomData<M>,
109119
}
110120

111-
impl<M: Message + Send + 'static> Subscribe<M> where M::Result: Send {
112-
pub fn new(receiver: BusId, recipient: Recipient<M>) -> Self {
113-
Subscribe { receiver, recipient }
121+
impl<M> Subscribe<M> {
122+
pub fn new(receiver: BusId) -> Self {
123+
Subscribe { receiver, _msg: PhantomData }
114124
}
115125
}
116126

117-
impl<M: Message + Send + 'static> Handler<Subscribe<M>> for Bus where M::Result: Send {
118-
type Result = ();
127+
impl<M: Send + Sized> Handler<Subscribe<M>> for Bus {
128+
type Result = MessageResult<Subscribe<M>>;
119129

120130
fn handle(&mut self, msg: Subscribe<M>, _: &mut Self::Context) -> Self::Result {
121131
debug!("Bus received Subscribe<M>");
122132
let list = self.map.entry::<Bucket<M>>().or_insert_with(Default::default);
123-
list.add(msg.receiver, msg.recipient);
133+
let rx = list.subscribe(msg.receiver);
134+
MessageResult(rx)
124135
}
125136
}
126137

@@ -137,31 +148,59 @@ impl<M: Message + Send + Clone + 'static> Handler<Publish<M>> for Bus where M::R
137148
fn handle(&mut self, msg: Publish<M>, _: &mut Self::Context) -> Self::Result {
138149
debug!("Bus received Publish<M>");
139150
let list = self.map.entry::<Bucket<M>>().or_insert_with(Default::default);
140-
list.send(msg.sender, msg.message);
151+
list.do_send(msg.sender, msg.message);
141152
}
142153
}
143154

144155

145-
pub struct WaitSubscribe<M> where M: Message + Send + 'static, M::Result: Send {
146-
future: Compat<Request<Bus, Subscribe<M>>, ()>,
156+
pub struct WaitSubscribe<A, M>
157+
where
158+
A: Actor,
159+
M: Send + 'static,
160+
{
161+
future: fut::FutureWrap<Request<Bus, Subscribe<M>>, A>,
147162
}
148163

149-
impl<M> WaitSubscribe<M> where M: Message + Send + 'static, M::Result: Send {
150-
unsafe_pinned!(future: Compat<Request<Bus, Subscribe<M>>, ()>);
151-
152-
fn new(addr: &Addr<Bus>, id: BusId, recipient: Recipient<M>) -> Self {
153-
WaitSubscribe { future: addr.send(Subscribe::new(id, recipient)).compat() }
164+
impl<A, M> WaitSubscribe<A, M>
165+
where
166+
A: Actor + Handler<M>,
167+
A::Context: AsyncContext<A> + ToEnvelope<A, M>,
168+
M: Message + Send + 'static,
169+
M::Result: Send,
170+
{
171+
fn new(addr: &Addr<Bus>, id: BusId) -> Self {
172+
WaitSubscribe {
173+
future: fut::wrap_future(addr.send(Subscribe::<M>::new(id)))
174+
}
154175
}
155176
}
156177

157-
impl<M> Unpin for WaitSubscribe<M> where M: Message + Send + 'static, M::Result: Send {}
158-
159-
impl<M> Future for WaitSubscribe<M> where M: Message + Send + 'static, M::Result: Send {
160-
type Output = Result<(), MailboxError>;
161-
162-
fn poll(mut self: PinMut<'_, Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
178+
impl<A, M> Unpin for WaitSubscribe<A, M>
179+
where
180+
A: Actor + Unpin,
181+
M: Message + Send + 'static,
182+
M::Result: Send,
183+
{}
184+
185+
impl<A, M> ActorFuture for WaitSubscribe<A, M>
186+
where
187+
A: Actor + Handler<M>,
188+
A::Context: AsyncContext<A> + ToEnvelope<A, M>,
189+
M: Message + Send + Unpin + 'static,
190+
M::Result: Send + Unpin,
191+
{
192+
type Item = ();
193+
type Error = MailboxError;
194+
type Actor = A;
195+
196+
fn poll(&mut self, srv: &mut Self::Actor, ctx: &mut <Self::Actor as Actor>::Context) -> Poll01<Self::Item, Self::Error> {
163197
debug!("WaitSubscribe::poll");
164-
self.future().poll(cx)
198+
let rx = match self.future.poll(srv, ctx)? {
199+
Async01::Ready(t) => t,
200+
Async01::NotReady => { return Ok(Async01::NotReady); }
201+
};
202+
ctx.add_message_stream(rx.map(Result::Ok).compat(TokioDefaultSpawn));
203+
Ok(Async01::Ready(()))
165204
}
166205
}
167206

src/discord_client/mod.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ use serenity::model::{
2323
};
2424

2525
use crate::{
26-
AddrExt,
2726
Bus,
2827
BusId,
2928
Config,
3029
Error,
3130
fetch_config,
3231
message::{ChannelUpdated, IrcReady, MessageCreated, Terminate},
33-
task,
3432
};
3533

3634
use self::handler::{ClientState, DiscordEvent, new_client};
@@ -103,19 +101,18 @@ impl Actor for Discord {
103101
}
104102
}
105103

106-
let addr = ctx.address();
107-
async fn subscribe(addr: &Addr<Discord>) -> Result<(), MailboxError> {
108-
// await!(addr.subscribe::<ChannelUpdated>())?;
109-
await!(addr.subscribe::<IrcReady>())?;
110-
await!(addr.subscribe::<MessageCreated>())?;
111-
Ok(())
112-
}
113-
task::spawn(async move {
114-
if let Err(err) = await!(subscribe(&addr)) {
115-
error!("Failed to subscribe: {}", err);
116-
addr.do_send(Terminate);
117-
}
118-
}.boxed());
104+
let bus_id = self.bus_id;
105+
ctx.spawn(
106+
Bus::subscribe::<_, IrcReady>(bus_id)
107+
.and_then(move |_, _, _| Bus::subscribe::<_, MessageCreated>(bus_id))
108+
.then(|res, _, ctx: &mut Self::Context| {
109+
if let Err(err) = res {
110+
error!("Failed to subscribe: {}", err);
111+
ctx.notify(Terminate);
112+
}
113+
fut::ok(())
114+
})
115+
);
119116
}
120117

121118
fn stopping(&mut self, ctx: &mut Self::Context) -> Running {

src/irc_client.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ use std::collections::HashMap;
22
use std::sync::Arc;
33
use std::time::{Duration, Instant};
44

5-
use actix::actors::signal;
6-
use actix::prelude::*;
7-
use futures::prelude::*;
5+
use actix::{
6+
actors::signal,
7+
fut,
8+
prelude::*,
9+
};
810
use irc::{
911
client::{
1012
Client,
@@ -22,13 +24,11 @@ use log::*;
2224
use regex::Regex;
2325

2426
use crate::{
25-
AddrExt,
2627
Config,
2728
Error,
2829
fetch_config,
2930
bus::{Bus, BusId},
3031
message::{ChannelUpdated, IrcReady, MessageCreated, Terminate},
31-
util::task,
3232
};
3333

3434

@@ -131,18 +131,18 @@ impl Actor for Irc {
131131
.do_send(signal::Subscribe(ctx.address().recipient()));
132132

133133
ctx.add_stream(self.client.stream());
134-
let addr = ctx.address();
135-
async fn subscribe(addr: &Addr<Irc>) -> Result<(), MailboxError> {
136-
await!(addr.subscribe::<ChannelUpdated>())?;
137-
await!(addr.subscribe::<MessageCreated>())?;
138-
Ok(())
139-
}
140-
task::spawn(async move {
141-
if let Err(err) = await!(subscribe(&addr)) {
142-
error!("Failed to subscribe: {}", err);
143-
addr.do_send(Terminate);
144-
}
145-
}.boxed());
134+
let bus_id = self.bus_id;
135+
ctx.spawn(
136+
Bus::subscribe::<_, ChannelUpdated>(bus_id)
137+
.and_then(move |_, _, _| Bus::subscribe::<_, MessageCreated>(bus_id))
138+
.then(|res, _, ctx: &mut Self::Context| {
139+
if let Err(err) = res {
140+
error!("Failed to subscribe: {}", err);
141+
ctx.notify(Terminate);
142+
}
143+
fut::ok(())
144+
})
145+
);
146146
ctx.run_interval(Duration::from_secs(5), |this, _ctx| {
147147
let _ = this.sync_channel_joining();
148148
});

src/main.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ async fn run() -> Result<(), failure::Error> {
6060
let _irc = irc_client::Irc::new()?.start();
6161
let _discord = discord_client::Discord::new()?.start();
6262

63-
let inspector = Inspector { counter: 2, bus_id: Bus::new_id() }.start();
64-
let _ = await!(inspector.subscribe::<ChannelUpdated>());
65-
let _ = await!(inspector.subscribe::<MessageCreated>());
66-
let _ = await!(inspector.subscribe::<message::Terminate>());
63+
let _inspector = Inspector { counter: 2, bus_id: Bus::new_id() }.start();
6764

6865
Ok(())
6966
}
@@ -76,6 +73,16 @@ struct Inspector {
7673

7774
impl actix::Actor for Inspector {
7875
type Context = actix::Context<Self>;
76+
77+
fn started(&mut self, ctx: &mut Self::Context) {
78+
let bus_id = self.bus_id;
79+
ctx.spawn(
80+
Bus::subscribe::<_, ChannelUpdated>(bus_id)
81+
.and_then(move |_, _, _| Bus::subscribe::<_, MessageCreated>(bus_id))
82+
.and_then(move |_, _, _| Bus::subscribe::<_, message::Terminate>(bus_id))
83+
.then(|_, _, _| actix::fut::ok(()))
84+
);
85+
}
7986
}
8087

8188
impl_get_bus_id!(Inspector);

0 commit comments

Comments
 (0)