From 58abec4b3b18931636ca9115c3de624bf3a65a56 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 11:01:02 +0200 Subject: [PATCH 01/14] Add IntHistogram::decrement --- crates/re_int_histogram/src/tree.rs | 113 +++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 2 deletions(-) diff --git a/crates/re_int_histogram/src/tree.rs b/crates/re_int_histogram/src/tree.rs index 704bc4d14253..67b42e9af71d 100644 --- a/crates/re_int_histogram/src/tree.rs +++ b/crates/re_int_histogram/src/tree.rs @@ -128,8 +128,26 @@ impl Int64Histogram { /// /// Incrementing with one is similar to inserting the key in a multi-set. pub fn increment(&mut self, key: i64, inc: u32) { - self.root - .increment(ROOT_LEVEL, u64_key_from_i64_key(key), inc); + if inc != 0 { + self.root + .increment(ROOT_LEVEL, u64_key_from_i64_key(key), inc); + } + } + + /// Decremenmt the count for the given key. + /// + /// The decrement is saturating. + /// + /// Returns how much was actually decremented (found). + /// If the returned value is less than the given value, + /// it means that the key was either no found, or had a lower count. + pub fn decrement(&mut self, key: i64, dec: u32) -> u32 { + if dec == 0 { + 0 + } else { + self.root + .decrement(ROOT_LEVEL, u64_key_from_i64_key(key), dec) + } } /// Is the total count zero? @@ -311,6 +329,14 @@ impl Node { } } + fn decrement(&mut self, level: Level, addr: u64, dec: u32) -> u32 { + match self { + Node::BranchNode(node) => node.decrement(level, addr, dec), + Node::SparseLeaf(sparse) => sparse.decrement(addr, dec), + Node::DenseLeaf(dense) => dense.decrement(addr, dec), + } + } + fn is_empty(&self) -> bool { match self { Node::BranchNode(node) => node.is_empty(), @@ -381,6 +407,22 @@ impl BranchNode { self.total_count += inc as u64; } + fn decrement(&mut self, level: Level, addr: u64, dec: u32) -> u32 { + debug_assert!(level != BOTTOM_LEVEL); + let child_level = level - LEVEL_STEP; + let top_addr = (addr >> level) & ADDR_MASK; + if let Some(child) = &mut self.children[top_addr as usize] { + let actually_decremented = child.decrement(child_level, addr, dec); + if child.is_empty() { + self.children[top_addr as usize] = None; + } + self.total_count = self.total_count.saturating_sub(actually_decremented as _); + actually_decremented + } else { + 0 + } + } + fn is_empty(&self) -> bool { self.total_count == 0 } @@ -513,6 +555,25 @@ impl SparseLeaf { } } + fn decrement(&mut self, abs_addr: u64, dec: u32) -> u32 { + let index = self.addrs.partition_point(|&addr| addr < abs_addr); + + if let (Some(addr), Some(count)) = (self.addrs.get_mut(index), self.counts.get_mut(index)) { + if *addr == abs_addr { + return if dec <= *count { + *count -= dec; + dec + } else { + let actually_decremented = *count; + *count = 0; + actually_decremented + }; + } + } + + 0 // not found + } + fn is_empty(&self) -> bool { self.addrs.is_empty() } @@ -564,6 +625,18 @@ impl DenseLeaf { self.counts[(abs_addr & (NUM_CHILDREN_IN_DENSE - 1)) as usize] += inc; } + fn decrement(&mut self, abs_addr: u64, dec: u32) -> u32 { + let bucket = &mut self.counts[(abs_addr & (NUM_CHILDREN_IN_DENSE - 1)) as usize]; + if dec <= *bucket { + *bucket -= dec; + dec + } else { + let actually_decremented = *bucket; + *bucket = 0; + actually_decremented + } + } + fn is_empty(&self) -> bool { self.total_count() == 0 } @@ -736,6 +809,11 @@ mod tests { assert_eq!(set.range(.., 1).collect::>(), expected_ranges); assert_eq!(set.range(..10, 1).count(), 10); + + assert_eq!(set.decrement(5, 1), 1); + assert_eq!(set.range(..10, 
1).count(), 9); + assert_eq!(set.decrement(5, 1), 0); + assert_eq!(set.range(..10, 1).count(), 9); } #[test] @@ -911,4 +989,35 @@ mod tests { vec![(RangeI64::single(i64::MAX - 1), 2),] ); } + + #[test] + fn test_decrement() { + let mut set = Int64Histogram::default(); + + for i in 0..100 { + set.increment(i, 2); + } + + assert_eq!((set.min_key(), set.max_key()), (Some(0), Some(99))); + + for i in 0..100 { + assert_eq!(set.decrement(i, 1), 1); + } + + assert_eq!((set.min_key(), set.max_key()), (Some(0), Some(99))); + + for i in 0..50 { + assert_eq!(set.decrement(i, 1), 1); + } + + assert_eq!((set.min_key(), set.max_key()), (Some(50), Some(99))); + + for i in 0..50 { + assert_eq!( + set.decrement(i, 1), + 0, + "Should already have been decremented" + ); + } + } } From 282ff479e06ee36d583135551d31846d728be323 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 11:01:48 +0200 Subject: [PATCH 02/14] Improve error in live_camera_edge_detection when theres's no cameras --- examples/python/live_camera_edge_detection/main.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/python/live_camera_edge_detection/main.py b/examples/python/live_camera_edge_detection/main.py index 5307c227a368..7840697bc514 100755 --- a/examples/python/live_camera_edge_detection/main.py +++ b/examples/python/live_camera_edge_detection/main.py @@ -25,7 +25,10 @@ def run_canny(num_frames: int | None) -> None: # Read the frame ret, img = cap.read() if not ret: - print("Can't receive frame (stream end?). Exiting ...") + if frame_nr == 0: + print("Failed to capture any frame. No camera connected?") + else: + print("Can't receive frame (stream end?). Exiting…") break # Get the current frame time. On some platforms it always returns zero. From 91aeb7bb0e97336633ea6325e8efc05b5d508abd Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 11:02:07 +0200 Subject: [PATCH 03/14] Improve hashing of `Timeline`s --- crates/re_log_types/src/time_point/timeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/re_log_types/src/time_point/timeline.rs b/crates/re_log_types/src/time_point/timeline.rs index de763ba27662..1fdef9488367 100644 --- a/crates/re_log_types/src/time_point/timeline.rs +++ b/crates/re_log_types/src/time_point/timeline.rs @@ -126,6 +126,6 @@ impl SizeBytes for Timeline { impl std::hash::Hash for Timeline { #[inline] fn hash(&self, state: &mut H) { - state.write_u64(self.name.hash() | self.typ.hash()); + state.write_u64(self.name.hash() ^ self.typ.hash()); } } From 4d71ca3778cba36049fd5ed842a4803d1f2ad643 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 11:03:10 +0200 Subject: [PATCH 04/14] Book-keep exactly what was deleted during store GC --- crates/re_arrow_store/src/lib.rs | 2 +- crates/re_arrow_store/src/store_gc.rs | 61 ++++++++++++++-------- crates/re_arrow_store/tests/correctness.rs | 10 ++-- crates/re_arrow_store/tests/data_store.rs | 4 +- crates/re_data_store/src/store_db.rs | 9 ++-- 5 files changed, 51 insertions(+), 35 deletions(-) diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs index 789c0d4a03f5..e13081fb42c9 100644 --- a/crates/re_arrow_store/src/lib.rs +++ b/crates/re_arrow_store/src/lib.rs @@ -37,7 +37,7 @@ pub mod test_util; pub use self::arrow_util::ArrayExt; pub use self::store::{DataStore, DataStoreConfig, StoreGeneration}; -pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget}; +pub use self::store_gc::{Deleted, GarbageCollectionOptions, 
GarbageCollectionTarget}; pub use self::store_read::{LatestAtQuery, RangeQuery}; pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats}; pub use self::store_write::{WriteError, WriteResult}; diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 36820a06a7eb..7ad67f1ae6d5 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -1,6 +1,7 @@ use ahash::{HashMap, HashSet}; -use re_log_types::{RowId, SizeBytes as _, TimeInt, TimeRange}; +use nohash_hasher::IntMap; +use re_log_types::{EntityPathHash, RowId, SizeBytes as _, TimeInt, TimeRange, Timeline}; use re_types::ComponentName; use crate::{ @@ -58,6 +59,18 @@ impl std::fmt::Display for GarbageCollectionTarget { } } +#[derive(Default)] +pub struct Deleted { + /// What rows where deleted? + pub row_ids: HashSet, + + /// What time points where deleted for each entity? + pub timeful: IntMap>>, + + /// For each entity, how many timeless entries were deleted? + pub timeless: IntMap, +} + impl DataStore { /// Triggers a garbage collection according to the desired `target`. /// @@ -102,7 +115,7 @@ impl DataStore { // when purging data. // // TODO(#1823): Workload specific optimizations. - pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Vec, DataStoreStats) { + pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Deleted, DataStoreStats) { re_tracing::profile_function!(); self.gc_id += 1; @@ -114,7 +127,7 @@ impl DataStore { let protected_rows = self.find_all_protected_rows(options.protect_latest); - let mut row_ids = match options.target { + let mut deleted = match options.target { GarbageCollectionTarget::DropAtLeastFraction(p) => { assert!((0.0..=1.0).contains(&p)); @@ -153,7 +166,7 @@ impl DataStore { }; if options.purge_empty_tables { - row_ids.extend(self.purge_empty_tables()); + deleted.row_ids.extend(self.purge_empty_tables()); } #[cfg(debug_assertions)] @@ -177,7 +190,7 @@ impl DataStore { let stats_diff = stats_before - stats_after; - (row_ids, stats_diff) + (deleted, stats_diff) } /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store. @@ -194,23 +207,20 @@ impl DataStore { mut num_bytes_to_drop: f64, include_timeless: bool, protected_rows: &HashSet, - ) -> Vec { + ) -> Deleted { re_tracing::profile_function!(); - let mut row_ids = Vec::new(); + let mut deleted = Deleted::default(); // The algorithm is straightforward: // 1. Find the the oldest `RowId` that is not protected // 2. Find all tables that potentially hold data associated with that `RowId` // 3. 
Drop the associated row and account for the space we got back - let mut candidate_rows = self.metadata_registry.registry.iter(); - - while num_bytes_to_drop > 0.0 { - // Try to get the next candidate - let Some((row_id, timepoint)) = candidate_rows.next() else { + for (row_id, timepoint) in &self.metadata_registry.registry { + if num_bytes_to_drop <= 0.0 { break; - }; + } if protected_rows.contains(row_id) { continue; @@ -221,32 +231,39 @@ impl DataStore { self.metadata_registry.heap_size_bytes -= metadata_dropped_size_bytes; num_bytes_to_drop -= metadata_dropped_size_bytes as f64; - row_ids.push(*row_id); + deleted.row_ids.insert(*row_id); // find all tables that could possibly contain this `RowId` - let temporal_tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| { - timepoint.get(timeline).map(|time| (*time, table)) - }); - for (time, table) in temporal_tables { - num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64; + for ((timeline, ent_path_hash), table) in &mut self.tables { + if let Some(time) = timepoint.get(timeline) { + num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64; + deleted + .timeful + .entry(*ent_path_hash) + .or_default() + .entry(*timeline) + .or_default() + .push(*time); + } } // TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows // first and then remove them in one pass. if timepoint.is_timeless() && include_timeless { - for table in self.timeless_tables.values_mut() { + for (ent_path_hash, table) in &mut self.timeless_tables { num_bytes_to_drop -= table.try_drop_row(*row_id) as f64; + *deleted.timeless.entry(*ent_path_hash).or_default() += 1; } } } // Purge the removed rows from the metadata_registry - for row_id in &row_ids { + for row_id in &deleted.row_ids { self.metadata_registry.remove(row_id); } - row_ids + deleted } /// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s. 
diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index 73bf5b3b525b..c3a2f8f65022 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -300,10 +300,10 @@ fn gc_correct() { let stats = DataStoreStats::from_store(&store); - let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything()); + let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything()); let stats_diff = stats_diff + stats_empty; // account for fixed overhead - assert_eq!(row_ids.len() as u64, stats.total.num_rows); + assert_eq!(deleted.row_ids.len() as u64, stats.total.num_rows); assert_eq!( stats.metadata_registry.num_rows, stats_diff.metadata_registry.num_rows @@ -316,12 +316,12 @@ fn gc_correct() { sanity_unwrap(&mut store); check_still_readable(&store); - for row_id in &row_ids { + for row_id in &deleted.row_ids { assert!(store.get_msg_metadata(row_id).is_none()); } - let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything()); - assert!(row_ids.is_empty()); + let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything()); + assert!(deleted.row_ids.is_empty()); assert_eq!(DataStoreStats::default(), stats_diff); sanity_unwrap(&mut store); diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 5701d20bc8a7..58ee3950448b 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -880,13 +880,13 @@ fn gc_impl(store: &mut DataStore) { let stats = DataStoreStats::from_store(store); - let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions { + let (deleted, stats_diff) = store.gc(GarbageCollectionOptions { target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0), gc_timeless: false, protect_latest: 0, purge_empty_tables: false, }); - for row_id in &row_ids { + for row_id in &deleted.row_ids { assert!(store.get_msg_metadata(row_id).is_none()); } diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index 839c1efc5d17..9cf1abd5ad48 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -373,7 +373,7 @@ impl StoreDb { re_tracing::profile_function!(); assert!((0.0..=1.0).contains(&fraction_to_purge)); - let (drop_row_ids, stats_diff) = self.entity_db.data_store.gc(GarbageCollectionOptions { + let (deleted, stats_diff) = self.entity_db.data_store.gc(GarbageCollectionOptions { target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction( fraction_to_purge as _, ), @@ -382,12 +382,11 @@ impl StoreDb { purge_empty_tables: false, }); re_log::trace!( - num_row_ids_dropped = drop_row_ids.len(), + num_row_ids_dropped = deleted.row_ids.len(), size_bytes_dropped = re_format::format_bytes(stats_diff.total.num_bytes as _), "purged datastore" ); - let drop_row_ids: ahash::HashSet<_> = drop_row_ids.into_iter().collect(); let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline(); let Self { @@ -400,10 +399,10 @@ impl StoreDb { { re_tracing::profile_scope!("entity_op_msgs"); - entity_op_msgs.retain(|row_id, _| !drop_row_ids.contains(row_id)); + entity_op_msgs.retain(|row_id, _| !deleted.row_ids.contains(row_id)); } - entity_db.purge(&cutoff_times, &drop_row_ids); + entity_db.purge(&cutoff_times, &deleted.row_ids); } /// Key used for sorting recordings in the UI. 
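
A minimal sketch of how a caller can consume the new `Deleted` bookkeeping returned by `DataStore::gc` (it mirrors the updated `gc_correct` test above; it assumes an already-populated `DataStore` and the `re_arrow_store` re-exports added in this patch):

    use re_arrow_store::{DataStore, Deleted, GarbageCollectionOptions};

    /// Drop everything and return the bookkeeping of what was deleted.
    fn purge_everything(store: &mut DataStore) -> Deleted {
        let (deleted, _stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());

        // Every dropped row is gone from the metadata registry…
        for row_id in &deleted.row_ids {
            assert!(store.get_msg_metadata(row_id).is_none());
        }

        // …while `deleted.timeful` and `deleted.timeless` tell secondary indices
        // exactly which per-entity time points / timeless counts to decrement
        // (this is what the later `EntityTree::purge` changes rely on).
        deleted
    }
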
From d423bdf370649c050bf9557a20e315445bc8a0d4 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 12:17:08 +0200 Subject: [PATCH 05/14] Fix: use `rerun_example_` prefix in .ipynb files --- examples/python/notebook/blueprint.ipynb | 2 +- examples/python/notebook/cube.ipynb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/python/notebook/blueprint.ipynb b/examples/python/notebook/blueprint.ipynb index f9161e408b55..53e46a60be76 100644 --- a/examples/python/notebook/blueprint.ipynb +++ b/examples/python/notebook/blueprint.ipynb @@ -33,7 +33,7 @@ "metadata": {}, "outputs": [], "source": [ - "rr.init(\"Blueprint demo\")\n", + "rr.init(\"rerun_example_blueprint_demo\")\n", "rr.start_web_viewer_server()\n", "\n", "rec = rr.memory_recording()" diff --git a/examples/python/notebook/cube.ipynb b/examples/python/notebook/cube.ipynb index 413210a46420..1d378bfbab82 100644 --- a/examples/python/notebook/cube.ipynb +++ b/examples/python/notebook/cube.ipynb @@ -22,7 +22,7 @@ "import numpy as np\n", "import rerun as rr # pip install rerun-sdk\n", "\n", - "rr.init(\"cube\")" + "rr.init(\"rerun_example_cube\")" ] }, { From 804459706d6a2f05bd4700e9211d29edb2d99326 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 13:53:14 +0200 Subject: [PATCH 06/14] Make our crash handler output in the same format as the standard panic --- crates/re_crash_handler/src/lib.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/crates/re_crash_handler/src/lib.rs b/crates/re_crash_handler/src/lib.rs index 293861b9d50f..45f2f6ef787e 100644 --- a/crates/re_crash_handler/src/lib.rs +++ b/crates/re_crash_handler/src/lib.rs @@ -45,15 +45,11 @@ fn install_panic_hook(_build_info: BuildInfo) { .name() .map_or_else(|| format!("{:?}", thread.id()), |name| name.to_owned()); - let file_line_suffix = if let Some(file_line) = &file_line { - format!(", {file_line}") - } else { - String::new() - }; - - eprintln!( - "\nthread '{thread_name}' panicked at '{msg}'{file_line_suffix}\n\n{callstack}" - ); + eprintln!("\nthread '{thread_name}' panicked at '{msg}'"); + if let Some(file_line) = &file_line { + eprintln!("{file_line}"); + } + eprintln!("stack backtrace:\n{callstack}"); } else { // This prints the panic message and callstack: (*previous_panic_hook)(panic_info); From d0d39590b56332378306d625a9ac3e903b0a0ff0 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 13:54:30 +0200 Subject: [PATCH 07/14] Make it easy to show the "/" root in the streams view --- crates/re_time_panel/src/lib.rs | 46 +++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/crates/re_time_panel/src/lib.rs b/crates/re_time_panel/src/lib.rs index 62a5b491ce2e..8b08700e449f 100644 --- a/crates/re_time_panel/src/lib.rs +++ b/crates/re_time_panel/src/lib.rs @@ -378,14 +378,30 @@ impl TimePanel { if time_area_response.dragged_by(PointerButton::Primary) { ui.scroll_with_delta(Vec2::Y * time_area_response.drag_delta().y); } - self.show_children( - ctx, - time_area_response, - time_area_painter, - tree_max_y, - &ctx.store_db.entity_db.tree, - ui, - ); + + // Show "/" on top? 
+ let show_root = false; + + if show_root { + self.show_tree( + ctx, + time_area_response, + time_area_painter, + tree_max_y, + None, + &ctx.store_db.entity_db.tree, + ui, + ); + } else { + self.show_children( + ctx, + time_area_response, + time_area_painter, + tree_max_y, + &ctx.store_db.entity_db.tree, + ui, + ); + } }); } @@ -396,7 +412,7 @@ impl TimePanel { time_area_response: &egui::Response, time_area_painter: &egui::Painter, tree_max_y: f32, - last_path_part: &EntityPathPart, + last_path_part: Option<&EntityPathPart>, tree: &EntityTree, ui: &mut egui::Ui, ) { @@ -409,10 +425,14 @@ impl TimePanel { } // The last part of the the path component - let text = if tree.is_leaf() { - last_path_part.to_string() + let text = if let Some(last_path_part) = last_path_part { + if tree.is_leaf() { + last_path_part.to_string() + } else { + format!("{last_path_part}/") // show we have children with a / + } } else { - format!("{last_path_part}/") // show we have children with a / + "/".to_owned() }; let collapsing_header_id = ui.make_persistent_id(&tree.path); @@ -513,7 +533,7 @@ impl TimePanel { time_area_response, time_area_painter, tree_max_y, - last_component, + Some(last_component), child, ui, ); From 60e46db2e44ec2441f4301b71f995c2a0a16fa56 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 13:55:55 +0200 Subject: [PATCH 08/14] Remove a small oft-hit profile scope --- crates/re_arrow_store/src/store_read.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index 06803e9a0eef..f3b74b6c2cfa 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -657,10 +657,6 @@ impl IndexedTable { &mut self, time_range: impl RangeBounds, ) -> impl Iterator { - // Beware! This merely measures the time it takes to gather all the necessary metadata - // for building the returned iterator. - re_tracing::profile_function!(); - self.buckets .range_mut(time_range) .rev() From f3d8fae8abc15b1931e063e70658e30206be2b5d Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 14:01:47 +0200 Subject: [PATCH 09/14] Use what the GC returns to purge the secondary indices --- crates/re_data_store/src/entity_tree.rs | 155 ++++++++++++++++++------ crates/re_data_store/src/store_db.rs | 29 +++-- 2 files changed, 137 insertions(+), 47 deletions(-) diff --git a/crates/re_data_store/src/entity_tree.rs b/crates/re_data_store/src/entity_tree.rs index 5c8922150830..4bc8d91bcc27 100644 --- a/crates/re_data_store/src/entity_tree.rs +++ b/crates/re_data_store/src/entity_tree.rs @@ -1,17 +1,33 @@ use std::collections::{BTreeMap, BTreeSet}; use itertools::Itertools; +use nohash_hasher::IntMap; + use re_log_types::{ - ComponentPath, EntityPath, EntityPathPart, PathOp, RowId, TimeInt, TimePoint, Timeline, + ComponentPath, EntityPath, EntityPathHash, EntityPathPart, PathOp, RowId, TimeInt, TimePoint, + Timeline, }; use re_types::{ComponentName, Loggable}; // ---------------------------------------------------------------------------- +/// Book-keeping required after a GC purge to keep track +/// of what was removed from children, so it can also be removed +/// from the parents. 
+#[derive(Default)] +pub struct ActuallyDeleted { + pub timeful: IntMap>, + pub timeless: u64, +} + +// ---------------------------------------------------------------------------- + /// Number of messages per time pub type TimeHistogram = re_int_histogram::Int64Histogram; -/// Number of messages per time per timeline +/// Number of messages per time per timeline. +/// +/// Does NOT include timeless. #[derive(Default)] pub struct TimeHistogramPerTimeline(BTreeMap); @@ -35,11 +51,42 @@ impl TimeHistogramPerTimeline { pub fn iter_mut(&mut self) -> impl ExactSizeIterator { self.0.iter_mut() } + + pub fn purge( + &mut self, + ent_path_hash: EntityPathHash, + deleted: &re_arrow_store::Deleted, + deleted_by_us_and_children: &mut ActuallyDeleted, + ) { + re_tracing::profile_function!(); + + for (timeline, histogram) in &mut self.0 { + if let Some(times) = deleted + .timeful + .get(&ent_path_hash) + .and_then(|map| map.get(timeline)) + { + for &time in times { + histogram.decrement(time.as_i64(), 1); + + deleted_by_us_and_children + .timeful + .entry(*timeline) + .or_default() + .push(time); + } + } + + // NOTE: we don't include timeless in the histogram. + } + } } // ---------------------------------------------------------------------------- -/// Number of messages per time per timeline +/// Number of messages per time per timeline. +/// +/// Does NOT include timeless. pub struct TimesPerTimeline(BTreeMap>); impl TimesPerTimeline { @@ -51,16 +98,28 @@ impl TimesPerTimeline { self.0.get(timeline) } + pub fn get_mut(&mut self, timeline: &Timeline) -> Option<&mut BTreeSet> { + self.0.get_mut(timeline) + } + pub fn insert(&mut self, timeline: Timeline, time: TimeInt) { self.0.entry(timeline).or_default().insert(time); } - pub fn purge(&mut self, cutoff_times: &std::collections::BTreeMap) { - for (timeline, time_set) in &mut self.0 { - if let Some(cutoff_time) = cutoff_times.get(timeline) { - time_set.retain(|time| cutoff_time <= time); + pub fn purge(&mut self, deleted: &re_arrow_store::Deleted) { + re_tracing::profile_function!(); + + for deleted_times in deleted.timeful.values() { + for (timeline, time_set) in &mut self.0 { + if let Some(times) = deleted_times.get(timeline) { + for &time in times { + time_set.remove(&time); + } + } } } + + // NOTE: we don't include timeless. 
} pub fn has_timeline(&self, timeline: &Timeline) -> bool { @@ -301,45 +360,82 @@ impl EntityTree { /// Purge all times before the cutoff, or in the given set pub fn purge( &mut self, - cutoff_times: &BTreeMap, - drop_row_ids: &ahash::HashSet, + deleted: &re_arrow_store::Deleted, + deleted_by_us_and_children: &mut ActuallyDeleted, ) { let Self { - path: _, + path, children, prefix_times, - num_timeless_messages: _, + num_timeless_messages, nonrecursive_clears, recursive_clears, - components: fields, + components, } = self; + if let Some(decrement) = deleted.timeless.get(&path.hash()) { + *num_timeless_messages = num_timeless_messages.saturating_sub(*decrement as _); + deleted_by_us_and_children.timeless += decrement; + } + { re_tracing::profile_scope!("prefix_times"); - for (timeline, histogram) in &mut prefix_times.0 { - if let Some(cutoff_time) = cutoff_times.get(timeline) { - histogram.remove(..cutoff_time.as_i64()); - } - } + prefix_times.purge(path.hash(), deleted, deleted_by_us_and_children); } { re_tracing::profile_scope!("nonrecursive_clears"); - nonrecursive_clears.retain(|row_id, _| !drop_row_ids.contains(row_id)); + nonrecursive_clears.retain(|row_id, _| !deleted.row_ids.contains(row_id)); } { re_tracing::profile_scope!("recursive_clears"); - recursive_clears.retain(|row_id, _| !drop_row_ids.contains(row_id)); + recursive_clears.retain(|row_id, _| !deleted.row_ids.contains(row_id)); } { - re_tracing::profile_scope!("fields"); - for columns in fields.values_mut() { - columns.purge(cutoff_times); + re_tracing::profile_scope!("ComponentStats"); + for stats in components.values_mut() { + let ComponentStats { + times, + num_timeless_messages, + } = stats; + + times.purge(path.hash(), deleted, deleted_by_us_and_children); + + if let Some(decrement) = deleted.timeless.get(&path.hash()) { + *num_timeless_messages = num_timeless_messages.saturating_sub(*decrement as _); + } } } + let mut deleted_by_children = ActuallyDeleted::default(); + for child in children.values_mut() { - child.purge(cutoff_times, drop_row_ids); + child.purge(deleted, &mut deleted_by_children); + } + + { + re_tracing::profile_scope!("apply_children"); + let ActuallyDeleted { timeful, timeless } = deleted_by_children; + + // Apply things that where deleted in children. + // For instance - if `/foo/bar` has some things deleted, + // we need to note it in `/foo` and `/` too. 
+ for (timeline, mut times) in timeful { + if let Some(time_histogram) = prefix_times.0.get_mut(&timeline) { + for &time in × { + time_histogram.decrement(time.as_i64(), 1); + } + } + + deleted_by_us_and_children + .timeful + .entry(timeline) + .or_default() + .append(&mut times); + } + + *num_timeless_messages = num_timeless_messages.saturating_sub(timeless as _); + deleted_by_us_and_children.timeless += timeless; } } @@ -380,17 +476,4 @@ impl ComponentStats { } } } - - pub fn purge(&mut self, cutoff_times: &BTreeMap) { - let Self { - times, - num_timeless_messages: _, - } = self; - - for (timeline, histogram) in &mut times.0 { - if let Some(cutoff_time) = cutoff_times.get(timeline) { - histogram.remove(..cutoff_time.as_i64()); - } - } - } } diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index 9cf1abd5ad48..cee9958f3825 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use nohash_hasher::IntMap; -use re_arrow_store::{DataStoreConfig, GarbageCollectionOptions, TimeInt}; +use re_arrow_store::{DataStoreConfig, GarbageCollectionOptions}; use re_log_types::{ ApplicationId, ArrowMsg, ComponentPath, DataCell, DataRow, DataTable, EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, RowId, SetStoreInfo, StoreId, StoreInfo, @@ -175,11 +175,7 @@ impl EntityDb { } } - pub fn purge( - &mut self, - cutoff_times: &std::collections::BTreeMap, - drop_row_ids: &ahash::HashSet, - ) { + pub fn purge(&mut self, deleted: &re_arrow_store::Deleted) { re_tracing::profile_function!(); let Self { @@ -191,12 +187,25 @@ impl EntityDb { { re_tracing::profile_scope!("times_per_timeline"); - times_per_timeline.purge(cutoff_times); + times_per_timeline.purge(deleted); } + let mut actually_deleted = Default::default(); + { re_tracing::profile_scope!("tree"); - tree.purge(cutoff_times, drop_row_ids); + tree.purge(deleted, &mut actually_deleted); + } + + { + re_tracing::profile_scope!("times_per_timeline"); + for (timeline, times) in actually_deleted.timeful { + if let Some(time_set) = times_per_timeline.get_mut(&timeline) { + for time in times { + time_set.remove(&time); + } + } + } } } } @@ -387,8 +396,6 @@ impl StoreDb { "purged datastore" ); - let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline(); - let Self { store_id: _, entity_op_msgs, @@ -402,7 +409,7 @@ impl StoreDb { entity_op_msgs.retain(|row_id, _| !deleted.row_ids.contains(row_id)); } - entity_db.purge(&cutoff_times, &deleted.row_ids); + entity_db.purge(&deleted); } /// Key used for sorting recordings in the UI. From 8e11f3bea63ecfb8b0b63eb16d2ae04eb613ba40 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 19 Sep 2023 15:48:03 +0200 Subject: [PATCH 10/14] Improve histogram docs --- crates/re_int_histogram/src/tree.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/re_int_histogram/src/tree.rs b/crates/re_int_histogram/src/tree.rs index 67b42e9af71d..0b2a2fe7d547 100644 --- a/crates/re_int_histogram/src/tree.rs +++ b/crates/re_int_histogram/src/tree.rs @@ -1,6 +1,10 @@ -//! The histogram is implemented as a tree. +//! The histogram is implemented as a trie. //! -//! The branches are based on the next few bits of the key (also known as the "address"). +//! Each node in the trie stores a count of a key/address sharing a prefix up to `depth * LEVEL_STEP` bits. +//! The key/address is always 64 bits. +//! +//! 
There are branch nodes, and two types of leaf nodes: dense, and sparse. +//! Dense leaves are only found at the very bottom of the trie. use smallvec::{smallvec, SmallVec}; From f36c9809fdf9a907547e4b262a0759fe92377f10 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 20 Sep 2023 11:09:43 +0200 Subject: [PATCH 11/14] tree cleanup: move similar code closer, and simplify tree on decrement --- crates/re_int_histogram/src/tree.rs | 265 ++++++++++++++++------------ 1 file changed, 154 insertions(+), 111 deletions(-) diff --git a/crates/re_int_histogram/src/tree.rs b/crates/re_int_histogram/src/tree.rs index 0b2a2fe7d547..1cf9f35a1565 100644 --- a/crates/re_int_histogram/src/tree.rs +++ b/crates/re_int_histogram/src/tree.rs @@ -138,7 +138,7 @@ impl Int64Histogram { } } - /// Decremenmt the count for the given key. + /// Decrement the count for the given key. /// /// The decrement is saturating. /// @@ -154,6 +154,20 @@ impl Int64Histogram { } } + /// Remove all data in the given range. + /// + /// Returns how much count was removed. + /// + /// Currently the implementation is optimized for the case of removing + /// large continuous ranges. + /// Removing many small, scattered ranges (e.g. individual elements) + /// may cause performance problems! + /// This can be remedied with some more code. + pub fn remove(&mut self, range: impl std::ops::RangeBounds) -> u64 { + let range = range_u64_from_range_bounds(range); + self.root.remove(0, ROOT_LEVEL, range) + } + /// Is the total count zero? /// /// Note that incrementing a key with zero is a no-op and @@ -218,20 +232,6 @@ impl Int64Histogram { }, } } - - /// Remove all data in the given range. - /// - /// Returns how much count was removed. - /// - /// Currently the implementation is optimized for the case of removing - /// large continuous ranges. - /// Removing many small, scattered ranges (e.g. individual elements) - /// may cause performance problems! - /// This can be remedied with some more code. - pub fn remove(&mut self, range: impl std::ops::RangeBounds) -> u64 { - let range = range_u64_from_range_bounds(range); - self.root.remove(0, ROOT_LEVEL, range) - } } /// An iterator over an [`Int64Histogram`]. @@ -333,14 +333,43 @@ impl Node { } } + /// Returns how much the total count decreased by. + #[must_use] fn decrement(&mut self, level: Level, addr: u64, dec: u32) -> u32 { match self { - Node::BranchNode(node) => node.decrement(level, addr, dec), + Node::BranchNode(node) => { + let count_loss = node.decrement(level, addr, dec); + if node.is_empty() { + *self = Node::SparseLeaf(SparseLeaf::default()); + } + // TODO(emilk): if we only have leaf children (sparse or dense) + // and the number of keys in all of them is less then `MAX_SPARSE_LEAF_LEN`, + // then we should convert this BranchNode into a SparseLeaf. + count_loss + } Node::SparseLeaf(sparse) => sparse.decrement(addr, dec), Node::DenseLeaf(dense) => dense.decrement(addr, dec), } } + /// Returns how much the total count decreased by. + fn remove(&mut self, my_addr: u64, my_level: Level, range: RangeU64) -> u64 { + match self { + Node::BranchNode(node) => { + let count_loss = node.remove(my_addr, my_level, range); + if node.is_empty() { + *self = Node::SparseLeaf(SparseLeaf::default()); + } + // TODO(emilk): if we only have leaf children (sparse or dense) + // and the number of keys in all of them is less then `MAX_SPARSE_LEAF_LEN`, + // then we should convert this BranchNode into a SparseLeaf. 
+ count_loss + } + Node::SparseLeaf(sparse) => sparse.remove(range), + Node::DenseLeaf(dense) => dense.remove(my_addr, range), + } + } + fn is_empty(&self) -> bool { match self { Node::BranchNode(node) => node.is_empty(), @@ -380,24 +409,6 @@ impl Node { Node::DenseLeaf(dense) => dense.range_count(my_addr, range), } } - - /// Returns how much the total count decreased by. - fn remove(&mut self, my_addr: u64, my_level: Level, range: RangeU64) -> u64 { - match self { - Node::BranchNode(node) => { - let count_loss = node.remove(my_addr, my_level, range); - if node.is_empty() { - *self = Node::SparseLeaf(SparseLeaf::default()); - } - // TODO(emilk): if we only have leaf children (sparse or dense) - // and the number of keys in all of them is less then `MAX_SPARSE_LEAF_LEN`, - // then we should convert this BranchNode into a SparseLeaf. - count_loss - } - Node::SparseLeaf(sparse) => sparse.remove(range), - Node::DenseLeaf(dense) => dense.remove(my_addr, range), - } - } } impl BranchNode { @@ -411,22 +422,56 @@ impl BranchNode { self.total_count += inc as u64; } + /// Returns how much the total count decreased by. + #[must_use] fn decrement(&mut self, level: Level, addr: u64, dec: u32) -> u32 { debug_assert!(level != BOTTOM_LEVEL); let child_level = level - LEVEL_STEP; let top_addr = (addr >> level) & ADDR_MASK; if let Some(child) = &mut self.children[top_addr as usize] { - let actually_decremented = child.decrement(child_level, addr, dec); + let count_loss = child.decrement(child_level, addr, dec); if child.is_empty() { self.children[top_addr as usize] = None; } - self.total_count = self.total_count.saturating_sub(actually_decremented as _); - actually_decremented + self.total_count -= count_loss as u64; + count_loss } else { 0 } } + /// Returns how much the total count decreased by. + #[must_use] + fn remove(&mut self, my_addr: u64, my_level: Level, range: RangeU64) -> u64 { + debug_assert!(range.min <= range.max); + debug_assert!(my_level != BOTTOM_LEVEL); + + let mut count_loss = 0; + let (child_level, child_size) = child_level_and_size(my_level); + + for ci in 0..NUM_CHILDREN_IN_NODE { + let child_addr = my_addr + ci * child_size; + let child_range = RangeU64::new(child_addr, child_addr + (child_size - 1)); + if range.intersects(child_range) { + if let Some(child) = &mut self.children[ci as usize] { + if range.contains_all_of(child_range) { + count_loss += child.total_count(); + self.children[ci as usize] = None; + } else { + count_loss += child.remove(child_addr, child_level, range); + if child.is_empty() { + self.children[ci as usize] = None; + } + } + } + } + } + + self.total_count -= count_loss; + + count_loss + } + fn is_empty(&self) -> bool { self.total_count == 0 } @@ -491,38 +536,6 @@ impl BranchNode { total_count } - - /// Returns how much the total count decreased by. 
- #[must_use] - fn remove(&mut self, my_addr: u64, my_level: Level, range: RangeU64) -> u64 { - debug_assert!(range.min <= range.max); - debug_assert!(my_level != BOTTOM_LEVEL); - - let mut count_loss = 0; - let (child_level, child_size) = child_level_and_size(my_level); - - for ci in 0..NUM_CHILDREN_IN_NODE { - let child_addr = my_addr + ci * child_size; - let child_range = RangeU64::new(child_addr, child_addr + (child_size - 1)); - if range.intersects(child_range) { - if let Some(child) = &mut self.children[ci as usize] { - if range.contains_all_of(child_range) { - count_loss += child.total_count(); - self.children[ci as usize] = None; - } else { - count_loss += child.remove(child_addr, child_level, range); - if child.is_empty() { - self.children[ci as usize] = None; - } - } - } - } - } - - self.total_count -= count_loss; - - count_loss - } } impl SparseLeaf { @@ -559,18 +572,27 @@ impl SparseLeaf { } } + /// Returns how much the total count decreased by. + #[must_use] fn decrement(&mut self, abs_addr: u64, dec: u32) -> u32 { + debug_assert_eq!(self.addrs.len(), self.counts.len()); + let index = self.addrs.partition_point(|&addr| addr < abs_addr); if let (Some(addr), Some(count)) = (self.addrs.get_mut(index), self.counts.get_mut(index)) { if *addr == abs_addr { - return if dec <= *count { + return if dec < *count { *count -= dec; dec } else { - let actually_decremented = *count; - *count = 0; - actually_decremented + let count_loss = *count; + + // The bucket is now empty - remove it: + self.addrs.remove(index); + self.counts.remove(index); + debug_assert_eq!(self.addrs.len(), self.counts.len()); + + count_loss }; } } @@ -578,6 +600,25 @@ impl SparseLeaf { 0 // not found } + /// Returns how much the total count decreased by. + #[must_use] + fn remove(&mut self, range: RangeU64) -> u64 { + debug_assert_eq!(self.addrs.len(), self.counts.len()); + + let mut count_loss = 0; + for (key, count) in self.addrs.iter().zip(&mut self.counts) { + if range.contains(*key) { + count_loss += *count as u64; + *count = 0; + } + } + + self.addrs.retain(|addr| !range.contains(*addr)); + self.counts.retain(|count| *count > 0); + debug_assert_eq!(self.addrs.len(), self.counts.len()); + count_loss + } + fn is_empty(&self) -> bool { self.addrs.is_empty() } @@ -603,25 +644,6 @@ impl SparseLeaf { } total } - - /// Returns how much the total count decreased by. - #[must_use] - fn remove(&mut self, range: RangeU64) -> u64 { - debug_assert_eq!(self.addrs.len(), self.counts.len()); - - let mut count_loss = 0; - for (key, count) in self.addrs.iter().zip(&mut self.counts) { - if range.contains(*key) { - count_loss += *count as u64; - *count = 0; - } - } - - self.addrs.retain(|addr| !range.contains(*addr)); - self.counts.retain(|count| *count > 0); - debug_assert_eq!(self.addrs.len(), self.counts.len()); - count_loss - } } impl DenseLeaf { @@ -629,18 +651,35 @@ impl DenseLeaf { self.counts[(abs_addr & (NUM_CHILDREN_IN_DENSE - 1)) as usize] += inc; } + /// Returns how much the total count decreased by. + #[must_use] fn decrement(&mut self, abs_addr: u64, dec: u32) -> u32 { - let bucket = &mut self.counts[(abs_addr & (NUM_CHILDREN_IN_DENSE - 1)) as usize]; - if dec <= *bucket { + let bucket_index = (abs_addr & (NUM_CHILDREN_IN_DENSE - 1)) as usize; + let bucket = &mut self.counts[bucket_index]; + if dec < *bucket { *bucket -= dec; dec } else { - let actually_decremented = *bucket; + let count_loss = *bucket; *bucket = 0; - actually_decremented + count_loss } } + /// Returns how much the total count decreased by. 
+ #[must_use] + fn remove(&mut self, my_addr: u64, range: RangeU64) -> u64 { + debug_assert!(range.min <= range.max); + let mut count_loss = 0; + for (i, count) in self.counts.iter_mut().enumerate() { + if range.contains(my_addr + i as u64) { + count_loss += *count as u64; + *count = 0; + } + } + count_loss + } + fn is_empty(&self) -> bool { self.total_count() == 0 } @@ -677,20 +716,6 @@ impl DenseLeaf { } total_count } - - /// Returns how much the total count decreased by. - #[must_use] - fn remove(&mut self, my_addr: u64, range: RangeU64) -> u64 { - debug_assert!(range.min <= range.max); - let mut count_loss = 0; - for (i, count) in self.counts.iter_mut().enumerate() { - if range.contains(my_addr + i as u64) { - count_loss += *count as u64; - *count = 0; - } - } - count_loss - } } // ---------------------------------------------------------------------------- @@ -735,7 +760,7 @@ impl<'a> Iterator for TreeIterator<'a> { if child_size <= self.cutoff_size && self.range.contains_all_of(child_range) { - // We can return the whole child, but first find a tight range of i: + // We can return the whole child, but first find a tight range of it: if let (Some(min_key), Some(max_key)) = ( child.min_key(child_addr, child_level), child.max_key(child_addr, child_level), @@ -1003,18 +1028,21 @@ mod tests { } assert_eq!((set.min_key(), set.max_key()), (Some(0), Some(99))); + assert_eq!(set.range(.., 1).count(), 100); for i in 0..100 { assert_eq!(set.decrement(i, 1), 1); } assert_eq!((set.min_key(), set.max_key()), (Some(0), Some(99))); + assert_eq!(set.range(.., 1).count(), 100); for i in 0..50 { assert_eq!(set.decrement(i, 1), 1); } assert_eq!((set.min_key(), set.max_key()), (Some(50), Some(99))); + assert_eq!(set.range(.., 1).count(), 50); for i in 0..50 { assert_eq!( @@ -1023,5 +1051,20 @@ mod tests { "Should already have been decremented" ); } + + assert_eq!((set.min_key(), set.max_key()), (Some(50), Some(99))); + assert_eq!(set.range(.., 1).count(), 50); + + for i in 50..99 { + assert_eq!(set.decrement(i, 1), 1); + } + + assert_eq!((set.min_key(), set.max_key()), (Some(99), Some(99))); + assert_eq!(set.range(.., 1).count(), 1); + + assert_eq!(set.decrement(99, 1), 1); + + assert_eq!((set.min_key(), set.max_key()), (None, None)); + assert_eq!(set.range(.., 1).count(), 0); } } From f01aa3517eed82203331639aab1d8223f29125ac Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 20 Sep 2023 11:21:38 +0200 Subject: [PATCH 12/14] Keep track of which components where deleted --- crates/re_arrow_store/src/store_gc.rs | 55 +++++++--- crates/re_data_store/src/entity_tree.rs | 133 +++++++++++------------- crates/re_data_store/src/store_db.rs | 5 - 3 files changed, 97 insertions(+), 96 deletions(-) diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index 7ad67f1ae6d5..c70143ac2228 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -64,11 +64,11 @@ pub struct Deleted { /// What rows where deleted? pub row_ids: HashSet, - /// What time points where deleted for each entity? - pub timeful: IntMap>>, + /// What time points where deleted for each entity+timeline+component? + pub timeful: IntMap>>>, - /// For each entity, how many timeless entries were deleted? - pub timeless: IntMap, + /// For each entity+component, how many timeless entries were deleted? 
+ pub timeless: IntMap>, } impl DataStore { @@ -237,14 +237,15 @@ impl DataStore { for ((timeline, ent_path_hash), table) in &mut self.tables { if let Some(time) = timepoint.get(timeline) { - num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64; - deleted + let deleted_comps = deleted .timeful .entry(*ent_path_hash) .or_default() .entry(*timeline) - .or_default() - .push(*time); + .or_default(); + + num_bytes_to_drop -= + table.try_drop_row(*row_id, time.as_i64(), deleted_comps) as f64; } } @@ -252,8 +253,8 @@ impl DataStore { // first and then remove them in one pass. if timepoint.is_timeless() && include_timeless { for (ent_path_hash, table) in &mut self.timeless_tables { - num_bytes_to_drop -= table.try_drop_row(*row_id) as f64; - *deleted.timeless.entry(*ent_path_hash).or_default() += 1; + let deleted_comps = deleted.timeless.entry(*ent_path_hash).or_default(); + num_bytes_to_drop -= table.try_drop_row(*row_id, deleted_comps) as f64; } } } @@ -420,7 +421,13 @@ impl IndexedTable { /// specified `time`. /// /// Returns how many bytes were actually dropped, or zero if the row wasn't found. - fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 { + fn try_drop_row( + &mut self, + row_id: RowId, + time: i64, + deleted_comps: &mut IntMap>, + ) -> u64 { + re_tracing::profile_function!(); let table_has_more_than_one_bucket = self.buckets.len() > 1; let (bucket_key, bucket) = self.find_bucket_mut(time.into()); @@ -428,7 +435,7 @@ impl IndexedTable { let mut dropped_num_bytes = { let inner = &mut *bucket.inner.write(); - inner.try_drop_row(row_id, time) + inner.try_drop_row(row_id, time, deleted_comps) }; // NOTE: We always need to keep at least one bucket alive, otherwise we have @@ -464,7 +471,12 @@ impl IndexedBucketInner { /// specified `time`. /// /// Returns how many bytes were actually dropped, or zero if the row wasn't found. - fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 { + fn try_drop_row( + &mut self, + row_id: RowId, + time: i64, + deleted_comps: &mut IntMap>, + ) -> u64 { self.sort(); let IndexedBucketInner { @@ -523,8 +535,12 @@ impl IndexedBucketInner { dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes(); // each data column - for column in columns.values_mut() { + for (comp_name, column) in columns { dropped_num_bytes += column.0.swap_remove(row_index).total_size_bytes(); + deleted_comps + .entry(*comp_name) + .or_default() + .push(time.into()); } // NOTE: A single `RowId` cannot possibly have more than one datapoint for @@ -542,7 +558,13 @@ impl PersistentIndexedTable { /// Tries to drop the given `row_id` from the table. /// /// Returns how many bytes were actually dropped, or zero if the row wasn't found. 
- fn try_drop_row(&mut self, row_id: RowId) -> u64 { + fn try_drop_row( + &mut self, + row_id: RowId, + deleted_comps: &mut IntMap, + ) -> u64 { + re_tracing::profile_function!(); + let mut dropped_num_bytes = 0u64; let PersistentIndexedTable { @@ -579,8 +601,9 @@ impl PersistentIndexedTable { dropped_num_bytes += col_num_instances.remove(row_index).total_size_bytes(); // each data column - for column in columns.values_mut() { + for (comp_name, column) in columns { dropped_num_bytes += column.0.remove(row_index).total_size_bytes(); + *deleted_comps.entry(*comp_name).or_default() += 1; } } diff --git a/crates/re_data_store/src/entity_tree.rs b/crates/re_data_store/src/entity_tree.rs index 4bc8d91bcc27..ae64ae8484a7 100644 --- a/crates/re_data_store/src/entity_tree.rs +++ b/crates/re_data_store/src/entity_tree.rs @@ -4,8 +4,7 @@ use itertools::Itertools; use nohash_hasher::IntMap; use re_log_types::{ - ComponentPath, EntityPath, EntityPathHash, EntityPathPart, PathOp, RowId, TimeInt, TimePoint, - Timeline, + ComponentPath, EntityPath, EntityPathPart, PathOp, RowId, TimeInt, TimePoint, Timeline, }; use re_types::{ComponentName, Loggable}; @@ -20,6 +19,17 @@ pub struct ActuallyDeleted { pub timeless: u64, } +impl ActuallyDeleted { + fn append(&mut self, other: Self) { + let Self { timeful, timeless } = other; + + for (timeline, mut times) in timeful { + self.timeful.entry(timeline).or_default().append(&mut times); + } + self.timeless += timeless; + } +} + // ---------------------------------------------------------------------------- /// Number of messages per time @@ -52,28 +62,13 @@ impl TimeHistogramPerTimeline { self.0.iter_mut() } - pub fn purge( - &mut self, - ent_path_hash: EntityPathHash, - deleted: &re_arrow_store::Deleted, - deleted_by_us_and_children: &mut ActuallyDeleted, - ) { + pub fn purge(&mut self, deleted: &ActuallyDeleted) { re_tracing::profile_function!(); for (timeline, histogram) in &mut self.0 { - if let Some(times) = deleted - .timeful - .get(&ent_path_hash) - .and_then(|map| map.get(timeline)) - { + if let Some(times) = deleted.timeful.get(timeline) { for &time in times { histogram.decrement(time.as_i64(), 1); - - deleted_by_us_and_children - .timeful - .entry(*timeline) - .or_default() - .push(time); } } @@ -106,22 +101,6 @@ impl TimesPerTimeline { self.0.entry(timeline).or_default().insert(time); } - pub fn purge(&mut self, deleted: &re_arrow_store::Deleted) { - re_tracing::profile_function!(); - - for deleted_times in deleted.timeful.values() { - for (timeline, time_set) in &mut self.0 { - if let Some(times) = deleted_times.get(timeline) { - for &time in times { - time_set.remove(&time); - } - } - } - } - - // NOTE: we don't include timeless. 
- } - pub fn has_timeline(&self, timeline: &Timeline) -> bool { self.0.contains_key(timeline) } @@ -373,15 +352,6 @@ impl EntityTree { components, } = self; - if let Some(decrement) = deleted.timeless.get(&path.hash()) { - *num_timeless_messages = num_timeless_messages.saturating_sub(*decrement as _); - deleted_by_us_and_children.timeless += decrement; - } - - { - re_tracing::profile_scope!("prefix_times"); - prefix_times.purge(path.hash(), deleted, deleted_by_us_and_children); - } { re_tracing::profile_scope!("nonrecursive_clears"); nonrecursive_clears.retain(|row_id, _| !deleted.row_ids.contains(row_id)); @@ -391,52 +361,65 @@ impl EntityTree { recursive_clears.retain(|row_id, _| !deleted.row_ids.contains(row_id)); } + let mut deleted_by_children = ActuallyDeleted::default(); + + for child in children.values_mut() { + child.purge(deleted, &mut deleted_by_children); + } + { re_tracing::profile_scope!("ComponentStats"); - for stats in components.values_mut() { + + // The `deleted` stats are per component, so start here: + + for (comp_name, stats) in components { let ComponentStats { times, num_timeless_messages, } = stats; - times.purge(path.hash(), deleted, deleted_by_us_and_children); + for (timeline, histogram) in &mut times.0 { + if let Some(times) = deleted + .timeful + .get(&path.hash()) + .and_then(|map| map.get(timeline)) + .and_then(|map| map.get(comp_name)) + { + for &time in times { + histogram.decrement(time.as_i64(), 1); + + deleted_by_children + .timeful + .entry(*timeline) + .or_default() + .push(time); + } + } - if let Some(decrement) = deleted.timeless.get(&path.hash()) { - *num_timeless_messages = num_timeless_messages.saturating_sub(*decrement as _); + // NOTE: we don't include timeless in the histogram. } - } - } - let mut deleted_by_children = ActuallyDeleted::default(); - - for child in children.values_mut() { - child.purge(deleted, &mut deleted_by_children); + if let Some(num_deleted) = deleted + .timeless + .get(&path.hash()) + .and_then(|map| map.get(comp_name)) + { + *num_timeless_messages = + num_timeless_messages.saturating_sub(*num_deleted as _); + deleted_by_children.timeless += num_deleted; + } + } } { - re_tracing::profile_scope!("apply_children"); - let ActuallyDeleted { timeful, timeless } = deleted_by_children; - - // Apply things that where deleted in children. - // For instance - if `/foo/bar` has some things deleted, - // we need to note it in `/foo` and `/` too. - for (timeline, mut times) in timeful { - if let Some(time_histogram) = prefix_times.0.get_mut(&timeline) { - for &time in × { - time_histogram.decrement(time.as_i64(), 1); - } - } - - deleted_by_us_and_children - .timeful - .entry(timeline) - .or_default() - .append(&mut times); - } + // Apply what was deleted by children and by our components: + *num_timeless_messages = + num_timeless_messages.saturating_sub(deleted_by_us_and_children.timeless as _); - *num_timeless_messages = num_timeless_messages.saturating_sub(timeless as _); - deleted_by_us_and_children.timeless += timeless; + prefix_times.purge(&deleted_by_children); } + + deleted_by_us_and_children.append(deleted_by_children); } // Invokes visitor for `self` all children recursively. 
diff --git a/crates/re_data_store/src/store_db.rs b/crates/re_data_store/src/store_db.rs index cee9958f3825..d7dd5d10f315 100644 --- a/crates/re_data_store/src/store_db.rs +++ b/crates/re_data_store/src/store_db.rs @@ -185,11 +185,6 @@ impl EntityDb { data_store: _, // purged before this function is called } = self; - { - re_tracing::profile_scope!("times_per_timeline"); - times_per_timeline.purge(deleted); - } - let mut actually_deleted = Default::default(); { From c7ebb4c797240da8e862cbaf5accf422e92580b2 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 20 Sep 2023 11:21:48 +0200 Subject: [PATCH 13/14] Remove very small profiling scopes --- crates/re_arrow_store/src/store_read.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index f3b74b6c2cfa..730b388f27df 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -986,12 +986,8 @@ impl IndexedBucketInner { re_tracing::profile_scope!("data"); // shuffle component columns back into a sorted state for column in columns.values_mut() { - let mut source = { - re_tracing::profile_scope!("clone"); - column.clone() - }; + let mut source = column.clone(); { - re_tracing::profile_scope!("rotate"); for (from, to) in swaps.iter().copied() { column[to] = source[from].take(); } From 18cbc53fe96798cbe45ab84ddeb66aa183253e12 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 20 Sep 2023 11:27:01 +0200 Subject: [PATCH 14/14] Clean up tree docs/code --- crates/re_int_histogram/src/tree.rs | 34 +++++++++++++++++------------ 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/crates/re_int_histogram/src/tree.rs b/crates/re_int_histogram/src/tree.rs index 1cf9f35a1565..0b7e7f136483 100644 --- a/crates/re_int_histogram/src/tree.rs +++ b/crates/re_int_histogram/src/tree.rs @@ -297,6 +297,8 @@ struct SparseLeaf { /// making up (addr, count) pairs, /// sorted by `addr`. addrs: SmallVec<[u64; 3]>, + + /// The count may never be zero. counts: SmallVec<[u32; 3]>, } @@ -539,17 +541,6 @@ impl BranchNode { } impl SparseLeaf { - #[must_use] - fn overflow(self, level: Level) -> BranchNode { - debug_assert!(level != BOTTOM_LEVEL); - - let mut node = BranchNode::default(); - for (key, count) in self.addrs.iter().zip(&self.counts) { - node.increment(level, *key, *count); - } - node - } - #[must_use] fn increment(mut self, level: Level, abs_addr: u64, inc: u32) -> Node { let index = self.addrs.partition_point(|&addr| addr < abs_addr); @@ -566,12 +557,25 @@ impl SparseLeaf { self.counts.insert(index, inc); Node::SparseLeaf(self) } else { - let mut node = self.overflow(level); + // Overflow: + let mut node = self.into_branch_node(level); node.increment(level, abs_addr, inc); Node::BranchNode(node) } } + /// Called on overflow + #[must_use] + fn into_branch_node(self, level: Level) -> BranchNode { + debug_assert!(level != BOTTOM_LEVEL); + + let mut node = BranchNode::default(); + for (key, count) in self.addrs.iter().zip(&self.counts) { + node.increment(level, *key, *count); + } + node + } + /// Returns how much the total count decreased by. 
#[must_use] fn decrement(&mut self, abs_addr: u64, dec: u32) -> u32 { @@ -620,7 +624,7 @@ impl SparseLeaf { } fn is_empty(&self) -> bool { - self.addrs.is_empty() + self.addrs.is_empty() // we don't allow zero-sized buckets } fn total_count(&self) -> u64 { @@ -770,7 +774,9 @@ impl<'a> Iterator for TreeIterator<'a> { child.total_count(), )); } else { - unreachable!("We only have non-empty children"); + unreachable!( + "A `BranchNode` can only have non-empty children" + ); } }