[Backport D8645 to 2.1.8] [#4610, #4302] Handle the case when a table is deleted during transaction aware snapshot creation

Summary:
When the table is not running, the async RPC task cannot reset the tablet server proxy. This case was handled incorrectly: the operation simply retried after the timeout.

This diff fixes the issue.
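
To illustrate the fix: the proxy-reset status is now inspected, and an Expired status (the table was deleted) terminates the task instead of being retried like a transient error. Below is a minimal, self-contained sketch of that decision; Status, Code, and ShouldRetry are simplified stand-ins for illustration, not the actual YugabyteDB classes.

#include <iostream>
#include <string>

enum class Code { kOk, kExpired, kNetworkError };

// Simplified stand-in for yb::Status.
struct Status {
  Code code;
  std::string msg;
  bool ok() const { return code == Code::kOk; }
  bool IsExpired() const { return code == Code::kExpired; }
};

// Before the fix, an Expired status from resetting the TS proxy was retried
// like any transient error; after the fix it fails the task immediately.
bool ShouldRetry(const Status& s) {
  if (s.ok()) return false;         // Proxy reset succeeded; send the RPC.
  if (s.IsExpired()) return false;  // Table is gone: fail the task now.
  return true;                      // Transient failure: reschedule with backoff.
}

int main() {
  std::cout << ShouldRetry({Code::kExpired, "Table is not running"}) << '\n';       // 0
  std::cout << ShouldRetry({Code::kNetworkError, "Connection timed out"}) << '\n';  // 1
}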

Also fixed an issue where a new master leader could miss changes performed by previous leaders.
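
To make that safe, each snapshot entry now carries a version that is incremented on every persisted update, so a newly elected leader loading the sys catalog keeps only the newest state for each snapshot. A minimal sketch of the guard, mirroring the check added to master_snapshot_coordinator.cc; SnapshotState and LoadSnapshot here are simplified stand-ins:

#include <map>
#include <memory>
#include <utility>

struct SnapshotState {
  int version = 0;  // Bumped on every persisted write; 0 marks pre-versioning entries.
};

using SnapshotId = int;
using SnapshotMap = std::map<SnapshotId, std::unique_ptr<SnapshotState>>;

void LoadSnapshot(SnapshotMap* snapshots, SnapshotId id,
                  std::unique_ptr<SnapshotState> incoming) {
  auto it = snapshots->find(id);
  if (it == snapshots->end()) {
    snapshots->emplace(id, std::move(incoming));
  } else if (it->second->version < incoming->version || it->second->version == 0) {
    // Updates for a single snapshot are loaded in chronological order, so the
    // highest version wins; version 0 (a legacy entry) is always superseded.
    it->second = std::move(incoming);
  }
}

int main() {
  SnapshotMap snapshots;
  auto v2 = std::make_unique<SnapshotState>();
  v2->version = 2;
  auto v1 = std::make_unique<SnapshotState>();
  v1->version = 1;
  LoadSnapshot(&snapshots, 42, std::move(v2));
  LoadSnapshot(&snapshots, 42, std::move(v1));  // Ignored: 1 < 2.
}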

Test Plan:
ybd --gtest_filter BackupTxnTest.DeleteTableWithMastersRestart
ybd --gtest_filter BackupTxnTest.CompleteAndBounceMaster

Jenkins: auto rebase: no

Reviewers: mikhail, oleg, amitanand, hector, bogdan

Reviewed By: bogdan

Subscribers: ybase, kannan

Differential Revision: https://phabricator.dev.yugabyte.com/D8683
spolitov committed Jun 16, 2020
1 parent 9ba2e7a commit 16b0e59
Showing 12 changed files with 109 additions and 42 deletions.
9 changes: 4 additions & 5 deletions ent/src/yb/master/async_snapshot_tasks.cc
@@ -162,12 +162,11 @@ void AsyncTabletSnapshotOp::Finished(const Status& status) {
     return;
   }
   if (resp_.has_error()) {
-    if (!tablet_->table()->is_running()) {
-      callback_(STATUS_FORMAT(
-          Expired, "Table is not running: $0", tablet_->table()->ToStringWithState()));
-    } else {
-      callback_(StatusFromPB(resp_.error().status()));
+    auto status = tablet_->CheckRunning();
+    if (status.ok()) {
+      status = StatusFromPB(resp_.error().status());
     }
+    callback_(status);
   } else {
     callback_(&resp_);
   }
2 changes: 2 additions & 0 deletions ent/src/yb/master/master_backup.proto
@@ -49,6 +49,8 @@ message SysSnapshotEntryPB {
   repeated SysRowEntry entries = 3;
 
   optional fixed64 snapshot_hybrid_time = 4;
+
+  optional int64 version = 5;
 }
 
 ////////////////////////////////////////////////////////////
37 changes: 36 additions & 1 deletion src/yb/client/backup-txn-test.cc
@@ -53,6 +53,7 @@ class BackupTxnTest : public TransactionTestBase {
   void SetUp() override {
     FLAGS_enable_history_cutoff_propagation = true;
     SetIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
+    mini_cluster_opt_.num_masters = 3;
     TransactionTestBase::SetUp();
   }
 
@@ -263,6 +264,8 @@
 
     return true;
   }
+
+  void TestDeleteTable(bool restart_masters);
 };
 
 TEST_F(BackupTxnTest, Simple) {
@@ -460,6 +463,22 @@ TEST_F(BackupTxnTest, Restart) {
   ASSERT_OK(WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::COMPLETE, 1s));
 }
 
+TEST_F(BackupTxnTest, CompleteAndBounceMaster) {
+  ASSERT_NO_FATALS(WriteData());
+  auto snapshot_id = ASSERT_RESULT(CreateSnapshot());
+
+  std::this_thread::sleep_for(1s);
+
+  ASSERT_OK(client_->DeleteTable(kTableName));
+
+  auto leader = cluster_->leader_mini_master();
+  leader->Shutdown();
+
+  ASSERT_OK(WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::COMPLETE, 1s));
+
+  ASSERT_OK(leader->Start());
+}
+
 TEST_F(BackupTxnTest, FlushSysCatalogAndDelete) {
   ASSERT_NO_FATALS(WriteData());
   auto snapshot_id = ASSERT_RESULT(CreateSnapshot());
@@ -541,7 +560,7 @@ TEST_F(BackupTxnTest, Consistency) {
   LOG(INFO) << "Value: " << restored_value;
 }
 
-TEST_F(BackupTxnTest, DeleteTable) {
+void BackupTxnTest::TestDeleteTable(bool restart_masters) {
   FLAGS_unresponsive_ts_rpc_timeout_ms = 1000;
   FLAGS_snapshot_coordinator_poll_interval_ms = 2500 * kTimeMultiplier;
 
@@ -556,10 +575,26 @@
 
   ASSERT_OK(client_->DeleteTable(kTableName, false));
 
+  if (restart_masters) {
+    ShutdownAllMasters(cluster_.get());
+  }
+
   ASSERT_OK(StartAllTServers(cluster_.get()));
 
+  if (restart_masters) {
+    ASSERT_OK(StartAllMasters(cluster_.get()));
+  }
+
   ASSERT_OK(WaitSnapshotInState(snapshot_id, SysSnapshotEntryPB::FAILED, 5s * kTimeMultiplier));
 }
 
+TEST_F(BackupTxnTest, DeleteTable) {
+  TestDeleteTable(/* restart_masters= */ false);
+}
+
+TEST_F(BackupTxnTest, DeleteTableWithMastersRestart) {
+  TestDeleteTable(/* restart_masters= */ true);
+}
+
 } // namespace client
 } // namespace yb
47 changes: 29 additions & 18 deletions src/yb/master/async_rpc_tasks.cc
@@ -123,32 +123,35 @@ RetryingTSRpcTask::RetryingTSRpcTask(Master *master,
 // Send the subclass RPC request.
 Status RetryingTSRpcTask::Run() {
   VLOG_WITH_PREFIX(1) << "Start Running";
+  ++attempt_;
   auto task_state = state();
   if (task_state == MonitoredTaskState::kAborted) {
-    UnregisterAsyncTask();  // May delete this.
-    return STATUS(IllegalState, "Unable to run task because it has been aborted");
+    // May delete this.
+    return Failed(STATUS(IllegalState, "Unable to run task because it has been aborted"));
   }
   // TODO(bogdan): There is a race between scheduling and running and can cause this to fail.
   // Should look into removing the kScheduling state, if not needed, and simplifying the state
   // transitions!
   DCHECK(task_state == MonitoredTaskState::kWaiting) << "State: " << ToString(task_state);
 
-  const Status s = ResetTSProxy();
+  Status s = ResetTSProxy();
   if (!s.ok()) {
+    s = s.CloneAndPrepend("Failed to reset TS proxy");
+    if (s.IsExpired()) {
+      TransitionToTerminalState(MonitoredTaskState::kWaiting, MonitoredTaskState::kFailed, s);
+      UnregisterAsyncTask();
+      return s;
+    }
     if (RescheduleWithBackoffDelay()) {
       return Status::OK();
    }
-    auto transitioned_to_failed =
-        PerformStateTransition(MonitoredTaskState::kWaiting, MonitoredTaskState::kFailed) ||
-        state() == MonitoredTaskState::kFailed;
-    if (transitioned_to_failed) {
-      UnregisterAsyncTask();  // May delete this.
-      return s.CloneAndPrepend("Failed to reset TS proxy");
-    }
 
     auto state = this->state();
-    if (state == MonitoredTaskState::kAborted) {
-      UnregisterAsyncTask();  // May delete this.
+    UnregisterAsyncTask();  // May delete this.
+
+    if (state == MonitoredTaskState::kFailed) {
+      return s;
+    } else if (state == MonitoredTaskState::kAborted) {
      return STATUS(IllegalState, "Unable to run task because it has been aborted");
    }
 
@@ -164,12 +167,12 @@ Status RetryingTSRpcTask::Run() {
 
   if (!PerformStateTransition(MonitoredTaskState::kWaiting, MonitoredTaskState::kRunning)) {
     if (state() == MonitoredTaskState::kAborted) {
-      UnregisterAsyncTask();  // May delete this.
-      return STATUS(Aborted, "Unable to run task because it has been aborted");
+      // May delete this.
+      return Failed(STATUS(Aborted, "Unable to run task because it has been aborted"));
     } else {
       LOG_WITH_PREFIX(DFATAL) <<
           "Task transition MonitoredTaskState::kWaiting -> MonitoredTaskState::kRunning failed";
-      return STATUS_FORMAT(IllegalState, "Task in invalid state $0", state());
+      return Failed(STATUS_FORMAT(IllegalState, "Task in invalid state $0", state()));
     }
   }
   auto slowdown_flag_val = GetAtomicFlag(&FLAGS_slowdown_master_async_rpc_tasks_by_ms);
@@ -180,7 +183,7 @@ Status RetryingTSRpcTask::Run() {
     ThreadRestrictions::SetWaitAllowed(old_thread_restriction);
     VLOG_WITH_PREFIX(2) << "Slowing down done. Resuming.";
   }
-  if (!SendRequest(++attempt_)) {
+  if (!SendRequest(attempt_)) {
     if (!RescheduleWithBackoffDelay()) {
       UnregisterAsyncTask();  // May call 'delete this'.
     }
@@ -202,8 +205,8 @@ MonitoredTaskState RetryingTSRpcTask::AbortAndReturnPrevState(const Status& stat
     auto expected = prev_state;
     if (state_.compare_exchange_weak(expected, MonitoredTaskState::kAborted)) {
       AbortIfScheduled();
-      UnregisterAsyncTask();
       Finished(status);
+      UnregisterAsyncTask();
       return prev_state;
     }
     prev_state = state();
@@ -345,6 +348,13 @@ void RetryingTSRpcTask::RunDelayedTask(const Status& status) {
 
 void RetryingTSRpcTask::UnregisterAsyncTaskCallback() {}
 
+Status RetryingTSRpcTask::Failed(const Status& status) {
+  LOG_WITH_PREFIX(WARNING) << "Async task failed: " << status;
+  Finished(status);
+  UnregisterAsyncTask();
+  return status;
+}
+
 void RetryingTSRpcTask::UnregisterAsyncTask() {
   std::unique_lock<decltype(unregister_mutex_)> lock(unregister_mutex_);
   UnregisterAsyncTaskCallback();
@@ -393,7 +403,8 @@ void RetryingTSRpcTask::TransitionToTerminalState(MonitoredTaskState expected,
                             << terminal_state << ". Task has been aborted";
     } else {
       LOG_WITH_PREFIX(DFATAL) << "State transition " << expected << " -> "
-                              << terminal_state << " failed. Current task is in an invalid state";
+                              << terminal_state << " failed. Current task is in an invalid state: "
+                              << state();
     }
     return;
   }
4 changes: 3 additions & 1 deletion src/yb/master/async_rpc_tasks.h
@@ -117,7 +117,7 @@ class RetryingTSRpcTask : public MonitoredTask {
       const scoped_refptr<TableInfo>& table);
 
   // Send the subclass RPC request.
-  Status Run();
+  CHECKED_STATUS Run();
 
   // Abort this task and return its value before it was successfully aborted. If the task entered
   // a different terminal state before we were able to abort it, return that state.
@@ -234,6 +234,8 @@
   // Clean up request and release resources. May call 'delete this'.
   void UnregisterAsyncTask();
 
+  CHECKED_STATUS Failed(const Status& status);
+
   // Only abort this task on reactor if it has been scheduled.
   void AbortIfScheduled();
 
10 changes: 10 additions & 0 deletions src/yb/master/catalog_entity_info.cc
@@ -120,13 +120,23 @@ void TabletInfo::SetReplicaLocations(ReplicaMap replica_locations) {
   replica_locations_ = std::move(replica_locations);
 }
 
+CHECKED_STATUS TabletInfo::CheckRunning() const {
+  if (!table()->is_running()) {
+    return STATUS_FORMAT(Expired, "Table is not running: $0", table()->ToStringWithState());
+  }
+
+  return Status::OK();
+}
+
 Result<TSDescriptor*> TabletInfo::GetLeader() const {
   std::lock_guard<simple_spinlock> l(lock_);
   auto result = GetLeaderUnlocked();
   if (result) {
     return result;
   }
 
+  RETURN_NOT_OK(CheckRunning());
+
   return STATUS_FORMAT(
       NotFound,
       "No leader found for tablet $0 with $1 replicas: $2.",
3 changes: 3 additions & 0 deletions src/yb/master/catalog_entity_info.h
@@ -225,6 +225,9 @@ class TabletInfo : public RefCountedThreadSafe<TabletInfo>,
   // failures that happened before a certain point in time.
   void GetLeaderStepDownFailureTimes(MonoTime forget_failures_before,
                                      LeaderStepDownFailureTimes* dest);
+
+  CHECKED_STATUS CheckRunning() const;
+
  private:
   friend class RefCountedThreadSafe<TabletInfo>;
 
20 changes: 14 additions & 6 deletions src/yb/master/master_snapshot_coordinator.cc
@@ -338,7 +338,7 @@ class SnapshotState : public StateWithTablets {
       SnapshotCoordinatorContext* context, const TxnSnapshotId& id,
       const tserver::TabletSnapshotOpRequestPB& request)
       : StateWithTablets(context, SysSnapshotEntryPB::CREATING),
-        id_(id), snapshot_hybrid_time_(request.snapshot_hybrid_time()) {
+        id_(id), snapshot_hybrid_time_(request.snapshot_hybrid_time()), version_(1) {
     InitTabletIds(request.tablet_id(),
                   request.imported() ? SysSnapshotEntryPB::COMPLETE : SysSnapshotEntryPB::CREATING);
     request.extra_data().UnpackTo(&entries_);
@@ -348,7 +348,7 @@
       SnapshotCoordinatorContext* context, const TxnSnapshotId& id,
       const SysSnapshotEntryPB& entry)
       : StateWithTablets(context, entry.state()),
-        id_(id), snapshot_hybrid_time_(entry.snapshot_hybrid_time()) {
+        id_(id), snapshot_hybrid_time_(entry.snapshot_hybrid_time()), version_(entry.version()) {
     InitTablets(entry.tablet_snapshots());
     *entries_.mutable_entries() = entry.entries();
   }
@@ -374,10 +374,13 @@
 
     *out->mutable_entries() = entries_.entries();
 
+    out->set_version(version_);
+
     return Status::OK();
   }
 
   CHECKED_STATUS StoreToWriteBatch(docdb::KeyValueWriteBatchPB* out) {
+    ++version_;
     docdb::DocKey doc_key({ docdb::PrimitiveValue::Int32(SysRowEntry::SNAPSHOT),
                             docdb::PrimitiveValue(id_.AsSlice().ToBuffer()) });
     docdb::SubDocKey sub_doc_key(
@@ -418,6 +421,10 @@ class SnapshotState : public StateWithTablets {
     });
   }
 
+  int version() const {
+    return version_;
+  }
+
  private:
   bool IsTerminalFailure(const Status& status) override {
     // Table was removed.
@@ -434,6 +441,7 @@
   TxnSnapshotId id_;
   HybridTime snapshot_hybrid_time_;
   SysRowEntries entries_;
+  int version_;
 };
 
 class RestorationState : public StateWithTablets {
@@ -633,7 +641,7 @@ class MasterSnapshotCoordinator::Impl {
     });
   }
 
-  CHECKED_STATUS BootstrapWritePair(Slice key, const Slice& value) {
+  CHECKED_STATUS ApplyWritePair(Slice key, const Slice& value) {
     docdb::SubDocKey sub_doc_key;
     RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom(key, docdb::HybridTimeRequired::kFalse));
 
@@ -808,7 +816,7 @@ class MasterSnapshotCoordinator::Impl {
     auto it = snapshots_.find(snapshot_id);
     if (it == snapshots_.end()) {
       snapshots_.emplace(snapshot_id, std::move(snapshot));
-    } else {
+    } else if (it->second->version() < snapshot->version() || it->second->version() == 0) {
       // If we have several updates for a single snapshot, they are loaded in chronological
       // order, so the latest update should be picked.
       it->second = std::move(snapshot);
@@ -950,8 +958,8 @@ void MasterSnapshotCoordinator::Shutdown() {
   impl_->Shutdown();
 }
 
-Status MasterSnapshotCoordinator::BootstrapWritePair(const Slice& key, const Slice& value) {
-  return impl_->BootstrapWritePair(key, value);
+Status MasterSnapshotCoordinator::ApplyWritePair(const Slice& key, const Slice& value) {
+  return impl_->ApplyWritePair(key, value);
 }
 
 } // namespace master
2 changes: 1 addition & 1 deletion src/yb/master/master_snapshot_coordinator.h
@@ -100,7 +100,7 @@ class MasterSnapshotCoordinator : public tablet::SnapshotCoordinator {
   // Check whether we have a write request for a snapshot while replaying write requests
   // during bootstrap, and upsert the snapshot from it in that case.
   // key and value are an entry from the write batch.
-  CHECKED_STATUS BootstrapWritePair(const Slice& key, const Slice& value) override;
+  CHECKED_STATUS ApplyWritePair(const Slice& key, const Slice& value) override;
 
   void Start();
 
2 changes: 1 addition & 1 deletion src/yb/tablet/snapshot_coordinator.h
@@ -32,7 +32,7 @@ class SnapshotCoordinator {
 
   virtual CHECKED_STATUS Load(Tablet* tablet) = 0;
 
-  virtual CHECKED_STATUS BootstrapWritePair(const Slice& key, const Slice& value) = 0;
+  virtual CHECKED_STATUS ApplyWritePair(const Slice& key, const Slice& value) = 0;
 
   virtual ~SnapshotCoordinator() = default;
 };
6 changes: 6 additions & 0 deletions src/yb/tablet/tablet.cc
@@ -1080,6 +1080,12 @@ Status Tablet::ApplyKeyValueRowOperations(int64_t batch_idx,
   } else {
     PrepareNonTransactionWriteBatch(put_batch, hybrid_time, &write_batch);
     WriteToRocksDB(frontiers, &write_batch, StorageDbType::kRegular);
+    if (snapshot_coordinator_) {
+      for (const auto& pair : put_batch.write_pairs()) {
+        WARN_NOT_OK(snapshot_coordinator_->ApplyWritePair(pair.key(), pair.value()),
+                    "ApplyWritePair failed");
+      }
+    }
   }
 
   return Status::OK();
9 changes: 0 additions & 9 deletions src/yb/tablet/tablet_bootstrap.cc
@@ -1103,15 +1103,6 @@ void TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg) {
   WARN_NOT_OK(tablet_->ApplyRowOperations(&operation_state), "ApplyRowOperations failed");
 
   tablet_->mvcc_manager()->Replicated(hybrid_time);
-
-  if (tablet_->snapshot_coordinator()) {
-    // We should load transaction-aware snapshots during log replay, because we could replay
-    // snapshot operations that refer to them.
-    for (const auto& pair : write->write_batch().write_pairs()) {
-      WARN_NOT_OK(tablet_->snapshot_coordinator()->BootstrapWritePair(pair.key(), pair.value()),
-                  "BootstrapWritePair failed");
-    }
-  }
 }
 
 Status TabletBootstrap::PlayChangeMetadataRequest(ReplicateMsg* replicate_msg) {