
Entry and_compute method for performing an insert, update, or remove operation with single closure #227

Closed
tatsuya6502 opened this issue Feb 10, 2023 · 11 comments · Fixed by #370

@tatsuya6502
Member

tatsuya6502 commented Feb 10, 2023

The entry API was added in Moka v0.10.0. It has methods such as entry_by_ref(&key).or_insert_with(|| ...) to atomically insert a value when the key does not exist. This issue will add another method, and_compute, to the entry API for more complex tasks. It is similar to the compute method of Java's Caffeine cache. Moka users can use and_compute to insert, update, or remove a value for a key with a single closure or function.

A new entry API: and_compute method

use moka::{Entry, ops::{Op, PerformedOp}};

// For moka::sync::Cache's entry API
// `K` and `V` are the key and value type of the cache.
pub fn and_compute<F>(self, f: F) -> (Option<Entry<K, V>>, PerformedOp)
where
    F: FnOnce(Option<&V>) -> Op<V>;

// For moka::future::Cache's entry API
pub async fn and_compute<'a, F, Fut>(self, f: F) -> (Option<Entry<K, V>>, PerformedOp)
where
    F: FnOnce(Option<&'a V>) -> Fut,
    Fut: Future<Output = Op<V>> + 'a;

moka::ops::Op and moka::ops::PerformedOp will be defined as:

/// Instructs the `and_compute` method how to modify the cache entry.
pub enum Op<V> {
    /// No-op. Do not modify the cache entry.
    Nop,
    /// Insert or replace the value of the cache entry.
    Put(V),
    /// Invalidate the cache entry.
    Invalidate,
}

/// Will be returned from `and_compute` method to indicate what kind of
/// operation was performed.
pub enum PerformedOp {
    /// The entry did not exist, or already existed but was not modified.
    Nop,
    /// The entry did not exist and was inserted.
    Inserted,
    /// The entry already existed and its value was updated.
    Updated,
    /// The entry existed and was invalidated.
    /// Note: If `and_compute` tried to invalidate a non-existing entry,
    /// `Nop` will be returned instead of `Invalidated`.
    Invalidated,
}

Example

Here is an example of a future cache holding counters. (from a question in #179)

use moka::{future::Cache, ops::{Op, PerformedOp}};

let cache: Cache<String, u64> = Cache::new(100);
let key = "key".to_string();

// maybe_entry: Option<moka::Entry<K, V>>
// performed_op: moka::ops::PerformedOp
let (maybe_entry, performed_op) = cache
    .entry_by_ref(&key)
    .and_compute(|entry| async move {
        match entry {
            // If the entry does not exist, insert a value of 1.
            None => Op::Put(1),
            // If the entry exists, increment the value by 1.
            Some(count) => Op::Put(count.saturating_add(1)),
        }
    })
    .await;

assert_eq!(performed_op, PerformedOp::Inserted);
assert_eq!(maybe_entry.unwrap().into_value(), 1);
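
For completeness, here is a hedged sketch of the invalidation path under the same proposed API: it decrements a counter and removes the entry once the count would reach zero, exercising Op::Nop, Op::Put, and Op::Invalidate. (Nothing here is final; it only illustrates the proposal.)

use moka::{future::Cache, ops::{Op, PerformedOp}};

let cache: Cache<String, u64> = Cache::new(100);
let key = "key".to_string();
cache.insert(key.clone(), 1).await;

// Decrement the counter, removing the entry once it would reach zero.
let (maybe_entry, performed_op) = cache
    .entry_by_ref(&key)
    .and_compute(|entry| async move {
        match entry {
            // No counter to decrement; leave the cache unchanged.
            None => Op::Nop,
            // The counter would reach zero; remove the entry instead.
            Some(&count) if count <= 1 => Op::Invalidate,
            // Otherwise, decrement by 1.
            Some(&count) => Op::Put(count - 1),
        }
    })
    .await;

assert_eq!(performed_op, PerformedOp::Invalidated);
// (`get` is a synchronous method in moka v0.10.x.)
assert_eq!(cache.get(&key), None);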

Concurrency

The and_compute method should have the following concurrency attributes:

  • It is a single operation that is atomic with respect to other operations. It is not possible to observe an intermediate state where the entry is neither present nor absent.
  • If there are simultaneous and_compute calls on the same key, only one call will proceed at a time. The other calls will wait until the first call completes. (A sketch illustrating this follows the list.)
    • The same is true between and_compute and or_insert_with on the same key. Only one call will proceed at a time, and the other calls will wait until the first call completes.
  • Unlike or_insert_with, there is no call coalescing between simultaneous and_compute calls on the same key.
    • This is because, in typical use cases, and_compute will not perform an idempotent operation (e.g. incrementing a counter).
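
To make the atomicity guarantee concrete, here is a hedged sketch against the proposed API (assuming a Tokio runtime): 100 tasks increment the same counter, and because and_compute calls on one key are serialized and atomic, no increment is lost.

use moka::{future::Cache, ops::Op};

let cache: Cache<String, u64> = Cache::new(100);
let key = "key".to_string();

// Spawn 100 tasks that each increment the same counter once.
let tasks: Vec<_> = (0..100)
    .map(|_| {
        let cache = cache.clone();
        let key = key.clone();
        tokio::spawn(async move {
            cache
                .entry_by_ref(&key)
                .and_compute(|entry| async move {
                    // Insert 1 if absent, otherwise add 1 to the current value.
                    Op::Put(entry.map(|v| *v + 1).unwrap_or(1))
                })
                .await;
        })
    })
    .collect();

for task in tasks {
    task.await.unwrap();
}

// Every increment was applied exactly once.
// (`get` is a synchronous method in moka v0.10.x.)
assert_eq!(cache.get(&key), Some(100));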
@tatsuya6502 tatsuya6502 added the enhancement New feature or request label Feb 10, 2023
@tatsuya6502 tatsuya6502 added this to the Backlog milestone Feb 10, 2023
@tatsuya6502
Member Author

I will probably have time in the next few weeks to implement and test this feature. If so, it will be included in the v0.10.1 release. But for now, I will put it in the backlog.

@tatsuya6502
Member Author

Another example: #229 (comment).

@Swatinem
Contributor

I would suggest making Op into a builder pattern. That way, you could have a builder method to add/update a per-item TTL.

As a workaround, I’m adding a per-item TTL manually in getsentry/symbolicator#1028 right now by making my cache value a (Instant, T), and using replace_if(|v| Instant::now() >= v.0).
It works, but it duplicates both the Instant and the checking of it that moka already does internally.
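
For reference, a minimal sketch of that workaround, assuming moka v0.10's entry API with or_insert_with_if; load_value and the other names below are illustrative, not taken from the symbolicator PR:

use std::time::{Duration, Instant};
use moka::future::Cache;

// Hypothetical loader standing in for the real (expensive) computation.
async fn load_value(key: &str) -> String {
    format!("value for {key}")
}

// The cached value carries its own deadline, duplicating what moka tracks internally.
async fn get_or_refresh(cache: &Cache<String, (Instant, String)>, key: &str) -> String {
    let ttl = Duration::from_secs(5 * 60);
    let entry = cache
        .entry_by_ref(key)
        .or_insert_with_if(
            // Compute a fresh value and stamp its expiration time.
            async { (Instant::now() + ttl, load_value(key).await) },
            // Replace the cached value once its own deadline has passed.
            |v| Instant::now() >= v.0,
        )
        .await;
    entry.into_value().1
}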

@Swatinem
Contributor

Although it would not quite be the same, as your suggested and_modify returns an Option<V>, whereas I would very much like to always get back a value.

@tatsuya6502
Member Author

tatsuya6502 commented Feb 19, 2023

Thank you for the feedback!

Although it would not quite be the same, as your suggested and_modify returns an Option<V>, whereas I would very much like to always get back a value.

Yeah, I agree. I ran into the same thing in my example above: I had to call unwrap() on the Option<Entry<K, V>> unconditionally to get back the value (maybe_entry.unwrap().into_value()). If and_compute had returned a V, I would not have needed the unwrap.

I think I will rename the original and_compute to and_optionally_compute, and add a new and_compute that always returns an Entry<K, V>.

New and_compute

// For moka::sync::Cache's entry API
pub fn and_compute<F>(self, f: F) -> (Entry<K, V>, PerformedOp)
where
    F: FnOnce(Option<&V>) -> V;

// For moka::future::Cache's entry API
pub async fn and_compute<'a, F, Fut>(self, f: F) -> (Entry<K, V>, PerformedOp)
where
    F: FnOnce(Option<&'a V>) -> Fut,
    Fut: Future<Output = V> + 'a;

EDIT: Maybe I will change the return value of and_compute to just Entry<K, V> (dropping PerformedOp), and add an is_updated (or was_updated?) method to Entry<K, V>, which returns true when an existing value was updated. (It returns false when the key did not exist and a new value was inserted.)
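
Under that alternative, a call site might look like the following sketch (is_updated does not exist yet; this only illustrates the idea in the EDIT above):

let entry = cache
    .entry_by_ref(&key)
    .and_compute(|entry| async move {
        entry.map(|count| count.saturating_add(1)).unwrap_or(1)
    })
    .await;

// The key did not exist, so a new value was inserted rather than an
// existing one updated.
assert!(!entry.is_updated());
assert_eq!(entry.into_value(), 1);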

Updated Example

use moka::{future::Cache, ops::PerformedOp};

let cache: Cache<String, u64> = Cache::new(100);
let key = "key".to_string();

// entry:        moka::Entry<K, V>
// performed_op: moka::ops::PerformedOp
let (entry, performed_op) = cache
    .entry_by_ref(&key)
    .and_compute(|entry| async move {
        if let Some(count) = entry {
            // The entry exists, increment the value by 1.
            count.saturating_add(1)
        } else {
            // The entry does not exist, insert a value of 1.
            1
        }
    })
    .await;

assert_eq!(performed_op, PerformedOp::Inserted);
assert_eq!(entry.into_value(), 1);

@tatsuya6502
Member Author

I would suggest to make the Op into a builder pattern. That way, you may have a builder method to add/update a per-item TTL.

Sounds good. I tried your idea and found a way to optionally add a per-item TTL to a value being inserted. We can do it without breaking the current API, including get_with and insert.

The following is a draft of the new API. It only demonstrates the insert method, but I think we can do the same for get_with, the entry API's and_compute, etc.

use std::time::Duration;
use moka::sync::FakeCache;
// New trait to convert a value `V` into `CacheValue<V>`. You will need
// this `use` when you use the `expires_after` method.
use moka::IntoCacheValue;

let mut cache = FakeCache::default(); // FakeCache<i32, &'static str>

// Insert a value "value1" that never expires.
cache.insert(1, "value1");

// Insert another value that expires after 5 minutes.
cache.insert(2, "value2".expires_after(Duration::from_secs(5 * 60)));

Here are the definitions of the IntoCacheValue trait, etc.

pub mod moka {

    use std::time::Duration;

    /// `CacheValue<V>` stores a value `V` and an optional expiration time as a
    /// `std::time::Duration`.
    ///
    /// `Cache`'s write methods such as `get_with` and `insert` indirectly take
    /// `CacheValue<V>` as an argument via a conversion trait `IntoCacheValue<V>`.
    pub struct CacheValue<V> {
        value: V,
        #[allow(dead_code)]
        expires_after: Option<Duration>,
    }

    /// `IntoCacheValue<V>` is a conversion trait that converts a value into
    /// `CacheValue<V>`.
    pub trait IntoCacheValue<V> {
        /// Converts a value `Self` into a `CacheValue<V>`.
        fn into_val(self) -> CacheValue<V>;

        /// Converts a value `Self` into `CacheValue<V>` and sets the optional
        /// expiration time.
        fn expires_after(self, duration: Duration) -> CacheValue<V>;
    }

    // Implement `IntoCacheValue<V>` for an arbitrary type `V` and for `CacheValue<V>`.

    impl<V> IntoCacheValue<V> for V {
        fn into_val(self) -> CacheValue<V> {
            CacheValue {
                value: self,
                expires_after: None,
            }
        }

        fn expires_after(self, duration: Duration) -> CacheValue<V> {
            CacheValue {
                value: self,
                expires_after: Some(duration),
            }
        }
    }

    impl<V> IntoCacheValue<V> for CacheValue<V> {
        fn into_val(self) -> CacheValue<V> {
            self
        }

        fn expires_after(self, duration: Duration) -> CacheValue<V> {
            Self {
                expires_after: Some(duration),
                ..self
            }
        }
    }

    pub mod sync {

        use super::IntoCacheValue;
        use std::{collections::HashMap, hash::Hash};

        #[derive(Default)]
        pub struct FakeCache<K, V> {
            map: HashMap<K, V>,
        }

        impl<K: Eq + Hash, V> FakeCache<K, V> {
            pub fn insert(&mut self, key: K, value: impl IntoCacheValue<V>) {
                self.map.insert(key, value.into_val().value);
            }
        }
    }
}

@ben-manes

The compute methods are incredibly useful and you should definitely provide them, if possible. They make writing concurrent code dramatically easier. So this is great!

fwiw, Caffeine's preferred compute methods are in the Cache.asMap() view, as users are familiar with the ConcurrentMap methods. Since the cache has many optional features, the common cache api hides things like expiration and tries to keep a low conceptual weight. Cache.policy() has all of those ad hoc specialized methods that were useful for solving a user's problem, so it accrues methods that most users won't need or see. The compute with expiration was added to allow Oracle Coherence to migrate from their homegrown cache onto Caffeine. If it had been new code, they could have avoided some api design mistakes that required it. So that part of the api is noisy and a dumping ground, but it has been a useful escape hatch.

I am more inclined towards callback evaluators for the entry's metadata (e.g. weigher, expiry) over having the call site supply this directly. An insert(k, v) having an infinite TTL is not obvious when maintaining code, whereas the callback centralizes the policy, making it safer under refactoring. I was also concerned that an insert(k, v, duration) might incline someone to write racy code to extend a duration (read old TTL, add x time, update cache), so doing it within the computation was nicer. It also makes it easier to design the apis, as fewer details have to be expressed, so if a new feature is added it is less likely to conflict with another api method.

The flaw is that it needs all of the context on the entry to extract from; e.g. the user might have to bloat their value with a field from which to calculate the cache's internal field, which adds a little memory overhead. For that specialized compute, I was torn between the more simplistic direct approach, as chosen, and some complex return type like in your example. JCache's EntryProcessor api does that and it is a bit convoluted. I decided that since the policy api is already messy, instead of over-engineering I could add it if a user had a justified need, reimplement the current methods on top of that common one, and otherwise try to steer users towards the callback style. But since API design is hard, I can't claim it was the best choice, merely the pragmatic one.

@Millione

Sounds good. I tried your idea and found a way to optionally add per-item TTL to an inserting value. We can do it without breaking the current API including get_with and insert.

Is there an MVP version to use now?

@tatsuya6502 tatsuya6502 mentioned this issue Apr 6, 2023
@yannleretaille

Out of necessity, I recently hacked together something I called or_try_insert_and_replace_with_if which is basically a try_and_compute_if, where in my case replace_if contains custom logic for calculating the expiration.

Looking back, I should probably fold that logic into the compute function, at which point it would be quite similar to your original proposal (initially I implemented this with two functions (init: Future, replace: FnOnce(V) -> Future) before realizing the ergonomics are better with a single one (init: FnOnce(Option<V>) -> Future)). I agree with @ben-manes that some kind of callback expiry interface might be preferable over doing this manually on a per-entry basis.
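
A hedged sketch of what that single-function shape could look like as a signature (hypothetical, not part of moka's API):

// Hypothetical signature only. `init` covers both the insert case (`None`)
// and the replace case (`Some`), and may fail.
pub async fn try_and_compute<F, Fut, E>(self, init: F) -> Result<Entry<K, V>, E>
where
    F: FnOnce(Option<V>) -> Fut,
    Fut: Future<Output = Result<V, E>>;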

I can definitely attest to its usefulness in more complex, concurrent scenarios!

@tatsuya6502
Member Author

I am implementing the compute methods in this pull request:

As for the per-entry expiration, I added it in #248 for v0.11.0, released in May 2023.
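
For readers landing here later, a minimal sketch of that per-entry expiration support (v0.11.0+); check the Expiry trait docs for the full method set, as the details below are from memory:

use moka::{sync::Cache, Expiry};
use std::time::{Duration, Instant};

struct MyExpiry;

impl Expiry<String, u64> for MyExpiry {
    // Called when an entry is inserted; returning `None` means "never expire".
    fn expire_after_create(
        &self,
        _key: &String,
        value: &u64,
        _created_at: Instant,
    ) -> Option<Duration> {
        // Keep small counters for one minute and larger ones for ten minutes.
        Some(Duration::from_secs(if *value < 10 { 60 } else { 600 }))
    }
}

let cache: Cache<String, u64> = Cache::builder()
    .expire_after(MyExpiry)
    .build();

cache.insert("a".to_string(), 1);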

@tatsuya6502
Member Author

I am reviewing #370 now and will merge it today. I will publish moka v0.12.3 hopefully in a few days.
