From e3dcfa9c2ce312beecab442cdb4fd0d8b7f5317a Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Wed, 3 Jan 2024 15:28:29 +0100
Subject: [PATCH 1/5] Add way to install a callback into the batcher

---
 crates/re_log_types/src/arrow_msg.rs          |  4 +-
 crates/re_log_types/src/data_table_batcher.rs | 74 +++++++++++++++++--
 crates/re_sdk/src/recording_stream.rs         |  2 +-
 rerun_py/src/python_bridge.rs                 |  4 +-
 4 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs
index 20d1ea574397..c62f60eed676 100644
--- a/crates/re_log_types/src/arrow_msg.rs
+++ b/crates/re_log_types/src/arrow_msg.rs
@@ -39,7 +39,7 @@ where
 
 impl ArrowChunkReleaseCallback {
     #[inline]
-    pub fn as_ptr(&self) -> *const () {
+    fn as_ptr(&self) -> *const () {
         Arc::as_ptr(&self.0).cast::<()>()
     }
 }
@@ -47,7 +47,7 @@ impl ArrowChunkReleaseCallback {
 impl PartialEq for ArrowChunkReleaseCallback {
     #[inline]
     fn eq(&self, other: &Self) -> bool {
-        std::ptr::eq(self.as_ptr(), other.as_ptr())
+        Arc::ptr_eq(&self.0, &other.0)
     }
 }
 
diff --git a/crates/re_log_types/src/data_table_batcher.rs b/crates/re_log_types/src/data_table_batcher.rs
index d5bbe6a00b10..4b884bc1d211 100644
--- a/crates/re_log_types/src/data_table_batcher.rs
+++ b/crates/re_log_types/src/data_table_batcher.rs
@@ -32,12 +32,67 @@ pub enum DataTableBatcherError {
 
 pub type DataTableBatcherResult<T> = Result<T, DataTableBatcherError>;
 
+/// Callbacks you can install on the [`DataTableBatcher`].
+#[derive(Clone, Default)]
+pub struct BatcherHooks {
+    /// Called when a new row arrives.
+    ///
+    /// The callback is given the slice of all rows not yet batched,
+    /// including the new one.
+    ///
+    /// Used for testing.
+    #[allow(clippy::type_complexity)]
+    pub on_insert: Option<Arc<dyn Fn(&[DataRow]) + Send + Sync>>,
+
+    /// Callback to be run when an Arrow `Chunk` goes out of scope.
+    ///
+    /// See [`crate::ArrowChunkReleaseCallback`] for more information.
+    pub on_release: Option<ArrowChunkReleaseCallback>,
+}
+
+impl BatcherHooks {
+    pub const NONE: Self = Self {
+        on_insert: None,
+        on_release: None,
+    };
+}
+
+impl PartialEq for BatcherHooks {
+    fn eq(&self, other: &Self) -> bool {
+        let Self {
+            on_insert,
+            on_release,
+        } = self;
+
+        let on_insert_eq = match (on_insert, &other.on_insert) {
+            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
+            (None, None) => true,
+            _ => false,
+        };
+
+        on_insert_eq && on_release == &other.on_release
+    }
+}
+
+impl std::fmt::Debug for BatcherHooks {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Self {
+            on_insert,
+            on_release,
+        } = self;
+        f.debug_struct("BatcherHooks")
+            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
+            .field("on_release", &on_release)
+            .finish()
+    }
+}
+
 // ---
 
 /// Defines the different thresholds of the associated [`DataTableBatcher`].
 ///
 /// See [`Self::default`] and [`Self::from_env`].
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct DataTableBatcherConfig {
     /// Duration of the periodic tick.
     //
@@ -65,10 +120,8 @@ pub struct DataTableBatcherConfig {
     /// Unbounded if left unspecified.
     pub max_tables_in_flight: Option<u64>,
 
-    /// Callback to be run when an Arrow Chunk` goes out of scope.
-    ///
-    /// See [`crate::ArrowChunkReleaseCallback`] for more information.
-    pub on_release: Option<ArrowChunkReleaseCallback>,
+    /// Callbacks you can install on the [`DataTableBatcher`].
+    pub hooks: BatcherHooks,
 }
 
 impl Default for DataTableBatcherConfig {
@@ -85,7 +138,7 @@ impl DataTableBatcherConfig {
         flush_num_rows: u64::MAX,
         max_commands_in_flight: None,
         max_tables_in_flight: None,
-        on_release: None,
+        hooks: BatcherHooks::NONE,
     };
 
     /// Always flushes ASAP.
@@ -95,7 +148,7 @@ impl DataTableBatcherConfig {
         flush_num_rows: 0,
         max_commands_in_flight: None,
         max_tables_in_flight: None,
-        on_release: None,
+        hooks: BatcherHooks::NONE,
     };
 
     /// Never flushes unless manually told to.
@@ -105,7 +158,7 @@ impl DataTableBatcherConfig {
         flush_num_rows: u64::MAX,
         max_commands_in_flight: None,
         max_tables_in_flight: None,
-        on_release: None,
+        hooks: BatcherHooks::NONE,
     };
 
     /// Environment variable to configure [`Self::flush_tick`].
@@ -447,6 +500,11 @@ fn batching_thread(
             match cmd {
                 Command::AppendRow(row) => {
                     do_push_row(&mut acc, row);
+
+                    if let Some(on_insert) = config.hooks.on_insert.as_ref() {
+                        on_insert(&acc.pending_rows);
+                    }
+
                     if acc.pending_num_rows >= config.flush_num_rows {
                         do_flush_all(&mut acc, &tx_table, "rows");
                     } else if acc.pending_num_bytes >= config.flush_num_bytes {
diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs
index 028221e97871..7eedfe84195e 100644
--- a/crates/re_sdk/src/recording_stream.rs
+++ b/crates/re_sdk/src/recording_stream.rs
@@ -611,7 +611,7 @@ impl RecordingStreamInner {
         batcher_config: DataTableBatcherConfig,
         sink: Box<dyn LogSink>,
     ) -> RecordingStreamResult<Self> {
-        let on_release = batcher_config.on_release.clone();
+        let on_release = batcher_config.hooks.on_release.clone();
         let batcher = DataTableBatcher::new(batcher_config)?;
 
         {
diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs
index 97357f9aedf0..7a3cf8bdb0db 100644
--- a/rerun_py/src/python_bridge.rs
+++ b/rerun_py/src/python_bridge.rs
@@ -242,7 +242,7 @@ fn new_recording(
     let on_release = |chunk| {
         GARBAGE_QUEUE.0.send(chunk).ok();
     };
-    batcher_config.on_release = Some(on_release.into());
+    batcher_config.hooks.on_release = Some(on_release.into());
 
     let recording = RecordingStreamBuilder::new(application_id)
         .batcher_config(batcher_config)
@@ -299,7 +299,7 @@ fn new_blueprint(
     let on_release = |chunk| {
         GARBAGE_QUEUE.0.send(chunk).ok();
     };
-    batcher_config.on_release = Some(on_release.into());
+    batcher_config.hooks.on_release = Some(on_release.into());
 
     let blueprint = RecordingStreamBuilder::new(application_id)
         .batcher_config(batcher_config)
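Patch 1 in short: the old `on_release` field on `DataTableBatcherConfig` moves into a new `BatcherHooks` struct, which also gains a test-only `on_insert` callback invoked on every appended row before the flush checks run. A minimal sketch of installing the hook on a standalone batcher — assuming `DataTableBatcher`, `DataTableBatcherConfig`, `DataTableBatcherError`, and `DataRow` are re-exported at the `re_log_types` crate root (the re-export is not shown in this patch), and with a purely illustrative printing hook:

    use std::sync::Arc;

    use re_log_types::{DataRow, DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError};

    fn batcher_with_insert_hook() -> Result<DataTableBatcher, DataTableBatcherError> {
        let mut config = DataTableBatcherConfig::NEVER;
        config.hooks.on_insert = Some(Arc::new(|rows: &[DataRow]| {
            // `rows` is the slice of all rows not yet batched, newest last.
            eprintln!("batcher now holds {} pending rows", rows.len());
        }));
        DataTableBatcher::new(config)
    }

Note that `BatcherHooks` implements `PartialEq` by pointer identity (`Arc::ptr_eq`): two hooks compare equal only if they are literally the same allocation, since closures cannot be compared structurally.
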
From eb831b74d49651fb2ac1a2e24934e340b95d6a7c Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Wed, 3 Jan 2024 15:28:54 +0100
Subject: [PATCH 2/5] Fix insertion order of RowIDs

---
 crates/re_sdk/src/recording_stream.rs | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs
index 7eedfe84195e..ad87ff38a5cd 100644
--- a/crates/re_sdk/src/recording_stream.rs
+++ b/crates/re_sdk/src/recording_stream.rs
@@ -916,7 +916,7 @@ impl RecordingStream {
             None
         } else {
             Some(DataRow::from_cells(
-                row_id,
+                row_id.incremented_by(1), // we need a unique RowId from what is used for the splatted data
                 timepoint.clone(),
                 ent_path.clone(),
                 num_instances as _,
@@ -930,11 +930,7 @@ impl RecordingStream {
         } else {
             splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
             Some(DataRow::from_cells(
-                row_id.incremented_by(1), // we need a unique RowId from what is used for the instanced data
-                timepoint,
-                ent_path,
-                1,
-                splatted,
+                row_id, timepoint, ent_path, 1, splatted,
             )?)
         };
 
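The bug being fixed: a single `log` call can produce two rows — a splatted and an instanced one — stamped from one generated `RowId`, and the batcher expects rows to arrive in non-decreasing `RowId` order. The row handed to the batcher first must therefore carry the smaller id. A sketch of the invariant, assuming `RowId::new()` is the public constructor in this revision:

    use re_log_types::RowId;

    // Both rows of one log call derive from a single freshly generated id;
    // `incremented_by` (used in the patch above) yields the strictly larger
    // id for whichever row is sent second.
    let first = RowId::new(); // assumed constructor name
    let second = first.incremented_by(1);
    assert!(first < second);
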
From 7dd0a5d3c621cd77e94a4166b4d71b3a8c9326cd Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Wed, 3 Jan 2024 15:29:08 +0100
Subject: [PATCH 3/5] Add regression test to make sure RowIds arrive in order

---
 crates/rerun/tests/rerun_tests.rs | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)
 create mode 100644 crates/rerun/tests/rerun_tests.rs

diff --git a/crates/rerun/tests/rerun_tests.rs b/crates/rerun/tests/rerun_tests.rs
new file mode 100644
index 000000000000..1ab1aebbf236
--- /dev/null
+++ b/crates/rerun/tests/rerun_tests.rs
@@ -0,0 +1,26 @@
+/// Regression test for checking that `RowId`s are generated in-order (when single-threaded).
+///
+/// Out-of-order row IDs are technically fine, but can cause unnecessary performance issues.
+///
+/// See for instance .
+#[test]
+fn test_row_id_order() {
+    let mut batcher_config = rerun::log::DataTableBatcherConfig::NEVER;
+    batcher_config.hooks.on_insert = Some(std::sync::Arc::new(|rows| {
+        if let [.., penultimate, ultimate] = rows {
+            assert!(
+                penultimate.row_id() <= ultimate.row_id(),
+                "Rows coming to batcher out-of-order"
+            );
+        }
+    }));
+    let (rec, _mem_storage) = rerun::RecordingStreamBuilder::new("rerun_example_test")
+        .batcher_config(batcher_config)
+        .memory()
+        .unwrap();
+
+    for _ in 0..10 {
+        rec.log("foo", &rerun::Points2D::new([(1.0, 2.0), (3.0, 4.0)]))
+            .unwrap();
+    }
+}

From a3004d8f803b888602661e9121054396102a81fb Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Wed, 3 Jan 2024 18:01:20 +0100
Subject: [PATCH 4/5] Reverse order of creating splatted and instanced

---
 crates/re_sdk/src/recording_stream.rs | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs
index ad87ff38a5cd..452bfc74809e 100644
--- a/crates/re_sdk/src/recording_stream.rs
+++ b/crates/re_sdk/src/recording_stream.rs
@@ -912,25 +912,29 @@ impl RecordingStream {
         // internal clock.
         let timepoint = TimePoint::timeless();
 
-        let instanced = if instanced.is_empty() {
+        // TODO(#1893): unsplit splats once new data cells are in
+        let splatted = if splatted.is_empty() {
             None
         } else {
+            splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
             Some(DataRow::from_cells(
-                row_id.incremented_by(1), // we need a unique RowId from what is used for the splatted data
+                row_id,
                 timepoint.clone(),
                 ent_path.clone(),
-                num_instances as _,
-                instanced,
+                1,
+                splatted,
             )?)
         };
 
-        // TODO(#1893): unsplit splats once new data cells are in
-        let splatted = if splatted.is_empty() {
+        let instanced = if instanced.is_empty() {
             None
         } else {
-            splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
             Some(DataRow::from_cells(
-                row_id, timepoint, ent_path, 1, splatted,
+                row_id.incremented_by(1), // we need a unique RowId from what is used for the splatted data
+                timepoint,
+                ent_path,
+                num_instances as _,
+                instanced,
             )?)
         };
 
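Background for patch 4: a "splat" is a single component value shared by every instance of an entity, stored as a one-instance row tagged with `InstanceKey::SPLAT`, while per-instance values go into a separate "instanced" row. Since the splatted row is sent to the batcher first, this patch builds it first and gives it the base `row_id`, leaving `row_id.incremented_by(1)` for the instanced row — so the two rows arrive in id order without patch 2's counter-intuitive assignment. A sketch of constructing a splat cell; the import path for `InstanceKey` is an assumption (it lives outside `re_log_types`):

    use re_log_types::DataCell;
    use re_types::components::InstanceKey; // assumed path

    // A single InstanceKey::SPLAT in a cell marks the row's other cells as
    // applying to every instance of the entity.
    let splat_cell = DataCell::from_native([InstanceKey::SPLAT]);
    assert_eq!(splat_cell.num_instances(), 1);
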
From b58459b740b693d86be9e9b3e846d37bf565f060 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Wed, 3 Jan 2024 18:03:32 +0100
Subject: [PATCH 5/5] Explicitly test splats

---
 crates/rerun/tests/rerun_tests.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/crates/rerun/tests/rerun_tests.rs b/crates/rerun/tests/rerun_tests.rs
index 1ab1aebbf236..9f6793ed1630 100644
--- a/crates/rerun/tests/rerun_tests.rs
+++ b/crates/rerun/tests/rerun_tests.rs
@@ -20,7 +20,10 @@ fn test_row_id_order() {
         .unwrap();
 
     for _ in 0..10 {
-        rec.log("foo", &rerun::Points2D::new([(1.0, 2.0), (3.0, 4.0)]))
-            .unwrap();
+        rec.log(
+            "foo",
+            &rerun::Points2D::new([(1.0, 2.0), (3.0, 4.0)]).with_radii([1.0]),
+        )
+        .unwrap();
     }
 }
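Why `.with_radii([1.0])` matters: one radius shared by two points is a splat, so the amended call logs both an instanced row (the positions) and a splatted row (the radius) — exactly the two-row path whose `RowId` ordering patches 2 and 4 fixed, which the `on_insert` hook from patch 3 then verifies. The same pattern outside the test, with a hypothetical entity path, unwrapping as the test does:

    // Two points (instanced) plus one shared radius (splatted) in one call.
    fn log_points_with_splat(rec: &rerun::RecordingStream) {
        rec.log(
            "my_points",
            &rerun::Points2D::new([(0.0, 0.0), (1.0, 1.0)]).with_radii([0.5]),
        )
        .unwrap();
    }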