Skip to content
This repository was archived by the owner on Mar 22, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/libpmemobj++/detail/ringbuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,17 @@ struct ringbuf_t {
/* Set by ringbuf_consume, reset by ringbuf_release. */
bool consume_in_progress;

/**
* Creates new ringbuf_t instance.
*
* Length must be < RBUF_OFF_MASK
*/
ringbuf_t(size_t max_workers, size_t length)
: workers(new ringbuf_worker_t[max_workers])
{
if (length >= RBUF_OFF_MASK)
throw std::out_of_range("ringbuf length too big");

written.store(0);
next.store(0);
end.store(0);
Expand Down
113 changes: 81 additions & 32 deletions include/libpmemobj++/experimental/mpsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,19 @@ class mpsc_queue {

void clear_cachelines(first_block *block, size_t size);
void restore_offsets();

size_t consume_cachelines(size_t *offset);
void release_cachelines(size_t len);

inline pmem::detail::id_manager &get_id_manager();

/* ringbuf_t handle. Important: mpsc_queue operates on cachelines hence
* ringbuf_produce/release functions are called with number of
* cachelines, not bytes. */
std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
char *buf;
pmem::obj::pool_base pop;
size_t buff_size_;
size_t buf_size;
pmem_log_type *pmem;

/* Stores offset and length of next message to be consumed. Only
Expand Down Expand Up @@ -123,7 +130,12 @@ class mpsc_queue {
mpsc_queue *queue;
ringbuf::ringbuf_worker_t *w;
size_t id;

ptrdiff_t acquire_cachelines(size_t len);
void produce_cachelines();
void store_to_log(pmem::obj::string_view data, char *log_data);

friend class mpsc_queue;
};

class pmem_log_type {
Expand All @@ -147,25 +159,66 @@ mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
auto buf_data = pmem.data();

buf = const_cast<char *>(buf_data.data());
buff_size_ = buf_data.size();
buf_size = buf_data.size();

assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);

ring_buffer = std::unique_ptr<ringbuf::ringbuf_t>(
new ringbuf::ringbuf_t(max_workers, buff_size_));
ring_buffer =
std::unique_ptr<ringbuf::ringbuf_t>(new ringbuf::ringbuf_t(
max_workers, buf_size / pmem::detail::CACHELINE_SIZE));

this->pmem = &pmem;

restore_offsets();
}

ptrdiff_t
mpsc_queue::worker::acquire_cachelines(size_t len)
{
assert(len % pmem::detail::CACHELINE_SIZE == 0);
auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
len / pmem::detail::CACHELINE_SIZE);

if (ret < 0)
return ret;

return ret * static_cast<ptrdiff_t>(pmem::detail::CACHELINE_SIZE);
}

void
mpsc_queue::worker::produce_cachelines()
{
ringbuf_produce(queue->ring_buffer.get(), w);
}

size_t
mpsc_queue::consume_cachelines(size_t *offset)
{
auto ret = ringbuf_consume(ring_buffer.get(), offset);
if (ret) {
*offset *= pmem::detail::CACHELINE_SIZE;
return ret * pmem::detail::CACHELINE_SIZE;
}

return 0;
}

void
mpsc_queue::release_cachelines(size_t len)
{
assert(len % pmem::detail::CACHELINE_SIZE == 0);
ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
}

void
mpsc_queue::restore_offsets()
{
/* Invariant */
assert(pmem->written < buff_size_);
assert(pmem->written < buf_size);

/* XXX: implement restore_offset function in ringbuf */

auto w = ringbuf_register(ring_buffer.get(), 0);
auto w = register_worker();

if (!pmem->written) {
/* If pmem->written == 0 it means that consumer should start
Expand All @@ -174,14 +227,12 @@ mpsc_queue::restore_offsets()
* from overwriting the original content - mark the entire log
* as produced. */

auto acq = ringbuf_acquire(
ring_buffer.get(), w,
buff_size_ - pmem::detail::CACHELINE_SIZE);
auto acq = w.acquire_cachelines(buf_size -
pmem::detail::CACHELINE_SIZE);
assert(acq == 0);
(void)acq;
ringbuf_produce(ring_buffer.get(), w);

ringbuf_unregister(ring_buffer.get(), w);
w.produce_cachelines();

return;
}
Expand All @@ -201,31 +252,29 @@ mpsc_queue::restore_offsets()
* CACHELINE_SIZE and consumer offset equal to pmem->written.
*/

auto acq = ringbuf_acquire(ring_buffer.get(), w, pmem->written);
auto acq = w.acquire_cachelines(pmem->written);
assert(acq == 0);
ringbuf_produce(ring_buffer.get(), w);
w.produce_cachelines();

/* Restore consumer offset */
size_t offset;
auto len = ringbuf_consume(ring_buffer.get(), &offset);
auto len = consume_cachelines(&offset);
assert(len == pmem->written);
ringbuf_release(ring_buffer.get(), len);
release_cachelines(len);

assert(offset == 0);
assert(len == pmem->written);

acq = ringbuf_acquire(ring_buffer.get(), w, buff_size_ - pmem->written);
assert(acq != -1);
acq = w.acquire_cachelines(buf_size - pmem->written);
assert(acq >= 0);
assert(static_cast<size_t>(acq) == pmem->written);
ringbuf_produce(ring_buffer.get(), w);
w.produce_cachelines();

acq = ringbuf_acquire(ring_buffer.get(), w,
pmem->written - pmem::detail::CACHELINE_SIZE);
acq = w.acquire_cachelines(pmem->written -
pmem::detail::CACHELINE_SIZE);
assert(acq == 0);
ringbuf_produce(ring_buffer.get(), w);

ringbuf_unregister(ring_buffer.get(), w);
(void)acq;
w.produce_cachelines();
}

mpsc_queue::pmem_log_type::pmem_log_type(size_t size)
Expand Down Expand Up @@ -280,7 +329,7 @@ mpsc_queue::try_consume_batch(Function &&f)
* ringbuf_consume. */
if (!ring_buffer->consume_in_progress) {
size_t offset;
auto len = ringbuf_consume(ring_buffer.get(), &offset);
auto len = consume_cachelines(&offset);
if (!len)
return consumed;

Expand All @@ -307,9 +356,9 @@ mpsc_queue::try_consume_batch(Function &&f)
auto b = reinterpret_cast<first_block *>(data);
clear_cachelines(b, consume_len);

if (consume_offset + consume_len < buff_size_)
if (consume_offset + consume_len < buf_size)
pmem->written = consume_offset + consume_len;
else if (consume_offset + consume_len == buff_size_)
else if (consume_offset + consume_len == buf_size)
pmem->written = 0;
else
assert(false);
Expand All @@ -319,7 +368,7 @@ mpsc_queue::try_consume_batch(Function &&f)
ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
#endif

ringbuf_release(ring_buffer.get(), consume_len);
release_cachelines(consume_len);

assert(!ring_buffer->consume_in_progress);

Expand Down Expand Up @@ -391,7 +440,7 @@ mpsc_queue::worker::try_produce(size_t size, Function &&f)

auto req_size = pmem::detail::align_up(size + sizeof(first_block::size),
pmem::detail::CACHELINE_SIZE);
auto offset = ringbuf_acquire(queue->ring_buffer.get(), w, req_size);
auto offset = acquire_cachelines(req_size);

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
Expand All @@ -413,7 +462,7 @@ mpsc_queue::worker::try_produce(size_t size, Function &&f)
ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
#endif

ringbuf_produce(queue->ring_buffer.get(), w);
produce_cachelines();

return true;
}
Expand All @@ -426,7 +475,7 @@ mpsc_queue::worker::try_produce(pmem::obj::string_view data,
auto req_size =
pmem::detail::align_up(data.size() + sizeof(first_block::size),
pmem::detail::CACHELINE_SIZE);
auto offset = ringbuf_acquire(queue->ring_buffer.get(), w, req_size);
auto offset = acquire_cachelines(req_size);

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
Expand All @@ -444,7 +493,7 @@ mpsc_queue::worker::try_produce(pmem::obj::string_view data,
on_produce(pmem::obj::string_view(
queue->buf + offset + sizeof(first_block::size), data.size()));

ringbuf_produce(queue->ring_buffer.get(), w);
produce_cachelines();

return true;
}
Expand Down Expand Up @@ -578,7 +627,7 @@ mpsc_queue::clear_cachelines(first_block *block, size_t size)
block++;
}

assert(end <= reinterpret_cast<first_block *>(buf + buff_size_));
assert(end <= reinterpret_cast<first_block *>(buf + buf_size));
}

mpsc_queue::iterator &
Expand Down
17 changes: 17 additions & 0 deletions tests/mpsc_queue/ringbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,22 @@ test_random(void)
delete r;
}

static void
test_size()
{
try {
auto size = (1ULL << 32) + 1;
auto r = new ringbuf_t(1, size);
(void)r;

ASSERT_UNREACHABLE;
} catch (std::out_of_range &) {

} catch (...) {
ASSERT_UNREACHABLE;
}
}

int
main(void)
{
Expand All @@ -326,5 +342,6 @@ main(void)
test_multi();
test_overlap();
test_random();
test_size();
return 0;
}