From ce5f471e8ef8b31517a193fc78d390f1ce5142e5 Mon Sep 17 00:00:00 2001 From: Shawn Feng Date: Sun, 25 Jun 2023 16:21:14 +0800 Subject: [PATCH] bip_buffer: Modify bip_buffer to spsc lock-free implementation --- include/ulog/helper/bip_buffer.h | 305 ++++++++++++------------------- tests/CMakeLists.txt | 5 +- tests/bip_buffer_test.cc | 105 ++++++++--- 3 files changed, 198 insertions(+), 217 deletions(-) diff --git a/include/ulog/helper/bip_buffer.h b/include/ulog/helper/bip_buffer.h index e5ec756..916d763 100644 --- a/include/ulog/helper/bip_buffer.h +++ b/include/ulog/helper/bip_buffer.h @@ -1,219 +1,144 @@ #pragma once +#include #include #include #include #include +#include + +/** + * @brief Check if value is in range [left, right], considering the wraparound + * case when right < left (overflow) + */ +static inline bool IsInRange(unsigned left, unsigned value, unsigned right) { + if (right >= left) { + // Normal + return (left <= value) && (value <= right); + } else { + // Maybe the data overflowed and a wraparound occurred + return (left <= value) || (value <= right); + } +} -/* - Copyright (c) 2003 Simon Cooke, All Rights Reserved - - Licensed royalty-free for commercial and non-commercial - use, without warranty or guarantee of suitability for any purpose. - All that I ask is that you send me an email - telling me that you're using my code. It'll make me - feel warm and fuzzy inside. spectecjr@gmail.com - -*/ -#define OUT - +template class BipBuffer { - private: - char* const buffer_start_ptr_; - size_t buffer_size_; - bool is_allocated_memory_; - - size_t a_start_{0}; // a_start_ is always read_index_ - size_t a_size_{0}; - size_t b_size_{0}; - - size_t reserve_start_{0}; - size_t reserve_size_{0}; - public: explicit BipBuffer(size_t buffer_size) - : buffer_start_ptr_((char*)::malloc(buffer_size)), - buffer_size_(buffer_size) { - if (buffer_start_ptr_ == nullptr) - throw std::runtime_error{"malloc failed!"}; - is_allocated_memory_ = true; - } + : buffer_(buffer_size), + read_index_(0), + write_index_(0), + last_index_(0), + write_wrapped_(false) {} + + ~BipBuffer() = default; + + /** + * Try to reserve space of size size + * @param size size of space to reserve + * @return data pointer if successful, otherwise nullptr + */ + T* Reserve(size_t size) { + const auto read = read_index_.load(std::memory_order_acquire); + const auto write = write_index_.load(std::memory_order_relaxed); + + if (write < read) { + // |********|_____size____||*********|___________| + // ^0 ^write ^read ^last ^size + if (write + size < read) { + write_wrapped_ = false; + return &buffer_[write]; + } else { + return nullptr; + } - explicit BipBuffer(char* pre_allocated_buffer, size_t buffer_size) - : buffer_start_ptr_(pre_allocated_buffer), buffer_size_(buffer_size) { - if (buffer_start_ptr_ == nullptr) - throw std::runtime_error{"buffer_start_ptr_ == nullptr!"}; - is_allocated_memory_ = false; - } + } else if (write + size <= buffer_.size()) { + // Check trailing space first + // |__________|*************|______size______| + // ^0 ^read ^write ^size + // or + // |__________|_____________size_____________| + // ^0 ^read/write ^size + write_wrapped_ = false; + return &buffer_[write]; + + } else if (read > size) { + // Check leading space + // |______size______|*************|__________| + // ^0 ^read ^write ^size + write_wrapped_ = true; + return &buffer_[0]; - ~BipBuffer() { - // We don't call FreeBuffer, because we don't need to reset our variables - - // our object is dying - if (is_allocated_memory_) { - free((void*)buffer_start_ptr_); - } - } - - // Reserve - // - // Reserves space in the buffer for a memory write operation - // - // Parameters: - // int size amount of space to reserve - // OUT int& reserved size of space actually reserved - // - // Returns: - // char* pointer to the reserved block - // - // Notes: - // Will return NULL for the pointer if no space can be allocated. - // Can return any value from 1 to size in reserved. - // Will return NULL if a previous reservation has not been committed. - char* Reserve(size_t size, OUT size_t& reserved) { - // We always allocate on B if B exists; this means we have two blocks and - // our buffer is filling. - if (b_size_) { - auto free_space = GetBFreeSpace(); - - if (size < free_space) free_space = size; - - if (free_space == 0) return nullptr; - - reserve_size_ = free_space; - reserved = free_space; - reserve_start_ = b_size_; - return buffer_start_ptr_ + reserve_start_; } else { - // Block b does not exist, so we can check if the space AFTER a is bigger - // than the space before A, and allocate the bigger one. - - auto free_space = GetSpaceAfterA(); - if (free_space >= a_start_) { - if (free_space == 0) return nullptr; - if (size < free_space) free_space = size; - - reserve_size_ = free_space; - reserved = free_space; - reserve_start_ = a_start_ + a_size_; - return buffer_start_ptr_ + reserve_start_; - } else { - if (a_start_ < size) size = a_start_; - reserve_size_ = size; - reserved = size; - reserve_start_ = 0; - return buffer_start_ptr_; - } + return nullptr; } } - // Commit - // - // Commits space that has been written to in the buffer - // - // Parameters: - // int size number of bytes to commit - // - // Notes: - // Committing a size > than the reserved size will cause an assert in a - // debug build; in a release build, the actual reserved size will be used. - // Committing a size < than the reserved size will commit that amount of - // data, and release the rest of the space. Committing a size of 0 will - // release the reservation. - // + /** + * Commits the data to the buffer, so that it can be read out. + * @param size the size of the data to be committed + * + * NOTE: + * The validity of the size is not checked, it needs to be within the range + * returned by the Reserve function. + */ void Commit(size_t size) { - // If we try to commit more space than we asked for, clip to the size we - // asked for. - if (size > reserve_size_) { - size = reserve_size_; - } + // only written from push thread + const auto write = write_index_.load(std::memory_order_relaxed); - if (reserve_start_ == a_size_ + a_start_) { - a_size_ += size; + if (write_wrapped_) { + last_index_.store(write, std::memory_order_relaxed); + write_index_.store(size, std::memory_order_release); } else { - b_size_ += size; + write_index_.store(write + size, std::memory_order_release); } - - reserve_start_ = 0; - reserve_size_ = 0; - } - - // GetContiguousBlock - // - // Gets a pointer to the first contiguous block in the buffer, and returns the - // size of that block. - // - // Parameters: - // OUT int & size returns the size of the first contiguous block - // - // Returns: - // char* pointer to the first contiguous block, or NULL - // if empty. - char* GetContiguousBlock(OUT size_t& size) { - size = a_size_; - return buffer_start_ptr_ + a_start_; } - // DecommitBlock - // - // Decommits space from the first contiguous block - // - // Parameters: - // int size amount of memory to decommit - // - // Returns: - // nothing - void DecommitBlock(size_t size) { - if (size == a_size_) { - a_start_ = 0; - a_size_ = b_size_; - b_size_ = 0; - } else { - a_size_ -= size; - a_start_ += size; + /** + * Gets a pointer to the contiguous block in the buffer, and returns the size + * of that block. + * @param size returns the size of the contiguous block + * @return pointer to the contiguous block + */ + T* Read(size_t* size) { + const auto write = write_index_.load(std::memory_order_acquire); + const auto last = last_index_.load(std::memory_order_relaxed); + auto read = read_index_.load(std::memory_order_relaxed); + + if (read == write) return nullptr; + if (read == last) { + read_index_.store(0, std::memory_order_release); + read = 0; } + + auto read_limit = read <= write ? write : last; + *size = read_limit - read; + + return (*size == 0) ? nullptr : &buffer_[read]; } - // GetCommittedSize - // - // Queries how much data (in total) has been committed in the buffer - // - // Parameters: - // none - // - // Returns: - // int total amount of committed data in the buffer - size_t GetCommittedSize() const { return a_size_ + b_size_; } - - // GetReservationSize - // - // Queries how much space has been reserved in the buffer. - // - // Parameters: - // none - // - // Returns: - // int number of bytes that have been reserved - // - // Notes: - // A return value of 0 indicates that no space has been reserved - size_t GetReservationSize() const { return reserve_size_; } - - // GetBufferSize - // - // Queries the maximum total size of the buffer - // - // Parameters: - // none - // - // Returns: - // int total size of buffer - size_t GetBufferSize() const { return buffer_size_; } + /** + * Releases data from the buffer, so that more data can be written in. + * @param size the size of the data to be released + * + * Note: + * + * The validity of the size is not checked, it needs to be within the range + * returned by the Read function. + * + */ + void Release(size_t size) { + read_index_.fetch_add(size, std::memory_order_acq_rel); + } private: - size_t GetSpaceAfterA() const { return buffer_size_ - a_start_ - a_size_; } + std::vector buffer_; - size_t GetBFreeSpace() const { - auto b_space_size = a_start_; - return b_space_size - b_size_; - } + // for reader thread + std::atomic read_index_; + + // for writer thread + std::atomic write_index_; + std::atomic last_index_; + bool write_wrapped_; }; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 60b4cf1..58c34ce 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,5 +7,6 @@ add_executable(ulog_test_cpp ulog_test.cc) target_link_libraries(ulog_test_cpp ulog) add_test(test_cpp_compile ulog_test_cpp) -add_executable(bipbuffer_test bip_buffer_test.cc) -target_link_libraries(bipbuffer_test ulog) +add_executable(ulog_unit_test bip_buffer_test.cc) +target_link_libraries(ulog_unit_test GTest::gtest_main ulog) +add_test(ulog_unit_test ulog_unit_test) diff --git a/tests/bip_buffer_test.cc b/tests/bip_buffer_test.cc index 9ef5fff..6a89493 100644 --- a/tests/bip_buffer_test.cc +++ b/tests/bip_buffer_test.cc @@ -2,31 +2,86 @@ // Created by shawnfeng on 23-3-31. // -#include "ulog/helper/BipBuffer.h" +#include "ulog/helper/bip_buffer.h" + +#include #include -#include - -int main() { - char c_array[] = "1234567890"; - - BipBuffer bip(32); - { - size_t reserve; - auto data_ptr = bip.Reserve(1024, reserve); - size_t ret = snprintf(data_ptr, reserve, "%s", c_array); - bip.Commit(std::min(ret, reserve)); - } - - for (int i = 0; i < 100; ++i) { - size_t reserve; - auto data_ptr = bip.Reserve(1024, reserve); - size_t ret = snprintf(data_ptr, reserve, "%s", c_array); - bip.Commit(std::min(ret, reserve)); - - size_t get_size; - bip.GetContiguousBlock(get_size); - bip.DecommitBlock(10); - } - return 0; +#include + +TEST(BipBuffer, IsInRange) { + ASSERT_EQ(IsInRange(10, 9, 20), false); + ASSERT_EQ(IsInRange(10, 10, 20), true); + ASSERT_EQ(IsInRange(10, 11, 20), true); + + ASSERT_EQ(IsInRange(10, 19, 20), true); + ASSERT_EQ(IsInRange(10, 20, 20), true); + ASSERT_EQ(IsInRange(10, 21, 20), false); + + ASSERT_EQ(IsInRange(10, 9, 10), false); + ASSERT_EQ(IsInRange(10, 10, 10), true); + ASSERT_EQ(IsInRange(10, 11, 10), false); + + ASSERT_EQ(IsInRange(UINT32_MAX - 1, UINT32_MAX - 2, 1), false); + ASSERT_EQ(IsInRange(UINT32_MAX - 1, UINT32_MAX - 1, 1), true); + ASSERT_EQ(IsInRange(UINT32_MAX - 1, UINT32_MAX, 1), true); + ASSERT_EQ(IsInRange(UINT32_MAX - 1, 0, 1), true); + ASSERT_EQ(IsInRange(UINT32_MAX - 1, 1, 1), true); + ASSERT_EQ(IsInRange(UINT32_MAX - 1, 2, 1), false); +} + +static void single_producer_single_consumer(uint32_t write_step, + uint32_t buffer_size) { + const uint32_t limit = buffer_size * 32; + BipBuffer buffer(buffer_size); + + std::thread write_thread{[&] { + uint32_t write_count = 0; + while (write_count < limit) { + size_t size = write_step; + auto data = buffer.Reserve(size); + if (data == nullptr) { + std::this_thread::yield(); + continue; + } + for (size_t i = 0; i < size; ++i) data[i] = write_count++; + buffer.Commit(size); + } + }}; + + std::thread read_thread{[&] { + uint32_t read_count = 0; + while (read_count < limit) { + size_t size; + auto data = buffer.Read(&size); + if (data == nullptr) { + std::this_thread::yield(); + continue; + } + for (size_t i = 0; i < size; ++i) { + ASSERT_EQ(data[i], read_count++); + } + buffer.Release(size); + } + }}; + + write_thread.join(); + read_thread.join(); + printf("Finished test: write_step: %u, buffer_size: %u, limit: %u\n", + write_step, buffer_size, limit); +} + +TEST(BipBufferTestSingle, singl_producer_single_consumer) { + single_producer_single_consumer(1, 1 << 8); + single_producer_single_consumer(2, 1 << 8); + single_producer_single_consumer(10, 1 << 8); + single_producer_single_consumer(16, 1 << 8); + + single_producer_single_consumer(1, 1 << 16); + single_producer_single_consumer(2, 1 << 16); + single_producer_single_consumer(5, 1 << 16); + single_producer_single_consumer(256, 1 << 16); + single_producer_single_consumer(1000, 1 << 16); + single_producer_single_consumer(4000, 1 << 16); + single_producer_single_consumer(4096, 1 << 16); }