Skip to content

Commit

Permalink
loading_cache: implement a variation of least frequent recently used …
Browse files Browse the repository at this point in the history
…(LFRU) eviction policy

This patch implements a simple variation of LFRU eviction policy:
  * We define 2 dynamic cache partitions whose total size should not exceed the maximum cache size.
  * New cache entry is always added to the "new generation" partition.
  * After a cache entry has been read more than PartitionHitThreshold times, it is moved to the second cache partition.
  * Both partitions' entries obey expiration and reload rules as before this patch.
  * When cache entries need to be evicted due to a size restriction, the least recently used entries
    of the "new generation" partition are evicted first.

Fixes scylladb#8674

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
  • Loading branch information
vladzcloudius committed Nov 4, 2021
1 parent 34cf92f commit 3cd19ca
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 31 deletions.
2 changes: 1 addition & 1 deletion alternator/auth.hh
Expand Up @@ -35,7 +35,7 @@ namespace alternator {

using hmac_sha256_digest = std::array<char, 32>;

using key_cache = utils::loading_cache<std::string, std::string>;
using key_cache = utils::loading_cache<std::string, std::string, 1>;

std::string get_signature(std::string_view access_key_id, std::string_view secret_access_key, std::string_view host, std::string_view method,
std::string_view orig_datestamp, std::string_view signed_headers_str, const std::map<std::string_view, std::string_view>& signed_headers_map,
Expand Down
1 change: 1 addition & 0 deletions auth/permissions_cache.hh
Expand Up @@ -67,6 +67,7 @@ class permissions_cache final {
using cache_type = utils::loading_cache<
std::pair<role_or_anonymous, resource>,
permission_set,
1,
utils::loading_cache_reload_enabled::yes,
utils::simple_entry_size<permission_set>,
utils::tuple_hash>;
Expand Down
1 change: 1 addition & 0 deletions cql3/authorized_prepared_statements_cache.hh
Expand Up @@ -115,6 +115,7 @@ private:
using checked_weak_ptr = typename statements::prepared_statement::checked_weak_ptr;
using cache_type = utils::loading_cache<cache_key_type,
checked_weak_ptr,
1,
utils::loading_cache_reload_enabled::yes,
authorized_prepared_statements_cache_size,
std::hash<cache_key_type>,
Expand Down
9 changes: 8 additions & 1 deletion cql3/prepared_statements_cache.hh
Expand Up @@ -102,7 +102,14 @@ public:

private:
using cache_key_type = typename prepared_cache_key_type::cache_key_type;
using cache_type = utils::loading_cache<cache_key_type, prepared_cache_entry, utils::loading_cache_reload_enabled::no, prepared_cache_entry_size, utils::tuple_hash, std::equal_to<cache_key_type>, prepared_cache_stats_updater>;
// Keep the entry in the "new generation" partition until it has been hit 2 times, because
// every prepared statement is accessed at least twice in the cache:
// 1) During PREPARE
// 2) During EXECUTE
//
// Therefore a typical "pollution" (when a cache entry is used only once) would still involve
// 2 cache hits.
using cache_type = utils::loading_cache<cache_key_type, prepared_cache_entry, 2, utils::loading_cache_reload_enabled::no, prepared_cache_entry_size, utils::tuple_hash, std::equal_to<cache_key_type>, prepared_cache_stats_updater>;
using cache_value_ptr = typename cache_type::value_ptr;
using checked_weak_ptr = typename statements::prepared_statement::checked_weak_ptr;

Expand Down
83 changes: 77 additions & 6 deletions test/boost/loading_cache_test.cc
Expand Up @@ -266,17 +266,24 @@ SEASTAR_TEST_CASE(test_loading_cache_loading_different_keys) {
SEASTAR_TEST_CASE(test_loading_cache_loading_expiry_eviction) {
return seastar::async([] {
using namespace std::chrono;
utils::loading_cache<int, sstring> loading_cache(num_loaders, 20ms, testlog);
utils::loading_cache<int, sstring, 1> loading_cache(num_loaders, 20ms, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });

prepare().get();

loading_cache.get_ptr(0, loader).discard_result().get();

// Check new generation eviction
BOOST_REQUIRE(loading_cache.size() == 1);
sleep(20ms).get();
REQUIRE_EVENTUALLY_EQUAL(loading_cache.size(), 0);

// Check old generation eviction
loading_cache.get_ptr(0, loader).discard_result().get();
BOOST_REQUIRE(loading_cache.find(0) != nullptr);

sleep(20ms).get();
REQUIRE_EVENTUALLY_EQUAL(loading_cache.find(0), nullptr);
REQUIRE_EVENTUALLY_EQUAL(loading_cache.size(), 0);
});
}

Expand Down Expand Up @@ -339,14 +346,31 @@ SEASTAR_TEST_CASE(test_loading_cache_move_item_to_mru_list_front_on_sync_op) {
});
}

SEASTAR_TEST_CASE(test_loading_cache_loading_reloading) {
SEASTAR_TEST_CASE(test_loading_cache_loading_reloading_old_gen) {
return seastar::async([] {
using namespace std::chrono;
load_count = 0;
utils::loading_cache<int, sstring, utils::loading_cache_reload_enabled::yes> loading_cache(num_loaders, 100ms, 20ms, testlog, loader);
utils::loading_cache<int, sstring, 1, utils::loading_cache_reload_enabled::yes> loading_cache(num_loaders, 100ms, 20ms, testlog, loader);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
prepare().get();
loading_cache.get_ptr(0, loader).discard_result().get();
// Push the entry into the old generation partition. Make sure it's being reloaded.
loading_cache.get_ptr(0).discard_result().get();
loading_cache.get_ptr(0).discard_result().get();
sleep(60ms).get();
BOOST_REQUIRE(eventually_true([&] { return load_count >= 3; }));
});
}

SEASTAR_TEST_CASE(test_loading_cache_loading_reloading_new_gen) {
return seastar::async([] {
using namespace std::chrono;
load_count = 0;
utils::loading_cache<int, sstring, 1, utils::loading_cache_reload_enabled::yes> loading_cache(num_loaders, 100ms, 20ms, testlog, loader);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
prepare().get();
// Load one entry into the new generation partition.
// Make sure it's reloaded.
loading_cache.get_ptr(0).discard_result().get();
sleep(60ms).get();
BOOST_REQUIRE(eventually_true([&] { return load_count >= 2; }));
});
Expand All @@ -370,11 +394,58 @@ SEASTAR_TEST_CASE(test_loading_cache_max_size_eviction) {
});
}

SEASTAR_TEST_CASE(test_loading_cache_max_size_eviction_new_gen_first) {
    return seastar::async([] {
        using namespace std::chrono;
        load_count = 0;
        // Cache of capacity 4 with a huge (1h) expiry so only size-based
        // eviction can kick in; PartitionHitThreshold == 1.
        utils::loading_cache<int, sstring, 1> cache(4, 1h, testlog);
        auto stop_cache = seastar::defer([&cache] { cache.stop().get(); });

        prepare().get();

        // Hit the key "-1" two times so it is promoted out of the
        // "new generation" partition.
        cache.get_ptr(-1, loader).discard_result().get();
        cache.find(-1);

        // Overflow the cache with single-hit ("new generation") entries.
        for (int k = 0; k < num_loaders; ++k) {
            cache.get_ptr(k, loader).discard_result().get();
        }

        BOOST_REQUIRE_EQUAL(load_count, num_loaders + 1);
        BOOST_REQUIRE_EQUAL(cache.size(), 4);
        // The twice-touched entry must have survived: new-generation
        // entries are evicted before old-generation ones.
        BOOST_REQUIRE(cache.find(-1) != nullptr);
    });
}

SEASTAR_TEST_CASE(test_loading_cache_eviction_new_gen) {
    return seastar::async([] {
        using namespace std::chrono;
        load_count = 0;
        // Capacity 4 and a short (10ms) expiry; PartitionHitThreshold == 1.
        utils::loading_cache<int, sstring, 1> cache(4, 10ms, testlog);
        auto stop_cache = seastar::defer([&cache] { cache.stop().get(); });

        prepare().get();

        // Hit the key "-1" two times, promoting it to the old generation.
        cache.get_ptr(-1, loader).discard_result().get();
        cache.find(-1);

        // Add a batch of single-hit entries on top.
        for (int k = 0; k < num_loaders; ++k) {
            cache.get_ptr(k, loader).discard_result().get();
        }

        // Expiration applies to both partitions: even the promoted entry
        // must eventually be dropped, leaving the cache empty.
        REQUIRE_EVENTUALLY_EQUAL(cache.find(-1), nullptr);
        REQUIRE_EVENTUALLY_EQUAL(cache.size(), 0);
    });
}

SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
return seastar::async([] {
using namespace std::chrono;
load_count = 0;
utils::loading_cache<int, sstring, utils::loading_cache_reload_enabled::yes> loading_cache(1, 100ms, 10ms, testlog, loader);
utils::loading_cache<int, sstring, 0, utils::loading_cache_reload_enabled::yes> loading_cache(1, 100ms, 10ms, testlog, loader);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });

prepare().get();
Expand Down
96 changes: 73 additions & 23 deletions utils/loading_cache.hh
Expand Up @@ -30,6 +30,7 @@
#include <boost/intrusive/parent_from_member.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/join.hpp>

#include <seastar/core/seastar.hh>
#include <seastar/core/future-util.hh>
Expand Down Expand Up @@ -94,6 +95,7 @@ struct simple_entry_size {
/// \tparam Alloc elements allocator
template<typename Key,
typename Tp,
int PartitionHitThreshold = 0,
loading_cache_reload_enabled ReloadEnabled = loading_cache_reload_enabled::no,
typename EntrySize = simple_entry_size<Tp>,
typename Hash = std::hash<Key>,
Expand Down Expand Up @@ -240,7 +242,9 @@ public:
}

~loading_cache() {
_lru_list.erase_and_dispose(_lru_list.begin(), _lru_list.end(), [] (ts_value_lru_entry* ptr) { loading_cache::destroy_ts_value(ptr); });
auto value_destoyer = [] (ts_value_lru_entry* ptr) { loading_cache::destroy_ts_value(ptr); };
_new_gen_list.erase_and_dispose(_new_gen_list.begin(), _new_gen_list.end(), value_destoyer);
_lru_list.erase_and_dispose(_lru_list.begin(), _lru_list.end(), value_destoyer);
}

template <typename LoadFunc>
Expand Down Expand Up @@ -324,11 +328,15 @@ public:
void remove_if(Pred&& pred) {
static_assert(std::is_same<bool, std::result_of_t<Pred(const value_type&)>>::value, "Bad Pred signature");

_lru_list.remove_and_dispose_if([this, &pred] (const ts_value_lru_entry& v) {
auto cond_pred = [this, &pred] (const ts_value_lru_entry& v) {
return pred(v.timestamped_value().value());
}, [this] (ts_value_lru_entry* p) {
};
auto value_destroyer = [this] (ts_value_lru_entry* p) {
loading_cache::destroy_ts_value(p);
});
};

_new_gen_list.remove_and_dispose_if(cond_pred, value_destroyer);
_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
}

void remove(const Key& k) {
Expand All @@ -341,7 +349,7 @@ public:
}

size_t size() const {
return _lru_list.size();
return _lru_list.size() + _new_gen_list.size();
}

/// \brief returns the memory size the currently cached entries occupy according to the EntrySize predicate.
Expand All @@ -354,7 +362,9 @@ private:
if (!ts_ptr) {
return;
}
_lru_list.erase_and_dispose(_lru_list.iterator_to(*ts_ptr->lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
ts_value_lru_entry* lru_entry_ptr = ts_ptr->lru_entry_ptr();
lru_list_type& entry_list = container_list(*lru_entry_ptr);
entry_list.erase_and_dispose(entry_list.iterator_to(*lru_entry_ptr), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
}

timestamped_val_ptr ready_entry_ptr(timestamped_val_ptr tv_ptr) {
Expand All @@ -364,8 +374,8 @@ private:
return std::move(tv_ptr);
}

lru_list_type& container_list() noexcept {
return _lru_list;
lru_list_type& container_list(const ts_value_lru_entry& lru_entry_ptr) noexcept {
return (lru_entry_ptr.touch_count() > PartitionHitThreshold) ? _lru_list : _new_gen_list;
}

template<typename KeyType, typename KeyHasher, typename KeyEqual>
Expand All @@ -391,11 +401,25 @@ private:
/// Set the given item as the most recently used item.
/// The MRU item is going to be at the front of the _lru_list, the LRU item - at the back.
/// \param lru_entry Cache item that has been "touched"
void touch_lru_entry(ts_value_lru_entry& lru_entry) {
void touch_lru_entry_2_partitions(ts_value_lru_entry& lru_entry) {
if (lru_entry.is_linked()) {
_lru_list.erase(_lru_list.iterator_to(lru_entry));
lru_list_type& lru_list = container_list(lru_entry);
lru_list.erase(lru_list.iterator_to(lru_entry));
}

if (lru_entry.touch_count() < PartitionHitThreshold) {
_logger.trace("Putting key {} into the new generation partition", lru_entry.key());
_new_gen_list.push_front(lru_entry);
lru_entry.inc_touch_count();
} else {
_logger.trace("Putting key {} into the old generation partition", lru_entry.key());
_lru_list.push_front(lru_entry);

// Bump it up only once to avoid a wrap around
if (lru_entry.touch_count() == PartitionHitThreshold) {
lru_entry.inc_touch_count();
}
}
_lru_list.push_front(lru_entry);
}

future<> reload(timestamped_val_ptr ts_value_ptr) {
Expand Down Expand Up @@ -435,7 +459,7 @@ private:

void drop_expired() {
auto now = loading_cache_clock_type::now();
_lru_list.remove_and_dispose_if([now, this] (const ts_value_lru_entry& lru_entry) {
auto expiration_cond = [now, this] (const ts_value_lru_entry& lru_entry) {
using namespace std::chrono;
// An entry should be discarded if it hasn't been reloaded for too long or nobody cares about it anymore
const timestamped_val& v = lru_entry.timestamped_value();
Expand All @@ -446,15 +470,27 @@ private:
return true;
}
return false;
}, [this] (ts_value_lru_entry* p) {
};
auto value_destroyer = [this] (ts_value_lru_entry* p) {
loading_cache::destroy_ts_value(p);
});
};

_new_gen_list.remove_and_dispose_if(expiration_cond, value_destroyer);
_lru_list.remove_and_dispose_if(expiration_cond, value_destroyer);
}

// Shrink the cache to the _max_size discarding the least recently used items
// Shrink the cache to the _max_size discarding the least recently used items.
// Get rid from the entries that were used exactly once first.
void shrink() {
using namespace std::chrono;

while (_current_size > _max_size && !_new_gen_list.empty()) {
ts_value_lru_entry& lru_entry = *_new_gen_list.rbegin();
_logger.trace("shrink(): {}: dropping the new generation entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
}

while (_current_size > _max_size) {
using namespace std::chrono;
ts_value_lru_entry& lru_entry = *_lru_list.rbegin();
_logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
Expand Down Expand Up @@ -489,7 +525,7 @@ private:
// Future is waited on indirectly in `stop()` (via `_timer_reads_gate`).
// FIXME: error handling
(void)with_gate(_timer_reads_gate, [this] {
auto to_reload = boost::copy_range<utils::chunked_vector<timestamped_val_ptr>>(_lru_list
auto to_reload = boost::copy_range<utils::chunked_vector<timestamped_val_ptr>>(boost::range::join(_new_gen_list, _lru_list)
| boost::adaptors::filtered([this] (ts_value_lru_entry& lru_entry) {
return lru_entry.timestamped_value().loaded() + _refresh < loading_cache_clock_type::now();
})
Expand All @@ -509,6 +545,7 @@ private:

loading_values_type _loading_values;
lru_list_type _lru_list;
lru_list_type _new_gen_list;
size_t _current_size = 0;
size_t _max_size = 0;
std::chrono::milliseconds _expiry;
Expand All @@ -520,8 +557,8 @@ private:
seastar::gate _timer_reads_gate;
};

template<typename Key, typename Tp, loading_cache_reload_enabled ReloadEnabled, typename EntrySize, typename Hash, typename EqualPred, typename LoadingSharedValuesStats, typename Alloc>
class loading_cache<Key, Tp, ReloadEnabled, EntrySize, Hash, EqualPred, LoadingSharedValuesStats, Alloc>::timestamped_val::value_ptr {
template<typename Key, typename Tp, int PartitionHitThreshold, loading_cache_reload_enabled ReloadEnabled, typename EntrySize, typename Hash, typename EqualPred, typename LoadingSharedValuesStats, typename Alloc>
class loading_cache<Key, Tp, PartitionHitThreshold, ReloadEnabled, EntrySize, Hash, EqualPred, LoadingSharedValuesStats, Alloc>::timestamped_val::value_ptr {
private:
using loading_values_type = typename timestamped_val::loading_values_type;

Expand Down Expand Up @@ -551,8 +588,8 @@ public:
};

/// \brief This is and LRU list entry which is also an anchor for a loading_cache value.
template<typename Key, typename Tp, loading_cache_reload_enabled ReloadEnabled, typename EntrySize, typename Hash, typename EqualPred, typename LoadingSharedValuesStats, typename Alloc>
class loading_cache<Key, Tp, ReloadEnabled, EntrySize, Hash, EqualPred, LoadingSharedValuesStats, Alloc>::timestamped_val::lru_entry : public safe_link_list_hook {
template<typename Key, typename Tp, int PartitionHitThreshold, loading_cache_reload_enabled ReloadEnabled, typename EntrySize, typename Hash, typename EqualPred, typename LoadingSharedValuesStats, typename Alloc>
class loading_cache<Key, Tp, PartitionHitThreshold, ReloadEnabled, EntrySize, Hash, EqualPred, LoadingSharedValuesStats, Alloc>::timestamped_val::lru_entry : public safe_link_list_hook {
private:
using loading_values_type = typename timestamped_val::loading_values_type;

Expand All @@ -563,19 +600,32 @@ public:
private:
timestamped_val_ptr _ts_val_ptr;
loading_cache& _parent;
int _touch_count;

public:
/// \brief Construct an LRU anchor for the given timestamped value.
///
/// Registers this entry as the back-reference anchor of \p ts_val and
/// accounts its size against the cache's total size.
///
/// \param ts_val pointer to the timestamped value this entry anchors
/// \param owner_cache the loading_cache this entry belongs to
lru_entry(timestamped_val_ptr ts_val, loading_cache& owner_cache)
    : _ts_val_ptr(std::move(ts_val))
    , _parent(owner_cache)
    , _touch_count(0)
{
    // We don't want to allow PartitionHitThreshold to be greater than half the max value of
    // _touch_count to avoid a wrap around.
    // Note: use standard `decltype` rather than the non-standard GNU `typeof` extension.
    static_assert(PartitionHitThreshold <= std::numeric_limits<decltype(_touch_count)>::max() / 2, "PartitionHitThreshold value is too big");

    _ts_val_ptr->set_anchor_back_reference(this);
    cache_size() += _ts_val_ptr->size();
}

/// Increment the hit counter of this entry. The counter is compared against
/// PartitionHitThreshold to decide whether the entry stays in the "new
/// generation" list or moves to the main LRU list (see container_list()).
void inc_touch_count() noexcept {
++_touch_count;
}

/// \return how many times this entry has been touched so far.
int touch_count() const noexcept {
return _touch_count;
}

~lru_entry() {
if (safe_link_list_hook::is_linked()) {
lru_list_type& lru_list = _parent.container_list();
lru_list_type& lru_list = _parent.container_list(*this);
lru_list.erase(lru_list.iterator_to(*this));
}
cache_size() -= _ts_val_ptr->size();
Expand All @@ -587,7 +637,7 @@ public:
}

void touch() noexcept {
_parent.touch_lru_entry(*this);
_parent.touch_lru_entry_2_partitions(*this);
}

const Key& key() const noexcept {
Expand Down

0 comments on commit 3cd19ca

Please sign in to comment.