Skip to content

Commit

Permalink
options: change the default value for compaction threads to 8 (#169)
Browse files Browse the repository at this point in the history
This is done through the deprecated `max_background_compactions` option,
which overrides `max_background_jobs`, so in order to avoid having an
adverse effect on users who set `max_background_jobs` to a high value
we try to detect that case and shift back to the user's choice in case
there are enough background jobs to be enough for both flushes and
compactions.
  • Loading branch information
isaac-io committed Oct 19, 2022
1 parent 3132712 commit 09f9928
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 89 deletions.
28 changes: 20 additions & 8 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3128,8 +3128,6 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) {
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
SpecialEnv env(Env::Default());
// Allow both of flush and purge job to schedule.
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(
Expand Down Expand Up @@ -3163,9 +3161,8 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
"DBImpl::BGWorkPurge:start"},
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
{"ColumnFamilyTest::IteratorCloseWALFile2:1",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

Expand All @@ -3177,22 +3174,37 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// closing all files
it->Seek("");
it->Seek(""); // purge (x2)
ASSERT_OK(it->status());

ASSERT_EQ(2, env.num_open_wal_file_.load());
ASSERT_EQ(0, env.delete_count_.load());

TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");

// Fill the low priority pool in order to ensure that all background purges
// finished before we continue
std::vector<test::SleepingBackgroundTask> sleeping_tasks(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& task : sleeping_tasks) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task,
Env::Priority::LOW);
task.WaitUntilSleeping();
}
// Release and wait for all of the tasks to finish
for (auto& task : sleeping_tasks) {
task.WakeUp();
task.WaitUntilDone();
}

ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());

delete it;
delete it; // purge
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

Reopen();
Expand Down
61 changes: 41 additions & 20 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,7 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression;
options.max_subcompactions = max_subcompactions_;
options.max_background_compactions = 1;

env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
Expand Down Expand Up @@ -2945,17 +2946,22 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
ASSERT_EQ("0,1", FilesPerLevel(0));

// block compactions
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_tasks(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& task : sleeping_tasks) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task,
Env::Priority::LOW);
}

options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
Reopen(options);
std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
ASSERT_EQ("0,1", FilesPerLevel(0));
// let compactions go
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
for (auto& task : sleeping_tasks) {
task.WakeUp();
task.WaitUntilDone();
}

// this should execute L1->L2 (move)
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Expand Down Expand Up @@ -7119,9 +7125,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
}

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
Expand Down Expand Up @@ -7156,8 +7165,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
// CompactRange should return before the compaction has the chance to run
compact_thread.join();

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
for (auto& sleeping_task : sleeping_task_low) {
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ("0,1", FilesPerLevel(0));
}
Expand All @@ -7176,9 +7187,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
}

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
Expand Down Expand Up @@ -7218,8 +7232,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
auto s = db_->Close();
ASSERT_OK(s);

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
for (auto& sleeping_task : sleeping_task_low) {
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
}

TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
Expand All @@ -7236,9 +7252,12 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
}

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
Expand Down Expand Up @@ -7275,8 +7294,10 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
// manual compaction thread should return with Incomplete().
compact_thread.join();

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
for (auto& sleeping_task : sleeping_task_low) {
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
}

TEST_F(DBCompactionTest,
Expand Down
3 changes: 3 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
Reopen(options);
env_->SetBackgroundThreads(0, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);

std::thread::id tid;
int num_flushes = 0, num_compactions = 0;
Expand Down Expand Up @@ -1686,6 +1687,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
options.create_if_missing = true;
options.listeners.push_back(listener);
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options.max_background_flushes = options.max_background_compactions = -1;
options.max_background_jobs = 8;
// Allow 2 immutable memtables.
options.max_write_buffer_number = 3;
Expand Down Expand Up @@ -2706,6 +2708,7 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
options.env = fault_injection_env.get();
// Set a larger value than default so that RocksDB can schedule concurrent
// background flush threads.
options.max_background_flushes = options.max_background_compactions = -1;
options.max_background_jobs = 8;
options.max_write_buffer_number = 8;
CreateAndReopenWithCF({"pikachu"}, options);
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2492,7 +2492,11 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
int max_background_jobs,
bool parallelize_compactions) {
BGJobLimits res;
if (max_background_flushes == -1 && max_background_compactions == -1) {
const int flushes = std::max(1, max_background_flushes);
const int compactions = std::max(1, max_background_compactions);

if ((max_background_flushes == -1 && max_background_compactions == -1) ||
(max_background_jobs > flushes + compactions)) {
// for our first stab implementing max_background_jobs, simply allocate a
// quarter of the threads to flushes.
res.max_flushes = std::max(1, max_background_jobs / 4);
Expand Down
3 changes: 3 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,9 @@ TEST_F(DBOptionsTest, SetBackgroundJobs) {
Options options;
options.create_if_missing = true;
options.max_background_jobs = 8;
options.max_background_compactions = options.max_background_flushes = -1;
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
env_->SetBackgroundThreads(1, Env::Priority::LOW);
options.env = env_;
Reopen(options);

Expand Down
Loading

0 comments on commit 09f9928

Please sign in to comment.