Skip to content

Commit

Permalink
DataStore changelog 3: introduce StoreViews (#4205)
Browse files Browse the repository at this point in the history
Introducing the `StoreView` trait and registration system, allowing
anybody to subscribe to `DataStore` changes, even from external code.

`StoreView`s global scope: by registering a view you subscribe to
changes to _all_ `DataStore`s, including those that are yet to be
created.
This is very powerful as it allows views & triggers implementers to
build cross-recording indices as well as be notified as soon as new
recordings come in and go out.

```rust
/// A [`StoreView`] subscribes to atomic changes in one or more [`DataStore`]s through [`StoreEvent`]s.
///
/// [`StoreView`]s can be used to build both secondary indices and trigger systems.
pub trait StoreView: std::any::Any + Send + Sync {
    /// Arbitrary name for the view.
    ///
    /// Does not need to be unique.
    fn name(&self) -> String;

    /// Workaround for downcasting support, simply return `self`:
    /// ```ignore
    /// fn as_any(&self) -> &dyn std::any::Any {
    ///     self
    /// }
    /// ```
    fn as_any(&self) -> &dyn std::any::Any;

    /// Workaround for downcasting support, simply return `self`:
    /// ```ignore
    /// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
    ///     self
    /// }
    /// ```
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

    /// The core of this trait: get notified of changes happening in one or more [`DataStore`]s.
    ///
    /// This will be called automatically by the [`DataStore`] itself if the view has been
    /// registered: [`DataStore::register_view`].
    /// Or you might want to feed it [`StoreEvent`]s manually, depending on your use case.
    ///
    /// ## Example
    ///
    /// ```ignore
    /// fn on_events(&mut self, events: &[StoreEvent]) {
    ///     use re_arrow_store::StoreDiffKind;
    ///     for event in events {
    ///         match event.kind {
    ///             StoreDiffKind::Addition => println!("Row added: {}", event.row_id),
    ///             StoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
    ///         }
    ///     }
    /// }
    /// ```
    fn on_events(&mut self, events: &[StoreEvent]);
}
```


---

`DataStore` changelog PR series:
- #4202
- #4203
- #4205
- #4206
- #4208
- #4209
  • Loading branch information
teh-cmc committed Nov 15, 2023
1 parent 8263309 commit 5edaea8
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ document-features.workspace = true
indent.workspace = true
itertools = { workspace = true }
nohash-hasher.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
smallvec.workspace = true
thiserror.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod store_helpers;
mod store_read;
mod store_sanity;
mod store_stats;
mod store_view;
mod store_write;

#[cfg(feature = "polars")]
Expand All @@ -43,6 +44,7 @@ pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTar
pub use self::store_helpers::VersionedComponent;
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
pub use self::store_view::{StoreView, StoreViewHandle};
pub use self::store_write::{WriteError, WriteResult};

pub(crate) use self::store::{
Expand Down
8 changes: 5 additions & 3 deletions crates/re_arrow_store/src/store_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::StoreGeneration;

// Used all over in docstrings.
#[allow(unused_imports)]
use crate::DataStore;
use crate::{DataStore, StoreView};

// ---

Expand All @@ -18,6 +18,8 @@ use crate::DataStore;
///
/// Methods that mutate the [`DataStore`], such as [`DataStore::insert_row`] and [`DataStore::gc`],
/// return [`StoreEvent`]s that describe the changes.
/// You can also register your own [`StoreView`] in order to be notified of changes as soon as they
/// happen.
///
/// Refer to field-level documentation for more details and check out [`StoreDiff`] for a precise
/// definition of what an event involves.
Expand Down Expand Up @@ -239,11 +241,11 @@ mod tests {
};
use re_types_core::{components::InstanceKey, Loggable as _};

use crate::{DataStore, GarbageCollectionOptions};
use crate::{DataStore, GarbageCollectionOptions, StoreView, StoreViewHandle};

use super::*;

/// A simple store view for test purposes that keeps track of the quantity of data available
/// A simple store subscriber for test purposes that keeps track of the quantity of data available
/// in the store a the lowest level of detail.
///
/// The counts represent numbers of rows: e.g. how many unique rows contain this entity path?
Expand Down
12 changes: 8 additions & 4 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,14 @@ impl DataStore {
})
.collect();

if cfg!(debug_assertions) {
let any_event_other_than_deletion =
events.iter().any(|e| e.kind != StoreDiffKind::Deletion);
assert!(!any_event_other_than_deletion);
{
if cfg!(debug_assertions) {
let any_event_other_than_deletion =
events.iter().any(|e| e.kind != StoreDiffKind::Deletion);
assert!(!any_event_other_than_deletion);
}

Self::on_events(&events);
}

// TODO(cmc): Temporary, we'll return raw events soon, but need to rework EntityTree first.
Expand Down
3 changes: 3 additions & 0 deletions crates/re_arrow_store/src/store_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use crate::{

// ---

// NOTE: Not implemented as a StoreView because it also measures implementation details of the
// store (buckets etc), while StoreEvents work at a data-model level.

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct DataStoreRowStats {
pub num_rows: u64,
Expand Down
254 changes: 254 additions & 0 deletions crates/re_arrow_store/src/store_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use parking_lot::RwLock;

use crate::{DataStore, StoreEvent};

// ---

// TODO(cmc): Not sure why I need the extra Box here, RwLock should be `?Sized`.
type SharedStoreView = RwLock<Box<dyn StoreView>>;

/// A [`StoreView`] subscribes to atomic changes from all [`DataStore`]s through [`StoreEvent`]s.
///
/// [`StoreView`]s can be used to build both secondary indices and trigger systems.
//
// TODO(#4204): StoreView should require SizeBytes so they can be part of memstats.
pub trait StoreView: std::any::Any + Send + Sync {
/// Arbitrary name for the view.
///
/// Does not need to be unique.
fn name(&self) -> String;

/// Workaround for downcasting support, simply return `self`:
/// ```ignore
/// fn as_any(&self) -> &dyn std::any::Any {
/// self
/// }
/// ```
fn as_any(&self) -> &dyn std::any::Any;

/// Workaround for downcasting support, simply return `self`:
/// ```ignore
/// fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
/// self
/// }
/// ```
fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

/// The core of this trait: get notified of changes happening in all [`DataStore`]s.
///
/// This will be called automatically by the [`DataStore`] itself if the view has been
/// registered: [`DataStore::register_view`].
/// Or you might want to feed it [`StoreEvent`]s manually, depending on your use case.
///
/// ## Example
///
/// ```ignore
/// fn on_events(&mut self, events: &[StoreEvent]) {
/// use re_arrow_store::StoreDiffKind;
/// for event in events {
/// match event.kind {
/// StoreDiffKind::Addition => println!("Row added: {}", event.row_id),
/// StoreDiffKind::Deletion => println!("Row removed: {}", event.row_id),
/// }
/// }
/// }
/// ```
fn on_events(&mut self, events: &[StoreEvent]);
}

/// All registered [`StoreView`]s.
static VIEWS: once_cell::sync::Lazy<RwLock<Vec<SharedStoreView>>> =
once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));

#[derive(Debug, Clone, Copy)]
pub struct StoreViewHandle(u32);

impl DataStore {
/// Registers a [`StoreView`] so it gets automatically notified when data gets added and/or
/// removed to/from a [`DataStore`].
///
/// Refer to [`StoreEvent`]'s documentation for more information about these events.
///
/// ## Scope
///
/// Registered [`StoreView`]s are global scope: they get notified of all events from all
/// existing [`DataStore`]s, including [`DataStore`]s created after the view was registered.
///
/// Use [`StoreEvent::store_id`] to identify the source of an event.
///
/// ## Late registration
///
/// Views must be registered before a store gets created to guarantee that no events were
/// missed.
///
/// [`StoreEvent::event_id`] can be used to identify missing events.
///
/// ## Ordering
///
/// The order in which registered views are notified is undefined and will likely become
/// concurrent in the future.
///
/// If you need a specific order across multiple views, embed them into an orchestrating view.
//
// TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
pub fn register_view(view: Box<dyn StoreView>) -> StoreViewHandle {
let mut views = VIEWS.write();
views.push(RwLock::new(view));
StoreViewHandle(views.len() as u32 - 1)
}

/// Passes a reference to the downcasted view to the given callback.
///
/// Returns `None` if the view doesn't exist or downcasting failed.
pub fn with_view<V: StoreView, T, F: FnMut(&V) -> T>(
StoreViewHandle(handle): StoreViewHandle,
mut f: F,
) -> Option<T> {
let views = VIEWS.read();
views.get(handle as usize).and_then(|view| {
let view = view.read();
view.as_any().downcast_ref::<V>().map(&mut f)
})
}

/// Passes a mutable reference to the downcasted view to the given callback.
///
/// Returns `None` if the view doesn't exist or downcasting failed.
pub fn with_view_mut<V: StoreView, T, F: FnMut(&mut V) -> T>(
StoreViewHandle(handle): StoreViewHandle,
mut f: F,
) -> Option<T> {
let views = VIEWS.read();
views.get(handle as usize).and_then(|view| {
let mut view = view.write();
view.as_any_mut().downcast_mut::<V>().map(&mut f)
})
}

/// Called by [`DataStore`]'s mutating methods to notify view subscribers of upcoming events.
pub(crate) fn on_events(events: &[StoreEvent]) {
let views = VIEWS.read();
// TODO(cmc): might want to parallelize at some point.
for view in views.iter() {
view.write().on_events(events);
}
}
}

#[cfg(tests)]
mod tests {
use std::collections::BTreeMap;

use re_log_types::{
example_components::{MyColor, MyPoint, MyPoints},
DataRow, DataTable, EntityPath, RowId, TableId, Time, TimePoint, Timeline,
};
use re_types_core::{components::InstanceKey, Loggable as _};

use crate::{DataStore, GarbageCollectionOptions, StoreView, StoreViewHandle};

use super::*;

/// A simple [`StoreView`] for test purposes that just accumulates [`StoreEvent`]s.
#[derive(Default, Debug)]
struct AllEvents {
events: Vec<StoreEvent>,
}

impl StoreView for AllEvents {
fn name(&self) -> String {
"rerun.testing.store_views.AllEvents".into()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}

fn on_events(&mut self, events: &[StoreEvent]) {
self.events.extend(events.to_owned());
}
}

#[test]
fn store_view() -> anyhow::Result<()> {
let mut store1 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let mut store2 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let mut expected_events = Vec::new();

let view_handle = DataStore::register_view(Box::<AllEvents>::default());

let timeline_frame = Timeline::new_sequence("frame");
let timeline_other = Timeline::new_temporal("other");
let timeline_yet_another = Timeline::new_sequence("yet_another");

let row = DataRow::from_component_batches(
RowId::random(),
TimePoint::from_iter([
(timeline_frame, 42.into()), //
(timeline_other, 666.into()), //
(timeline_yet_another, 1.into()), //
]),
"entity_a".into(),
[&InstanceKey::from_iter(0..10) as _],
)?;

expected_events.extend(store1.insert_row(&row));

let row = {
let num_instances = 3;
let points: Vec<_> = (0..num_instances)
.map(|i| MyPoint::new(0.0, i as f32))
.collect();
let colors = vec![MyColor::from(0xFF0000FF)];
DataRow::from_component_batches(
RowId::random(),
TimePoint::from_iter([
(timeline_frame, 42.into()), //
(timeline_yet_another, 1.into()), //
]),
"entity_b".into(),
[&points as _, &colors as _],
)?
};

expected_events.extend(store2.insert_row(&row));

let row = {
let num_instances = 6;
let colors = vec![MyColor::from(0x00DD00FF); num_instances];
DataRow::from_component_batches(
RowId::random(),
TimePoint::timeless(),
"entity_b".into(),
[
&InstanceKey::from_iter(0..num_instances as _) as _,
&colors as _,
],
)?
};

expected_events.extend(store1.insert_row(&row));

expected_events.extend(store1.gc(GarbageCollectionOptions::gc_everything()).0);
expected_events.extend(store2.gc(GarbageCollectionOptions::gc_everything()).0);

DataStore::with_view::<AllEvents, _, _>(view_handle, |got| {
similar_asserts::assert_eq!(expected_events, got.events);
});

Ok(())
}
}
14 changes: 13 additions & 1 deletion crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use re_types_core::{

use crate::{
DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner, IndexedTable, MetadataRegistry,
PersistentIndexedTable, StoreDiff, StoreEvent,
PersistentIndexedTable, StoreDiff, StoreDiffKind, StoreEvent,
};

// --- Data store ---
Expand Down Expand Up @@ -202,6 +202,18 @@ impl DataStore {
diff,
};

{
let events = &[event.clone()];

if cfg!(debug_assertions) {
let any_event_other_than_addition =
events.iter().any(|e| e.kind != StoreDiffKind::Addition);
assert!(!any_event_other_than_addition);
}

Self::on_events(events);
}

Ok(event)
}

Expand Down

0 comments on commit 5edaea8

Please sign in to comment.