ENG-3861: #505: Fix check that reactor could accept new tasks
Summary:
I was not able to reproduce the issue, but found at least one scenario in which it could happen.
The reactor shuts down in two phases: first it enters the closing state, then the closed state.
However, new tasks could still be added while the reactor was closing, which is wrong.

Fixed this case.
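
The intent of the fix, as a minimal sketch (the type and member names below
are illustrative assumptions, not the actual yb::rpc::Reactor internals):

#include <atomic>
#include <functional>
#include <mutex>
#include <vector>

// Sketch only: a reactor that rejects new tasks once shutdown has started,
// unless the caller explicitly opts in while the reactor is still closing.
enum class ReactorState { kOpen, kClosing, kClosed };

class MiniReactor {
 public:
  // Returns false when the task is rejected; the caller must then abort or
  // finish the task itself instead of assuming it will eventually run.
  bool ScheduleReactorTask(std::function<void()> task,
                           bool schedule_even_closing = false) {
    std::lock_guard<std::mutex> guard(mutex_);
    ReactorState state = state_.load(std::memory_order_acquire);
    bool accepting = state == ReactorState::kOpen ||
                     (schedule_even_closing && state == ReactorState::kClosing);
    if (!accepting) {
      return false;
    }
    pending_tasks_.push_back(std::move(task));
    return true;
  }

  void StartShutdown() { state_.store(ReactorState::kClosing, std::memory_order_release); }
  void FinishShutdown() { state_.store(ReactorState::kClosed, std::memory_order_release); }

 private:
  std::atomic<ReactorState> state_{ReactorState::kOpen};
  std::mutex mutex_;
  std::vector<std::function<void()>> pending_tasks_;
};

This mirrors how Connection::QueueOutboundData in the diff below passes
true /* schedule_even_closing */ so that already-queued data can still be
flushed while the reactor is closing.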

Also added information about the source line that created each reactor task.
This information is dumped when the list of scheduled tasks is not empty during messenger shutdown.
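
A sketch of that source-line bookkeeping (SOURCE_LOCATION mirrors the macro
used in the diff below; the surrounding types are simplified assumptions):

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>

// Sketch only: record the creation site of each delayed task so leftover
// tasks can be reported during messenger shutdown.
struct SourceLocation {
  const char* file;
  int line;
  std::string ToString() const {
    return std::string(file) + ":" + std::to_string(line);
  }
};

#define SOURCE_LOCATION() SourceLocation{__FILE__, __LINE__}

struct DelayedTask {
  explicit DelayedTask(const SourceLocation& location) : source_location(location) {}
  SourceLocation source_location;  // Where the task was scheduled from.
};

// Instead of a bare DCHECK at shutdown, report every task still registered
// together with the line that created it.
void DumpLeftoverTasks(
    const std::map<int64_t, std::shared_ptr<DelayedTask>>& scheduled_tasks) {
  for (const auto& entry : scheduled_tasks) {
    std::cerr << "Scheduled task " << entry.first << " created at "
              << entry.second->source_location.ToString() << std::endl;
  }
}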

Test Plan: Jenkins

Reviewers: amitanand, timur, mikhail

Reviewed By: timur, mikhail

Subscribers: bogdan, bharat, kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D5592
spolitov committed Oct 19, 2018
1 parent db3b14a commit 685a507
Showing 25 changed files with 254 additions and 155 deletions.
6 changes: 4 additions & 2 deletions src/yb/client/tablet_rpc.cc
@@ -276,7 +276,9 @@ bool TabletInvoker::Done(Status* status) {
       FailToNewReplica(*status, rpc_->response_error());
     } else {
       auto retry_status = retrier_->DelayedRetry(command_, *status);
-      LOG_IF(DFATAL, !retry_status.ok()) << "Retry failed: " << retry_status;
+      if (!retry_status.ok()) {
+        command_->Finished(retry_status);
+      }
     }
     return false;
   }
@@ -358,7 +360,7 @@ void TabletInvoker::LookupTabletCb(const Result<RemoteTabletPtr>& result) {
     auto retry_status = retrier_->DelayedRetry(
         command_, result.ok() ? Status::OK() : result.status());
     if (!retry_status.ok()) {
-      command_->Finished(result.status());
+      command_->Finished(!result.ok() ? result.status() : retry_status);
     }
   }

2 changes: 2 additions & 0 deletions src/yb/client/tablet_rpc.h
@@ -63,6 +63,8 @@ class TabletInvoker {
   virtual ~TabletInvoker();
 
   void Execute(const std::string& tablet_id, bool leader_only = false);
+
+  // Returns true when whole operation is finished, false otherwise.
   bool Done(Status* status);
 
   bool IsLocalCall() const;
4 changes: 1 addition & 3 deletions src/yb/integration-tests/full_stack-insert-scan-test.cc
@@ -284,9 +284,7 @@ void FullStackInsertScanTest::DoConcurrentClientInserts() {
                        kNumRows, kNumInsertClients)) {
     start_latch.CountDown();
     for (const scoped_refptr<Thread>& thread : threads) {
-      ASSERT_OK(ThreadJoiner(thread.get())
-                    .warn_every_ms(15000)
-                    .Join());
+      ASSERT_OK(ThreadJoiner(thread.get()).warn_every(15s).Join());
     }
   }
 }
4 changes: 1 addition & 3 deletions src/yb/integration-tests/update_scan_delta_compact-test.cc
@@ -233,9 +233,7 @@ void UpdateScanDeltaCompactionTest::RunThreads() {
   stop_latch.CountDown();
 
   for (const scoped_refptr<Thread>& thread : threads) {
-    ASSERT_OK(ThreadJoiner(thread.get())
-                  .warn_every_ms(500)
-                  .Join());
+    ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
   }
 }

15 changes: 12 additions & 3 deletions src/yb/master/async_rpc_tasks.cc
@@ -122,7 +122,9 @@ RetryingTSRpcTask::RetryingTSRpcTask(Master *master,
 // Send the subclass RPC request.
 Status RetryingTSRpcTask::Run() {
   VLOG_WITH_PREFIX(1) << "Start Running";
-  DCHECK(state() == MonitoredTaskState::kWaiting || state() == MonitoredTaskState::kAborted);
+  auto task_state = state();
+  DCHECK(task_state == MonitoredTaskState::kWaiting || task_state == MonitoredTaskState::kAborted)
+      << "State: " << ToString(task_state);
 
   const Status s = ResetTSProxy();
   if (!s.ok()) {
@@ -257,9 +259,16 @@ bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
     LOG_WITH_PREFIX(WARNING) << "Unable to mark this task as MonitoredTaskState::kScheduling";
     return false;
   }
-  reactor_task_id_ = master_->messenger()->ScheduleOnReactor(
+  auto task_id = master_->messenger()->ScheduleOnReactor(
       std::bind(&RetryingTSRpcTask::RunDelayedTask, shared_from(this), _1),
-      MonoDelta::FromMilliseconds(delay_millis), master_->messenger());
+      MonoDelta::FromMilliseconds(delay_millis), SOURCE_LOCATION(), master_->messenger());
+  reactor_task_id_.store(task_id, std::memory_order_release);
+
+  if (task_id == rpc::kInvalidTaskId) {
+    AbortTask();
+    UnregisterAsyncTask();
+    return false;
+  }
 
   if (!PerformStateTransition(MonitoredTaskState::kScheduling, MonitoredTaskState::kWaiting)) {
     // The only valid reason for state not being MonitoredTaskState is because the task got
2 changes: 1 addition & 1 deletion src/yb/master/async_rpc_tasks.h
@@ -118,7 +118,7 @@ class RetryingTSRpcTask : public MonitoredTask {
   MonitoredTaskState AbortAndReturnPrevState() override;
 
   MonitoredTaskState state() const override {
-    return state_.load();
+    return state_.load(std::memory_order_acquire);
   }
 
   MonoTime start_timestamp() const override { return start_ts_; }
21 changes: 12 additions & 9 deletions src/yb/rpc/connection.cc
Expand Up @@ -52,6 +52,7 @@
#include "yb/util/string_util.h"

using namespace std::literals;
using namespace std::placeholders;
using std::shared_ptr;
using std::vector;
using strings::Substitute;
@@ -146,10 +147,11 @@ void Connection::OutboundQueued() {
   auto status = stream_->TryWrite();
   if (!status.ok()) {
     VLOG_WITH_PREFIX(1) << "Write failed: " << status;
-    reactor_->ScheduleReactorTask(
+    auto scheduled = reactor_->ScheduleReactorTask(
         MakeFunctorReactorTask(
             std::bind(&Reactor::DestroyConnection, reactor_, this, status),
-            shared_from_this()));
+            shared_from_this(), SOURCE_LOCATION()));
+    LOG_IF_WITH_PREFIX(WARNING, !scheduled) << "Failed to schedule destroy";
   }
 }

@@ -351,26 +353,27 @@ void Connection::QueueOutboundData(OutboundDataPtr outbound_data) {
     std::unique_lock<simple_spinlock> lock(outbound_data_queue_lock_);
     if (!shutdown_status_.ok()) {
       auto task = MakeFunctorReactorTaskWithAbort(
-          std::bind(&OutboundData::Transferred,
-                    outbound_data,
-                    std::placeholders::_2,
-                    /* conn */ nullptr));
+          std::bind(&OutboundData::Transferred, outbound_data, _2, /* conn */ nullptr),
+          SOURCE_LOCATION());
       lock.unlock();
-      reactor_->ScheduleReactorTask(task);
+      auto scheduled = reactor_->ScheduleReactorTask(task, true /* schedule_even_closing */);
+      LOG_IF_WITH_PREFIX(DFATAL, !scheduled) << "Failed to schedule OutboundData::Transferred";
       return;
     }
     was_empty = outbound_data_to_process_.empty();
     outbound_data_to_process_.push_back(std::move(outbound_data));
     if (was_empty && !process_response_queue_task_) {
       process_response_queue_task_ =
           MakeFunctorReactorTask(std::bind(&Connection::ProcessResponseQueue, this),
-                                 shared_from_this());
+                                 shared_from_this(), SOURCE_LOCATION());
     }
   }
 
   if (was_empty) {
     // TODO: what happens if the reactor is shutting down? Currently Abort is ignored.
-    reactor_->ScheduleReactorTask(process_response_queue_task_);
+    auto scheduled = reactor_->ScheduleReactorTask(process_response_queue_task_);
+    LOG_IF_WITH_PREFIX(WARNING, !scheduled)
+        << "Failed to schedule Connection::ProcessResponseQueue";
   }
 }

29 changes: 20 additions & 9 deletions src/yb/rpc/messenger.cc
@@ -216,7 +216,9 @@ void Messenger::Shutdown() {
 
   {
     std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
-    DCHECK(scheduled_tasks_.empty());
+    LOG_IF(DFATAL, !scheduled_tasks_.empty())
+        << "Scheduled tasks is not empty after messenger shutdown: "
+        << yb::ToString(scheduled_tasks_);
   }
 }

@@ -262,11 +264,13 @@ void Messenger::BreakConnectivityWith(const IpAddress& address) {
   if (broken_connectivity_.insert(address).second) {
     latch.reset(new CountDownLatch(reactors_.size()));
     for (auto* reactor : reactors_) {
-      reactor->ScheduleReactorTask(MakeFunctorReactorTask(
+      auto scheduled = reactor->ScheduleReactorTask(MakeFunctorReactorTask(
           [&latch, address](Reactor* reactor) {
            reactor->DropWithRemoteAddress(address);
            latch->CountDown();
-          }));
+          }, SOURCE_LOCATION()));
+      // Ok to use check here, because this functionality is used only in tests.
+      CHECK(scheduled);
     }
   }
 }
@@ -344,7 +348,8 @@ Status Messenger::UnregisterService(const string& service_name) {
 
 class NotifyDisconnectedReactorTask : public ReactorTask {
  public:
-  explicit NotifyDisconnectedReactorTask(OutboundCallPtr call) : call_(std::move(call)) {}
+  NotifyDisconnectedReactorTask(OutboundCallPtr call, const SourceLocation& source_location)
+      : ReactorTask(source_location), call_(std::move(call)) {}
 
   void Run(Reactor* reactor) override {
     call_->Transferred(STATUS_FORMAT(
@@ -365,7 +370,10 @@ void Messenger::QueueOutboundCall(OutboundCallPtr call) {
 
   if (IsArtificiallyDisconnectedFrom(remote.address())) {
     LOG(INFO) << "TEST: Rejected connection to " << remote;
-    reactor->ScheduleReactorTask(std::make_shared<NotifyDisconnectedReactorTask>(std::move(call)));
+    auto scheduled = reactor->ScheduleReactorTask(std::make_shared<NotifyDisconnectedReactorTask>(
+        std::move(call), SOURCE_LOCATION()));
+    // Ok to use check here, because this functionality is used only in tests.
+    CHECK(scheduled);
     return;
   }
 
@@ -481,10 +489,11 @@ Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
   return Status::OK();
 }
 
-Status Messenger::QueueEventOnAllReactors(ServerEventListPtr server_event) {
+Status Messenger::QueueEventOnAllReactors(
+    ServerEventListPtr server_event, const SourceLocation& source_location) {
   shared_lock<rw_spinlock> guard(lock_.get_lock());
   for (Reactor* reactor : reactors_) {
-    reactor->QueueEventOnAllConnections(server_event);
+    reactor->QueueEventOnAllConnections(server_event, source_location);
   }
   return Status::OK();
 }
@@ -514,7 +523,8 @@ void Messenger::AbortOnReactor(ScheduledTaskId task_id) {
 }
 
 ScheduledTaskId Messenger::ScheduleOnReactor(
-    StatusFunctor func, MonoDelta when, const shared_ptr<Messenger>& msgr) {
+    StatusFunctor func, MonoDelta when, const SourceLocation& source_location,
+    const shared_ptr<Messenger>& msgr) {
   DCHECK(!reactors_.empty());
 
   // If we're already running on a reactor thread, reuse it.
@@ -533,7 +543,8 @@ ScheduledTaskId Messenger::ScheduleOnReactor(
   if (msgr != nullptr) {
     task_id = next_task_id_.fetch_add(1);
   }
-  auto task = std::make_shared<DelayedTask>(std::move(func), when, task_id, msgr);
+  auto task = std::make_shared<DelayedTask>(
+      std::move(func), when, task_id, source_location, msgr);
   if (msgr != nullptr) {
     std::lock_guard<std::mutex> guard(mutex_scheduled_tasks_);
     scheduled_tasks_.emplace(task_id, task);
10 changes: 6 additions & 4 deletions src/yb/rpc/messenger.h
@@ -199,7 +199,8 @@ class Messenger : public ProxyContext {
 
   const Protocol* DefaultProtocol() override { return listen_protocol_; }
 
-  CHECKED_STATUS QueueEventOnAllReactors(ServerEventListPtr server_event);
+  CHECKED_STATUS QueueEventOnAllReactors(
+      ServerEventListPtr server_event, const SourceLocation& source_location);
 
   // Dump the current RPCs into the given protobuf.
   CHECKED_STATUS DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
@@ -215,9 +216,10 @@
   //
   // The status argument conveys whether 'func' was run correctly (i.e. after the elapsed time) or
   // not.
-  ScheduledTaskId ScheduleOnReactor(StatusFunctor func,
-                                    MonoDelta when,
-                                    const std::shared_ptr<Messenger>& msgr = nullptr);
+  MUST_USE_RESULT ScheduledTaskId ScheduleOnReactor(
+      StatusFunctor func, MonoDelta when,
+      const SourceLocation& source_location,
+      const std::shared_ptr<Messenger>& msgr);
 
   std::string name() const {
     return name_;
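
Because ScheduleOnReactor is now MUST_USE_RESULT and the messenger argument no
longer has a default, every caller must pass a source location and inspect the
returned id. A usage sketch (the failure check mirrors the one added to
async_rpc_tasks.cc above; "messenger" is assumed to be a live
shared_ptr<Messenger>):

auto task_id = messenger->ScheduleOnReactor(
    [](const Status& status) {
      // status reports whether the functor ran after the delay or was aborted.
    },
    MonoDelta::FromMilliseconds(100), SOURCE_LOCATION(), messenger);
if (task_id == rpc::kInvalidTaskId) {
  // The reactor refused the task (e.g. shutdown already started); clean up
  // here instead of waiting for a callback that will never run.
}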
9 changes: 5 additions & 4 deletions src/yb/rpc/mt-rpc-test.cc
@@ -48,6 +48,7 @@ METRIC_DECLARE_counter(rpcs_queue_overflow);
 using std::string;
 using std::shared_ptr;
 using strings::Substitute;
+using namespace std::literals;
 
 namespace yb {
 namespace rpc {
@@ -91,7 +92,7 @@ class MultiThreadedRpcTest : public RpcTestBase {
 };
 
 static void AssertShutdown(yb::Thread* thread, const Status* status) {
-  ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join());
+  ASSERT_OK(ThreadJoiner(thread).warn_every(500ms).Join());
   string msg = status->ToString();
   ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
               msg.find("Network error") != string::npos)
@@ -147,7 +148,7 @@ TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
   client_messenger->Shutdown();
   client_messenger.reset();
 
-  ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+  ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
   ASSERT_TRUE(status.IsAborted() ||
               status.IsServiceUnavailable());
   string msg = status.ToString();
@@ -216,7 +217,7 @@ TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
   server_messenger->Shutdown();
 
   for (const auto& thread : threads) {
-    ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+    ASSERT_OK(ThreadJoiner(thread.get()).warn_every(500ms).Join());
   }
 
   // Verify that one error was due to backpressure.
@@ -280,7 +281,7 @@ TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
   server().Shutdown();
 
   for (scoped_refptr<yb::Thread>& t : threads) {
-    ASSERT_OK(ThreadJoiner(t.get()).warn_every_ms(500).Join());
+    ASSERT_OK(ThreadJoiner(t.get()).warn_every(500ms).Join());
   }
 }

28 changes: 18 additions & 10 deletions src/yb/rpc/reactor-test.cc
@@ -38,6 +38,7 @@
 #include "yb/util/countdown_latch.h"
 
 using std::shared_ptr;
+using namespace std::literals;
 using namespace std::placeholders;
 
 namespace yb {
@@ -68,9 +69,10 @@ class ReactorTest : public RpcTestBase {
   }
 
   void ScheduledTaskScheduleAgain(const Status& status) {
-    messenger_->ScheduleOnReactor(
+    auto task_id = messenger_->ScheduleOnReactor(
        std::bind(&ReactorTest::ScheduledTaskCheckThread, this, _1, Thread::current_thread()),
-        MonoDelta::FromMilliseconds(0));
+        0s, SOURCE_LOCATION(), nullptr /* messenger */);
+    ASSERT_EQ(task_id, 0);
     latch_.CountDown();
   }

@@ -80,26 +82,30 @@
 };
 
 TEST_F(ReactorTest, TestFunctionIsCalled) {
-  messenger_->ScheduleOnReactor(
-      std::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()), MonoDelta::FromSeconds(0));
+  auto task_id = messenger_->ScheduleOnReactor(
+      std::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()), 0s,
+      SOURCE_LOCATION(), nullptr /* messenger */);
+  ASSERT_EQ(task_id, 0);
   latch_.Wait();
 }
 
 TEST_F(ReactorTest, TestFunctionIsCalledAtTheRightTime) {
   MonoTime before = MonoTime::Now();
-  messenger_->ScheduleOnReactor(
+  auto task_id = messenger_->ScheduleOnReactor(
       std::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()),
-      MonoDelta::FromMilliseconds(100));
+      100ms, SOURCE_LOCATION(), nullptr /* messenger */);
+  ASSERT_EQ(task_id, 0);
   latch_.Wait();
   MonoTime after = MonoTime::Now();
   MonoDelta delta = after.GetDeltaSince(before);
   CHECK_GE(delta.ToMilliseconds(), 100);
 }
 
 TEST_F(ReactorTest, TestFunctionIsCalledIfReactorShutdown) {
-  messenger_->ScheduleOnReactor(
+  auto task_id = messenger_->ScheduleOnReactor(
       std::bind(&ReactorTest::ScheduledTask, this, _1, STATUS(Aborted, "doesn't matter")),
-      MonoDelta::FromSeconds(60));
+      60s, SOURCE_LOCATION(), nullptr /* messenger */);
+  ASSERT_EQ(task_id, 0);
   messenger_->Shutdown();
   latch_.Wait();
 }
@@ -108,8 +114,10 @@ TEST_F(ReactorTest, TestReschedulesOnSameReactorThread) {
   // Our scheduled task will schedule yet another task.
   latch_.Reset(2);
 
-  messenger_->ScheduleOnReactor(
-      std::bind(&ReactorTest::ScheduledTaskScheduleAgain, this, _1), MonoDelta::FromSeconds(0));
+  auto task_id = messenger_->ScheduleOnReactor(
+      std::bind(&ReactorTest::ScheduledTaskScheduleAgain, this, _1), 0s,
+      SOURCE_LOCATION(), nullptr /* messenger */);
+  ASSERT_EQ(task_id, 0);
   latch_.Wait();
   latch_.Wait();
 }
