add multibatch write into memtable (#131)
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

Support writing multiple WriteBatches into RocksDB in a single call. The WriteBatches are assigned sequence numbers in order and pushed into a work queue; a thread that is waiting for some state can steal jobs from that queue.
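
A minimal usage sketch of the new API, adapted from the test added in db/db_write_test.cc. The helper name is illustrative, and db is assumed to point to a DB opened with enable_multi_thread_write = true:

#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

// Illustrative helper: write several batches in one call. The batches are
// assigned consecutive sequence numbers; their memtable inserts may be
// executed by any writer thread waiting in the group.
rocksdb::Status WriteFourBatches(rocksdb::DB* db) {
  std::vector<rocksdb::WriteBatch*> batches;
  for (int i = 0; i < 4; i++) {
    rocksdb::WriteBatch* batch = new rocksdb::WriteBatch;
    batch->Put("key_" + std::to_string(i), "value_" + std::to_string(i));
    batches.push_back(batch);
  }
  // Returns Status::NotSupported() unless enable_multi_thread_write is set.
  rocksdb::Status s = db->MultiThreadWrite(rocksdb::WriteOptions(), batches);
  for (auto* b : batches) {
    delete b;
  }
  return s;
}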
Little-Wallace authored and yiwu-arbug committed Jan 7, 2020
1 parent 4dcfb87 commit ebec1bd
Showing 20 changed files with 557 additions and 33 deletions.
12 changes: 12 additions & 0 deletions db/db_impl/db_impl.h
@@ -154,6 +154,11 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiThreadWrite;
virtual Status MultiThreadWrite(
const WriteOptions& options,
const std::vector<WriteBatch*>& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
@@ -1018,6 +1023,13 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr,
uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
10 changes: 10 additions & 0 deletions db/db_impl/db_impl_open.cc
@@ -121,6 +121,16 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.avoid_flush_during_recovery = false;
}

// Multi-thread write does not support two_write_queues or two-phase commit (allow_2pc).
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_thread_write = false;
}

if (result.enable_multi_thread_write) {
result.enable_pipelined_write = true;
result.allow_concurrent_memtable_write = true;
}

#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
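For reference, a minimal sketch of how a caller would opt in to this path; the function name and path handling are illustrative. Per the sanitization above, the pipelined and concurrent-memtable options are implied, and the feature is silently disabled when two_write_queues or allow_2pc is set:

#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Illustrative: open a DB that takes the multi-thread write path.
rocksdb::Status OpenMultiThreadWriteDB(const std::string& path,
                                       rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_multi_thread_write = true;  // option introduced by this patch
  // SanitizeOptions() sets the next two automatically; shown here to make the
  // dependency explicit.
  options.enable_pipelined_write = true;
  options.allow_concurrent_memtable_write = true;
  // Leave two_write_queues and allow_2pc at false, or the feature is dropped.
  return rocksdb::DB::Open(options, path, db);
}
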
176 changes: 166 additions & 10 deletions db/db_impl/db_impl_write.cc
@@ -8,7 +8,13 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <cinttypes>
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>

#include <iostream>

#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
@@ -60,6 +66,146 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

Status DBImpl::MultiThreadWrite(const WriteOptions& options,
const std::vector<WriteBatch*>& updates) {
if (immutable_db_options_.enable_multi_thread_write) {
if (UNLIKELY(updates.empty())) {
return Status::OK();
}
autovector<WriteBatch*> batches;
for (auto w : updates) {
batches.push_back(w);
}
return MultiThreadWriteImpl(options, batches, nullptr, nullptr);
} else {
return Status::NotSupported();
}
}

Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& updates,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer writer(write_options, updates, callback, log_ref);
write_thread_.JoinBatchGroup(&writer);

WriteContext write_context;
bool ignore_missing_family = write_options.ignore_missing_column_families;
if (writer.state == WriteThread::STATE_GROUP_LEADER) {
if (writer.callback && !writer.callback->AllowWriteBatching()) {
write_thread_.WaitForMemTableWriters();
}
WriteThread::WriteGroup wal_write_group;
mutex_.Lock();
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
PERF_TIMER_STOP(write_pre_and_post_process_time);
writer.status =
PreprocessWrite(write_options, &need_log_sync, &write_context);
PERF_TIMER_START(write_pre_and_post_process_time);
log::Writer* log_writer = logs_.back().writer;
mutex_.Unlock();

// This can set a non-OK status if a callback fails.
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&writer, &wal_write_group);
const SequenceNumber current_sequence =
write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
size_t total_count = 0;
size_t total_byte_size = 0;
size_t valid_batches = 0;
auto stats = default_cf_internal_stats_;
if (writer.status.ok()) {
SequenceNumber next_sequence = current_sequence;
for (auto w : wal_write_group) {
if (w->CheckCallback(this)) {
if (w->ShouldWriteToMemtable()) {
w->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(w->batches);
next_sequence += count;
total_count += count;
}
valid_batches += 1;
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(w->batches));
}
}
if (writer.disable_wal) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
if (wal_write_group.size > 1) {
stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
wal_write_group.size - 1);

RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
}
writer.status =
WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, current_sequence);
}
}
if (!writer.CallbackFailed()) {
WriteStatusCheck(writer.status);
}

if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, writer.status);
mutex_.Unlock();
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
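// The writer promoted to memtable writer leader below distributes the whole
// group's memtable inserts across the shared work queue.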
if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(writer.ShouldWriteToMemtable());
WriteThread::WriteGroup memtable_write_group;
write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
assert(immutable_db_options_.allow_concurrent_memtable_write);
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
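// Package each writer's memtable insert as a job on
// write_thread_.write_queue_ so that any writer thread waiting in this
// group can pick it up (see AsyncInsertInto below).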
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
ignore_missing_family, this,
&write_thread_.write_queue_);
}
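// Rather than blocking while the remaining inserts finish, the leader
// steals jobs from the shared queue and runs them itself.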
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
std::function<void()> work;
if (write_thread_.write_queue_.PopFront(work)) {
work();
} else {
std::this_thread::yield();
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
}
if (seq_used != nullptr) {
*seq_used = writer.sequence;
}
assert(writer.state == WriteThread::STATE_COMPLETED);
return writer.status;
}

// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
@@ -146,6 +292,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}

if (immutable_db_options_.enable_multi_thread_write) {
autovector<WriteBatch*> updates;
updates.push_back(my_batch);
return MultiThreadWriteImpl(write_options, updates, callback, log_used,
log_ref, seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
@@ -918,11 +1071,12 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
auto* leader = write_group.leader;
assert(!leader->disable_wal); // Same holds for all in the batch group
if (write_group.size == 1 && !leader->CallbackFailed() &&
leader->batch->GetWalTerminationPoint().is_cleared()) {
leader->batches.size() == 1 &&
leader->batches[0]->GetWalTerminationPoint().is_cleared()) {
// If the group contains only one batch, that batch should be written to
// the WAL, and the batch is not to be truncated, we simply write that
// WriteBatch to the WAL directly.
merged_batch = leader->batch;
merged_batch = leader->batches[0];
if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
*to_be_cached_state = merged_batch;
}
@@ -934,13 +1088,15 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
for (auto b : writer->batches) {
WriteBatchInternal::Append(merged_batch, b,
/*WAL_only*/ true);
if (WriteBatchInternal::IsLatestPersistentState(b)) {
// We only need to cache the last such write batch
*to_be_cached_state = b;
}
(*write_with_wal)++;
}
(*write_with_wal)++;
}
}
}
@@ -993,7 +1149,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
WriteBatch* to_be_cached_state = nullptr;
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
&write_with_wal, &to_be_cached_state);
if (merged_batch == write_group.leader->batch) {
if (merged_batch == write_group.leader->batches[0]) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
3 changes: 3 additions & 0 deletions db/db_properties_test.cc
@@ -37,6 +37,9 @@ TEST_F(DBPropertiesTest, Empty) {
options.write_buffer_size = 100000; // Small write buffer
options.allow_concurrent_memtable_write = false;
options = CurrentOptions(options);
if (options.enable_multi_thread_write) {
continue;
}
CreateAndReopenWithCF({"pikachu"}, options);

std::string num;
5 changes: 5 additions & 0 deletions db/db_test_util.cc
@@ -550,6 +550,11 @@ Options DBTestBase::GetOptions(
options.enable_pipelined_write = true;
break;
}
case kMultiThreadWrite: {
options.enable_multi_thread_write = true;
options.enable_pipelined_write = true;
break;
}
case kConcurrentWALWrites: {
// These options optimize the 2PC commit path
options.two_write_queues = true;
1 change: 1 addition & 0 deletions db/db_test_util.h
@@ -682,6 +682,7 @@ class DBTestBase : public testing::Test {
kConcurrentSkipList = 28,
kPipelinedWrite = 29,
kConcurrentWALWrites = 30,
kMultiThreadWrite = 31,
kDirectIO,
kLevelSubcompactions,
kBlockBasedTableWithIndexRestartInterval,
61 changes: 60 additions & 1 deletion db/db_write_test.cc
@@ -185,10 +185,69 @@ TEST_P(DBWriteTest, LockWalInEffect) {
ASSERT_OK(dbfull()->UnlockWAL());
}

TEST_P(DBWriteTest, MultiThreadWrite) {
Options options = GetOptions();
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(Env::Default()));
if (!options.enable_multi_thread_write) {
return;
}
constexpr int kNumThreads = 8;
options.env = mock_env.get();
options.write_buffer_size = 1024 * 128;
Reopen(options);
std::vector<port::Thread> threads;
for (int t = 0; t < kNumThreads; t++) {
threads.push_back(port::Thread(
[&](int index) {
WriteOptions opt;
for (int j = 0; j < 64; j++) {
std::vector<WriteBatch*> batches;
for (int i = 0; i < 4; i++) {
WriteBatch* batch = new WriteBatch;
for (int k = 0; k < 64; k++) {
batch->Put("key_" + ToString(index) + "_" + ToString(j) + "_" +
ToString(i) + "_" + ToString(k),
"value" + ToString(k));
}
batches.push_back(batch);
}
dbfull()->MultiThreadWrite(opt, batches);
for (auto b : batches) {
delete b;
}
}
},
t));
}
for (int i = 0; i < kNumThreads; i++) {
threads[i].join();
}
ReadOptions opt;
for (int t = 0; t < kNumThreads; t++) {
std::string value;
for (int i = 0; i < 64; i++) {
for (int j = 0; j < 4; j++) {
for (int k = 0; k < 64; k++) {
ASSERT_OK(dbfull()->Get(opt,
"key_" + ToString(t) + "_" + ToString(i) +
"_" + ToString(j) + "_" + ToString(k),
&value));
std::string expected_value = "value" + ToString(k);
ASSERT_EQ(expected_value, value);
}
}
}
}

Close();
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
DBTestBase::kPipelinedWrite));
DBTestBase::kPipelinedWrite,
DBTestBase::kMultiThreadWrite));

} // namespace rocksdb
