Fix post-GC purging of streams view time histogram #3364

Merged · 14 commits · Sep 20, 2023
Changes from 10 commits
2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/lib.rs
@@ -37,7 +37,7 @@ pub mod test_util;
 
 pub use self::arrow_util::ArrayExt;
 pub use self::store::{DataStore, DataStoreConfig, StoreGeneration};
-pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget};
+pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTarget};
 pub use self::store_read::{LatestAtQuery, RangeQuery};
 pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
 pub use self::store_write::{WriteError, WriteResult};
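
The re-export makes the new `Deleted` type reachable from the crate root. As a minimal sketch of what a downstream caller can now write (the `collect_garbage` helper is hypothetical, but `gc_everything()` is the same constructor the tests below use):

```rust
use re_arrow_store::{DataStore, Deleted, GarbageCollectionOptions};

/// Hypothetical helper: run a full GC and hand the deletion report to the caller.
fn collect_garbage(store: &mut DataStore) -> Deleted {
    let (deleted, _stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
    deleted
}
```
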
61 changes: 39 additions & 22 deletions crates/re_arrow_store/src/store_gc.rs
@@ -1,6 +1,7 @@
 use ahash::{HashMap, HashSet};
 
-use re_log_types::{RowId, SizeBytes as _, TimeInt, TimeRange};
+use nohash_hasher::IntMap;
+use re_log_types::{EntityPathHash, RowId, SizeBytes as _, TimeInt, TimeRange, Timeline};
 use re_types::ComponentName;
 
 use crate::{
@@ -58,6 +59,18 @@ impl std::fmt::Display for GarbageCollectionTarget {
     }
 }
 
+#[derive(Default)]
+pub struct Deleted {
+    /// Which rows were deleted?
+    pub row_ids: HashSet<RowId>,
+
+    /// Which time points were deleted, for each entity?
+    pub timeful: IntMap<EntityPathHash, IntMap<Timeline, Vec<TimeInt>>>,
+
+    /// For each entity, how many timeless entries were deleted?
+    pub timeless: IntMap<EntityPathHash, u64>,
+}
+
 impl DataStore {
     /// Triggers a garbage collection according to the desired `target`.
     ///
@@ -102,7 +115,7 @@ impl DataStore {
     // when purging data.
     //
     // TODO(#1823): Workload specific optimizations.
-    pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Vec<RowId>, DataStoreStats) {
+    pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Deleted, DataStoreStats) {
         re_tracing::profile_function!();
 
         self.gc_id += 1;
@@ -114,7 +127,7 @@
 
         let protected_rows = self.find_all_protected_rows(options.protect_latest);
 
-        let mut row_ids = match options.target {
+        let mut deleted = match options.target {
             GarbageCollectionTarget::DropAtLeastFraction(p) => {
                 assert!((0.0..=1.0).contains(&p));
 
@@ -153,7 +166,7 @@
         };
 
         if options.purge_empty_tables {
-            row_ids.extend(self.purge_empty_tables());
+            deleted.row_ids.extend(self.purge_empty_tables());
         }
 
         #[cfg(debug_assertions)]
@@ -177,7 +190,7 @@
 
         let stats_diff = stats_before - stats_after;
 
-        (row_ids, stats_diff)
+        (deleted, stats_diff)
     }
 
     /// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
@@ -194,23 +207,20 @@
         mut num_bytes_to_drop: f64,
         include_timeless: bool,
         protected_rows: &HashSet<RowId>,
-    ) -> Vec<RowId> {
+    ) -> Deleted {
         re_tracing::profile_function!();
 
-        let mut row_ids = Vec::new();
+        let mut deleted = Deleted::default();
 
         // The algorithm is straightforward:
        // 1. Find the oldest `RowId` that is not protected
         // 2. Find all tables that potentially hold data associated with that `RowId`
         // 3. Drop the associated row and account for the space we got back
 
-        let mut candidate_rows = self.metadata_registry.registry.iter();
-
-        while num_bytes_to_drop > 0.0 {
-            // Try to get the next candidate
-            let Some((row_id, timepoint)) = candidate_rows.next() else {
+        for (row_id, timepoint) in &self.metadata_registry.registry {
+            if num_bytes_to_drop <= 0.0 {
                 break;
-            };
+            }
 
             if protected_rows.contains(row_id) {
                 continue;
@@ -221,32 +231,39 @@
             self.metadata_registry.heap_size_bytes -= metadata_dropped_size_bytes;
             num_bytes_to_drop -= metadata_dropped_size_bytes as f64;
 
-            row_ids.push(*row_id);
+            deleted.row_ids.insert(*row_id);
 
             // find all tables that could possibly contain this `RowId`
-            let temporal_tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| {
-                timepoint.get(timeline).map(|time| (*time, table))
-            });
-
-            for (time, table) in temporal_tables {
-                num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64;
+            for ((timeline, ent_path_hash), table) in &mut self.tables {
+                if let Some(time) = timepoint.get(timeline) {
+                    num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64;
+                    deleted
+                        .timeful
+                        .entry(*ent_path_hash)
+                        .or_default()
+                        .entry(*timeline)
+                        .or_default()
+                        .push(*time);
+                }
             }
 
             // TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows
             // first and then remove them in one pass.
             if timepoint.is_timeless() && include_timeless {
-                for table in self.timeless_tables.values_mut() {
+                for (ent_path_hash, table) in &mut self.timeless_tables {
                     num_bytes_to_drop -= table.try_drop_row(*row_id) as f64;
+                    *deleted.timeless.entry(*ent_path_hash).or_default() += 1;
                 }
             }
         }
 
         // Purge the removed rows from the metadata_registry
-        for row_id in &row_ids {
+        for row_id in &deleted.row_ids {
             self.metadata_registry.remove(row_id);
         }
 
-        row_ids
+        deleted
     }
 
     /// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s.
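
Returning a structured `Deleted` report instead of a bare `Vec<RowId>` is what lets a caller (here, the streams view time histogram from the PR title) purge exactly the dropped time points instead of rebuilding its state from scratch. A minimal sketch of such a consumer, assuming only the public API shown above; the `TimeHistogram` alias and the `purge_histograms_after_gc` helper are hypothetical stand-ins for the viewer's actual bookkeeping:

```rust
use nohash_hasher::IntMap;
use re_arrow_store::{DataStore, GarbageCollectionOptions, GarbageCollectionTarget};
use re_log_types::{EntityPathHash, TimeInt, Timeline};

/// Hypothetical stand-in for the viewer's per-entity time histogram.
type TimeHistogram = IntMap<Timeline, Vec<TimeInt>>;

fn purge_histograms_after_gc(
    store: &mut DataStore,
    histograms: &mut IntMap<EntityPathHash, TimeHistogram>,
) {
    let (deleted, _stats_diff) = store.gc(GarbageCollectionOptions {
        target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
        gc_timeless: false,
        protect_latest: 0,
        purge_empty_tables: false,
    });

    // `deleted.timeful` says, per entity and per timeline, exactly which time
    // points the GC dropped -- remove just those from the histogram.
    for (ent_path_hash, per_timeline) in &deleted.timeful {
        if let Some(histogram) = histograms.get_mut(ent_path_hash) {
            for (timeline, dropped_times) in per_timeline {
                if let Some(times) = histogram.get_mut(timeline) {
                    times.retain(|t| !dropped_times.contains(t));
                }
            }
        }
    }
    // `deleted.timeless` only carries per-entity counts, so timeless totals
    // would be decremented here in the same fashion.
}
```

Keying the report by `EntityPathHash` rather than a full `EntityPath` presumably keeps it cheap to build inside the GC loop, at the cost of callers maintaining their own hash-to-path mapping.
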
4 changes: 0 additions & 4 deletions crates/re_arrow_store/src/store_read.rs
@@ -657,10 +657,6 @@ impl IndexedTable {
         &mut self,
         time_range: impl RangeBounds<TimeInt>,
     ) -> impl Iterator<Item = (TimeInt, &mut IndexedBucket)> {
-        // Beware! This merely measures the time it takes to gather all the necessary metadata
-        // for building the returned iterator.
-        re_tracing::profile_function!();
-
         self.buckets
             .range_mut(time_range)
             .rev()
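
For context on why that scope was removed: the function returns a lazy iterator, so a profiler scope placed there measures only the construction of the iterator, not the bucket traversal it eventually performs — which is exactly what the deleted comment warned about. A small self-contained illustration of the pitfall, using plain `std::time::Instant` in place of `re_tracing`:

```rust
use std::time::Instant;

fn main() {
    let data: Vec<u64> = (0..10_000_000).collect();

    let t0 = Instant::now();
    // Lazy: building the iterator does essentially no work...
    let doubled = data.iter().map(|x| x * 2);
    println!("construction took {:?}", t0.elapsed()); // typically nanoseconds

    let t1 = Instant::now();
    // ...the cost is only paid here, when the iterator is consumed.
    let sum: u64 = doubled.sum();
    println!("consumption took {:?} (sum = {sum})", t1.elapsed());
}
```
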
10 changes: 5 additions & 5 deletions crates/re_arrow_store/tests/correctness.rs
@@ -300,10 +300,10 @@ fn gc_correct() {
 
     let stats = DataStoreStats::from_store(&store);
 
-    let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
+    let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
     let stats_diff = stats_diff + stats_empty; // account for fixed overhead
 
-    assert_eq!(row_ids.len() as u64, stats.total.num_rows);
+    assert_eq!(deleted.row_ids.len() as u64, stats.total.num_rows);
     assert_eq!(
         stats.metadata_registry.num_rows,
         stats_diff.metadata_registry.num_rows
@@ -316,12 +316,12 @@
 
     sanity_unwrap(&mut store);
     check_still_readable(&store);
-    for row_id in &row_ids {
+    for row_id in &deleted.row_ids {
         assert!(store.get_msg_metadata(row_id).is_none());
     }
 
-    let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
-    assert!(row_ids.is_empty());
+    let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
+    assert!(deleted.row_ids.is_empty());
     assert_eq!(DataStoreStats::default(), stats_diff);
 
     sanity_unwrap(&mut store);

4 changes: 2 additions & 2 deletions crates/re_arrow_store/tests/data_store.rs
@@ -880,13 +880,13 @@ fn gc_impl(store: &mut DataStore) {
 
     let stats = DataStoreStats::from_store(store);
 
-    let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions {
+    let (deleted, stats_diff) = store.gc(GarbageCollectionOptions {
         target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
         gc_timeless: false,
         protect_latest: 0,
         purge_empty_tables: false,
     });
-    for row_id in &row_ids {
+    for row_id in &deleted.row_ids {
         assert!(store.get_msg_metadata(row_id).is_none());
     }
 
14 changes: 5 additions & 9 deletions crates/re_crash_handler/src/lib.rs
@@ -45,15 +45,11 @@ fn install_panic_hook(_build_info: BuildInfo) {
                 .name()
                 .map_or_else(|| format!("{:?}", thread.id()), |name| name.to_owned());
 
-            let file_line_suffix = if let Some(file_line) = &file_line {
-                format!(", {file_line}")
-            } else {
-                String::new()
-            };
-
-            eprintln!(
-                "\nthread '{thread_name}' panicked at '{msg}'{file_line_suffix}\n\n{callstack}"
-            );
+            eprintln!("\nthread '{thread_name}' panicked at '{msg}'");
+            if let Some(file_line) = &file_line {
+                eprintln!("{file_line}");
+            }
+            eprintln!("stack backtrace:\n{callstack}");
         } else {
             // This prints the panic message and callstack:
             (*previous_panic_hook)(panic_info);
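
The rewritten hook prints the message, the optional source location, and the backtrace as separate lines, matching the shape of Rust's default panic output. For reference, this is the general hook-chaining pattern the code above follows; the sketch below uses only `std` and is not the crate's actual implementation (which also formats its own callstack):

```rust
use std::panic;

fn install_simple_panic_hook() {
    let previous_hook = panic::take_hook();
    panic::set_hook(Box::new(move |panic_info| {
        let thread = std::thread::current();
        let thread_name = thread
            .name()
            .map_or_else(|| format!("{:?}", thread.id()), |name| name.to_owned());

        // Only string payloads can be formatted here; anything else is
        // forwarded to the previous (default) hook.
        if let Some(msg) = panic_info.payload().downcast_ref::<&str>() {
            eprintln!("\nthread '{thread_name}' panicked at '{msg}'");
            if let Some(location) = panic_info.location() {
                eprintln!("{}:{}:{}", location.file(), location.line(), location.column());
            }
        } else {
            (*previous_hook)(panic_info);
        }
    }));
}

fn main() {
    install_simple_panic_hook();
    panic!("something went wrong");
}
```
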