Skip to content

Commit

Permalink
[#11946] DST: PITR - Correctly set op id when replaying snapshot oper…
Browse files Browse the repository at this point in the history
…ations during tablet bootstrap

Summary:
Currently, we don't set op id when replaying snapshot operations during tablet bootstrap. However, we flush the snapshot entry into the sys catalog immediately after the create succeeds and also update the frontier. This can cause a duplicate replay of the same snapshot op with the same snapshot id. On the master side, we treat this as FATAL and crash. In particular, consider the following case:

1. Cluster is running a pre-2.6 release, where we flush rarely. It adds a CREATE_ON_MASTER op to the WAL.
2. Cluster is upgraded to 2.6+. During local bootstrap we replay this CREATE_ON_MASTER op and add the snapshot record to RocksDB, then flush RocksDB — but the op id is not set properly, so the flushed frontier does not record the CREATE_ON_MASTER op.
3. Cluster is restarted. It tries to replay CREATE_ON_MASTER again, but the record is already present in RocksDB, so the master crashes.

Separately, we also add logic to bypass a duplicate snapshot request instead of crashing, governed by a GFlag. By default the behavior is still to crash, but anyone who runs into this issue can turn the flag on to complete the bootstrap, then turn it off again.

Test Plan:
Tested manually by bringing in data and WALs from a 2.4.x cluster that has an unflushed
CREATE_ON_MASTER op, then performing steps (2) and (3) above. Without the fix the master
crashes; with the fix the op is not replayed a second time.

Also, tested that we can bypass without crashing on the above data.

Reviewers: bogdan, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D16314
  • Loading branch information
sanketkedia committed Apr 1, 2022
1 parent 9e02ccf commit fe2b082
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 6 deletions.
13 changes: 12 additions & 1 deletion src/yb/master/master_snapshot_coordinator.cc
Expand Up @@ -68,6 +68,11 @@ DEFINE_bool(schedule_snapshot_rpcs_out_of_band, true,
" background scheduling.");
TAG_FLAG(schedule_snapshot_rpcs_out_of_band, runtime);

// Escape hatch for duplicate snapshot-create requests: when enabled, a create
// request whose id matches an already-known snapshot is logged and treated as
// success instead of returning an IllegalState error. Tagged runtime so it can
// be flipped on a live master to get past a duplicate replay, then turned off.
DEFINE_bool(skip_crash_on_duplicate_snapshot, false,
            "Should we not crash when we get a create snapshot request with the same "
            "id as one of the previous snapshots.");
TAG_FLAG(skip_crash_on_duplicate_snapshot, runtime);

DECLARE_bool(allow_consecutive_restore);

namespace yb {
Expand Down Expand Up @@ -222,7 +227,13 @@ class MasterSnapshotCoordinator::Impl {
std::lock_guard<std::mutex> lock(mutex_);
auto emplace_result = snapshots_.emplace(std::move(snapshot));
if (!emplace_result.second) {
return STATUS_FORMAT(IllegalState, "Duplicate snapshot id: $0", id);
LOG(INFO) << "Received a duplicate create snapshot request for id: " << id;
if (FLAGS_skip_crash_on_duplicate_snapshot) {
LOG(INFO) << "Skipping duplicate create snapshot request gracefully.";
return Status::OK();
} else {
return STATUS_FORMAT(IllegalState, "Duplicate snapshot id: $0", id);
}
}

if (leader_term >= 0) {
Expand Down
20 changes: 15 additions & 5 deletions src/yb/tablet/operations/snapshot_operation.cc
Expand Up @@ -19,12 +19,17 @@
#include "yb/tserver/backup.pb.h"
#include "yb/tserver/tserver_error.h"

#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"
#include "yb/util/status_format.h"
#include "yb/util/trace.h"

DEFINE_bool(consistent_restore, false, "Whether to enable consistent restoration of snapshots");

// Test-only flag: when set to false, DoReplicated skips updating the RocksDB
// flushed frontier after a create snapshot operation. Tests use this to leave
// an unflushed snapshot op in the WAL so it is replayed at the next bootstrap.
DEFINE_test_flag(bool, modify_flushed_frontier_snapshot_op, true,
                 "Whether to modify flushed frontier after "
                 "a create snapshot operation.");

namespace yb {
namespace tablet {

Expand Down Expand Up @@ -208,11 +213,16 @@ Status SnapshotOperation::DoReplicated(int64_t leader_term, Status* complete_sta
// the flushed frontier to have this exact value, although in practice it will, since this is the
// latest operation we've ever executed in this Raft group. This way we keep the current value
// of history cutoff.
docdb::ConsensusFrontier frontier;
frontier.set_op_id(op_id());
frontier.set_hybrid_time(hybrid_time());
return tablet()->ModifyFlushedFrontier(
frontier, rocksdb::FrontierModificationMode::kUpdate);
if (FLAGS_TEST_modify_flushed_frontier_snapshot_op) {
docdb::ConsensusFrontier frontier;
frontier.set_op_id(op_id());
frontier.set_hybrid_time(hybrid_time());
LOG(INFO) << "Forcing modify flushed frontier to " << frontier.op_id();
return tablet()->ModifyFlushedFrontier(
frontier, rocksdb::FrontierModificationMode::kUpdate);
} else {
return Status::OK();
}
}

} // namespace tablet
Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/tablet_bootstrap.cc
Expand Up @@ -961,6 +961,7 @@ class TabletBootstrap {

SnapshotOperation operation(tablet_.get(), snapshot);
operation.set_hybrid_time(HybridTime(replicate_msg->hybrid_time()));
operation.set_op_id(OpId::FromPB(replicate_msg->id()));

return operation.Replicated(/* leader_term= */ yb::OpId::kUnknownTerm);
}
Expand Down
53 changes: 53 additions & 0 deletions src/yb/tools/yb-admin-snapshot-schedule-test.cc
Expand Up @@ -1895,6 +1895,59 @@ TEST_F(YbAdminSnapshotScheduleTest, CatalogLoadRace) {
ASSERT_OK(RestoreSnapshotSchedule(schedule_id, time));
}

// Fixture whose master flags both shorten snapshot-coordinator timers (for
// test speed) and disable the RocksDB flush on shutdown, so that operations
// still sitting in the WAL get replayed during the next tablet bootstrap.
class YbAdminSnapshotScheduleFlushTest : public YbAdminSnapshotScheduleTest {
 public:
  std::vector<std::string> ExtraMasterFlags() override {
    // flush_rocksdb_on_shutdown=false is the flag this fixture exists for;
    // the coordinator intervals just keep the test fast, and the vmodule
    // setting gives verbose bootstrap logging for debugging.
    std::vector<std::string> flags;
    flags.emplace_back("--snapshot_coordinator_cleanup_delay_ms=1000");
    flags.emplace_back("--snapshot_coordinator_poll_interval_ms=500");
    flags.emplace_back("--enable_automatic_tablet_splitting=true");
    flags.emplace_back("--enable_transactional_ddl_gc=false");
    flags.emplace_back("--flush_rocksdb_on_shutdown=false");
    flags.emplace_back("--vmodule=tablet_bootstrap=3");
    return flags;
  }
};

// Regression test for replaying an unflushed CREATE_ON_MASTER snapshot op at
// master tablet bootstrap:
//  1. With frontier updates disabled, create a snapshot so the op stays in the
//     WAL but is never recorded in the flushed frontier.
//  2. Re-enable frontier updates and restart: bootstrap replays the op (and,
//     with this fix, records its op id in the frontier).
//  3. Restart again: the op must not be replayed a second time — before the
//     fix, the duplicate replay made the master crash.
TEST_F_EX(YbAdminSnapshotScheduleTest, TestSnapshotBootstrap, YbAdminSnapshotScheduleFlushTest) {
  LOG(INFO) << "Create cluster";
  CreateCluster(kClusterName, ExtraTSFlags(), ExtraMasterFlags());

  // Disable modifying flushed frontier when snapshot is created.
  ASSERT_OK(cluster_->SetFlagOnMasters("TEST_modify_flushed_frontier_snapshot_op", "false"));

  // Create a database and a table.
  auto conn = ASSERT_RESULT(CqlConnect());
  ASSERT_OK(conn.ExecuteQuery(Format(
      "CREATE KEYSPACE IF NOT EXISTS $0", client::kTableName.namespace_name())));

  conn = ASSERT_RESULT(CqlConnect(client::kTableName.namespace_name()));

  ASSERT_OK(conn.ExecuteQueryFormat(
      "CREATE TABLE $0 (key INT PRIMARY KEY, value TEXT) WITH tablets = 1",
      client::kTableName.table_name()));
  LOG(INFO) << "Created Keyspace and table";

  // Create a CREATE_ON_MASTER op in WALs without flushing frontier. The sleep
  // gives the snapshot time to complete before we restart.
  ASSERT_OK(CallAdmin("create_keyspace_snapshot",
                      Format("ycql.$0", client::kTableName.namespace_name())));
  SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier));
  LOG(INFO) << "Created snapshot on keyspace";

  // Re-enable flushed frontier updates so the replay during the next restart
  // records the op in the frontier.
  LOG(INFO) << "Resetting test flag to modify flushed frontier";

  // Set the flag back to true, then restart the masters so the unflushed
  // CREATE_ON_MASTER op gets replayed from the WAL during bootstrap.
  ASSERT_OK(cluster_->SetFlagOnMasters("TEST_modify_flushed_frontier_snapshot_op", "true"));
  LOG(INFO) << "Restart#1";
  cluster_->Shutdown();
  ASSERT_OK(cluster_->Restart());

  // Restart the masters again. Now this op shouldn't be replayed.
  LOG(INFO) << "Restart#2";
  cluster_->Shutdown();
  ASSERT_OK(cluster_->Restart());
}

class YbAdminSnapshotScheduleTestWithoutConsecutiveRestore : public YbAdminSnapshotScheduleTest {
std::vector<std::string> ExtraMasterFlags() override {
// To speed up tests.
Expand Down

0 comments on commit fe2b082

Please sign in to comment.