diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 20d1ea574397..c62f60eed676 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -39,7 +39,7 @@ where impl ArrowChunkReleaseCallback { #[inline] - pub fn as_ptr(&self) -> *const () { + fn as_ptr(&self) -> *const () { Arc::as_ptr(&self.0).cast::<()>() } } @@ -47,7 +47,7 @@ impl ArrowChunkReleaseCallback { impl PartialEq for ArrowChunkReleaseCallback { #[inline] fn eq(&self, other: &Self) -> bool { - std::ptr::eq(self.as_ptr(), other.as_ptr()) + Arc::ptr_eq(&self.0, &other.0) } } diff --git a/crates/re_log_types/src/data_table_batcher.rs b/crates/re_log_types/src/data_table_batcher.rs index d5bbe6a00b10..4b884bc1d211 100644 --- a/crates/re_log_types/src/data_table_batcher.rs +++ b/crates/re_log_types/src/data_table_batcher.rs @@ -32,12 +32,67 @@ pub enum DataTableBatcherError { pub type DataTableBatcherResult<T = ()> = Result<T, DataTableBatcherError>; +/// Callbacks you can install on the [`DataTableBatcher`]. +#[derive(Clone, Default)] +pub struct BatcherHooks { + /// Called when a new row arrives. + /// + /// The callback is given the slice of all rows not yet batched, + /// including the new one. + /// + /// Used for testing. + #[allow(clippy::type_complexity)] + pub on_insert: Option<Arc<dyn Fn(&[DataRow]) + Send + Sync>>, + + /// Callback to be run when an Arrow `Chunk` goes out of scope. + /// + /// See [`crate::ArrowChunkReleaseCallback`] for more information. 
+ pub on_release: Option<ArrowChunkReleaseCallback>, +} + +impl BatcherHooks { + pub const NONE: Self = Self { + on_insert: None, + on_release: None, + }; +} + +impl PartialEq for BatcherHooks { + fn eq(&self, other: &Self) -> bool { + let Self { + on_insert, + on_release, + } = self; + + let on_insert_eq = match (on_insert, &other.on_insert) { + (Some(a), Some(b)) => Arc::ptr_eq(a, b), + (None, None) => true, + _ => false, + }; + + on_insert_eq && on_release == &other.on_release + } +} + +impl std::fmt::Debug for BatcherHooks { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + on_insert, + on_release, + } = self; + f.debug_struct("BatcherHooks") + .field("on_insert", &on_insert.as_ref().map(|_| "…")) + .field("on_release", &on_release) + .finish() + } +} + // --- /// Defines the different thresholds of the associated [`DataTableBatcher`]. /// /// See [`Self::default`] and [`Self::from_env`]. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub struct DataTableBatcherConfig { /// Duration of the periodic tick. // @@ -65,10 +120,8 @@ pub struct DataTableBatcherConfig { /// Unbounded if left unspecified. pub max_tables_in_flight: Option<u64>, - /// Callback to be run when an Arrow Chunk` goes out of scope. - /// - /// See [`crate::ArrowChunkReleaseCallback`] for more information. - pub on_release: Option<ArrowChunkReleaseCallback>, + /// Callbacks you can install on the [`DataTableBatcher`]. + pub hooks: BatcherHooks, } impl Default for DataTableBatcherConfig { @@ -85,7 +138,7 @@ impl DataTableBatcherConfig { flush_num_rows: u64::MAX, max_commands_in_flight: None, max_tables_in_flight: None, - on_release: None, + hooks: BatcherHooks::NONE, }; /// Always flushes ASAP. @@ -95,7 +148,7 @@ impl DataTableBatcherConfig { flush_num_rows: 0, max_commands_in_flight: None, max_tables_in_flight: None, - on_release: None, + hooks: BatcherHooks::NONE, }; /// Never flushes unless manually told to. 
@@ -105,7 +158,7 @@ impl DataTableBatcherConfig { flush_num_rows: u64::MAX, max_commands_in_flight: None, max_tables_in_flight: None, - on_release: None, + hooks: BatcherHooks::NONE, }; /// Environment variable to configure [`Self::flush_tick`]. @@ -447,6 +500,11 @@ fn batching_thread( match cmd { Command::AppendRow(row) => { do_push_row(&mut acc, row); + + if let Some(config) = config.hooks.on_insert.as_ref() { + config(&acc.pending_rows); + } + if acc.pending_num_rows >= config.flush_num_rows { do_flush_all(&mut acc, &tx_table, "rows"); } else if acc.pending_num_bytes >= config.flush_num_bytes { diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index 028221e97871..452bfc74809e 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -611,7 +611,7 @@ impl RecordingStreamInner { batcher_config: DataTableBatcherConfig, sink: Box<dyn LogSink>, ) -> RecordingStreamResult<Self> { - let on_release = batcher_config.on_release.clone(); + let on_release = batcher_config.hooks.on_release.clone(); let batcher = DataTableBatcher::new(batcher_config)?; { @@ -912,29 +912,29 @@ impl RecordingStream { // internal clock. let timepoint = TimePoint::timeless(); - let instanced = if instanced.is_empty() { + // TODO(#1893): unsplit splats once new data cells are in + let splatted = if splatted.is_empty() { None } else { + splatted.push(DataCell::from_native([InstanceKey::SPLAT])); Some(DataRow::from_cells( row_id, timepoint.clone(), ent_path.clone(), - num_instances as _, - instanced, + 1, + splatted, )?) 
}; - // TODO(#1893): unsplit splats once new data cells are in - let splatted = if splatted.is_empty() { + let instanced = if instanced.is_empty() { None } else { - splatted.push(DataCell::from_native([InstanceKey::SPLAT])); Some(DataRow::from_cells( - row_id.incremented_by(1), // we need a unique RowId from what is used for the instanced data + row_id.incremented_by(1), // we need a unique RowId from what is used for the splatted data timepoint, ent_path, - 1, - splatted, + num_instances as _, + instanced, )?) }; diff --git a/crates/rerun/tests/rerun_tests.rs b/crates/rerun/tests/rerun_tests.rs new file mode 100644 index 000000000000..9f6793ed1630 --- /dev/null +++ b/crates/rerun/tests/rerun_tests.rs @@ -0,0 +1,29 @@ +/// Regression test for checking that `RowId`s are generated in-order (when single-threaded). +/// +/// Out-of-order row IDs is technically fine, but can cause unnecessary performance issues. +/// +/// See for instance . +#[test] +fn test_row_id_order() { + let mut batcher_config = rerun::log::DataTableBatcherConfig::NEVER; + batcher_config.hooks.on_insert = Some(std::sync::Arc::new(|rows| { + if let [.., penultimate, ultimate] = rows { + assert!( + penultimate.row_id() <= ultimate.row_id(), + "Rows coming to batcher out-of-order" + ); + } + })); + let (rec, _mem_storage) = rerun::RecordingStreamBuilder::new("rerun_example_test") + .batcher_config(batcher_config) + .memory() + .unwrap(); + + for _ in 0..10 { + rec.log( + "foo", + &rerun::Points2D::new([(1.0, 2.0), (3.0, 4.0)]).with_radii([1.0]), + ) + .unwrap(); + } +} diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 97357f9aedf0..7a3cf8bdb0db 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -242,7 +242,7 @@ fn new_recording( let on_release = |chunk| { GARBAGE_QUEUE.0.send(chunk).ok(); }; - batcher_config.on_release = Some(on_release.into()); + batcher_config.hooks.on_release = Some(on_release.into()); let recording = 
RecordingStreamBuilder::new(application_id) .batcher_config(batcher_config) @@ -299,7 +299,7 @@ fn new_blueprint( let on_release = |chunk| { GARBAGE_QUEUE.0.send(chunk).ok(); }; - batcher_config.on_release = Some(on_release.into()); + batcher_config.hooks.on_release = Some(on_release.into()); let blueprint = RecordingStreamBuilder::new(application_id) .batcher_config(batcher_config)