Skip to content

Commit

Permalink
Extend MultiGet batching to Transactions (#5210)
Browse files Browse the repository at this point in the history
Summary:
MultiGet batching was implemented in #5011 in order to reduce CPU utilization when looking up multiple keys at once. This PR implements the corresponding `MultiGet` and `MultiGetSingleCFForUpdate` in `rocksdb::Transaction`, which call the underlying batching implementation.
Pull Request resolved: facebook/rocksdb#5210

Differential Revision: D15048164

Pulled By: anand1976

fbshipit-source-id: c52f6043102ab0cbc723f4cba2a7b7d1767f6f52
  • Loading branch information
anand76 authored and facebook-github-bot committed Apr 23, 2019
1 parent a7d1031 commit 1c8cbf3
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 2 deletions.
13 changes: 13 additions & 0 deletions include/rocksdb/utilities/transaction.h
Expand Up @@ -205,6 +205,19 @@ class Transaction {
const std::vector<Slice>& keys,
std::vector<std::string>* values) = 0;

// Batched version of MultiGet - see DBImpl::MultiGet(). Sub-classes are
// expected to override this with an implementation that calls
// DBImpl::MultiGet()
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool /*sorted_input*/ = false) {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Get(options, column_family, keys[i], &values[i]);
}
}

// Read this key and ensure that this transaction will only
// be able to be committed if this key is not written outside this
// transaction after it has first been read (or after the snapshot if a
Expand Down
12 changes: 12 additions & 0 deletions include/rocksdb/utilities/write_batch_with_index.h
Expand Up @@ -14,6 +14,7 @@

#include <memory>
#include <string>
#include <vector>

#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
Expand Down Expand Up @@ -207,6 +208,12 @@ class WriteBatchWithIndex : public WriteBatchBase {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value);

// Batched lookup of `num_keys` keys from a single column family, with the
// result for keys[i] written to values[i] and statuses[i]. Each key is looked
// up in this batch first; keys the batch cannot resolve on its own are then
// read from `db`, and any merge operands recorded in the batch are applied on
// top of the DB result. When the batch was created with overwrite_key set and
// the key has a pending merge in the batch, the status is MergeInProgress.
// `sorted_input` is passed through to the DB lookup.
// NOTE(review): presumably sorted_input means `keys` is already sorted in
// comparator order - confirm against DBImpl::MultiGet().
void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input);

// Records the state of the batch for future calls to RollbackToSavePoint().
// May be called multiple times to set multiple save points.
void SetSavePoint() override;
Expand Down Expand Up @@ -246,6 +253,11 @@ class WriteBatchWithIndex : public WriteBatchBase {
// Variant of GetFromBatchAndDB() that additionally accepts a ReadCallback.
// NOTE(review): the definition is not visible from here; presumably `callback`
// is handed to the DB read - confirm.
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, ReadCallback* callback);
// Implementation behind the public MultiGetFromBatchAndDB(): same contract,
// plus a `callback` that is forwarded to the DB lookup for the keys that could
// not be resolved from the batch alone. The public overload passes nullptr.
void MultiGetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input, ReadCallback* callback);
struct Rep;
std::unique_ptr<Rep> rep;
};
Expand Down
10 changes: 10 additions & 0 deletions utilities/transactions/transaction_base.cc
Expand Up @@ -281,6 +281,16 @@ std::vector<Status> TransactionBaseImpl::MultiGet(
return stat_list;
}

// Batched MultiGet for a single column family. The lookup is delegated to the
// transaction's indexed write batch, which consults the batch before reading
// from db_, and fills values[i] / statuses[i] for each keys[i].
void TransactionBaseImpl::MultiGet(const ReadOptions& read_options,
                                   ColumnFamilyHandle* column_family,
                                   const size_t num_keys, const Slice* keys,
                                   PinnableSlice* values, Status* statuses,
                                   bool sorted_input) {
  write_batch_.MultiGetFromBatchAndDB(
      db_, read_options, column_family,
      num_keys, keys,
      values, statuses,
      sorted_input);
}

std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
Expand Down
8 changes: 7 additions & 1 deletion utilities/transactions/transaction_base.h
Expand Up @@ -47,7 +47,7 @@ class TransactionBaseImpl : public Transaction {
void SetSavePoint() override;

Status RollbackToSavePoint() override;

Status PopSavePoint() override;

using Transaction::Get;
Expand Down Expand Up @@ -80,6 +80,7 @@ class TransactionBaseImpl : public Transaction {
exclusive, do_validate);
}

using Transaction::MultiGet;
std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
Expand All @@ -94,6 +95,11 @@ class TransactionBaseImpl : public Transaction {
keys, values);
}

// Batched MultiGet override. The definition forwards to
// write_batch_.MultiGetFromBatchAndDB(), so each key is resolved against this
// transaction's write batch before the DB; values[i] / statuses[i] receive the
// result for keys[i].
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, bool sorted_input = false) override;

using Transaction::MultiGetForUpdate;
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
Expand Down
87 changes: 86 additions & 1 deletion utilities/transactions/transaction_test.cc
Expand Up @@ -2676,7 +2676,6 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
handles[0], handles[2]};
std::vector<Slice> multiget_keys = {"AAA", "AAAZZZ", "foo", "foo"};
std::vector<std::string> values(4);

std::vector<Status> results = txn->MultiGetForUpdate(
snapshot_read_options, multiget_cfh, multiget_keys, &values);
ASSERT_OK(results[0]);
Expand Down Expand Up @@ -2736,6 +2735,92 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
}
}

// Exercises the batched Transaction::MultiGet() on a non-default column
// family: committed data is written through the DB, then a transaction
// deletes, overwrites and merges some of those keys, and a single MultiGet()
// call must reflect the transaction's own pending writes on top of the DB.
TEST_P(TransactionTest, MultiGetBatchedTest) {
  WriteOptions write_options;
  ReadOptions snapshot_read_options;
  Status s;

  ColumnFamilyHandle* cf = nullptr;
  ColumnFamilyOptions cf_options;

  // Create a new column family
  s = db->CreateColumnFamily(cf_options, "CF", &cf);
  ASSERT_OK(s);

  delete cf;
  delete db;
  db = nullptr;

  // Reopen the DB with two column families
  std::vector<ColumnFamilyDescriptor> column_families;
  // have to open default column family
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
  // open the new column family, this time with a merge operator
  cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
  column_families.push_back(ColumnFamilyDescriptor("CF", cf_options));

  std::vector<ColumnFamilyHandle*> handles;

  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
                          &handles, &db);
  // Check the status first so a failed Open() is reported with its message,
  // and use a test assertion (not assert(), which disappears under NDEBUG)
  // for the null check.
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  // Write some committed data to the db
  WriteBatch batch;
  batch.Put(handles[1], "aaa", "val1");
  batch.Put(handles[1], "bbb", "val2");
  batch.Put(handles[1], "ccc", "val3");
  batch.Put(handles[1], "ddd", "foo");
  batch.Put(handles[1], "eee", "val5");
  batch.Put(handles[1], "fff", "val6");
  batch.Merge(handles[1], "ggg", "foo");
  s = db->Write(write_options, &batch);
  ASSERT_OK(s);

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  txn->SetSnapshot();
  snapshot_read_options.snapshot = txn->GetSnapshot();

  // Stage some uncommitted changes in the transaction
  s = txn->Delete(handles[1], "bbb");
  ASSERT_OK(s);
  s = txn->Put(handles[1], "ccc", "val3_new");
  ASSERT_OK(s);
  s = txn->Merge(handles[1], "ddd", "bar");
  ASSERT_OK(s);

  std::vector<Slice> keys = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg"};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  txn->MultiGet(snapshot_read_options, handles[1], keys.size(), keys.data(),
                values.data(), statuses.data());
  // "aaa": untouched by the transaction, comes from the DB
  ASSERT_OK(statuses[0]);
  ASSERT_EQ(values[0], "val1");
  // "bbb": deleted inside the transaction
  ASSERT_TRUE(statuses[1].IsNotFound());
  // "ccc": overwritten inside the transaction
  ASSERT_OK(statuses[2]);
  ASSERT_EQ(values[2], "val3_new");
  // "ddd": has a pending merge in the transaction
  ASSERT_TRUE(statuses[3].IsMergeInProgress());
  // "eee", "fff", "ggg": untouched by the transaction, come from the DB
  ASSERT_OK(statuses[4]);
  ASSERT_EQ(values[4], "val5");
  ASSERT_OK(statuses[5]);
  ASSERT_EQ(values[5], "val6");
  ASSERT_OK(statuses[6]);
  ASSERT_EQ(values[6], "foo");
  delete txn;
  for (auto handle : handles) {
    delete handle;
  }
}

TEST_P(TransactionTest, ColumnFamiliesTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
Expand Down
103 changes: 103 additions & 0 deletions utilities/write_batch_with_index/write_batch_with_index.cc
Expand Up @@ -922,6 +922,109 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(
return s;
}

// Public entry point for the batched batch-then-DB lookup. Delegates to the
// overload that takes a ReadCallback, passing no callback.
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input) {
  ReadCallback* const no_callback = nullptr;
  MultiGetFromBatchAndDB(db, read_options, column_family, num_keys, keys,
                         values, statuses, sorted_input, no_callback);
}

// Batched lookup of `num_keys` keys in `column_family`, consulting this write
// batch first and the DB second. The outcome for keys[i] is written to
// values[i] and statuses[i].
//
// Phase 1: every key is looked up in the batch. Keys that are found, deleted,
//          or hit an error are finished there. With overwrite_key set, a
//          pending merge in the batch is reported as MergeInProgress.
// Phase 2: the remaining keys (absent from the batch, or with unresolved merge
//          operands in it) are read from the DB in one MultiGetImpl() call,
//          which receives `sorted_input` and `callback` unchanged.
// Phase 3: for keys with merge operands in the batch, those operands are
//          applied on top of the DB result (or on top of no base value when
//          the DB does not have the key).
void WriteBatchWithIndex::MultiGetFromBatchAndDB(
    DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    const size_t num_keys, const Slice* keys, PinnableSlice* values,
    Status* statuses, bool sorted_input, ReadCallback* callback) {
  DBImpl* const db_impl = static_cast_with_check<DBImpl, DB>(db->GetRootDB());
  const ImmutableDBOptions& immutable_db_options =
      db_impl->immutable_db_options();

  // Keys that still need a DB lookup after the batch has been consulted.
  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  // To hold merges from the write batch; merges[i] belongs to key_context[i].
  autovector<std::pair<WriteBatchWithIndexInternal::Result, MergeContext>,
             MultiGetContext::MAX_BATCH_SIZE>
      merges;
  // Since the lifetime of the WriteBatch is the same as that of the transaction
  // we cannot pin it as otherwise the returned value will not be available
  // after the transaction finishes.
  for (size_t i = 0; i < num_keys; ++i) {
    MergeContext merge_context;
    PinnableSlice* pinnable_val = &values[i];
    std::string& batch_value = *pinnable_val->GetSelf();
    Status* s = &statuses[i];
    WriteBatchWithIndexInternal::Result result =
        WriteBatchWithIndexInternal::GetFromBatch(
            immutable_db_options, this, column_family, keys[i], &merge_context,
            &rep->comparator, &batch_value, rep->overwrite_key, s);

    if (result == WriteBatchWithIndexInternal::Result::kFound) {
      // The value was copied into the slice's own buffer; expose it.
      pinnable_val->PinSelf();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kDeleted) {
      *s = Status::NotFound();
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kError) {
      // GetFromBatch was handed `s`; keep whatever it reported.
      continue;
    }
    if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress &&
        rep->overwrite_key) {
      // Since we've overwritten keys, we do not know what other operations are
      // in this batch for this key, so we cannot do a Merge to compute the
      // result. Instead, we will simply return MergeInProgress.
      *s = Status::MergeInProgress();
      continue;
    }

    assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress ||
           result == WriteBatchWithIndexInternal::Result::kNotFound);
    key_context.emplace_back(keys[i], &values[i], &statuses[i]);
    merges.emplace_back(result, std::move(merge_context));
  }

  // Everything was answered from the batch: nothing to ask the DB and nothing
  // to merge afterwards.
  if (key_context.empty()) {
    return;
  }

  // Did not find key in batch OR could not resolve Merges. Try DB.
  db_impl->MultiGetImpl(read_options, column_family, key_context, sorted_input,
                        callback);

  ColumnFamilyHandleImpl* cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  const MergeOperator* merge_operator = cfh->cfd()->ioptions()->merge_operator;
  Statistics* statistics = immutable_db_options.statistics.get();
  Env* env = immutable_db_options.env;
  Logger* logger = immutable_db_options.info_log.get();

  for (size_t index = 0; index < key_context.size(); ++index) {
    KeyContext& key = key_context[index];
    if (!key.s->ok() && !key.s->IsNotFound()) {
      // DB read failed for this key; leave its status as reported.
      continue;
    }
    std::pair<WriteBatchWithIndexInternal::Result, MergeContext>&
        merge_result = merges[index];
    if (merge_result.first !=
        WriteBatchWithIndexInternal::Result::kMergeInProgress) {
      // No merge operands in the batch: the DB result stands as is.
      continue;
    }
    if (merge_operator == nullptr) {
      *key.s = Status::InvalidArgument("Options::merge_operator must be set");
      continue;
    }

    // Merge result from DB with merges in Batch. A null base value means the
    // key is not present in the DB (status was NotFound).
    const Slice* merge_data = key.s->ok() ? key.value : nullptr;

    // Merge into a scratch string rather than directly into the output
    // slice's own buffer: the base value handed to the merge operator may
    // live in that very buffer, so writing the result there could clobber the
    // input while it is still being read.
    std::string merged_value;
    *key.s = MergeHelper::TimedFullMerge(
        merge_operator, *key.key, merge_data,
        merge_result.second.GetOperands(), &merged_value, logger, statistics,
        env);
    if (key.s->ok()) {
      // Drop whatever the DB read left pinned in the slice before re-pinning
      // it to its own buffer, which now holds the merged value.
      key.value->Reset();
      *key.value->GetSelf() = std::move(merged_value);
      key.value->PinSelf();
    }
  }
}

void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); }

Status WriteBatchWithIndex::RollbackToSavePoint() {
Expand Down

0 comments on commit 1c8cbf3

Please sign in to comment.