Skip to content

Commit

Permalink
Fix crash when background task fails (facebook#5879)
Browse files Browse the repository at this point in the history
Summary:
Fixing crash. Full story in issue: facebook#5878
Pull Request resolved: facebook#5879

Differential Revision: D17812299

Pulled By: anand1976

fbshipit-source-id: 14e5a4fc502ade974583da9692d0ed6e5014613a
  • Loading branch information
koldat authored and facebook-github-bot committed Oct 8, 2019
1 parent 7145baf commit 29274ed
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 19 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Fix a bug in file ingestion caused by incorrect file number allocation when the number of column families involved in the ingestion exceeds 2.
* Fix a bug when format_version=3, partitioned filters, and prefix search are used in conjunction. The bug could result in Seek(prefix) returning NotFound for an existing prefix.
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause incorrect results when reseek happens with a different iterator upper bound.
* Fix a bug causing a crash during external file ingestion when a background compaction causes a severe error (file not found).
### New Features
* Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readahead size. For checksum verification before external SST file ingestion, a new option, IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
Expand Down
22 changes: 13 additions & 9 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2904,8 +2904,11 @@ DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
}

void DBImpl::ReleaseFileNumberFromPendingOutputs(
std::list<uint64_t>::iterator v) {
pending_outputs_.erase(v);
std::unique_ptr<std::list<uint64_t>::iterator>& v) {
if (v.get() != nullptr) {
pending_outputs_.erase(*v.get());
v.reset();
}
}

#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -3744,15 +3747,15 @@ Status DBImpl::IngestExternalFiles(

// TODO (yanqin) maybe handle the case in which column_families have
// duplicates
std::list<uint64_t>::iterator pending_output_elem;
std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
size_t total = 0;
for (const auto& arg : args) {
total += arg.external_files.size();
}
uint64_t next_file_number = 0;
Status status = ReserveFileNumbersBeforeIngestion(
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
&pending_output_elem, &next_file_number);
pending_output_elem, &next_file_number);
if (!status.ok()) {
InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem);
Expand Down Expand Up @@ -4026,7 +4029,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem;
std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);
Expand All @@ -4036,7 +4039,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
}

// Make sure that bg cleanup wont delete the files that we are importing
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
pending_output_elem.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));

if (status.ok()) {
// If crash happen after a hard link established, Recover function may
Expand Down Expand Up @@ -4254,18 +4258,18 @@ Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,

Status DBImpl::ReserveFileNumbersBeforeIngestion(
ColumnFamilyData* cfd, uint64_t num,
std::list<uint64_t>::iterator* pending_output_elem,
std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
uint64_t* next_file_number) {
Status s;
SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
assert(nullptr != pending_output_elem);
assert(nullptr != next_file_number);
InstrumentedMutexLock l(&mutex_);
if (error_handler_.IsDBStopped()) {
// Do not ingest files when there is a bg_error
return error_handler_.GetBGError();
}
*pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
pending_output_elem.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
*next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
auto cf_options = cfd->GetLatestMutableCFOptions();
VersionEdit dummy_edit;
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,8 @@ class DBImpl : public DB {
// created between the calls CaptureCurrentFileNumberInPendingOutputs() and
// ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live
// and blocked by any other pending_outputs_ calls)
void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);

Status SyncClosedLogs(JobContext* job_context);

Expand Down Expand Up @@ -1605,7 +1606,7 @@ class DBImpl : public DB {
// Write a version edit to the MANIFEST.
Status ReserveFileNumbersBeforeIngestion(
ColumnFamilyData* cfd, uint64_t num,
std::list<uint64_t>::iterator* pending_output_elem,
std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
uint64_t* next_file_number);
#endif //! ROCKSDB_LITE

Expand Down
15 changes: 9 additions & 6 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,9 @@ Status DBImpl::CompactFilesImpl(
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);

auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));

assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJobStats compaction_job_stats;
Expand Down Expand Up @@ -2216,8 +2217,9 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
assert(bg_flush_scheduled_);
num_running_flushes_++;

auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
FlushReason reason;

Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
Expand Down Expand Up @@ -2298,8 +2300,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,

num_running_compactions_++;

auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));

assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
ReadOptions ro;
ro.total_order_seek = true;
Expand Down

0 comments on commit 29274ed

Please sign in to comment.