diff --git a/Cargo.lock b/Cargo.lock index 05d4265ba04f..5d76592d8fc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4056,6 +4056,7 @@ dependencies = [ "itertools 0.11.0", "mimalloc", "nohash-hasher", + "once_cell", "parking_lot 0.12.1", "polars-core", "polars-ops", diff --git a/crates/re_arrow_store/Cargo.toml b/crates/re_arrow_store/Cargo.toml index 1bca1ed7b181..b7b0b7da33cf 100644 --- a/crates/re_arrow_store/Cargo.toml +++ b/crates/re_arrow_store/Cargo.toml @@ -49,6 +49,7 @@ document-features.workspace = true indent.workspace = true itertools = { workspace = true } nohash-hasher.workspace = true +once_cell.workspace = true parking_lot.workspace = true smallvec.workspace = true thiserror.workspace = true diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index 50b80e9a9544..298f268f7219 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -25,6 +25,7 @@ mod store_helpers; mod store_read; mod store_sanity; mod store_stats; +mod store_view; mod store_write; #[cfg(feature = "polars")] @@ -43,6 +44,7 @@ pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTar pub use self::store_helpers::VersionedComponent; pub use self::store_read::{LatestAtQuery, RangeQuery}; pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats}; +pub use self::store_view::{StoreView, StoreViewHandle}; pub use self::store_write::{WriteError, WriteResult}; pub(crate) use self::store::{ diff --git a/crates/re_arrow_store/src/store_event.rs b/crates/re_arrow_store/src/store_event.rs index ce8b6650a7f0..c8b28f192a11 100644 --- a/crates/re_arrow_store/src/store_event.rs +++ b/crates/re_arrow_store/src/store_event.rs @@ -7,7 +7,7 @@ use crate::StoreGeneration; // Used all over in docstrings. #[allow(unused_imports)] -use crate::DataStore; +use crate::{DataStore, StoreView}; // --- @@ -18,6 +18,8 @@ use crate::DataStore; /// /// Methods that mutate the [`DataStore`], such as [`DataStore::insert_row`] and [`DataStore::gc`], /// return [`StoreEvent`]s that describe the changes. +/// You can also register your own [`StoreView`] in order to be notified of changes as soon as they +/// happen. /// /// Refer to field-level documentation for more details and check out [`StoreDiff`] for a precise /// definition of what an event involves. @@ -239,11 +241,11 @@ mod tests { }; use re_types_core::{components::InstanceKey, Loggable as _}; - use crate::{DataStore, GarbageCollectionOptions}; + use crate::{DataStore, GarbageCollectionOptions, StoreView, StoreViewHandle}; use super::*; - /// A simple store view for test purposes that keeps track of the quantity of data available + /// A simple store subscriber for test purposes that keeps track of the quantity of data available /// in the store a the lowest level of detail. /// /// The counts represent numbers of rows: e.g. how many unique rows contain this entity path? diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 5840acc50fd8..9f69e9525aff 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -222,10 +222,14 @@ impl DataStore { }) .collect(); - if cfg!(debug_assertions) { - let any_event_other_than_deletion = - events.iter().any(|e| e.kind != StoreDiffKind::Deletion); - assert!(!any_event_other_than_deletion); + { + if cfg!(debug_assertions) { + let any_event_other_than_deletion = + events.iter().any(|e| e.kind != StoreDiffKind::Deletion); + assert!(!any_event_other_than_deletion); + } + + Self::on_events(&events); } // TODO(cmc): Temporary, we'll return raw events soon, but need to rework EntityTree first. diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index cdb813b679c4..fd1d35fcea1c 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -9,6 +9,9 @@ use crate::{ // --- +// NOTE: Not implemented as a StoreView because it also measures implementation details of the +// store (buckets etc), while StoreEvents work at a data-model level. + #[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)] pub struct DataStoreRowStats { pub num_rows: u64, diff --git a/crates/re_arrow_store/src/store_view.rs b/crates/re_arrow_store/src/store_view.rs new file mode 100644 index 000000000000..c4754a42de78 --- /dev/null +++ b/crates/re_arrow_store/src/store_view.rs @@ -0,0 +1,254 @@ +use parking_lot::RwLock; + +use crate::{DataStore, StoreEvent}; + +// --- + +// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`. +type SharedStoreView = RwLock>; + +/// A [`StoreView`] subscribes to atomic changes from all [`DataStore`]s through [`StoreEvent`]s. +/// +/// [`StoreView`]s can be used to build both secondary indices and trigger systems. +// +// TODO(#4204): StoreView should require SizeBytes so they can be part of memstats. +pub trait StoreView: std::any::Any + Send + Sync { + /// Arbitrary name for the view. + /// + /// Does not need to be unique. + fn name(&self) -> String; + + /// Workaround for downcasting support, simply return `self`: + /// ```ignore + /// fn as_any(&self) -> &dyn std::any::Any { + /// self + /// } + /// ``` + fn as_any(&self) -> &dyn std::any::Any; + + /// Workaround for downcasting support, simply return `self`: + /// ```ignore + /// fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + /// self + /// } + /// ``` + fn as_any_mut(&mut self) -> &mut dyn std::any::Any; + + /// The core of this trait: get notified of changes happening in all [`DataStore`]s. + /// + /// This will be called automatically by the [`DataStore`] itself if the view has been + /// registered: [`DataStore::register_view`]. + /// Or you might want to feed it [`StoreEvent`]s manually, depending on your use case. + /// + /// ## Example + /// + /// ```ignore + /// fn on_events(&mut self, events: &[StoreEvent]) { + /// use re_arrow_store::StoreDiffKind; + /// for event in events { + /// match event.kind { + /// StoreDiffKind::Addition => println!("Row added: {}", event.row_id), + /// StoreDiffKind::Deletion => println!("Row removed: {}", event.row_id), + /// } + /// } + /// } + /// ``` + fn on_events(&mut self, events: &[StoreEvent]); +} + +/// All registered [`StoreView`]s. +static VIEWS: once_cell::sync::Lazy>> = + once_cell::sync::Lazy::new(|| RwLock::new(Vec::new())); + +#[derive(Debug, Clone, Copy)] +pub struct StoreViewHandle(u32); + +impl DataStore { + /// Registers a [`StoreView`] so it gets automatically notified when data gets added and/or + /// removed to/from a [`DataStore`]. + /// + /// Refer to [`StoreEvent`]'s documentation for more information about these events. + /// + /// ## Scope + /// + /// Registered [`StoreView`]s are global scope: they get notified of all events from all + /// existing [`DataStore`]s, including [`DataStore`]s created after the view was registered. + /// + /// Use [`StoreEvent::store_id`] to identify the source of an event. + /// + /// ## Late registration + /// + /// Views must be registered before a store gets created to guarantee that no events were + /// missed. + /// + /// [`StoreEvent::event_id`] can be used to identify missing events. + /// + /// ## Ordering + /// + /// The order in which registered views are notified is undefined and will likely become + /// concurrent in the future. + /// + /// If you need a specific order across multiple views, embed them into an orchestrating view. + // + // TODO(cmc): send a compacted snapshot to late registerers for bootstrapping + pub fn register_view(view: Box) -> StoreViewHandle { + let mut views = VIEWS.write(); + views.push(RwLock::new(view)); + StoreViewHandle(views.len() as u32 - 1) + } + + /// Passes a reference to the downcasted view to the given callback. + /// + /// Returns `None` if the view doesn't exist or downcasting failed. + pub fn with_view T>( + StoreViewHandle(handle): StoreViewHandle, + mut f: F, + ) -> Option { + let views = VIEWS.read(); + views.get(handle as usize).and_then(|view| { + let view = view.read(); + view.as_any().downcast_ref::().map(&mut f) + }) + } + + /// Passes a mutable reference to the downcasted view to the given callback. + /// + /// Returns `None` if the view doesn't exist or downcasting failed. + pub fn with_view_mut T>( + StoreViewHandle(handle): StoreViewHandle, + mut f: F, + ) -> Option { + let views = VIEWS.read(); + views.get(handle as usize).and_then(|view| { + let mut view = view.write(); + view.as_any_mut().downcast_mut::().map(&mut f) + }) + } + + /// Called by [`DataStore`]'s mutating methods to notify view subscribers of upcoming events. + pub(crate) fn on_events(events: &[StoreEvent]) { + let views = VIEWS.read(); + // TODO(cmc): might want to parallelize at some point. + for view in views.iter() { + view.write().on_events(events); + } + } +} + +#[cfg(tests)] +mod tests { + use std::collections::BTreeMap; + + use re_log_types::{ + example_components::{MyColor, MyPoint, MyPoints}, + DataRow, DataTable, EntityPath, RowId, TableId, Time, TimePoint, Timeline, + }; + use re_types_core::{components::InstanceKey, Loggable as _}; + + use crate::{DataStore, GarbageCollectionOptions, StoreView, StoreViewHandle}; + + use super::*; + + /// A simple [`StoreView`] for test purposes that just accumulates [`StoreEvent`]s. + #[derive(Default, Debug)] + struct AllEvents { + events: Vec, + } + + impl StoreView for AllEvents { + fn name(&self) -> String { + "rerun.testing.store_views.AllEvents".into() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } + + fn on_events(&mut self, events: &[StoreEvent]) { + self.events.extend(events.to_owned()); + } + } + + #[test] + fn store_view() -> anyhow::Result<()> { + let mut store1 = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + let mut store2 = DataStore::new( + re_log_types::StoreId::random(re_log_types::StoreKind::Recording), + InstanceKey::name(), + Default::default(), + ); + + let mut expected_events = Vec::new(); + + let view_handle = DataStore::register_view(Box::::default()); + + let timeline_frame = Timeline::new_sequence("frame"); + let timeline_other = Timeline::new_temporal("other"); + let timeline_yet_another = Timeline::new_sequence("yet_another"); + + let row = DataRow::from_component_batches( + RowId::random(), + TimePoint::from_iter([ + (timeline_frame, 42.into()), // + (timeline_other, 666.into()), // + (timeline_yet_another, 1.into()), // + ]), + "entity_a".into(), + [&InstanceKey::from_iter(0..10) as _], + )?; + + expected_events.extend(store1.insert_row(&row)); + + let row = { + let num_instances = 3; + let points: Vec<_> = (0..num_instances) + .map(|i| MyPoint::new(0.0, i as f32)) + .collect(); + let colors = vec![MyColor::from(0xFF0000FF)]; + DataRow::from_component_batches( + RowId::random(), + TimePoint::from_iter([ + (timeline_frame, 42.into()), // + (timeline_yet_another, 1.into()), // + ]), + "entity_b".into(), + [&points as _, &colors as _], + )? + }; + + expected_events.extend(store2.insert_row(&row)); + + let row = { + let num_instances = 6; + let colors = vec![MyColor::from(0x00DD00FF); num_instances]; + DataRow::from_component_batches( + RowId::random(), + TimePoint::timeless(), + "entity_b".into(), + [ + &InstanceKey::from_iter(0..num_instances as _) as _, + &colors as _, + ], + )? + }; + + expected_events.extend(store1.insert_row(&row)); + + expected_events.extend(store1.gc(GarbageCollectionOptions::gc_everything()).0); + expected_events.extend(store2.gc(GarbageCollectionOptions::gc_everything()).0); + + DataStore::with_view::(view_handle, |got| { + similar_asserts::assert_eq!(expected_events, got.events); + }); + + Ok(()) + } +} diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 92dc4417964b..58deb5f03f2a 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -14,7 +14,7 @@ use re_types_core::{ use crate::{ DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner, IndexedTable, MetadataRegistry, - PersistentIndexedTable, StoreDiff, StoreEvent, + PersistentIndexedTable, StoreDiff, StoreDiffKind, StoreEvent, }; // --- Data store --- @@ -202,6 +202,18 @@ impl DataStore { diff, }; + { + let events = &[event.clone()]; + + if cfg!(debug_assertions) { + let any_event_other_than_addition = + events.iter().any(|e| e.kind != StoreDiffKind::Addition); + assert!(!any_event_other_than_addition); + } + + Self::on_events(events); + } + Ok(event) }