-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
bip_buffer: Modify bip_buffer to spsc lock-free implementation
- Loading branch information
1 parent
be2e578
commit 0caa45d
Showing
3 changed files
with
198 additions
and
217 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,219 +1,144 @@ | ||
#pragma once | ||
|
||
#include <atomic> | ||
#include <cstddef> | ||
#include <cstdlib> | ||
#include <exception> | ||
#include <stdexcept> | ||
#include <vector> | ||
|
||
/** | ||
* @brief Check if value is in range [left, right], considering the wraparound | ||
* case when right < left (overflow) | ||
*/ | ||
static inline bool IsInRange(unsigned left, unsigned value, unsigned right) { | ||
if (right >= left) { | ||
// Normal | ||
return (left <= value) && (value <= right); | ||
} else { | ||
// Maybe the data overflowed and a wraparound occurred | ||
return (left <= value) || (value <= right); | ||
} | ||
} | ||
|
||
/* | ||
Copyright (c) 2003 Simon Cooke, All Rights Reserved | ||
Licensed royalty-free for commercial and non-commercial | ||
use, without warranty or guarantee of suitability for any purpose. | ||
All that I ask is that you send me an email | ||
telling me that you're using my code. It'll make me | ||
feel warm and fuzzy inside. spectecjr@gmail.com | ||
*/ | ||
#define OUT | ||
|
||
template <typename T = char> | ||
class BipBuffer { | ||
private: | ||
char* const buffer_start_ptr_; | ||
size_t buffer_size_; | ||
bool is_allocated_memory_; | ||
|
||
size_t a_start_{0}; // a_start_ is always read_index_ | ||
size_t a_size_{0}; | ||
size_t b_size_{0}; | ||
|
||
size_t reserve_start_{0}; | ||
size_t reserve_size_{0}; | ||
|
||
public: | ||
explicit BipBuffer(size_t buffer_size) | ||
: buffer_start_ptr_((char*)::malloc(buffer_size)), | ||
buffer_size_(buffer_size) { | ||
if (buffer_start_ptr_ == nullptr) | ||
throw std::runtime_error{"malloc failed!"}; | ||
is_allocated_memory_ = true; | ||
} | ||
: buffer_(buffer_size), | ||
read_index_(0), | ||
write_index_(0), | ||
last_index_(0), | ||
write_wrapped_(false) {} | ||
|
||
~BipBuffer() = default; | ||
|
||
/** | ||
* Try to reserve space of size size | ||
* @param size size of space to reserve | ||
* @return data pointer if successful, otherwise nullptr | ||
*/ | ||
T* Reserve(size_t size) { | ||
const auto read = read_index_.load(std::memory_order_acquire); | ||
const auto write = write_index_.load(std::memory_order_relaxed); | ||
|
||
if (write < read) { | ||
// |********|_____size____||*********|___________| | ||
// ^0 ^write ^read ^last ^size | ||
if (write + size < read) { | ||
write_wrapped_ = false; | ||
return &buffer_[write]; | ||
} else { | ||
return nullptr; | ||
} | ||
|
||
explicit BipBuffer(char* pre_allocated_buffer, size_t buffer_size) | ||
: buffer_start_ptr_(pre_allocated_buffer), buffer_size_(buffer_size) { | ||
if (buffer_start_ptr_ == nullptr) | ||
throw std::runtime_error{"buffer_start_ptr_ == nullptr!"}; | ||
is_allocated_memory_ = false; | ||
} | ||
} else if (write + size <= buffer_.size()) { | ||
// Check trailing space first | ||
// |__________|*************|______size______| | ||
// ^0 ^read ^write ^size | ||
// or | ||
// |__________|_____________size_____________| | ||
// ^0 ^read/write ^size | ||
write_wrapped_ = false; | ||
return &buffer_[write]; | ||
|
||
} else if (read > size) { | ||
// Check leading space | ||
// |______size______|*************|__________| | ||
// ^0 ^read ^write ^size | ||
write_wrapped_ = true; | ||
return &buffer_[0]; | ||
|
||
~BipBuffer() { | ||
// We don't call FreeBuffer, because we don't need to reset our variables - | ||
// our object is dying | ||
if (is_allocated_memory_) { | ||
free((void*)buffer_start_ptr_); | ||
} | ||
} | ||
|
||
// Reserve | ||
// | ||
// Reserves space in the buffer for a memory write operation | ||
// | ||
// Parameters: | ||
// int size amount of space to reserve | ||
// OUT int& reserved size of space actually reserved | ||
// | ||
// Returns: | ||
// char* pointer to the reserved block | ||
// | ||
// Notes: | ||
// Will return NULL for the pointer if no space can be allocated. | ||
// Can return any value from 1 to size in reserved. | ||
// Will return NULL if a previous reservation has not been committed. | ||
char* Reserve(size_t size, OUT size_t& reserved) { | ||
// We always allocate on B if B exists; this means we have two blocks and | ||
// our buffer is filling. | ||
if (b_size_) { | ||
auto free_space = GetBFreeSpace(); | ||
|
||
if (size < free_space) free_space = size; | ||
|
||
if (free_space == 0) return nullptr; | ||
|
||
reserve_size_ = free_space; | ||
reserved = free_space; | ||
reserve_start_ = b_size_; | ||
return buffer_start_ptr_ + reserve_start_; | ||
} else { | ||
// Block b does not exist, so we can check if the space AFTER a is bigger | ||
// than the space before A, and allocate the bigger one. | ||
|
||
auto free_space = GetSpaceAfterA(); | ||
if (free_space >= a_start_) { | ||
if (free_space == 0) return nullptr; | ||
if (size < free_space) free_space = size; | ||
|
||
reserve_size_ = free_space; | ||
reserved = free_space; | ||
reserve_start_ = a_start_ + a_size_; | ||
return buffer_start_ptr_ + reserve_start_; | ||
} else { | ||
if (a_start_ < size) size = a_start_; | ||
reserve_size_ = size; | ||
reserved = size; | ||
reserve_start_ = 0; | ||
return buffer_start_ptr_; | ||
} | ||
return nullptr; | ||
} | ||
} | ||
|
||
// Commit | ||
// | ||
// Commits space that has been written to in the buffer | ||
// | ||
// Parameters: | ||
// int size number of bytes to commit | ||
// | ||
// Notes: | ||
// Committing a size > than the reserved size will cause an assert in a | ||
// debug build; in a release build, the actual reserved size will be used. | ||
// Committing a size < than the reserved size will commit that amount of | ||
// data, and release the rest of the space. Committing a size of 0 will | ||
// release the reservation. | ||
// | ||
/** | ||
* Commits the data to the buffer, so that it can be read out. | ||
* @param size the size of the data to be committed | ||
* | ||
* NOTE: | ||
* The validity of the size is not checked, it needs to be within the range | ||
* returned by the Reserve function. | ||
*/ | ||
void Commit(size_t size) { | ||
// If we try to commit more space than we asked for, clip to the size we | ||
// asked for. | ||
if (size > reserve_size_) { | ||
size = reserve_size_; | ||
} | ||
// only written from push thread | ||
const auto write = write_index_.load(std::memory_order_relaxed); | ||
|
||
if (reserve_start_ == a_size_ + a_start_) { | ||
a_size_ += size; | ||
if (write_wrapped_) { | ||
last_index_.store(write, std::memory_order_relaxed); | ||
write_index_.store(size, std::memory_order_release); | ||
} else { | ||
b_size_ += size; | ||
write_index_.store(write + size, std::memory_order_release); | ||
} | ||
|
||
reserve_start_ = 0; | ||
reserve_size_ = 0; | ||
} | ||
|
||
// GetContiguousBlock | ||
// | ||
// Gets a pointer to the first contiguous block in the buffer, and returns the | ||
// size of that block. | ||
// | ||
// Parameters: | ||
// OUT int & size returns the size of the first contiguous block | ||
// | ||
// Returns: | ||
// char* pointer to the first contiguous block, or NULL | ||
// if empty. | ||
char* GetContiguousBlock(OUT size_t& size) { | ||
size = a_size_; | ||
return buffer_start_ptr_ + a_start_; | ||
} | ||
|
||
// DecommitBlock | ||
// | ||
// Decommits space from the first contiguous block | ||
// | ||
// Parameters: | ||
// int size amount of memory to decommit | ||
// | ||
// Returns: | ||
// nothing | ||
void DecommitBlock(size_t size) { | ||
if (size == a_size_) { | ||
a_start_ = 0; | ||
a_size_ = b_size_; | ||
b_size_ = 0; | ||
} else { | ||
a_size_ -= size; | ||
a_start_ += size; | ||
/** | ||
* Gets a pointer to the contiguous block in the buffer, and returns the size | ||
* of that block. | ||
* @param size returns the size of the contiguous block | ||
* @return pointer to the contiguous block | ||
*/ | ||
T* Read(size_t* size) { | ||
const auto write = write_index_.load(std::memory_order_acquire); | ||
const auto last = last_index_.load(std::memory_order_relaxed); | ||
auto read = read_index_.load(std::memory_order_relaxed); | ||
|
||
if (read == write) return nullptr; | ||
if (read == last) { | ||
read_index_.store(0, std::memory_order_release); | ||
read = 0; | ||
} | ||
|
||
auto read_limit = read <= write ? write : last; | ||
*size = read_limit - read; | ||
|
||
return (*size == 0) ? nullptr : &buffer_[read]; | ||
} | ||
|
||
// GetCommittedSize | ||
// | ||
// Queries how much data (in total) has been committed in the buffer | ||
// | ||
// Parameters: | ||
// none | ||
// | ||
// Returns: | ||
// int total amount of committed data in the buffer | ||
size_t GetCommittedSize() const { return a_size_ + b_size_; } | ||
|
||
// GetReservationSize | ||
// | ||
// Queries how much space has been reserved in the buffer. | ||
// | ||
// Parameters: | ||
// none | ||
// | ||
// Returns: | ||
// int number of bytes that have been reserved | ||
// | ||
// Notes: | ||
// A return value of 0 indicates that no space has been reserved | ||
size_t GetReservationSize() const { return reserve_size_; } | ||
|
||
// GetBufferSize | ||
// | ||
// Queries the maximum total size of the buffer | ||
// | ||
// Parameters: | ||
// none | ||
// | ||
// Returns: | ||
// int total size of buffer | ||
size_t GetBufferSize() const { return buffer_size_; } | ||
/** | ||
* Releases data from the buffer, so that more data can be written in. | ||
* @param size the size of the data to be released | ||
* | ||
* Note: | ||
* | ||
* The validity of the size is not checked, it needs to be within the range | ||
* returned by the Read function. | ||
* | ||
*/ | ||
void Release(size_t size) { | ||
read_index_.fetch_add(size, std::memory_order_acq_rel); | ||
} | ||
|
||
private: | ||
size_t GetSpaceAfterA() const { return buffer_size_ - a_start_ - a_size_; } | ||
std::vector<T> buffer_; | ||
|
||
size_t GetBFreeSpace() const { | ||
auto b_space_size = a_start_; | ||
return b_space_size - b_size_; | ||
} | ||
// for reader thread | ||
std::atomic<size_t> read_index_; | ||
|
||
// for writer thread | ||
std::atomic<size_t> write_index_; | ||
std::atomic<size_t> last_index_; | ||
bool write_wrapped_; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.