From deb93c41160b096196507a5526caa17f33f8d7ce Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 4 Feb 2023 19:48:05 +0800 Subject: [PATCH 1/5] Reduce the size of the future returned by async `get_with` and friends --- src/future.rs | 2 +- src/future/cache.rs | 167 +++++++++++++++---------- src/future/entry_selector.rs | 14 ++- src/future/value_initializer.rs | 215 ++++++++++---------------------- 4 files changed, 171 insertions(+), 227 deletions(-) diff --git a/src/future.rs b/src/future.rs index f8159314..f0625c2e 100644 --- a/src/future.rs +++ b/src/future.rs @@ -25,7 +25,7 @@ pub use { pub type PredicateId = String; // Empty struct to be used in InitResult::InitErr to represent the Option None. -struct OptionallyNone; +pub(crate) struct OptionallyNone; pub struct Iter<'i, K, V>(crate::sync_base::iter::Iter<'i, K, V>); diff --git a/src/future/cache.rs b/src/future/cache.rs index 2ca51ed4..f7a61e4b 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -31,6 +31,89 @@ use std::{ time::Duration, }; +// +// Macros to mitigate the compiler issue that inflates the size of the `init` future +// in `get_with` and friends methods: +// +// - https://github.com/moka-rs/moka/issues/212 +// - https://swatinem.de/blog/future-size/ +// +// These macros are used by `get_with`, etc. of this module. We are not big fans of +// macros as they make the code difficult to understand. However, we believe the +// benefits of reducing the future size outweigh this drawback. +// +macro_rules! insert_with_hash_and_fun_m( + ( $self:ident, $key:expr, $hash:expr, $get:expr, $init:expr, $replace_if:expr, $need_key:expr ) => { + { + use futures_util::FutureExt; + + + let insert = |v| $self.insert_with_hash($key.clone(), $hash, v).boxed(); + + let k = if $need_key { + Some(Arc::clone(&$key)) + } else { + None + }; + + let type_id = ValueInitializer::::type_id_for_get_with(); + let post_init = ValueInitializer::::post_init_for_get_with; + + match $self + .value_initializer + .try_init_or_read(&Arc::clone(&$key), type_id, $get, $init, insert, post_init) + .await + { + InitResult::Initialized(v) => { + crossbeam_epoch::pin().flush(); + Entry::new(k, v, true) + } + InitResult::ReadExisting(v) => Entry::new(k, v, false), + InitResult::InitErr(_) => unreachable!(), + } + } + } +); + +macro_rules! get_or_insert_with_hash_and_fun_m( + ( $self:ident, $key:expr, $hash:expr, $init:expr, $replace_if:expr, $need_key:expr ) => { + { + let maybe_entry = + $self.base + .get_with_hash_but_ignore_if(&$key, $hash, $replace_if.as_mut(), $need_key); + if let Some(entry) = maybe_entry { + entry + } else { + let get = || { + $self.base + .get_with_hash_but_no_recording(&$key, $hash, $replace_if.as_mut()) + }; + insert_with_hash_and_fun_m!($self, $key, $hash, get, $init, $replace_if, $need_key) + } + } + } +); + +macro_rules! get_or_insert_with_hash_by_ref_and_fun_m( + ( $self:ident, $key:expr, $hash:expr, $init:expr, $replace_if:expr, $need_key:expr ) => { + { + let maybe_entry = + $self.base + .get_with_hash_but_ignore_if($key, $hash, $replace_if.as_mut(), $need_key); + if let Some(entry) = maybe_entry { + entry + } else { + let get = || { + $self.base + .get_with_hash_but_no_recording(&$key, $hash, $replace_if.as_mut()) + }; + let arc_key = Arc::new($key.to_owned()); + insert_with_hash_and_fun_m!($self, arc_key, $hash, get, $init, $replace_if, $need_key) + } + } + } +); + /// A thread-safe, futures-aware concurrent in-memory cache. /// /// `Cache` supports full concurrency of retrievals and a high expected concurrency @@ -917,10 +1000,8 @@ where pub async fn get_with(&self, key: K, init: impl Future) -> V { let hash = self.base.hash(&key); let key = Arc::new(key); - let replace_if = None as Option bool>; - self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false) - .await - .into_value() + let mut replace_if = None as Option bool>; + get_or_insert_with_hash_and_fun_m!(self, key, hash, init, replace_if, false).into_value() } /// Similar to [`get_with`](#method.get_with), but instead of passing an owned @@ -932,10 +1013,8 @@ where Q: ToOwned + Hash + Eq + ?Sized, { let hash = self.base.hash(key); - let replace_if = None as Option bool>; - - self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false) - .await + let mut replace_if = None as Option bool>; + get_or_insert_with_hash_by_ref_and_fun_m!(self, key, hash, init, replace_if, false) .into_value() } @@ -950,9 +1029,8 @@ where ) -> V { let hash = self.base.hash(&key); let key = Arc::new(key); - self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false) - .await - .into_value() + let mut replace_if = Some(replace_if); + get_or_insert_with_hash_and_fun_m!(self, key, hash, init, replace_if, false).into_value() } /// Returns a _clone_ of the value corresponding to the key. If the value does @@ -1432,15 +1510,7 @@ where mut replace_if: Option bool>, need_key: bool, ) -> Entry { - let maybe_entry = - self.base - .get_with_hash_but_ignore_if(&key, hash, replace_if.as_mut(), need_key); - if let Some(entry) = maybe_entry { - entry - } else { - self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key) - .await - } + get_or_insert_with_hash_and_fun_m!(self, key, hash, init, replace_if, need_key) } pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun( @@ -1455,52 +1525,7 @@ where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, { - let maybe_entry = - self.base - .get_with_hash_but_ignore_if(key, hash, replace_if.as_mut(), need_key); - if let Some(entry) = maybe_entry { - entry - } else { - let key = Arc::new(key.to_owned()); - self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key) - .await - } - } - - async fn insert_with_hash_and_fun( - &self, - key: Arc, - hash: u64, - init: impl Future, - mut replace_if: Option bool>, - need_key: bool, - ) -> Entry { - use futures_util::FutureExt; - - let get = || { - self.base - .get_with_hash_but_no_recording(&key, hash, replace_if.as_mut()) - }; - let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed(); - - let k = if need_key { - Some(Arc::clone(&key)) - } else { - None - }; - - match self - .value_initializer - .init_or_read(Arc::clone(&key), get, init, insert) - .await - { - InitResult::Initialized(v) => { - crossbeam_epoch::pin().flush(); - Entry::new(k, v, true) - } - InitResult::ReadExisting(v) => Entry::new(k, v, false), - InitResult::InitErr(_) => unreachable!(), - } + get_or_insert_with_hash_by_ref_and_fun_m!(self, key, hash, init, replace_if, need_key) } pub(crate) async fn get_or_insert_with_hash( @@ -1608,9 +1633,12 @@ where None }; + let type_id = ValueInitializer::::type_id_for_optionally_get_with(); + let post_init = ValueInitializer::::post_init_for_optionally_get_with; + match self .value_initializer - .optionally_init_or_read(Arc::clone(&key), get, init, insert) + .try_init_or_read(&Arc::clone(&key), type_id, get, init, insert, post_init) .await { InitResult::Initialized(v) => { @@ -1688,9 +1716,12 @@ where None }; + let type_id = ValueInitializer::::type_id_for_try_get_with::(); + let post_init = ValueInitializer::::post_init_for_try_get_with; + match self .value_initializer - .try_init_or_read(Arc::clone(&key), get, init, insert) + .try_init_or_read(&Arc::clone(&key), type_id, get, init, insert, post_init) .await { InitResult::Initialized(v) => { diff --git a/src/future/entry_selector.rs b/src/future/entry_selector.rs index 9485919b..9a7cb2f1 100644 --- a/src/future/entry_selector.rs +++ b/src/future/entry_selector.rs @@ -521,11 +521,9 @@ where /// /// [get-with-method]: ./struct.Cache.html#method.get_with pub async fn or_insert_with(self, init: impl Future) -> Entry { - let owned_key: K = self.ref_key.to_owned(); - let key = Arc::new(owned_key); let replace_if = None as Option bool>; self.cache - .get_or_insert_with_hash_and_fun(key, self.hash, init, replace_if, true) + .get_or_insert_with_hash_by_ref_and_fun(self.ref_key, self.hash, init, replace_if, true) .await } @@ -542,10 +540,14 @@ where init: impl Future, replace_if: impl FnMut(&V) -> bool, ) -> Entry { - let owned_key: K = self.ref_key.to_owned(); - let key = Arc::new(owned_key); self.cache - .get_or_insert_with_hash_and_fun(key, self.hash, init, Some(replace_if), true) + .get_or_insert_with_hash_by_ref_and_fun( + self.ref_key, + self.hash, + init, + Some(replace_if), + true, + ) .await } diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 540f8f7b..fed79eb2 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -117,141 +117,18 @@ where /// # Panics /// Panics if the `init` future has been panicked. - pub(crate) async fn init_or_read<'a>( - &'a self, - key: Arc, - // Closure to get an existing value from cache. - get: impl FnMut() -> Option, - init: impl Future, - // Closure to insert a new value into cache. - mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a, - ) -> InitResult { - // This closure will be called before the init future is resolved, in order - // to check if the value has already been inserted by other async task. - let pre_init = make_pre_init(get); - - // This closure will be called after the init future has returned a value. It - // will insert the returned value (from init) to the cache, and convert the - // value into a pair of a WaiterValue and an InitResult. - let post_init = |value: V| { - async move { - insert(value.clone()).await; - ( - WaiterValue::Ready(Ok(value.clone())), - InitResult::Initialized(value), - ) - } - .boxed() - }; - - let type_id = TypeId::of::<()>(); - self.do_try_init(&key, type_id, pre_init, init, post_init) - .await - } - - /// # Panics - /// Panics if the `init` future has been panicked. - pub(crate) async fn try_init_or_read<'a, E>( - &'a self, - key: Arc, - get: impl FnMut() -> Option, - init: impl Future>, - mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a, - ) -> InitResult - where - E: Send + Sync + 'static, - { - // This closure will be called before the init future is resolved, in order - // to check if the value has already been inserted by other async task. - let pre_init = make_pre_init(get); - - // This closure will be called after the init future has returned a value. It - // will insert the returned value (from init) to the cache, and convert the - // value into a pair of a WaiterValue and an InitResult. - let post_init = move |value: Result| { - async move { - match value { - Ok(value) => { - insert(value.clone()).await; - ( - WaiterValue::Ready(Ok(value.clone())), - InitResult::Initialized(value), - ) - } - Err(e) => { - let err: ErrorObject = Arc::new(e); - ( - WaiterValue::Ready(Err(Arc::clone(&err))), - InitResult::InitErr(err.downcast().unwrap()), - ) - } - } - } - .boxed() - }; - - let type_id = TypeId::of::(); - self.do_try_init(&key, type_id, pre_init, init, post_init) - .await - } - - /// # Panics - /// Panics if the `init` future has been panicked. - pub(super) async fn optionally_init_or_read<'a>( - &'a self, - key: Arc, - get: impl FnMut() -> Option, - init: impl Future>, - mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a, - ) -> InitResult { - // This closure will be called before the init future is resolved, in order - // to check if the value has already been inserted by other async task. - let pre_init = make_pre_init(get); - - // This closure will be called after the init future has returned a value. It - // will insert the returned value (from init) to the cache, and convert the - // value into a pair of a WaiterValue and an InitResult. - let post_init = |value: Option| { - async move { - match value { - Some(value) => { - insert(value.clone()).await; - ( - WaiterValue::Ready(Ok(value.clone())), - InitResult::Initialized(value), - ) - } - None => { - // `value` can be either `Some` or `None`. For `None` case, - // without change the existing API too much, we will need to - // convert `None` to Arc here. `Infallible` could not be - // instantiated. So it might be good to use an empty struct - // to indicate the error type. - let err: ErrorObject = Arc::new(OptionallyNone); - ( - WaiterValue::Ready(Err(Arc::clone(&err))), - InitResult::InitErr(err.downcast().unwrap()), - ) - } - } - } - .boxed() - }; - - let type_id = TypeId::of::(); - self.do_try_init(&key, type_id, pre_init, init, post_init) - .await - } - - /// # Panics - /// Panics if the `init` future has been panicked. - async fn do_try_init<'a, O, E>( + pub(crate) async fn try_init_or_read<'a, O, E>( &'a self, key: &Arc, type_id: TypeId, - mut pre_init: impl FnMut() -> Option<(WaiterValue, InitResult)>, + // Closure to get an existing value from cache. + mut get: impl FnMut() -> Option, init: impl Future, - post_init: impl FnOnce(O) -> BoxFuture<'a, (WaiterValue, InitResult)>, + // Closure to insert a new value into cache. + mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a, + // This function will be called after the init future has returned a value of + // type O. It converts O into Result. + post_init: fn(O) -> Result, ) -> InitResult where E: Send + Sync + 'static, @@ -283,12 +160,12 @@ where ); // Check if the value has already been inserted by other thread. - if let Some((waiter_val, init_res)) = pre_init() { + if let Some(value) = get() { // Yes. Set the waiter value, remove our waiter, and return // the existing value. - waiter_guard.set_waiter_value(waiter_val); + waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone()))); remove_waiter(&self.waiters, cht_key, hash); - return init_res; + return InitResult::ReadExisting(value); } // The value still does note exist. Let's resolve the init future. @@ -297,7 +174,22 @@ where match AssertUnwindSafe(init).catch_unwind().await { // Resolved. Ok(value) => { - let (waiter_val, init_res) = post_init(value).await; + let (waiter_val, init_res) = match post_init(value) { + Ok(value) => { + insert(value.clone()).await; + ( + WaiterValue::Ready(Ok(value.clone())), + InitResult::Initialized(value), + ) + } + Err(e) => { + let err: ErrorObject = Arc::new(e); + ( + WaiterValue::Ready(Err(Arc::clone(&err))), + InitResult::InitErr(err.downcast().unwrap()), + ) + } + }; waiter_guard.set_waiter_value(waiter_val); remove_waiter(&self.waiters, cht_key, hash); return init_res; @@ -345,6 +237,42 @@ where } } } + + /// The `post_init` function for the `get_with` method of cache. + pub(crate) fn post_init_for_get_with(value: V) -> Result { + Ok(value) + } + + /// The `post_init` function for the `optionally_get_with` method of cache. + pub(crate) fn post_init_for_optionally_get_with( + value: Option, + ) -> Result> { + // `value` can be either `Some` or `None`. For `None` case, without change + // the existing API too much, we will need to convert `None` to Arc here. + // `Infallible` could not be instantiated. So it might be good to use an + // empty struct to indicate the error type. + value.ok_or(Arc::new(OptionallyNone)) + } + + /// The `post_init` function for `try_get_with` method of cache. + pub(crate) fn post_init_for_try_get_with(result: Result) -> Result { + result + } + + /// Returns the `type_id` for `get_with` method of cache. + pub(crate) fn type_id_for_get_with() -> TypeId { + TypeId::of::<()>() + } + + /// Returns the `type_id` for `optionally_get_with` method of cache. + pub(crate) fn type_id_for_optionally_get_with() -> TypeId { + TypeId::of::<()>() + } + + /// Returns the `type_id` for `try_get_with` method of cache. + pub(crate) fn type_id_for_try_get_with() -> TypeId { + TypeId::of::() + } } #[inline] @@ -386,23 +314,6 @@ where (cht_key, hash) } -#[inline] -fn make_pre_init( - mut get: impl FnMut() -> Option, -) -> impl FnMut() -> Option<(WaiterValue, InitResult)> -where - V: Clone, -{ - move || { - get().map(|value| { - ( - WaiterValue::Ready(Ok(value.clone())), - InitResult::ReadExisting(value), - ) - }) - } -} - fn panic_if_retry_exhausted_for_panicking(retries: usize, max: usize) { if retries >= max { panic!( From deec070f0eb61696fa9d2a9e3af0ed0259cc141c Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 4 Feb 2023 20:37:23 +0800 Subject: [PATCH 2/5] Reduce the size of the future returned by async `get_with` and friends Correct a wrong `TypeId` for `optionally_get_with`. --- src/future/value_initializer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index fed79eb2..10a93a3a 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -266,7 +266,7 @@ where /// Returns the `type_id` for `optionally_get_with` method of cache. pub(crate) fn type_id_for_optionally_get_with() -> TypeId { - TypeId::of::<()>() + TypeId::of::() } /// Returns the `type_id` for `try_get_with` method of cache. From bc92f4e1a00234c257b7225deb5618229f98de9b Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 5 Feb 2023 11:12:29 +0800 Subject: [PATCH 3/5] Reduce the size of the future returned by async `get_with` and friends Change the type of the `init` future argument for internal `async fn`s from `Future<...>` to `Pin<&mut Future<...>>`. --- src/future/cache.rs | 35 ++++++++++++++++++++------------- src/future/entry_selector.rs | 8 ++++++++ src/future/value_initializer.rs | 3 ++- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/future/cache.rs b/src/future/cache.rs index f7a61e4b..395ac4d0 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -27,13 +27,14 @@ use std::{ fmt, future::Future, hash::{BuildHasher, Hash}, + pin::Pin, sync::Arc, time::Duration, }; // -// Macros to mitigate the compiler issue that inflates the size of the `init` future -// in `get_with` and friends methods: +// Macros to mitigate the compiler issue that inflates the size of future returned +// from `get_with` and friends methods: // // - https://github.com/moka-rs/moka/issues/212 // - https://swatinem.de/blog/future-size/ @@ -43,11 +44,10 @@ use std::{ // benefits of reducing the future size outweigh this drawback. // macro_rules! insert_with_hash_and_fun_m( - ( $self:ident, $key:expr, $hash:expr, $get:expr, $init:expr, $replace_if:expr, $need_key:expr ) => { + ( $self:ident, $key:expr, $hash:expr, $get:expr, $init:ident, $replace_if:expr, $need_key:expr ) => { { use futures_util::FutureExt; - let insert = |v| $self.insert_with_hash($key.clone(), $hash, v).boxed(); let k = if $need_key { @@ -76,7 +76,7 @@ macro_rules! insert_with_hash_and_fun_m( ); macro_rules! get_or_insert_with_hash_and_fun_m( - ( $self:ident, $key:expr, $hash:expr, $init:expr, $replace_if:expr, $need_key:expr ) => { + ( $self:ident, $key:expr, $hash:expr, $init:ident, $replace_if:expr, $need_key:expr ) => { { let maybe_entry = $self.base @@ -95,7 +95,7 @@ macro_rules! get_or_insert_with_hash_and_fun_m( ); macro_rules! get_or_insert_with_hash_by_ref_and_fun_m( - ( $self:ident, $key:expr, $hash:expr, $init:expr, $replace_if:expr, $need_key:expr ) => { + ( $self:ident, $key:expr, $hash:expr, $init:ident, $replace_if:expr, $need_key:expr ) => { { let maybe_entry = $self.base @@ -998,6 +998,7 @@ where /// `init` futures. /// pub async fn get_with(&self, key: K, init: impl Future) -> V { + futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); let mut replace_if = None as Option bool>; @@ -1012,6 +1013,7 @@ where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, { + futures_util::pin_mut!(init); let hash = self.base.hash(key); let mut replace_if = None as Option bool>; get_or_insert_with_hash_by_ref_and_fun_m!(self, key, hash, init, replace_if, false) @@ -1027,6 +1029,7 @@ where init: impl Future, replace_if: impl FnMut(&V) -> bool, ) -> V { + futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); let mut replace_if = Some(replace_if); @@ -1129,6 +1132,7 @@ where where F: Future>, { + futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false) @@ -1146,6 +1150,7 @@ where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, { + futures_util::pin_mut!(init); let hash = self.base.hash(key); self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false) .await @@ -1252,6 +1257,7 @@ where F: Future>, E: Send + Sync + 'static, { + futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); self.get_or_try_insert_with_hash_and_fun(key, hash, init, false) @@ -1269,6 +1275,7 @@ where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, { + futures_util::pin_mut!(init); let hash = self.base.hash(key); self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false) .await @@ -1506,7 +1513,7 @@ where &self, key: Arc, hash: u64, - init: impl Future, + init: Pin<&mut impl Future>, mut replace_if: Option bool>, need_key: bool, ) -> Entry { @@ -1517,7 +1524,7 @@ where &self, key: &Q, hash: u64, - init: impl Future, + init: Pin<&mut impl Future>, mut replace_if: Option bool>, need_key: bool, ) -> Entry @@ -1571,7 +1578,7 @@ where &self, key: Arc, hash: u64, - init: F, + init: Pin<&mut F>, need_key: bool, ) -> Option> where @@ -1590,7 +1597,7 @@ where &self, key: &Q, hash: u64, - init: F, + init: Pin<&mut F>, need_key: bool, ) -> Option> where @@ -1612,7 +1619,7 @@ where &self, key: Arc, hash: u64, - init: F, + init: Pin<&mut F>, need_key: bool, ) -> Option> where @@ -1654,7 +1661,7 @@ where &self, key: Arc, hash: u64, - init: F, + init: Pin<&mut F>, need_key: bool, ) -> Result, Arc> where @@ -1673,7 +1680,7 @@ where &self, key: &Q, hash: u64, - init: F, + init: Pin<&mut F>, need_key: bool, ) -> Result, Arc> where @@ -1694,7 +1701,7 @@ where &self, key: Arc, hash: u64, - init: F, + init: Pin<&mut F>, need_key: bool, ) -> Result, Arc> where diff --git a/src/future/entry_selector.rs b/src/future/entry_selector.rs index 9a7cb2f1..1257c153 100644 --- a/src/future/entry_selector.rs +++ b/src/future/entry_selector.rs @@ -176,6 +176,7 @@ where /// /// [get-with-method]: ./struct.Cache.html#method.get_with pub async fn or_insert_with(self, init: impl Future) -> Entry { + futures_util::pin_mut!(init); let key = Arc::new(self.owned_key); let replace_if = None as Option bool>; self.cache @@ -196,6 +197,7 @@ where init: impl Future, replace_if: impl FnMut(&V) -> bool, ) -> Entry { + futures_util::pin_mut!(init); let key = Arc::new(self.owned_key); self.cache .get_or_insert_with_hash_and_fun(key, self.hash, init, Some(replace_if), true) @@ -269,6 +271,7 @@ where self, init: impl Future>, ) -> Option> { + futures_util::pin_mut!(init); let key = Arc::new(self.owned_key); self.cache .get_or_optionally_insert_with_hash_and_fun(key, self.hash, init, true) @@ -344,6 +347,7 @@ where F: Future>, E: Send + Sync + 'static, { + futures_util::pin_mut!(init); let key = Arc::new(self.owned_key); self.cache .get_or_try_insert_with_hash_and_fun(key, self.hash, init, true) @@ -521,6 +525,7 @@ where /// /// [get-with-method]: ./struct.Cache.html#method.get_with pub async fn or_insert_with(self, init: impl Future) -> Entry { + futures_util::pin_mut!(init); let replace_if = None as Option bool>; self.cache .get_or_insert_with_hash_by_ref_and_fun(self.ref_key, self.hash, init, replace_if, true) @@ -540,6 +545,7 @@ where init: impl Future, replace_if: impl FnMut(&V) -> bool, ) -> Entry { + futures_util::pin_mut!(init); self.cache .get_or_insert_with_hash_by_ref_and_fun( self.ref_key, @@ -617,6 +623,7 @@ where self, init: impl Future>, ) -> Option> { + futures_util::pin_mut!(init); self.cache .get_or_optionally_insert_with_hash_by_ref_and_fun(self.ref_key, self.hash, init, true) .await @@ -692,6 +699,7 @@ where F: Future>, E: Send + Sync + 'static, { + futures_util::pin_mut!(init); self.cache .get_or_try_insert_with_hash_by_ref_and_fun(self.ref_key, self.hash, init, true) .await diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 10a93a3a..4eb89cc8 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -4,6 +4,7 @@ use std::{ any::{Any, TypeId}, future::Future, hash::{BuildHasher, Hash}, + pin::Pin, sync::Arc, }; use triomphe::Arc as TrioArc; @@ -123,7 +124,7 @@ where type_id: TypeId, // Closure to get an existing value from cache. mut get: impl FnMut() -> Option, - init: impl Future, + init: Pin<&mut impl Future>, // Closure to insert a new value into cache. mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a, // This function will be called after the init future has returned a value of From 9a68e3dea18a7baaab290fb2a80989ce7a7047cd Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 5 Feb 2023 13:39:30 +0800 Subject: [PATCH 4/5] Reduce the size of the future returned by async `get_with` and friends Change the macros back to private functions. --- src/future/cache.rs | 159 ++++++++++++++------------------ src/future/value_initializer.rs | 15 ++- 2 files changed, 83 insertions(+), 91 deletions(-) diff --git a/src/future/cache.rs b/src/future/cache.rs index 395ac4d0..1a675575 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -32,88 +32,6 @@ use std::{ time::Duration, }; -// -// Macros to mitigate the compiler issue that inflates the size of future returned -// from `get_with` and friends methods: -// -// - https://github.com/moka-rs/moka/issues/212 -// - https://swatinem.de/blog/future-size/ -// -// These macros are used by `get_with`, etc. of this module. We are not big fans of -// macros as they make the code difficult to understand. However, we believe the -// benefits of reducing the future size outweigh this drawback. -// -macro_rules! insert_with_hash_and_fun_m( - ( $self:ident, $key:expr, $hash:expr, $get:expr, $init:ident, $replace_if:expr, $need_key:expr ) => { - { - use futures_util::FutureExt; - - let insert = |v| $self.insert_with_hash($key.clone(), $hash, v).boxed(); - - let k = if $need_key { - Some(Arc::clone(&$key)) - } else { - None - }; - - let type_id = ValueInitializer::::type_id_for_get_with(); - let post_init = ValueInitializer::::post_init_for_get_with; - - match $self - .value_initializer - .try_init_or_read(&Arc::clone(&$key), type_id, $get, $init, insert, post_init) - .await - { - InitResult::Initialized(v) => { - crossbeam_epoch::pin().flush(); - Entry::new(k, v, true) - } - InitResult::ReadExisting(v) => Entry::new(k, v, false), - InitResult::InitErr(_) => unreachable!(), - } - } - } -); - -macro_rules! get_or_insert_with_hash_and_fun_m( - ( $self:ident, $key:expr, $hash:expr, $init:ident, $replace_if:expr, $need_key:expr ) => { - { - let maybe_entry = - $self.base - .get_with_hash_but_ignore_if(&$key, $hash, $replace_if.as_mut(), $need_key); - if let Some(entry) = maybe_entry { - entry - } else { - let get = || { - $self.base - .get_with_hash_but_no_recording(&$key, $hash, $replace_if.as_mut()) - }; - insert_with_hash_and_fun_m!($self, $key, $hash, get, $init, $replace_if, $need_key) - } - } - } -); - -macro_rules! get_or_insert_with_hash_by_ref_and_fun_m( - ( $self:ident, $key:expr, $hash:expr, $init:ident, $replace_if:expr, $need_key:expr ) => { - { - let maybe_entry = - $self.base - .get_with_hash_but_ignore_if($key, $hash, $replace_if.as_mut(), $need_key); - if let Some(entry) = maybe_entry { - entry - } else { - let get = || { - $self.base - .get_with_hash_but_no_recording(&$key, $hash, $replace_if.as_mut()) - }; - let arc_key = Arc::new($key.to_owned()); - insert_with_hash_and_fun_m!($self, arc_key, $hash, get, $init, $replace_if, $need_key) - } - } - } -); - /// A thread-safe, futures-aware concurrent in-memory cache. /// /// `Cache` supports full concurrency of retrievals and a high expected concurrency @@ -1001,8 +919,10 @@ where futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); - let mut replace_if = None as Option bool>; - get_or_insert_with_hash_and_fun_m!(self, key, hash, init, replace_if, false).into_value() + let replace_if = None as Option bool>; + self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false) + .await + .into_value() } /// Similar to [`get_with`](#method.get_with), but instead of passing an owned @@ -1015,8 +935,9 @@ where { futures_util::pin_mut!(init); let hash = self.base.hash(key); - let mut replace_if = None as Option bool>; - get_or_insert_with_hash_by_ref_and_fun_m!(self, key, hash, init, replace_if, false) + let replace_if = None as Option bool>; + self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false) + .await .into_value() } @@ -1032,8 +953,10 @@ where futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); - let mut replace_if = Some(replace_if); - get_or_insert_with_hash_and_fun_m!(self, key, hash, init, replace_if, false).into_value() + let replace_if = Some(replace_if); + self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false) + .await + .into_value() } /// Returns a _clone_ of the value corresponding to the key. If the value does @@ -1517,7 +1440,15 @@ where mut replace_if: Option bool>, need_key: bool, ) -> Entry { - get_or_insert_with_hash_and_fun_m!(self, key, hash, init, replace_if, need_key) + let maybe_entry = + self.base + .get_with_hash_but_ignore_if(&key, hash, replace_if.as_mut(), need_key); + if let Some(entry) = maybe_entry { + entry + } else { + self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key) + .await + } } pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun( @@ -1532,7 +1463,55 @@ where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, { - get_or_insert_with_hash_by_ref_and_fun_m!(self, key, hash, init, replace_if, need_key) + let maybe_entry = + self.base + .get_with_hash_but_ignore_if(key, hash, replace_if.as_mut(), need_key); + if let Some(entry) = maybe_entry { + entry + } else { + let key = Arc::new(key.to_owned()); + self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key) + .await + } + } + + async fn insert_with_hash_and_fun( + &self, + key: Arc, + hash: u64, + init: Pin<&mut impl Future>, + mut replace_if: Option bool>, + need_key: bool, + ) -> Entry { + use futures_util::FutureExt; + + let get = || { + self.base + .get_with_hash_but_no_recording(&key, hash, replace_if.as_mut()) + }; + let insert = |v| self.insert_with_hash(key.clone(), hash, v).boxed(); + + let k = if need_key { + Some(Arc::clone(&key)) + } else { + None + }; + + let type_id = ValueInitializer::::type_id_for_get_with(); + let post_init = ValueInitializer::::post_init_for_get_with; + + match self + .value_initializer + .try_init_or_read(&Arc::clone(&key), type_id, get, init, insert, post_init) + .await + { + InitResult::Initialized(v) => { + crossbeam_epoch::pin().flush(); + Entry::new(k, v, true) + } + InitResult::ReadExisting(v) => Entry::new(k, v, false), + InitResult::InitErr(_) => unreachable!(), + } } pub(crate) async fn get_or_insert_with_hash( diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 4eb89cc8..b7f176b2 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -116,6 +116,16 @@ where } } + // + // NOTES: We use `Pin<&mut impl Future>` instead of `impl Future` here for the + // `init` argument. This is because we want to avoid the future size inflation + // caused by calling nested async functions. See the following links for more + // details: + // + // - https://github.com/moka-rs/moka/issues/212 + // - https://swatinem.de/blog/future-size/ + // + /// # Panics /// Panics if the `init` future has been panicked. pub(crate) async fn try_init_or_read<'a, O, E>( @@ -124,8 +134,9 @@ where type_id: TypeId, // Closure to get an existing value from cache. mut get: impl FnMut() -> Option, + // Future to initialize a new value. init: Pin<&mut impl Future>, - // Closure to insert a new value into cache. + // Closure that returns a future to insert a new value into cache. mut insert: impl FnMut(V) -> BoxFuture<'a, ()> + Send + 'a, // This function will be called after the init future has returned a value of // type O. It converts O into Result. @@ -262,6 +273,8 @@ where /// Returns the `type_id` for `get_with` method of cache. pub(crate) fn type_id_for_get_with() -> TypeId { + // NOTE: We use a regular function here instead of a const fn because TypeId + // is not stable as a const fn. (as of our MSRV) TypeId::of::<()>() } From d4f452162a3a1b1913737780269850140fe62f02 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Mon, 6 Feb 2023 14:57:19 +0800 Subject: [PATCH 5/5] Reduce the size of the future returned by async `get_with` and friends --- src/future/cache.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/future/cache.rs b/src/future/cache.rs index 1a675575..acf8ada4 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -953,8 +953,7 @@ where futures_util::pin_mut!(init); let hash = self.base.hash(&key); let key = Arc::new(key); - let replace_if = Some(replace_if); - self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false) + self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false) .await .into_value() }