Skip to content

Commit

Permalink
breaking: Rework event data. (#30)
Browse files Browse the repository at this point in the history
* Rework data.

* Update tests.

* Update tests.

* Update docs.
  • Loading branch information
milesj committed Jul 26, 2023
1 parent bd0ee1d commit 1f1ea6a
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 216 deletions.
94 changes: 44 additions & 50 deletions crates/events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
![Crates.io](https://img.shields.io/crates/d/starbase_events)

An async event emitter for the `starbase` application framework. This crate works quite differently
than other event systems, as subscribers _can mutate_ the event and its data. Because of this, we
cannot use message channels, and must take extra precaution to satisfy `Send` + `Sync` requirements.
than other event systems, as subscribers _can mutate_ event data. Because of this, we cannot use
message channels, and must take extra precaution to satisfy `Send` + `Sync` requirements.

## Creating events

Events must derive `Event`, or implement the `Event` trait. Events can be any type of struct, but
the major selling point is that events are _mutable_, allowing inner content to be modified by
subscribers.
Events must derive `Event` or implement the `Event` trait.

```rust
use starbase_events::Event;
Expand All @@ -21,6 +19,29 @@ use app::Project;
pub struct ProjectCreatedEvent(pub Project);
```

### Event data

Events can optionally contain data, which is passed to and can be mutated by subscribers. By default
the value is a unit type (`()`), but can be customized with `#[event]` for derived events, or
`type Data` when implemented manually.

```rust
use starbase_events::Event;
use std::path::PathBuf;

#[derive(Event)]
#[event(dataset = PathBuf)]
pub struct CacheCheckEvent(pub PathBuf);

// OR

pub struct CacheCheckEvent(pub PathBuf);

impl Event for CacheCheckEvent {
type Data = PathBuf;
}
```

## Creating emitters

An `Emitter` is in charge of managing subscribers, and dispatching an event to each subscriber,
Expand All @@ -38,16 +59,19 @@ let cache_check: Emitter<CacheCheckEvent> = Emitter::new();
## Using subscribers

Subscribers are async functions that are registered into an emitter, and are executed when the
emitter emits an event. They are passed the event object as a `Arc<RwLock<T>>`, allowing for the
event and its inner data to be accessed mutably or immutably.
emitter emits an event. They are passed the event object as a `Arc<T>`, and the event's data as
`Arc<RwLock<T::Data>>`, allowing for the event to referenced immutably, and its data to be accessed
mutably or immutably.

```rust
use starbase_events::{EventResult, EventState};
use starbase_events::{Event, EventResult, EventState};

async fn update_root(event: Arc<RwLock<ProjectCreatedEvent>>) -> EventResult<ProjectCreatedEvent> {
let event = event.write().await;

event.0.root = new_path;
async fn update_root(
event: Arc<ProjectCreatedEvent>,
data: Arc<RwLock<<ProjectCreatedEvent as Event>::Data>>
) -> EventResult {
let mut data = data.write().await;
data.root = new_path;

Ok(EventState::Continue)
}
Expand All @@ -61,17 +85,18 @@ implementation. For example, the above subscriber can be rewritten as:

```rust
#[subscriber]
async fn update_root(mut event: ProjectCreatedEvent) {
event.0.root = new_path;
async fn update_root(mut data: ProjectCreatedEvent) {
data.root = new_path;
}
```

When using `#[subscriber]`, the following benefits apply:

- The return type is optional.
- The return value is optional if `EventState::Continue`.
- Using `mut event` or `&mut Event` will acquire a write lock, otherwise a read lock.
- Using `mut event` or `&mut Event` will acquire a write lock on data, otherwise a read lock.
- Omitting the event parameter will not acquire any lock.
- The name of the parameter is for _the data_, while the event is simply `event`.

## Controlling the event flow

Expand All @@ -80,7 +105,6 @@ following variants:

- `Continue` - Continues to the next subscriber (default).
- `Stop` - Stops after this subscriber, discarding subsequent subscribers.
- `Return` - Like `Stop` but also returns a value for interception.

```rust
#[subscriber]
Expand All @@ -92,49 +116,19 @@ async fn continue_flow(mut event: CacheCheckEvent) {
async fn stop_flow(mut event: CacheCheckEvent) {
Ok(EventState::Stop)
}

#[subscriber]
async fn return_flow(mut event: CacheCheckEvent) {
Ok(EventState::Return(path_to_cache)))
}
```

For `Return` flows, the type of value returned is inferred from the event. By default the value is a
unit type (`()`), but can be customized with `#[event]` for derived events, or `type Value` when
implemented manually.

```rust
use starbase_events::{Event, Emitter};
use std::path::PathBuf;

#[derive(Event)]
#[event(value = "PathBuf")]
pub struct CacheCheckEvent(pub PathBuf);

// OR

pub struct CacheCheckEvent(pub PathBuf);

impl Event for CacheCheckEvent {
type Value = PathBuf;
}
```

## Emitting and handling results

When an event is emitted, subscribers are executed sequentially in the same thread so that each
subscriber can mutate the event if necessary. Because of this, events do not support references for
inner values, and instead must own everything.
subscriber can mutate the event if necessary. Because of this, events do not support
references/lifetimes for inner values, and instead must own everything.

An event can be emitted with the `emit()` method, which requires an owned event (and owned inner
data).

```rust
let (event, result) = emitter.emit(ProjectCreatedEvent(owned_project)).await?;

// Take back ownership of inner data
let project = event.0;
let data = emitter.emit(ProjectCreatedEvent(owned_project)).await?;
```

Emitting returns a tuple, containing the final event after all modifications, and a result of type
`Option<Event::Value>` (which is provided with `EventState::Return`).
Emitting returns the event data after all modifications.
20 changes: 8 additions & 12 deletions crates/events/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,25 @@ impl<E: Event + 'static> Emitter<E> {
///
/// When complete, the provided event will be returned along with the value returned
/// by the subscriber that returned [`EventState::Return`], or [`None`] if not occurred.
pub async fn emit(&self, event: E) -> miette::Result<(E, Option<E::Value>)> {
let mut result = None;
pub async fn emit(&self, event: E) -> miette::Result<E::Data> {
let mut remove_indices = HashSet::new();
let event = Arc::new(RwLock::new(event));
let mut subscribers = self.subscribers.write().await;

let event = Arc::new(event);
let data = Arc::new(RwLock::new(E::Data::default()));

for (index, subscriber) in subscribers.iter_mut().enumerate() {
let event = Arc::clone(&event);
let data = Arc::clone(&data);

if subscriber.is_once() {
remove_indices.insert(index);
}

match subscriber.on_emit(event).await? {
match subscriber.on_emit(event, data).await? {
EventState::Continue => continue,
EventState::Stop => break,
EventState::Return(value) => {
result = Some(value);
break;
}
}
};
}

// Remove only once subscribers that were called
Expand All @@ -83,8 +81,6 @@ impl<E: Event + 'static> Emitter<E> {
!remove
});

let event = Arc::into_inner(event).unwrap().into_inner();

Ok((event, result))
Ok(Arc::into_inner(data).unwrap().into_inner())
}
}
7 changes: 3 additions & 4 deletions crates/events/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
pub trait Event: Send + Sync {
type Value;
type Data: Send + Sync + Default;
}

pub enum EventState<V> {
pub enum EventState {
Continue,
Stop,
Return(V),
}

pub type EventResult<E> = miette::Result<EventState<<E as Event>::Value>>;
pub type EventResult = miette::Result<EventState>;
16 changes: 8 additions & 8 deletions crates/events/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ use tokio::sync::RwLock;
#[async_trait]
pub trait Subscriber<E: Event>: Send + Sync {
fn is_once(&self) -> bool;
async fn on_emit(&mut self, event: Arc<RwLock<E>>) -> EventResult<E>;
async fn on_emit(&mut self, event: Arc<E>, data: Arc<RwLock<E::Data>>) -> EventResult;
}

pub type BoxedSubscriber<E> = Box<dyn Subscriber<E>>;

#[async_trait]
pub trait SubscriberFunc<E: Event>: Send + Sync {
async fn call(&self, event: Arc<RwLock<E>>) -> EventResult<E>;
async fn call(&self, event: Arc<E>, data: Arc<RwLock<E::Data>>) -> EventResult;
}

#[async_trait]
impl<T: Send + Sync, E: Event + 'static, F> SubscriberFunc<E> for T
where
T: Fn(Arc<RwLock<E>>) -> F,
F: Future<Output = EventResult<E>> + Send,
T: Fn(Arc<E>, Arc<RwLock<E::Data>>) -> F,
F: Future<Output = EventResult> + Send,
{
async fn call(&self, event: Arc<RwLock<E>>) -> EventResult<E> {
self(event).await
async fn call(&self, event: Arc<E>, data: Arc<RwLock<E::Data>>) -> EventResult {
self(event, data).await
}
}

Expand All @@ -48,7 +48,7 @@ impl<E: Event> Subscriber<E> for CallbackSubscriber<E> {
self.once
}

async fn on_emit(&mut self, event: Arc<RwLock<E>>) -> EventResult<E> {
self.func.call(event).await
async fn on_emit(&mut self, event: Arc<E>, data: Arc<RwLock<E::Data>>) -> EventResult {
self.func.call(event, data).await
}
}
44 changes: 19 additions & 25 deletions crates/events/tests/event_macros_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,56 @@ enum TestError {
Test,
}

#[derive(Event)]
#[event(value = "i32")]
#[derive(Debug, Event)]
#[event(dataset = i32)]
struct IntEvent(pub i32);

#[derive(Event)]
#[event(value = "String")]
#[event(dataset = String)]
struct StringEvent(pub String);

#[derive(Event)]
#[event(value = "PathBuf")]
#[event(dataset = PathBuf)]
struct PathEvent(pub PathBuf);

#[derive(Event)]
#[event(value = "std::path::PathBuf")]
#[event(dataset = std::path::PathBuf)]
struct FQPathEvent(pub PathBuf);

async fn callback_func(event: Arc<RwLock<IntEvent>>) -> EventResult<IntEvent> {
let mut event = event.write().await;
event.0 += 5;
async fn callback_func(_event: Arc<IntEvent>, data: Arc<RwLock<i32>>) -> EventResult {
let mut data = data.write().await;
*data += 5;
Ok(EventState::Continue)
}

#[subscriber]
async fn callback_read(event: IntEvent) -> EventResult<IntEvent> {
dbg!(event.0);
async fn callback_read(data: IntEvent) -> EventResult {
dbg!(event, data);
}

#[subscriber]
async fn callback_write(mut event: IntEvent) -> EventResult<IntEvent> {
event.0 += 5;
async fn callback_write(mut data: IntEvent) -> EventResult {
*data += 5;
Ok(EventState::Continue)
}

#[subscriber]
async fn callback_write_ref(event: &mut IntEvent) -> EventResult<IntEvent> {
event.0 += 5;
async fn callback_write_ref(data: &mut IntEvent) -> EventResult {
*data += 5;
}

#[subscriber]
fn callback_return(event: &mut IntEvent) {
event.0 += 5;
fn callback_return(data: &mut IntEvent) {
*data += 5;
Ok(EventState::Stop)
}

#[subscriber]
async fn no_return(event: &mut IntEvent) -> EventResult<IntEvent> {
event.0 += 5;
}

#[subscriber]
async fn custom_return(event: &mut IntEvent) -> EventResult<IntEvent> {
event.0 += 5;
Ok(EventState::Return(123))
async fn no_return(data: &mut IntEvent) -> EventResult {
*data += 5;
}

#[subscriber]
async fn err_return(_event: IntEvent) -> EventResult<IntEvent> {
async fn err_return(_data: IntEvent) -> EventResult {
Err(TestError::Test.into())
}
Loading

0 comments on commit 1f1ea6a

Please sign in to comment.