Skip to content

Commit

Permalink
[c++] Add a new value_size() method to RmwContext for RCU operati…
Browse files Browse the repository at this point in the history
…ons (#145)

* cc: core: Add a value_size method taking old value for RMW

* cc: add new value_size method to existing RmwContexts

* cc: test: add a test to show use of new value_size method

* in_memory_test: make compatible with gcc
  • Loading branch information
matthewbrookes authored and badrishc committed Jul 4, 2019
1 parent 5372cfc commit 506b1bf
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 2 deletions.
3 changes: 3 additions & 0 deletions cc/benchmark-dir/benchmark.cc
Expand Up @@ -222,6 +222,9 @@ class RmwContext : public IAsyncContext {
/// Value size used for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size used when read-copy-update (RCU) allocates the new record;
/// benchmark values are fixed-size, so the old value is ignored.
inline static constexpr uint32_t value_size(const value_t& old_value) {
  return sizeof(value_t);
}

/// Initial, non-atomic, and atomic RMW methods.
inline void RmwInitial(value_t& value) {
Expand Down
3 changes: 3 additions & 0 deletions cc/playground/sum_store-dir/sum_store.h
Expand Up @@ -102,6 +102,9 @@ class RmwContext : public IAsyncContext {
/// Value size for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size for RCU; NumClicks is fixed-size, so the old value is unused.
/// NOTE(review): sibling contexts spell this parameter `const value_t&` —
/// presumably value_t is NumClicks here; confirm and unify the spelling.
inline static constexpr uint32_t value_size(const NumClicks& old_value) {
  return sizeof(value_t);
}

protected:
/// The explicit interface requires a DeepCopy_Internal() implementation.
Expand Down
10 changes: 8 additions & 2 deletions cc/src/core/faster.h
Expand Up @@ -1074,7 +1074,14 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r

// Create a record and attempt RCU.
create_record:
uint32_t record_size;
// Only assigned (and only read) when address >= head_address, i.e. when the
// old record is still resident in the in-memory portion of the log.
const record_t* old_record;
if (address >= head_address) {
  // Old record is in memory: let the context size the new value from the old
  // one, which is required when values grow under RCU (variable-size values).
  old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
  record_size = record_t::size(key, pending_context.value_size(old_record));
} else {
  // Old record (if any) is on disk or absent; fall back to the initial size.
  record_size = record_t::size(key, pending_context.value_size());
}
Address new_address = BlockAllocate(record_size);
record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

Expand All @@ -1095,7 +1102,6 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
if(address < hlog.begin_address.load()) {
pending_context.RmwInitial(new_record);
} else if(address >= head_address) {
const record_t* old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
pending_context.RmwCopy(old_record, new_record);
} else {
// The block we allocated for the new record caused the head address to advance beyond
Expand Down
9 changes: 9 additions & 0 deletions cc/src/core/internal_contexts.h
Expand Up @@ -260,7 +260,10 @@ class AsyncPendingRmwContext : public PendingContext<K> {
virtual void RmwCopy(const void* old_rec, void* rec) = 0;
/// in-place update.
virtual bool RmwAtomic(void* rec) = 0;
/// Get value size for initial value or in-place update
virtual uint32_t value_size() const = 0;
/// Get value size for RCU: size of the new value, given the old record.
virtual uint32_t value_size(const void* old_rec) const = 0;
};

/// A synchronous Rmw() context preserves its type information.
Expand Down Expand Up @@ -312,9 +315,15 @@ class PendingRmwContext : public AsyncPendingRmwContext<typename MC::key_t> {
record_t* record = reinterpret_cast<record_t*>(rec);
return rmw_context().RmwAtomic(record->value());
}
/// Get value size for initial value or in-place update
inline constexpr uint32_t value_size() const final {
  return rmw_context().value_size();
}
/// Get value size for RCU: recover the typed value from the raw record
/// pointer and forward it to the user's typed context.
inline constexpr uint32_t value_size(const void* old_rec) const final {
  const record_t* old_record = reinterpret_cast<const record_t*>(old_rec);
  return rmw_context().value_size(old_record->value());
}
};

class AsyncIOContext;
Expand Down
242 changes: 242 additions & 0 deletions cc/test/in_memory_test.cc
Expand Up @@ -989,6 +989,9 @@ TEST(InMemFaster, Rmw) {
/// Value size for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size for RCU; fixed-size values ignore the old value.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}
inline void RmwInitial(Value& value) {
value.value_ = incr_;
}
Expand Down Expand Up @@ -1178,6 +1181,9 @@ TEST(InMemFaster, Rmw_Concurrent) {
/// Value size for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size for RCU; fixed-size values ignore the old value.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}

inline void RmwInitial(Value& value) {
value.value_ = incr_;
Expand Down Expand Up @@ -1472,6 +1478,9 @@ TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
/// Value size for an initial record or in-place update: header plus the
/// payload length this context requests.
inline uint32_t value_size() const {
  return sizeof(value_t) + length_;
}
/// Value size for RCU: the new size is whatever this context requests —
/// the old value is deliberately ignored so values can shrink or grow.
inline uint32_t value_size(const Value& old_value) const {
  return sizeof(value_t) + length_;
}

inline void RmwInitial(Value& value) {
value.gen_lock_.store(GenLock{});
Expand Down Expand Up @@ -1653,6 +1662,236 @@ TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
store.StopSession();
}

TEST(InMemFaster, Rmw_GrowString_Concurrent) {
/// Fixed-size 8-byte key for the grow-string RMW test.
class Key {
 public:
  Key(uint64_t key)
    : key_{ key } {
  }

  /// Keys are fixed-size; the record layout needs the exact byte count.
  inline static constexpr uint32_t size() {
    return static_cast<uint32_t>(sizeof(Key));
  }
  /// Hash the raw 64-bit key with the standard library's hasher.
  inline KeyHash GetHash() const {
    return KeyHash{ std::hash<uint64_t>{}(key_) };
  }

  /// Comparison operators.
  inline bool operator==(const Key& other) const {
    return key_ == other.key_;
  }
  inline bool operator!=(const Key& other) const {
    return !(*this == other);
  }

 private:
  uint64_t key_;
};

class RmwContext;
class ReadContext;

/// Variable-length value: a 4-byte length header followed immediately in the
/// log by `length_` bytes of character payload.
class Value {
 public:
  Value()
    : length_{ 0 } {
  }

  /// Payload length in bytes (the header itself is not counted).
  inline uint32_t size() const {
    return length_;
  }

  friend class RmwContext;
  friend class ReadContext;

 private:
  uint32_t length_;

  /// The payload lives directly after this header object in the log.
  char* buffer() {
    return reinterpret_cast<char*>(this + 1);
  }
  const char* buffer() const {
    return reinterpret_cast<const char*>(this + 1);
  }
};

/// RMW context that appends a single letter to a variable-length string
/// value. All updates go through read-copy-update (RmwAtomic refuses), so
/// the new record must be sized from the old value — this is what the
/// value_size(old_value) overload exercises.
class RmwContext : public IAsyncContext {
 public:
  typedef Key key_t;
  typedef Value value_t;

  RmwContext(uint64_t key, char letter)
    : key_{ key }
    , letter_{ letter } {
  }

  /// Copy (and deep-copy) constructor.
  RmwContext(const RmwContext& other)
    : key_{ other.key_ }
    , letter_{ other.letter_ } {
  }

  /// The implicit and explicit interfaces require a key() accessor.
  inline const Key& key() const {
    return key_;
  }
  /// Size of an initial record: header plus a single letter.
  inline uint32_t value_size() const {
    return sizeof(value_t) + sizeof(char);
  }
  /// Size for RCU: the old string grows by exactly one letter.
  inline uint32_t value_size(const Value& old_value) const {
    return sizeof(value_t) + old_value.length_ + sizeof(char);
  }

  inline void RmwInitial(Value& value) {
    value.length_ = sizeof(char);
    value.buffer()[0] = letter_;
  }
  inline void RmwCopy(const Value& old_value, Value& value) {
    value.length_ = old_value.length_ + sizeof(char);
    std::memcpy(value.buffer(), old_value.buffer(), old_value.length_);
    value.buffer()[old_value.length_] = letter_;
  }
  inline bool RmwAtomic(Value& value) {
    // All RMW operations use Read-Copy-Update
    return false;
  }

 protected:
  /// The explicit interface requires a DeepCopy_Internal() implementation.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  // Declared in the order the constructors' initializer lists name them
  // (key_ first); the original declared letter_ first, which triggers gcc's
  // -Wreorder since members initialize in declaration order, not list order.
  Key key_;
  char letter_;
};

/// Read context that captures the value's length and its first and last
/// letters, so the test can verify concurrent RCU appends.
class ReadContext : public IAsyncContext {
 public:
  typedef Key key_t;
  typedef Value value_t;

  ReadContext(uint64_t key)
    : key_{ key }
    , output_length{ 0 } {
  }

  /// Copy (and deep-copy) constructor. Output fields reset deliberately:
  /// a deep-copied pending context starts with no captured output.
  ReadContext(const ReadContext& other)
    : key_{ other.key_ }
    , output_length{ 0 } {
  }

  /// The implicit and explicit interfaces require a key() accessor.
  inline const Key& key() const {
    return key_;
  }

  inline void Get(const Value& value) {
    // All reads should be atomic (from the mutable tail).
    ASSERT_TRUE(false);
  }
  inline void GetAtomic(const Value& value) {
    // There are no concurrent updates
    output_length = value.length_;
    output_letters[0] = value.buffer()[0];
    output_letters[1] = value.buffer()[value.length_ - 1];
  }

 protected:
  /// The explicit interface requires a DeepCopy_Internal() implementation.
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  Key key_;
 public:
  // Widened from uint8_t to uint32_t: value.length_ is a uint32_t, and the
  // narrower field silently truncated any string longer than 255 bytes.
  uint32_t output_length;
  // Extract two letters of output.
  char output_letters[2];
};

static constexpr int8_t kNumThreads = 2;
static constexpr size_t kNumRmws = 2048;
static constexpr size_t kRange = 512;

// Each worker appends (kNumRmws / kRange) letters to every key, covering
// letters start_letter .. start_letter + 3.
auto rmw_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_, char start_letter){
  store_->StartSession();

  for(size_t idx = 0; idx < kNumRmws; ++idx) {
    auto callback = [](IAsyncContext* ctxt, Status result) {
      // In-memory test.
      ASSERT_TRUE(false);
    };
    char letter = static_cast<char>(start_letter + idx / kRange);
    RmwContext context{ idx % kRange, letter };
    Status result = store_->Rmw(context, callback, 1);
    ASSERT_EQ(Status::Ok, result);
  }

  store_->StopSession();
};

FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };

// Rmw, two threads appending 'A'..'D' to every key.
std::deque<std::thread> threads{};
for(int64_t idx = 0; idx < kNumThreads; ++idx) {
  threads.emplace_back(rmw_worker, &store, 'A');
}
for(auto& thread : threads) {
  thread.join();
}

// Read: every key should hold kNumThreads * (kNumRmws / kRange) letters,
// starting with 'A' and ending with 'D'.
store.StartSession();

for(size_t idx = 0; idx < kRange; ++idx) {
  auto callback = [](IAsyncContext* ctxt, Status result) {
    // In-memory test.
    ASSERT_TRUE(false);
  };
  ReadContext context{ idx };
  Status result = store.Read(context, callback, 1);
  ASSERT_EQ(Status::Ok, result) << idx;
  ASSERT_EQ(kNumThreads * kNumRmws / kRange, context.output_length);
  ASSERT_EQ('A', context.output_letters[0]);
  ASSERT_EQ('D', context.output_letters[1]);
}

store.StopSession();

// Rmw again, appending 'E'..'H'.
threads.clear();
for(int64_t idx = 0; idx < kNumThreads; ++idx) {
  threads.emplace_back(rmw_worker, &store, 'E');
}
for(auto& thread : threads) {
  thread.join();
}

// Read again.
store.StartSession();

for(size_t idx = 0; idx < kRange; ++idx) {
  auto callback = [](IAsyncContext* ctxt, Status result) {
    // In-memory test.
    ASSERT_TRUE(false);
  };
  // Bug fix: was `ReadContext context{ static_cast<uint8_t>(idx) };` — the
  // uint8_t cast truncated keys >= 256, so this pass silently re-read keys
  // 0-255 twice and never verified the upper half of the key range.
  ReadContext context{ idx };
  Status result = store.Read(context, callback, 1);
  ASSERT_EQ(Status::Ok, result);
  ASSERT_EQ(2 * kNumThreads * kNumRmws / kRange, context.output_length);
  ASSERT_EQ('A', context.output_letters[0]);
  ASSERT_EQ('H', context.output_letters[1]);
}

store.StopSession();
}

TEST(InMemFaster, GrowHashTable) {
class Key {
public:
Expand Down Expand Up @@ -1729,6 +1968,9 @@ TEST(InMemFaster, GrowHashTable) {
/// Value size for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size for RCU; fixed-size values ignore the old value.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}

inline void RmwInitial(Value& value) {
value.value_ = incr_;
Expand Down
12 changes: 12 additions & 0 deletions cc/test/paging_test.h
Expand Up @@ -90,6 +90,9 @@ TEST(CLASS, UpsertRead_Serial) {
/// Value size for an initial upsert or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Overload taking the old value; fixed-size, so the old value is ignored.
/// NOTE(review): a Put context presumably never sizes from an old value —
/// this looks added for interface uniformity; confirm it is required here.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}
/// Non-atomic and atomic Put() methods.
inline void Put(Value& value) {
value.gen_ = 0;
Expand Down Expand Up @@ -369,6 +372,9 @@ TEST(CLASS, UpsertRead_Concurrent) {
/// Value size for an initial upsert or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Overload taking the old value; fixed-size, so the old value is ignored.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}
/// Non-atomic and atomic Put() methods.
inline void Put(Value& value) {
value.gen_ = 0;
Expand Down Expand Up @@ -665,6 +671,9 @@ TEST(CLASS, Rmw) {
/// Value size for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size for RCU; fixed-size values ignore the old value.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}
inline void RmwInitial(Value& value) {
value.counter_ = incr_;
val_ = value.counter_;
Expand Down Expand Up @@ -836,6 +845,9 @@ TEST(CLASS, Rmw_Concurrent) {
/// Value size for an initial record or an in-place update.
inline static constexpr uint32_t value_size() {
  return sizeof(value_t);
}
/// Value size for RCU; fixed-size values ignore the old value.
inline static constexpr uint32_t value_size(const Value& old_value) {
  return sizeof(value_t);
}
inline void RmwInitial(Value& value) {
value.counter_ = incr_;
}
Expand Down

0 comments on commit 506b1bf

Please sign in to comment.