From 3363cffe9d1522543f03e5a5aac1cb1f34ae6ba2 Mon Sep 17 00:00:00 2001 From: mintlu8 Date: Thu, 9 May 2024 08:59:12 +0800 Subject: [PATCH] Use RwLock for signals. --- derive/src/lib.rs | 4 ++-- src/access/async_asset.rs | 12 +++++------ src/access/async_event.rs | 8 +++---- src/access/async_query.rs | 21 ++++-------------- src/access/async_values.rs | 16 +++----------- src/access/async_world.rs | 7 ++---- src/access/mod.rs | 1 - src/access/traits.rs | 43 +++++++++++++++++-------------------- src/async_systems.rs | 14 ++++-------- src/commands.rs | 12 +++++------ src/executor.rs | 4 +--- src/lib.rs | 6 +++--- src/queue.rs | 4 +--- src/signals/signal_inner.rs | 20 ++++++++--------- tests/signals.rs | 8 +++++-- 15 files changed, 70 insertions(+), 110 deletions(-) diff --git a/derive/src/lib.rs b/derive/src/lib.rs index dd076cc..14bd8ce 100644 --- a/derive/src/lib.rs +++ b/derive/src/lib.rs @@ -27,7 +27,7 @@ fn import_crate() -> TokenStream { /// * All functions must have `&self` or `&mut self` receivers. /// /// * Outputs must be `'static`. -/// +/// /// * Does not support `async` functions, return `impl Future + 'static` instead. /// /// ``` @@ -47,7 +47,7 @@ fn import_crate() -> TokenStream { /// /// * `#[async_access(must_exist)]` /// -/// This will unwrap the results instead of returning `AsyncResult`. +/// This will unwrap the results instead of returning `AsyncResult`. /// Useful on resources that should always be available. #[proc_macro_attribute] pub fn async_access(args: TokenStream1, tokens: TokenStream1) -> TokenStream1 { diff --git a/src/access/async_asset.rs b/src/access/async_asset.rs index f0e354c..ffc59ee 100644 --- a/src/access/async_asset.rs +++ b/src/access/async_asset.rs @@ -1,6 +1,6 @@ +use crate::access::AsyncWorld; use crate::executor::{with_world_mut, ASSET_SERVER}; use crate::sync::oneshot::MaybeChannelOut; -use crate::access::AsyncWorld; use crate::{AccessError, AsyncResult}; use bevy_asset::{Asset, AssetPath, AssetServer, Assets, Handle, LoadState}; use bevy_ecs::world::World; @@ -8,9 +8,7 @@ use futures::future::{ready, Either}; /// Async version of [`Handle`]. #[derive(Debug)] -pub struct AsyncAsset ( - pub(crate) Handle, -); +pub struct AsyncAsset(pub(crate) Handle); impl Clone for AsyncAsset { fn clone(&self) -> Self { @@ -89,12 +87,12 @@ impl AsyncAsset { _ => (), }; let handle = self.0.id(); - AsyncWorld.watch_left( - move |world: &mut World| match world.resource::().load_state(handle) { + AsyncWorld.watch_left(move |world: &mut World| { + match world.resource::().load_state(handle) { LoadState::Loaded => Some(true), LoadState::Failed => Some(false), _ => None, } - ) + }) } } diff --git a/src/access/async_event.rs b/src/access/async_event.rs index 54a5659..b40ba19 100644 --- a/src/access/async_event.rs +++ b/src/access/async_event.rs @@ -65,10 +65,10 @@ pub struct EventStream { impl Clone for EventStream { fn clone(&self) -> Self { - Self { - frame: self.frame, - index: self.index, - event: self.event.clone() + Self { + frame: self.frame, + index: self.index, + event: self.event.clone(), } } } diff --git a/src/access/async_query.rs b/src/access/async_query.rs index f61a55a..5b65d63 100644 --- a/src/access/async_query.rs +++ b/src/access/async_query.rs @@ -1,9 +1,6 @@ use crate::reactors::Reactors; -use crate::{ - async_systems::AsyncWorldParam, executor::with_world_mut, - signals::Signals, -}; use crate::{async_systems::AsyncEntityParam, AccessError}; +use crate::{async_systems::AsyncWorldParam, executor::with_world_mut, signals::Signals}; #[allow(unused)] use bevy_ecs::system::Query; use bevy_ecs::{ @@ -16,9 +13,7 @@ use std::{borrow::Borrow, marker::PhantomData, ops::Deref}; /// Async version of [`Query`] #[derive(Debug)] -pub struct AsyncQuery ( - pub(crate) PhantomData<(T, F)> -); +pub struct AsyncQuery(pub(crate) PhantomData<(T, F)>); impl Copy for AsyncQuery {} @@ -45,9 +40,7 @@ impl Clone for AsyncEntityQuery { /// Async version of [`Query`] on a unique entity. #[derive(Debug)] -pub struct AsyncQuerySingle ( - pub(crate) PhantomData<(T, F)>, -); +pub struct AsyncQuerySingle(pub(crate) PhantomData<(T, F)>); impl Copy for AsyncQuerySingle {} @@ -57,7 +50,6 @@ impl Clone for AsyncQuerySingle { } } - impl AsyncQuery { /// Obtain an [`AsyncEntityQuery`] on a specific entity. pub fn entity(&self, entity: impl Borrow) -> AsyncEntityQuery { @@ -104,12 +96,7 @@ impl AsyncEntityParam for AsyncEntityQuery { Some(()) } - fn from_async_context( - entity: Entity, - _: &Reactors, - _: (), - _: &[Entity], - ) -> Option { + fn from_async_context(entity: Entity, _: &Reactors, _: (), _: &[Entity]) -> Option { Some(Self { entity, p: PhantomData, diff --git a/src/access/async_values.rs b/src/access/async_values.rs index 3a8a138..7e507ae 100644 --- a/src/access/async_values.rs +++ b/src/access/async_values.rs @@ -33,7 +33,6 @@ type SysParamFn = dyn Fn(StaticSystemParam) -> T + Send + Sync + 'stati struct ResSysParamId(SystemId>, T>); impl AsyncSystemParam { - /// Run a function on the [`SystemParam`] and obtain the result. pub fn run( &self, @@ -81,12 +80,7 @@ impl AsyncEntityParam for AsyncComponent { Some(()) } - fn from_async_context( - entity: Entity, - _: &Reactors, - _: (), - _: &[Entity], - ) -> Option { + fn from_async_context(entity: Entity, _: &Reactors, _: (), _: &[Entity]) -> Option { Some(Self { entity, p: PhantomData, @@ -99,9 +93,7 @@ pub use bevy_ecs::system::NonSend; /// An `AsyncSystemParam` that gets or sets a resource on the `World`. #[derive(Debug)] -pub struct AsyncNonSend ( - pub(crate) PhantomData, -); +pub struct AsyncNonSend(pub(crate) PhantomData); impl Copy for AsyncNonSend {} @@ -119,9 +111,7 @@ impl AsyncWorldParam for AsyncNonSend { /// An `AsyncSystemParam` that gets or sets a resource on the `World`. #[derive(Debug)] -pub struct AsyncResource( - pub(crate) PhantomData, -); +pub struct AsyncResource(pub(crate) PhantomData); impl Copy for AsyncResource {} diff --git a/src/access/async_world.rs b/src/access/async_world.rs index 8b3738b..b8ada57 100644 --- a/src/access/async_world.rs +++ b/src/access/async_world.rs @@ -1,10 +1,10 @@ use crate::access::{ AsyncComponent, AsyncEntityQuery, AsyncNonSend, AsyncQuery, AsyncResource, AsyncSystemParam, }; +use crate::async_systems::AsyncEntityParam; use crate::async_systems::AsyncWorldParam; use crate::executor::QUERY_QUEUE; use crate::reactors::Reactors; -use crate::async_systems::AsyncEntityParam; use bevy_ecs::{ component::Component, entity::Entity, @@ -23,7 +23,6 @@ use bevy_ecs::{system::Commands, world::World}; #[derive(Debug, Copy, Clone)] pub struct AsyncWorld; - impl AsyncWorld { /// Obtain an [`AsyncEntityMut`] of the entity. /// @@ -80,9 +79,7 @@ impl AsyncWorld { #[derive(Debug, Clone)] /// Async version of `EntityMut` or `EntityCommands`. -pub struct AsyncEntityMut( - pub(crate) Entity, -); +pub struct AsyncEntityMut(pub(crate) Entity); impl AsyncEntityMut { /// Obtain the underlying [`Entity`] id. diff --git a/src/access/mod.rs b/src/access/mod.rs index 7d254e1..2c67d40 100644 --- a/src/access/mod.rs +++ b/src/access/mod.rs @@ -18,4 +18,3 @@ type AsyncWorldMut = AsyncWorld; #[cfg(feature = "derive")] pub use bevy_defer_derive::{AsyncComponent, AsyncNonSend, AsyncResource}; - diff --git a/src/access/traits.rs b/src/access/traits.rs index 667a43e..bec3a6a 100644 --- a/src/access/traits.rs +++ b/src/access/traits.rs @@ -83,12 +83,11 @@ pub trait AsyncAccess { Self: AsyncTake + AsyncLoad, { let ctx = self.as_cx(); - AsyncWorld - .watch(move |w| match ::take(w, &ctx) { - Ok(result) => Some(Ok(result)), - Err(err) if Self::should_continue(err) => None, - Err(err) => Some(Err(err)), - }) + AsyncWorld.watch(move |w| match ::take(w, &ctx) { + Ok(result) => Some(Ok(result)), + Err(err) if Self::should_continue(err) => None, + Err(err) => Some(Err(err)), + }) } /// Run a function on this item and obtain the result. @@ -110,16 +109,15 @@ pub trait AsyncAccess { Self: AsyncLoad, { let cx = self.as_cx(); - AsyncWorld - .watch(move |w| match Self::from_mut_world(w, &cx) { - Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) { - Ok(ref_mut) => Some(Ok(f(ref_mut))), - Err(err) if Self::should_continue(err) => None, - Err(err) => Some(Err(err)), - }, + AsyncWorld.watch(move |w| match Self::from_mut_world(w, &cx) { + Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) { + Ok(ref_mut) => Some(Ok(f(ref_mut))), Err(err) if Self::should_continue(err) => None, Err(err) => Some(Err(err)), - }) + }, + Err(err) if Self::should_continue(err) => None, + Err(err) => Some(Err(err)), + }) } /// Run a function on this item until it returns `Some`. @@ -128,16 +126,15 @@ pub trait AsyncAccess { mut f: impl FnMut(Self::RefMut<'_>) -> Option + 'static, ) -> ChannelOut> { let cx = self.as_cx(); - AsyncWorld - .watch(move |w| match Self::from_mut_world(w, &cx) { - Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) { - Ok(ref_mut) => f(ref_mut).map(Ok), - Err(err) if Self::should_continue(err) => None, - Err(err) => Some(Err(err)), - }, + AsyncWorld.watch(move |w| match Self::from_mut_world(w, &cx) { + Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) { + Ok(ref_mut) => f(ref_mut).map(Ok), Err(err) if Self::should_continue(err) => None, Err(err) => Some(Err(err)), - }) + }, + Err(err) if Self::should_continue(err) => None, + Err(err) => Some(Err(err)), + }) } /// Continue `watch` and `on_load` if fetch context failed with these errors. @@ -545,7 +542,7 @@ impl AsyncAccess for AsyncQuer type RefMutCx<'t> = Option>; type Ref<'t> = OwnedQueryState<'t, D, F>; type RefMut<'t> = OwnedQueryState<'t, D, F>; - + fn as_cx(&self) -> Self::Cx {} fn from_mut_world<'t>(world: &'t mut World, _: &Self::Cx) -> AsyncResult> { diff --git a/src/async_systems.rs b/src/async_systems.rs index 8d1cf69..14e4686 100644 --- a/src/async_systems.rs +++ b/src/async_systems.rs @@ -157,19 +157,15 @@ type PinnedFut = Pin> + 'static> /// An async system function. pub struct AsyncSystem { - pub(crate) function: Box< - dyn FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option + Send + Sync, - >, + pub(crate) function: + Box Option + Send + Sync>, pub(crate) marker: ParentAlive, pub id: Option, } impl AsyncSystem { pub fn new( - mut f: impl FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option - + Send - + Sync - + 'static, + mut f: impl FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option + Send + Sync + 'static, ) -> Self where F: Future + 'static, @@ -331,9 +327,7 @@ impl AsyncSystem { if !self.marker.other_alive() { let alive = self.marker.clone_child(); let children = children.map(|x| x.as_ref()).unwrap_or(&[]); - let Some(fut) = - (self.function)(entity, reactors, signals, children) - else { + let Some(fut) = (self.function)(entity, reactors, signals, children) else { return; }; executor.spawn(futures::future::select(alive, fut.map(|_| ()))); diff --git a/src/commands.rs b/src/commands.rs index ce56e8a..4b3c2b9 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -324,13 +324,11 @@ impl AsyncWorld { /// ``` #[deprecated = "Use `state_stream` instead."] pub fn in_state(&self, state: S) -> ChannelOut<()> { - self.watch( - move |world: &mut World| { - world - .get_resource::>() - .and_then(|s| (s.get() == &state).then_some(())) - }, - ) + self.watch(move |world: &mut World| { + world + .get_resource::>() + .and_then(|s| (s.get() == &state).then_some(())) + }) } /// Obtain a [`Stream`] that reacts to changes of a [`States`]. diff --git a/src/executor.rs b/src/executor.rs index abe4731..f11dc90 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -105,9 +105,7 @@ pub fn run_async_executor(world: &mut World) { SPAWNER.set(&executor.0.clone(), || { QUERY_QUEUE.set(&queue, || { REACTORS.set(&reactors, || { - WORLD.set(world, || { - while executor.0.try_tick() {} - }); + WORLD.set(world, || while executor.0.try_tick() {}); }) }) }) diff --git a/src/lib.rs b/src/lib.rs index fa98037..61eb5eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -110,7 +110,7 @@ pub struct AsyncPlugin { impl AsyncPlugin { /// Equivalent to [`CoreAsyncPlugin`]. - /// + /// /// Use [`AsyncPlugin::run_in`] and [`AsyncPlugin::run_in_set`] to add runs. pub fn empty() -> Self { AsyncPlugin { @@ -118,8 +118,8 @@ impl AsyncPlugin { } } - /// Run in [`Update`] once. - /// + /// Run in [`Update`] once. + /// /// This is usually enough, be sure to order your /// systems against [`run_async_executor`](systems::run_async_executor) correctly if needed. pub fn default_settings() -> Self { diff --git a/src/queue.rs b/src/queue.rs index 50c1088..5b87174 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,9 +1,7 @@ use crate::executor::QUERY_QUEUE; use crate::sync::oneshot::ChannelOutOrCancel; use crate::sync::waitlist::WaitList; -use crate::{ - access::AsyncWorld, cancellation::TaskCancellation, channel, sync::oneshot::Sender, -}; +use crate::{access::AsyncWorld, cancellation::TaskCancellation, channel, sync::oneshot::Sender}; use bevy_core::FrameCount; use bevy_ecs::system::NonSend; use bevy_ecs::system::Res; diff --git a/src/signals/signal_inner.rs b/src/signals/signal_inner.rs index 1933946..6382964 100644 --- a/src/signals/signal_inner.rs +++ b/src/signals/signal_inner.rs @@ -7,13 +7,13 @@ use std::{ops::Deref, sync::atomic::AtomicU32, task::Poll}; use futures::future::FusedFuture; use futures::{Future, Sink, Stream}; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use std::sync::atomic::Ordering; /// The data component of a signal. #[derive(Debug, Default)] pub(crate) struct SignalData { - pub(crate) data: Mutex, + pub(crate) data: RwLock, pub(crate) tick: AtomicU32, pub(crate) wakers: Mutex>, } @@ -60,7 +60,7 @@ impl Signal { pub fn new(value: T) -> Self { Self(Arc::new(SignalInner { inner: Arc::new(SignalData { - data: Mutex::new(value), + data: RwLock::new(value), tick: AtomicU32::new(0), wakers: Default::default(), }), @@ -93,7 +93,7 @@ impl Deref for Signal { impl SignalInner { /// Send a value, does not increment the read tick. pub fn send(&self, value: T) { - let mut lock = self.inner.data.lock(); + let mut lock = self.inner.data.write(); *lock = value; self.inner.tick.fetch_add(1, Ordering::Relaxed); let mut wakers = self.inner.wakers.lock(); @@ -105,7 +105,7 @@ impl SignalInner { where T: PartialEq, { - let mut lock = self.inner.data.lock(); + let mut lock = self.inner.data.write(); if *lock != value { *lock = value; let mut wakers = self.inner.wakers.lock(); @@ -116,7 +116,7 @@ impl SignalInner { /// Send a value and increment the read tick. pub fn broadcast(&self, value: T) { - let mut lock = self.inner.data.lock(); + let mut lock = self.inner.data.write(); *lock = value; let version = self.inner.tick.fetch_add(1, Ordering::Relaxed); let mut wakers = self.inner.wakers.lock(); @@ -129,7 +129,7 @@ impl SignalInner { where T: PartialEq, { - let mut lock = self.inner.data.lock(); + let mut lock = self.inner.data.write(); if *lock != value { *lock = value; let version = self.inner.tick.fetch_add(1, Ordering::Relaxed); @@ -146,7 +146,7 @@ impl SignalInner { { let version = self.inner.tick.load(Ordering::Relaxed); if self.tick.swap(version, Ordering::Relaxed) != version { - Some(self.inner.data.lock().clone()) + Some(self.inner.data.read().clone()) } else { None } @@ -159,7 +159,7 @@ impl SignalInner { { let version = self.inner.tick.load(Ordering::Relaxed); self.tick.swap(version, Ordering::Relaxed); - self.inner.data.lock().clone() + self.inner.data.read().clone() } /// Poll the signal value asynchronously. @@ -190,7 +190,7 @@ impl Future for SignalFuture { let tick = self.signal.inner.tick.load(Ordering::Relaxed); if self.signal.tick.swap(tick, Ordering::Relaxed) != tick { self.is_terminated = true; - Poll::Ready(self.signal.inner.data.lock().clone()) + Poll::Ready(self.signal.inner.data.read().clone()) } else { let mut lock = self.signal.inner.wakers.lock(); lock.push(cx.waker().clone()); diff --git a/tests/signals.rs b/tests/signals.rs index 481e8b3..7ab0733 100644 --- a/tests/signals.rs +++ b/tests/signals.rs @@ -6,7 +6,6 @@ use std::{ use bevy::MinimalPlugins; use bevy_app::{App, Startup, Update}; use bevy_core::FrameCountPlugin; -use bevy_defer::{signals::Signals, systems::run_async_executor}; use bevy_defer::{ async_system, async_systems::AsyncSystems, @@ -15,8 +14,13 @@ use bevy_defer::{ systems::react_to_event, world, AsyncExtension, AsyncPlugin, }; +use bevy_defer::{signals::Signals, systems::run_async_executor}; use bevy_ecs::{ - component::Component, event::{Event, EventWriter}, query::With, schedule::IntoSystemConfigs, system::{Commands, Local, Query} + component::Component, + event::{Event, EventWriter}, + query::With, + schedule::IntoSystemConfigs, + system::{Commands, Local, Query}, }; use bevy_tasks::futures_lite::StreamExt; use bevy_time::TimePlugin;