Skip to content

Commit

Permalink
Another breaking change to the API.
Browse files Browse the repository at this point in the history
- impl Sink for Pharos instead of a notify method. This makes sure poll methods are available
  when needing to notify from within other poll fn.
- We loop over all observers on every call to poll_ready and poll_flush, which should be improved
  • Loading branch information
najamelan committed Sep 24, 2019
1 parent db85733 commit 72d5892
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 200 deletions.
33 changes: 20 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
More seriously, pharos is a small [observer](https://en.wikipedia.org/wiki/Observer_pattern) library that let's you create futures 0.3 streams that observers can listen to.

I created it to leverage interoperability we can create by using async Streams and Sinks from the futures library. So you can use all stream combinators, forward it into Sinks and so on.
I created it to leverage interoperability we can create by using async [Stream](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_core/stream/trait.Stream.html) and [Sink](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures/sink/trait.Sink.html
) from the futures library. So you can use all stream combinators, forward it into Sinks and so on.

Minimal rustc version: 1.39.

Expand Down Expand Up @@ -44,8 +45,8 @@ This crate has: `#![ forbid( unsafe_code ) ]`
- [`Events`] is not clonable right now (would require support from the channels we use as back-ends, eg. broadcast type channel)
- performance tweaking still needs to be done
- pharos requires mut access for most operations. This is not intended to change anytime soon. Both on
[notify](Pharos::notify) and [observe](Observable::observe), the two main interfaces, manipulate internal
state, and most channels also require mutable access to either read or write. If you need it from non mutable
[send](https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_util/sink/trait.SinkExt.html#method.send) and [observe](Observable::observe), the two main interfaces, manipulate internal
state, and most channels also require mutable access to either read or write. If you need it from immutable
context, use interior mutability primitives like locks or Cells...

### Future work
Expand Down Expand Up @@ -79,18 +80,17 @@ Please check out the [changelog](https://github.com/najamelan/pharos/blob/master

### Dependencies

This crate only has two dependencies. Cargo will automatically handle it's dependencies for you.
This crate only has one dependencies. Cargo will automatically handle it's dependencies for you.

```yaml
dependencies:

futures-preview : { version: ^0.3.0-alpha, features: [async-await, nightly] }
pin-project : ^0.4.0-beta
```

## Usage

pharos only works for async code, as the notify method is asynchronous. Observers must consume the messages
`pharos` only works for async code, as the notify method is asynchronous. Observers must consume the messages
fast enough, otherwise they will slow down the observable (bounded channel) or cause memory leak (unbounded channel).

Whenever observers want to unsubscribe, they can just drop the stream or call `close` on it. If you are an observable and you want to notify observers that no more messages will follow, just drop the pharos object. Failing that, create an event type that signifies EOF and send that to observers.
Expand All @@ -102,8 +102,8 @@ Examples can be found in the [examples](https://github.com/najamelan/pharos/tree
```rust
use
{
pharos :: { * } ,
futures :: { executor::block_on, StreamExt } ,
pharos :: { * } ,
futures :: { executor::block_on, StreamExt, SinkExt } ,
};


Expand All @@ -123,7 +123,10 @@ impl Goddess
//
pub async fn sail( &mut self )
{
self.pharos.notify( &GoddessEvent::Sailing ).await;
// It's infallible. Observers that error will be dropped, since the only kind of errors on
// channels are when the channel is closed.
//
self.pharos.send( GoddessEvent::Sailing ).await.expect( "notify observers" );
}
}

Expand All @@ -147,7 +150,9 @@ enum GoddessEvent
//
impl Observable<GoddessEvent> for Goddess
{
fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Events<GoddessEvent>
type Error = pharos::Error;

fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Result< Events<GoddessEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -164,7 +169,7 @@ fn main()
// - channel type (bounded/unbounded)
// - a predicate to filter events
//
let mut events = isis.observe( Channel::Bounded( 3 ).into() );
let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" );

// trigger an event
//
Expand Down Expand Up @@ -209,7 +214,9 @@ struct Connection { pharos: Pharos<NetworkEvent> }

impl Observable<NetworkEvent> for Connection
{
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Events<NetworkEvent>
type Error = pharos::Error;

fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Result< Events<NetworkEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -226,7 +233,7 @@ fn main()
// By creating the config object through into, other options will be defaults, notably here
// this will use unbounded channels.
//
let observer = conn.observe( filter.into() );
let observer = conn.observe( filter.into() ).expect( "observe" );

// Combine both options.
//
Expand Down
6 changes: 0 additions & 6 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
# TODO

- make Events clone? means we can only work with broadcast channels

- switch to more performant channels (crossbeam). Will be easier once they provide an async api.

- allow other channel types, like a ringchannel which drops messages on outpacing? To prevent DDOS and OOM attacks?

- scaling? For now we have an ever growing vector of observers

- other data structure than vec? smallvec?
- type that allows concurrent access to &mut for each observer, so we can mutate in place rather than have join_all allocate a new vector on easch notify. Maybe partitions crate? -> has 19 lines of unsafe code, needs review.
- let users set capacity on creation?
15 changes: 10 additions & 5 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use
{
pharos :: { * } ,
futures :: { executor::block_on, StreamExt } ,
pharos :: { * } ,
futures :: { executor::block_on, StreamExt, SinkExt } ,
};


Expand All @@ -21,7 +21,10 @@ impl Goddess
//
pub async fn sail( &mut self )
{
self.pharos.notify( &GoddessEvent::Sailing ).await;
// It's infallible. Observers that error will be dropped, since the only kind of errors on
// channels are when the channel is closed.
//
let _ = self.pharos.send( GoddessEvent::Sailing ).await;
}
}

Expand All @@ -45,7 +48,9 @@ enum GoddessEvent
//
impl Observable<GoddessEvent> for Goddess
{
fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Events<GoddessEvent>
type Error = pharos::Error;

fn observe( &mut self, options: ObserveConfig<GoddessEvent>) -> Result< Events<GoddessEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -62,7 +67,7 @@ fn main()
// - channel type (bounded/unbounded)
// - a predicate to filter events
//
let mut events = isis.observe( Channel::Bounded( 3 ).into() );
let mut events = isis.observe( Channel::Bounded( 3 ).into() ).expect( "observe" );

// trigger an event
//
Expand Down
6 changes: 4 additions & 2 deletions examples/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ struct Connection { pharos: Pharos<NetworkEvent> }

impl Observable<NetworkEvent> for Connection
{
fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Events<NetworkEvent>
type Error = pharos::Error;

fn observe( &mut self, options: ObserveConfig<NetworkEvent>) -> Result< Events<NetworkEvent>, Self::Error >
{
self.pharos.observe( options )
}
Expand All @@ -33,7 +35,7 @@ fn main()
// By creating the config object through into, other options will be defaults, notably here
// this will use unbounded channels.
//
let observer = conn.observe( filter.into() );
let observer = conn.observe( filter.into() ).expect( "observe" );

// Combine both options.
//
Expand Down
13 changes: 9 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{ import::* };
//
#[ derive( Debug ) ]
//
pub(crate) struct Error
pub struct Error
{
pub(crate) inner: Option< Box<dyn ErrorTrait + Send> >,
pub(crate) kind : ErrorKind,
Expand All @@ -19,13 +19,18 @@ pub(crate) struct Error
//
#[ derive( Debug ) ]
//
pub(crate) enum ErrorKind
pub enum ErrorKind
{
/// Failed to send on channel, normally means it's closed. Pharos does not expose these errors
/// to the user.
#[ doc( hidden ) ]
//
//This variant is only used internally.
//
SendError,

/// The pharos object is already closed. You can no longer
//
Closed,

#[ doc( hidden ) ]
//
__NonExhaustive__
Expand Down
37 changes: 6 additions & 31 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,49 +91,24 @@ impl<Event> Sender<Event> where Event: Clone + 'static + Send
}


// Notify the observer and return a bool indicating whether this observer is still
// operational. If an error happens on a channel it usually means that the channel
// is closed, in which case we should drop this sender.
/// Check whether this sender is interested in this event
//
pub(crate) async fn notify( &mut self, evt: &Event ) -> bool
pub(crate) fn filter( &mut self, evt: &Event ) -> bool
{
if self.is_closed() { return false }

match self
{
Sender::Bounded { tx, filter } => Self::notifier( tx, filter, evt ).await,
Sender::Unbounded{ tx, filter } => Self::notifier( tx, filter, evt ).await,
Sender::Bounded { filter, .. } => Self::filter_inner( filter, evt ),
Sender::Unbounded{ filter, .. } => Self::filter_inner( filter, evt ),
}
}


async fn notifier
(
mut tx: impl Sink<Event> + Unpin ,
filter: &mut Option<Filter<Event>> ,
evt : &Event ,
)

-> bool

fn filter_inner( filter: &mut Option<Filter<Event>>, evt: &Event ) -> bool
{
let interested = match filter
match filter
{
Some(f) => f.call(evt),
None => true ,
};


#[ allow( clippy::match_bool ) ]
//
match interested
{
true => tx.send( evt.clone() ).await.is_ok(),

// since we don't try to send, we know nothing about whether they are still
// observing, so assume they do.
//
false => true,
}
}
}
Expand Down
9 changes: 2 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,7 @@ pub use
filter :: { Filter } ,
observable :: { Observable, ObserveConfig, Channel } ,
events :: { Events } ,
};


pub(crate) use
{
error :: { Error } ,
error :: { Error, ErrorKind } ,
};


Expand All @@ -58,7 +53,7 @@ mod import

futures ::
{
future::{ join_all }, Stream, Sink, SinkExt,
Stream, Sink, ready,

channel::mpsc::
{
Expand Down
26 changes: 20 additions & 6 deletions src/observable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate :: { Filter, Events };
use crate :: { import::*, Filter, Events };

/// Indicate that a type is observable. You can call [`observe`](Observable::observe) to get a
/// stream of events.
Expand Down Expand Up @@ -28,6 +28,8 @@ use crate :: { Filter, Events };
///
/// impl Steps
/// {
/// // We can use this as a predicate to filter events.
/// //
/// fn is_err( &self ) -> bool
/// {
/// match self
Expand All @@ -39,15 +41,17 @@ use crate :: { Filter, Events };
/// }
///
///
/// // The object we want to be observable
/// // The object we want to be observable.
/// //
/// struct Foo { pharos: Pharos<Steps> };
///
/// impl Observable<Steps> for Foo
/// {
/// type Error = pharos::Error;
///
/// // Pharos implements observable, so we just forward the call.
/// //
/// fn observe( &mut self, options: ObserveConfig<Steps> ) -> Events<Steps>
/// fn observe( &mut self, options: ObserveConfig<Steps> ) -> Result< Events<Steps>, Self::Error >
/// {
/// self.pharos.observe( options )
/// }
Expand All @@ -59,9 +63,9 @@ use crate :: { Filter, Events };
/// async fn task()
/// {
/// let mut foo = Foo { pharos: Pharos::default() };
/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() );
/// let mut errors = foo.observe( Filter::Pointer( Steps::is_err ).into() ).expect( "observe" );
///
/// // will only be notified on errors now
/// // will only be notified on errors thanks to the filter.
/// //
/// let next_error = errors.next().await;
/// }
Expand All @@ -71,10 +75,20 @@ pub trait Observable<Event>

where Event: Clone + 'static + Send ,
{
/// The error type that is returned if observing is not possible. [Pharos](crate::Pharos) implements Sink
/// which has a close method, so observing will no longer be possible after close is called.
///
/// Other than that, you might want to have moments in your objects lifetime when you don't want to take
/// any more observers. Returning a result from [observe](Observable::observe) enables that.
///
/// You can of course map the error of pharos to your own error type.
//
type Error: ErrorTrait;

/// Add an observer to the observable. Options can be in order to choose channel type and
/// to filter events with a predicate.
//
fn observe( &mut self, options: ObserveConfig<Event> ) -> Events<Event>;
fn observe( &mut self, options: ObserveConfig<Event> ) -> Result<Events<Event>, Self::Error>;
}


Expand Down
Loading

0 comments on commit 72d5892

Please sign in to comment.