Skip to content

Commit

Permalink
[#1248] ysql: Create background task for verifying tablet data integrity
Browse files Browse the repository at this point in the history
Summary:
We add an online recurring background task to verify the integrity of each tablet's
RocksDB data. RocksDB will catch corruption errors relating to missing SST files and
incorrect file sizes, and we can expand checks to cover block checksums in the future.
If any corruption is detected, we bump a new metric `tablet_data_corruptions`
and log a warning.

Test Plan:
New test suite `TabletDataIntegrityTest` testing no corruption, missing files,
and incorrect file sizes.

Reviewers: mbautin, bogdan, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: zyu, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D11031
  • Loading branch information
zhaoalex committed Apr 2, 2021
1 parent b0741bb commit aa58dab
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/yb/tablet/CMakeLists.txt
Expand Up @@ -129,3 +129,4 @@ ADD_YB_TEST(mvcc-test)
ADD_YB_TEST(composite-pushdown-test)
ADD_YB_TEST(tablet_peer-test)
ADD_YB_TEST(tablet_random_access-test)
ADD_YB_TEST(tablet_data_integrity-test)
49 changes: 49 additions & 0 deletions src/yb/tablet/tablet.cc
Expand Up @@ -3456,6 +3456,55 @@ void Tablet::TriggerPostSplitCompactionSync() {
WARN_NOT_OK(ForceFullRocksDBCompact(), "Failed to compact post-split tablet.");
}

// Runs the on-disk integrity check for each RocksDB instance this tablet currently has: the
// regular db first, then the intents db. The first non-OK status is returned to the caller, so a
// failure on the regular db means the intents db is not checked in that pass.
Status Tablet::VerifyDataIntegrity() {
  LOG_WITH_PREFIX(INFO) << "Beginning data integrity checks on this tablet";

  // Regular db: only checked when the tablet has one.
  if (regular_db_) {
    RETURN_NOT_OK(OpenDbAndCheckIntegrity(metadata()->rocksdb_dir()));
  }

  // Intents db: only checked when the tablet has one.
  if (intents_db_) {
    RETURN_NOT_OK(OpenDbAndCheckIntegrity(metadata()->intents_rocksdb_dir()));
  }

  return Status::OK();
}

// Opens the RocksDB instance stored in db_dir as read-only with paranoid checks enabled, in the
// same spirit as ldb's CheckConsistency. If any corruption is detected then the open fails with a
// Corruption status.
//
// Returns:
//   - the Corruption status, after logging a warning and incrementing the
//     tablet_data_corruptions metric, when the open reports corruption;
//   - OK when the open succeeds;
//   - OK, after logging a warning, when the open fails for any reason other than corruption.
Status Tablet::OpenDbAndCheckIntegrity(const std::string& db_dir) {
  rocksdb::Options read_only_opts;
  InitRocksDBOptions(&read_only_opts, LogPrefix());
  read_only_opts.paranoid_checks = true;

  rocksdb::DB* opened_db = nullptr;
  rocksdb::Status open_status =
      rocksdb::DB::OpenForReadOnly(read_only_opts, db_dir, &opened_db);
  // Own whatever handle the open produced (possibly none) so it is released on every return path.
  std::unique_ptr<rocksdb::DB> db_guard(opened_db);

  if (open_status.ok()) {
    // TODO: we can add more checks here to verify block contents/checksums
    return Status::OK();
  }

  if (!open_status.IsCorruption()) {
    // Not a corruption report: note the failure but do not treat it as a failed integrity check.
    LOG_WITH_PREFIX(WARNING) << "Failed to open read-only RocksDB in directory " << db_dir
                             << ": " << open_status;
    return Status::OK();
  }

  LOG_WITH_PREFIX(WARNING) << "Detected rocksdb data corruption: " << open_status;
  // TODO: should we bump metric here or in top-level validation or both?
  metrics()->tablet_data_corruptions->Increment();
  return open_status;
}

// ------------------------------------------------------------------------------------------------

Result<ScopedReadOperation> ScopedReadOperation::Create(
Expand Down
8 changes: 8 additions & 0 deletions src/yb/tablet/tablet.h
Expand Up @@ -659,6 +659,9 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {
CHECKED_STATUS TriggerPostSplitCompactionIfNeeded(
std::function<std::unique_ptr<ThreadPoolToken>()> get_token_for_compaction);

// Verifies the data on this tablet for consistency. Returns status OK if checks pass.
// The regular db directory is checked first and then the intents db directory, each only when the
// corresponding db member is set. The first non-OK status encountered is returned.
CHECKED_STATUS VerifyDataIntegrity();

private:
friend class Iterator;
friend class TabletPeerTest;
Expand Down Expand Up @@ -723,6 +726,9 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {

void TriggerPostSplitCompactionSync();

// Opens read-only rocksdb at the specified directory and checks for any file corruption.
// When the open reports corruption, the tablet_data_corruptions metric is incremented and the
// Corruption status is returned. Any other open failure is only logged and OK is returned.
CHECKED_STATUS OpenDbAndCheckIntegrity(const std::string& db_dir);

const Schema key_schema_;

RaftGroupMetadataPtr metadata_;
Expand Down Expand Up @@ -903,6 +909,8 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {
// compaction has already been triggered for this instance.
std::unique_ptr<ThreadPoolToken> post_split_compaction_task_pool_token_ = nullptr;

// NOTE(review): not referenced by any code visible in this change; presumably intended for
// scheduling data integrity checks on a thread pool -- confirm, or remove if unused.
std::unique_ptr<ThreadPoolToken> data_integrity_token_;

DISALLOW_COPY_AND_ASSIGN(Tablet);
};

Expand Down
114 changes: 114 additions & 0 deletions src/yb/tablet/tablet_data_integrity-test.cc
@@ -0,0 +1,114 @@
// Copyright (c) Yugabyte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include <gtest/gtest.h>

#include "yb/client/table.h"

#include "yb/common/row.h"
#include "yb/common/ql_expr.h"
#include "yb/common/ql_rowwise_iterator_interface.h"

#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/join.h"
#include "yb/tablet/local_tablet_writer.h"
#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet-test-base.h"
#include "yb/util/path_util.h"
#include "yb/util/slice.h"
#include "yb/util/test_macros.h"

namespace yb {
namespace tablet {

// Test fixture for Tablet::VerifyDataIntegrity. Every test starts with a tablet into which one row
// has been inserted and then flushed with FlushMode::kSync. Also provides a helper for locating a
// file in the tablet's rocksdb directory whose name contains ".sst".
class TabletDataIntegrityTest : public TabletTestBase<IntKeyTestSetup<INT32>> {
  using superclass = TabletTestBase<IntKeyTestSetup<INT32>>;

 public:
  void SetUp() override {
    superclass::SetUp();

    auto* const tablet_ptr = this->tablet().get();
    LocalTabletWriter writer(tablet_ptr);
    ASSERT_OK(this->InsertTestRow(&writer, 12345, 0));
    ASSERT_OK(tablet_ptr->Flush(FlushMode::kSync));
  }

 protected:
  // Returns the full path of the first entry, in directory-listing order, of the tablet's rocksdb
  // directory whose name contains ".sst". Returns NotFound when the listing has no "CURRENT"
  // entry or no entry containing ".sst"; a failure to list the directory is propagated.
  Result<std::string> GetFirstSstFilePath() {
    auto* const tablet_ptr = this->tablet().get();
    const auto dir_path = tablet_ptr->metadata()->rocksdb_dir();
    const auto names = VERIFY_RESULT(tablet_ptr->metadata()->fs_manager()->ListDir(dir_path));

    const bool has_current = std::find(names.begin(), names.end(), "CURRENT") != names.end();
    if (!has_current) {
      return STATUS(NotFound, "No rocksdb files found at tablet directory");
    }

    const auto sst_it = std::find_if(names.begin(), names.end(), [](const auto& name) {
      return name.find(".sst") != std::string::npos;
    });
    if (sst_it == names.end()) {
      return STATUS(NotFound, "No sst files found in rocksdb directory");
    }

    return JoinPathSegments(dir_path, *sst_it);
  }
};

// With the flushed files left untouched, verification is expected to succeed.
TEST_F(TabletDataIntegrityTest, TestNoCorruption) {
  ASSERT_OK(this->tablet()->VerifyDataIntegrity());
}

// Deleting an sst file from the tablet's rocksdb directory is expected to be reported as a
// Corruption status whose message mentions the missing file.
TEST_F(TabletDataIntegrityTest, TestDeletedFile) {
  const auto victim_path = ASSERT_RESULT(GetFirstSstFilePath());
  ASSERT_OK(env_->DeleteFile(victim_path));

  const Status verify_status = this->tablet()->VerifyDataIntegrity();
  ASSERT_TRUE(verify_status.IsCorruption());
  ASSERT_STR_CONTAINS(verify_status.message().ToBuffer(), "No such file");
}

// Rewriting an sst file so that it is only one byte long is expected to be reported as a
// Corruption status whose message mentions a file size mismatch.
TEST_F(TabletDataIntegrityTest, TestFileTruncate) {
  const auto victim_path = ASSERT_RESULT(GetFirstSstFilePath());

  // Load the file, cut the buffer down to a single byte, and write it back over the original.
  faststring contents;
  ASSERT_OK(ReadFileToString(env_.get(), victim_path, &contents));
  contents.resize(1);
  ASSERT_OK(WriteStringToFile(env_.get(), Slice(contents), victim_path));

  const Status verify_status = this->tablet()->VerifyDataIntegrity();
  ASSERT_TRUE(verify_status.IsCorruption());
  ASSERT_STR_CONTAINS(verify_status.message().ToBuffer(), "file size mismatch");
}

// Disabled: verification currently performs no block-level checks, so overwriting the contents of
// an sst file while keeping its size is not expected to be detected yet.
// TODO: enable this test once we add those. (See issue #7904)
TEST_F(TabletDataIntegrityTest, DISABLED_TestFileGarbageOverwrite) {
  const auto victim_path = ASSERT_RESULT(GetFirstSstFilePath());

  // Read the file only to learn how many bytes it holds.
  faststring original_contents;
  ASSERT_OK(ReadFileToString(env_.get(), victim_path, &original_contents));

  // Overwrite the file with a same-sized buffer that was never filled in.
  // NOTE(review): presumably faststring::resize leaves the new bytes unspecified -- confirm before
  // enabling, since the test outcome may then depend on whatever the buffer happens to contain.
  faststring replacement;
  replacement.resize(original_contents.size());
  ASSERT_OK(WriteStringToFile(env_.get(), Slice(replacement), victim_path));

  const Status verify_status = this->tablet()->VerifyDataIntegrity();
  ASSERT_TRUE(verify_status.IsCorruption());
  ASSERT_STR_CONTAINS(verify_status.message().ToBuffer(), "bad block contents");
}

} // namespace tablet
} // namespace yb
6 changes: 6 additions & 0 deletions src/yb/tablet/tablet_metrics.cc
Expand Up @@ -144,6 +144,11 @@ METRIC_DEFINE_counter(tablet, pgsql_consistent_prefix_read_rows,
yb::MetricUnit::kRequests,
"Number of pgsql rows read as part of a consistent prefix request");

// Counter of detected tablet data corruptions. Incremented by Tablet::OpenDbAndCheckIntegrity
// each time a read-only RocksDB open fails with a Corruption status.
METRIC_DEFINE_counter(tablet, tablet_data_corruptions,
"Tablet Data Corruption Detections",
yb::MetricUnit::kUnits,
"Number of times this tablet was flagged for corrupted data");

using strings::Substitute;

namespace yb {
Expand All @@ -164,6 +169,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
MINIT(restart_read_requests),
MINIT(consistent_prefix_read_requests),
MINIT(pgsql_consistent_prefix_read_rows),
MINIT(tablet_data_corruptions),
MINIT(rows_inserted) {
}
#undef MINIT
Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/tablet_metrics.h
Expand Up @@ -68,6 +68,7 @@ struct TabletMetrics {
scoped_refptr<Counter> restart_read_requests;
scoped_refptr<Counter> consistent_prefix_read_requests;
scoped_refptr<Counter> pgsql_consistent_prefix_read_rows;
scoped_refptr<Counter> tablet_data_corruptions;

scoped_refptr<Counter> rows_inserted;
};
Expand Down
39 changes: 39 additions & 0 deletions src/yb/tserver/ts_tablet_manager.cc
Expand Up @@ -195,6 +195,13 @@ DEFINE_int32(flush_background_task_interval_msec, 0,
"This defaults to 0, which means disable the background task "
"And only use callbacks on memstore allocations. ");

// Period, in seconds, of the tablet data verification poller. TSTabletManager::Start() starts the
// poller only when this value is greater than 0; otherwise it logs that verification is disabled.
DEFINE_int32(verify_tablet_data_interval_sec, 0,
"The tick interval time for the tablet data integrity verification background task. "
"This defaults to 0, which means disable the background task.");

// When true, TSTabletManager::VerifyTabletData() logs that the check was skipped for each RUNNING
// tablet peer instead of calling Tablet::VerifyDataIntegrity() on it.
DEFINE_bool(skip_tablet_data_verification, false,
"Skip checking tablet data for corruption.");

DEFINE_int64(global_memstore_size_percentage, 10,
"Percentage of total available memory to use for the global memstore. "
"Default is 10. See also memstore_size_mb and "
Expand Down Expand Up @@ -357,6 +364,24 @@ void TSTabletManager::MaybeFlushTablet() {
}
}

// One pass of the tablet data verification task: walks all tablet peers and runs
// Tablet::VerifyDataIntegrity() on each one that is in the RUNNING state. A failed check is logged
// as a warning and the pass continues with the next peer. When skip_tablet_data_verification is
// set, each RUNNING peer is only logged as skipped.
void TSTabletManager::VerifyTabletData() {
  LOG_WITH_PREFIX(INFO) << "Beginning tablet data verification checks";
  for (const TabletPeerPtr& tablet_peer : GetTabletPeers()) {
    // Peers that are not running are ignored entirely.
    if (tablet_peer->state() != RUNNING) {
      continue;
    }

    if (PREDICT_FALSE(FLAGS_skip_tablet_data_verification)) {
      LOG_WITH_PREFIX(INFO)
          << Format("Skipped tablet data verification check on $0", tablet_peer->tablet_id());
      continue;
    }

    Status verify_status = tablet_peer->tablet()->VerifyDataIntegrity();
    if (verify_status.ok()) {
      continue;
    }
    LOG(WARNING) << "Tablet data integrity verification failed on " << tablet_peer->tablet_id()
                 << ": " << verify_status;
  }
}

// Return the tablet with the oldest write in memstore, or nullptr if all tablet memstores are
// empty or about to flush.
TabletPeerPtr TSTabletManager::TabletToFlush() {
Expand Down Expand Up @@ -523,6 +548,7 @@ TSTabletManager::TSTabletManager(FsManager* fs_manager,
}

// Add memory monitor and background thread for flushing
// TODO(zhaoalex): replace task with Poller
if (should_count_memory) {
background_task_.reset(new BackgroundTask(
std::function<void()>([this](){ MaybeFlushTablet(); }),
Expand Down Expand Up @@ -646,6 +672,9 @@ Status TSTabletManager::Init() {
tablets_cleaner_ = std::make_unique<rpc::Poller>(
LogPrefix(), std::bind(&TSTabletManager::CleanupSplitTablets, this));

verify_tablet_data_poller_ = std::make_unique<rpc::Poller>(
LogPrefix(), std::bind(&TSTabletManager::VerifyTabletData, this));

return Status::OK();
}

Expand Down Expand Up @@ -689,6 +718,14 @@ Status TSTabletManager::Start() {
LOG(INFO)
<< "Split tablets cleanup is disabled by cleanup_split_tablets_interval_sec flag set to 0";
}
if (FLAGS_verify_tablet_data_interval_sec > 0) {
verify_tablet_data_poller_->Start(
&server_->messenger()->scheduler(), FLAGS_verify_tablet_data_interval_sec * 1s);
LOG(INFO) << "Tablet data verification task started...";
} else {
LOG(INFO)
<< "Tablet data verification is disabled by verify_tablet_data_interval_sec flag set to 0";
}

return Status::OK();
}
Expand Down Expand Up @@ -1577,6 +1614,8 @@ void TSTabletManager::StartShutdown() {

tablets_cleaner_->Shutdown();

verify_tablet_data_poller_->Shutdown();

async_client_init_->Shutdown();

if (background_task_) {
Expand Down
6 changes: 6 additions & 0 deletions src/yb/tserver/ts_tablet_manager.h
Expand Up @@ -323,6 +323,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table

CHECKED_STATUS UpdateSnapshotSchedules(const master::TSSnapshotSchedulesInfoPB& info);

// Background task that verifies the data on each tablet for consistency.
// Only tablet peers in the RUNNING state are checked. A failed check is logged as a warning and is
// not propagated to the caller.
void VerifyTabletData();

client::YBClient& client();

tablet::TabletOptions* TEST_tablet_options() { return &tablet_options_; }
Expand Down Expand Up @@ -567,6 +570,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf, public tablet::Table
// Used for scheduling flushes
std::unique_ptr<BackgroundTask> background_task_;

// Used for verifying tablet data integrity. Created in Init() bound to VerifyTabletData(), started
// in Start() only when verify_tablet_data_interval_sec is greater than 0, and shut down in
// StartShutdown().
std::unique_ptr<rpc::Poller> verify_tablet_data_poller_;

// For block cache and memory monitor shared across tablets
tablet::TabletOptions tablet_options_;

Expand Down

0 comments on commit aa58dab

Please sign in to comment.