Skip to content

Commit

Permalink
Speed up fixed-sized array iteration (#1050)
Browse files Browse the repository at this point in the history
* Add bench for vecs (which use FixedSizeVec)

* Speed up fixed-sized array iteration

* Cache the len for fastfixedsizelist

* Use extern C to prevent into_iter from being used
  • Loading branch information
jleibs committed Mar 2, 2023
1 parent bebf243 commit 63c6983
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 34 deletions.
111 changes: 95 additions & 16 deletions crates/re_log_types/src/component_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! schemas are additionally documented in doctests.

use arrow2::{
array::{FixedSizeListArray, MutableFixedSizeListArray},
array::{FixedSizeListArray, MutableFixedSizeListArray, PrimitiveArray},
datatypes::{DataType, Field},
};
use arrow2_convert::{
Expand Down Expand Up @@ -126,8 +126,8 @@ pub type Result<T> = std::result::Result<T, FieldError>;
///
/// #[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
/// pub struct ConvertibleType {
/// #[arrow_field(type = "FixedSizeArrayField<bool,2>")]
/// data: [bool; 2],
/// #[arrow_field(type = "FixedSizeArrayField<u32,2>")]
/// data: [u32; 2],
/// }
/// ```
pub struct FixedSizeArrayField<T, const SIZE: usize>(std::marker::PhantomData<T>);
Expand Down Expand Up @@ -172,27 +172,106 @@ where
}
}

/// Iterator over the rows of a [`FixedSizeListArray`], yielding each row as `Option<[T; SIZE]>`.
///
/// Rows whose validity bit is unset are yielded as `None`; all other rows are read as
/// `SIZE` consecutive entries of `values`.
pub struct FastFixedSizeArrayIter<'a, T, const SIZE: usize>
where
T: arrow2::types::NativeType,
{
// Index of the next row to yield.
offset: usize,
// One past the last row to yield; initialised from `array.len()` by the constructor.
end: usize,
// The list array itself; consulted for its validity bitmap.
array: &'a FixedSizeListArray,
// The child values of `array`, downcast to a primitive array of `T`.
values: &'a PrimitiveArray<T>,
}

impl<'a, T, const SIZE: usize> Iterator for FastFixedSizeArrayIter<'a, T, SIZE>
where
    T: arrow2::types::NativeType,
{
    type Item = Option<[T; SIZE]>;

    /// Yields the next row.
    ///
    /// * `None` once every row in `offset..end` has been visited.
    /// * `Some(None)` when the validity bitmap marks the row as null.
    /// * `Some(Some(row))` otherwise, where `row` holds the `SIZE` values stored at
    ///   `row_index * SIZE ..` in the child values array.
    fn next(&mut self) -> Option<Self::Item> {
        if self.offset >= self.end {
            return None;
        }

        // Claim the row up front so every return path below advances the iterator.
        let row = self.offset;
        self.offset += 1;

        // An absent validity bitmap means every row is valid.
        let is_null = self
            .array
            .validity()
            .map_or(false, |validity| !validity.get_bit(row));
        if is_null {
            return Some(None);
        }

        // Hoist the row's base index out of the per-element closure.
        let base = row * SIZE;
        let values = self.values;
        let out: [T; SIZE] = array_init::array_init(|i: usize| values.value(base + i));
        Some(Some(out))
    }

    /// Reports the exact number of rows left, so consumers such as `collect` can
    /// preallocate instead of growing incrementally.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.end.saturating_sub(self.offset);
        (remaining, Some(remaining))
    }
}

/// Zero-data marker type (it only wraps a `PhantomData<T>`) that carries the element type
/// `T` and row width `SIZE` at the type level.
///
/// Rows are meant to be iterated via `ArrowArray::iter_from_array_ref`, not via
/// `IntoIterator::into_iter`.
pub struct FastFixedSizeListArray<T, const SIZE: usize>(std::marker::PhantomData<T>);

// Declaration of a symbol that is deliberately never defined anywhere.
// Any call to it that survives to link time turns into a linker error, which is used
// to forbid calling `into_iter` on `&FastFixedSizeListArray`.
extern "C" {
fn do_not_call_into_iter(); // we never define this function, so the linker will fail
}

/// `IntoIterator` is implemented only so that `<&Self as IntoIterator>::Item` and
/// `::IntoIter` can be named; the method itself must never be called.
impl<'a, T, const SIZE: usize> IntoIterator for &'a FastFixedSizeListArray<T, SIZE>
where
T: arrow2::types::NativeType,
{
type Item = Option<[T; SIZE]>;

type IntoIter = FastFixedSizeArrayIter<'a, T, SIZE>;

/// Not a usable entry point: the body references the undefined symbol
/// `do_not_call_into_iter`, so code that calls this is intended to fail at link time.
fn into_iter(self) -> Self::IntoIter {
#[allow(unsafe_code)]
// SAFETY:
// This exists so we get a link-error if some code tries to call into_iter
// Iteration should only happen via iter_from_array_ref.
// This is a quirk of the way the traits work in arrow2_convert.
unsafe {
do_not_call_into_iter();
}
// Only here to satisfy the return type; there is no iterator to hand back.
unreachable!();
}
}

impl<T, const SIZE: usize> ArrowArray for FastFixedSizeListArray<T, SIZE>
where
    T: arrow2::types::NativeType,
{
    type BaseArrayType = FixedSizeListArray;

    /// Builds a [`FastFixedSizeArrayIter`] over all rows of `b`.
    ///
    /// # Panics
    ///
    /// Panics if `b` is not a [`FixedSizeListArray`], or if its child values are not a
    /// `PrimitiveArray<T>`.
    fn iter_from_array_ref(b: &dyn arrow2::array::Array) -> <&Self as IntoIterator>::IntoIter {
        let list = b.as_any().downcast_ref::<FixedSizeListArray>().unwrap();

        // Downcast the child array once here so per-row reads need no dynamic dispatch.
        let primitives = list
            .values()
            .as_any()
            .downcast_ref::<PrimitiveArray<T>>()
            .unwrap();

        // The row count is read once and stored in the iterator.
        let row_count = list.len();

        FastFixedSizeArrayIter::<T, SIZE> {
            offset: 0,
            end: row_count,
            array: list,
            values: primitives,
        }
    }
}

impl<T, const SIZE: usize> ArrowDeserialize for FixedSizeArrayField<T, SIZE>
where
T: ArrowDeserialize + ArrowEnableVecForType + ArrowField<Type = T> + 'static,
T: arrow2::types::NativeType
+ ArrowDeserialize
+ ArrowEnableVecForType
+ ArrowField<Type = T>
+ 'static,
<T as ArrowDeserialize>::ArrayType: 'static,
for<'b> &'b <T as ArrowDeserialize>::ArrayType: IntoIterator,
{
type ArrayType = FixedSizeListArray;
type ArrayType = FastFixedSizeListArray<T, SIZE>;

#[inline]
fn arrow_deserialize(
v: <&Self::ArrayType as IntoIterator>::Item,
) -> Option<<Self as ArrowField>::Type> {
if let Some(array) = v {
let mut iter = <<T as ArrowDeserialize>::ArrayType as ArrowArray>::iter_from_array_ref(
array.as_ref(),
)
.map(<T as ArrowDeserialize>::arrow_deserialize_internal);
let out: Result<[T; SIZE]> =
array_init::try_array_init(|_i: usize| iter.next().ok_or(FieldError::BadValue));
out.ok()
} else {
None
}
v
}
}
17 changes: 17 additions & 0 deletions crates/re_log_types/src/datagen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ pub fn build_some_point2d(len: usize) -> Vec<component_types::Point2D> {
.collect()
}

/// Create `len` dummy `Vec3D`
///
/// Each of the three components is drawn independently from the thread-local RNG,
/// in the half-open range `0.0..10.0`.
pub fn build_some_vec3d(len: usize) -> Vec<component_types::Vec3D> {
    use rand::Rng as _;
    let mut rng = rand::thread_rng();

    // A `Range` is already an iterator, so no `.into_iter()` is needed.
    (0..len)
        .map(|_| {
            component_types::Vec3D::new(
                rng.gen_range(0.0..10.0),
                rng.gen_range(0.0..10.0),
                rng.gen_range(0.0..10.0),
            )
        })
        .collect()
}

/// Build a ([`Timeline`], [`TimeInt`]) tuple from `log_time` suitable for inserting in a [`crate::TimePoint`].
pub fn build_log_time(log_time: Time) -> (Timeline, TimeInt) {
(Timeline::log_time(), log_time.into())
Expand Down
110 changes: 92 additions & 18 deletions crates/re_query/benches/query_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,36 @@ use criterion::{criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use re_arrow_store::{DataStore, LatestAtQuery};
use re_log_types::{
component_types::{ColorRGBA, InstanceKey, Point2D},
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
component_types::{ColorRGBA, InstanceKey, Point2D, Vec3D},
datagen::{build_frame_nr, build_some_colors, build_some_point2d, build_some_vec3d},
entity_path,
msg_bundle::{try_build_msg_bundle2, Component, MsgBundle},
msg_bundle::{try_build_msg_bundle1, try_build_msg_bundle2, Component, MsgBundle},
EntityPath, Index, MsgId, TimeType, Timeline,
};
use re_query::query_entity_with_primary;

// ---

#[cfg(not(debug_assertions))]
const NUM_FRAMES: u32 = 1_000;
const NUM_FRAMES_POINTS: u32 = 1_000;
#[cfg(not(debug_assertions))]
const NUM_POINTS: u32 = 1_000;
#[cfg(not(debug_assertions))]
const NUM_FRAMES_VECS: u32 = 10;
#[cfg(not(debug_assertions))]
const NUM_VECS: u32 = 100_000;

// `cargo test` also runs the benchmark setup code, so keep the debug-build sizes tiny to make sure it runs quickly:
#[cfg(debug_assertions)]
const NUM_FRAMES: u32 = 1;
const NUM_FRAMES_POINTS: u32 = 1;
#[cfg(debug_assertions)]
const NUM_POINTS: u32 = 1;
#[cfg(debug_assertions)]
const NUM_FRAMES_VECS: u32 = 1;
#[cfg(debug_assertions)]
const NUM_VECS: u32 = 1;

criterion_group!(benches, mono_points, batch_points);
criterion_group!(benches, mono_points, batch_points, batch_vecs);
criterion_main!(benches);

// --- Benchmarks ---
Expand All @@ -38,14 +46,14 @@ fn mono_points(c: &mut Criterion) {
.into_iter()
.map(move |point_idx| entity_path!("points", Index::Sequence(point_idx as _)))
.collect_vec();
let msgs = build_messages(&paths, 1);
let msgs = build_points_messages(&paths, 1);

{
let mut group = c.benchmark_group("arrow_mono_points");
// Mono-insert is slow -- decrease the sample size
group.sample_size(10);
group.throughput(criterion::Throughput::Elements(
(NUM_POINTS * NUM_FRAMES) as _,
(NUM_POINTS * NUM_FRAMES_POINTS) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(msgs.iter()));
Expand All @@ -57,20 +65,20 @@ fn mono_points(c: &mut Criterion) {
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
let mut store = insert_messages(msgs.iter());
group.bench_function("query", |b| {
b.iter(|| query_and_visit(&mut store, &paths));
b.iter(|| query_and_visit_points(&mut store, &paths));
});
}
}

fn batch_points(c: &mut Criterion) {
// Batch points are logged together at a single path
let paths = [EntityPath::from("points")];
let msgs = build_messages(&paths, NUM_POINTS as _);
let msgs = build_points_messages(&paths, NUM_POINTS as _);

{
let mut group = c.benchmark_group("arrow_batch_points");
group.throughput(criterion::Throughput::Elements(
(NUM_POINTS * NUM_FRAMES) as _,
(NUM_POINTS * NUM_FRAMES_POINTS) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(msgs.iter()));
Expand All @@ -82,15 +90,40 @@ fn batch_points(c: &mut Criterion) {
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
let mut store = insert_messages(msgs.iter());
group.bench_function("query", |b| {
b.iter(|| query_and_visit(&mut store, &paths));
b.iter(|| query_and_visit_points(&mut store, &paths));
});
}
}

/// Benchmarks inserting and querying a batch of `NUM_VECS` vectors logged at one entity path.
fn batch_vecs(c: &mut Criterion) {
    // All vectors of a frame are logged together as a single batch at a single path.
    let paths = [EntityPath::from("vec")];
    let msgs = build_vecs_messages(&paths, NUM_VECS as _);

    // Insertion: every frame's batch is inserted, so throughput counts all of them.
    {
        let inserted_elements = NUM_VECS * NUM_FRAMES_VECS;
        let mut insert_group = c.benchmark_group("arrow_batch_vecs");
        insert_group.throughput(criterion::Throughput::Elements(inserted_elements as _));
        insert_group.bench_function("insert", |bencher| {
            bencher.iter(|| insert_messages(msgs.iter()));
        });
    }

    // Query: a single latest-at query visits one batch of `NUM_VECS` vectors.
    {
        let mut query_group = c.benchmark_group("arrow_batch_vecs");
        query_group.throughput(criterion::Throughput::Elements(NUM_VECS as _));
        // Populate the store once, outside the measured closure.
        let mut store = insert_messages(msgs.iter());
        query_group.bench_function("query", |bencher| {
            bencher.iter(|| query_and_visit_vecs(&mut store, &paths));
        });
    }
}

// --- Helpers ---

fn build_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
(0..NUM_FRAMES)
fn build_points_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
(0..NUM_FRAMES_POINTS)
.into_iter()
.flat_map(move |frame_idx| {
paths.iter().map(move |path| {
Expand All @@ -106,20 +139,37 @@ fn build_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
.collect()
}

/// Builds one `MsgBundle` per (frame, path) pair, each carrying `pts` random `Vec3D`s.
///
/// Bundles are ordered frame-major: all paths for frame 0, then all paths for frame 1, …
/// up to `NUM_FRAMES_VECS` frames.
fn build_vecs_messages(paths: &[EntityPath], pts: usize) -> Vec<MsgBundle> {
    let mut bundles = Vec::new();

    for frame_idx in 0..NUM_FRAMES_VECS {
        let frame_nr = frame_idx as i64;

        for path in paths {
            let bundle = try_build_msg_bundle1(
                MsgId::ZERO,
                path.clone(),
                [build_frame_nr(frame_nr.into())],
                build_some_vec3d(pts),
            )
            .unwrap();

            bundles.push(bundle);
        }
    }

    bundles
}

/// Creates a fresh `DataStore` and inserts every bundle from `msgs` into it, in order.
///
/// # Panics
///
/// Panics if the store rejects any of the bundles.
fn insert_messages<'a>(msgs: impl Iterator<Item = &'a MsgBundle>) -> DataStore {
    let mut store = DataStore::new(InstanceKey::name(), Default::default());
    for msg_bundle in msgs {
        store.insert(msg_bundle).unwrap();
    }
    store
}

struct Point {
struct SavePoint {
_pos: Point2D,
_color: Option<ColorRGBA>,
}

fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec<Point> {
fn query_and_visit_points(store: &mut DataStore, paths: &[EntityPath]) -> Vec<SavePoint> {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES as i64 / 2).into());
let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES_POINTS as i64 / 2).into());

let mut points = Vec::with_capacity(NUM_POINTS as _);

Expand All @@ -128,7 +178,7 @@ fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec<Point> {
query_entity_with_primary::<Point2D>(store, &query, path, &[ColorRGBA::name()])
.and_then(|entity_view| {
entity_view.visit2(|_: InstanceKey, pos: Point2D, color: Option<ColorRGBA>| {
points.push(Point {
points.push(SavePoint {
_pos: pos,
_color: color,
});
Expand All @@ -140,3 +190,27 @@ fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec<Point> {
assert_eq!(NUM_POINTS as usize, points.len());
points
}

/// Holds one `Vec3D` visited by `query_and_visit_vecs`.
struct SaveVec {
// Never read back; the leading underscore silences the unused-field warning.
_vec: Vec3D,
}

fn query_and_visit_vecs(store: &mut DataStore, paths: &[EntityPath]) -> Vec<SaveVec> {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES_POINTS as i64 / 2).into());

let mut rects = Vec::with_capacity(NUM_VECS as _);

for path in paths.iter() {
query_entity_with_primary::<Vec3D>(store, &query, path, &[])
.and_then(|entity_view| {
entity_view.visit1(|_: InstanceKey, vec: Vec3D| {
rects.push(SaveVec { _vec: vec });
})
})
.ok()
.unwrap();
}
assert_eq!(NUM_VECS as usize, rects.len());
rects
}

1 comment on commit 63c6983

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 63c6983 Previous: 83d2a35 Ratio
datastore/insert/batch/rects/insert 552316 ns/iter (± 2383) 550839 ns/iter (± 1525) 1.00
datastore/latest_at/batch/rects/query 1822 ns/iter (± 7) 1836 ns/iter (± 6) 0.99
datastore/latest_at/missing_components/primary 355 ns/iter (± 5) 355 ns/iter (± 0) 1
datastore/latest_at/missing_components/secondaries 425 ns/iter (± 4) 425 ns/iter (± 0) 1
datastore/range/batch/rects/query 152972 ns/iter (± 397) 152389 ns/iter (± 373) 1.00
mono_points_arrow/generate_message_bundles 50439069 ns/iter (± 1406261) 47372515 ns/iter (± 630596) 1.06
mono_points_arrow/generate_messages 139364239 ns/iter (± 1352225) 124935945 ns/iter (± 1111718) 1.12
mono_points_arrow/encode_log_msg 165412326 ns/iter (± 1018263) 155735095 ns/iter (± 830480) 1.06
mono_points_arrow/encode_total 358268137 ns/iter (± 2318907) 329452628 ns/iter (± 1403973) 1.09
mono_points_arrow/decode_log_msg 187647094 ns/iter (± 1105885) 177738848 ns/iter (± 772148) 1.06
mono_points_arrow/decode_message_bundles 76576151 ns/iter (± 1193769) 65932294 ns/iter (± 868436) 1.16
mono_points_arrow/decode_total 259760980 ns/iter (± 1955469) 241526260 ns/iter (± 1777000) 1.08
batch_points_arrow/generate_message_bundles 336340 ns/iter (± 556) 336025 ns/iter (± 703) 1.00
batch_points_arrow/generate_messages 6326 ns/iter (± 16) 6324 ns/iter (± 15) 1.00
batch_points_arrow/encode_log_msg 373754 ns/iter (± 1448) 366676 ns/iter (± 1674) 1.02
batch_points_arrow/encode_total 730554 ns/iter (± 3057) 735929 ns/iter (± 2564) 0.99
batch_points_arrow/decode_log_msg 345406 ns/iter (± 1700) 351146 ns/iter (± 4186) 0.98
batch_points_arrow/decode_message_bundles 2084 ns/iter (± 8) 2060 ns/iter (± 1) 1.01
batch_points_arrow/decode_total 353760 ns/iter (± 1078) 357323 ns/iter (± 1056) 0.99
arrow_mono_points/insert 7025517516 ns/iter (± 25787964) 6053105766 ns/iter (± 15291172) 1.16
arrow_mono_points/query 1737304 ns/iter (± 20350) 1727372 ns/iter (± 11088) 1.01
arrow_batch_points/insert 2643718 ns/iter (± 6961) 2644070 ns/iter (± 15560) 1.00
arrow_batch_points/query 16995 ns/iter (± 51) 17586 ns/iter (± 39) 0.97
arrow_batch_vecs/insert 42367 ns/iter (± 143)
arrow_batch_vecs/query 389510 ns/iter (± 722)
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.