Skip to content

Commit

Permalink
Use RwLock for signals.
Browse files Browse the repository at this point in the history
  • Loading branch information
mintlu8 committed May 9, 2024
1 parent e62e131 commit 3363cff
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 110 deletions.
4 changes: 2 additions & 2 deletions derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn import_crate() -> TokenStream {
/// * All functions must have `&self` or `&mut self` receivers.
///
/// * Outputs must be `'static`.
///
///
/// * Does not support `async` functions, return `impl Future + 'static` instead.
///
/// ```
Expand All @@ -47,7 +47,7 @@ fn import_crate() -> TokenStream {
///
/// * `#[async_access(must_exist)]`
///
/// This will unwrap the results instead of returning `AsyncResult`.
/// This will unwrap the results instead of returning `AsyncResult`.
/// Useful on resources that should always be available.
#[proc_macro_attribute]
pub fn async_access(args: TokenStream1, tokens: TokenStream1) -> TokenStream1 {
Expand Down
12 changes: 5 additions & 7 deletions src/access/async_asset.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::access::AsyncWorld;
use crate::executor::{with_world_mut, ASSET_SERVER};
use crate::sync::oneshot::MaybeChannelOut;
use crate::access::AsyncWorld;
use crate::{AccessError, AsyncResult};
use bevy_asset::{Asset, AssetPath, AssetServer, Assets, Handle, LoadState};
use bevy_ecs::world::World;
use futures::future::{ready, Either};

/// Async version of [`Handle`].
#[derive(Debug)]
pub struct AsyncAsset<A: Asset> (
pub(crate) Handle<A>,
);
pub struct AsyncAsset<A: Asset>(pub(crate) Handle<A>);

impl<A: Asset> Clone for AsyncAsset<A> {
fn clone(&self) -> Self {
Expand Down Expand Up @@ -89,12 +87,12 @@ impl<A: Asset> AsyncAsset<A> {
_ => (),
};
let handle = self.0.id();
AsyncWorld.watch_left(
move |world: &mut World| match world.resource::<AssetServer>().load_state(handle) {
AsyncWorld.watch_left(move |world: &mut World| {
match world.resource::<AssetServer>().load_state(handle) {
LoadState::Loaded => Some(true),
LoadState::Failed => Some(false),
_ => None,
}
)
})
}
}
8 changes: 4 additions & 4 deletions src/access/async_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ pub struct EventStream<E: Event + Clone> {

impl<E: Event + Clone> Clone for EventStream<E> {
fn clone(&self) -> Self {
Self {
frame: self.frame,
index: self.index,
event: self.event.clone()
Self {
frame: self.frame,
index: self.index,
event: self.event.clone(),
}
}
}
Expand Down
21 changes: 4 additions & 17 deletions src/access/async_query.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::reactors::Reactors;
use crate::{
async_systems::AsyncWorldParam, executor::with_world_mut,
signals::Signals,
};
use crate::{async_systems::AsyncEntityParam, AccessError};
use crate::{async_systems::AsyncWorldParam, executor::with_world_mut, signals::Signals};
#[allow(unused)]
use bevy_ecs::system::Query;
use bevy_ecs::{
Expand All @@ -16,9 +13,7 @@ use std::{borrow::Borrow, marker::PhantomData, ops::Deref};

/// Async version of [`Query`]
#[derive(Debug)]
pub struct AsyncQuery<T: QueryData, F: QueryFilter = ()> (
pub(crate) PhantomData<(T, F)>
);
pub struct AsyncQuery<T: QueryData, F: QueryFilter = ()>(pub(crate) PhantomData<(T, F)>);

impl<T: QueryData, F: QueryFilter> Copy for AsyncQuery<T, F> {}

Expand All @@ -45,9 +40,7 @@ impl<T: QueryData, F: QueryFilter> Clone for AsyncEntityQuery<T, F> {

/// Async version of [`Query`] on a unique entity.
#[derive(Debug)]
pub struct AsyncQuerySingle<T: QueryData, F: QueryFilter = ()> (
pub(crate) PhantomData<(T, F)>,
);
pub struct AsyncQuerySingle<T: QueryData, F: QueryFilter = ()>(pub(crate) PhantomData<(T, F)>);

impl<T: QueryData, F: QueryFilter> Copy for AsyncQuerySingle<T, F> {}

Expand All @@ -57,7 +50,6 @@ impl<T: QueryData, F: QueryFilter> Clone for AsyncQuerySingle<T, F> {
}
}


impl<T: QueryData, F: QueryFilter> AsyncQuery<T, F> {
/// Obtain an [`AsyncEntityQuery`] on a specific entity.
pub fn entity(&self, entity: impl Borrow<Entity>) -> AsyncEntityQuery<T, F> {
Expand Down Expand Up @@ -104,12 +96,7 @@ impl<T: QueryData, F: QueryFilter> AsyncEntityParam for AsyncEntityQuery<T, F> {
Some(())
}

fn from_async_context(
entity: Entity,
_: &Reactors,
_: (),
_: &[Entity],
) -> Option<Self> {
fn from_async_context(entity: Entity, _: &Reactors, _: (), _: &[Entity]) -> Option<Self> {
Some(Self {
entity,
p: PhantomData,
Expand Down
16 changes: 3 additions & 13 deletions src/access/async_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type SysParamFn<Q, T> = dyn Fn(StaticSystemParam<Q>) -> T + Send + Sync + 'stati
struct ResSysParamId<P: SystemParam, T>(SystemId<Box<SysParamFn<P, T>>, T>);

impl<Q: SystemParam + 'static> AsyncSystemParam<Q> {

/// Run a function on the [`SystemParam`] and obtain the result.
pub fn run<T: Send + Sync + 'static>(
&self,
Expand Down Expand Up @@ -81,12 +80,7 @@ impl<C: Component> AsyncEntityParam for AsyncComponent<C> {
Some(())
}

fn from_async_context(
entity: Entity,
_: &Reactors,
_: (),
_: &[Entity],
) -> Option<Self> {
fn from_async_context(entity: Entity, _: &Reactors, _: (), _: &[Entity]) -> Option<Self> {
Some(Self {
entity,
p: PhantomData,
Expand All @@ -99,9 +93,7 @@ pub use bevy_ecs::system::NonSend;

/// An `AsyncSystemParam` that gets or sets a resource on the `World`.
#[derive(Debug)]
pub struct AsyncNonSend<R: 'static> (
pub(crate) PhantomData<R>,
);
pub struct AsyncNonSend<R: 'static>(pub(crate) PhantomData<R>);

impl<R: 'static> Copy for AsyncNonSend<R> {}

Expand All @@ -119,9 +111,7 @@ impl<R: 'static> AsyncWorldParam for AsyncNonSend<R> {

/// An `AsyncSystemParam` that gets or sets a resource on the `World`.
#[derive(Debug)]
pub struct AsyncResource<R: Resource>(
pub(crate) PhantomData<R>,
);
pub struct AsyncResource<R: Resource>(pub(crate) PhantomData<R>);

impl<R: Resource> Copy for AsyncResource<R> {}

Expand Down
7 changes: 2 additions & 5 deletions src/access/async_world.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::access::{
AsyncComponent, AsyncEntityQuery, AsyncNonSend, AsyncQuery, AsyncResource, AsyncSystemParam,
};
use crate::async_systems::AsyncEntityParam;
use crate::async_systems::AsyncWorldParam;
use crate::executor::QUERY_QUEUE;
use crate::reactors::Reactors;
use crate::async_systems::AsyncEntityParam;
use bevy_ecs::{
component::Component,
entity::Entity,
Expand All @@ -23,7 +23,6 @@ use bevy_ecs::{system::Commands, world::World};
#[derive(Debug, Copy, Clone)]
pub struct AsyncWorld;


impl AsyncWorld {
/// Obtain an [`AsyncEntityMut`] of the entity.
///
Expand Down Expand Up @@ -80,9 +79,7 @@ impl AsyncWorld {

#[derive(Debug, Clone)]
/// Async version of `EntityMut` or `EntityCommands`.
pub struct AsyncEntityMut(
pub(crate) Entity,
);
pub struct AsyncEntityMut(pub(crate) Entity);

impl AsyncEntityMut {
/// Obtain the underlying [`Entity`] id.
Expand Down
1 change: 0 additions & 1 deletion src/access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@ type AsyncWorldMut = AsyncWorld;

#[cfg(feature = "derive")]
pub use bevy_defer_derive::{AsyncComponent, AsyncNonSend, AsyncResource};

43 changes: 20 additions & 23 deletions src/access/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,11 @@ pub trait AsyncAccess {
Self: AsyncTake + AsyncLoad,
{
let ctx = self.as_cx();
AsyncWorld
.watch(move |w| match <Self as AsyncTake>::take(w, &ctx) {
Ok(result) => Some(Ok(result)),
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
})
AsyncWorld.watch(move |w| match <Self as AsyncTake>::take(w, &ctx) {
Ok(result) => Some(Ok(result)),
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
})
}

/// Run a function on this item and obtain the result.
Expand All @@ -110,16 +109,15 @@ pub trait AsyncAccess {
Self: AsyncLoad,
{
let cx = self.as_cx();
AsyncWorld
.watch(move |w| match Self::from_mut_world(w, &cx) {
Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) {
Ok(ref_mut) => Some(Ok(f(ref_mut))),
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
},
AsyncWorld.watch(move |w| match Self::from_mut_world(w, &cx) {
Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) {
Ok(ref_mut) => Some(Ok(f(ref_mut))),
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
})
},
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
})
}

/// Run a function on this item until it returns `Some`.
Expand All @@ -128,16 +126,15 @@ pub trait AsyncAccess {
mut f: impl FnMut(Self::RefMut<'_>) -> Option<T> + 'static,
) -> ChannelOut<AsyncResult<T>> {
let cx = self.as_cx();
AsyncWorld
.watch(move |w| match Self::from_mut_world(w, &cx) {
Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) {
Ok(ref_mut) => f(ref_mut).map(Ok),
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
},
AsyncWorld.watch(move |w| match Self::from_mut_world(w, &cx) {
Ok(mut mut_cx) => match Self::from_mut_cx(&mut mut_cx, &cx) {
Ok(ref_mut) => f(ref_mut).map(Ok),
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
})
},
Err(err) if Self::should_continue(err) => None,
Err(err) => Some(Err(err)),
})
}

/// Continue `watch` and `on_load` if fetch context failed with these errors.
Expand Down Expand Up @@ -545,7 +542,7 @@ impl<D: QueryData + 'static, F: QueryFilter + 'static> AsyncAccess for AsyncQuer
type RefMutCx<'t> = Option<OwnedQueryState<'t, D, F>>;
type Ref<'t> = OwnedQueryState<'t, D, F>;
type RefMut<'t> = OwnedQueryState<'t, D, F>;

fn as_cx(&self) -> Self::Cx {}

fn from_mut_world<'t>(world: &'t mut World, _: &Self::Cx) -> AsyncResult<Self::RefMutCx<'t>> {
Expand Down
14 changes: 4 additions & 10 deletions src/async_systems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,15 @@ type PinnedFut = Pin<Box<dyn Future<Output = Result<(), AccessError>> + 'static>

/// An async system function.
pub struct AsyncSystem {
pub(crate) function: Box<
dyn FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option<PinnedFut> + Send + Sync,
>,
pub(crate) function:
Box<dyn FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option<PinnedFut> + Send + Sync>,
pub(crate) marker: ParentAlive,
pub id: Option<NonZeroU32>,
}

impl AsyncSystem {
pub fn new<F>(
mut f: impl FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option<F>
+ Send
+ Sync
+ 'static,
mut f: impl FnMut(Entity, &Reactors, &Signals, &[Entity]) -> Option<F> + Send + Sync + 'static,
) -> Self
where
F: Future<Output = AsyncResult> + 'static,
Expand Down Expand Up @@ -331,9 +327,7 @@ impl AsyncSystem {
if !self.marker.other_alive() {
let alive = self.marker.clone_child();
let children = children.map(|x| x.as_ref()).unwrap_or(&[]);
let Some(fut) =
(self.function)(entity, reactors, signals, children)
else {
let Some(fut) = (self.function)(entity, reactors, signals, children) else {
return;
};
executor.spawn(futures::future::select(alive, fut.map(|_| ())));
Expand Down
12 changes: 5 additions & 7 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,11 @@ impl AsyncWorld {
/// ```
#[deprecated = "Use `state_stream` instead."]
pub fn in_state<S: States>(&self, state: S) -> ChannelOut<()> {
self.watch(
move |world: &mut World| {
world
.get_resource::<State<S>>()
.and_then(|s| (s.get() == &state).then_some(()))
},
)
self.watch(move |world: &mut World| {
world
.get_resource::<State<S>>()
.and_then(|s| (s.get() == &state).then_some(()))
})
}

/// Obtain a [`Stream`] that reacts to changes of a [`States`].
Expand Down
4 changes: 1 addition & 3 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ pub fn run_async_executor(world: &mut World) {
SPAWNER.set(&executor.0.clone(), || {
QUERY_QUEUE.set(&queue, || {
REACTORS.set(&reactors, || {
WORLD.set(world, || {
while executor.0.try_tick() {}
});
WORLD.set(world, || while executor.0.try_tick() {});
})
})
})
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ pub struct AsyncPlugin {

impl AsyncPlugin {
/// Equivalent to [`CoreAsyncPlugin`].
///
///
/// Use [`AsyncPlugin::run_in`] and [`AsyncPlugin::run_in_set`] to add runs.
pub fn empty() -> Self {
AsyncPlugin {
schedules: Vec::new(),
}
}

/// Run in [`Update`] once.
///
/// Run in [`Update`] once.
///
/// This is usually enough, be sure to order your
/// systems against [`run_async_executor`](systems::run_async_executor) correctly if needed.
pub fn default_settings() -> Self {
Expand Down
4 changes: 1 addition & 3 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::executor::QUERY_QUEUE;
use crate::sync::oneshot::ChannelOutOrCancel;
use crate::sync::waitlist::WaitList;
use crate::{
access::AsyncWorld, cancellation::TaskCancellation, channel, sync::oneshot::Sender,
};
use crate::{access::AsyncWorld, cancellation::TaskCancellation, channel, sync::oneshot::Sender};
use bevy_core::FrameCount;
use bevy_ecs::system::NonSend;
use bevy_ecs::system::Res;
Expand Down

0 comments on commit 3363cff

Please sign in to comment.