diff --git a/crates/re_log_types/src/component_types/mod.rs b/crates/re_log_types/src/component_types/mod.rs index b849418e73d4..66ec5aefb10e 100644 --- a/crates/re_log_types/src/component_types/mod.rs +++ b/crates/re_log_types/src/component_types/mod.rs @@ -4,7 +4,7 @@ //! schemas are additionally documented in doctests. use arrow2::{ - array::{FixedSizeListArray, MutableFixedSizeListArray}, + array::{FixedSizeListArray, MutableFixedSizeListArray, PrimitiveArray}, datatypes::{DataType, Field}, }; use arrow2_convert::{ @@ -126,8 +126,8 @@ pub type Result = std::result::Result; /// /// #[derive(ArrowField, ArrowSerialize, ArrowDeserialize)] /// pub struct ConvertibleType { -/// #[arrow_field(type = "FixedSizeArrayField")] -/// data: [bool; 2], +/// #[arrow_field(type = "FixedSizeArrayField")] +/// data: [u32; 2], /// } /// ``` pub struct FixedSizeArrayField(std::marker::PhantomData); @@ -172,27 +172,106 @@ where } } +pub struct FastFixedSizeArrayIter<'a, T, const SIZE: usize> +where + T: arrow2::types::NativeType, +{ + offset: usize, + end: usize, + array: &'a FixedSizeListArray, + values: &'a PrimitiveArray, +} + +impl<'a, T, const SIZE: usize> Iterator for FastFixedSizeArrayIter<'a, T, SIZE> +where + T: arrow2::types::NativeType, +{ + type Item = Option<[T; SIZE]>; + + fn next(&mut self) -> Option { + if self.offset < self.end { + if let Some(validity) = self.array.validity() { + if !validity.get_bit(self.offset) { + self.offset += 1; + return Some(None); + } + } + + let out: [T; SIZE] = + array_init::array_init(|i: usize| self.values.value(self.offset * SIZE + i)); + self.offset += 1; + Some(Some(out)) + } else { + None + } + } +} + +pub struct FastFixedSizeListArray(std::marker::PhantomData); + +extern "C" { + fn do_not_call_into_iter(); // we never define this function, so the linker will fail +} + +impl<'a, T, const SIZE: usize> IntoIterator for &'a FastFixedSizeListArray +where + T: arrow2::types::NativeType, +{ + type Item = Option<[T; SIZE]>; + + type IntoIter = FastFixedSizeArrayIter<'a, T, SIZE>; + + fn into_iter(self) -> Self::IntoIter { + #[allow(unsafe_code)] + // SAFETY: + // This exists so we get a link-error if some code tries to call into_iter + // Iteration should only happen via iter_from_array_ref. + // This is a quirk of the way the traits work in arrow2_convert. + unsafe { + do_not_call_into_iter(); + } + unreachable!(); + } +} + +impl ArrowArray for FastFixedSizeListArray +where + T: arrow2::types::NativeType, +{ + type BaseArrayType = FixedSizeListArray; + + fn iter_from_array_ref(b: &dyn arrow2::array::Array) -> <&Self as IntoIterator>::IntoIter { + let array = b.as_any().downcast_ref::().unwrap(); + let values = array + .values() + .as_any() + .downcast_ref::>() + .unwrap(); + FastFixedSizeArrayIter:: { + offset: 0, + end: array.len(), + array, + values, + } + } +} + impl ArrowDeserialize for FixedSizeArrayField where - T: ArrowDeserialize + ArrowEnableVecForType + ArrowField + 'static, + T: arrow2::types::NativeType + + ArrowDeserialize + + ArrowEnableVecForType + + ArrowField + + 'static, ::ArrayType: 'static, for<'b> &'b ::ArrayType: IntoIterator, { - type ArrayType = FixedSizeListArray; + type ArrayType = FastFixedSizeListArray; + #[inline] fn arrow_deserialize( v: <&Self::ArrayType as IntoIterator>::Item, ) -> Option<::Type> { - if let Some(array) = v { - let mut iter = <::ArrayType as ArrowArray>::iter_from_array_ref( - array.as_ref(), - ) - .map(::arrow_deserialize_internal); - let out: Result<[T; SIZE]> = - array_init::try_array_init(|_i: usize| iter.next().ok_or(FieldError::BadValue)); - out.ok() - } else { - None - } + v } } diff --git a/crates/re_log_types/src/datagen.rs b/crates/re_log_types/src/datagen.rs index 92efd2b2791a..21b2262f5d4f 100644 --- a/crates/re_log_types/src/datagen.rs +++ b/crates/re_log_types/src/datagen.rs @@ -42,6 +42,23 @@ pub fn build_some_point2d(len: usize) -> Vec { .collect() } +/// Create `len` dummy `Vec3D` +pub fn build_some_vec3d(len: usize) -> Vec { + use rand::Rng as _; + let mut rng = rand::thread_rng(); + + (0..len) + .into_iter() + .map(|_| { + component_types::Vec3D::new( + rng.gen_range(0.0..10.0), + rng.gen_range(0.0..10.0), + rng.gen_range(0.0..10.0), + ) + }) + .collect() +} + /// Build a ([`Timeline`], [`TimeInt`]) tuple from `log_time` suitable for inserting in a [`crate::TimePoint`]. pub fn build_log_time(log_time: Time) -> (Timeline, TimeInt) { (Timeline::log_time(), log_time.into()) diff --git a/crates/re_query/benches/query_benchmark.rs b/crates/re_query/benches/query_benchmark.rs index fe70964a5166..29c71f952b2b 100644 --- a/crates/re_query/benches/query_benchmark.rs +++ b/crates/re_query/benches/query_benchmark.rs @@ -6,10 +6,10 @@ use criterion::{criterion_group, criterion_main, Criterion}; use itertools::Itertools; use re_arrow_store::{DataStore, LatestAtQuery}; use re_log_types::{ - component_types::{ColorRGBA, InstanceKey, Point2D}, - datagen::{build_frame_nr, build_some_colors, build_some_point2d}, + component_types::{ColorRGBA, InstanceKey, Point2D, Vec3D}, + datagen::{build_frame_nr, build_some_colors, build_some_point2d, build_some_vec3d}, entity_path, - msg_bundle::{try_build_msg_bundle2, Component, MsgBundle}, + msg_bundle::{try_build_msg_bundle1, try_build_msg_bundle2, Component, MsgBundle}, EntityPath, Index, MsgId, TimeType, Timeline, }; use re_query::query_entity_with_primary; @@ -17,17 +17,25 @@ use re_query::query_entity_with_primary; // --- #[cfg(not(debug_assertions))] -const NUM_FRAMES: u32 = 1_000; +const NUM_FRAMES_POINTS: u32 = 1_000; #[cfg(not(debug_assertions))] const NUM_POINTS: u32 = 1_000; +#[cfg(not(debug_assertions))] +const NUM_FRAMES_VECS: u32 = 10; +#[cfg(not(debug_assertions))] +const NUM_VECS: u32 = 100_000; // `cargo test` also runs the benchmark setup code, so make sure they run quickly: #[cfg(debug_assertions)] -const NUM_FRAMES: u32 = 1; +const NUM_FRAMES_POINTS: u32 = 1; #[cfg(debug_assertions)] const NUM_POINTS: u32 = 1; +#[cfg(debug_assertions)] +const NUM_FRAMES_VECS: u32 = 1; +#[cfg(debug_assertions)] +const NUM_VECS: u32 = 1; -criterion_group!(benches, mono_points, batch_points); +criterion_group!(benches, mono_points, batch_points, batch_vecs); criterion_main!(benches); // --- Benchmarks --- @@ -38,14 +46,14 @@ fn mono_points(c: &mut Criterion) { .into_iter() .map(move |point_idx| entity_path!("points", Index::Sequence(point_idx as _))) .collect_vec(); - let msgs = build_messages(&paths, 1); + let msgs = build_points_messages(&paths, 1); { let mut group = c.benchmark_group("arrow_mono_points"); // Mono-insert is slow -- decrease the sample size group.sample_size(10); group.throughput(criterion::Throughput::Elements( - (NUM_POINTS * NUM_FRAMES) as _, + (NUM_POINTS * NUM_FRAMES_POINTS) as _, )); group.bench_function("insert", |b| { b.iter(|| insert_messages(msgs.iter())); @@ -57,7 +65,7 @@ fn mono_points(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); let mut store = insert_messages(msgs.iter()); group.bench_function("query", |b| { - b.iter(|| query_and_visit(&mut store, &paths)); + b.iter(|| query_and_visit_points(&mut store, &paths)); }); } } @@ -65,12 +73,12 @@ fn mono_points(c: &mut Criterion) { fn batch_points(c: &mut Criterion) { // Batch points are logged together at a single path let paths = [EntityPath::from("points")]; - let msgs = build_messages(&paths, NUM_POINTS as _); + let msgs = build_points_messages(&paths, NUM_POINTS as _); { let mut group = c.benchmark_group("arrow_batch_points"); group.throughput(criterion::Throughput::Elements( - (NUM_POINTS * NUM_FRAMES) as _, + (NUM_POINTS * NUM_FRAMES_POINTS) as _, )); group.bench_function("insert", |b| { b.iter(|| insert_messages(msgs.iter())); @@ -82,15 +90,40 @@ fn batch_points(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); let mut store = insert_messages(msgs.iter()); group.bench_function("query", |b| { - b.iter(|| query_and_visit(&mut store, &paths)); + b.iter(|| query_and_visit_points(&mut store, &paths)); + }); + } +} + +fn batch_vecs(c: &mut Criterion) { + // Batch points are logged together at a single path + let paths = [EntityPath::from("vec")]; + let msgs = build_vecs_messages(&paths, NUM_VECS as _); + + { + let mut group = c.benchmark_group("arrow_batch_vecs"); + group.throughput(criterion::Throughput::Elements( + (NUM_VECS * NUM_FRAMES_VECS) as _, + )); + group.bench_function("insert", |b| { + b.iter(|| insert_messages(msgs.iter())); + }); + } + + { + let mut group = c.benchmark_group("arrow_batch_vecs"); + group.throughput(criterion::Throughput::Elements(NUM_VECS as _)); + let mut store = insert_messages(msgs.iter()); + group.bench_function("query", |b| { + b.iter(|| query_and_visit_vecs(&mut store, &paths)); }); } } // --- Helpers --- -fn build_messages(paths: &[EntityPath], pts: usize) -> Vec { - (0..NUM_FRAMES) +fn build_points_messages(paths: &[EntityPath], pts: usize) -> Vec { + (0..NUM_FRAMES_POINTS) .into_iter() .flat_map(move |frame_idx| { paths.iter().map(move |path| { @@ -106,20 +139,37 @@ fn build_messages(paths: &[EntityPath], pts: usize) -> Vec { .collect() } +fn build_vecs_messages(paths: &[EntityPath], pts: usize) -> Vec { + (0..NUM_FRAMES_VECS) + .into_iter() + .flat_map(move |frame_idx| { + paths.iter().map(move |path| { + try_build_msg_bundle1( + MsgId::ZERO, + path.clone(), + [build_frame_nr((frame_idx as i64).into())], + build_some_vec3d(pts), + ) + .unwrap() + }) + }) + .collect() +} + fn insert_messages<'a>(msgs: impl Iterator) -> DataStore { let mut store = DataStore::new(InstanceKey::name(), Default::default()); msgs.for_each(|msg_bundle| store.insert(msg_bundle).unwrap()); store } -struct Point { +struct SavePoint { _pos: Point2D, _color: Option, } -fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec { +fn query_and_visit_points(store: &mut DataStore, paths: &[EntityPath]) -> Vec { let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); - let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES as i64 / 2).into()); + let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES_POINTS as i64 / 2).into()); let mut points = Vec::with_capacity(NUM_POINTS as _); @@ -128,7 +178,7 @@ fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec { query_entity_with_primary::(store, &query, path, &[ColorRGBA::name()]) .and_then(|entity_view| { entity_view.visit2(|_: InstanceKey, pos: Point2D, color: Option| { - points.push(Point { + points.push(SavePoint { _pos: pos, _color: color, }); @@ -140,3 +190,27 @@ fn query_and_visit(store: &mut DataStore, paths: &[EntityPath]) -> Vec { assert_eq!(NUM_POINTS as usize, points.len()); points } + +struct SaveVec { + _vec: Vec3D, +} + +fn query_and_visit_vecs(store: &mut DataStore, paths: &[EntityPath]) -> Vec { + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES_POINTS as i64 / 2).into()); + + let mut rects = Vec::with_capacity(NUM_VECS as _); + + for path in paths.iter() { + query_entity_with_primary::(store, &query, path, &[]) + .and_then(|entity_view| { + entity_view.visit1(|_: InstanceKey, vec: Vec3D| { + rects.push(SaveVec { _vec: vec }); + }) + }) + .ok() + .unwrap(); + } + assert_eq!(NUM_VECS as usize, rects.len()); + rects +}