Skip to content

Commit

Permalink
WriteThread
Browse files Browse the repository at this point in the history
Summary: This diff just moves the write thread control out of the DBImpl. I will need this as I will control column family data concurrency by only accessing some data in the write thread. That way, we won't have to lock our accesses to column family hash table (mappings from IDs to CFDs).

Test Plan: make check

Reviewers: sdong, yhchiang, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23301
  • Loading branch information
igorcanadi committed Sep 12, 2014
1 parent 540a257 commit dee91c2
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 220 deletions.
178 changes: 12 additions & 166 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1915,14 +1915,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,

Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& options) {
Writer w(&mutex_);
w.batch = nullptr;
w.sync = false;
w.disableWAL = false;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = kNoTimeOut;

Status s;
{
WriteContext context;
Expand All @@ -1933,7 +1925,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
return Status::OK();
}

s = BeginWrite(&w, 0);
WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job

// SetNewMemtableAndNewLogFile() will release and reacquire mutex
Expand All @@ -1942,12 +1935,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
cfd->imm()->FlushRequested();
MaybeScheduleFlushOrCompaction();

assert(!writers_.empty());
assert(writers_.front() == &w);
EndWrite(&w, &w, s);
write_thread_.ExitWriteThread(&w, &w, s);
}


if (s.ok() && options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd);
Expand Down Expand Up @@ -3652,13 +3642,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit.DropColumnFamily();
edit.SetColumnFamily(cfd->GetID());

Writer w(&mutex_);
w.batch = nullptr;
w.sync = false;
w.disableWAL = false;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = kNoTimeOut;

Status s;
{
Expand All @@ -3668,10 +3651,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
}
if (s.ok()) {
// we drop column family from a single write thread
s = BeginWrite(&w, 0);
WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job
s = versions_->LogAndApply(cfd, &edit, &mutex_);
EndWrite(&w, &w, s);
write_thread_.ExitWriteThread(&w, &w, s);
}
}

Expand Down Expand Up @@ -3891,88 +3875,12 @@ Status DBImpl::Delete(const WriteOptions& options,
return DB::Delete(options, column_family, key);
}

// REQUIRES: mutex_ is held
Status DBImpl::BeginWrite(Writer* w, uint64_t expiration_time) {
// the following code block pushes the current writer "w" into the writer
// queue "writers_" and wait until one of the following conditions met:
// 1. the job of "w" has been done by some other writers.
// 2. "w" becomes the first writer in "writers_"
// 3. "w" timed-out.
mutex_.AssertHeld();
writers_.push_back(w);

bool timed_out = false;
while (!w->done && w != writers_.front()) {
if (expiration_time == 0) {
w->cv.Wait();
} else if (w->cv.TimedWait(expiration_time)) {
if (w->in_batch_group) {
// then it means the front writer is currently doing the
// write on behalf of this "timed-out" writer. Then it
// should wait until the write completes.
expiration_time = 0;
} else {
timed_out = true;
break;
}
}
}

if (timed_out) {
#ifndef NDEBUG
bool found = false;
#endif
for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
if (*iter == w) {
writers_.erase(iter);
#ifndef NDEBUG
found = true;
#endif
break;
}
}
#ifndef NDEBUG
assert(found);
#endif
// writers_.front() might still be in cond_wait without a time-out.
// As a result, we need to signal it to wake it up. Otherwise no
// one else will wake him up, and RocksDB will hang.
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return Status::TimedOut();
}
return Status::OK();
}

// REQUIRES: mutex_ is held
void DBImpl::EndWrite(Writer* w, Writer* last_writer, Status status) {
// Pop out the current writer and all writers being pushed before the
// current writer from the writer queue.
mutex_.AssertHeld();
while (!writers_.empty()) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
PERF_TIMER_GUARD(write_pre_and_post_process_time);
Writer w(&mutex_);
WriteThread::Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.disableWAL = options.disableWAL;
Expand All @@ -3983,7 +3891,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
uint64_t expiration_time = 0;
bool has_timeout = false;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
w.timeout_hint_us = WriteThread::kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
has_timeout = true;
Expand All @@ -3996,7 +3904,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {

WriteContext context;
mutex_.Lock();
Status status = BeginWrite(&w, expiration_time);
Status status = write_thread_.EnterWriteThread(&w, expiration_time);
assert(status.ok() || status.IsTimedOut());
if (status.IsTimedOut()) {
mutex_.Unlock();
Expand Down Expand Up @@ -4066,10 +3974,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}

uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
WriteThread::Writer* last_writer = &w;
if (status.ok()) {
autovector<WriteBatch*> write_batch_group;
BuildBatchGroup(&last_writer, &write_batch_group);
write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);

// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
Expand Down Expand Up @@ -4161,7 +4069,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
bg_error_ = status; // stop compaction & fail any further writes
}

EndWrite(&w, last_writer, status);
write_thread_.ExitWriteThread(&w, last_writer, status);
mutex_.Unlock();

if (status.IsTimedOut()) {
Expand All @@ -4171,68 +4079,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
return status;
}

// This function will be called only when the first writer succeeds.
// All writers in the to-be-built batch group will be processed.
//
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-nullptr batch
void DBImpl::BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group) {
assert(!writers_.empty());
Writer* first = writers_.front();
assert(first->batch != nullptr);

size_t size = WriteBatchInternal::ByteSize(first->batch);
write_batch_group->push_back(first->batch);

// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}

*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}

if (!w->disableWAL && first->disableWAL) {
// Do not include a write that needs WAL into a batch that has
// WAL disabled.
break;
}

if (w->timeout_hint_us < first->timeout_hint_us) {
// Do not include those writes with shorter timeout. Otherwise, we might
// execute a write that should instead be aborted because of timeout.
break;
}

if (w->batch == nullptr) {
// Do not include those writes with nullptr batch. Those are not writes,
// those are something else. They want to be alone
break;
}

size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}

write_batch_group->push_back(w->batch);
w->in_batch_group = true;
*last_writer = w;
}
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::DelayWrite(uint64_t expiration_time) {
Expand Down
47 changes: 3 additions & 44 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"

namespace rocksdb {

Expand Down Expand Up @@ -359,44 +360,6 @@ class DBImpl : public DB {
Status WriteLevel0Table(ColumnFamilyData* cfd, autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber,
LogBuffer* log_buffer);
// Information kept for every waiting writer
struct Writer {
Status status;
WriteBatch* batch;
bool sync;
bool disableWAL;
bool in_batch_group;
bool done;
uint64_t timeout_hint_us;
port::CondVar cv;

explicit Writer(port::Mutex* mu) : cv(mu) {}
};

// Before applying write operation (such as DBImpl::Write, DBImpl::Flush)
// thread should grab the mutex_ and be the first on writers queue.
// BeginWrite is used for it.
// Be aware! Writer's job can be done by other thread (see DBImpl::Write
// for examples), so check it via w.done before applying changes.
//
// Writer* w: writer to be placed in the queue
// uint64_t expiration_time: maximum time to be in the queue
// See also: EndWrite
Status BeginWrite(Writer* w, uint64_t expiration_time);

// After doing write job, we need to remove already used writers from
// writers_ queue and notify head of the queue about it.
// EndWrite is used for this.
//
// Writer* w: Writer, that was added by BeginWrite function
// Writer* last_writer: Since we can join a few Writers (as DBImpl::Write
// does)
// we should pass last_writer as a parameter to
// EndWrite
// (if you don't touch other writers, just pass w)
// Status status: Status of write operation
// See also: BeginWrite
void EndWrite(Writer* w, Writer* last_writer, Status status);

void DelayWrite(uint64_t expiration_time);

Expand All @@ -405,9 +368,6 @@ class DBImpl : public DB {
Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
WriteContext* context);

void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group);

// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);

Expand Down Expand Up @@ -552,8 +512,8 @@ class DBImpl : public DB {

std::unique_ptr<Directory> db_directory_;

// Queue of writers.
std::deque<Writer*> writers_;
WriteThread write_thread_;

WriteBatch tmp_batch_;

WriteController write_controller_;
Expand Down Expand Up @@ -627,7 +587,6 @@ class DBImpl : public DB {
bool flush_on_destroy_; // Used when disableWAL is true.

static const int KEEP_LOG_FILE_NUM = 1000;
static const uint64_t kNoTimeOut = std::numeric_limits<uint64_t>::max();
std::string db_absolute_path_;

// The options to access storage files
Expand Down
14 changes: 4 additions & 10 deletions db/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,15 @@ void DBImpl::TEST_UnlockMutex() {
}

void* DBImpl::TEST_BeginWrite() {
auto w = new Writer(&mutex_);
w->batch = nullptr;
w->sync = false;
w->disableWAL = false;
w->in_batch_group = false;
w->done = false;
w->timeout_hint_us = kNoTimeOut;
Status s = BeginWrite(w, 0);
auto w = new WriteThread::Writer(&mutex_);
Status s = write_thread_.EnterWriteThread(w, 0);
assert(s.ok() && !w->done); // No timeout and nobody should do our job
return reinterpret_cast<void*>(w);
}

void DBImpl::TEST_EndWrite(void* w) {
auto writer = reinterpret_cast<Writer*>(w);
EndWrite(writer, writer, Status::OK());
auto writer = reinterpret_cast<WriteThread::Writer*>(w);
write_thread_.ExitWriteThread(writer, writer, Status::OK());
delete writer;
}

Expand Down

0 comments on commit dee91c2

Please sign in to comment.