Skip to content

Commit

Permalink
#821: Fix the race condition that causes the master to get stuck on a…
Browse files Browse the repository at this point in the history
…borting tasks.

Summary:
Fix the race condition in the master that's causing the master to get stuck when it re-elects itself as the leader.

Issue: As part of becoming a leader, the master will abort and wait for all (previously created) tasks.

  - There could have been items added to the pending_tasks_ after Abort; that are waited on but not Aborted.
  - Even the tasks that have been aborted, aren't immediately removed from pending_tasks_.

Fix include:
  - ensuring that once Abort is called with the intention of waiting (AbortAndClose) all further tasks that are added will be automatically aborted
  - An aborted task is immediately unregistered -- causing it to be promptly removed from the pending_tasks_ list.

Other Minor fixes:
  - fix race condition between Shutdown and RunLoaders in SysCatalog
  - follower won't reject election requests from current leader because he believes the leader to be in good standing.

Test Plan:
Jenkins +
Added a test to repro Kyle's scenario.

Reviewers: sergei, timur, bogdan

Reviewed By: bogdan

Subscribers: mikhail, ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D6186
  • Loading branch information
amitanandaiyer committed Feb 21, 2019
1 parent 2b165b9 commit 1ae0df3
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 37 deletions.
7 changes: 6 additions & 1 deletion src/yb/consensus/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
DEFINE_int32(log_inject_append_latency_ms_max, 0,
"The maximum latency to inject before the log append operation.");

DEFINE_test_flag(bool, log_consider_all_ops_safe, false,
"If true, we consider all operations to be safe and will not wait"
"for the opId to apply to the local log. i.e. WaitForSafeOpIdToApply "
"becomes a noop.");

// Validate that log_min_segments_to_retain >= 1
static bool ValidateLogsToRetain(const char* flagname, int value) {
if (value >= 1) {
Expand Down Expand Up @@ -767,7 +772,7 @@ yb::OpId Log::GetLatestEntryOpId() const {
}

yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed) {
if (all_op_ids_safe_) {
if (FLAGS_log_consider_all_ops_safe || all_op_ids_safe_) {
return min_allowed;
}

Expand Down
9 changes: 6 additions & 3 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1848,8 +1848,10 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
<< request->candidate_uuid();
}

// If we've heard recently from the leader, then we should ignore the request.
// It might be from a "disruptive" server. This could happen in a few cases:
// If we've heard recently from the leader, then we should ignore the request
// (except if it is the leader itself requesting a vote -- something that might
// happen if the leader were to stepdown and call an election.). Otherwise,
// it might be from a "disruptive" server. This could happen in a few cases:
//
// 1) Network partitions
// If the leader can talk to a majority of the nodes, but is partitioned from a
Expand All @@ -1865,7 +1867,8 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
// section 4.2.3.
MonoTime now = MonoTime::Now();
if (!request->ignore_live_leader() && now < withhold_votes_until_) {
if (request->candidate_uuid() != state_->GetLeaderUuidUnlocked() &&
!request->ignore_live_leader() && now < withhold_votes_until_) {
return RequestVoteRespondLeaderIsAlive(request, response);
}

Expand Down
2 changes: 1 addition & 1 deletion src/yb/consensus/replica_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ Status ReplicaState::ApplyPendingOperationsUnlocked(IndexToRoundMap::iterator it

// Stop at the operation after the last one we must commit. This iterator by definition points to
// the first entry greater than the committed index, so the entry preceding that must have the
// OpId equal to commited_index.
// OpId equal to committed_index.
IndexToRoundMap::iterator end_iter;
auto status = CheckOperationExist(committed_index, &end_iter);
if (!status.ok()) {
Expand Down
1 change: 1 addition & 0 deletions src/yb/integration-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ ADD_YB_TEST(flush-test)
ADD_YB_TEST(ts_tablet_manager-itest)
ADD_YB_TEST(ts_recovery-itest)
ADD_YB_TEST(create-table-stress-test)
ADD_YB_TEST(master-partitioned-test)
ADD_YB_TEST(delete_table-test)
ADD_YB_TEST(external_mini_cluster-test)
ADD_YB_TEST(linked_list-test)
Expand Down
246 changes: 246 additions & 0 deletions src/yb/integration-tests/master-partitioned-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <memory>
#include <thread>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "yb/client/client.h"
#include "yb/common/schema.h"
#include "yb/common/wire_protocol.h"
#include "yb/fs/fs_manager.h"
#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
#include "yb/master/master-test-util.h"
#include "yb/master/master.proxy.h"
#include "yb/master/mini_master.h"
#include "yb/rpc/messenger.h"
#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/util/stopwatch.h"
#include "yb/util/test_util.h"

using yb::client::YBClient;
using yb::client::YBClientBuilder;
using yb::client::YBColumnSchema;
using yb::client::YBSchema;
using yb::client::YBSchemaBuilder;
using yb::client::YBTableCreator;
using yb::client::YBTableName;
using yb::itest::CreateTabletServerMap;
using yb::itest::TabletServerMap;
using yb::master::MasterServiceProxy;
using yb::rpc::Messenger;
using yb::rpc::MessengerBuilder;
using yb::rpc::RpcController;

DECLARE_int32(heartbeat_interval_ms);
DECLARE_bool(log_preallocate_segments);
DECLARE_bool(log_consider_all_ops_safe);
DECLARE_bool(enable_remote_bootstrap);
DECLARE_int32(leader_failure_exp_backoff_max_delta_ms);
DECLARE_int32(tserver_unresponsive_timeout_ms);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_int32(slowdown_master_async_rpc_tasks_by_ms);
DECLARE_int32(unresponsive_ts_rpc_timeout_ms);
DECLARE_string(vmodule);

DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");

using std::string;
using std::vector;
using std::thread;
using std::unique_ptr;
using strings::Substitute;

namespace yb {

class MasterPartitionedTest : public YBMiniClusterTestBase<MiniCluster> {
public:
MasterPartitionedTest() {}

void SetUp() override {
// Make heartbeats faster to speed test runtime.
FLAGS_heartbeat_interval_ms = 10;
FLAGS_raft_heartbeat_interval_ms = 200;
FLAGS_unresponsive_ts_rpc_timeout_ms = 10000; // 10 sec.

FLAGS_leader_failure_exp_backoff_max_delta_ms = 5000;
FLAGS_slowdown_master_async_rpc_tasks_by_ms = 100;
FLAGS_vmodule = "catalog_manager=2,async_rpc_tasks=2";

FLAGS_log_consider_all_ops_safe = true;

YBMiniClusterTestBase::SetUp();
MiniClusterOptions opts;
opts.num_tablet_servers = num_tservers_;
opts.num_masters = 3;
cluster_.reset(new MiniCluster(env_.get(), opts));
ASSERT_OK(cluster_->Start());

ASSERT_OK(cluster_->WaitForTabletServerCount(opts.num_tablet_servers));
ASSERT_OK(YBClientBuilder()
.add_master_server_addr(cluster_->mini_master(0)->bound_rpc_addr_str())
.add_master_server_addr(cluster_->mini_master(1)->bound_rpc_addr_str())
.add_master_server_addr(cluster_->mini_master(2)->bound_rpc_addr_str())
.Build(&client_));
}

Status BreakMasterConnectivityTo(int from_idx, int to_idx) {
master::MiniMaster* src_master = cluster_->mini_master(from_idx);
IpAddress src = VERIFY_RESULT(HostToAddress(src_master->bound_rpc_addr().host()));
// TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed.
auto dst_prv = CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kTrue)));
auto dst_pub =
CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kFalse)));
LOG(INFO) << "Breaking connectivities from master " << from_idx << " to " << to_idx << " i.e. "
<< src << " to " << dst_prv << " and " << dst_pub;
src_master->master()->messenger()->BreakConnectivityTo(dst_prv);
src_master->master()->messenger()->BreakConnectivityTo(dst_pub);
return Status::OK();
}

Status RestoreMasterConnectivityTo(int from_idx, int to_idx) {
master::MiniMaster* src_master = cluster_->mini_master(from_idx);
IpAddress src = VERIFY_RESULT(HostToAddress(src_master->bound_rpc_addr().host()));
// TEST_RpcAddress is 1-indexed; we expect from_idx/to_idx to be 0-indexed.
auto dst_prv = CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kTrue)));
auto dst_pub =
CHECK_RESULT(HostToAddress(TEST_RpcAddress(to_idx + 1, server::Private::kFalse)));
LOG(INFO) << "Restoring connectivities from master " << from_idx << " to " << to_idx << " i.e. "
<< src << " to " << dst_prv << " and " << dst_pub;
src_master->master()->messenger()->RestoreConnectivityTo(dst_prv);
src_master->master()->messenger()->RestoreConnectivityTo(dst_pub);
return Status::OK();
}

void DoTearDown() override {
FLAGS_slowdown_master_async_rpc_tasks_by_ms = 0;
SleepFor(MonoDelta::FromMilliseconds(1000));
cluster_->Shutdown();
}

void CreateTable(const YBTableName& table_name, int num_tablets);

void CheckLeaderMasterIsResponsive(int master_idx);

protected:
std::shared_ptr<YBClient> client_;
int32_t num_tservers_ = 5;
};

void MasterPartitionedTest::CheckLeaderMasterIsResponsive(int master_idx) {
master::MiniMaster* master = cluster_->mini_master(master_idx);
auto role = master->master()->catalog_manager()->Role();
if (role != consensus::RaftPeerPB::LEADER) {
LOG(ERROR) << "Master " << master_idx << " is not the leader. It is " << yb::ToString(role);
return;
}
// cluster_->leader_mini_master() will retry and wait until the the leader
// is ready to serve.
master::MiniMaster* leader_master = cluster_->leader_mini_master();
if (!leader_master) {
// We may be in an election storm. So if we are at least making progress wrt the
// error messages that we get (which contains the term/ready_term) we will not
// consider it as a failure.
Status leader_status_before, leader_status_after;
{
master::CatalogManager::ScopedLeaderSharedLock l(master->master()->catalog_manager());
if (!l.catalog_status().ok()) {
LOG(INFO) << "Catalog status is not ok. " << l.catalog_status();
return;
}
leader_status_before = l.leader_status();
}
if (leader_status_before.ok()) {
return;
}
SleepFor(MonoDelta::FromMilliseconds(2 * FLAGS_leader_failure_exp_backoff_max_delta_ms));
{
master::CatalogManager::ScopedLeaderSharedLock l(master->master()->catalog_manager());
if (!l.catalog_status().ok()) {
LOG(INFO) << "Catalog status is not ok. " << l.catalog_status();
return;
}
leader_status_after = l.leader_status();
}
if (leader_status_after.ok()) {
return;
}

LOG(INFO) << "Master leader is not ready. Looking for some progress "
<< " in " << yb::ToString(2 * FLAGS_leader_failure_exp_backoff_max_delta_ms) << " ms"
<< "\nleader status before sleep " << leader_status_before
<< "\nleader status after sleep " << leader_status_after;
CHECK(leader_status_before.ToString(false) != leader_status_after.ToString(false))
<< "Master leader is not ready. And not making progress even after "
<< yb::ToString(2 * FLAGS_leader_failure_exp_backoff_max_delta_ms) << " ms"
<< "\n leader status before sleep " << leader_status_before
<< "\n leader status after sleep " << leader_status_after;
}
}

void MasterPartitionedTest::CreateTable(const YBTableName& table_name, int num_tablets) {
ASSERT_OK(client_->CreateNamespaceIfNotExists(table_name.namespace_name()));
gscoped_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
master::ReplicationInfoPB replication_info;
replication_info.mutable_live_replicas()->set_num_replicas(3);
ASSERT_OK(table_creator->table_name(table_name)
.table_type(client::YBTableType::REDIS_TABLE_TYPE)
.num_tablets(num_tablets)
.wait(false)
.Create());
}

TEST_F(MasterPartitionedTest, CauseMasterLeaderStepdownWithTasksInProgress) {
DontVerifyClusterBeforeNextTearDown();

// Break connectivity so that :
// master 0 can make outgoing RPCs to 1 and 2.
// but 1 and 2 cannot do Outgoing rpcs.
// This should result in master 0 becoming the leader.
BreakMasterConnectivityTo(1, 0);
BreakMasterConnectivityTo(1, 2);
BreakMasterConnectivityTo(2, 1);
BreakMasterConnectivityTo(2, 0);

// Allow some time for master 0 to become the leader
SleepFor(MonoDelta::FromMilliseconds(4000));
ASSERT_OK(cluster_->WaitForTabletServerCount(num_tservers_));

YBTableName table_name("my_keyspace", "test_table");
ASSERT_NO_FATALS(CreateTable(table_name, FLAGS_num_test_tablets));
LOG(INFO) << "Created table successfully!";

for (int i = 0; i < 10; i++) {
LOG(INFO) << "iteration " << i;
// master-0 cannot send updates to master 2. This will cause master-2
// to increase its term. And cause the leader (master-0) to step down
// and re-elect himself
BreakMasterConnectivityTo(0, 2);

SleepFor(MonoDelta::FromMilliseconds(4000));
CheckLeaderMasterIsResponsive(0);

RestoreMasterConnectivityTo(0, 2);
// Give some time for the master to realize the higher term from master-2.
SleepFor(MonoDelta::FromMilliseconds(1000));
CheckLeaderMasterIsResponsive(0);
}
}

} // namespace yb
25 changes: 21 additions & 4 deletions src/yb/master/async_rpc_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
#include "yb/tserver/tserver_admin.proxy.h"

#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
#include "yb/util/thread_restrictions.h"

DEFINE_int32(unresponsive_ts_rpc_timeout_ms, 60 * 60 * 1000, // 1 hour
"After this amount of time (or after we have retried unresponsive_ts_rpc_retry_limit "
Expand All @@ -41,6 +42,10 @@ DEFINE_int32(unresponsive_ts_rpc_retry_limit, 20,
"to perform operations such as deleting a tablet.");
TAG_FLAG(unresponsive_ts_rpc_retry_limit, advanced);

DEFINE_test_flag(
int32, slowdown_master_async_rpc_tasks_by_ms, 0,
"For testing purposes, slow down the run method to take longer.");

// The flags are defined in catalog_manager.cc.
DECLARE_int32(master_ts_rpc_timeout_ms);
DECLARE_int32(tablet_creation_timeout_ms);
Expand Down Expand Up @@ -123,8 +128,11 @@ RetryingTSRpcTask::RetryingTSRpcTask(Master *master,
Status RetryingTSRpcTask::Run() {
VLOG_WITH_PREFIX(1) << "Start Running";
auto task_state = state();
DCHECK(task_state == MonitoredTaskState::kWaiting || task_state == MonitoredTaskState::kAborted)
<< "State: " << ToString(task_state);
if (task_state == MonitoredTaskState::kAborted) {
UnregisterAsyncTask(); // May delete this.
return STATUS(IllegalState, "Unable to run task because it has been aborted");
}
DCHECK(task_state == MonitoredTaskState::kWaiting) << "State: " << ToString(task_state);

const Status s = ResetTSProxy();
if (!s.ok()) {
Expand All @@ -134,7 +142,6 @@ Status RetryingTSRpcTask::Run() {
} else if (state() == MonitoredTaskState::kAborted) {
UnregisterAsyncTask(); // May delete this.
return STATUS(IllegalState, "Unable to run task because it has been aborted");

} else {
LOG_WITH_PREFIX(FATAL) << "Failed to change task to MonitoredTaskState::kFailed state";
}
Expand All @@ -158,6 +165,14 @@ Status RetryingTSRpcTask::Run() {
return STATUS_FORMAT(IllegalState, "Task in invalid state $0", state());
}
}
if (PREDICT_FALSE(FLAGS_slowdown_master_async_rpc_tasks_by_ms > 0)) {
VLOG(1) << "Slowing down " << this->description() << " by "
<< FLAGS_slowdown_master_async_rpc_tasks_by_ms << " ms.";
bool old_thread_restriction = ThreadRestrictions::SetWaitAllowed(true);
SleepFor(MonoDelta::FromMilliseconds(FLAGS_slowdown_master_async_rpc_tasks_by_ms));
ThreadRestrictions::SetWaitAllowed(old_thread_restriction);
VLOG(2) << "Slowing down " << this->description() << " done. Resuming.";
}
if (!SendRequest(++attempt_)) {
if (!RescheduleWithBackoffDelay()) {
UnregisterAsyncTask(); // May call 'delete this'.
Expand All @@ -174,10 +189,12 @@ MonitoredTaskState RetryingTSRpcTask::AbortAndReturnPrevState() {
auto expected = prev_state;
if (state_.compare_exchange_weak(expected, MonitoredTaskState::kAborted)) {
AbortIfScheduled();
UnregisterAsyncTask();
return prev_state;
}
prev_state = state();
}
UnregisterAsyncTask();
return prev_state;
}

Expand Down
Loading

0 comments on commit 1ae0df3

Please sign in to comment.