Permalink
Browse files

Normalized API

  • Loading branch information...
saleyn committed Jun 19, 2014
1 parent 2756265 commit 61f2fa9b0ecfa5c47e3f6067cbc87b2773912327
Showing with 42 additions and 24 deletions.
  1. +3 −3 include/utxx/concurrent_alloc_fixed_page.hpp
  2. +39 −21 include/utxx/concurrent_spsc_queue.hpp
@@ -139,12 +139,12 @@ class concurrent_aligned_page_allocator : public boost::noncopyable {
m_page = NULL;
}
}
-
+
pointer allocate(size_type n, const void *hint = 0) {
pointer end = reinterpret_cast<pointer>(
- reinterpret_cast<char*>(m_page) + PageSize);
+ reinterpret_cast<char*>(m_page) + PageSize);
- m_page = page_alloc();
+ m_page = page_alloc();
atomic::add(&m_page->alloc_count);
@@ -49,64 +49,77 @@
namespace utxx {
-/*
- * concurrent_spsc_queue is a one producer and one consumer queue
- * without locks.
+/**
+ * \brief A lock-free single producer and single consumer queue
+ * based on a ring-buffer.
+ *
+ * Implementation allows to use either internally allocated heap
+ * memory or externally allocated shared memory.
*/
template<class T>
class concurrent_spsc_queue : private boost::noncopyable {
struct header {
std::atomic<int> head;
std::atomic<int> tail;
uint32_t const size;
- uint32_t __padding;
+ uint32_t total_memory;
- static size_t calc_size(size_t a_size) {
- size_t n = math::upper_power(a_size, 2);
- if (n != a_size) n /= 2;
- return n;
+ static size_t normalize_count(size_t a_mem_size) {
+ size_t i = a_mem_size / sizeof(T);
+ size_t n = math::upper_power(i, 2);
+ return n == i ? n : n /= 2;
}
header() {}
- header(size_t a_size)
- : size(calc_size(a_size))
- , head(0)
+ header(size_t a_mem_size)
+ : head(0)
, tail(0)
+ , size(normalize_count(a_mem_size))
+ , total_memory(a_mem_size)
{
+ assert((size * sizeof(T)) <= a_mem_size);
assert((size & (size-1)) == 0); // Power of 2.
if (size < 2)
throw std::runtime_error
- ("utxx::concurrent_spsc_queue: size must be a power of 2");
+ ("utxx::concurrent_spsc_queue: size must be greater or equal to 2");
}
};
- concurrent_spsc_queue(header* a_header, void* a_storage, uint32_t a_size, bool a_own)
- : m_header_data(a_size)
+ concurrent_spsc_queue(header* a_header, T* a_storage, size_t a_mem_size, bool a_own)
+ : m_header_data(a_mem_size)
, m_header(a_header)
, m_records(a_own
? reinterpret_cast<T*>(::malloc(sizeof(T)*m_header_data.size))
: static_cast<T*>(a_storage))
, m_own_data(a_own)
, m_mask(m_header_data.size - 1)
{
- new (m_header) header(a_size);
+ new (m_header) header(a_mem_size);
+ }
+
+ // Round size to the nearest power of 2 greater than the total size needed to
+ // hold a_count elements.
+ static size_t round_size(size_t a_count, size_t a_addon = 0) {
+ return math::upper_power(a_count, 2) * sizeof(T) + a_addon;
}
public:
typedef T value_type;
/// @return memory size needed for allocating internal queue data
static size_t memory_size(size_t a_item_count) {
- return sizeof(header) + a_item_count * sizeof(T);
+ return round_size(a_item_count, sizeof(header));
}
- // Ctor for using external memory (e.g. shared memory)
- // Size must be obtained by the call to memory_size().
+ /// \brief Ctor for using external memory (e.g. shared memory).
+ ///
+ /// When the desired capacity of the queue is known in the number of items,
+ /// \a a_size can be obtained by the call to memory_size().
concurrent_spsc_queue(void* a_storage, uint32_t a_size)
: concurrent_spsc_queue(
static_cast<header*>(a_storage),
reinterpret_cast<T*>(static_cast<char*>(a_storage) + sizeof(m_header)),
- a_size - sizeof(header),
+ a_size - sizeof(header),
false
)
{}
@@ -116,8 +129,8 @@ class concurrent_spsc_queue : private boost::noncopyable {
// Also, note that the number of usable slots in the queue at any
// given time is actually (\a a_size-1), so if you start with an empty queue,
// full() will return true after \a a_size-1 insertions.
- explicit concurrent_spsc_queue(uint32_t a_size)
- : concurrent_spsc_queue(&m_header_data, nullptr, a_size, true)
+ explicit concurrent_spsc_queue(uint32_t a_item_count)
+ : concurrent_spsc_queue(&m_header_data, nullptr, round_size(a_item_count), true)
{}
~concurrent_spsc_queue() {
@@ -216,6 +229,11 @@ class concurrent_spsc_queue : private boost::noncopyable {
return ret < 0 ? ret + size() : ret;
}
+ /// @return maximum capacity of items the queue can hold.
+ size_t capacity() const { return m_header->size; }
+ /// @return total memory size used internally by the queue.
+ size_t total_memory() const { return m_header->total_memory; }
+
private:
header m_header_data;
header* m_header;

0 comments on commit 61f2fa9

Please sign in to comment.