Skip to content

Commit

Permalink
Add --verify option to spead2_send and spead2_recv
Browse files Browse the repository at this point in the history
It puts random data (created from a linear congruential generator) into
the data stream, and verifies it on the receive side. This can help with
testing issues that are missed by unit tests e.g. corner cases that only
affect a small number of heaps, or that require the buffer to be
recycled before they show up.
  • Loading branch information
bmerry committed Oct 15, 2020
1 parent b09f1e3 commit bb5f1fd
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 28 deletions.
5 changes: 5 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ Other changes:
(with the default being ``AUTO``) so that it can later be re-enabled under
circumstances where it is known to work well, while still allowing it to be
explicitly enabled or disabled.
- Add :option:`!--verify` option to :program:`spead2_send` and
:program:`spead2_recv` to aid in testing the code. To support this,
:program:`spead2_send` was modified so that each in-flight heap uses
different memory, which may reduce performance (due to less cache re-use)
even when the option is not given.

Additionally, refer to the changes for 3.0.0b1 below.

Expand Down
83 changes: 82 additions & 1 deletion src/spead2_recv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
#include <vector>
#include <string>
#include <memory>
#include <random>
#include <boost/program_options.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <spead2/common_thread_pool.h>
#include <spead2/common_endian.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_tcp.h>
#if SPEAD2_USE_IBV
Expand All @@ -48,6 +50,7 @@ struct options
bool descriptors = false;
bool joint = false;
int threads = 1;
bool verify = false;
std::vector<std::string> sources;
spead2::protocol_options protocol;
spead2::recv::receiver_options receiver;
Expand Down Expand Up @@ -75,6 +78,7 @@ static options parse_args(int argc, const char **argv)
("descriptors", spead2::make_value_semantic(&opts.descriptors), "Show descriptors")
("joint", spead2::make_value_semantic(&opts.joint), "Treat all sources as a single stream")
("threads", spead2::make_value_semantic(&opts.threads), "Number of worker threads")
("verify", spead2::make_value_semantic(&opts.verify), "Verify payload (use spead2_send with same option")
;

hidden.add_options()
Expand Down Expand Up @@ -114,7 +118,7 @@ static options parse_args(int argc, const char **argv)
}
}

void show_heap(const spead2::recv::heap &fheap, const options &opts)
static void show_heap(const spead2::recv::heap &fheap, const options &opts)
{
if (opts.quiet)
return;
Expand Down Expand Up @@ -170,6 +174,82 @@ void show_heap(const spead2::recv::heap &fheap, const options &opts)
std::cout << std::noshowbase;
}

/* O(log(z)) version of engine.discard(z), which in GCC seems to be implemented in O(z). */
template<std::uint_fast32_t a, std::uint_fast32_t m>
static void fast_discard(std::linear_congruential_engine<std::uint_fast32_t, a, 0, m> &engine,
unsigned long long z)
{
if (z == 0)
return;
// There is no way to directly observe the current state. We can only
// advance to the following state.
std::uint64_t x = engine();
z--;
// Multiply by a^z mod m
std::uint64_t apow = a;
while (z > 0)
{
if (z & 1)
x = x * apow % m;
apow = apow * apow % m;
z >>= 1;
}
engine.seed(x);
}

static void verify_heap(const spead2::recv::heap &fheap, const options &opts)
{
if (!opts.verify)
return;
const auto &items = fheap.get_items();

typedef uint32_t element_t;
bool first = true;
std::size_t elements = 0;
std::minstd_rand generator;
for (const auto &item : items)
{
if (item.id < 0x1000)
continue;
if (first)
{
elements = item.length / sizeof(element_t);
// The first heap gets numbered 1 rather than 0
std::uint64_t start_pos = elements * items.size() * (fheap.get_cnt() - 1);
fast_discard(generator, start_pos);
first = false;
}
if (item.length != elements * sizeof(element_t))
{
std::cerr << "Heap " << fheap.get_cnt()
<< ", item 0x" << std::hex << item.id << std::dec
<< " has an inconsistent length\n";
std::exit(1);
}
const element_t *data = reinterpret_cast<const element_t *>(item.ptr);
for (std::size_t i = 0; i < elements; i++)
{
element_t expected = generator();
element_t actual = spead2::betoh(data[i]);
if (expected != actual)
{
std::cerr << "Verification mismatch in heap " << fheap.get_cnt()
<< ", item 0x" << std::hex << item.id << std::dec
<< " offset " << i
<< "\nexpected 0x" << std::hex << expected << ", actual 0x" << actual << std::dec
<< std::endl;
std::exit(1);
}
}
}

if (first && !fheap.is_end_of_stream())
{
spead2::log_warning("Heap %d has no verifiable items but it not an end-of-stream heap",
fheap.get_cnt());
}
}

class callback_stream : public spead2::recv::stream
{
private:
Expand All @@ -182,6 +262,7 @@ class callback_stream : public spead2::recv::stream
{
spead2::recv::heap frozen(std::move(heap));
show_heap(frozen, opts);
verify_heap(frozen, opts);
n_complete++;
}
else if (!opts.quiet)
Expand Down
83 changes: 56 additions & 27 deletions src/spead2_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
#include <cstdlib>
#include <cstdint>
#include <memory>
#include <random>
#include <boost/program_options.hpp>
#include <boost/asio.hpp>
#include <spead2/common_thread_pool.h>
#include <spead2/common_semaphore.h>
#include <spead2/common_endian.h>
#include <spead2/common_memory_allocator.h>
#include <spead2/send_stream.h>
#include <spead2/send_udp.h>
#include <spead2/send_tcp.h>
Expand All @@ -49,6 +52,7 @@ struct options
std::size_t heap_size = 4194304;
std::size_t items = 1;
std::int64_t heaps = -1;
bool verify = false;
std::vector<std::string> dest;
};

Expand All @@ -66,6 +70,7 @@ static options parse_args(int argc, const char **argv)
("heap-size", spead2::make_value_semantic(&opts.heap_size), "Payload size for heap")
("items", spead2::make_value_semantic(&opts.items), "Number of items per heap")
("heaps", spead2::make_value_semantic(&opts.heaps), "Number of data heaps to send (-1=infinite)")
("verify", spead2::make_value_semantic(&opts.verify), "Insert payload values that receiver can verify")
;
spead2::option_adder adder(desc);
opts.protocol.enumerate(adder);
Expand Down Expand Up @@ -119,19 +124,22 @@ class sender
const std::size_t n_substreams;
const std::int64_t n_heaps;
const spead2::flavour flavour;
const std::size_t value_size;
const std::size_t elements; // number of element_t's per item
const bool verify;

spead2::send::heap first_heap; // has descriptors
std::vector<spead2::send::heap> first_heaps; // have descriptors
std::vector<spead2::send::heap> heaps;
spead2::send::heap last_heap; // has end-of-stream marker
typedef std::pair<float, float> item_t;
std::vector<std::unique_ptr<item_t[]>> values;
typedef std::uint32_t element_t;
spead2::memory_allocator::pointer storage; // data for all heaps
std::vector<std::vector<element_t *>> pointers; // start of data per item per heap
std::minstd_rand generator;

std::uint64_t bytes_transferred = 0;
boost::system::error_code error;
spead2::semaphore done_sem{0};

const spead2::send::heap &get_heap(std::uint64_t idx) const noexcept;
const spead2::send::heap &get_heap(std::uint64_t idx) noexcept;

void callback(spead2::send::stream &stream,
std::uint64_t idx,
Expand All @@ -151,22 +159,28 @@ sender::sender(const options &opts)
? opts.sender.max_heaps : opts.heaps + opts.dest.size()),
n_substreams(opts.dest.size()),
n_heaps(opts.heaps),
value_size(opts.heap_size / (opts.items * sizeof(item_t)) * sizeof(item_t)),
first_heap(opts.sender.make_flavour(opts.protocol)),
elements(opts.heap_size / (opts.items * sizeof(element_t))),
verify(opts.verify),
last_heap(opts.sender.make_flavour(opts.protocol))
{
first_heaps.reserve(max_heaps);
heaps.reserve(max_heaps);
for (std::size_t i = 0; i < max_heaps; i++)
{
first_heaps.emplace_back(flavour);
heaps.emplace_back(flavour);
}

const std::size_t elements = opts.heap_size / (opts.items * sizeof(item_t));
const std::size_t heap_size = elements * opts.items * sizeof(item_t);
const std::size_t item_size = elements * sizeof(element_t);
const std::size_t heap_size = item_size * opts.items;
if (heap_size != opts.heap_size)
{
std::cerr << "Heap size is not an exact multiple: using " << heap_size << " instead of " << opts.heap_size << '\n';
}

values.reserve(opts.items);
auto allocator = std::make_shared<spead2::mmap_allocator>(0, true);
storage = allocator->allocate(heap_size * max_heaps, nullptr);

for (std::size_t i = 0; i < opts.items; i++)
{
spead2::descriptor d;
Expand All @@ -176,35 +190,52 @@ sender::sender(const options &opts)
d.name = sstr.str();
d.description = "A test item with arbitrary value";
sstr.str("");
sstr << "{'shape': (" << elements << ",), 'fortran_order': False, 'descr': '<c8'}";
sstr << "{'shape': (" << elements << ",), 'fortran_order': False, 'descr': ";
sstr << "'>u4'}";
d.numpy_header = sstr.str();
first_heap.add_descriptor(d);
std::unique_ptr<item_t[]> item(new item_t[elements]);
item_t *ptr = item.get();
values.push_back(std::move(item));
for (std::size_t j = 0; j < max_heaps; j++)
heaps[j].add_item(0x1000 + i, ptr, elements * sizeof(item_t), true);
first_heap.add_item(0x1000 + i, ptr, elements * sizeof(item_t), true);
first_heaps[j].add_descriptor(d);
}

pointers.resize(max_heaps);
for (std::size_t i = 0; i < max_heaps; i++)
{
pointers[i].resize(opts.items);
for (std::size_t j = 0; j < opts.items; j++)
{
pointers[i][j] = reinterpret_cast<element_t *>(
storage.get() + i * heap_size + j * item_size);
first_heaps[i].add_item(0x1000 + j, pointers[i][j], item_size, true);
heaps[i].add_item(0x1000 + j, pointers[i][j], item_size, true);
}
}
last_heap.add_end();
}

std::vector<std::pair<const void *, std::size_t>> sender::memory_regions() const
{
std::vector<std::pair<const void *, std::size_t>> out;
for (const auto &value : values)
out.emplace_back(value.get(), value_size);
return out;
return {{storage.get(), max_heaps * elements * sizeof(element_t)}};
}

const spead2::send::heap &sender::get_heap(std::uint64_t idx) const noexcept
const spead2::send::heap &sender::get_heap(std::uint64_t idx) noexcept
{
spead2::send::heap *heap;
if (idx < n_substreams)
return first_heap;
heap = &first_heaps[idx % max_heaps];
else if (n_heaps >= 0 && idx >= std::uint64_t(n_heaps))
return last_heap;
else
return heaps[idx % max_heaps];
heap = &heaps[idx % max_heaps];

if (verify)
{
const std::vector<element_t *> &ptrs = pointers[idx % max_heaps];
// Fill in random values to be checked by the receiver
for (std::size_t i = 0; i < ptrs.size(); i++)
for (std::size_t j = 0; j < elements; j++)
ptrs[i][j] = spead2::htobe(std::uint32_t(generator()));
}
return *heap;
}

void sender::callback(spead2::send::stream &stream,
Expand Down Expand Up @@ -240,9 +271,7 @@ std::uint64_t sender::run(spead2::send::stream &stream)
error = boost::system::error_code();
/* Send the initial heaps from the worker thread. This ensures that no
* callbacks can happen until the initial heaps are all sent, which would
* otherwise lead to heaps being queued out of order. For this benchmark it
* doesn't really matter since the heaps are all the same, but it makes it
* a more realistic benchmark.
* otherwise lead to heaps being queued out of order.
*/
stream.get_io_service().post([this, &stream] {
for (int i = 0; i < max_heaps; i++)
Expand Down

0 comments on commit bb5f1fd

Please sign in to comment.