Skip to content

Commit

Permalink
bip_buffer: Modify bip_buffer to spsc lock-free implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnfeng0 committed Jun 25, 2023
1 parent be2e578 commit ce5f471
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 217 deletions.
305 changes: 115 additions & 190 deletions include/ulog/helper/bip_buffer.h
Original file line number Diff line number Diff line change
@@ -1,219 +1,144 @@
#pragma once

#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <stdexcept>
#include <vector>

/**
* @brief Check if value is in range [left, right], considering the wraparound
* case when right < left (overflow)
*/
static inline bool IsInRange(unsigned left, unsigned value, unsigned right) {
if (right >= left) {
// Normal
return (left <= value) && (value <= right);
} else {
// Maybe the data overflowed and a wraparound occurred
return (left <= value) || (value <= right);
}
}

/*
Copyright (c) 2003 Simon Cooke, All Rights Reserved
Licensed royalty-free for commercial and non-commercial
use, without warranty or guarantee of suitability for any purpose.
All that I ask is that you send me an email
telling me that you're using my code. It'll make me
feel warm and fuzzy inside. spectecjr@gmail.com
*/
#define OUT

template <typename T = char>
class BipBuffer {
private:
char* const buffer_start_ptr_;
size_t buffer_size_;
bool is_allocated_memory_;

size_t a_start_{0}; // a_start_ is always read_index_
size_t a_size_{0};
size_t b_size_{0};

size_t reserve_start_{0};
size_t reserve_size_{0};

public:
explicit BipBuffer(size_t buffer_size)
: buffer_start_ptr_((char*)::malloc(buffer_size)),
buffer_size_(buffer_size) {
if (buffer_start_ptr_ == nullptr)
throw std::runtime_error{"malloc failed!"};
is_allocated_memory_ = true;
}
: buffer_(buffer_size),
read_index_(0),
write_index_(0),
last_index_(0),
write_wrapped_(false) {}

~BipBuffer() = default;

/**
* Try to reserve space of size size
* @param size size of space to reserve
* @return data pointer if successful, otherwise nullptr
*/
T* Reserve(size_t size) {
const auto read = read_index_.load(std::memory_order_acquire);
const auto write = write_index_.load(std::memory_order_relaxed);

if (write < read) {
// |********|_____size____||*********|___________|
// ^0 ^write ^read ^last ^size
if (write + size < read) {
write_wrapped_ = false;
return &buffer_[write];
} else {
return nullptr;
}

explicit BipBuffer(char* pre_allocated_buffer, size_t buffer_size)
: buffer_start_ptr_(pre_allocated_buffer), buffer_size_(buffer_size) {
if (buffer_start_ptr_ == nullptr)
throw std::runtime_error{"buffer_start_ptr_ == nullptr!"};
is_allocated_memory_ = false;
}
} else if (write + size <= buffer_.size()) {
// Check trailing space first
// |__________|*************|______size______|
// ^0 ^read ^write ^size
// or
// |__________|_____________size_____________|
// ^0 ^read/write ^size
write_wrapped_ = false;
return &buffer_[write];

} else if (read > size) {
// Check leading space
// |______size______|*************|__________|
// ^0 ^read ^write ^size
write_wrapped_ = true;
return &buffer_[0];

~BipBuffer() {
// We don't call FreeBuffer, because we don't need to reset our variables -
// our object is dying
if (is_allocated_memory_) {
free((void*)buffer_start_ptr_);
}
}

// Reserve
//
// Reserves space in the buffer for a memory write operation
//
// Parameters:
// int size amount of space to reserve
// OUT int& reserved size of space actually reserved
//
// Returns:
// char* pointer to the reserved block
//
// Notes:
// Will return NULL for the pointer if no space can be allocated.
// Can return any value from 1 to size in reserved.
// Will return NULL if a previous reservation has not been committed.
char* Reserve(size_t size, OUT size_t& reserved) {
// We always allocate on B if B exists; this means we have two blocks and
// our buffer is filling.
if (b_size_) {
auto free_space = GetBFreeSpace();

if (size < free_space) free_space = size;

if (free_space == 0) return nullptr;

reserve_size_ = free_space;
reserved = free_space;
reserve_start_ = b_size_;
return buffer_start_ptr_ + reserve_start_;
} else {
// Block b does not exist, so we can check if the space AFTER a is bigger
// than the space before A, and allocate the bigger one.

auto free_space = GetSpaceAfterA();
if (free_space >= a_start_) {
if (free_space == 0) return nullptr;
if (size < free_space) free_space = size;

reserve_size_ = free_space;
reserved = free_space;
reserve_start_ = a_start_ + a_size_;
return buffer_start_ptr_ + reserve_start_;
} else {
if (a_start_ < size) size = a_start_;
reserve_size_ = size;
reserved = size;
reserve_start_ = 0;
return buffer_start_ptr_;
}
return nullptr;
}
}

// Commit
//
// Commits space that has been written to in the buffer
//
// Parameters:
// int size number of bytes to commit
//
// Notes:
// Committing a size > than the reserved size will cause an assert in a
// debug build; in a release build, the actual reserved size will be used.
// Committing a size < than the reserved size will commit that amount of
// data, and release the rest of the space. Committing a size of 0 will
// release the reservation.
//
/**
* Commits the data to the buffer, so that it can be read out.
* @param size the size of the data to be committed
*
* NOTE:
* The validity of the size is not checked, it needs to be within the range
* returned by the Reserve function.
*/
void Commit(size_t size) {
// If we try to commit more space than we asked for, clip to the size we
// asked for.
if (size > reserve_size_) {
size = reserve_size_;
}
// only written from push thread
const auto write = write_index_.load(std::memory_order_relaxed);

if (reserve_start_ == a_size_ + a_start_) {
a_size_ += size;
if (write_wrapped_) {
last_index_.store(write, std::memory_order_relaxed);
write_index_.store(size, std::memory_order_release);
} else {
b_size_ += size;
write_index_.store(write + size, std::memory_order_release);
}

reserve_start_ = 0;
reserve_size_ = 0;
}

// GetContiguousBlock
//
// Gets a pointer to the first contiguous block in the buffer, and returns the
// size of that block.
//
// Parameters:
// OUT int & size returns the size of the first contiguous block
//
// Returns:
// char* pointer to the first contiguous block, or NULL
// if empty.
char* GetContiguousBlock(OUT size_t& size) {
size = a_size_;
return buffer_start_ptr_ + a_start_;
}

// DecommitBlock
//
// Decommits space from the first contiguous block
//
// Parameters:
// int size amount of memory to decommit
//
// Returns:
// nothing
void DecommitBlock(size_t size) {
if (size == a_size_) {
a_start_ = 0;
a_size_ = b_size_;
b_size_ = 0;
} else {
a_size_ -= size;
a_start_ += size;
/**
* Gets a pointer to the contiguous block in the buffer, and returns the size
* of that block.
* @param size returns the size of the contiguous block
* @return pointer to the contiguous block
*/
T* Read(size_t* size) {
const auto write = write_index_.load(std::memory_order_acquire);
const auto last = last_index_.load(std::memory_order_relaxed);
auto read = read_index_.load(std::memory_order_relaxed);

if (read == write) return nullptr;
if (read == last) {
read_index_.store(0, std::memory_order_release);
read = 0;
}

auto read_limit = read <= write ? write : last;
*size = read_limit - read;

return (*size == 0) ? nullptr : &buffer_[read];
}

// GetCommittedSize
//
// Queries how much data (in total) has been committed in the buffer
//
// Parameters:
// none
//
// Returns:
// int total amount of committed data in the buffer
size_t GetCommittedSize() const { return a_size_ + b_size_; }

// GetReservationSize
//
// Queries how much space has been reserved in the buffer.
//
// Parameters:
// none
//
// Returns:
// int number of bytes that have been reserved
//
// Notes:
// A return value of 0 indicates that no space has been reserved
size_t GetReservationSize() const { return reserve_size_; }

// GetBufferSize
//
// Queries the maximum total size of the buffer
//
// Parameters:
// none
//
// Returns:
// int total size of buffer
size_t GetBufferSize() const { return buffer_size_; }
/**
* Releases data from the buffer, so that more data can be written in.
* @param size the size of the data to be released
*
* Note:
*
* The validity of the size is not checked, it needs to be within the range
* returned by the Read function.
*
*/
void Release(size_t size) {
read_index_.fetch_add(size, std::memory_order_acq_rel);
}

private:
size_t GetSpaceAfterA() const { return buffer_size_ - a_start_ - a_size_; }
std::vector<T> buffer_;

size_t GetBFreeSpace() const {
auto b_space_size = a_start_;
return b_space_size - b_size_;
}
// for reader thread
std::atomic<size_t> read_index_;

// for writer thread
std::atomic<size_t> write_index_;
std::atomic<size_t> last_index_;
bool write_wrapped_;
};
5 changes: 3 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ add_executable(ulog_test_cpp ulog_test.cc)
target_link_libraries(ulog_test_cpp ulog)
add_test(test_cpp_compile ulog_test_cpp)

add_executable(bipbuffer_test bip_buffer_test.cc)
target_link_libraries(bipbuffer_test ulog)
add_executable(ulog_unit_test bip_buffer_test.cc)
target_link_libraries(ulog_unit_test GTest::gtest_main ulog)
add_test(ulog_unit_test ulog_unit_test)

0 comments on commit ce5f471

Please sign in to comment.