Skip to content

Commit

Permalink
reader_concurrency_semaphore: unify admission logic across all paths
Browse files Browse the repository at this point in the history
The semaphore currently has two admission paths: the
obtain_permit()/with_permit() methods, which admit permits on user
request (the front door), and maybe_admit_waiters(), which admits
permits based on internal events like memory resources being returned
(the back door). The two paths used their own admission conditions
and naturally this meant that they diverged over time. Notably,
maybe_admit_waiters() did not look at inactive readers assuming that if
there are waiters there cannot be inactive readers. This is not true
however since we merged the execution-stage into the semaphore. Waiters
can queue up even when there are inactive reads and thus
maybe_admit_waiters() has to consider evicting some of them to see if
this would allow for admitting new reads.
To avoid such divergence in the future, the admission logic was moved
into a new method can_admit_read() which is now shared between the two
method families. This method now checks for the possibility of evicting
inactive readers as well.
The admission logic was tuned slightly to only consider evicting
inactive readers if there is a real possibility that this will result
in admissions: notably, before this patch, resource availability was
checked before the stall condition (used permits == blocked permits),
so we could evict readers even when doing so couldn't help.
Because now eviction can be started from maybe_admit_waiters(), which is
also downstream from eviction, we added a flag to avoid recursive
evict -> maybe admit -> evict ... loops.

Fixes: scylladb#11770

Closes scylladb#11784
  • Loading branch information
denesb authored and avikivity committed Oct 18, 2022
1 parent df8e1da commit 7fbad8d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 12 deletions.
52 changes: 40 additions & 12 deletions reader_concurrency_semaphore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -864,35 +864,58 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read
}

// Starts evicting inactive readers in the background, unless an eviction
// loop is already in progress (tracked by _evicting).
void reader_concurrency_semaphore::evict_readers_in_background() {
// Only one eviction loop may run at a time. Eviction can itself lead back
// here via maybe_admit_waiters(), so this flag breaks the
// evict -> maybe admit -> evict recursion.
if (_evicting) {
return;
}
_evicting = true;
// Evict inactive readers in the background while wait list isn't empty
// This is safe since stop() closes _gate;
// NOTE(review): "_gate" above presumably refers to _close_readers_gate, the
// gate actually entered below -- confirm against stop().
(void)with_gate(_close_readers_gate, [this] {
// Keep evicting the front inactive read until there are no waiters left
// or no inactive reads left to evict.
return do_until([this] { return _wait_list.empty() || _inactive_reads.empty(); }, [this] {
return detach_inactive_reader(_inactive_reads.front(), evict_reason::permit).close();
});
});
}
// NOTE(review): the two lines above appear to be the pre-change closing of
// this statement/function as rendered by the diff view; the line below is the
// post-change closing, which clears _evicting once the loop finishes.
}).finally([this] { _evicting = false; });
}

// Shared admission check used by both the front-door and back-door paths.
//
// Returns:
//  * can_admit::no    - a waiter is queued (when the caller requires an empty
//                       wait list), the ready list is non-empty, not all used
//                       permits are stalled, or there are not enough resources
//                       and no inactive reads to evict;
//  * can_admit::maybe - only resources are missing and there are inactive
//                       reads, so evicting some of them might help;
//  * can_admit::yes   - none of the above conditions block admission.
reader_concurrency_semaphore::can_admit
reader_concurrency_semaphore::can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept {
    // Conditions that eviction cannot fix are evaluated first, so that
    // can_admit::maybe is only reported when eviction has a chance to help.
    const bool queued_behind_waiters = wait_list_empty && !_wait_list.empty();
    if (queued_behind_waiters || !_ready_list.empty() || !all_used_permits_are_stalled()) {
        return can_admit::no;
    }

    if (has_available_units(permit.base_resources())) {
        return can_admit::yes;
    }

    // Short on resources: admission hinges on whether anything is evictable.
    return _inactive_reads.empty() ? can_admit::no : can_admit::maybe;
}

future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) {
if (!_execution_loop_future) {
_execution_loop_future.emplace(execution_loop());
}
if (!_wait_list.empty() || !_ready_list.empty()) {
return enqueue_waiter(std::move(permit), std::move(func));
}

if (!has_available_units(permit.base_resources())) {
const auto admit = can_admit_read(permit, require_empty_waitlist::yes);
if (admit != can_admit::yes) {
auto fut = enqueue_waiter(std::move(permit), std::move(func));
if (!_inactive_reads.empty()) {
if (admit == can_admit::maybe) {
evict_readers_in_background();
}
return fut;
}

if (!all_used_permits_are_stalled()) {
return enqueue_waiter(std::move(permit), std::move(func));
}

permit.on_admission();
++_stats.reads_admitted;
if (func) {
Expand All @@ -902,7 +925,8 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r
}

void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources()) && all_used_permits_are_stalled()) {
auto admit = can_admit::no;
while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit, require_empty_waitlist::no)) == can_admit::yes) {
auto& x = _wait_list.front();
try {
x.permit.on_admission();
Expand All @@ -917,6 +941,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
}
_wait_list.pop_front();
}
if (admit == can_admit::maybe) {
// Evicting readers will trigger another call to `maybe_admit_waiters()` from `signal()`.
evict_readers_in_background();
}
}

void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) {
Expand Down
10 changes: 10 additions & 0 deletions reader_concurrency_semaphore.hh
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private:
stats _stats;
permit_list_type _permit_list;
bool _stopped = false;
// True while a background eviction loop started by
// evict_readers_in_background() is running; prevents starting a second one.
bool _evicting = false;
gate _close_readers_gate;
gate _permit_gate;
std::optional<future<>> _execution_loop_future;
Expand All @@ -202,6 +203,15 @@ private:
future<> enqueue_waiter(reader_permit permit, read_func func);
void evict_readers_in_background();
future<> do_wait_admission(reader_permit permit, read_func func = {});

// Check whether permit can be admitted or not.
// Caller can specify whether wait list should be empty or not for admission
// to be possible. can_admit::maybe means admission might be possible if some
// of the inactive readers are evicted.
//
// Result values:
//  * no    - admission is not possible right now;
//  * maybe - only resources are missing and there are inactive readers that
//            could be evicted to free some up;
//  * yes   - the permit can be admitted immediately.
enum class can_admit { no, maybe, yes };
// Strongly-typed flag: when `yes`, a non-empty wait list alone rules out admission.
using require_empty_waitlist = bool_class<class require_empty_waitlist_tag>;
can_admit can_admit_read(const reader_permit& permit, require_empty_waitlist wait_list_empty) const noexcept;

void maybe_admit_waiters() noexcept;

void on_permit_created(reader_permit::impl&);
Expand Down
79 changes: 79 additions & 0 deletions test/boost/reader_concurrency_semaphore_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -957,3 +957,82 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
handles.clear();
}
}

// Reproduces https://github.com/scylladb/scylladb/issues/11770
//
// Scenario: with both count units taken by one inactive read (p1) and one
// used read (p2), a third permit (p3) has to wait. Once p2 is marked as
// blocked, the test expects p1 to be evicted and p3 to be admitted.
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_when_all_is_blocked) {
simple_schema ss;
const auto& s = *ss.schema();

// Only 2 count units: p1 and p2 below consume both of them.
const auto initial_resources = reader_concurrency_semaphore::resources{2, 32 * 1024};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
auto stop_sem = deferred_stop(semaphore);

// Test helper modelling a read whose progress the test controls: it marks its
// permit as used for its whole lifetime and can additionally be toggled
// between blocked and unblocked.
class read {
reader_permit _permit;
promise<> _read_started_pr;
// Must be declared after _read_started_pr: it is initialized from that
// promise in the constructor's initializer list.
future<> _read_started_fut;
promise<> _read_done_pr;
// Must be declared after _permit: it is constructed from it.
reader_permit::used_guard _ug;
// Engaged only between mark_as_blocked() and mark_as_unblocked().
std::optional<reader_permit::blocked_guard> _bg;

public:
explicit read(reader_permit p) : _permit(std::move(p)), _read_started_fut(_read_started_pr.get_future()), _ug(_permit) { }
// Moves the stored future out, so this is meant to be called once.
future<> wait_read_started() { return std::move(_read_started_fut); }
// Lets the read function returned by get_read_func() complete.
void set_read_done() { _read_done_pr.set_value(); }
void mark_as_blocked() { _bg.emplace(_permit); }
void mark_as_unblocked() { _bg.reset(); }
// Returns a read function that signals it has started and then waits until
// set_read_done() is called. It captures `this`, so this object must
// outlive the returned function's execution.
reader_concurrency_semaphore::read_func get_read_func() {
return [this] (reader_permit permit) -> future<> {
_read_started_pr.set_value();
co_await _read_done_pr.get_future();
};
}
};

// p1: admitted, then parked as an inactive read (an eviction candidate).
auto p1 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get();
auto irh1 = semaphore.register_inactive_read(make_empty_flat_reader_v2(ss.schema(), p1));

// p2: admitted and marked as used (via rd2's used_guard), then queued to run
// its read function.
auto p2 = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout).get();
read rd2(p2);
auto fut2 = semaphore.with_ready_permit(p2, rd2.get_read_func());

// At this point we expect to have:
// * 1 inactive read (not evicted)
// * 1 used (but not blocked) read on the ready list
// * 1 waiter
// * no more count resources left
auto p3_fut = semaphore.obtain_permit(&s, get_name(), 1024, db::no_timeout);
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0);
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
BOOST_REQUIRE(irh1);

// Start the read emptying the ready list, this should not be enough to admit p3
rd2.wait_read_started().get();
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 0);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 0);
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
BOOST_REQUIRE(irh1);

// Marking p2 as blocked should now allow p3 to be admitted by evicting p1
rd2.mark_as_blocked();
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 0);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().used_permits, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().blocked_permits, 1);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().inactive_reads, 0);
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 1);
BOOST_REQUIRE_EQUAL(semaphore.available_resources().count, 0);
BOOST_REQUIRE(!irh1);

// Collect p3's admission result, then let rd2's read function finish so the
// semaphore can be stopped cleanly.
p3_fut.get();
rd2.mark_as_unblocked();
rd2.set_read_done();
fut2.get();
}

0 comments on commit 7fbad8d

Please sign in to comment.