Skip to content

Commit

Permalink
Add an option to trigger flush when the number of range deletions rea…
Browse files Browse the repository at this point in the history
…ch a threshold (#11358)

Summary:
Add a mutable column family option `memtable_max_range_deletions`. When non-zero, RocksDB will try to flush the current memtable after it has at least `memtable_max_range_deletions` range deletions. Java API is added and crash test is updated accordingly to randomly enable this option.

Pull Request resolved: facebook/rocksdb#11358

Test Plan:
* New unit test: `DBRangeDelTest.MemtableMaxRangeDeletions`
* Ran crash test `python3 ./tools/db_crashtest.py whitebox --simple --memtable_max_range_deletions=20` and saw logs showing flushed memtables usually with 20 range deletions.

Reviewed By: ajkr

Differential Revision: D46582680

Pulled By: cbi42

fbshipit-source-id: f23d6fa8d8264ecf0a18d55c113ba03f5e2504da
  • Loading branch information
vrdhn authored and rockeet committed Dec 18, 2023
1 parent 7216d6a commit feac267
Show file tree
Hide file tree
Showing 22 changed files with 229 additions and 6 deletions.
36 changes: 36 additions & 0 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3475,6 +3475,42 @@ TEST_F(DBRangeDelTest, NonBottommostCompactionDropRangetombstone) {
db_->ReleaseSnapshot(snapshot);
}

// Exercises the `memtable_max_range_deletions` option: once the memtable
// holds that many range deletions, the next write is expected to produce a
// new L0 file, and a value changed through SetOptions() is expected to apply
// to the next new memtable.
TEST_F(DBRangeDelTest, MemtableMaxRangeDeletions) {
  constexpr int kRangeDeletionLimit = 50;

  Options opts = CurrentOptions();
  opts.level_compaction_dynamic_file_size = false;
  opts.memtable_max_range_deletions = kRangeDeletionLimit;
  opts.level0_file_num_compaction_trigger = 5;
  DestroyAndReopen(opts);

  const WriteOptions write_opts;
  auto* const default_cf = db_->DefaultColumnFamily();

  int idx = 0;
  while (idx < kRangeDeletionLimit) {
    // Consecutive ranges [idx, idx + 2) overlap on purpose, to see whether
    // the option counts range tombstone fragments instead.
    ASSERT_OK(Put(Key(idx), "val1"));
    ASSERT_OK(Put(Key(idx + 1), "val2"));
    ASSERT_OK(db_->DeleteRange(write_opts, default_cf, Key(idx), Key(idx + 2)));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    // No L0 file is expected while the deletions are still being added.
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ++idx;
  }

  // The limit has been reached; one more write should trigger the flush.
  ASSERT_OK(Put(Key(kRangeDeletionLimit), "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  // The new limit should take effect for the next new memtable, so flush
  // manually to switch to one.
  ASSERT_OK(db_->SetOptions({{"memtable_max_range_deletions", "1"}}));
  ASSERT_OK(Flush());
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // A single range deletion reaches the new limit of 1 ...
  ASSERT_OK(db_->DeleteRange(write_opts, default_cf, Key(50), Key(100)));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // ... and the write that follows it should trigger the flush.
  ASSERT_OK(Put(Key(50), "new val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(3, NumTableFilesAtLevel(0));
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
8 changes: 7 additions & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ Status FlushJob::WriteLevel0Table() {
uint64_t total_num_entries = 0, total_num_deletes = 0;
uint64_t total_data_size = 0;
size_t total_memory_usage = 0;
uint64_t total_num_range_deletes = 0;
// Used for testing:
uint64_t mems_size = mems_.size();
(void)mems_size; // avoids unused variable error when
Expand All @@ -874,15 +875,20 @@ Status FlushJob::WriteLevel0Table() {
total_num_deletes += m->num_deletes();
total_data_size += m->get_data_size();
total_memory_usage += m->ApproximateMemoryUsage();
total_num_range_deletes += m->num_range_deletes();
}

// TODO(cbi): when a memtable is flushed because its number of range deletions
// hit the memtable_max_range_deletions limit, flush_reason_ is still
// "Write Buffer Full"; flush_reason_ should be updated accordingly.
event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "total_data_size"
<< total_data_size << "memory_usage"
<< total_memory_usage << "flush_reason"
<< total_memory_usage << "num_range_deletes"
<< total_num_range_deletes << "flush_reason"
<< GetFlushReasonString(flush_reason_);

{
Expand Down
19 changes: 18 additions & 1 deletion db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
num_entries_(0),
num_deletes_(0),
num_merges_(0),
num_range_deletes_(0),
largest_seqno_(0),
raw_key_size_(0),
raw_value_size_(0),
Expand All @@ -118,7 +119,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
ioptions.memtable_insert_with_hint_prefix_extractor.get()),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
approximate_memory_usage_(0),
memtable_max_range_deletions_(
mutable_cf_options.memtable_max_range_deletions) {
needs_user_key_cmp_in_get_ = table_->NeedsUserKeyCompareInGet();
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
Expand Down Expand Up @@ -175,6 +178,14 @@ size_t MemTable::ApproximateMemoryUsage() {
}

bool MemTable::ShouldFlushNow() {
// Flush when memtable_max_range_deletions is enabled (> 0) and this memtable
// already holds at least that many range deletions.
if (memtable_max_range_deletions_ > 0 &&
num_range_deletes_.load(std::memory_order_relaxed) >=
static_cast<uint64_t>(memtable_max_range_deletions_)) {
return true;
}

size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
// In a lot of times, we cannot allocate arena blocks that exactly matches the
// buffer size. Thus we have to decide if we should over-allocate or
Expand Down Expand Up @@ -704,6 +715,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
type == kTypeDeletionWithTimestamp) {
num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
} else if (type == kTypeRangeDeletion) {
uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1;
num_range_deletes_.store(val, std::memory_order_relaxed);
}
else if (type == kTypeMerge) {
num_merges_.store(num_merges_.load(std::memory_order_relaxed) + 1,
Expand Down Expand Up @@ -798,6 +812,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
size_t size = cached_range_tombstone_.Size();
if (allow_concurrent) {
post_process_info->num_range_deletes++;
range_del_mutex_.lock();
}
for (size_t i = 0; i < size; ++i) {
Expand All @@ -816,6 +831,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
new_local_cache_ref, new_cache.get()),
std::memory_order_relaxed);
}

if (allow_concurrent) {
range_del_mutex_.unlock();
}
Expand Down Expand Up @@ -1255,6 +1271,7 @@ bool MemTable::Get(const LookupKey& key, PinnableSlice* value,
// Avoiding recording stats for speed.
return false;
}

PERF_TIMER_GUARD(get_from_memtable_time);

std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
Expand Down
17 changes: 17 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct MemTablePostProcessInfo {
uint64_t num_entries = 0;
uint64_t num_deletes = 0;
uint64_t num_merges = 0;
uint64_t num_range_deletes = 0;
uint64_t largest_seqno = 0;
uint64_t raw_key_size = 0; // internal key
uint64_t raw_value_size = 0;
Expand Down Expand Up @@ -382,6 +383,10 @@ class MemTable {
}
raw_key_size_.fetch_add(update_counters.raw_key_size, std::memory_order_relaxed);
raw_value_size_.fetch_add(update_counters.raw_value_size, std::memory_order_relaxed);
if (update_counters.num_range_deletes > 0) {
num_range_deletes_.fetch_add(update_counters.num_range_deletes,
std::memory_order_relaxed);
}
UpdateFlushState();
}

Expand All @@ -402,6 +407,13 @@ class MemTable {
return num_merges_.load(std::memory_order_relaxed);
}

// Returns the total number of range deletions in the mem table, read from
// the num_range_deletes_ counter with relaxed memory ordering.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_range_deletes() const {
  const uint64_t range_deletes =
      num_range_deletes_.load(std::memory_order_relaxed);
  return range_deletes;
}

uint64_t get_data_size() const {
return data_size_.load(std::memory_order_relaxed);
}
Expand Down Expand Up @@ -618,6 +630,7 @@ class MemTable {
std::atomic<uint64_t> num_entries_;
std::atomic<uint64_t> num_deletes_;
std::atomic<uint64_t> num_merges_;
std::atomic<uint64_t> num_range_deletes_;
std::atomic<uint64_t> largest_seqno_;
std::atomic<uint64_t> raw_key_size_;
std::atomic<uint64_t> raw_value_size_;
Expand Down Expand Up @@ -683,6 +696,10 @@ class MemTable {
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;

// Maximum number of range deletions allowed in this memtable before it is
// automatically flushed; 0 means unlimited.
uint32_t memtable_max_range_deletions_ = 0;

// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;

Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ DECLARE_bool(allow_data_in_errors);

DECLARE_bool(enable_thread_tracking);

DECLARE_uint32(memtable_max_range_deletions);

// Tiered storage
DECLARE_bool(enable_tiered_storage); // set last_level_temperature
DECLARE_int64(preclude_last_level_data_seconds);
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,4 +1102,8 @@ DEFINE_uint64(stats_dump_period_sec,
DEFINE_bool(use_io_uring, false, "Enable the use of IO uring on Posix");

// C-linkage accessor that reports the current value of --use_io_uring.
extern "C" bool RocksDbIOUringEnable() {
  const bool io_uring_enabled = FLAGS_use_io_uring;
  return io_uring_enabled;
}

// Stress-test flag for the memtable_max_range_deletions option; 0 (the
// default) leaves the feature disabled.
// The help text is built from adjacent string literals, so the first literal
// must end with a space or the words run together as "memtableafter".
DEFINE_uint32(memtable_max_range_deletions, 0,
              "If nonzero, RocksDB will try to flush the current memtable "
              "after the number of range deletions is >= this limit");

#endif // GFLAGS
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3269,6 +3269,8 @@ void InitializeOptionsFromFlags(
options.allow_data_in_errors = FLAGS_allow_data_in_errors;

options.enable_thread_tracking = FLAGS_enable_thread_tracking;

options.memtable_max_range_deletions = FLAGS_memtable_max_range_deletions;
}

void InitializeOptionsGeneral(
Expand Down
11 changes: 11 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
std::shared_ptr<class CompactionExecutorFactory> compaction_executor_factory;
std::shared_ptr<class AnyPlugin> html_user_key_coder;

// RocksDB will try to flush the current memtable after the number of range
// deletions is >= this limit. For workloads with many range
// deletions, limiting the number of range deletions in memtable can help
// prevent performance degradation and/or OOM caused by too many range
// tombstones in a single memtable.
//
// Default: 0 (disabled)
//
// Dynamically changeable through SetOptions() API
uint32_t memtable_max_range_deletions = 0;

// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
Expand Down
47 changes: 47 additions & 0 deletions java/rocksjni/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3904,6 +3904,29 @@ jbyte Java_org_rocksdb_Options_prepopulateBlobCache(JNIEnv*, jobject,
opts->prepopulate_blob_cache);
}

/*
 * Class:     org_rocksdb_Options
 * Method:    setMemtableMaxRangeDeletions
 * Signature: (JI)V
 *
 * Stores the given count into `memtable_max_range_deletions` of the native
 * Options object addressed by `jhandle`.
 */
void Java_org_rocksdb_Options_setMemtableMaxRangeDeletions(
    JNIEnv*, jobject, jlong jhandle, jint jmemtable_max_range_deletions) {
  auto* opts = reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle);
  // The option is declared as uint32_t, so convert to that type explicitly
  // instead of casting to int32_t and relying on a second, implicit
  // signed-to-unsigned conversion. A negative jint wraps modulo 2^32.
  opts->memtable_max_range_deletions =
      static_cast<uint32_t>(jmemtable_max_range_deletions);
}

/*
 * Class:     org_rocksdb_Options
 * Method:    memtableMaxRangeDeletions
 * Signature: (J)I
 *
 * Reads `memtable_max_range_deletions` from the native Options object
 * addressed by `jhandle` and returns it as a jint.
 */
jint Java_org_rocksdb_Options_memtableMaxRangeDeletions(JNIEnv*, jobject,
                                                        jlong jhandle) {
  const auto* options =
      reinterpret_cast<const ROCKSDB_NAMESPACE::Options*>(jhandle);
  const auto max_range_deletions = options->memtable_max_range_deletions;
  return static_cast<jint>(max_range_deletions);
}

//////////////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::ColumnFamilyOptions

Expand Down Expand Up @@ -5770,6 +5793,30 @@ jbyte Java_org_rocksdb_ColumnFamilyOptions_prepopulateBlobCache(JNIEnv*,
opts->prepopulate_blob_cache);
}

/*
 * Class:     org_rocksdb_ColumnFamilyOptions
 * Method:    setMemtableMaxRangeDeletions
 * Signature: (JI)V
 *
 * Stores the given count into `memtable_max_range_deletions` of the native
 * ColumnFamilyOptions object addressed by `jhandle`.
 */
void Java_org_rocksdb_ColumnFamilyOptions_setMemtableMaxRangeDeletions(
    JNIEnv*, jobject, jlong jhandle, jint jmemtable_max_range_deletions) {
  auto* opts =
      reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
  // The option is declared as uint32_t while jint is signed; make the
  // conversion explicit rather than implicit. A negative jint wraps modulo
  // 2^32.
  opts->memtable_max_range_deletions =
      static_cast<uint32_t>(jmemtable_max_range_deletions);
}

/*
 * Class:     org_rocksdb_ColumnFamilyOptions
 * Method:    memtableMaxRangeDeletions
 * Signature: (J)I
 *
 * Reads `memtable_max_range_deletions` from the native ColumnFamilyOptions
 * object addressed by `jhandle` and returns it as a jint.
 */
jint Java_org_rocksdb_ColumnFamilyOptions_memtableMaxRangeDeletions(
    JNIEnv*, jobject, jlong jhandle) {
  const auto* cf_options =
      reinterpret_cast<const ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle);
  const auto max_range_deletions = cf_options->memtable_max_range_deletions;
  return static_cast<jint>(max_range_deletions);
}

/////////////////////////////////////////////////////////////////////
// ROCKSDB_NAMESPACE::DBOptions

Expand Down
13 changes: 13 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,17 @@ public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}

/**
 * Forwards {@code count} to the native setter for this options object.
 *
 * @param count value passed through to the native layer unchanged.
 * @return this instance, to allow call chaining.
 */
@Override
public ColumnFamilyOptions setMemtableMaxRangeDeletions(final int count) {
  final long handle = nativeHandle_;
  setMemtableMaxRangeDeletions(handle, count);
  return this;
}

/**
 * Reads the value back from the native options object.
 *
 * @return the value reported by the native getter.
 */
@Override
public int memtableMaxRangeDeletions() {
  final int maxRangeDeletions = memtableMaxRangeDeletions(nativeHandle_);
  return maxRangeDeletions;
}

//
// BEGIN options for blobs (integrated BlobDB)
//
Expand Down Expand Up @@ -1498,6 +1509,8 @@ private native void setForceConsistencyChecks(final long handle,
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long compactionThreadLimiterHandle);
// Native accessors for the memtable_max_range_deletions option; `handle` is
// the native handle of the options object being read or modified.
private native void setMemtableMaxRangeDeletions(final long handle, final int count);
private native int memtableMaxRangeDeletions(final long handle);

private native void setEnableBlobFiles(final long nativeHandle_, final boolean enableBlobFiles);
private native boolean enableBlobFiles(final long nativeHandle_);
Expand Down
17 changes: 17 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,23 @@ T setCompressionOptions(
@Experimental("Caution: this option is experimental")
SstPartitionerFactory sstPartitionerFactory();

/**
 * Sets the maximum number of range deletions a memtable may hold before
 * RocksDB tries to flush it.
 * This applies to the mutable memtable.
 *
 * @param count a positive integer, 0 (default) to disable the feature.
 * @return the reference of the current options.
 */
T setMemtableMaxRangeDeletions(final int count);

/**
 * Gets the maximum number of range deletions allowed in a memtable before
 * RocksDB tries to flush it.
 * 0 (default) indicates that the feature is disabled.
 *
 * @return current value of memtable_max_range_deletions
 */
int memtableMaxRangeDeletions();

/**
* Compaction concurrent thread limiter for the column family.
* If non-nullptr, use given concurrent thread limiter to control
Expand Down
13 changes: 13 additions & 0 deletions java/src/main/java/org/rocksdb/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,17 @@ public SstPartitionerFactory sstPartitionerFactory() {
return sstPartitionerFactory_;
}

/**
 * Forwards {@code count} to the native setter for this options object.
 *
 * @param count value passed through to the native layer unchanged.
 * @return this instance, to allow call chaining.
 */
@Override
public Options setMemtableMaxRangeDeletions(final int count) {
  final long handle = nativeHandle_;
  setMemtableMaxRangeDeletions(handle, count);
  return this;
}

/**
 * Reads the value back from the native options object.
 *
 * @return the value reported by the native getter.
 */
@Override
public int memtableMaxRangeDeletions() {
  final int maxRangeDeletions = memtableMaxRangeDeletions(nativeHandle_);
  return maxRangeDeletions;
}

@Override
public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {
setCompactionThreadLimiter(nativeHandle_, compactionThreadLimiter.nativeHandle_);
Expand Down Expand Up @@ -2502,6 +2513,8 @@ private native void setAtomicFlush(final long handle,
final boolean atomicFlush);
private native boolean atomicFlush(final long handle);
private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle);
// Native accessors for the memtable_max_range_deletions option; `handle` is
// the native handle of the options object being read or modified.
private native void setMemtableMaxRangeDeletions(final long handle, final int count);
private native int memtableMaxRangeDeletions(final long handle);
private static native void setCompactionThreadLimiter(
final long nativeHandle_, final long newLimiterHandle);
private static native void setAvoidUnnecessaryBlockingIO(
Expand Down
10 changes: 10 additions & 0 deletions java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -709,4 +709,14 @@ public void cfPaths() throws IOException {
assertThat(options.cfPaths()).isEqualTo(paths);
}
}

/**
 * Checks that the option starts at 0 and that a value written through the
 * setter is returned by the getter, with the setter returning the same
 * options instance.
 */
@Test
public void memtableMaxRangeDeletions() {
  try (final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()) {
    // A freshly created options object should report 0.
    assertThat(cfOpts.memtableMaxRangeDeletions()).isEqualTo(0);

    final int expected = 32;
    // The setter returns the options object itself for chaining.
    assertThat(cfOpts.setMemtableMaxRangeDeletions(expected)).isEqualTo(cfOpts);
    assertThat(cfOpts.memtableMaxRangeDeletions()).isEqualTo(expected);
  }
}
}
Loading

0 comments on commit feac267

Please sign in to comment.