Skip to content

Commit

Permalink
Support concurrent CF iteration and drop
Browse files Browse the repository at this point in the history
It's easy to cause coredump when closing ColumnFamilyHandle with unreleased
iterators, especially iterators release is controlled by java GC when using JNI.

This patch fixed concurrent CF iteration and drop, we let iterators(actually
SuperVersion) hold a ColumnFamilyData reference to prevent the CF from being
released too early.

fixed facebook#5982
  • Loading branch information
javeme committed Dec 10, 2019
1 parent a68dff5 commit 94cb303
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 90 deletions.
70 changes: 43 additions & 27 deletions db/column_family.cc
Expand Up @@ -60,11 +60,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
JobContext job_context(0);
mutex_->Lock();
if (cfd_->Unref()) {
bool dropped = cfd_->IsDropped();

delete cfd_;

bool dropped = cfd_->IsDropped();
if (cfd_->UnrefAndTryDelete()) {
if (dropped) {
db_->FindObsoleteFiles(&job_context, false, true);
}
Expand Down Expand Up @@ -439,13 +436,18 @@ void SuperVersion::Cleanup() {
to_delete.push_back(m);
}
current->Unref();
if (cfd->Unref()) {
delete cfd;
}
}

void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) {
void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current) {
cfd = new_cfd;
mem = new_mem;
imm = new_imm;
current = new_current;
cfd->Ref();
mem->Ref();
imm->Ref();
current->Ref();
Expand Down Expand Up @@ -581,21 +583,7 @@ ColumnFamilyData::~ColumnFamilyData() {
// compaction_queue_ and we destroyed it
assert(!queued_for_flush_);
assert(!queued_for_compaction_);

if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();

bool is_last_reference __attribute__((__unused__));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
assert(super_version_ == nullptr);

if (dummy_versions_ != nullptr) {
// List must be empty
Expand All @@ -615,6 +603,36 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}

bool ColumnFamilyData::UnrefAndTryDelete() {
int old_refs = refs_.fetch_sub(1);
assert(old_refs > 0);

if (old_refs == 1) {
assert(super_version_ == nullptr);
delete this;
return true;
}

if (old_refs == 2 && super_version_ != nullptr) {
// Only the super_version_ holds me
SuperVersion* sv = super_version_;
super_version_ = nullptr;
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
sv->db_mutex->Unlock();
local_sv_.reset();
sv->db_mutex->Lock();

if (sv->Unref()) {
// May delete this ColumnFamilyData after calling Cleanup()
sv->Cleanup();
delete sv;
return true;
}
}
return false;
}

void ColumnFamilyData::SetDropped() {
// can't drop default CF
assert(id_ != 0);
Expand Down Expand Up @@ -1169,7 +1187,7 @@ void ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
new_superversion->Init(mem_, imm_.current(), current_);
new_superversion->Init(this, mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
Expand Down Expand Up @@ -1344,14 +1362,12 @@ ColumnFamilySet::~ColumnFamilySet() {
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
bool last_ref __attribute__((__unused__));
last_ref = cfd->Unref();
last_ref = cfd->UnrefAndTryDelete();
assert(last_ref);
delete cfd;
}
bool dummy_last_ref __attribute__((__unused__));
dummy_last_ref = dummy_cfd_->Unref();
dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
assert(dummy_last_ref);
delete dummy_cfd_;
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
Expand Down
10 changes: 8 additions & 2 deletions db/column_family.h
Expand Up @@ -198,6 +198,7 @@ class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
struct SuperVersion {
// Accessing members of this class is not thread-safe and requires external
// synchronization (ie db mutex held or on write thread).
ColumnFamilyData* cfd;
MemTable* mem;
MemTableListVersion* imm;
Version* current;
Expand All @@ -221,8 +222,8 @@ struct SuperVersion {
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
void Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current);

// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
Expand Down Expand Up @@ -288,6 +289,11 @@ class ColumnFamilyData {
return old_refs == 1;
}

// UnrefAndTryDelete() decreases the reference count and do free if needed,
// return true if this is freed else false, UnrefAndTryDelete() can only
// be called while holding a DB mutex, or during single-threaded recovery.
bool UnrefAndTryDelete();

// SetDropped() can only be called under following conditions:
// 1) Holding a DB mutex,
// 2) from single-threaded write thread, AND
Expand Down
4 changes: 1 addition & 3 deletions db/compaction/compaction.cc
Expand Up @@ -275,9 +275,7 @@ Compaction::~Compaction() {
input_version_->Unref();
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
delete cfd_;
}
cfd_->UnrefAndTryDelete();
}
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_filesnapshot.cc
Expand Up @@ -102,7 +102,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
mutex_.Lock();
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
Expand Down
15 changes: 5 additions & 10 deletions db/db_impl/db_impl.cc
Expand Up @@ -337,7 +337,7 @@ Status DBImpl::ResumeImpl() {
mutex_.Unlock();
s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
mutex_.Lock();
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!s.ok()) {
break;
}
Expand Down Expand Up @@ -425,7 +425,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
mutex_.Lock();
cfd->Unref();
cfd->UnrefAndTryDelete();
}
}
}
Expand Down Expand Up @@ -482,17 +482,12 @@ Status DBImpl::CloseHelper() {
while (!flush_queue_.empty()) {
const FlushRequest& flush_req = PopFirstFromFlushQueue();
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (cfd->Unref()) {
delete cfd;
}
iter.first->UnrefAndTryDelete();
}
}
while (!compaction_queue_.empty()) {
auto cfd = PopFirstFromCompactionQueue();
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}

if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
Expand Down Expand Up @@ -4303,7 +4298,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
}
}
for (auto cfd : cfd_list) {
cfd->Unref();
cfd->UnrefAndTryDelete();
}
}
return s;
Expand Down
26 changes: 8 additions & 18 deletions db/db_impl/db_impl_compaction_flush.cc
Expand Up @@ -1617,12 +1617,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* tmp_cfd : cfds) {
if (tmp_cfd->Unref()) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard(&mutex_);
delete tmp_cfd;
}
tmp_cfd->UnrefAndTryDelete();
}
}
TEST_SYNC_POINT("FlushMemTableFinished");
Expand Down Expand Up @@ -1681,7 +1678,7 @@ Status DBImpl::AtomicFlushMemTables(
}
cfd->Ref();
s = SwitchMemtable(cfd, &context);
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!s.ok()) {
break;
}
Expand Down Expand Up @@ -1721,12 +1718,9 @@ Status DBImpl::AtomicFlushMemTables(
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* cfd : cfds) {
if (cfd->Unref()) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard(&mutex_);
delete cfd;
}
cfd->UnrefAndTryDelete();
}
}
return s;
Expand Down Expand Up @@ -2207,16 +2201,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
*reason = bg_flush_args[0].cfd_->GetFlushReason();
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->Unref()) {
delete cfd;
if (cfd->UnrefAndTryDelete()) {
arg.cfd_ = nullptr;
}
}
}
for (auto cfd : column_families_not_to_flush) {
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
return status;
}
Expand Down Expand Up @@ -2545,10 +2536,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// reference).
// This will all happen under a mutex so we don't have to be afraid of
// somebody else deleting it.
if (cfd->Unref()) {
if (cfd->UnrefAndTryDelete()) {
// This was the last reference of the column family, so no need to
// compact.
delete cfd;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Expand Up @@ -920,7 +920,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
ColumnFamilyData* cfd;

while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfd->Unref();
cfd->UnrefAndTryDelete();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
Expand Down
10 changes: 4 additions & 6 deletions db/db_impl/db_impl_write.cc
Expand Up @@ -1254,7 +1254,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
for (const auto cfd : cfds) {
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
Expand Down Expand Up @@ -1333,7 +1333,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
}
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
Expand Down Expand Up @@ -1523,8 +1523,7 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) {
assert(context->superversion_context.new_superversion.get() != nullptr);
cfd->InstallSuperVersion(&context->superversion_context, &mutex_);

if (cfd->Unref()) {
delete cfd;
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
}
Expand Down Expand Up @@ -1556,8 +1555,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
if (!cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, context);
}
if (cfd->Unref()) {
delete cfd;
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
if (!status.ok()) {
Expand Down
8 changes: 2 additions & 6 deletions db/flush_scheduler.cc
Expand Up @@ -60,9 +60,7 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
}

// no longer relevant, retry
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
}

Expand All @@ -80,9 +78,7 @@ bool FlushScheduler::Empty() {
void FlushScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
assert(head_.load(std::memory_order_relaxed) == nullptr);
}
Expand Down
9 changes: 2 additions & 7 deletions db/trim_history_scheduler.cc
Expand Up @@ -34,10 +34,7 @@ ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() {
// success
return cfd;
}
if (cfd->Unref()) {
// no longer relevant, retry
delete cfd;
}
cfd->UnrefAndTryDelete();
}
}

Expand All @@ -49,9 +46,7 @@ bool TrimHistoryScheduler::Empty() {
void TrimHistoryScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
if (cfd->Unref()) {
delete cfd;
}
cfd->UnrefAndTryDelete();
}
assert(Empty());
}
Expand Down

0 comments on commit 94cb303

Please sign in to comment.