memtable_list: avoid rolling back memtable flush on CF drop (#144)
This is a continuation of #126, adding changes that were missed there for the
case where a CF drop is encountered after the flush has completed but before
or during the manifest write.

Rolling back on a CF drop is meaningless because a flush is not possible in
that state, and the only path forward is for the memtables to be freed shortly
afterwards, so making them available for flush again is useless at best.
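
To illustrate the intended behavior, here is a minimal, self-contained sketch
with stand-in types (not the real MemTableList/ColumnFamilyData classes): on a
failed flush the "flush me again" state is restored only while the CF is still
alive, and a dropped CF's memtables are left untouched until they are freed
together with the CF.

#include <cassert>

struct ColumnFamilyStub {
  bool dropped = false;
  bool IsDropped() const { return dropped; }
};

struct MemTableStub {
  bool flush_in_progress = true;  // was picked for the flush that failed
  bool flush_completed = true;
  bool flush_requested = false;   // i.e. eligible to be picked again
};

// Restore the "needs another flush" state only for a live CF; for a dropped CF
// the memtables are left alone and are freed together with the CF later.
void RestoreFlagsUnlessDropped(const ColumnFamilyStub& cfd, MemTableStub& m) {
  if (!cfd.IsDropped()) {
    m.flush_completed = false;
    m.flush_in_progress = false;
    m.flush_requested = true;
  }
}

int main() {
  ColumnFamilyStub dropped_cf{true};
  MemTableStub m;
  RestoreFlagsUnlessDropped(dropped_cf, m);
  assert(m.flush_in_progress && !m.flush_requested);  // no rollback happened
  return 0;
}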

Additionally, this might be needed for the future WriteBufferManager changes
that trigger flushes based on immutable memory as well: rolling back a flush
causes the memtable memory to become ready for flush again for a brief period
of time until it is dropped, which might wrongly affect the WBM's decisions.

Finally, the code used to install a new version even when no changes were
made. This is unnecessary, so that part is now moved into the version-mutating
code path only.
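
As a rough, hedged sketch of that pattern (stand-in types, not the actual
MemTableListVersion code): treat the current version as immutable and copy it
only on the path that is actually about to mutate it.

#include <memory>
#include <vector>

struct Version {
  std::vector<int> memtable_ids;  // stand-in for the flushed-memtable list
};

struct VersionHolder {
  std::shared_ptr<Version> current = std::make_shared<Version>();

  // Copy-on-write: readers may still hold a reference to the old version.
  void InstallNewVersion() { current = std::make_shared<Version>(*current); }

  void CommitFlushResults(bool cf_dropped) {
    if (!cf_dropped) {
      InstallNewVersion();            // copy only when we are about to mutate
      current->memtable_ids.clear();  // e.g. remove the flushed memtables
    }
    // Dropped CF (or a failure with no rollback): leave the version untouched.
  }
};

int main() {
  VersionHolder v;
  v.current->memtable_ids = {1, 2};
  v.CommitFlushResults(/*cf_dropped=*/true);   // no new version is installed
  v.CommitFlushResults(/*cf_dropped=*/false);  // copies first, then mutates
  return 0;
}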

This PR also adds two regression tests so that we can rely on the rollback not
being performed on a CF drop.
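
For reference, the user-visible behavior the tests rely on looks roughly like
the sketch below against the public RocksDB API. The database path is
illustrative, and the status checks mirror what the tests assert; here the drop
simply precedes the flush call instead of racing with an in-flight flush, and
error handling is demo-style asserts.

#include <cassert>
#include <string>

#include <rocksdb/db.h>

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  assert(rocksdb::DB::Open(options, "/tmp/cf_drop_flush_demo", &db).ok());

  rocksdb::ColumnFamilyHandle* cf = nullptr;
  assert(
      db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "one", &cf).ok());
  assert(db->Put(rocksdb::WriteOptions(), cf, "foo", "bar").ok());
  assert(db->DropColumnFamily(cf).ok());

  // A flush requested for the dropped CF is reported as ColumnFamilyDropped
  // rather than being performed.
  rocksdb::Status s = db->Flush(rocksdb::FlushOptions(), cf);
  assert(s.IsColumnFamilyDropped());

  // The data stays readable through the existing handle until it is destroyed.
  std::string value;
  assert(db->Get(rocksdb::ReadOptions(), cf, "foo", &value).ok());
  assert(value == "bar");

  assert(db->DestroyColumnFamilyHandle(cf).ok());
  assert(db->Close().ok());
  delete db;
  return 0;
}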
isaac-io authored and udi-speedb committed Nov 12, 2023
1 parent 8cc9cf4 commit d125706
Showing 2 changed files with 199 additions and 9 deletions.
183 changes: 183 additions & 0 deletions db/column_family_test.cc
@@ -1049,6 +1049,189 @@ TEST_P(ColumnFamilyTest, CrashAfterFlush) {
  db_options_.env = env_;
}

TEST_P(ColumnFamilyTest, DropBeforeInstallResults) {
  Open();
  CreateColumnFamilies({"one"});

  // The memtables in the following vector are simply pointers to memtables that
  // are managed by the CF that is about to be dropped and are collected during
  // the flush through the sync point callback below. The vector isn't owning
  // them and access to them is performed only after making sure that they are
  // still alive (asserting that the amount of immutable memtables that the CF
  // reports is the same as the amount of memtables that we collected). The
  // vector is also cleared right after the checks are done in order to avoid
  // leaking the pointers after they are freed.
  std::vector<MemTable*> mems;

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTableToOutputFile:Finish",
        "ColumnFamilyTest::DropBeforeInstallResults"}});
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table", [&](void* arg) {
        auto* memtables = static_cast<autovector<MemTable*>*>(arg);
        ASSERT_NE(memtables, nullptr);
        ASSERT_EQ(memtables->size(), 1);
        for (auto& picked_mem : *memtables) {
          mems.push_back(picked_mem);
        }
        ASSERT_OK(db_->DropColumnFamily(handles_[1]));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(1, "foo", "bar"));

  uint64_t num_immutable = 0;
  ASSERT_TRUE(db_->GetIntProperty(
      handles_[1], "rocksdb.num-immutable-mem-table", &num_immutable));
  ASSERT_EQ(num_immutable, 0);

  ASSERT_TRUE(Flush(1).IsColumnFamilyDropped());

  TEST_SYNC_POINT("ColumnFamilyTest::DropBeforeInstallResults");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Make sure we can still read the key that we inserted
  std::unique_ptr<Iterator> dropped_cf_iter{db_->NewIterator({}, handles_[1])};
  dropped_cf_iter->Seek("foo");
  ASSERT_TRUE(dropped_cf_iter->Valid());
  ASSERT_EQ(dropped_cf_iter->key(), "foo");
  ASSERT_EQ(dropped_cf_iter->value(), "bar");
  dropped_cf_iter.reset();

  // Ensure that the memtable still exists and is marked as immutable
  ASSERT_TRUE(db_->GetIntProperty(
      handles_[1], "rocksdb.num-immutable-mem-table", &num_immutable));
  ASSERT_EQ(num_immutable, 1);

  // Make sure that the memtable was not rolled back
  ASSERT_EQ(mems.size(), 1);
  for (auto& mem : mems) {
    ASSERT_GT(mem->GetEdits()->NumEntries(), 0);
  }
  mems.clear();

  std::vector<ColumnFamilyDescriptor> descs;
  for (auto h : handles_) {
    if (h) {
      ColumnFamilyDescriptor desc;
      ASSERT_OK(h->GetDescriptor(&desc));
      descs.push_back(desc);
      ASSERT_OK(db_->DestroyColumnFamilyHandle(h));
    }
  }
  handles_.clear();
  names_.clear();

  // Ensure the DB closes successfully after this
  ASSERT_OK(db_->Close());
  Destroy(descs);
}

#ifndef ROCKSDB_LITE // EventListener is not supported
TEST_P(ColumnFamilyTest, DropAfterPickMemtable) {
  class FlushBeginListener : public EventListener {
   public:
    void OnFlushBegin(DB* db, const FlushJobInfo& flush_job_info) override {
      if (flush_job_info.cf_name == "one" && handle != nullptr) {
        ASSERT_OK(db->DropColumnFamily(handle));
        handle = nullptr;
      }
    }

    ColumnFamilyHandle* handle = nullptr;
  };

  std::shared_ptr<FlushBeginListener> listener =
      std::make_shared<FlushBeginListener>();
  db_options_.listeners.push_back(listener);

  Open();
  CreateColumnFamilies({"one"});

  listener->handle = handles_[1];

  // The memtables in the following vector are simply pointers to memtables that
  // are managed by the CF that is about to be dropped and are collected during
  // the flush through the sync point callback below. The vector isn't owning
  // them and access to them is performed only after making sure that they are
  // still alive (asserting that the amount of immutable memtables that the CF
  // reports is the same as the amount of memtables that we collected). The
  // vector is also cleared right after the checks are done in order to avoid
  // leaking the pointers after they are freed.
  std::vector<MemTable*> mems;

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTableToOutputFile:Finish",
        "ColumnFamilyTest::DropAfterPickMemtable"}});
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
        auto* job = reinterpret_cast<FlushJob*>(arg);
        ASSERT_NE(job, nullptr);
        ASSERT_EQ(job->GetMemTables().size(), 1);
        for (auto& picked_mem : job->GetMemTables()) {
          mems.push_back(picked_mem);
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(1, "foo", "bar"));

  uint64_t num_immutable = 0;
  ASSERT_TRUE(db_->GetIntProperty(
      handles_[1], "rocksdb.num-immutable-mem-table", &num_immutable));
  ASSERT_EQ(num_immutable, 0);

  ASSERT_TRUE(Flush(1).IsColumnFamilyDropped());

  TEST_SYNC_POINT("ColumnFamilyTest::DropAfterPickMemtable");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Make sure we can still read the key that we inserted
  std::unique_ptr<Iterator> dropped_cf_iter{db_->NewIterator({}, handles_[1])};
  dropped_cf_iter->Seek("foo");
  ASSERT_TRUE(dropped_cf_iter->Valid());
  ASSERT_EQ(dropped_cf_iter->key(), "foo");
  ASSERT_EQ(dropped_cf_iter->value(), "bar");
  dropped_cf_iter.reset();

  // Ensure that the memtable still exists and is marked as immutable
  ASSERT_TRUE(db_->GetIntProperty(
      handles_[1], "rocksdb.num-immutable-mem-table", &num_immutable));
  ASSERT_EQ(num_immutable, 1);

  // Make sure that the memtable was not rolled back
  ASSERT_EQ(mems.size(), 1);
  for (auto& mem : mems) {
    ASSERT_GT(mem->GetEdits()->NumEntries(), 0);
  }
  mems.clear();

  std::vector<ColumnFamilyDescriptor> descs;
  for (auto h : handles_) {
    if (h) {
      ColumnFamilyDescriptor desc;
      ASSERT_OK(h->GetDescriptor(&desc));
      descs.push_back(desc);
      ASSERT_OK(db_->DestroyColumnFamilyHandle(h));
    }
  }
  handles_.clear();
  names_.clear();

  // Ensure the DB closes successfully after this
  ASSERT_OK(db_->Close());
  Destroy(descs);
}
#endif // !ROCKSDB_LITE

TEST_P(ColumnFamilyTest, OpenNonexistentColumnFamily) {
  ASSERT_OK(TryOpen({"default"}));
  Close();
25 changes: 16 additions & 9 deletions db/memtable_list.cc
@@ -725,9 +725,6 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
   assert(mu);
   mu->AssertHeld();
   assert(to_delete);
-  // we will be changing the version in the next code path,
-  // so we better create a new one, since versions are immutable
-  InstallNewVersion();
 
   // All the later memtables that have the same filenum
   // are part of the same batch. They can be committed now.
@@ -748,6 +745,10 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
   // read full data as long as column family handle is not deleted, even if
   // the column family is dropped.
   if (s.ok() && !cfd->IsDropped()) { // commit new state
+    // we will be changing the version in the next code path,
+    // so we better create a new one, since versions are immutable
+    InstallNewVersion();
+
     while (batch_count-- > 0) {
       MemTable* m = current_->memlist_.back();
       if (m->edit_.GetBlobFileAdditions().empty()) {
@@ -788,13 +789,19 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
                          m->edit_.GetBlobFileAdditions().size(), mem_id);
       }
 
-      m->SetFlushCompleted(false);
-      m->SetFlushInProgress(false);
+      // Do not roll back if the CF has been dropped. There's no point in
+      // setting a pending flush state again since we won't be able to complete
+      // a flush anyway in that state, and we can only drop the memtable after
+      // all handles are destroyed.
+      if (!cfd->IsDropped()) {
+        m->SetFlushCompleted(false);
+        m->SetFlushInProgress(false);
 
-      m->edit_.Clear();
-      num_flush_not_started_++;
-      m->file_number_ = 0;
-      imm_flush_needed.store(true, std::memory_order_release);
+        m->edit_.Clear();
+        num_flush_not_started_++;
+        m->file_number_ = 0;
+        imm_flush_needed.store(true, std::memory_order_release);
+      }
       ++mem_id;
     }
   }
