From 506b1bfb69daf1ebdc12704788b1dc8d34784433 Mon Sep 17 00:00:00 2001
From: Matthew Brookes
Date: Thu, 4 Jul 2019 12:16:30 +0200
Subject: [PATCH] [c++] Add a new `value_size()` method to `RmwContext` for RCU operations (#145)

* cc: core: Add a value_size method taking old value for RMW
* cc: add new value_size method to existing RmwContexts
* cc: test: add a test to show use of new value_size method
* in_memory_test: make compatible with gcc
---
 cc/benchmark-dir/benchmark.cc           |   3 +
 cc/playground/sum_store-dir/sum_store.h |   3 +
 cc/src/core/faster.h                    |  10 +-
 cc/src/core/internal_contexts.h         |   9 +
 cc/test/in_memory_test.cc               | 242 ++++++++++++++++++++++++
 cc/test/paging_test.h                   |  12 ++
 cc/test/recovery_test.h                 |   6 +
 7 files changed, 283 insertions(+), 2 deletions(-)

diff --git a/cc/benchmark-dir/benchmark.cc b/cc/benchmark-dir/benchmark.cc
index a91e0cd70..607566643 100644
--- a/cc/benchmark-dir/benchmark.cc
+++ b/cc/benchmark-dir/benchmark.cc
@@ -222,6 +222,9 @@ class RmwContext : public IAsyncContext {
   inline static constexpr uint32_t value_size() {
     return sizeof(value_t);
   }
+  inline static constexpr uint32_t value_size(const value_t& old_value) {
+    return sizeof(value_t);
+  }
 
   /// Initial, non-atomic, and atomic RMW methods.
   inline void RmwInitial(value_t& value) {
diff --git a/cc/playground/sum_store-dir/sum_store.h b/cc/playground/sum_store-dir/sum_store.h
index 38c4861b6..35e1c2882 100644
--- a/cc/playground/sum_store-dir/sum_store.h
+++ b/cc/playground/sum_store-dir/sum_store.h
@@ -102,6 +102,9 @@ class RmwContext : public IAsyncContext {
   inline static constexpr uint32_t value_size() {
     return sizeof(value_t);
   }
+  inline static constexpr uint32_t value_size(const NumClicks& old_value) {
+    return sizeof(value_t);
+  }
 
 protected:
   /// The explicit interface requires a DeepCopy_Internal() implementation.
diff --git a/cc/src/core/faster.h b/cc/src/core/faster.h
index c5e0e99e1..d26968f25 100644
--- a/cc/src/core/faster.h
+++ b/cc/src/core/faster.h
@@ -1074,7 +1074,14 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
   // Create a record and attempt RCU.
 create_record:
-  uint32_t record_size = record_t::size(key, pending_context.value_size());
+  uint32_t record_size;
+  const record_t* old_record;
+  if (address >= head_address) {
+    old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
+    record_size = record_t::size(key, pending_context.value_size(old_record));
+  } else {
+    record_size = record_t::size(key, pending_context.value_size());
+  }
   Address new_address = BlockAllocate(record_size);
   record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));
@@ -1095,7 +1102,6 @@ inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool r
   if(address < hlog.begin_address.load()) {
     pending_context.RmwInitial(new_record);
   } else if(address >= head_address) {
-    const record_t* old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
     pending_context.RmwCopy(old_record, new_record);
   } else {
     // The block we allocated for the new record caused the head address to advance beyond
diff --git a/cc/src/core/internal_contexts.h b/cc/src/core/internal_contexts.h
index 8a0cfd787..c40a5e861 100644
--- a/cc/src/core/internal_contexts.h
+++ b/cc/src/core/internal_contexts.h
@@ -260,7 +260,10 @@ class AsyncPendingRmwContext : public PendingContext {
   virtual void RmwCopy(const void* old_rec, void* rec) = 0;
   /// in-place update.
   virtual bool RmwAtomic(void* rec) = 0;
+  /// Get value size for initial value or in-place update
   virtual uint32_t value_size() const = 0;
+  /// Get value size for RCU
+  virtual uint32_t value_size(const void* old_rec) const = 0;
 };
 
 /// A synchronous Rmw() context preserves its type information.
@@ -312,9 +315,15 @@ class PendingRmwContext : public AsyncPendingRmwContext {
     record_t* record = reinterpret_cast<record_t*>(rec);
     return rmw_context().RmwAtomic(record->value());
   }
+  /// Get value size for initial value or in-place update
   inline constexpr uint32_t value_size() const final {
     return rmw_context().value_size();
   }
+  /// Get value size for RCU
+  inline constexpr uint32_t value_size(const void* old_rec) const final {
+    const record_t* old_record = reinterpret_cast<const record_t*>(old_rec);
+    return rmw_context().value_size(old_record->value());
+  }
 };
 
 class AsyncIOContext;
diff --git a/cc/test/in_memory_test.cc b/cc/test/in_memory_test.cc
index 309fa3e04..7e9934cc4 100644
--- a/cc/test/in_memory_test.cc
+++ b/cc/test/in_memory_test.cc
@@ -989,6 +989,9 @@ TEST(InMemFaster, Rmw) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
     inline void RmwInitial(Value& value) {
       value.value_ = incr_;
     }
@@ -1178,6 +1181,9 @@ TEST(InMemFaster, Rmw_Concurrent) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
    }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
 
     inline void RmwInitial(Value& value) {
       value.value_ = incr_;
@@ -1472,6 +1478,9 @@ TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
     inline uint32_t value_size() const {
       return sizeof(value_t) + length_;
     }
+    inline uint32_t value_size(const Value& old_value) const {
+      return sizeof(value_t) + length_;
+    }
 
     inline void RmwInitial(Value& value) {
       value.gen_lock_.store(GenLock{});
@@ -1653,6 +1662,236 @@ TEST(InMemFaster, Rmw_ResizeValue_Concurrent) {
   store.StopSession();
 }
 
+TEST(InMemFaster, Rmw_GrowString_Concurrent) {
+class Key {
+public:
+  Key(uint64_t key)
+    : key_{ key } {
+  }
+
+  inline static constexpr uint32_t size() {
+    return static_cast<uint32_t>(sizeof(Key));
+  }
+  inline KeyHash GetHash() const {
+    std::hash<uint64_t> hash_fn;
+    return KeyHash{ hash_fn(key_) };
+  }
+
+  /// Comparison operators.
+  inline bool operator==(const Key& other) const {
+    return key_ == other.key_;
+  }
+  inline bool operator!=(const Key& other) const {
+    return key_ != other.key_;
+  }
+
+private:
+  uint64_t key_;
+};
+
+class RmwContext;
+class ReadContext;
+
+class Value {
+public:
+  Value()
+    : length_{ 0 } {
+  }
+
+  inline uint32_t size() const {
+    return length_;
+  }
+
+  friend class RmwContext;
+  friend class ReadContext;
+
+private:
+  uint32_t length_;
+
+  const char* buffer() const {
+    return reinterpret_cast<const char*>(this + 1);
+  }
+  char* buffer() {
+    return reinterpret_cast<char*>(this + 1);
+  }
+};
+
+class RmwContext : public IAsyncContext {
+public:
+  typedef Key key_t;
+  typedef Value value_t;
+
+  RmwContext(uint64_t key, char letter)
+    : key_{ key }
+    , letter_{ letter } {
+  }
+
+  /// Copy (and deep-copy) constructor.
+  RmwContext(const RmwContext& other)
+    : key_{ other.key_ }
+    , letter_{ other.letter_ } {
+  }
+
+  /// The implicit and explicit interfaces require a key() accessor.
+  inline const Key& key() const {
+    return key_;
+  }
+  inline uint32_t value_size() const {
+    return sizeof(value_t) + sizeof(char);
+  }
+  inline uint32_t value_size(const Value& old_value) const {
+    return sizeof(value_t) + old_value.length_ + sizeof(char);
+  }
+
+  inline void RmwInitial(Value& value) {
+    value.length_ = sizeof(char);
+    value.buffer()[0] = letter_;
+  }
+  inline void RmwCopy(const Value& old_value, Value& value) {
+    value.length_ = old_value.length_ + sizeof(char);
+    std::memcpy(value.buffer(), old_value.buffer(), old_value.length_);
+    value.buffer()[old_value.length_] = letter_;
+  }
+  inline bool RmwAtomic(Value& value) {
+    // All RMW operations use Read-Copy-Update
+    return false;
+  }
+
+protected:
+  /// The explicit interface requires a DeepCopy_Internal() implementation.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+private:
+  char letter_;
+  Key key_;
+};
+
+class ReadContext : public IAsyncContext {
+public:
+  typedef Key key_t;
+  typedef Value value_t;
+
+  ReadContext(uint64_t key)
+    : key_{ key }
+    , output_length{ 0 } {
+  }
+
+  /// Copy (and deep-copy) constructor.
+  ReadContext(const ReadContext& other)
+    : key_{ other.key_ }
+    , output_length{ 0 } {
+  }
+
+  /// The implicit and explicit interfaces require a key() accessor.
+  inline const Key& key() const {
+    return key_;
+  }
+
+  inline void Get(const Value& value) {
+    // All reads should be atomic (from the mutable tail).
+    ASSERT_TRUE(false);
+  }
+  inline void GetAtomic(const Value& value) {
+    // There are no concurrent updates
+    output_length = value.length_;
+    output_letters[0] = value.buffer()[0];
+    output_letters[1] = value.buffer()[value.length_ - 1];
+  }
+
+protected:
+  /// The explicit interface requires a DeepCopy_Internal() implementation.
+  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
+    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
+  }
+
+private:
+  Key key_;
+public:
+  uint8_t output_length;
+  // Extract two letters of output.
+  char output_letters[2];
+};
+
+static constexpr int8_t kNumThreads = 2;
+static constexpr size_t kNumRmws = 2048;
+static constexpr size_t kRange = 512;
+
+auto rmw_worker = [](FasterKv<Key, Value, FASTER::device::NullDisk>* store_, char start_letter){
+  store_->StartSession();
+
+  for(size_t idx = 0; idx < kNumRmws; ++idx) {
+    auto callback = [](IAsyncContext* ctxt, Status result) {
+      // In-memory test.
+      ASSERT_TRUE(false);
+    };
+    char letter = static_cast<char>(start_letter + idx / kRange);
+    RmwContext context{ idx % kRange, letter };
+    Status result = store_->Rmw(context, callback, 1);
+    ASSERT_EQ(Status::Ok, result);
+  }
+
+  store_->StopSession();
+};
+
+FasterKv<Key, Value, FASTER::device::NullDisk> store{ 256, 1073741824, "" };
+
+// Rmw.
+std::deque<std::thread> threads{};
+for(int64_t idx = 0; idx < kNumThreads; ++idx) {
+  threads.emplace_back(rmw_worker, &store, 'A');
+}
+for(auto& thread : threads) {
+  thread.join();
+}
+
+// Read.
+store.StartSession();
+
+for(size_t idx = 0; idx < kRange; ++idx) {
+  auto callback = [](IAsyncContext* ctxt, Status result) {
+    // In-memory test.
+    ASSERT_TRUE(false);
+  };
+  ReadContext context{ idx };
+  Status result = store.Read(context, callback, 1);
+  ASSERT_EQ(Status::Ok, result) << idx;
+  ASSERT_EQ(kNumThreads * kNumRmws / kRange, context.output_length);
+  ASSERT_EQ('A', context.output_letters[0]);
+  ASSERT_EQ('D', context.output_letters[1]);
+}
+
+store.StopSession();
+
+// Rmw.
+threads.clear();
+for(int64_t idx = 0; idx < kNumThreads; ++idx) {
+  threads.emplace_back(rmw_worker, &store, 'E');
+}
+for(auto& thread : threads) {
+  thread.join();
+}
+
+// Read again.
+store.StartSession();
+
+for(size_t idx = 0; idx < kRange; ++idx) {
+  auto callback = [](IAsyncContext* ctxt, Status result) {
+    // In-memory test.
+    ASSERT_TRUE(false);
+  };
+  ReadContext context{ static_cast<uint64_t>(idx) };
+  Status result = store.Read(context, callback, 1);
+  ASSERT_EQ(Status::Ok, result);
+  ASSERT_EQ(2 * kNumThreads * kNumRmws / kRange, context.output_length);
+  ASSERT_EQ('A', context.output_letters[0]);
+  ASSERT_EQ('H', context.output_letters[1]);
+}
+
+store.StopSession();
+}
+
 TEST(InMemFaster, GrowHashTable) {
   class Key {
    public:
@@ -1729,6 +1968,9 @@ TEST(InMemFaster, GrowHashTable) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
 
     inline void RmwInitial(Value& value) {
       value.value_ = incr_;
diff --git a/cc/test/paging_test.h b/cc/test/paging_test.h
index 4c0195333..64cce7426 100644
--- a/cc/test/paging_test.h
+++ b/cc/test/paging_test.h
@@ -90,6 +90,9 @@ TEST(CLASS, UpsertRead_Serial) {
     inline static constexpr uint32_t value_size() {
      return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
     /// Non-atomic and atomic Put() methods.
     inline void Put(Value& value) {
       value.gen_ = 0;
@@ -369,6 +372,9 @@ TEST(CLASS, UpsertRead_Concurrent) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
     /// Non-atomic and atomic Put() methods.
     inline void Put(Value& value) {
       value.gen_ = 0;
@@ -665,6 +671,9 @@ TEST(CLASS, Rmw) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
     inline void RmwInitial(Value& value) {
       value.counter_ = incr_;
       val_ = value.counter_;
@@ -836,6 +845,9 @@ TEST(CLASS, Rmw_Concurrent) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const Value& old_value) {
+      return sizeof(value_t);
+    }
     inline void RmwInitial(Value& value) {
       value.counter_ = incr_;
     }
diff --git a/cc/test/recovery_test.h b/cc/test/recovery_test.h
index c3148facc..ff5c12d35 100644
--- a/cc/test/recovery_test.h
+++ b/cc/test/recovery_test.h
@@ -2948,6 +2948,9 @@ TEST(CLASS, Concurrent_Rmw_Small) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const value_t& old_value) {
+      return sizeof(value_t);
+    }
     /// Non-atomic and atomic Put() methods.
     inline void RmwInitial(Value& value) {
       value.val_ = key_.key_;
@@ -3388,6 +3391,9 @@ TEST(CLASS, Concurrent_Rmw_Large) {
     inline static constexpr uint32_t value_size() {
       return sizeof(value_t);
     }
+    inline static constexpr uint32_t value_size(const value_t& old_value) {
+      return sizeof(value_t);
+    }
     /// Non-atomic and atomic Put() methods.
     inline void RmwInitial(Value& value) {
       value.val_ = key_.key_;
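
Usage note (not part of the patch above): the two overloads split responsibility as follows. `value_size()` is consulted when there is no usable old record (initial insert, or the previous record has fallen below the head address), while the new `value_size(old_value)` is consulted when the old record is still in memory and an RCU copy will be made, so the new record can be sized from the old payload. Below is a minimal caller-side sketch, assuming FASTER's `IAsyncContext`/`Status` types and a variable-length `Value` with a public `size()` accessor like the one in the Rmw_GrowString_Concurrent test; the name `AppendRmwContext` is illustrative, not from the patch.

// Illustrative sketch only; mirrors the Rmw_GrowString_Concurrent test above.
// Assumes FASTER's core headers (IAsyncContext, Status) and Key/Value types
// equivalent to those defined in that test.
class AppendRmwContext : public IAsyncContext {
 public:
  typedef Key key_t;
  typedef Value value_t;

  AppendRmwContext(uint64_t key, char letter)
    : key_{ key }
    , letter_{ letter } {
  }

  inline const Key& key() const {
    return key_;
  }

  /// Used when there is no old record to copy from: header plus one character.
  inline uint32_t value_size() const {
    return sizeof(value_t) + sizeof(char);
  }
  /// Used for RCU: the new record must hold the old payload plus one more character.
  inline uint32_t value_size(const Value& old_value) const {
    return sizeof(value_t) + old_value.size() + sizeof(char);
  }

  inline void RmwInitial(Value& value) {
    // Write the first character into the freshly allocated value.
  }
  inline void RmwCopy(const Value& old_value, Value& value) {
    // Copy the old payload into the (larger) new value, then append one character.
  }
  inline bool RmwAtomic(Value& value) {
    // The value grows on every update, so an in-place RMW is never possible;
    // returning false forces the read-copy-update path that uses the new overload.
    return false;
  }

 protected:
  Status DeepCopy_Internal(IAsyncContext*& context_copy) {
    return IAsyncContext::DeepCopy_Internal(*this, context_copy);
  }

 private:
  Key key_;
  char letter_;
};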