
Commit

Initial work on tut 12
The code is written - just needs to be explained
bmerry committed Jan 26, 2024
1 parent f18966d commit ca34797
Showing 5 changed files with 307 additions and 0 deletions.
43 changes: 43 additions & 0 deletions doc/tut-12-recv-chunks.rst
@@ -0,0 +1,43 @@
Receiving chunks
================

The previous two sections vastly improved transmit performance for small
heaps. We'll now turn to improving the receiver performance for small heaps.
The APIs we'll use aren't limited to very small heaps, though: they're useful
whenever you want to group the payload of multiple heaps into a larger
contiguous array, or when you want more control over memory allocation and
management.

The basic approach is similar to the batching used in the previous section,
but batching on the receive side is more complicated:

1. UDP is unreliable, so we need to be able to handle batches that are missing
   some heaps.

2. We don't know in what order heaps will be received, so we need to be able
   to use metadata (in our application, the timestamp) to steer heaps to the
   right place.

This steering is done by a callback function that we provide to spead2. This
poses a challenge for Python: we saw in the previous section that interacting
with the Python interpreter on a per-heap basis is bad for performance, but
this callback function needs to be called for each heap. Additionally, the
function is called from the thread pool rather than the main thread, and if it
tries to execute Python code there will be contention for the Global
Interpreter Lock (GIL). To avoid these problems, we need a function that
doesn't interact with Python at all. We could write it in C (or another
compiled language like C++ or Rust), but numba_ conveniently provides a way to
write C-compatible functions right inside our Python code using Python syntax.
In the C++ version of the tutorial we'll just have a normal C++ function.

.. _numba: http://numba.org/
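
To give a flavour of what this looks like, here is the Python place callback
from the full example below. spead2 calls it once per heap, passing a
``chunk_place_data`` structure and the user data we register (an array holding
the heap size and chunk size); the callback decides which chunk the heap
belongs to and where its payload goes within that chunk.

.. code-block:: python

   @numba.cfunc(
       types.void(types.CPointer(chunk_place_data), types.size_t, types.CPointer(types.int64)),
       nopython=True,
   )
   def place_callback(data_ptr, data_size, sizes_ptr):
       data = numba.carray(data_ptr, 1)
       items = numba.carray(intp_to_voidptr(data[0].items), 2, dtype=np.int64)
       sizes = numba.carray(sizes_ptr, 2)  # user data: [heap_size, chunk_size]
       payload_size = items[0]
       timestamp = items[1]
       if timestamp >= 0 and payload_size == sizes[0]:
           data[0].chunk_id = timestamp // sizes[1]
           data[0].heap_offset = timestamp % sizes[1]
           data[0].heap_index = data[0].heap_offset // sizes[0]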

Full code
---------
.. tab-set-code::

   .. literalinclude:: ../examples/tut_12_recv_chunks.py
      :language: python

   .. literalinclude:: ../examples/tut_12_recv_chunks.cpp
      :language: c++
1 change: 1 addition & 0 deletions doc/tutorial.rst
@@ -30,3 +30,4 @@ different.
   tut-9-recv-memory-pool
   tut-10-send-reuse-heaps
   tut-11-send-batch-heaps
   tut-12-recv-chunks
1 change: 1 addition & 0 deletions examples/meson.build
@@ -28,6 +28,7 @@ foreach name : [
    'tut_9_recv_memory_pool',
    'tut_10_send_reuse_heaps',
    'tut_11_send_batch_heaps',
    'tut_12_recv_chunks',
]
    executable(name, name + '.cpp', dependencies : [st_dep])
endforeach
155 changes: 155 additions & 0 deletions examples/tut_12_recv_chunks.cpp
@@ -0,0 +1,155 @@
/* Copyright 2023-2024 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include <cassert>
#include <cstdint>
#include <iostream>
#include <iomanip>
#include <algorithm>
#include <numeric>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_ringbuffer.h>
#include <spead2/common_thread_pool.h>
#include <spead2/recv_ring_stream.h>
#include <spead2/recv_chunk_stream.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_heap.h>

static void usage(const char *name)
{
    std::cerr << "Usage: " << name << " [-H heap-size] port\n";
}

#if defined(__GNUC__) && defined(__x86_64__)
// Compile this function with AVX2 for better performance. Remove this if your
// CPU does not support AVX2 (e.g., if you get an Illegal Instruction error).
[[gnu::target("avx2")]]
#endif
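// Mean of sample^2 over the heaps marked present; heaps that were not
// received are skipped.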
static double mean_power(const std::int8_t *adc_samples, const std::uint8_t *present,
                         std::size_t heap_size, std::size_t heaps)
{
    std::int64_t sum = 0;
    std::size_t n = 0;
    for (std::size_t i = 0; i < heaps; i++)
    {
        if (present[i])
        {
            for (std::size_t j = 0; j < heap_size; j++)
            {
                std::int64_t sample = adc_samples[i * heap_size + j];
                sum += sample * sample;
            }
            n += heap_size;
        }
    }
    return double(sum) / n;
}

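// Called once per heap from the worker thread. items[] holds the values of
// the items requested via chunk_config.set_items: [0] = heap payload length,
// [1] = timestamp (ID 0x1600). Heaps without a timestamp (negative value) or
// with an unexpected payload size are left unplaced and hence dropped.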
void place_callback(
    spead2::recv::chunk_place_data *data,
    std::int64_t heap_size, std::int64_t chunk_size)
{
    auto payload_size = data->items[0];
    auto timestamp = data->items[1];
    if (timestamp >= 0 && payload_size == heap_size)
    {
        data->chunk_id = timestamp / chunk_size;
        data->heap_offset = timestamp % chunk_size;
        data->heap_index = data->heap_offset / heap_size;
    }
}

int main(int argc, char * const argv[])
{
    int opt;
    std::int64_t heap_size = 1024 * 1024;
    while ((opt = getopt(argc, argv, "H:")) != -1)
    {
        switch (opt)
        {
        case 'H':
            heap_size = std::stoll(optarg);
            break;
        default:
            usage(argv[0]);
            return 2;
        }
    }
    if (argc - optind != 1)
    {
        usage(argv[0]);
        return 2;
    }

    std::int64_t chunk_size = 1024 * 1024; // Preliminary value
    std::int64_t chunk_heaps = std::max(std::int64_t(1), chunk_size / heap_size);
    chunk_size = chunk_heaps * heap_size; // Final value

    spead2::thread_pool thread_pool;
    spead2::recv::stream_config config;
    config.set_max_heaps(2);
    spead2::recv::chunk_stream_config chunk_config;
    chunk_config.set_items({spead2::HEAP_LENGTH_ID, 0x1600});
    chunk_config.set_max_chunks(1);
    chunk_config.set_place(
        [=](auto data, auto) { place_callback(data, heap_size, chunk_size); }
    );

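    // data_ring carries completed chunks from the stream to us; free_ring
    // returns emptied chunks to the stream for reuse.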
    using ringbuffer = spead2::ringbuffer<std::unique_ptr<spead2::recv::chunk>>;
    auto data_ring = std::make_shared<ringbuffer>(2);
    auto free_ring = std::make_shared<ringbuffer>(4);
    spead2::recv::chunk_ring_stream stream(
        thread_pool, config, chunk_config, data_ring, free_ring
    );
    auto allocator = std::make_shared<spead2::memory_allocator>();
    for (std::size_t i = 0; i < free_ring->capacity(); i++)
    {
        auto chunk = std::make_unique<spead2::recv::chunk>();
        chunk->present = allocator->allocate(chunk_heaps, nullptr);
        chunk->present_size = chunk_heaps;
        chunk->data = allocator->allocate(chunk_size, nullptr);
        stream.add_free_chunk(std::move(chunk));
    }

    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::address_v4::any(), std::atoi(argv[optind]));
    stream.emplace_reader<spead2::recv::udp_reader>(endpoint);
    std::int64_t n_heaps = 0;
    while (true)
    {
        std::unique_ptr<spead2::recv::chunk> chunk;
        try
        {
            chunk = data_ring->pop();
        }
        catch (spead2::ringbuffer_stopped &)
        {
            break; // We've received the end-of-stream heap
        }
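        // The place callback set chunk_id = timestamp / chunk_size, so this
        // recovers the timestamp of the first sample in the chunk.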
        std::int64_t timestamp = chunk->chunk_id * chunk_size;
        auto adc_samples = (const std::int8_t *) chunk->data.get();
        auto present = chunk->present.get();
        n_heaps += std::accumulate(present, present + chunk_heaps, std::size_t(0));
        double power = mean_power(adc_samples, present, heap_size, chunk_heaps);
        stream.add_free_chunk(std::move(chunk));
        std::cout
            << "Timestamp: " << std::setw(10) << std::left << timestamp
            << " Power: " << power << '\n';
    }
    std::cout << "Received " << n_heaps << " heaps\n";
    return 0;
}
107 changes: 107 additions & 0 deletions examples/tut_12_recv_chunks.py
@@ -0,0 +1,107 @@
#!/usr/bin/env python3

# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import argparse
import ctypes

import numba
import numpy as np
import scipy
from numba import types

import spead2.recv
from spead2.numba import intp_to_voidptr
from spead2.recv.numba import chunk_place_data


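# Mean of sample**2 over the heaps marked present; missing heaps are skipped.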
@numba.njit
def mean_power(adc_samples, present):
    total = np.int64(0)
    n = 0
    for i in range(len(present)):
        if present[i]:
            for j in range(adc_samples.shape[1]):
                sample = np.int64(adc_samples[i, j])
                total += sample * sample
            n += adc_samples.shape[1]
    return np.float64(total) / n


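# Called by spead2 from a C++ worker thread once per heap, so it must not touch
# the Python interpreter. data_ptr points at a chunk_place_data struct, and
# sizes_ptr is the user_data array registered below: [heap_size, chunk_size].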
@numba.cfunc(
    types.void(types.CPointer(chunk_place_data), types.size_t, types.CPointer(types.int64)),
    nopython=True,
)
def place_callback(data_ptr, data_size, sizes_ptr):
    data = numba.carray(data_ptr, 1)
    items = numba.carray(intp_to_voidptr(data[0].items), 2, dtype=np.int64)
    sizes = numba.carray(sizes_ptr, 2)
    payload_size = items[0]
    timestamp = items[1]
    if timestamp >= 0 and payload_size == sizes[0]:
        data[0].chunk_id = timestamp // sizes[1]
        data[0].heap_offset = timestamp % sizes[1]
        data[0].heap_index = data[0].heap_offset // sizes[0]


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-H", "--heap-size", type=int, default=1024 * 1024)
    parser.add_argument("port", type=int)
    args = parser.parse_args()

    heap_size = args.heap_size
    chunk_size = 1024 * 1024  # Preliminary value
    chunk_heaps = max(1, chunk_size // heap_size)
    chunk_size = chunk_heaps * heap_size  # Final value

    thread_pool = spead2.ThreadPool()
    config = spead2.recv.StreamConfig(max_heaps=2)
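    # Constants the place callback needs; it cannot touch Python objects, so
    # they are passed to it as a raw int64 array via user_data.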
    user_data = np.array([heap_size, chunk_size], np.int64)
    chunk_config = spead2.recv.ChunkStreamConfig(
        items=[spead2.HEAP_LENGTH_ID, 0x1600],
        max_chunks=1,
        place=scipy.LowLevelCallable(
            place_callback.ctypes,
            user_data.ctypes.data_as(ctypes.c_void_p),
            "void (void *, size_t, void *)",
        ),
    )
    data_ring = spead2.recv.ChunkRingbuffer(2)
    free_ring = spead2.recv.ChunkRingbuffer(4)
    stream = spead2.recv.ChunkRingStream(thread_pool, config, chunk_config, data_ring, free_ring)
    for _ in range(free_ring.maxsize):
        chunk = spead2.recv.Chunk(
            data=np.zeros((chunk_heaps, heap_size), np.int8),
            present=np.zeros(chunk_heaps, np.uint8),
        )
        stream.add_free_chunk(chunk)

    stream.add_udp_reader(args.port)
    n_heaps = 0
    # Run it once to trigger compilation for int8
    mean_power(np.ones((1, 1), np.int8), np.ones(1, np.uint8))
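    # chunk.present marks which heaps arrived; chunk_id was set to
    # timestamp // chunk_size, so multiplying recovers the timestamp of the
    # chunk's first sample.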
    for chunk in data_ring:
        timestamp = chunk.chunk_id * chunk_size
        power = mean_power(chunk.data, chunk.present)
        n_heaps += int(np.sum(chunk.present, dtype=np.int64))
        stream.add_free_chunk(chunk)
        print(f"Timestamp: {timestamp:<10} Power: {power:.2f}")
    print(f"Received {n_heaps} heaps")


if __name__ == "__main__":
    main()
