Skip to content

Commit

Permalink
[#20651] DocDB: Schedule resumption of contentious waiters on strand …
Browse files Browse the repository at this point in the history
…instead of reactors threads

Summary:
In 7d7a636, we introduced a codepath in which we defer resumption of contentious waiters and schedule them on reactor threads. This led to a fatal since IO operations (done as part of conflict resolution) aren't supposed to be executed on reactor threads.

In this existing implementation, we use the rpc threads and a custom `wait-queue` threadpool to run conflict resolution. This diff addresses the above issue by scheduling contentious waiter resumption on the same underlying `Scheduler::Impl::strand_` that is used for executing incoming rpc calls.
Jira: DB-9646

Test Plan:
Jenkins: urgent

1. Manually executed the test put up on the github issue.

2. Since the `ResumeContentiousWaiter` has a `DCHECK` in there asserting that IO is allowed, ran the below test
`./yb_build.sh tsan --cxx-test='TEST_F(PgWaitQueueContentionStressTest, TestResumeWaitersOnScheduler) {' --test-args --vmodule=wait_queue=1`.

grep for `wait_queue.*vlog.*on Scheduler`, and can see that the waiter is resumed on `Scheduler::Impl::strand_` and that the assertion does not fail.

Reviewers: rsami

Reviewed By: rsami

Subscribers: yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D31766
  • Loading branch information
basavaraj29 committed Jan 18, 2024
1 parent c9e8ca7 commit bf593a8
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 41 deletions.
65 changes: 30 additions & 35 deletions src/yb/docdb/wait_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "yb/util/source_location.h"
#include "yb/util/status_format.h"
#include "yb/util/sync_point.h"
#include "yb/util/thread_restrictions.h"
#include "yb/util/trace.h"
#include "yb/util/unique_lock.h"

Expand Down Expand Up @@ -745,24 +746,26 @@ struct SerialWaiter {
}
};

void ResumeWaiterOnReactor(SerialWaiter to_resume, const std::string log_prefix) {
auto& waiter = to_resume.waiter;
VLOG(1) << log_prefix << "Resuming waiter " << waiter->LogPrefix() << "on reactor thread.";
void ResumeContentiousWaiter(const SerialWaiter& to_resume) {
DCHECK(PREDICT_TRUE(ThreadRestrictions::IsIOAllowed()))
<< "IO disallowed, but the thread might have to perform IO operations downstearm.";

auto start_time = MonoTime::Now();
auto deadline = waiter->deadline();
auto s = waiter->InvokeCallback(Status::OK(), to_resume.resolve_ht, deadline);
if (s.ok()) {
VLOG(4) << log_prefix << "Successfully executed waiter " << waiter->LogPrefix()
<< "callback on reactor thread.";
return;
}
auto& waiter = to_resume.waiter;
VLOG(1) << "Resuming waiter " << waiter->LogPrefix() << "on Scheduler.";

s = s.CloneAndAppend(Format("timeout: $0", deadline - ToCoarse(start_time)));
VLOG(1) << log_prefix << "Couldn't resume waiter on reactor thread: " << s;
DCHECK_OK(waiter->InvokeCallback(s));
auto start_time = MonoTime::Now();
auto deadline = waiter->deadline();
auto s = waiter->InvokeCallback(Status::OK(), to_resume.resolve_ht, deadline);
if (s.ok()) {
VLOG(4) << "Successfully executed waiter " << waiter->LogPrefix() << "callback on Scheduler.";
return;
}

s = s.CloneAndAppend(Format("timeout: $0", deadline - ToCoarse(start_time)));
VLOG(1) << "Couldn't resume waiter on Scheduler: " << s;
DCHECK_OK(waiter->InvokeCallback(s));
}

// Resumes waiters async, in serial, and in the order of the waiter's serial_no, running the lowest
// serial number first in a best effort manner.
class ResumedWaiterRunner {
Expand Down Expand Up @@ -879,16 +882,20 @@ class ResumedWaiterRunner {
if (is_shutting_down) {
// A shutdown request could have arrived after the waiter was popped off pq_, and we
// could have failed to invoke the waiter's callback due to inability to obtain the
// shared in-memory locks. Instead of scheduling the callback on a reactor thread,
// execute the callback in-line here.
// shared in-memory locks. Instead of scheduling the callback, execute the callback
// in-line here.
serial_waiter.waiter->InvokeCallbackOrWarn(kShuttingDownError);
} else {
auto status = ScheduleWaiterResumptionOnReactor(serial_waiter);
if (!status.ok()) {
// When unable to schedule waiter resumption on a reactor thread, invoke the waiter
// callback with the error status.
serial_waiter.waiter->InvokeCallbackOrWarn(status);
}
VLOG_WITH_PREFIX(1) << "Scheduling waiter " << serial_waiter.waiter->LogPrefix()
<< "resumption on the Tablet Server's Scheduler.";
messenger_->scheduler().Schedule([serial_waiter](const Status& s) {
if (!s.ok()) {
serial_waiter.waiter->InvokeCallbackOrWarn(
s.CloneAndPrepend("Failed scheduling contentious waiter resumption: "));
return;
}
ResumeContentiousWaiter(serial_waiter);
}, std::chrono::milliseconds(100));
}
}
}), "Failed to trigger poll of ResumedWaiterRunner in wait queue");
Expand All @@ -911,26 +918,14 @@ class ResumedWaiterRunner {
}
}

Status ScheduleWaiterResumptionOnReactor(const SerialWaiter& to_resume) {
auto& waiter = to_resume.waiter;
VLOG_WITH_PREFIX(1) << "Scheduling waiter " << waiter->LogPrefix()
<< "resumption on reactor thread.";
VERIFY_RESULT_PREPEND(
messenger_->ScheduleOnReactor(
std::bind(&ResumeWaiterOnReactor, to_resume, log_prefix_),
MonoDelta::FromMilliseconds(100), SOURCE_LOCATION()),
Format("Failed to schedule waiter resumption $0", waiter->LogPrefix()));
return Status::OK();
}

std::string LogPrefix() const {
return Format("ResumedWaiterRunner: $0", log_prefix_);
}

mutable rw_spinlock mutex_;
std::priority_queue<
SerialWaiter, std::vector<SerialWaiter>, SerialWaiter> pq_ GUARDED_BY(mutex_);
// Set of weak waiter data pointers scheduled for resumption on reactor threads.
// Set of weak waiter data pointers scheduled for resumption.
std::vector<std::weak_ptr<WaiterData>> contentious_waiters_ GUARDED_BY(mutex_);
ThreadPoolToken* thread_pool_token_;
bool shutting_down_ GUARDED_BY(mutex_) = false;
Expand Down
4 changes: 4 additions & 0 deletions src/yb/util/thread_restrictions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ bool ThreadRestrictions::SetIOAllowed(bool allowed) {
return previous_allowed;
}

bool ThreadRestrictions::IsIOAllowed() {
return LoadTLS()->io_allowed;
}

void ThreadRestrictions::AssertIOAllowed() {
CHECK(LoadTLS()->io_allowed)
<< "Function marked as IO-only was called from a thread that "
Expand Down
3 changes: 3 additions & 0 deletions src/yb/util/thread_restrictions.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class ThreadRestrictions {
// Returns the previous value.
static bool SetIOAllowed(bool allowed);

static bool IsIOAllowed();

// Check whether the current thread is allowed to make IO calls,
// and FATALs if not. See the block comment above the class for
// a discussion of where to add these checks.
Expand All @@ -123,6 +125,7 @@ class ThreadRestrictions {
// compiled out.
static bool SetIOAllowed(bool allowed) { return true; }
static void AssertIOAllowed() {}
static bool IsIOAllowed() { return true; }
static bool SetWaitAllowed(bool allowed) { return true; }
static void AssertWaitAllowed() {}
static bool IsWaitAllowed() { return true; }
Expand Down
12 changes: 6 additions & 6 deletions src/yb/yql/pgwrapper/pg_wait_on_conflict-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -947,12 +947,12 @@ TEST_F(PgWaitQueueContentionStressTest, YB_DISABLE_TEST_IN_TSAN(ConcurrentReader
PerformConcurrentReads();
}

// When a waiter fails to re-acquire the shared in-memory locks while being resumed from the
// thread running ResumedWaiterRunner (which resumes waiters serially), it is scheduled for
// retry on a reactor thread which attempts to re-acquire the shared in-memory locks until
// the request deadline passes. The below test simulates a contentious workload and helps
// assert the above, particularly in tsan mode.
TEST_F(PgWaitQueueContentionStressTest, TestResumeWaitersOnReactor) {
// When a waiter fails to re-acquire the shared in-memory locks while being resumed from the thread
// running ResumedWaiterRunner (which resumes waiters serially), it is scheduled for retry on the
// Tablet Server's Scheduler which attempts to re-acquire the shared in-memory locks until the
// request deadline passes. The below test simulates a contentious workload and helps assert the
// above, particularly in tsan mode.
TEST_F(PgWaitQueueContentionStressTest, TestResumeWaitersOnScheduler) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_wait_for_relock_unblocked_txn_keys_ms) = 100;
PerformConcurrentReads();
}
Expand Down

0 comments on commit bf593a8

Please sign in to comment.