Skip to content

Commit

Permalink
Support kvs (store key and value separately) on dcpmm
Browse files Browse the repository at this point in the history
  • Loading branch information
peifengsi committed Aug 28, 2019
1 parent 4064b4b commit 56f3867
Show file tree
Hide file tree
Showing 21 changed files with 615 additions and 11 deletions.
14 changes: 9 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ LDFLAGS += -lmemkind
CXXFLAGS += -DBC_ON_DCPMM
endif

# Build switch: when ROCKSDB_KVS_ON_DCPMM is defined, add /usr/local/lib64 to
# the library search path, link against libpmem and libpmemobj, and define the
# KVS_ON_DCPMM preprocessor macro for the C++ sources.
ifdef ROCKSDB_KVS_ON_DCPMM
LDFLAGS += -L/usr/local/lib64/ -lpmem -lpmemobj
CXXFLAGS += -DKVS_ON_DCPMM
endif

# Build switch: when ROCKSDB_WAL_ON_DCPMM is defined, add /usr/local/lib and
# /usr/local/lib64 to the library search path, link against libpmem, add
# /usr/local/include to the header search path, and define the WAL_ON_DCPMM
# preprocessor macro for the C++ sources.
ifdef ROCKSDB_WAL_ON_DCPMM
LDFLAGS += -L/usr/local/lib/ -L/usr/local/lib64/ -lpmem
CXXFLAGS += -I/usr/local/include -DWAL_ON_DCPMM
endif

# Transform parallel LOG output into something more readable.
perl_command = perl -n \
Expand Down Expand Up @@ -202,11 +211,6 @@ LIB_SOURCES += utilities/env_librados.cc
LDFLAGS += -lrados
endif

# Build switch: when ROCKSDB_WAL_ON_DCPMM is defined, add /usr/local/lib and
# /usr/local/lib64 to the library search path, link against libpmem, add
# /usr/local/include to the header search path, and define the WAL_ON_DCPMM
# preprocessor macro for the C++ sources.
ifdef ROCKSDB_WAL_ON_DCPMM
LDFLAGS += -L/usr/local/lib/ -L/usr/local/lib64/ -lpmem
CXXFLAGS += -I/usr/local/include -DWAL_ON_DCPMM
endif

AM_LINK = $(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
# detect what platform we're building on
dummy := $(shell (export ROCKSDB_ROOT="$(CURDIR)"; export PORTABLE="$(PORTABLE)"; "$(CURDIR)/build_tools/build_detect_platform" "$(CURDIR)/make_config.mk"))
Expand Down
9 changes: 9 additions & 0 deletions db/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include "table/internal_iterator.h"
#include "util/sync_point.h"

#ifdef KVS_ON_DCPMM
#include "dcpmm/kvs_dcpmm.h"
#endif

#define DEFINITELY_IN_SNAPSHOT(seq, snapshot) \
((seq) <= (snapshot) && \
(snapshot_checker_ == nullptr || \
Expand Down Expand Up @@ -301,6 +305,11 @@ void CompactionIterator::NextFromInput() {
InvokeFilterIfNeeded(&need_skip, &skip_until);
}
} else {
#ifdef KVS_ON_DCPMM
if (ikey_.type == kTypeValue) {
KVSFreeValue(value_);
}
#endif
// Update the current key to reflect the new sequence number/type without
// copying the user key.
// TODO(rven): Compaction filter does not process keys in this path
Expand Down
26 changes: 26 additions & 0 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
#include "util/string_util.h"
#include "util/sync_point.h"

#ifdef KVS_ON_DCPMM
#include "dcpmm/kvs_dcpmm.h"
#endif

namespace rocksdb {

const char* GetCompactionReasonString(CompactionReason compaction_reason) {
Expand Down Expand Up @@ -815,6 +819,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);

#ifdef KVS_ON_DCPMM
size_t dcpmm_extra_value_size = 0;
#endif
uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;

ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
Expand Down Expand Up @@ -933,8 +940,24 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
assert(sub_compact->builder != nullptr);
assert(sub_compact->current_output() != nullptr);
#ifdef KVS_ON_DCPMM
// TODO(Peifeng) dump value content to SST if no available DCPMM space.
if ((sub_compact->compaction->output_level() >= 5) &&
(KVSGetEncoding(value.data()) != kEncodingRawCompressed) &&
(KVSGetEncoding(value.data()) != kEncodingRawUncompressed)) {
auto add = std::bind(&TableBuilder::Add, sub_compact->builder.get(),
key, std::placeholders::_1);
KVSDumpFromValueRef(value.data(), add);
} else {
sub_compact->builder->Add(key, value);
dcpmm_extra_value_size += KVSGetExtraValueSize(value);
}
sub_compact->current_output_file_size = sub_compact->builder->FileSize() +
dcpmm_extra_value_size;
#else
sub_compact->builder->Add(key, value);
sub_compact->current_output_file_size = sub_compact->builder->FileSize();
#endif
sub_compact->current_output()->meta.UpdateBoundaries(
key, c_iter->ikey().sequence);
sub_compact->num_output_records++;
Expand Down Expand Up @@ -980,6 +1003,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
&range_del_out_stats, next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
#ifdef KVS_ON_DCPMM
dcpmm_extra_value_size = 0;
#endif
}
}

Expand Down
24 changes: 24 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
#include "util/string_util.h"
#include "util/sync_point.h"

#ifdef KVS_ON_DCPMM
#include "dcpmm/kvs_dcpmm.h"
#endif

namespace rocksdb {
const std::string kDefaultColumnFamilyName("default");
void DumpRocksDBBuildVersion(Logger* log);
Expand Down Expand Up @@ -562,6 +566,13 @@ Status DBImpl::CloseHelper() {
env_->UnlockFile(db_lock_);
}

#ifdef KVS_ON_DCPMM
if (env_->pm_pool) {
pmemobj_close(env_->pm_pool);
env_->pm_pool = nullptr;
}
#endif

ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
LogFlush(immutable_db_options_.info_log);

Expand Down Expand Up @@ -1450,6 +1461,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, MEMTABLE_MISS);
}

#ifdef KVS_ON_DCPMM
if (s.ok()) {
enum ValueEncoding type = KVSGetEncoding(pinnable_val->data());
if (type == kEncodingPtrCompressed || type == kEncodingPtrUncompressed) {
KVSDecodeValueRef(pinnable_val->data(), pinnable_val->GetSelf());
if (pinnable_val->IsPinned()) {
pinnable_val->Reset();
}
pinnable_val->PinSelf();
}
}
#endif

{
PERF_TIMER_GUARD(get_post_process_time);

Expand Down
19 changes: 19 additions & 0 deletions db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "util/rate_limiter.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"
#ifdef KVS_ON_DCPMM
#include "dcpmm/kvs_dcpmm.h"
#endif

namespace rocksdb {
Options SanitizeOptions(const std::string& dbname, const Options& src) {
Expand Down Expand Up @@ -1199,6 +1202,22 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
return s;
}

#ifdef KVS_ON_DCPMM
if (impl->immutable_db_options_.dcpmm_kvs_mmapped_file_fullpath != "" &&
impl->env_->pm_pool == NULL) {
impl->env_->pm_pool = KVSOpen(
impl->immutable_db_options_.dcpmm_kvs_mmapped_file_fullpath.data(),
impl->immutable_db_options_.dcpmm_kvs_mmapped_file_size);
if (impl->env_->pm_pool == NULL) {
exit(1);
}
impl->env_->pool_uuid_lo = KVSGetUUID();
KVSSetKVSValueThres(impl->immutable_db_options_.dcpmm_kvs_value_thres);
KVSSetCompressKnob(impl->immutable_db_options_.dcpmm_compress_value);
}
#endif


s = impl->CreateArchivalDirectory();
if (!s.ok()) {
delete impl;
Expand Down
6 changes: 6 additions & 0 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
if (!writer->CallbackFailed()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
#ifdef KVS_ON_DCPMM
WriteBatchInternal::DCPMMMergeActions(merged_batch, writer->batch);
#endif
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
Expand Down Expand Up @@ -839,6 +842,9 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
// since alive_log_files_ might be modified concurrently
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
#ifdef KVS_ON_DCPMM
WriteBatchInternal::DCPMMPublishActions(&merged_batch);
#endif
return status;
}

Expand Down
24 changes: 24 additions & 0 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,31 @@ class DBIter final: public Iterator {
if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
#ifdef KVS_ON_DCPMM
// TODO(Peifeng) decode the value ref
#endif
return pinned_value_.data() ? pinned_value_ : saved_value_;
} else if (direction_ == kReverse) {
#ifdef KVS_ON_DCPMM
// TODO(Peifeng) decode the value ref
#endif
return pinned_value_;
} else {
#ifdef KVS_ON_DCPMM
enum ValueEncoding type = KVSGetEncoding(iter_.value().data());
if (type == kEncodingPtrCompressed ||
type == kEncodingPtrUncompressed) {
KVSDecodeValueRef(iter_.value().data(), &decoded_value_);
return decoded_value_;
} else if (type == kEncodingRawCompressed) {
// TODO(Peifeng) uncompress
return iter_.value();
} else
// TODO(Peifeng) remove first one encoding byte.
return iter_.value();
#else
return iter_.value();
#endif
}
}
Status status() const override {
Expand Down Expand Up @@ -352,6 +372,10 @@ class DBIter final: public Iterator {
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;

#ifdef KVS_ON_DCPMM
mutable std::string decoded_value_;
#endif

// No copying allowed
DBIter(const DBIter&);
void operator=(const DBIter&);
Expand Down
19 changes: 19 additions & 0 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
#include "util/string_util.h"
#include "util/util.h"

#ifdef KVS_ON_DCPMM
#include "dcpmm/kvs_dcpmm.h"
#endif

namespace rocksdb {

// anon namespace for file-local types
Expand Down Expand Up @@ -643,8 +647,23 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}

PutLengthPrefixedSlice(&b->rep_, key);
#ifdef KVS_ON_DCPMM
static size_t thres = KVSGetKVSValueThres();
static bool compress = KVSGetCompressKnob();
struct KVSHdr hdr;
if (KVSEnabled() && (value.size() >= thres) &&
KVSEncodeValue(value, compress, &hdr)) {
b->act_.push_back(hdr.pact);
PutLengthPrefixedSlice(&b->rep_, Slice((char*)(&hdr), sizeof(hdr)));
} else {
hdr.base.encoding = kEncodingRawUncompressed;
PutLengthHdrPrefixedSlice(&b->rep_, &(hdr.base), value);
}
#else
PutLengthPrefixedSlice(&b->rep_, value);
#endif
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed);
Expand Down
24 changes: 24 additions & 0 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include "rocksdb/options.h"
#include "util/autovector.h"

#ifdef KVS_ON_DCPMM
#include "dcpmm/kvs_dcpmm.h"
#endif

namespace rocksdb {

class MemTable;
Expand Down Expand Up @@ -67,6 +71,26 @@ class WriteBatchInternal {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;

#ifdef KVS_ON_DCPMM
// Appends every action recorded in `batch` to the end of `merged_batch`'s
// action list, then empties `batch`'s list so the same actions are not
// carried by both batches.
static void DCPMMMergeActions(WriteBatch *merged_batch, WriteBatch *batch) {
  auto& pending = batch->act_;
  auto& combined = merged_batch->act_;
  combined.insert(combined.end(), pending.begin(), pending.end());
  pending.clear();
}

// Publish all the actions of a grouped batch
static void DCPMMPublishActions(const WriteBatch* batch) {
std::vector<pobj_action *> a = batch->act_;
std::vector<pobj_action *>::iterator it;
for (it = a.begin(); it != a.end(); ++it) {
KVSPublish((struct pobj_action *)*it, 1);
free((void *)*it);
}
batch->act_.clear();
}
#endif

// WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
static Status Put(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
Expand Down
1 change: 0 additions & 1 deletion dcpmm/env_dcpmm.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define UNUSED(x) ((void)(x))

#ifdef WAL_ON_DCPMM

namespace rocksdb {

class DCPMMEnv : public EnvWrapper {
Expand Down
Loading

0 comments on commit 56f3867

Please sign in to comment.