mpsc_queue: extend queue to support arbitrary sized values#1087
mpsc_queue: extend queue to support arbitrary sized values#1087lukaszstolarczuk merged 8 commits intopmem:masterfrom
Conversation
c411c05 to
ce3b747
Compare
|
|
||
| private: | ||
| void | ||
| store_to_log(pmem::obj::string_view data, char *log_data) |
There was a problem hiding this comment.
This algorithm in libpmemobj also uses a checksum to verify consistency of the log. If you don't have that checksum and you don't care about failure atomicity then this can be simplified to just aligning down to a cache line, doing a copy, and then copying the remainder.
| { | ||
| /* Advance to first, unconsumed element */ | ||
| while (this->data < end && | ||
| reinterpret_cast<first_block *>( |
There was a problem hiding this comment.
i'm not the biggest fan of accessing fields from a casted variable. but I'll leave it up to you.
| while (this->data < end && | ||
| reinterpret_cast<first_block *>( | ||
| this->data) | ||
| ->size == 0) { |
There was a problem hiding this comment.
so, the size field indicates whether the element was consumed or not? (but this makes no sense considering the implementation below)
This needs documentation.
|
|
||
| assert(element_end <= end); | ||
|
|
||
| /* Since after release, a user might produce |
|
|
||
| auto b = reinterpret_cast<first_block *>(log_data); | ||
|
|
||
| b->size = data.size(); |
There was a problem hiding this comment.
this is the fourth store, so the comment above is not accurate.
And, given that you have all the data already, you might as well just modify the fblock and do another copy of the entire cacheline - that will probably be faster than doing a temporal store here (since that will cause 2 memory operations).
| bool | ||
| try_produce(size_t size, Function &&f) | ||
| { | ||
| entry dram_entry; |
There was a problem hiding this comment.
hm, can't you implememt this as return try_produce(pmem::obj::string_view(data.get(), size)) ?
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: 0 of 3 files reviewed, 5 unresolved discussions (waiting on @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 54 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
i'm not the biggest fan of accessing fields from a casted variable. but I'll leave it up to you.
Any better alternatives here? Just cast data to uint64_t* ?
include/libpmemobj++/experimental/mpsc_queue.hpp, line 56 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
so, the
sizefield indicates whether the element was consumed or not? (but this makes no sense considering the implementation below)
This needs documentation.
Yeah, I will definitely extend the documentation. The implementation below (operator++) assumes the iterator always points to unconsumed element . When iterator is advanced it first reads the size of the value and then zeroes out first 8 bytes in each cache line of the value to mark it as consumed. After that it moves to the next value (it searches for a first cacheline which has first 8 bytes != 0).
include/libpmemobj++/experimental/mpsc_queue.hpp, line 79 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
I don't get this comment.
Actually, I think I will change this implementation to use checksums. Since there are multiple producers a following situation is possible:
|--- produced1 ---|--- failed during store_to_log --- |--produced2--|
And with this approach with zeroing out first 8 bytes of each cache line we can't handle this. (if store_to_log failed we might have size == 0 and following cachlines filled with random data. We would not know how many cacheline we have to skip to get to the next element - produced2). I had some ideas how to make it work but they would require additional persist. I think it will be better to just use checksum (and skip all cachlines for which checksum does not match).
include/libpmemobj++/experimental/mpsc_queue.hpp, line 242 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
This algorithm in libpmemobj also uses a checksum to verify consistency of the log. If you don't have that checksum and you don't care about failure atomicity then this can be simplified to just aligning down to a cache line, doing a copy, and then copying the remainder.
I do care about atomicity, but right now, I rely on size instead of a checksum (size != 0 means data is consistent). In this implementation we have 8 bytes of metadata in the first cacheline (first_block) so we copy first 56B of data to fblock and set size to 0. Then copy fblock to pmem. The rest (aligned down to cacheline) is copied using nontemporal stores, and the remainder is copied to dram buffer first and then full cacheline to pmem (as in PMDK).
After all this, I do a drain and set size to the actual data length - after this, data is considered consistent.
I'm not sure if I understand how this can be simplfied, which step do you mean? (I still want to avoid copying partial cachelines)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 314 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
this is the fourth store, so the comment above is not accurate.
And, given that you have all the data already, you might as well just modify thefblockand do another copy of the entire cacheline - that will probably be faster than doing a temporal store here (since that will cause 2 memory operations).
Right, good point. But I will probably switch to checksum anyway (see comment above).
pbalcer
left a comment
There was a problem hiding this comment.
Reviewable status: 0 of 3 files reviewed, 4 unresolved discussions (waiting on @igchor)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 54 at r1 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
Any better alternatives here? Just cast data to uint64_t* ?
auto block = reinterpret_cast...;
while (block->size == 0) {
...
}
include/libpmemobj++/experimental/mpsc_queue.hpp, line 56 at r1 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
Yeah, I will definitely extend the documentation. The implementation below (operator++) assumes the iterator always points to unconsumed element . When iterator is advanced it first reads the size of the value and then zeroes out first 8 bytes in each cache line of the value to mark it as consumed. After that it moves to the next value (it searches for a first cacheline which has first 8 bytes != 0).
hm, how do you guarantee that any arbitrary cache line won't have its first 8 bytes set to some non-zero value given that you can have entries that span more than one cacheline?
include/libpmemobj++/experimental/mpsc_queue.hpp, line 79 at r1 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
Actually, I think I will change this implementation to use checksums. Since there are multiple producers a following situation is possible:
|--- produced1 ---|--- failed during store_to_log --- |--produced2--|
And with this approach with zeroing out first 8 bytes of each cache line we can't handle this. (if store_to_log failed we might have size == 0 and following cachlines filled with random data. We would not know how many cacheline we have to skip to get to the next element -
produced2). I had some ideas how to make it work but they would require additional persist. I think it will be better to just use checksum (and skip all cachlines for which checksum does not match).
I don't understand how it works with size tbh, but I think checksums should work if we are not worried about accidental matches (which are probably extremely unlikely).
include/libpmemobj++/experimental/mpsc_queue.hpp, line 242 at r1 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
I do care about atomicity, but right now, I rely on size instead of a checksum (size != 0 means data is consistent). In this implementation we have 8 bytes of metadata in the first cacheline (
first_block) so we copy first 56B of data tofblockand setsizeto 0. Then copyfblockto pmem. The rest (aligned down to cacheline) is copied using nontemporal stores, and the remainder is copied to dram buffer first and then full cacheline to pmem (as in PMDK).After all this, I do a drain and set size to the actual data length - after this, data is considered consistent.
I'm not sure if I understand how this can be simplfied, which step do you mean? (I still want to avoid copying partial cachelines)
The first step can be omitted if you don't care about fail safety. Because it was made specifically to separate the metadata write (which makes the log persistent) from the rest of the buffer write.
ce3b747 to
074cad8
Compare
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: 0 of 4 files reviewed, 4 unresolved discussions (waiting on @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 54 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
auto block = reinterpret_cast...; while (block->size == 0) { ... }
Done.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 56 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
hm, how do you guarantee that any arbitrary cache line won't have its first 8 bytes set to some non-zero value given that you can have entries that span more than one cacheline?
I've pushed the implementation which uses dirty flag - it should be clear know.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 79 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
I don't understand how it works with size tbh, but I think checksums should work if we are not worried about accidental matches (which are probably extremely unlikely).
Yeah, sorry, the implementation was incomplete - I pushed the corrected version. To handle interrupted store_to_log I first store size with dirty flag set, then memcpy and then clear the dirty flag. On recovery, if size has dirty flag set just skip to the next entry. But this requires 3 persists and still we must clear first 8 bytes of each cacheline on consume.
I think I prefer the checksum approach but maybe I will try to benchmark this.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 242 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
The first step can be omitted if you don't care about fail safety. Because it was made specifically to separate the metadata write (which makes the log persistent) from the rest of the buffer write.
I still don't understand. If we omit first step, won't we have misaligned write (since we want to start writing data at address % 64 == 8 because size is stored stored first)?
pbalcer
left a comment
There was a problem hiding this comment.
Reviewable status: 0 of 4 files reviewed, 4 unresolved discussions (waiting on @igchor and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 242 at r1 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
I still don't understand. If we omit first step, won't we have misaligned write (since we want to start writing data at
address % 64 == 8becausesizeis stored stored first)?
The assumption is that the buffer is aligned. So, imagine you want to copy 200 bytes.
You can simply align it down to a cacheline - 192 bytes, and just do a copy into the destination buffer.
Then, you create a temporary 64 byte zeroed buffer, copy the remaining 8 bytes into that buffer, and do another copy into the last cacheline of the destination.
Done, 0 misaligned writes. But this assumes that failure atomicity is not important, so it's kinda irrelevant here.
074cad8 to
930fa4d
Compare
lukaszstolarczuk
left a comment
There was a problem hiding this comment.
Reviewed 1 of 3 files at r1, 4 of 8 files at r3.
Reviewable status: 4 of 10 files reviewed, 14 unresolved discussions (waiting on @igchor and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 117 at r2 (raw file):
while (b < e) { if (b->size == 0) { b++;
can this happen? we assert above: assert(block->size != 0);
include/libpmemobj++/experimental/mpsc_queue.hpp, line 265 at r2 (raw file):
/* * First step is to copy upto 56B of data and store
up to
include/libpmemobj++/experimental/mpsc_queue.hpp, line 275 at r2 (raw file):
* buffer, fill in the remainder of the data, and * copy the entire cacheline. After all data is * store we clear the dirty flag from size.
is stored, we clear
tests/CMakeLists.txt, line 979 at r3 (raw file):
build_test(mpsc_queue_recovery_after_consume mpsc_queue/recovery_after_consume.cpp) add_test_generic(NAME mpsc_queue_recovery_after_consume CASE 0 SCRIPT mpsc_queue/recovery_after_consume_0.cmake TRACERS none)
do you have to specify CASE?
tests/mpsc_queue/recovery.cpp, line 5 at r3 (raw file):
/* * recovery_after_consume.pp -- pmreorder test for mpsc_queue
different file name
tests/mpsc_queue/recovery_0.cmake, line 2 at r3 (raw file):
# SPDX-License-Identifier: BSD-3-Clause # Copyright 2018, Intel Corporation
-2021
tests/mpsc_queue/recovery_0.cmake, line 10 at r3 (raw file):
execute(${TEST_EXECUTABLE} c ${DIR}/testfile) pmreorder_create_store_log(${DIR}/testfile ${TEST_EXECUTABLE} x ${DIR}/testfile 1) pmreorder_execute(true ReorderAccumulative "PMREORDER_FILL=ReorderAccumulative" ${TEST_EXECUTABLE} o)
use of the same engine? is that expected?
tests/mpsc_queue/recovery_1.cmake, line 2 at r3 (raw file):
# SPDX-License-Identifier: BSD-3-Clause # Copyright 2018, Intel Corporation
-2021
tests/mpsc_queue/recovery_after_consume.cpp, line 5 at r3 (raw file):
/* * recovery_after_consume.pp -- pmreorder test for mpsc_queue
.cpp
tests/mpsc_queue/recovery_after_consume_0.cmake, line 2 at r3 (raw file):
# SPDX-License-Identifier: BSD-3-Clause # Copyright 2018, Intel Corporation
-2021
e1bad1e to
0388f1e
Compare
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: 0 of 11 files reviewed, 14 unresolved discussions (waiting on @lukaszstolarczuk and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 242 at r1 (raw file):
Previously, pbalcer (Piotr Balcer) wrote…
The assumption is that the buffer is aligned. So, imagine you want to copy 200 bytes.
You can simply align it down to a cacheline - 192 bytes, and just do a copy into the destination buffer.
Then, you create a temporary 64 byte zeroed buffer, copy the remaining 8 bytes into that buffer, and do another copy into the last cacheline of the destination.Done, 0 misaligned writes. But this assumes that failure atomicity is not important, so it's kinda irrelevant here.
Ok, so for now I will leave it as is since I need failure atomicity (using size + DIRTY flag or checksum).
include/libpmemobj++/experimental/mpsc_queue.hpp, line 117 at r2 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
can this happen? we assert above:
assert(block->size != 0);
Yes, after we process the element (for which block->size != 0) we might encounter empty space.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 265 at r2 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
up to
Done.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 275 at r2 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
is stored, we clear
Done.
tests/CMakeLists.txt, line 979 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
do you have to specify
CASE?
Done.
tests/mpsc_queue/recovery.cpp, line 5 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
different file name
Done.
tests/mpsc_queue/recovery_0.cmake, line 2 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
-2021
Done.
tests/mpsc_queue/recovery_0.cmake, line 10 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
use of the same engine? is that expected?
Yes, I have to put something here because this is not an optional parameter.
tests/mpsc_queue/recovery_1.cmake, line 2 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
-2021
Done.
tests/mpsc_queue/recovery_after_consume.cpp, line 5 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
.cpp
Done.
tests/mpsc_queue/recovery_after_consume_0.cmake, line 2 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
-2021
Done.
fa766e7 to
d5f8398
Compare
Codecov Report
@@ Coverage Diff @@
## master #1087 +/- ##
==========================================
+ Coverage 94.28% 94.32% +0.03%
==========================================
Files 51 51
Lines 5061 5181 +120
==========================================
+ Hits 4772 4887 +115
- Misses 289 294 +5
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
c1e12e1 to
be7ec32
Compare
lukaszstolarczuk
left a comment
There was a problem hiding this comment.
Reviewed 5 of 17 files at r4, 1 of 4 files at r5.
Reviewable status: 6 of 19 files reviewed, 11 unresolved discussions (waiting on @igchor, @lukaszstolarczuk, and @pbalcer)
include/libpmemobj++/detail/common.hpp, line 315 at r5 (raw file):
static constexpr size_t align_up(size_t size, size_t align)
you could actually align up/down to CACHLINE already (add a separate overload maybe...?)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 183 at r4 (raw file):
} /* If pmem->written != there still might be element in the log. Morever,
pmem->written != what(?)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 183 at r4 (raw file):
} /* If pmem->written != there still might be element in the log. Morever,
Morever -> Moreover
include/libpmemobj++/experimental/mpsc_queue.hpp, line 333 at r4 (raw file):
if (offset == -1) return false;
redundant empty line ;)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 251 at r5 (raw file):
if (pmemobj_tx_stage() != TX_STAGE_NONE) throw pmem::transaction_scope_error( "Function called inside transaction scope.");
inside a tx
tests/CMakeLists.txt, line 983 at r5 (raw file):
build_test(mpsc_queue_recovery_order mpsc_queue/recovery_order.cpp) add_test_generic(NAME mpsc_queue_recovery_order SCRIPT mpsc_queue/mpsc_queue_recovery_order_0.cmake TRACERS none memcheck pmemcheck)
the _0 part from the name can also be removed, I guess (cmake script)
tests/mpsc_queue/recovery.cpp, line 5 at r3 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
Done.
close enough 😄 (it's cpp)
tests/mpsc_queue/recovery_0.cmake, line 10 at r3 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
Yes, I have to put something here because this is not an optional parameter.
ok, makes sense, just double-checking
tests/mpsc_queue/recovery_after_consume.cpp, line 5 at r3 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
Done.
no :P
92672d3 to
d66ad16
Compare
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: 5 of 20 files reviewed, 11 unresolved discussions (waiting on @lukaszstolarczuk and @pbalcer)
include/libpmemobj++/detail/common.hpp, line 315 at r5 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
you could actually align up/down to CACHLINE already (add a separate overload maybe...?)
Hm, I'm not sure if it's worth adding two more functions. Imo current code looks quite clear.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 183 at r4 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
pmem->written !=what(?)
Done.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 183 at r4 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
Morever->Moreover
Done.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 333 at r4 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
redundant empty line ;)
Done (I think). One empty line was added on purpose.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 251 at r5 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
inside a tx
Done.
tests/CMakeLists.txt, line 983 at r5 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
the
_0part from the name can also be removed, I guess (cmake script)
Done.
tests/mpsc_queue/recovery_after_consume.cpp, line 5 at r3 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
no :P
Done.
92e2582 to
9ca36ad
Compare
And remove size parameter from mpsc_queue ctor - rely on pmem_log_type size.
6de06c1 to
23ed77c
Compare
23ed77c to
082a044
Compare
karczex
left a comment
There was a problem hiding this comment.
Reviewed 2 of 17 files at r4, 1 of 4 files at r5, 3 of 12 files at r6, 1 of 2 files at r7.
Reviewable status: 10 of 20 files reviewed, 14 unresolved discussions (waiting on @igchor, @karczex, @lukaszstolarczuk, and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 125 at r7 (raw file):
private: pmem::obj::vector<char> data;
Probably not in this PR, but this should be some kind of non-resizable vector.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 136 at r7 (raw file):
pop = pmem::obj::pool_by_vptr(&pmem); auto addr = reinterpret_cast<uintptr_t>(&pmem.data[0]);
&pmem.data() ?
tests/mpsc_queue/basic.cpp, line 64 at r7 (raw file):
ret = queue.try_consume_batch( [&](queue_type::batch_type rd_acc) { for (const auto &str : rd_acc) {
As you are changing rd_acc behavior
- clear cachelines only after user callback executes:
this allows iterating over the data arbitrary number of times
(including 0)
you should read this data twice and 0 times in this test, or add separate test test if you like.
082a044 to
939b1f7
Compare
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: 10 of 20 files reviewed, 14 unresolved discussions (waiting on @karczex, @lukaszstolarczuk, and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 125 at r7 (raw file):
Previously, karczex (Paweł Karczewski) wrote…
Probably not in this PR, but this should be some kind of non-resizable vector.
No neccessarily, I can imagine that growing could be possible in future.
include/libpmemobj++/experimental/mpsc_queue.hpp, line 136 at r7 (raw file):
Previously, karczex (Paweł Karczewski) wrote…
&pmem.data() ?
I've done it this way to avoid snapshotting entire pmem vector (in case we would be in a transaction). This is a limitation of vector - we have no way of accessing data without snapshot within a tx.
tests/mpsc_queue/basic.cpp, line 64 at r7 (raw file):
Previously, karczex (Paweł Karczewski) wrote…
As you are changing rd_acc behavior
- clear cachelines only after user callback executes: this allows iterating over the data arbitrary number of times (including 0)you should read this data twice and 0 times in this test, or add separate test test if you like.
Done.
939b1f7 to
dcfc881
Compare
karczex
left a comment
There was a problem hiding this comment.
Reviewable status: 9 of 21 files reviewed, 11 unresolved discussions (waiting on @karczex, @lukaszstolarczuk, and @pbalcer)
KFilipek
left a comment
There was a problem hiding this comment.
Reviewed 1 of 3 files at r1, 2 of 17 files at r4, 1 of 4 files at r5, 2 of 12 files at r6, 1 of 2 files at r7, 2 of 2 files at r8.
Reviewable status: 15 of 21 files reviewed, 17 unresolved discussions (waiting on @igchor, @lukaszstolarczuk, and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 97 at r7 (raw file):
Quoted 4 lines of code…
private: mpsc_queue *queue; ringbuf::ringbuf_worker_t *w; size_t id;
public always first, internals at the end
tests/CMakeLists.txt, line 12 at r8 (raw file):
Quoted 18 lines of code…
add_cppstyle(tests-common ${CMAKE_CURRENT_SOURCE_DIR}/common/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/*.c ${CMAKE_CURRENT_SOURCE_DIR}/*.h ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.hpp ${CMAKE_CURRENT_SOURCE_DIR}/check_is_pmem/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/container_generic/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/radix_tree/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/mpsc_queue/*.*pp) add_check_whitespace(tests-common ${CMAKE_CURRENT_SOURCE_DIR}/common/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/*.c ${CMAKE_CURRENT_SOURCE_DIR}/*.h ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.hpp ${CMAKE_CURRENT_SOURCE_DIR}/check_is_pmem/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/container_generic/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/radix_tree/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/mpsc_queue/*.*pp)
Whats the problem with using one variable containing all files? Now it's duplicated.
tests/mpsc_queue/basic.cpp, line 84 at r8 (raw file):
std::vector<std::string> values_on_pmem; /* Recover the data in second run of application */ queue.try_consume_batch([&](queue_type::batch_type acc) {
What about status? Above was handled. Can be useful when debugging
tests/mpsc_queue/consume_multipass.cpp, line 35 at r8 (raw file):
{ auto proot = pop.root();
too many new lines between declarations
tests/mpsc_queue/consume_multipass.cpp, line 87 at r8 (raw file):
pmem::obj::pool<struct root> pop; pop = pmem::obj::pool<root>::create( std::string(path), LAYOUT, PMEMOBJ_MIN_POOL, S_IWUSR | S_IRUSR);
change declaration to definition
tests/mpsc_queue/empty.cpp, line 38 at r8 (raw file):
auto proot = pop.root(); auto queue = queue_type(*proot->log, 1); auto worker = queue.register_worker();
new lines to remove
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: 15 of 21 files reviewed, 17 unresolved discussions (waiting on @KFilipek, @lukaszstolarczuk, and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 97 at r7 (raw file):
Previously, KFilipek (Krzysztof Filipek) wrote…
private: mpsc_queue *queue; ringbuf::ringbuf_worker_t *w; size_t id;public always first, internals at the end
Done.
tests/CMakeLists.txt, line 12 at r8 (raw file):
Previously, KFilipek (Krzysztof Filipek) wrote…
add_cppstyle(tests-common ${CMAKE_CURRENT_SOURCE_DIR}/common/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/*.c ${CMAKE_CURRENT_SOURCE_DIR}/*.h ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.hpp ${CMAKE_CURRENT_SOURCE_DIR}/check_is_pmem/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/container_generic/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/radix_tree/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/mpsc_queue/*.*pp) add_check_whitespace(tests-common ${CMAKE_CURRENT_SOURCE_DIR}/common/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/*.c ${CMAKE_CURRENT_SOURCE_DIR}/*.h ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*.hpp ${CMAKE_CURRENT_SOURCE_DIR}/check_is_pmem/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/container_generic/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/radix_tree/*.*pp ${CMAKE_CURRENT_SOURCE_DIR}/mpsc_queue/*.*pp)Whats the problem with using one variable containing all files? Now it's duplicated.
Done.
tests/mpsc_queue/basic.cpp, line 84 at r8 (raw file):
Previously, KFilipek (Krzysztof Filipek) wrote…
What about status? Above was handled. Can be useful when debugging
Done.
tests/mpsc_queue/consume_multipass.cpp, line 35 at r8 (raw file):
Previously, KFilipek (Krzysztof Filipek) wrote…
too many new lines between declarations
Done.
tests/mpsc_queue/consume_multipass.cpp, line 87 at r8 (raw file):
Previously, KFilipek (Krzysztof Filipek) wrote…
pmem::obj::pool<struct root> pop; pop = pmem::obj::pool<root>::create( std::string(path), LAYOUT, PMEMOBJ_MIN_POOL, S_IWUSR | S_IRUSR);change declaration to definition
You mean to assign to to pop in the line where it's defined? Done.
tests/mpsc_queue/empty.cpp, line 38 at r8 (raw file):
Previously, KFilipek (Krzysztof Filipek) wrote…
auto proot = pop.root(); auto queue = queue_type(*proot->log, 1); auto worker = queue.register_worker();new lines to remove
Done.
dcfc881 to
326def7
Compare
KFilipek
left a comment
There was a problem hiding this comment.
Reviewed 1 of 17 files at r4, 4 of 12 files at r6, 1 of 5 files at r9.
Reviewable status: 14 of 21 files reviewed, 11 unresolved discussions (waiting on @karczex, @KFilipek, @lukaszstolarczuk, and @pbalcer)
lukaszstolarczuk
left a comment
There was a problem hiding this comment.
Reviewed 2 of 17 files at r4, 1 of 4 files at r5, 8 of 12 files at r6, 5 of 5 files at r9.
Reviewable status: all files reviewed, 9 unresolved discussions (waiting on @igchor and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 49 at r9 (raw file):
static constexpr size_t CAPACITY = pmem::detail::CACHELINE_SIZE - sizeof(size_t); static constexpr size_t DIRTY_FLAG = (1ULL << 63);
shouldn't this be shifted by CACHLINE_SIZE-1? (instead of 63)
tests/mpsc_queue/recovery.cpp, line 151 at r9 (raw file):
test(int argc, char *argv[]) { if (argc < 3 || strchr("cox", argv[1][0]) == nullptr)
< 4
tests/mpsc_queue/recovery.cpp, line 183 at r9 (raw file):
} catch (pmem::pool_error &pe) { UT_FATAL("!pool::create: %s %s", pe.what(), path); }
pop.close()
tests/mpsc_queue/recovery_after_consume.cpp, line 10 at r9 (raw file):
#include "queue.hpp" #include "unittest.hpp"
not required, included already in queue.hpp
tests/mpsc_queue/recovery_order.cpp, line 9 at r9 (raw file):
#include "queue.hpp" #include "unittest.hpp"
.
- remove recover() method (try_consume can now be used) - store written offset on pmem so that we can recover consumer/producer offsets on restart - call user callback twice in try_consume if wraparound happens
- rename try_consume to try_consume_batch - rename read_accessor to batch_type - clear cachelines only after user callback executes: this allows iterating over the data arbitrary number of times (including 0) - call user callback within a transaction to simplify logic
It needs to be further debugged
326def7 to
26fdc8e
Compare
igchor
left a comment
There was a problem hiding this comment.
Reviewable status: all files reviewed, 9 unresolved discussions (waiting on @lukaszstolarczuk and @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 49 at r9 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
shouldn't this be shifted by
CACHLINE_SIZE-1? (instead of63)
No, this is just setting the most significant bit (it does not depend on CACHELINE_SIZE).
tests/mpsc_queue/recovery.cpp, line 151 at r9 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
< 4
Done.
tests/mpsc_queue/recovery.cpp, line 183 at r9 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
pop.close()
Done.
tests/mpsc_queue/recovery_after_consume.cpp, line 10 at r9 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
not required, included already in queue.hpp
Done.
tests/mpsc_queue/recovery_order.cpp, line 9 at r9 (raw file):
Previously, lukaszstolarczuk (Łukasz Stolarczuk) wrote…
.
Done.
lukaszstolarczuk
left a comment
There was a problem hiding this comment.
Reviewed 3 of 3 files at r10.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @pbalcer)
include/libpmemobj++/experimental/mpsc_queue.hpp, line 49 at r9 (raw file):
Previously, igchor (Igor Chorążewicz) wrote…
No, this is just setting the most significant bit (it does not depend on CACHELINE_SIZE).
hah, of course, mea culpa, I've misread that
store_to_log implementation based on https://github.com/pmem/pmdk/blob/master/src/libpmemobj/ulog.c#L421
This change is