Fix post-GC purging of streams view time histogram #3364

Merged · 14 commits · Sep 20, 2023
2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/lib.rs
@@ -37,7 +37,7 @@ pub mod test_util;

pub use self::arrow_util::ArrayExt;
pub use self::store::{DataStore, DataStoreConfig, StoreGeneration};
pub use self::store_gc::{GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_gc::{Deleted, GarbageCollectionOptions, GarbageCollectionTarget};
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::{DataStoreRowStats, DataStoreStats, EntityStats};
pub use self::store_write::{WriteError, WriteResult};
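A hedged sketch of what the widened re-export enables downstream; the helper name here is hypothetical, but the import path and the changed return type of `gc` come straight from this diff:

```rust
use re_arrow_store::{DataStore, Deleted, GarbageCollectionOptions};

/// Run a full GC and hand the deletion report back to the caller,
/// e.g. so a view can purge its cached time histograms (hypothetical helper).
fn collect_garbage(store: &mut DataStore) -> Deleted {
    let (deleted, _stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
    deleted
}
```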
98 changes: 69 additions & 29 deletions crates/re_arrow_store/src/store_gc.rs
@@ -1,6 +1,7 @@
use ahash::{HashMap, HashSet};

use re_log_types::{RowId, SizeBytes as _, TimeInt, TimeRange};
use nohash_hasher::IntMap;
use re_log_types::{EntityPathHash, RowId, SizeBytes as _, TimeInt, TimeRange, Timeline};
use re_types::ComponentName;

use crate::{
@@ -58,6 +59,18 @@ impl std::fmt::Display for GarbageCollectionTarget {
}
}

#[derive(Default)]
pub struct Deleted {
/// Which rows were deleted?
pub row_ids: HashSet<RowId>,

/// Which time points were deleted, for each entity+timeline+component?
pub timeful: IntMap<EntityPathHash, IntMap<Timeline, IntMap<ComponentName, Vec<TimeInt>>>>,

/// For each entity+component, how many timeless entries were deleted?
pub timeless: IntMap<EntityPathHash, IntMap<ComponentName, u64>>,
}

impl DataStore {
/// Triggers a garbage collection according to the desired `target`.
///
@@ -102,7 +115,7 @@ impl DataStore {
// when purging data.
//
// TODO(#1823): Workload specific optimizations.
pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Vec<RowId>, DataStoreStats) {
pub fn gc(&mut self, options: GarbageCollectionOptions) -> (Deleted, DataStoreStats) {
re_tracing::profile_function!();

self.gc_id += 1;
@@ -114,7 +127,7 @@

let protected_rows = self.find_all_protected_rows(options.protect_latest);

let mut row_ids = match options.target {
let mut deleted = match options.target {
GarbageCollectionTarget::DropAtLeastFraction(p) => {
assert!((0.0..=1.0).contains(&p));

@@ -153,7 +166,7 @@
};

if options.purge_empty_tables {
row_ids.extend(self.purge_empty_tables());
deleted.row_ids.extend(self.purge_empty_tables());
}

#[cfg(debug_assertions)]
@@ -177,7 +190,7 @@

let stats_diff = stats_before - stats_after;

(row_ids, stats_diff)
(deleted, stats_diff)
}

/// Tries to drop _at least_ `num_bytes_to_drop` bytes of data from the store.
@@ -194,23 +207,20 @@
mut num_bytes_to_drop: f64,
include_timeless: bool,
protected_rows: &HashSet<RowId>,
) -> Vec<RowId> {
) -> Deleted {
re_tracing::profile_function!();

let mut row_ids = Vec::new();
let mut deleted = Deleted::default();

// The algorithm is straightforward:
// 1. Find the oldest `RowId` that is not protected
// 2. Find all tables that potentially hold data associated with that `RowId`
// 3. Drop the associated row and account for the space we got back

let mut candidate_rows = self.metadata_registry.registry.iter();

while num_bytes_to_drop > 0.0 {
// Try to get the next candidate
let Some((row_id, timepoint)) = candidate_rows.next() else {
for (row_id, timepoint) in &self.metadata_registry.registry {
if num_bytes_to_drop <= 0.0 {
break;
};
}

if protected_rows.contains(row_id) {
continue;
@@ -221,32 +231,40 @@
self.metadata_registry.heap_size_bytes -= metadata_dropped_size_bytes;
num_bytes_to_drop -= metadata_dropped_size_bytes as f64;

row_ids.push(*row_id);
deleted.row_ids.insert(*row_id);

// find all tables that could possibly contain this `RowId`
let temporal_tables = self.tables.iter_mut().filter_map(|((timeline, _), table)| {
timepoint.get(timeline).map(|time| (*time, table))
});

for (time, table) in temporal_tables {
num_bytes_to_drop -= table.try_drop_row(*row_id, time.as_i64()) as f64;
for ((timeline, ent_path_hash), table) in &mut self.tables {
if let Some(time) = timepoint.get(timeline) {
let deleted_comps = deleted
.timeful
.entry(*ent_path_hash)
.or_default()
.entry(*timeline)
.or_default();

num_bytes_to_drop -=
table.try_drop_row(*row_id, time.as_i64(), deleted_comps) as f64;
}
}

// TODO(jleibs): This is a worst-case removal-order. Would be nice to collect all the rows
// first and then remove them in one pass.
if timepoint.is_timeless() && include_timeless {
for table in self.timeless_tables.values_mut() {
num_bytes_to_drop -= table.try_drop_row(*row_id) as f64;
for (ent_path_hash, table) in &mut self.timeless_tables {
let deleted_comps = deleted.timeless.entry(*ent_path_hash).or_default();
num_bytes_to_drop -= table.try_drop_row(*row_id, deleted_comps) as f64;
}
}
}

// Purge the removed rows from the metadata_registry
for row_id in &row_ids {
for row_id in &deleted.row_ids {
self.metadata_registry.remove(row_id);
}

row_ids
deleted
}

/// For each `EntityPath`, `Timeline`, `Component` find the N latest [`RowId`]s.
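The body of `find_all_protected_rows` is collapsed in this diff; conceptually it keeps the `protect_latest` most recent `RowId`s per (entity, timeline, component) key out of the GC's reach. A rough, hypothetical illustration of that "protect the latest N" idea, not the actual implementation:

```rust
use std::collections::{BTreeMap, HashSet};

type RowId = u64;

/// Given rows grouped by some (entity, timeline, component) key and sorted
/// oldest-to-newest, return the set of row ids the GC must not touch.
fn find_protected_rows(
    rows_per_key: &BTreeMap<String, Vec<RowId>>,
    protect_latest: usize,
) -> HashSet<RowId> {
    let mut protected = HashSet::new();
    for rows in rows_per_key.values() {
        // Keep the `protect_latest` most recent rows of every key.
        for row_id in rows.iter().rev().take(protect_latest) {
            protected.insert(*row_id);
        }
    }
    protected
}
```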
@@ -403,15 +421,21 @@ impl IndexedTable {
/// specified `time`.
///
/// Returns how many bytes were actually dropped, or zero if the row wasn't found.
fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 {
fn try_drop_row(
&mut self,
row_id: RowId,
time: i64,
deleted_comps: &mut IntMap<ComponentName, Vec<TimeInt>>,
) -> u64 {
re_tracing::profile_function!();
let table_has_more_than_one_bucket = self.buckets.len() > 1;

let (bucket_key, bucket) = self.find_bucket_mut(time.into());
let bucket_num_bytes = bucket.total_size_bytes();

let mut dropped_num_bytes = {
let inner = &mut *bucket.inner.write();
inner.try_drop_row(row_id, time)
inner.try_drop_row(row_id, time, deleted_comps)
};

// NOTE: We always need to keep at least one bucket alive, otherwise we have
@@ -447,7 +471,12 @@ impl IndexedBucketInner {
/// specified `time`.
///
/// Returns how many bytes were actually dropped, or zero if the row wasn't found.
fn try_drop_row(&mut self, row_id: RowId, time: i64) -> u64 {
fn try_drop_row(
&mut self,
row_id: RowId,
time: i64,
deleted_comps: &mut IntMap<ComponentName, Vec<TimeInt>>,
) -> u64 {
self.sort();

let IndexedBucketInner {
@@ -506,8 +535,12 @@ impl IndexedBucketInner {
dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();

// each data column
for column in columns.values_mut() {
for (comp_name, column) in columns {
dropped_num_bytes += column.0.swap_remove(row_index).total_size_bytes();
deleted_comps
.entry(*comp_name)
.or_default()
.push(time.into());
}

// NOTE: A single `RowId` cannot possibly have more than one datapoint for
Expand All @@ -525,7 +558,13 @@ impl PersistentIndexedTable {
/// Tries to drop the given `row_id` from the table.
///
/// Returns how many bytes were actually dropped, or zero if the row wasn't found.
fn try_drop_row(&mut self, row_id: RowId) -> u64 {
fn try_drop_row(
&mut self,
row_id: RowId,
deleted_comps: &mut IntMap<ComponentName, u64>,
) -> u64 {
re_tracing::profile_function!();

let mut dropped_num_bytes = 0u64;

let PersistentIndexedTable {
@@ -562,8 +601,9 @@
dropped_num_bytes += col_num_instances.remove(row_index).total_size_bytes();

// each data column
for column in columns.values_mut() {
for (comp_name, column) in columns {
dropped_num_bytes += column.0.remove(row_index).total_size_bytes();
*deleted_comps.entry(*comp_name).or_default() += 1;
}
}

10 changes: 1 addition & 9 deletions crates/re_arrow_store/src/store_read.rs
@@ -657,10 +657,6 @@ impl IndexedTable {
&mut self,
time_range: impl RangeBounds<TimeInt>,
) -> impl Iterator<Item = (TimeInt, &mut IndexedBucket)> {
// Beware! This merely measures the time it takes to gather all the necessary metadata
// for building the returned iterator.
re_tracing::profile_function!();

self.buckets
.range_mut(time_range)
.rev()
@@ -990,12 +986,8 @@ impl IndexedBucketInner {
re_tracing::profile_scope!("data");
// shuffle component columns back into a sorted state
for column in columns.values_mut() {
let mut source = {
re_tracing::profile_scope!("clone");
column.clone()
};
let mut source = column.clone();
{
re_tracing::profile_scope!("rotate");
for (from, to) in swaps.iter().copied() {
column[to] = source[from].take();
}
10 changes: 5 additions & 5 deletions crates/re_arrow_store/tests/correctness.rs
@@ -300,10 +300,10 @@ fn gc_correct() {

let stats = DataStoreStats::from_store(&store);

let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
let stats_diff = stats_diff + stats_empty; // account for fixed overhead

assert_eq!(row_ids.len() as u64, stats.total.num_rows);
assert_eq!(deleted.row_ids.len() as u64, stats.total.num_rows);
assert_eq!(
stats.metadata_registry.num_rows,
stats_diff.metadata_registry.num_rows
@@ -316,12 +316,12 @@

sanity_unwrap(&mut store);
check_still_readable(&store);
for row_id in &row_ids {
for row_id in &deleted.row_ids {
assert!(store.get_msg_metadata(row_id).is_none());
}

let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
assert!(row_ids.is_empty());
let (deleted, stats_diff) = store.gc(GarbageCollectionOptions::gc_everything());
assert!(deleted.row_ids.is_empty());
assert_eq!(DataStoreStats::default(), stats_diff);

sanity_unwrap(&mut store);
4 changes: 2 additions & 2 deletions crates/re_arrow_store/tests/data_store.rs
@@ -880,13 +880,13 @@ fn gc_impl(store: &mut DataStore) {

let stats = DataStoreStats::from_store(store);

let (row_ids, stats_diff) = store.gc(GarbageCollectionOptions {
let (deleted, stats_diff) = store.gc(GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(1.0 / 3.0),
gc_timeless: false,
protect_latest: 0,
purge_empty_tables: false,
});
for row_id in &row_ids {
for row_id in &deleted.row_ids {
assert!(store.get_msg_metadata(row_id).is_none());
}

14 changes: 5 additions & 9 deletions crates/re_crash_handler/src/lib.rs
@@ -45,15 +45,11 @@ fn install_panic_hook(_build_info: BuildInfo) {
.name()
.map_or_else(|| format!("{:?}", thread.id()), |name| name.to_owned());

let file_line_suffix = if let Some(file_line) = &file_line {
format!(", {file_line}")
} else {
String::new()
};

eprintln!(
"\nthread '{thread_name}' panicked at '{msg}'{file_line_suffix}\n\n{callstack}"
);
eprintln!("\nthread '{thread_name}' panicked at '{msg}'");
if let Some(file_line) = &file_line {
eprintln!("{file_line}");
}
eprintln!("stack backtrace:\n{callstack}");
} else {
// This prints the panic message and callstack:
(*previous_panic_hook)(panic_info);
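For reference, a minimal, self-contained sketch of the same pattern with plain std APIs (not the actual re_crash_handler code): wrap the previous hook, print the panic location on its own line, then delegate.

```rust
use std::panic;

fn main() {
    let previous_hook = panic::take_hook();
    panic::set_hook(Box::new(move |panic_info| {
        let thread = std::thread::current();
        let thread_name = thread.name().unwrap_or("<unnamed>");
        eprintln!("\nthread '{thread_name}' panicked");

        // Print the location on its own line, mirroring the change above.
        if let Some(location) = panic_info.location() {
            eprintln!("{location}");
        }

        // Delegate to the previous hook for the message and backtrace.
        previous_hook(panic_info);
    }));

    panic!("demo panic");
}
```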