diff --git a/README.md b/README.md index c3de095..5997e8c 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,8 @@ More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to. -I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on. +I created it to leverage interoperability we can create by using async [Stream](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_core/stream/trait.Stream.html) and [Sink](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures/sink/trait.Sink.html +) from the futures library. So you can use all stream combinators, forward it into Sinks and so on. Minimal rustc version: 1.39. @@ -44,8 +45,8 @@ This crate has: `#![ forbid( unsafe_code ) ]` - [`Events`] is not clonable right now (would require support from the channels we use as back-ends, eg. broadcast type channel) - performance tweaking still needs to be done - pharos requires mut access for most operations. This is not intended to change anytime soon. Both on - [notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal - state, and most channels also require mutable access to either read or write. If you need it from non mutable + [send](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_util/sink/trait.SinkExt.html#method.send) and [observe](Observable::observe), the two main interfaces, manipulate internal + state, and most channels also require mutable access to either read or write. If you need it from immutable context, use interior mutability primitives like locks or Cells... ### Future work @@ -79,18 +80,17 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master ### Dependencies -This crate only has two dependencies. Cargo will automatically handle it's dependencies for you. +This crate only has one dependencies. Cargo will automatically handle it's dependencies for you. ```yaml dependencies: futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] } - pin-project : ^0.4.0-beta ``` ## Usage -pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages +`pharos` only works for async code, as the notify method is asynchronous. Observers must consume the messages fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel). Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers. @@ -102,8 +102,8 @@ Examples can be found in the [examples](https://github.com/najamelan/pharos/tree ```rust use { - pharos :: { * } , - futures :: { executor::block_on, StreamExt } , + pharos :: { * } , + futures :: { executor::block_on, StreamExt, SinkExt } , }; @@ -123,7 +123,10 @@ impl Goddess // pub async fn sail( &mut self ) { - self.pharos.notify( &GoddessEvent::Sailing ).await; + // It's infallible. Observers that error will be dropped, since the only kind of errors on + // channels are when the channel is closed. + // + self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" ); } } @@ -147,7 +150,9 @@ enum GoddessEvent // impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -164,7 +169,7 @@ fn main() // - channel type (bounded/unbounded) // - a predicate to filter events // - let mut events = isis.observe( Channel::Bounded( 3 ).into() ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" ); // trigger an event // @@ -209,7 +214,9 @@ struct Connection { pharos: Pharos } impl Observable for Connection { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -226,7 +233,7 @@ fn main() // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. // - let observer = conn.observe( filter.into() ); + let observer = conn.observe( filter.into() ).expect( "observe" ); // Combine both options. // diff --git a/TODO.md b/TODO.md index 9a59f97..84b3084 100644 --- a/TODO.md +++ b/TODO.md @@ -1,13 +1,7 @@ # TODO - make Events clone? means we can only work with broadcast channels - - switch to more performant channels (crossbeam). Will be easier once they provide an async api. - - allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks? -- scaling? For now we have an ever growing vector of observers - - other data structure than vec? smallvec? - - type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review. - - let users set capacity on creation? diff --git a/examples/basic.rs b/examples/basic.rs index 519e296..5d6541f 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,7 +1,7 @@ use { - pharos :: { * } , - futures :: { executor::block_on, StreamExt } , + pharos :: { * } , + futures :: { executor::block_on, StreamExt, SinkExt } , }; @@ -21,7 +21,10 @@ impl Goddess // pub async fn sail( &mut self ) { - self.pharos.notify( &GoddessEvent::Sailing ).await; + // It's infallible. Observers that error will be dropped, since the only kind of errors on + // channels are when the channel is closed. + // + let _ = self.pharos.send( GoddessEvent::Sailing ).await; } } @@ -45,7 +48,9 @@ enum GoddessEvent // impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -62,7 +67,7 @@ fn main() // - channel type (bounded/unbounded) // - a predicate to filter events // - let mut events = isis.observe( Channel::Bounded( 3 ).into() ); + let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" ); // trigger an event // diff --git a/examples/filter.rs b/examples/filter.rs index 8dda3ce..30a1b56 100644 --- a/examples/filter.rs +++ b/examples/filter.rs @@ -16,7 +16,9 @@ struct Connection { pharos: Pharos } impl Observable for Connection { - fn observe( &mut self, options: ObserveConfig) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig) -> Result< Events, Self::Error > { self.pharos.observe( options ) } @@ -33,7 +35,7 @@ fn main() // By creating the config object through into, other options will be defaults, notably here // this will use unbounded channels. // - let observer = conn.observe( filter.into() ); + let observer = conn.observe( filter.into() ).expect( "observe" ); // Combine both options. // diff --git a/src/error.rs b/src/error.rs index afd5c5d..2c550e4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use crate::{ import::* }; // #[ derive( Debug ) ] // -pub(crate) struct Error +pub struct Error { pub(crate) inner: Option< Box >, pub(crate) kind : ErrorKind, @@ -19,13 +19,18 @@ pub(crate) struct Error // #[ derive( Debug ) ] // -pub(crate) enum ErrorKind +pub enum ErrorKind { - /// Failed to send on channel, normally means it's closed. Pharos does not expose these errors - /// to the user. + #[ doc( hidden ) ] + // + //This variant is only used internally. // SendError, + /// The pharos object is already closed. You can no longer + // + Closed, + #[ doc( hidden ) ] // __NonExhaustive__ diff --git a/src/events.rs b/src/events.rs index b133980..a645f1d 100644 --- a/src/events.rs +++ b/src/events.rs @@ -91,49 +91,24 @@ impl Sender where Event: Clone + 'static + Send } - // Notify the observer and return a bool indicating whether this observer is still - // operational. If an error happens on a channel it usually means that the channel - // is closed, in which case we should drop this sender. + /// Check whether this sender is interested in this event // - pub(crate) async fn notify( &mut self, evt: &Event ) -> bool + pub(crate) fn filter( &mut self, evt: &Event ) -> bool { - if self.is_closed() { return false } - match self { - Sender::Bounded { tx, filter } => Self::notifier( tx, filter, evt ).await, - Sender::Unbounded{ tx, filter } => Self::notifier( tx, filter, evt ).await, + Sender::Bounded { filter, .. } => Self::filter_inner( filter, evt ), + Sender::Unbounded{ filter, .. } => Self::filter_inner( filter, evt ), } } - async fn notifier - ( - mut tx: impl Sink + Unpin , - filter: &mut Option> , - evt : &Event , - ) - - -> bool - + fn filter_inner( filter: &mut Option>, evt: &Event ) -> bool { - let interested = match filter + match filter { Some(f) => f.call(evt), None => true , - }; - - - #[ allow( clippy::match_bool ) ] - // - match interested - { - true => tx.send( evt.clone() ).await.is_ok(), - - // since we don't try to send, we know nothing about whether they are still - // observing, so assume they do. - // - false => true, } } } diff --git a/src/lib.rs b/src/lib.rs index 12504bc..5529e35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,12 +40,7 @@ pub use filter :: { Filter } , observable :: { Observable, ObserveConfig, Channel } , events :: { Events } , -}; - - -pub(crate) use -{ - error :: { Error } , + error :: { Error, ErrorKind } , }; @@ -58,7 +53,7 @@ mod import futures :: { - future::{ join_all }, Stream, Sink, SinkExt, + Stream, Sink, ready, channel::mpsc:: { diff --git a/src/observable.rs b/src/observable.rs index 5a97a08..df87300 100644 --- a/src/observable.rs +++ b/src/observable.rs @@ -1,4 +1,4 @@ -use crate :: { Filter, Events }; +use crate :: { import::*, Filter, Events }; /// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a /// stream of events. @@ -28,6 +28,8 @@ use crate :: { Filter, Events }; /// /// impl Steps /// { +/// // We can use this as a predicate to filter events. +/// // /// fn is_err( &self ) -> bool /// { /// match self @@ -39,15 +41,17 @@ use crate :: { Filter, Events }; /// } /// /// -/// // The object we want to be observable +/// // The object we want to be observable. /// // /// struct Foo { pharos: Pharos }; /// /// impl Observable for Foo /// { +/// type Error = pharos::Error; +/// /// // Pharos implements observable, so we just forward the call. /// // -/// fn observe( &mut self, options: ObserveConfig ) -> Events +/// fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > /// { /// self.pharos.observe( options ) /// } @@ -59,9 +63,9 @@ use crate :: { Filter, Events }; /// async fn task() /// { /// let mut foo = Foo { pharos: Pharos::default() }; -/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ); +/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ).expect( "observe" ); /// -/// // will only be notified on errors now +/// // will only be notified on errors thanks to the filter. /// // /// let next_error = errors.next().await; /// } @@ -71,10 +75,20 @@ pub trait Observable where Event: Clone + 'static + Send , { + /// The error type that is returned if observing is not possible. [Pharos](crate::Pharos) implements Sink + /// which has a close method, so observing will no longer be possible after close is called. + /// + /// Other than that, you might want to have moments in your objects lifetime when you don't want to take + /// any more observers. Returning a result from [observe](Observable::observe) enables that. + /// + /// You can of course map the error of pharos to your own error type. + // + type Error: ErrorTrait; + /// Add an observer to the observable. Options can be in order to choose channel type and /// to filter events with a predicate. // - fn observe( &mut self, options: ObserveConfig ) -> Events; + fn observe( &mut self, options: ObserveConfig ) -> Result, Self::Error>; } diff --git a/src/pharos.rs b/src/pharos.rs index 919363b..caa1fbc 100644 --- a/src/pharos.rs +++ b/src/pharos.rs @@ -1,4 +1,4 @@ -use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; +use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender, Error, ErrorKind }; /// The Pharos lighthouse. When you implement Observable on your type, you can forward @@ -13,27 +13,38 @@ use crate :: { import::*, Observable, Events, ObserveConfig, events::Sender }; /// ## Implementation. /// /// Currently just holds a `Vec>`. It will stop notifying observers if the channel has -/// returned an error, which usually means it is closed or disconnected. However, we currently don't -/// compact the vector or use a more performant data structure for the observers. +/// returned an error, which means it is closed or disconnected. However, we currently don't +/// compact the vector. Slots are reused for new observers, but the vector never shrinks. /// /// In observe, we do loop the vector to find a free spot to re-use before pushing. /// -/// **Note**: we only detect that observers can be removed when [Pharos::notify] or [Pharos::num_observers] +/// **Note**: we only detect that observers can be removed when [futures::SinkExt::send] or [Pharos::num_observers] /// is being called. Otherwise, we won't find out about disconnected observers and the vector of observers /// will not mark deleted observers and thus their slots can not be reused. /// -/// Right now, in notify, we use `join_all` from the futures library to notify all observers concurrently. -/// We take all of our senders out of the options in our vector, operate on them and put them back if -/// they did not generate an error. +/// The [Sink] impl is not very optimized for the moment. It just loops over all observers in each poll method +/// so it will call `poll_ready` and `poll_flush` again for observers that already returned `Poll::Ready(Ok(()))`. /// -/// `join_all` will allocate a new vector on every notify from what our concurrent futures return. Ideally -/// we would use a data structure which allows &mut access to individual elements, so we can work on them -/// concurrently in place without reallocating. I am looking into the partitions crate, but that's for -/// the next release ;). +/// TODO: I will do some benchmarking and see if this can be improved, eg. by keeping a state which tracks which +/// observers we still have to poll. // pub struct Pharos where Event: 'static + Clone + Send { - observers: Vec >>, + // Observers never get moved. Their index stays stable, so that when we free a slot, + // we can store that in `free_slots`. + // + observers : Vec >>, + free_slots: Vec , + state : State , +} + + +#[ derive( Clone, Debug, PartialEq ) ] +// +enum State +{ + Ready, + Closed, } @@ -55,64 +66,20 @@ impl Pharos where Event: 'static + Clone + Send /// You can set the initial capacity of the vector of senders, if you know you will a lot of observers /// it will save allocations by setting this to a higher number. /// + /// TODO: update to pharos 0.4.0 /// For pharos 0.3.0 on x64 Linux: `std::mem::size_of::>>() == 56 bytes`. // pub fn new( capacity: usize ) -> Self { Self { - observers: Vec::with_capacity( capacity ), + observers : Vec::with_capacity( capacity ), + free_slots: Vec::with_capacity( capacity ), + state : State::Ready , } } - - /// Notify all observers of Event `evt`. - /// - /// Currently allocates a new vector for all observers on every run. That will be fixed in future - /// versions. - // - pub async fn notify( &mut self, evt: &Event ) - { - // Try to send to all channels in parallel, so they can all start processing this event - // even if one of them is blocked on a full queue. - // - // We can not have mutable access in parallel, so we take options out and put them back. - // - // The output of the join is a vec of options with the disconnected observers removed. - // - let fut = join_all - ( - self.observers.iter_mut().map( |opt| - { - let opt = opt.take(); - - async - { - let mut new = None; - - if let Some( mut s ) = opt - { - if s.notify( evt ).await - { - new = Some( s ) - } - } - - new - } - - }) - ); - - - // Put back the observers that we "borrowed" - // TODO: compact the vector from time to time? - // - self.observers = fut.await; - } - - /// Returns the size of the vector used to store the observers. Useful for debugging and testing if it /// seems to get to big. // @@ -130,14 +97,20 @@ impl Pharos where Event: 'static + Clone + Send { let mut count = 0; - for opt in self.observers.iter_mut() + + for (i, opt) in self.observers.iter_mut().enumerate() { - if let Some(observer) = opt.take() + if let Some(observer) = opt { if !observer.is_closed() { count += 1; - *opt = Some( observer ); + } + + else + { + self.free_slots.push( i ); + *opt = None } } } @@ -162,31 +135,182 @@ impl Default for Pharos where Event: 'static + Clone + Send impl Observable for Pharos where Event: 'static + Clone + Send { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = Error; + + /// Will try to re-use slots in the vector from disconnected observers. + // + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ); + } + + let (events, sender) = Events::new( options ); - let mut new_observer = Some(sender); - // Try to find a free slot + // Try to reuse a free slot // - for option in &mut self.observers + if let Some( i ) = self.free_slots.pop() { - if option.is_none() + self.observers[i] = Some( sender ); + } + + else + { + self.observers.push( Some( sender ) ); + } + + Ok( events ) + } +} + + + +impl Sink for Pharos where Event: Clone + 'static + Send +{ + type Error = Error; + + + fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ).into(); + } + + + for obs in self.get_mut().observers.iter_mut() + { + if let Some( ref mut o ) = obs { - *option = new_observer.take(); - break; + let res = ready!( Pin::new( o ).poll_ready( cx ) ); + + if res.is_err() + { + *obs = None; + } } } - // no free slots found - // - if new_observer.is_some() + Ok(()).into() + } + + + fn start_send( self: Pin<&mut Self>, evt: Event ) -> Result<(), Self::Error> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ); + } + + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + // if this spot in the vector has a sender + // + if let Some( obs ) = opt + { + // if it's closed, let's remove it. + // + if obs.is_closed() + { + this.free_slots.push( i ); + + *opt = None; + } + + // else if it is interested in this event + // + else if obs.filter( &evt ) + { + // if sending fails, remove it + // + if Pin::new( obs ).start_send( evt.clone() ).is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + } + + Ok(()).into() + } + + + + fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + + if self.state == State::Closed + { + return Err( ErrorKind::Closed.into() ).into(); + } + + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + if let Some( ref mut obs ) = opt + { + let res = ready!( Pin::new( obs ).poll_flush( cx ) ); + + if res.is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + + Ok(()).into() + } + + + + /// Will close and drop all observers. The pharos object will remain operational however. + /// The main annoyance would be that we'd have to make + // + fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll> + { + if self.state == State::Closed + { + return Ok(()).into(); + } + + else { - self.observers.push( new_observer ); + self.state = State::Closed; } - events + + let this = self.get_mut(); + + for (i, opt) in this.observers.iter_mut().enumerate() + { + if let Some( ref mut obs ) = opt + { + let res = ready!( Pin::new( obs ).poll_close( cx ) ); + + if res.is_err() + { + this.free_slots.push( i ); + + *opt = None; + } + } + } + + Ok(()).into() } } @@ -199,6 +323,7 @@ impl Observable for Pharos where Event: 'static + Clone + mod tests { use super::*; + use futures::SinkExt; #[test] // @@ -237,28 +362,33 @@ mod tests { let mut ph = Pharos::::default(); - assert_eq!( ph.storage_len (), 0 ); - assert_eq!( ph.num_observers(), 0 ); + assert_eq!( ph.storage_len (), 0 ); + assert_eq!( ph.num_observers (), 0 ); + assert_eq!( ph.free_slots.len(), 0 ); - let mut a = ph.observe( ObserveConfig::default() ); + let mut a = ph.observe( ObserveConfig::default() ).expect( "observe" ); - assert_eq!( ph.storage_len (), 1 ); - assert_eq!( ph.num_observers(), 1 ); + assert_eq!( ph.storage_len (), 1 ); + assert_eq!( ph.num_observers (), 1 ); + assert_eq!( ph.free_slots.len(), 0 ); - let b = ph.observe( ObserveConfig::default() ); + let b = ph.observe( ObserveConfig::default() ).expect( "observe" ); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 2 ); + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers (), 2 ); + assert_eq!( ph.free_slots.len(), 0 ); a.close(); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 1 ); + assert_eq!( ph.storage_len () , 2 ); + assert_eq!( ph.num_observers() , 1 ); + assert_eq!( &ph.free_slots , &[0] ); drop( b ); - assert_eq!( ph.storage_len (), 2 ); - assert_eq!( ph.num_observers(), 0 ); + assert_eq!( ph.storage_len (), 2 ); + assert_eq!( ph.num_observers(), 0 ); + assert_eq!( &ph.free_slots , &[0, 1] ); } @@ -284,18 +414,35 @@ mod tests assert_eq!( ph.num_observers(), 2 ); assert!( ph.observers[1].is_none() ); + assert_eq!( &ph.free_slots, &[1] ); let _d = ph.observe( ObserveConfig::default() ); - assert_eq!( ph.storage_len (), 3 ); - assert_eq!( ph.num_observers(), 3 ); + assert_eq!( ph.storage_len (), 3 ); + assert_eq!( ph.num_observers (), 3 ); + assert_eq!( ph.free_slots.len(), 0 ); let _e = ph.observe( ObserveConfig::default() ); // Now we should have pushed again // - assert_eq!( ph.storage_len (), 4 ); - assert_eq!( ph.num_observers(), 4); + assert_eq!( ph.storage_len (), 4 ); + assert_eq!( ph.num_observers (), 4); + assert_eq!( ph.free_slots.len(), 0 ); + } + + + // verify we can no longer observer after calling close + // + #[test] + // + fn observe_after_close() + { + let mut ph = Pharos::::default(); + + futures::executor::block_on( ph.close() ).expect( "close" ); + + assert!( ph.observe( ObserveConfig::default() ).is_err() ); } } diff --git a/tests/bounded.rs b/tests/bounded.rs index 4f5c1ff..e6f0b90 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -22,7 +22,7 @@ fn basic() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -45,7 +45,7 @@ fn close_receiver() block_on( async move { let mut isis = Goddess::new(); - let mut events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.sail().await; events.close(); @@ -66,8 +66,8 @@ fn one_receiver_drops() block_on( async move { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ); - let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ); + let mut egypt_evts = isis.observe( Channel::Bounded( 1 ).into() ).expect( "observe" ); + let mut shine_evts = isis.observe( Channel::Bounded( 2 ).into() ).expect( "observe" ); isis.sail().await; @@ -99,8 +99,8 @@ fn types() // Note that because of the asserts below type inference works here and we don't have to // put type annotation, but I do find it quite obscure and better to be explicit. // - let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut shine_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut egypt_evts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); isis.shine().await; isis.sail ().await; @@ -124,8 +124,8 @@ fn threads() block_on( async move { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ); - let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ); + let mut egypt_evts = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut shine_evts = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); thread::spawn( move || @@ -216,7 +216,7 @@ fn filter() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -249,7 +249,7 @@ fn filter_true() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -285,7 +285,7 @@ fn filter_false() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -325,7 +325,7 @@ fn filter_move() let opts = ObserveConfig::from( Channel::Bounded( 5 ) ).filter_boxed( filter ); - let mut events = isis.observe( opts ); + let mut events = isis.observe( opts ).expect( "observe" ); isis.sail().await; isis.sail().await; diff --git a/tests/combined.rs b/tests/combined.rs index 75553b9..78748ee 100644 --- a/tests/combined.rs +++ b/tests/combined.rs @@ -15,11 +15,11 @@ fn both() { let mut isis = Goddess::new(); - let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ); - let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ); + let mut events: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); + let mut nuevts: Events = isis.observe( Channel::Bounded( 5 ).into() ).expect( "observe" ); - let mut ubevts: Events = isis.observe( ObserveConfig::default() ); - let mut ubnuts: Events = isis.observe( ObserveConfig::default() ); + let mut ubevts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut ubnuts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail ().await; isis.shine().await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ff76870..d6e0e0a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -14,7 +14,7 @@ pub mod import channel::mpsc :: Receiver , channel::mpsc :: UnboundedReceiver , executor :: block_on , - stream :: StreamExt , + stream :: StreamExt, SinkExt, }, }; } @@ -43,19 +43,19 @@ impl Goddess pub async fn sail( &mut self ) { - self.isis.notify( &IsisEvent::Sail ).await; + self.isis.send( IsisEvent::Sail ).await.expect( "send event" ); } pub async fn dock( &mut self ) { - self.isis.notify( &IsisEvent::Dock ).await; + self.isis.send( IsisEvent::Dock ).await.expect( "send event" ); } pub async fn shine( &mut self ) { let evt = NutEvent { time: "midnight".into() }; - self.nut.notify( &evt ).await; + self.nut.send( evt ).await.expect( "send event" ); } } @@ -81,7 +81,9 @@ pub struct NutEvent impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { self.isis.observe( options ) } @@ -90,7 +92,9 @@ impl Observable for Goddess impl Observable for Goddess { - fn observe( &mut self, options: ObserveConfig ) -> Events + type Error = pharos::Error; + + fn observe( &mut self, options: ObserveConfig ) -> Result< Events, Self::Error > { self.nut.observe( options ) } diff --git a/tests/unbounded.rs b/tests/unbounded.rs index d103f86..756e3ee 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -23,7 +23,7 @@ fn basic() { let mut isis = Goddess::new(); - let mut events = isis.observe( ObserveConfig::default() ); + let mut events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -46,7 +46,7 @@ fn close_receiver() { let mut isis = Goddess::new(); - let mut events = isis.observe( ObserveConfig::default() ); + let mut events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; events.close(); @@ -68,8 +68,8 @@ fn one_receiver_drops() { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( ObserveConfig::default() ); - let mut shine_evts = isis.observe( ObserveConfig::default() ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail().await; @@ -100,8 +100,8 @@ fn types() { let mut isis = Goddess::new(); - let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ); - let mut shine_evts: Events = isis.observe( ObserveConfig::default() ); + let mut egypt_evts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts: Events = isis.observe( ObserveConfig::default() ).expect( "observe" ); isis.sail ().await; isis.shine().await; @@ -125,8 +125,8 @@ fn threads() { let mut isis = Goddess::new(); - let mut egypt_evts = isis.observe( ObserveConfig::default() ); - let mut shine_evts = isis.observe( ObserveConfig::default() ); + let mut egypt_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); + let mut shine_evts = isis.observe( ObserveConfig::default() ).expect( "observe" ); thread::spawn( move || @@ -159,7 +159,7 @@ fn alot_of_events() { let mut w = Goddess::new(); - let mut events = w.observe( ObserveConfig::default() ); + let mut events = w.observe( ObserveConfig::default() ).expect( "observe" ); let amount = 1000; @@ -200,7 +200,7 @@ fn filter() } }; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -230,7 +230,7 @@ fn filter_true() let filter = |_: &IsisEvent| true; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -263,7 +263,7 @@ fn filter_false() let filter = |_: &IsisEvent| false; - let mut events = isis.observe( ObserveConfig::default().filter( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await; @@ -300,7 +300,7 @@ fn filter_move() } }; - let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ); + let mut events = isis.observe( ObserveConfig::default().filter_boxed( filter ) ).expect( "observe" ); isis.sail().await; isis.sail().await;