Skip to content

Commit

Permalink
Fix leak or crash on failure in automatic atomic flush
Browse files Browse the repository at this point in the history
Summary: Through code inspection in debugging an apparent leak of
ColumnFamilyData in the crash test, I found a case where too few
UnrefAndTryDelete() could be called on a cfd. This fixes that case,
which would fail like this in the new unit test:

```
db_flush_test: db/column_family.cc:1648:
rocksdb::ColumnFamilySet::~ColumnFamilySet(): Assertion `last_ref' failed.
```

Test Plan: unit test added
  • Loading branch information
pdillinger committed Dec 22, 2023
1 parent 106058c commit c0add78
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
33 changes: 33 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3035,6 +3035,39 @@ TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBAtomicFlushTest, FailureInMultiCfAutomaticFlush) {
bool atomic_flush = GetParam();
auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
Options options = CurrentOptions();
options.env = fault_injection_env.get();
options.create_if_missing = true;
options.atomic_flush = atomic_flush;
const int kNumKeysTriggerFlush = 4;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(2, handles_.size());
for (size_t cf = 0; cf < handles_.size(); ++cf) {
ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::ScheduleFlushes:PreSwitchMemtable",
[&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
SyncPoint::GetInstance()->EnableProcessing();

for (int i = 1; i < kNumKeysTriggerFlush; ++i) {
ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// Next write after failed flush should fail.
ASSERT_NOK(Put(0, "x", "y"));
fault_injection_env->SetFilesystemActive(true);
Close();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// In atomic flush, concurrent bg flush threads commit to the MANIFEST in
// serial, in the order of their picked memtables for each column family.
// Only when a bg flush thread finds out that its memtables are the earliest
Expand Down
9 changes: 5 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,8 @@ Status DBImpl::WriteRecoverableState() {
if (status.ok()) {
cached_recoverable_state_.Clear();
cached_recoverable_state_empty_ = true;
} else {
assert(false);
}
return status;
}
Expand Down Expand Up @@ -2060,16 +2062,15 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}

TEST_SYNC_POINT_CALLBACK("DBImpl::ScheduleFlushes:PreSwitchMemtable",
nullptr);
for (auto& cfd : cfds) {
if (!cfd->mem()->IsEmpty()) {
if (status.ok() && !cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, context);
}
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
if (!status.ok()) {
break;
}
}

if (two_write_queues_) {
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/bug_fixes/cfd_leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a possible memory leak or crash on a failure (such as I/O error) in automatic atomic flush of multiple column families.

0 comments on commit c0add78

Please sign in to comment.