Skip to content

Commit

Permalink
Reproduce data loss with concurrent memtable flush and GC
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Wu <yiwu@pingcap.com>
  • Loading branch information
Yi Wu committed Oct 8, 2019
1 parent b9915d9 commit 5172201
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 2 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ enable_language(C)
find_package(Git)

if (NOT ROCKSDB_GIT_REPO)
set(ROCKSDB_GIT_REPO "https://github.com/pingcap/rocksdb.git")
set(ROCKSDB_GIT_REPO "https://github.com/yiwu-arbug/rocksdb.git")
endif()

if (NOT ROCKSDB_GIT_BRANCH)
set(ROCKSDB_GIT_BRANCH "6.4.tikv")
set(ROCKSDB_GIT_BRANCH "flush_demo_6.4")
endif()

if (NOT DEFINED ROCKSDB_DIR)
Expand Down
2 changes: 2 additions & 0 deletions src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
}

Status BlobGCJob::DoRunGC() {
printf("do run gc\n");
Status s;

std::unique_ptr<BlobFileMergeIterator> gc_iter;
Expand Down Expand Up @@ -415,6 +416,7 @@ Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index,
if (!s.ok() && !s.IsNotFound()) {
return s;
}
printf("%s is blob %d\n", key.ToString().c_str(), is_blob_index);
// count read bytes for checking LSM entry
metrics_.bytes_read += key.size() + index_entry.size();
if (s.IsNotFound() || !is_blob_index) {
Expand Down
6 changes: 6 additions & 0 deletions src/blob_gc_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
bool stop_picking = false;
bool maybe_continue_next_time = false;
uint64_t next_gc_size = 0;
printf("picker\n");
for (auto& gc_score : blob_storage->gc_score()) {
printf("%lu %lf\n", gc_score.file_number, gc_score.score);
if (gc_score.score < cf_options_.blob_file_discardable_ratio) {
break;
}
Expand All @@ -37,6 +39,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
// or this file had been GCed
ROCKS_LOG_INFO(db_options_.info_log, "Blob file %" PRIu64 " no need gc",
blob_file->file_number());
printf("no need gc\n");
continue;
}
if (!stop_picking) {
Expand Down Expand Up @@ -71,13 +74,16 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
return nullptr;
}
// if there is only one small file to merge, no need to perform
/*
if (blob_files.size() == 1 &&
blob_files[0]->file_size() <= cf_options_.merge_small_file_threshold &&
blob_files[0]->gc_mark() == false &&
blob_files[0]->GetDiscardableRatio() <
cf_options_.blob_file_discardable_ratio) {
return nullptr;
}
*/
printf("pick %lu files\n", blob_files.size());

return std::unique_ptr<BlobGC>(new BlobGC(
std::move(blob_files), std::move(cf_options_), maybe_continue_next_time));
Expand Down
5 changes: 5 additions & 0 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
bool is_blob_index = false;
s = db_impl_->GetImpl(options, handle, key, value, nullptr /*value_found*/,
nullptr /*read_callback*/, &is_blob_index);
printf("get status %s\n", s.ToString().c_str());
if (!s.ok() || !is_blob_index) return s;

StopWatch get_sw(env_, stats_.get(), BLOB_DB_GET_MICROS);
Expand All @@ -553,6 +554,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
s = index.DecodeFrom(value);
assert(s.ok());
if (!s.ok()) return s;
printf("get from file %lu\n", index.file_number);

BlobRecord record;
PinnableSlice buffer;
Expand Down Expand Up @@ -959,6 +961,7 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
}

void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
printf("completed flush job id %d\n", flush_job_info.job_id);
const auto& tps = flush_job_info.table_properties;
auto ucp_iter = tps.user_collected_properties.find(
BlobFileSizeCollector::kPropertiesName);
Expand Down Expand Up @@ -1004,9 +1007,11 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
ROCKS_LOG_INFO(db_options_.info_log,
"OnFlushCompleted[%d]: output blob file %" PRIu64 ".",
flush_job_info.job_id, file->file_number());
printf("complete file %lu\n", file_number);
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted);
}
}
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished");
}

void TitanDBImpl::OnCompactionCompleted(
Expand Down
1 change: 1 addition & 0 deletions src/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Status TitanDBImpl::PurgeObsoleteFilesImpl() {
candidate_files.end());

for (const auto& candidate_file : candidate_files) {
printf("purge file %s\n", candidate_file.c_str());
ROCKS_LOG_INFO(db_options_.info_log, "Titan deleting obsolete file [%s]",
candidate_file.c_str());
Status delete_status = env_->DeleteFile(candidate_file);
Expand Down
77 changes: 77 additions & 0 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#include <options/cf_options.h>
#include <unordered_map>

#include "db/db_impl/db_impl.h"
#include "file/filename.h"
#include "rocksdb/utilities/debug.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/random.h"
Expand Down Expand Up @@ -1072,6 +1074,81 @@ TEST_F(TitanDBTest, GCAfterDropCF) {
Close();
}

TEST_F(TitanDBTest, GCBeforeFlushCommit) {
  // Reproduces data loss when blob GC runs concurrently with a memtable
  // flush: the first flush is parked right before it installs its results,
  // a second flush and a GC are then driven to completion, and obsolete
  // files are purged. If GC discards entries that the not-yet-committed
  // flush output still references, the later Get("bar") loses its value.
  port::Mutex mu;
  port::CondVar cv(&mu);
  bool first_flush_pending = false;  // set once the first flush is parked
  bool gc_finished = false;          // releases the parked first flush
  DBImpl* db_impl = nullptr;

  // Hold the test thread at marker :1 until the second flush has fully run
  // OnFlushCompleted, so GC only starts after that flush looks "complete".
  SyncPoint::GetInstance()->LoadDependency({{
      "TitanDBImpl::OnFlushCompleted:Finished",
      "TitanDBTest::GCBeforeFlushCommit:1"}});
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::BeforeInstallResults",
      [&](void*) {
        MutexLock l(&mu);
        if (first_flush_pending) {
          // We are in the second flush.
          return;
        }
        // Park the first flush just before it installs its results. The DB
        // mutex is held at this sync point; drop it so the second flush and
        // the GC can make progress, and re-acquire it before handing control
        // back to the flush job.
        auto* db_mutex = db_impl->mutex();
        db_mutex->Unlock();
        first_flush_pending = true;
        cv.SignalAll();
        while (!gc_finished) {
          cv.Wait();
        }
        db_mutex->Lock();
      });

  options_.create_if_missing = true;
  // Setting max_flush_jobs = max_background_jobs / 4 = 2, so two flushes can
  // run concurrently.
  options_.max_background_jobs = 8;
  options_.max_write_buffer_number = 4;
  options_.min_blob_size = 0;           // force every value into blob files
  options_.merge_small_file_threshold = 1024 * 1024;
  options_.disable_background_gc = true;  // GC is triggered manually below
  Open();

  // GetRootDB() returns the underlying DB*; DBImpl is its concrete type, so
  // a checked compile-time downcast is the appropriate named cast here.
  db_impl = static_cast<DBImpl*>(db_->GetRootDB());
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v"));
  // t1 will wait for the second flush complete before install super version.
  auto t1 = port::Thread([&]() {
    // flush_opts.wait = true
    ASSERT_OK(db_->Flush(FlushOptions()));
  });
  // Wait until the first flush is parked inside the callback above.
  {
    MutexLock l(&mu);
    while (!first_flush_pending) {
      cv.Wait();
    }
  }
  // In the second flush we check if memtable has been committed, and signal
  // the first flush to proceed.
  ASSERT_OK(db_->Put(WriteOptions(), "bar", "v"));
  FlushOptions flush_opts;
  flush_opts.wait = false;  // don't deadlock against the parked first flush
  ASSERT_OK(db_->Flush(flush_opts));
  TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:1");
  // Run GC and purge while the first flush is still pending installation —
  // this is the window in which blob files can be wrongly collected.
  ASSERT_OK(db_impl_->TEST_StartGC(db_->DefaultColumnFamily()->GetID()));
  ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
  // Release the parked first flush and wait for it to finish.
  {
    MutexLock l(&mu);
    gc_finished = true;
    cv.SignalAll();
  }
  t1.join();
  // Check value after memtable committed.
  std::string value;
  ASSERT_OK(db_->Get(ReadOptions(), "bar", &value));
  ASSERT_EQ("v", value);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace titandb
} // namespace rocksdb

Expand Down

0 comments on commit 5172201

Please sign in to comment.