From 6426bb0dc255c9eae58da7e1ff3b454842fbcebc Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Wed, 30 Aug 2023 15:03:23 -0400
Subject: [PATCH] GC the blueprints before saving while preserving the current
 state (#3148)

Resolves: https://github.com/rerun-io/rerun/issues/3098
Related to: https://github.com/rerun-io/rerun/issues/1803

Because blueprints use timeless data, and timeless data was never GC'd, we previously had no good way to clean up blueprints. This PR paves the way for better overall GC behavior in the future but doesn't change the default behavior yet.

This PR:
- Introduces a new `GarbageCollectionOptions` instead of just providing a target. This lets you configure whether the timeless data should be GC'd, and additionally how many `latest_at` values you want to preserve (see the usage sketch at the end of this patch).
- Introduces a new GC target: `Everything`.
- Calculates a set of protected rows for every component based on the last relevant row across every timeline (including timeless).
- Modifies both `gc_drop_at_least_num_bytes` and the new `gc_everything` to respect the protected rows during GC.
- Modifies the `store_hub` to GC the blueprint before saving it.

Photogrammetry with `--no-frames` is another worst case for blueprints, because every image is a space-view, so you can easily create a huge blueprint history by repeatedly resetting the blueprint.

![image](https://github.com/rerun-io/rerun/assets/3312232/03df3d06-a780-47b3-b0d9-aaf564793230)

* [x] I have read and agree to the [Contributor Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and the [Code of Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/3148) (if applicable)

- [PR Build Summary](https://build.rerun.io/pr/3148)
- [Docs preview](https://rerun.io/preview/60f3747383780c50886ac781bdf81b32fbff76bd/docs)
- [Examples preview](https://rerun.io/preview/60f3747383780c50886ac781bdf81b32fbff76bd/examples)
- [Recent benchmark results](https://ref.rerun.io/dev/bench/)
- [Wasm size tracking](https://ref.rerun.io/dev/sizes/)
---
 crates/re_arrow_store/benches/data_store.rs |  19 +-
 crates/re_arrow_store/src/lib.rs            |   2 +-
 crates/re_arrow_store/src/store_gc.rs       | 341 ++++++++++++++++++--
 crates/re_arrow_store/src/store_stats.rs    |  12 +
 crates/re_arrow_store/tests/correctness.rs  |   6 +-
 crates/re_arrow_store/tests/data_store.rs   | 235 ++++++++++++--
 crates/re_arrow_store/tests/dump.rs         |  12 +-
 crates/re_data_store/src/store_db.rs        |  13 +-
 crates/re_viewer/src/app.rs                 |   3 +-
 crates/re_viewer/src/store_hub.rs           |  43 ++-
 10 files changed, 600 insertions(+), 86 deletions(-)

diff --git a/crates/re_arrow_store/benches/data_store.rs b/crates/re_arrow_store/benches/data_store.rs
index 8e213e3fbc95..24b2447496fe 100644
--- a/crates/re_arrow_store/benches/data_store.rs
+++ b/crates/re_arrow_store/benches/data_store.rs
@@ -5,8 +5,8 @@ use arrow2::array::UnionArray;
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use re_arrow_store::{
-    DataStore, DataStoreConfig, GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt,
-    TimeRange,
+    DataStore, DataStoreConfig, GarbageCollectionOptions, GarbageCollectionTarget, LatestAtQuery,
+    RangeQuery, TimeInt, TimeRange,
 };
 use re_components::{
     datagen::{build_frame_nr, build_some_instances, build_some_rects},
@@ -276,7 +276,12 @@ fn gc(c: &mut Criterion) {
         let store = insert_table(Default::default(), InstanceKey::name(), &table);
         b.iter(|| {
            let mut store = store.clone();
-            let 
(_, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0)); + let (_, stats_diff) = store.gc(GarbageCollectionOptions { + target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0), + gc_timeless: false, + protect_latest: 0, + purge_empty_tables: false, + }); stats_diff }); }); @@ -294,8 +299,12 @@ fn gc(c: &mut Criterion) { ); b.iter(|| { let mut store = store.clone(); - let (_, stats_diff) = - store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0)); + let (_, stats_diff) = store.gc(GarbageCollectionOptions { + target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0), + gc_timeless: false, + protect_latest: 0, + purge_empty_tables: false, + }); stats_diff }); }); diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index c093f9c9e861..5d9d4763aea0 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -37,7 +37,7 @@ pub mod test_util; pub use self::arrow_util::ArrayExt; pub use self::store::{DataStore, DataStoreConfig, StoreGeneration}; -pub use self::store_gc::GarbageCollectionTarget; +pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget}; pub use self::store_read::{LatestAtQuery, RangeQuery}; pub use self::store_stats::{DataStoreRowStats, DataStoreStats}; pub use self::store_write::{WriteError, WriteResult}; diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index e696da10fc4e..36820a06a7eb 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -1,7 +1,10 @@ +use ahash::{HashMap, HashSet}; + use re_log_types::{RowId, SizeBytes as _, TimeInt, TimeRange}; +use re_types::ComponentName; use crate::{ - store::{IndexedBucketInner, IndexedTable}, + store::{IndexedBucketInner, IndexedTable, PersistentIndexedTable}, DataStore, DataStoreStats, }; @@ -13,6 +16,35 @@ pub enum GarbageCollectionTarget { /// /// The fraction must be a float in the range [0.0 : 1.0]. DropAtLeastFraction(f64), + + /// GC Everything that isn't protected + Everything, +} + +#[derive(Debug, Clone, Copy)] +pub struct GarbageCollectionOptions { + /// What target threshold should the GC try to meet. + pub target: GarbageCollectionTarget, + + /// Whether to also GC timeless data. + pub gc_timeless: bool, + + /// How many component revisions to preserve on each timeline. + pub protect_latest: usize, + + /// Whether to purge tables that no longer contain any data + pub purge_empty_tables: bool, +} + +impl GarbageCollectionOptions { + pub fn gc_everything() -> Self { + GarbageCollectionOptions { + target: GarbageCollectionTarget::Everything, + gc_timeless: true, + protect_latest: 0, + purge_empty_tables: true, + } + } } impl std::fmt::Display for GarbageCollectionTarget { @@ -21,6 +53,7 @@ impl std::fmt::Display for GarbageCollectionTarget { GarbageCollectionTarget::DropAtLeastFraction(p) => { write!(f, "DropAtLeast({:.3}%)", re_format::format_f64(*p * 100.0)) } + GarbageCollectionTarget::Everything => write!(f, "Everything"), } } } @@ -51,8 +84,14 @@ impl DataStore { /// /// ## Limitations /// - /// The garbage collector is currently unaware of our latest-at semantics, i.e. it will drop - /// old data even if doing so would impact the results of recent queries. + /// The garbage collector has limited support for latest-at semantics. The configuration option: + /// [`GarbageCollectionOptions::protect_latest`] will protect the N latest values of each + /// component on each timeline. 
The only practical guarantee this gives is that a latest-at query
+    /// with a value of max-int will be unchanged. However, latest-at queries from other arbitrary
+    /// points in time may provide different results pre- and post-GC.
+    ///
+    /// NOTE: This configuration option is not yet enabled for the Rerun viewer GC pass.
+    ///
     /// See .
     //
     // TODO(#1804): There shouldn't be any need to return the purged `RowId`s, all secondary
@@ -63,19 +102,19 @@ impl DataStore {
     // when purging data.
     //
     // TODO(#1823): Workload specific optimizations.
-    pub fn gc(&mut self, target: GarbageCollectionTarget) -> (Vec<RowId>, DataStoreStats) {
+    pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Vec<RowId>, DataStoreStats) {
         re_tracing::profile_function!();
         self.gc_id += 1;
-        // NOTE: only temporal data and row metadata get purged!
         let stats_before = DataStoreStats::from_store(self);
-        let initial_num_rows =
-            stats_before.temporal.num_rows + stats_before.metadata_registry.num_rows;
-        let initial_num_bytes =
-            (stats_before.temporal.num_bytes + stats_before.metadata_registry.num_bytes) as f64;
-        let row_ids = match target {
+        let (initial_num_rows, initial_num_bytes) =
+            stats_before.total_rows_and_bytes_with_timeless(options.gc_timeless);
+
+        let protected_rows = self.find_all_protected_rows(options.protect_latest);
+
+        let mut row_ids = match options.target {
             GarbageCollectionTarget::DropAtLeastFraction(p) => {
                 assert!((0.0..=1.0).contains(&p));
@@ -85,7 +124,7 @@ impl DataStore {
                 re_log::trace!(
                     kind = "gc",
                     id = self.gc_id,
-                    %target,
+                    %options.target,
                     initial_num_rows = re_format::format_large_number(initial_num_rows as _),
                     initial_num_bytes = re_format::format_bytes(initial_num_bytes),
                     target_num_bytes = re_format::format_bytes(target_num_bytes),
@@ -93,23 +132,42 @@ impl DataStore {
                     "starting GC"
                 );
-                self.gc_drop_at_least_num_bytes(num_bytes_to_drop)
+                self.gc_drop_at_least_num_bytes(
+                    num_bytes_to_drop,
+                    options.gc_timeless,
+                    &protected_rows,
+                )
+            }
+            GarbageCollectionTarget::Everything => {
+                re_log::trace!(
+                    kind = "gc",
+                    id = self.gc_id,
+                    %options.target,
+                    initial_num_rows = re_format::format_large_number(initial_num_rows as _),
+                    initial_num_bytes = re_format::format_bytes(initial_num_bytes),
+                    "starting GC"
+                );
+
+                self.gc_drop_at_least_num_bytes(f64::INFINITY, options.gc_timeless, &protected_rows)
             }
         };
+        if options.purge_empty_tables {
+            row_ids.extend(self.purge_empty_tables());
+        }
+
         #[cfg(debug_assertions)]
         self.sanity_check().unwrap();
         // NOTE: only temporal data and row metadata get purged!
         let stats_after = DataStoreStats::from_store(self);
-        let new_num_rows = stats_after.temporal.num_rows + stats_after.metadata_registry.num_rows;
-        let new_num_bytes =
-            (stats_after.temporal.num_bytes + stats_after.metadata_registry.num_bytes) as f64;
+        let (new_num_rows, new_num_bytes) =
+            stats_after.total_rows_and_bytes_with_timeless(options.gc_timeless);
         re_log::trace!(
             kind = "gc",
             id = self.gc_id,
-            %target,
+            %options.target,
             initial_num_rows = re_format::format_large_number(initial_num_rows as _),
             initial_num_bytes = re_format::format_bytes(initial_num_bytes),
             new_num_rows = re_format::format_large_number(new_num_rows as _),
@@ -125,36 +183,217 @@ impl DataStore {
     /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
     ///
     /// Returns the list of `RowId`s that were purged from the store.
-    fn gc_drop_at_least_num_bytes(&mut self, mut num_bytes_to_drop: f64) -> Vec<RowId> {
+    //
+    // TODO(jleibs): There are some easy optimizations here if we find GC taking too long:
+    //  - If we stored the entity_path_hash along with timepoints in the metadata_registry we could jump
+    //    directly to the relevant tables instead of needing to iterate over all tables.
+    //  - If we know we are clearing almost everything, then we can batch-clear the rows from the
+    //    tables instead of needing to iterate over every single row incrementally.
+    fn gc_drop_at_least_num_bytes(
+        &mut self,
+        mut num_bytes_to_drop: f64,
+        include_timeless: bool,
+        protected_rows: &HashSet<RowId>,
+    ) -> Vec<RowId> {
         re_tracing::profile_function!();
         let mut row_ids = Vec::new();
         // The algorithm is straightforward:
-        // 1. Pop the oldest `RowId` available
+        // 1. Find the oldest `RowId` that is not protected
         // 2. Find all tables that potentially hold data associated with that `RowId`
         // 3. Drop the associated row and account for the space we got back
+
+        let mut candidate_rows = self.metadata_registry.registry.iter();
+
         while num_bytes_to_drop > 0.0 {
-            // pop next row id
-            let Some((row_id, timepoint)) = self.metadata_registry.pop_first() else {
+            // Try to get the next candidate
+            let Some((row_id, timepoint)) = candidate_rows.next() else {
                 break;
             };
+
+            if protected_rows.contains(row_id) {
+                continue;
+            }
+
             let metadata_dropped_size_bytes =
                 row_id.total_size_bytes() + timepoint.total_size_bytes();
             self.metadata_registry.heap_size_bytes -= metadata_dropped_size_bytes;
             num_bytes_to_drop -= metadata_dropped_size_bytes as f64;
-            row_ids.push(row_id);
+
+            row_ids.push(*row_id);
             // find all tables that could possibly contain this `RowId`
-            let tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| {
+            let temporal_tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| {
                 timepoint.get(timeline).map(|time| (*time, table))
             });
-            for (time, table) in tables {
-                num_bytes_to_drop -= table.try_drop_row(row_id, time.as_i64()) as f64;
+            for (time, table) in temporal_tables {
+                num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64;
+            }
+
+            // TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows
+            // first and then remove them in one pass.
+            if timepoint.is_timeless() && include_timeless {
+                for table in self.timeless_tables.values_mut() {
+                    num_bytes_to_drop -= table.try_drop_row(*row_id) as f64;
+                }
             }
         }
+        // Purge the removed rows from the metadata_registry
+        for row_id in &row_ids {
+            self.metadata_registry.remove(row_id);
+        }
+
+        row_ids
+    }
+
+    /// For each `EntityPath`, `Timeline`, `Component`, find the N latest [`RowId`]s.
+    ///
+    /// These are the rows that must be protected so as not to impact a latest-at query.
+    /// Note that "latest" for timeless data is currently based on insertion-order rather than
+    /// tuid. [See: #1807](https://github.com/rerun-io/rerun/issues/1807)
+    //
+    // TODO(jleibs): More complex functionality might require expanding this to also
+    // *ignore* specific entities, components, timelines, etc. for this protection.
+    //
+    // TODO(jleibs): `RowId`s should never overlap between entities. Creating a single large
+    // HashSet might actually be sub-optimal here. Consider switching to a map of
+    // `EntityPath` -> `HashSet`.
+    fn find_all_protected_rows(&mut self, target_count: usize) -> HashSet<RowId> {
+        re_tracing::profile_function!();
+
+        if target_count == 0 {
+            return Default::default();
+        }
+
+        // We need to sort to be able to determine latest-at.
+        self.sort_indices_if_needed();
+
+        let mut protected_rows: HashSet<RowId> = Default::default();
+
+        // Find all protected rows in regular indexed tables
+        for table in self.tables.values() {
+            let mut components_to_find: HashMap<ComponentName, usize> = table
+                .all_components
+                .iter()
+                .filter(|c| **c != table.cluster_key)
+                .map(|c| (*c, target_count))
+                .collect();
+
+            for bucket in table.buckets.values().rev() {
+                for (component, count) in &mut components_to_find {
+                    if *count == 0 {
+                        continue;
+                    }
+                    let inner = bucket.inner.read();
+                    // TODO(jleibs): If the entire column for a component is empty, we should
+                    // make sure the column is dropped so we don't have to iterate over a
+                    // bunch of Nones.
+                    if let Some(column) = inner.columns.get(component) {
+                        for row in column
+                            .iter()
+                            .enumerate()
+                            .rev()
+                            .filter_map(|(row_index, cell)| {
+                                cell.as_ref().and_then(|_| inner.col_row_id.get(row_index))
+                            })
+                            .take(*count)
+                        {
+                            *count -= 1;
+                            protected_rows.insert(*row);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Find all protected rows in timeless tables
+        // NOTE: this is still based on insertion order.
+        // https://github.com/rerun-io/rerun/issues/1807
+        for table in self.timeless_tables.values() {
+            let mut components_to_find: HashMap<ComponentName, usize> = table
+                .columns
+                .keys()
+                .filter(|c| **c != table.cluster_key)
+                .map(|c| (*c, target_count))
+                .collect();
+
+            for (component, count) in &mut components_to_find {
+                if *count == 0 {
+                    continue;
+                }
+                // TODO(jleibs): If the entire column for a component is empty, we should
+                // make sure the column is dropped so we don't have to iterate over a
+                // bunch of Nones.
+                if let Some(column) = table.columns.get(component) {
+                    for row_id in column
+                        .iter()
+                        .enumerate()
+                        .rev()
+                        .filter_map(|(row_index, cell)| {
+                            cell.as_ref().and_then(|_| table.col_row_id.get(row_index))
+                        })
+                        .take(*count)
+                    {
+                        *count -= 1;
+                        protected_rows.insert(*row_id);
+                    }
+                }
+            }
+        }
+
+        protected_rows
+    }
+
+    /// Remove any tables that contain only empty components.
+    // TODO(jleibs): We could optimize this further by also erasing empty columns.
+    fn purge_empty_tables(&mut self) -> Vec<RowId> {
+        re_tracing::profile_function!();
+
+        let mut row_ids = Vec::new();
+
+        // Drop any empty timeless tables
+        self.timeless_tables.retain(|_, table| {
+            // If any column is non-empty, we need to keep this table
+            for num in &table.col_num_instances {
+                if num != &0 {
+                    return true;
+                }
+            }
+
+            // Otherwise we can drop it
+            row_ids.extend(table.col_row_id.iter());
+            false
+        });
+
+        // Drop any empty temporal tables that aren't backed by a timeless table
+        self.tables.retain(|(_, entity), table| {
+            // If the timeless table still exists, this table might be storing empty values
+            // that hide the timeless values, so keep it around.
+            if self.timeless_tables.contains_key(entity) {
+                return true;
+            }
+
+            // If any bucket has a non-empty component in any column, we keep it.
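+            // (`col_num_instances` is the per-row instance count: a bucket whose rows all
+            // report zero instances for every component is carrying no actual data.)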
+ for bucket in table.buckets.values() { + let inner = bucket.inner.read(); + for num in &inner.col_num_instances { + if num != &0 { + return true; + } + } + } + + // Otherwise we can drop it + for bucket in table.buckets.values() { + let inner = bucket.inner.read(); + row_ids.extend(inner.col_row_id.iter()); + } + false + }); + row_ids } } @@ -165,8 +404,6 @@ impl IndexedTable { /// /// Returns how many bytes were actually dropped, or zero if the row wasn't found. fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 { - re_tracing::profile_function!(); - let table_has_more_than_one_bucket = self.buckets.len() > 1; let (bucket_key, bucket) = self.find_bucket_mut(time.into()); @@ -211,8 +448,6 @@ impl IndexedBucketInner { /// /// Returns how many bytes were actually dropped, or zero if the row wasn't found. fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 { - re_tracing::profile_function!(); - self.sort(); let IndexedBucketInner { @@ -285,3 +520,53 @@ impl IndexedBucketInner { dropped_num_bytes } } + +impl PersistentIndexedTable { + /// Tries to drop the given `row_id` from the table. + /// + /// Returns how many bytes were actually dropped, or zero if the row wasn't found. + fn try_drop_row(&mut self, row_id: RowId) -> u64 { + let mut dropped_num_bytes = 0u64; + + let PersistentIndexedTable { + ent_path: _, + cluster_key: _, + col_insert_id, + col_row_id, + col_num_instances, + columns, + } = self; + + // TODO(jleibs) Timeless data isn't sorted, so we need to do a full scan here. + // Speed this up when we implement: https://github.com/rerun-io/rerun/issues/1807 + if let Some(row_index) = col_row_id + .iter() + .enumerate() + .find(|(_, r)| **r == row_id) + .map(|(index, _)| index) + { + // col_row_id + // TODO(jleibs) Use swap_remove once we have a notion of sorted + let removed_row_id = col_row_id.remove(row_index); + debug_assert_eq!(row_id, removed_row_id); + dropped_num_bytes += removed_row_id.total_size_bytes(); + + // col_insert_id (if present) + if !col_insert_id.is_empty() { + // TODO(jleibs) Use swap_remove once we have a notion of sorted + dropped_num_bytes += col_insert_id.remove(row_index).total_size_bytes(); + } + + // col_num_instances + // TODO(jleibs) Use swap_remove once we have a notion of sorted + dropped_num_bytes += col_num_instances.remove(row_index).total_size_bytes(); + + // each data column + for column in columns.values_mut() { + dropped_num_bytes += column.0.remove(row_index).total_size_bytes(); + } + } + + dropped_num_bytes + } +} diff --git a/crates/re_arrow_store/src/store_stats.rs b/crates/re_arrow_store/src/store_stats.rs index e84e7dff7841..679dd9a168a2 100644 --- a/crates/re_arrow_store/src/store_stats.rs +++ b/crates/re_arrow_store/src/store_stats.rs @@ -146,6 +146,18 @@ impl DataStoreStats { total, } } + + pub fn total_rows_and_bytes_with_timeless(&self, include_timeless: bool) -> (u64, f64) { + let mut num_rows = self.temporal.num_rows + self.metadata_registry.num_rows; + let mut num_bytes = (self.temporal.num_bytes + self.metadata_registry.num_bytes) as f64; + + if include_timeless { + num_rows += self.timeless.num_rows; + num_bytes += self.timeless.num_bytes as f64; + } + + (num_rows, num_bytes) + } } // --- Data store --- diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index 7357bbead031..465daefdfec7 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -8,7 +8,7 @@ use rand::Rng; use re_arrow_store::{ test_row, 
test_util::sanity_unwrap, DataStore, DataStoreConfig, DataStoreStats, - GarbageCollectionTarget, LatestAtQuery, WriteError, + GarbageCollectionOptions, LatestAtQuery, WriteError, }; use re_components::datagen::{ build_frame_nr, build_log_time, build_some_colors, build_some_instances, build_some_point2d, @@ -301,7 +301,7 @@ fn gc_correct() { let stats = DataStoreStats::from_store(&store); - let (row_ids, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything()); let stats_diff = stats_diff + stats_empty; // account for fixed overhead assert_eq!(row_ids.len() as u64, stats.total.num_rows); @@ -321,7 +321,7 @@ fn gc_correct() { assert!(store.get_msg_metadata(row_id).is_none()); } - let (row_ids, stats_diff) = store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything()); assert!(row_ids.is_empty()); assert_eq!(DataStoreStats::default(), stats_diff); diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 1413eb028c37..7bccc0e45ae9 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -12,7 +12,8 @@ use polars_ops::prelude::DataFrameJoinOps; use rand::Rng; use re_arrow_store::{ polars_util, test_row, test_util::sanity_unwrap, DataStore, DataStoreConfig, DataStoreStats, - GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt, TimeRange, + GarbageCollectionOptions, GarbageCollectionTarget, LatestAtQuery, RangeQuery, TimeInt, + TimeRange, }; use re_components::{ datagen::{ @@ -32,11 +33,11 @@ fn all_components() { let ent_path = EntityPath::from("this/that"); - // let frame0: TimeInt = 0.into(); - let frame1: TimeInt = 1.into(); - let frame2: TimeInt = 2.into(); - let frame3: TimeInt = 3.into(); - let frame4: TimeInt = 4.into(); + // let frame0= TimeInt::from(0); + let frame1 = TimeInt::from(1); + let frame2 = TimeInt::from(2); + let frame3 = TimeInt::from(3); + let frame4 = TimeInt::from(4); let assert_latest_components_at = |store: &mut DataStore, ent_path: &EntityPath, expected: Option<&[ComponentName]>| { @@ -47,7 +48,7 @@ fn all_components() { } // Stress test GC - store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store2.gc(GarbageCollectionOptions::gc_everything()); for table in store.to_data_tables(None) { store2.insert_table(&table).unwrap(); } @@ -259,11 +260,11 @@ fn latest_at_impl(store: &mut DataStore) { let ent_path = EntityPath::from("this/that"); - let frame0: TimeInt = 0.into(); - let frame1: TimeInt = 1.into(); - let frame2: TimeInt = 2.into(); - let frame3: TimeInt = 3.into(); - let frame4: TimeInt = 4.into(); + let frame0 = TimeInt::from(0); + let frame1 = TimeInt::from(1); + let frame2 = TimeInt::from(2); + let frame3 = TimeInt::from(3); + let frame4 = TimeInt::from(4); // helper to insert a table both as a temporal and timeless payload let insert_table = |store: &mut DataStore, table: &DataTable| { @@ -302,7 +303,7 @@ fn latest_at_impl(store: &mut DataStore) { store2.insert_table(&table).unwrap(); } // Stress test GC - store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store2.gc(GarbageCollectionOptions::gc_everything()); for table in store.to_data_tables(None) { store2.insert_table(&table).unwrap(); } @@ -373,12 +374,12 @@ fn range_impl(store: &mut DataStore) { let ent_path = EntityPath::from("this/that"); - let frame0: TimeInt = 0.into(); - let frame1: TimeInt = 
1.into(); - let frame2: TimeInt = 2.into(); - let frame3: TimeInt = 3.into(); - let frame4: TimeInt = 4.into(); - let frame5: TimeInt = 5.into(); + let frame0 = TimeInt::from(0); + let frame1 = TimeInt::from(1); + let frame2 = TimeInt::from(2); + let frame3 = TimeInt::from(3); + let frame4 = TimeInt::from(4); + let frame5 = TimeInt::from(5); // helper to insert a row both as a temporal and timeless payload let insert = |store: &mut DataStore, row| { @@ -444,7 +445,7 @@ fn range_impl(store: &mut DataStore) { store2.insert_table(&table).unwrap(); } store2.wipe_timeless_data(); - store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store2.gc(GarbageCollectionOptions::gc_everything()); for table in store.to_data_tables(None) { store2.insert_table(&table).unwrap(); } @@ -873,8 +874,12 @@ fn gc_impl(store: &mut DataStore) { let stats = DataStoreStats::from_store(store); - let (row_ids, stats_diff) = - store.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0)); + let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions { + target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0), + gc_timeless: false, + protect_latest: 0, + purge_empty_tables: false, + }); for row_id in &row_ids { assert!(store.get_msg_metadata(row_id).is_none()); } @@ -897,6 +902,192 @@ fn gc_impl(store: &mut DataStore) { } } +#[test] +fn protected_gc() { + init_logs(); + + for config in re_arrow_store::test_util::all_configs() { + let mut store = DataStore::new(InstanceKey::name(), config.clone()); + protected_gc_impl(&mut store); + } +} + +fn protected_gc_impl(store: &mut DataStore) { + init_logs(); + + let ent_path = EntityPath::from("this/that"); + + let frame0 = TimeInt::from(0); + let frame1 = TimeInt::from(1); + let frame2 = TimeInt::from(2); + let frame3 = TimeInt::from(3); + let frame4 = TimeInt::from(4); + + let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); + let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), colors1]); + + let points2 = build_some_point2d(3); + let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [instances1, points2]); + + let points3 = build_some_point2d(10); + let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 10; [points3]); + + let colors4 = build_some_colors(5); + let row4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 5; [colors4]); + + store + .insert_table(&DataTable::from_rows( + TableId::random(), + [row1.clone(), row2.clone(), row3.clone(), row4.clone()], + )) + .unwrap(); + + // Re-insert row1 and row2 as timeless data as well + let mut table_timeless = DataTable::from_rows(TableId::random(), [row1.clone(), row2.clone()]); + table_timeless.col_timelines = Default::default(); + store.insert_table(&table_timeless).unwrap(); + + store.gc(GarbageCollectionOptions { + target: GarbageCollectionTarget::Everything, + gc_timeless: true, + protect_latest: 1, + purge_empty_tables: true, + }); + + let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let components_all = &[Color::name(), Point2D::name()]; + + let df = polars_util::latest_components( + store, + &LatestAtQuery::new(timeline_frame_nr, frame_nr), + &ent_path, + components_all, + &JoinType::Outer, + ) + .unwrap(); + + let df_expected = joint_df(store.cluster_key(), rows); + + store.sort_indices_if_needed(); + assert_eq!(df_expected, df, "{store}"); + }; + + // The timeless data was preserved + 
assert_latest_components( + frame0, + &[(Color::name(), &row1), (Point2D::name(), &row2)], // timeless + ); + + // + assert_latest_components( + frame3, + &[ + (Color::name(), &row1), // timeless + (Point2D::name(), &row3), // protected + ], + ); + + assert_latest_components( + frame4, + &[ + (Color::name(), &row4), //protected + (Point2D::name(), &row3), // protected + ], + ); +} + +#[test] +fn protected_gc_clear() { + init_logs(); + + for config in re_arrow_store::test_util::all_configs() { + let mut store = DataStore::new(InstanceKey::name(), config.clone()); + protected_gc_clear_impl(&mut store); + } +} + +fn protected_gc_clear_impl(store: &mut DataStore) { + init_logs(); + + let ent_path = EntityPath::from("this/that"); + + let frame0 = TimeInt::from(0); + let frame1 = TimeInt::from(1); + let frame2 = TimeInt::from(2); + let frame3 = TimeInt::from(3); + let frame4 = TimeInt::from(4); + + let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); + let row1 = test_row!(ent_path @ [build_frame_nr(frame1)] => 3; [instances1.clone(), colors1]); + + let points2 = build_some_point2d(3); + let row2 = test_row!(ent_path @ [build_frame_nr(frame2)] => 3; [instances1, points2]); + + let colors2 = build_some_colors(0); + let row3 = test_row!(ent_path @ [build_frame_nr(frame3)] => 0; [colors2]); + + let points4 = build_some_point2d(0); + let row4 = test_row!(ent_path @ [build_frame_nr(frame4)] => 0; [points4]); + + // Insert the 3 rows as timeless + let mut table_timeless = DataTable::from_rows( + TableId::random(), + [row1.clone(), row2.clone(), row3.clone()], + ); + table_timeless.col_timelines = Default::default(); + store.insert_table(&table_timeless).unwrap(); + + store.gc(GarbageCollectionOptions { + target: GarbageCollectionTarget::Everything, + gc_timeless: true, + protect_latest: 1, + purge_empty_tables: true, + }); + + let mut assert_latest_components = |frame_nr: TimeInt, rows: &[(ComponentName, &DataRow)]| { + let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence); + let components_all = &[Color::name(), Point2D::name()]; + + let df = polars_util::latest_components( + store, + &LatestAtQuery::new(timeline_frame_nr, frame_nr), + &ent_path, + components_all, + &JoinType::Outer, + ) + .unwrap(); + + let df_expected = joint_df(store.cluster_key(), rows); + + store.sort_indices_if_needed(); + assert_eq!(df_expected, df, "{store}"); + }; + + // Only points are preserved, since colors were cleared and then GC'd + assert_latest_components(frame0, &[(Color::name(), &row3), (Point2D::name(), &row2)]); + + // Only the 2 rows should remain in the table + let stats = DataStoreStats::from_store(store); + assert_eq!(stats.timeless.num_rows, 2); + + // Now erase points and GC again + let mut table_timeless = DataTable::from_rows(TableId::random(), [row4]); + table_timeless.col_timelines = Default::default(); + store.insert_table(&table_timeless).unwrap(); + + store.gc(GarbageCollectionOptions { + target: GarbageCollectionTarget::Everything, + gc_timeless: true, + protect_latest: 1, + purge_empty_tables: true, + }); + + // No rows should remain because the table should have been purged + let stats = DataStoreStats::from_store(store); + assert_eq!(stats.timeless.num_rows, 0); +} + // --- pub fn init_logs() { diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 3ad7e21f5b21..d64885dda706 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, 
Ordering}; use itertools::Itertools; use re_arrow_store::{ - test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionTarget, + test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionOptions, TimeInt, TimeRange, Timeline, }; use re_components::datagen::{ @@ -31,11 +31,11 @@ fn data_store_dump() { // stress-test GC impl store1.wipe_timeless_data(); - store1.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store1.gc(GarbageCollectionOptions::gc_everything()); store2.wipe_timeless_data(); - store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store2.gc(GarbageCollectionOptions::gc_everything()); store3.wipe_timeless_data(); - store3.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store3.gc(GarbageCollectionOptions::gc_everything()); data_store_dump_impl(&mut store1, &mut store2, &mut store3); } @@ -125,8 +125,8 @@ fn data_store_dump_filtered() { data_store_dump_filtered_impl(&mut store1, &mut store2); // stress-test GC impl - store1.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); - store2.gc(GarbageCollectionTarget::DropAtLeastFraction(1.0)); + store1.gc(GarbageCollectionOptions::gc_everything()); + store2.gc(GarbageCollectionOptions::gc_everything()); data_store_dump_filtered_impl(&mut store1, &mut store2); } diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index 90735d0e362b..89b934c796bd 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use nohash_hasher::IntMap; -use re_arrow_store::{DataStoreConfig, TimeInt}; +use re_arrow_store::{DataStoreConfig, GarbageCollectionOptions, TimeInt}; use re_log_types::{ ApplicationId, ArrowMsg, ComponentPath, DataCell, DataRow, DataTable, EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, RowId, SetStoreInfo, StoreId, StoreInfo, @@ -343,9 +343,14 @@ impl StoreDb { re_tracing::profile_function!(); assert!((0.0..=1.0).contains(&fraction_to_purge)); - let (drop_row_ids, stats_diff) = self.entity_db.data_store.gc( - re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(fraction_to_purge as _), - ); + let (drop_row_ids, stats_diff) = self.entity_db.data_store.gc(GarbageCollectionOptions { + target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction( + fraction_to_purge as _, + ), + gc_timeless: false, + protect_latest: 0, + purge_empty_tables: false, + }); re_log::trace!( num_row_ids_dropped = drop_row_ids.len(), size_bytes_dropped = re_format::format_bytes(stats_diff.total.num_bytes as _), diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index af725d5aa7e9..1280f6da0e49 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -798,9 +798,8 @@ impl eframe::App for App { // Save the blueprints // TODO(2579): implement web-storage for blueprints as well - #[cfg(not(target_arch = "wasm32"))] if let Some(hub) = &mut self.store_hub { - match hub.persist_app_blueprints() { + match hub.gc_and_persist_app_blueprints() { Ok(f) => f, Err(err) => { re_log::error!("Saving blueprints failed: {err}"); diff --git a/crates/re_viewer/src/store_hub.rs b/crates/re_viewer/src/store_hub.rs index 1c8207d62aa2..7609880ae57d 100644 --- a/crates/re_viewer/src/store_hub.rs +++ b/crates/re_viewer/src/store_hub.rs @@ -5,7 +5,6 @@ use re_data_store::StoreDb; use re_log_types::{ApplicationId, StoreId, StoreKind}; use re_viewer_context::StoreContext; -#[cfg(not(target_arch = "wasm32"))] use 
re_arrow_store::StoreGeneration; #[cfg(not(target_arch = "wasm32"))] @@ -30,7 +29,6 @@ pub struct StoreHub { store_dbs: StoreBundle, // The [`StoreGeneration`] from when the [`StoreDb`] was last saved - #[cfg(not(target_arch = "wasm32"))] blueprint_last_save: HashMap, } @@ -168,24 +166,39 @@ impl StoreHub { /// Persist any in-use blueprints to durable storage. // TODO(#2579): implement persistence for web - #[cfg(not(target_arch = "wasm32"))] - pub fn persist_app_blueprints(&mut self) -> anyhow::Result<()> { + #[allow(clippy::unnecessary_wraps)] + pub fn gc_and_persist_app_blueprints(&mut self) -> anyhow::Result<()> { + re_tracing::profile_function!(); // Because we save blueprints based on their `ApplicationId`, we only // save the blueprints referenced by `blueprint_by_app_id`, even though // there may be other Blueprints in the Hub. - for (app_id, blueprint_id) in &self.blueprint_by_app_id { - let blueprint_path = default_blueprint_path(app_id)?; - re_log::debug!("Saving blueprint for {app_id} to {blueprint_path:?}"); - if let Some(blueprint) = self.store_dbs.blueprint(blueprint_id) { + use re_arrow_store::GarbageCollectionOptions; + for (app_id, blueprint_id) in &self.blueprint_by_app_id { + if let Some(blueprint) = self.store_dbs.blueprint_mut(blueprint_id) { if self.blueprint_last_save.get(blueprint_id) != Some(&blueprint.generation()) { - // TODO(#1803): We should "flatten" blueprints when we save them - // TODO(jleibs): Should we push this into a background thread? Blueprints should generally - // be small & fast to save, but maybe not once we start adding big pieces of user data? - let file_saver = save_database_to_file(blueprint, blueprint_path, None)?; - file_saver()?; - self.blueprint_last_save - .insert(blueprint_id.clone(), blueprint.generation()); + // GC everything but the latest row. + blueprint.store_mut().gc(GarbageCollectionOptions { + target: re_arrow_store::GarbageCollectionTarget::Everything, + gc_timeless: true, + protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer + purge_empty_tables: true, + }); + #[cfg(not(target_arch = "wasm32"))] + { + let blueprint_path = default_blueprint_path(app_id)?; + re_log::debug!("Saving blueprint for {app_id} to {blueprint_path:?}"); + // TODO(jleibs): Should we push this into a background thread? Blueprints should generally + // be small & fast to save, but maybe not once we start adding big pieces of user data? + let file_saver = save_database_to_file(blueprint, blueprint_path, None)?; + file_saver()?; + self.blueprint_last_save + .insert(blueprint_id.clone(), blueprint.generation()); + } + #[cfg(target_arch = "wasm32")] + { + _ = app_id; + } } } }
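
Usage sketch of the new `GarbageCollectionOptions` API, mirroring the calls added in the benches and tests above. The wrapper function names here are illustrative only; the blueprint-saving path in `store_hub.rs` is the one that uses `protect_latest: 1`:

```rust
use re_arrow_store::{DataStore, GarbageCollectionOptions, GarbageCollectionTarget};

/// Compact a store down to the latest value of each component on each timeline
/// (including timeless), the way the blueprint is compacted before saving.
fn compact_like_a_blueprint(store: &mut DataStore) {
    let (_dropped_row_ids, _stats_diff) = store.gc(GarbageCollectionOptions {
        target: GarbageCollectionTarget::Everything,
        gc_timeless: true,
        protect_latest: 1,
        purge_empty_tables: true,
    });
}

/// Pre-existing behavior, now expressed through the options struct: drop at least
/// a third of the temporal data, leave timeless data untouched, protect nothing.
fn drop_at_least_a_third(store: &mut DataStore) {
    let (_dropped_row_ids, _stats_diff) = store.gc(GarbageCollectionOptions {
        target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
        gc_timeless: false,
        protect_latest: 0,
        purge_empty_tables: false,
    });
}
```

`GarbageCollectionOptions::gc_everything()` is the shorthand the tests use for `Everything` + `gc_timeless` + `purge_empty_tables` with `protect_latest: 0`.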