Skip to content

Commit

Permalink
Simplified event stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
mintlu8 committed May 18, 2024
1 parent 2d0a997 commit e4ed508
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 69 deletions.
90 changes: 36 additions & 54 deletions src/access/async_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use crate::async_systems::AsyncWorldParam;
use crate::executor::{with_world_mut, REACTORS};
use crate::reactors::Reactors;
use crate::{AccessError, AccessResult};
use bevy_core::FrameCount;
use bevy_ecs::event::{Event, EventId, EventReader};
use bevy_ecs::system::{Local, Res};
use bevy_ecs::world::World;
use event_listener::EventListener;
use futures::{Future, Stream};
use event_listener_strategy::{NonBlocking, Strategy};
use futures::Stream;
use parking_lot::RwLock;
use std::cell::OnceCell;
use std::pin::{pin, Pin};
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -39,17 +39,16 @@ impl AsyncWorld {

/// A double buffered [`Stream`] of [`Event`]s.
#[derive(Debug)]
pub struct DoubleBufferedEvent<E: Event> {
last: RwLock<Vec<E>>,
this: RwLock<Vec<E>>,
wakers: event_listener::Event,
current_frame: AtomicU32,
pub struct EventBuffer<E: Event> {
buffer: RwLock<Vec<E>>,
notify: event_listener::Event,
tick: AtomicU32,
}

impl<E: Event + Clone> DoubleBufferedEvent<E> {
impl<E: Event + Clone> EventBuffer<E> {
pub fn into_stream(self: Arc<Self>) -> EventStream<E> {
EventStream {
frame: self.current_frame.load(Ordering::Relaxed).wrapping_sub(1),
tick: self.tick.load(Ordering::Acquire).wrapping_sub(1),
index: 0,
listener: None,
event: self,
Expand All @@ -58,20 +57,18 @@ impl<E: Event + Clone> DoubleBufferedEvent<E> {
}

/// A [`Stream`] of [`Event`]s, requires system [`react_to_event`] to function.
///
/// This follows bevy's double buffering semantics.
#[derive(Debug)]
pub struct EventStream<E: Event + Clone> {
frame: u32,
tick: u32,
index: usize,
listener: Option<EventListener>,
event: Arc<DoubleBufferedEvent<E>>,
event: Arc<EventBuffer<E>>,
}

impl<E: Event + Clone> Clone for EventStream<E> {
fn clone(&self) -> Self {
Self {
frame: self.frame,
tick: self.tick,
index: self.index,
listener: None,
event: self.event.clone(),
Expand All @@ -86,31 +83,21 @@ impl<E: Event + Clone> Stream for EventStream<E> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let current_frame = this.event.current_frame.load(Ordering::Relaxed);
let listener = this.listener.get_or_insert_with(||this.event.wakers.listen());
loop {
if current_frame != this.frame {
if this.frame != current_frame.wrapping_sub(1) {
this.frame = current_frame.wrapping_sub(1);
this.index = 0;
}
let lock = this.event.last.read();
if let Some(event) = lock.get(this.index).cloned() {
this.index += 1;
this.listener = None;
return Poll::Ready(Some(event));
} else {
this.frame = current_frame;
this.index = 0;
}
let current_tick = this.event.tick.load(Ordering::Acquire);
if current_tick != this.tick {
this.tick = current_tick;
this.index = 0;
}
let lock = this.event.this.read();
if let Some(event) = lock.get(this.index).cloned() {
let lock = this.event.buffer.read();
let value = lock.get(this.index).cloned();
this.listener.get_or_insert_with(||this.event.notify.listen());
if let Some(event) = value {
this.index += 1;
this.listener = None;
return Poll::Ready(Some(event))
} else {
match pin!(&mut *listener).poll(cx) {
drop(lock);
match NonBlocking::default().poll(&mut this.listener, cx) {
Poll::Ready(()) => (),
Poll::Pending => return Poll::Pending,
}
Expand All @@ -119,37 +106,32 @@ impl<E: Event + Clone> Stream for EventStream<E> {
}
}

impl<E: Event> Default for DoubleBufferedEvent<E> {
impl<E: Event> Default for EventBuffer<E> {
fn default() -> Self {
Self {
last: Default::default(),
this: Default::default(),
wakers: Default::default(),
current_frame: Default::default(),
notify: Default::default(),
tick: Default::default(),
buffer: Default::default()
}
}
}

/// React to an event, this system is safe to be repeated in the schedule.
/// React to an event.
///
/// Consecutive calls will flush the stream, make sure to order this against the executor correctly.
pub fn react_to_event<E: Event + Clone>(
cached: Local<OnceCell<Arc<DoubleBufferedEvent<E>>>>,
frame: Res<FrameCount>,
cached: Local<OnceCell<Arc<EventBuffer<E>>>>,
reactors: Res<Reactors>,
mut reader: EventReader<E>,
) {
let buffers = cached.get_or_init(|| reactors.get_event::<E>());
buffers.tick.fetch_add(1, Ordering::AcqRel);
if !reader.is_empty() {
buffers.wakers.notify(usize::MAX);
}
if buffers.current_frame.swap(frame.0, Ordering::Relaxed) == frame.0 {
buffers.this.write().extend(reader.read().cloned());
} else {
let mut this = buffers.this.write();
let mut last = buffers.last.write();
std::mem::swap::<Vec<_>>(this.as_mut(), last.as_mut());
this.clear();
this.extend(reader.read().cloned());
}
buffers.notify.notify(usize::MAX);
let mut lock = buffers.buffer.write();
lock.drain(..);
lock.extend(reader.read().cloned());
};
}

impl<E: Event + Clone> AsyncWorldParam for EventStream<E> {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod tween;
pub use crate::executor::world;
#[allow(deprecated)]
pub use crate::executor::{in_async_context, spawn, spawn_scoped};
pub use access::async_event::DoubleBufferedEvent;
pub use access::async_event::EventBuffer;
pub use access::async_query::OwnedQueryState;
pub use access::traits::AsyncAccess;
use bevy_ecs::{
Expand Down
8 changes: 4 additions & 4 deletions src/reactors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{cell::OnceCell, convert::Infallible, marker::PhantomData, sync::Arc};
use ty_map_gen::type_map;

use crate::signals::{Receiver, Signal, SignalId, SignalSender, Signals};
use crate::{access::async_event::DoubleBufferedEvent, signals::SignalMap};
use crate::{access::async_event::EventBuffer, signals::SignalMap};

/// Signal that sends changed values of a [`States`].
#[derive(Debug, Clone, Copy)]
Expand All @@ -37,7 +37,7 @@ type_map!(
type_map!(
/// A type map of signals.
#[derive(Clone)]
pub EventBufferMap where E [Event] => Arc<DoubleBufferedEvent<E>> [Clone + Send + Sync] as FxHashMap
pub EventBufferMap where E [Event] => Arc<EventBuffer<E>> [Clone + Send + Sync] as FxHashMap
);

/// Named or typed synchronization primitives of `bevy_defer`.
Expand Down Expand Up @@ -84,12 +84,12 @@ impl Reactors {
}

/// Obtain an event buffer by event type.
pub fn get_event<E: Event + Clone>(&self) -> Arc<DoubleBufferedEvent<E>> {
pub fn get_event<E: Event + Clone>(&self) -> Arc<EventBuffer<E>> {
let mut lock = self.0.event_buffers.lock();
if let Some(data) = lock.get::<E>() {
data.clone()
} else {
let signal = <Arc<DoubleBufferedEvent<E>>>::default();
let signal = <Arc<EventBuffer<E>>>::default();
lock.insert::<E>(signal.clone());
signal
}
Expand Down
21 changes: 11 additions & 10 deletions tests/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ pub fn events() {
app.add_plugins(AsyncPlugin::default_settings());
app.add_event::<AliceChat>();
app.add_event::<BobChat>();
app.add_systems(Update, react_to_event::<AliceChat>);
app.add_systems(Update, react_to_event::<BobChat>);
app.add_systems(PreUpdate, react_to_event::<AliceChat>);
app.add_systems(PreUpdate, react_to_event::<BobChat>);
app.add_plugins(MinimalPlugins);
app.spawn_task(async {
let world = AsyncWorld;
Expand Down Expand Up @@ -183,13 +183,12 @@ pub struct Chat(char);
pub fn stream() {
static DONE: AtomicBool = AtomicBool::new(false);
let mut app = App::new();
app.add_plugins(AsyncPlugin::default_settings());
app.add_event::<Chat>();
app.add_plugins(MinimalPlugins);
app.add_systems(Update, react_to_event::<Chat>);
app.add_plugins(AsyncPlugin::default_settings());
app.add_systems(PreUpdate, react_to_event::<Chat>);
app.spawn_task(async {
let world = AsyncWorld;
let mut stream = world.event_stream::<Chat>();
let mut stream = AsyncWorld.event_stream::<Chat>();
assert_eq!(stream.next().await, Some(Chat('r')));
assert_eq!(stream.next().await, Some(Chat('u')));
assert_eq!(stream.next().await, Some(Chat('s')));
Expand All @@ -202,13 +201,12 @@ pub fn stream() {
assert_eq!(stream.next().await, Some(Chat('v')));
assert_eq!(stream.next().await, Some(Chat('y')));
if DONE.swap(true, Ordering::Relaxed) {
world.quit();
AsyncWorld.quit();
}
Ok(())
});
app.spawn_task(async {
let world = AsyncWorld;
let mut stream = world
let mut stream = AsyncWorld
.event_stream::<Chat>()
.map(|c| c.0.to_ascii_uppercase());
assert_eq!(stream.next().await, Some('R'));
Expand All @@ -223,7 +221,7 @@ pub fn stream() {
assert_eq!(stream.next().await, Some('V'));
assert_eq!(stream.next().await, Some('Y'));
if DONE.swap(true, Ordering::Relaxed) {
world.quit();
AsyncWorld.quit();
}
Ok(())
});
Expand All @@ -248,6 +246,7 @@ static CELL: AtomicU32 = AtomicU32::new(0);
pub fn event_stream() {
let mut app = App::new();
app.add_plugins(MinimalPlugins);
app.add_event::<IntegerEvent>();
app.add_plugins(AsyncPlugin::default_settings());
app.spawn_task(async {
let mut i = 0;
Expand All @@ -262,11 +261,13 @@ pub fn event_stream() {
AsyncWorld.quit();
Ok(())
});
app.add_systems(PreUpdate, react_to_event::<IntegerEvent>);
app.add_systems(Update, sys_update);
app.add_systems(Update, sys_update);
app.add_systems(Update, sys_update);
app.add_systems(PreUpdate, sys_update);
app.add_systems(PostUpdate, sys_update);
app.run()
}

fn sys_update(mut event: EventWriter<IntegerEvent>) {
Expand Down

0 comments on commit e4ed508

Please sign in to comment.