Skip to content

Commit

Permalink
GC improvements 6: introduce batched GC (#4400)
Browse files Browse the repository at this point in the history
Makes the GC capable of dropping entire buckets in one go when the
conditions are met (and they are pretty simple to meet in the common
case of in-order data).

Unfortunately, I couldn't make the batched GC match -- let alone improve
-- the performance of the standard GC.
I even have a branch with a parallel batched GC, and it's still slower:
the overhead of the batching data structures just kills me every time.
For that reason, batching is disabled by default.

I still want to commit the code so as to prevent it from rotting though,
so we can come back to it at a later time.
This introduces a slight performance deterioration on the non-batched path;
that's fine.

### Benchmarks

Compared to `main`:
```
group                                                                       gc_improvements_0                       gc_improvements_6
-----                                                                       -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/default                            7.62    652.8±4.12ms 89.8 KElem/sec     1.00     85.7±1.14ms 683.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256                       5.34    465.8±2.50ms 125.8 KElem/sec    1.00     87.2±0.55ms 671.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512                       7.12    655.3±2.61ms 89.4 KElem/sec     1.00     92.0±1.85ms 636.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024                      12.45  1084.0±4.47ms 54.1 KElem/sec     1.00     87.1±0.40ms 672.7 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048                      23.63      2.1±0.02s 27.6 KElem/sec     1.00     89.9±1.13ms 652.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256/gc_batching=true                                              1.00     98.6±0.82ms 594.2 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512/gc_batching=true                                              1.00     94.6±1.26ms 619.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024/gc_batching=true                                             1.00     93.2±1.29ms 628.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048/gc_batching=true                                             1.00     94.3±1.43ms 621.1 KElem/sec
.../timeless_logs/drop_at_least=0.3/default                                 33.30      2.4±0.03s 24.4 KElem/sec     1.00     72.2±2.46ms 811.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256                            35.16      2.5±0.08s 23.5 KElem/sec     1.00     71.1±2.31ms 824.5 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512                            35.08      2.4±0.02s 24.5 KElem/sec     1.00     68.1±1.20ms 859.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024                           36.86      2.4±0.05s 24.2 KElem/sec     1.00     65.7±0.87ms 891.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048                           35.99      2.4±0.03s 24.1 KElem/sec     1.00     67.7±1.33ms 865.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256/gc_batching=true                                                   1.00     68.7±1.40ms 853.1 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512/gc_batching=true                                                   1.00     67.3±0.32ms 870.8 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024/gc_batching=true                                                  1.00     67.7±1.21ms 865.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048/gc_batching=true                                                  1.00     67.6±1.31ms 866.6 KElem/sec
```

Compared to previous PR:
```
group                                                                       gc_improvements_5                       gc_improvements_6
-----                                                                       -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/default                            1.00     81.4±0.94ms 720.0 KElem/sec    1.05     85.7±1.14ms 683.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256                       1.00     84.0±0.50ms 697.8 KElem/sec    1.04     87.2±0.55ms 671.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512                       1.00     82.5±1.33ms 710.0 KElem/sec    1.11     92.0±1.85ms 636.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024                      1.00     83.4±1.16ms 702.9 KElem/sec    1.04     87.1±0.40ms 672.7 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048                      1.00     83.7±0.61ms 700.0 KElem/sec    1.07     89.9±1.13ms 652.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256/gc_batching=true                                              1.00     98.6±0.82ms 594.2 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512/gc_batching=true                                              1.00     94.6±1.26ms 619.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024/gc_batching=true                                             1.00     93.2±1.29ms 628.9 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048/gc_batching=true                                             1.00     94.3±1.43ms 621.1 KElem/sec
.../timeless_logs/drop_at_least=0.3/default                                 1.00     66.8±0.85ms 877.3 KElem/sec    1.08     72.2±2.46ms 811.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256                            1.00     67.5±1.43ms 868.2 KElem/sec    1.05     71.1±2.31ms 824.5 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512                            1.00     67.4±1.40ms 869.4 KElem/sec    1.01     68.1±1.20ms 859.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024                           1.03     67.5±2.21ms 867.5 KElem/sec    1.00     65.7±0.87ms 891.4 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048                           1.00     67.8±1.86ms 863.9 KElem/sec    1.00     67.7±1.33ms 865.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024/gc_batching=true                                                  1.00     67.7±1.21ms 865.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048/gc_batching=true                                                  1.00     67.6±1.31ms 866.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256/gc_batching=true                                                   1.00     68.7±1.40ms 853.1 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512/gc_batching=true                                                   1.00     67.3±0.32ms 870.8 KElem/sec
```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401
  • Loading branch information
teh-cmc committed Dec 2, 2023
1 parent 28d8336 commit 8e3cf2e
Show file tree
Hide file tree
Showing 15 changed files with 489 additions and 136 deletions.
2 changes: 2 additions & 0 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ fn gc(c: &mut Criterion) {
protect_latest: 0,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
});
stats_diff
});
Expand All @@ -315,6 +316,7 @@ fn gc(c: &mut Criterion) {
protect_latest: 0,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
});
stats_diff
});
Expand Down
106 changes: 70 additions & 36 deletions crates/re_arrow_store/benches/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ mod constants {

use constants::{NUM_ENTITY_PATHS, NUM_ROWS_PER_ENTITY_PATH};

/// Which values of the `gc_batching` flag to benchmark with.
///
/// The batched-GC variant is only exercised by the full benchmark suite; the
/// reduced `core_benchmarks_only` build sticks to the standard (non-batched) path.
fn gc_batching() -> &'static [bool] {
    if cfg!(feature = "core_benchmarks_only") {
        &[false]
    } else {
        &[false, true]
    }
}

fn num_rows_per_bucket() -> &'static [u64] {
#[cfg(feature = "core_benchmarks_only")]
{
Expand Down Expand Up @@ -63,8 +74,10 @@ fn plotting_dashboard(c: &mut Criterion) {
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
};

// NOTE: insert in multiple timelines to more closely match real world scenarios.
let mut timegen = |i| {
[
build_log_time(Time::from_seconds_since_epoch(i as _)),
Expand Down Expand Up @@ -98,26 +111,37 @@ fn plotting_dashboard(c: &mut Criterion) {

// Emulate more or less bucket
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
for &gc_batching in gc_batching() {
group.bench_function(
if gc_batching {
format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
} else {
format!("bucketsz={num_rows_per_bucket}")
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
|b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
let mut gc_settings = gc_settings.clone();
gc_settings.enable_batching = gc_batching;
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
},
BatchSize::LargeInput,
);
});
}
}
}

Expand All @@ -138,6 +162,7 @@ fn timeless_logs(c: &mut Criterion) {
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching: false,
};

let mut timegen = |_| TimePoint::timeless();
Expand Down Expand Up @@ -165,28 +190,38 @@ fn timeless_logs(c: &mut Criterion) {
);
});

// Emulate more or less bucket
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
for &gc_batching in gc_batching() {
group.bench_function(
if gc_batching {
format!("bucketsz={num_rows_per_bucket}/gc_batching=true")
} else {
format!("bucketsz={num_rows_per_bucket}")
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
|b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
let mut gc_settings = gc_settings.clone();
gc_settings.enable_batching = gc_batching;
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
},
BatchSize::LargeInput,
);
});
}
}
}

Expand Down Expand Up @@ -241,7 +276,6 @@ where
(0..NUM_ROWS_PER_ENTITY_PATH).map(move |i| {
DataRow::from_component_batches(
RowId::random(),
// NOTE: insert in multiple timelines to more closely match real world scenarios.
timegen(i),
entity_path.clone(),
datagen(i)
Expand Down
52 changes: 45 additions & 7 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::AtomicU64;

use ahash::HashMap;
use arrow2::datatypes::DataType;
use nohash_hasher::IntMap;
use parking_lot::RwLock;
Expand Down Expand Up @@ -210,7 +209,7 @@ pub struct DataStore {
/// All temporal [`IndexedTable`]s for all entities on all timelines.
///
/// See also [`Self::timeless_tables`].
pub(crate) tables: HashMap<(Timeline, EntityPathHash), IndexedTable>,
pub(crate) tables: BTreeMap<(EntityPathHash, Timeline), IndexedTable>,

/// All timeless indexed tables for all entities. Never garbage collected.
///
Expand Down Expand Up @@ -335,9 +334,9 @@ impl DataStore {
/// Do _not_ use this to try and assert the internal state of the datastore.
pub fn iter_indices(
&self,
) -> impl ExactSizeIterator<Item = ((Timeline, EntityPath), &IndexedTable)> {
self.tables.iter().map(|((timeline, _), table)| {
((*timeline, table.ent_path.clone() /* shallow */), table)
) -> impl ExactSizeIterator<Item = ((EntityPath, Timeline), &IndexedTable)> {
self.tables.iter().map(|((_, timeline), table)| {
((table.ent_path.clone() /* shallow */, *timeline), table)
})
}
}
Expand Down Expand Up @@ -439,13 +438,44 @@ impl IndexedTable {
Self {
timeline,
ent_path,
buckets: [(i64::MIN.into(), bucket)].into(),
buckets: [(TimeInt::MIN, bucket)].into(),
cluster_key,
all_components: Default::default(),
buckets_num_rows: 0,
buckets_size_bytes,
}
}

/// Makes sure bucketing invariants are upheld, and takes necessary actions if not.
///
/// Invariants are:
/// 1. There must always be at least one bucket alive.
/// 2. The first bucket must always have an _indexing time_ `-∞`.
///
/// Meant to be called after a GC pass that may have dropped and/or re-keyed buckets.
pub(crate) fn uphold_indexing_invariants(&mut self) {
    if self.buckets.is_empty() {
        // Invariant 1 violated: every bucket was dropped. Recreate a single fresh,
        // empty bucket so the table remains usable.
        //
        // Exhaustive destructure on purpose: adding a field to `Self` forces this
        // code to be revisited.
        let Self {
            timeline,
            ent_path: _,
            cluster_key,
            buckets,
            all_components: _, // keep the history on purpose
            buckets_num_rows,
            buckets_size_bytes,
        } = self;

        let bucket = IndexedBucket::new(*cluster_key, *timeline);
        let size_bytes = bucket.total_size_bytes();

        // Reset the cached per-table stats to match the single empty bucket.
        *buckets = [(TimeInt::MIN, bucket)].into();
        *buckets_num_rows = 0;
        *buckets_size_bytes = size_bytes;
    }
    // NOTE: Make sure the first bucket is responsible for `-∞`, which might or might not be
    // the case now if we've been moving buckets around.
    else if let Some((_, bucket)) = self.buckets.pop_first() {
        // Re-key the (time-wise) first bucket at `TimeInt::MIN` so that any query
        // below the first data time still resolves to a bucket (invariant 2).
        self.buckets.insert(TimeInt::MIN, bucket);
    }
}
}

/// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
Expand Down Expand Up @@ -474,7 +504,7 @@ impl Clone for IndexedBucket {
}

impl IndexedBucket {
fn new(cluster_key: ComponentName, timeline: Timeline) -> Self {
pub(crate) fn new(cluster_key: ComponentName, timeline: Timeline) -> Self {
Self {
timeline,
inner: RwLock::new(IndexedBucketInner::default()),
Expand Down Expand Up @@ -510,6 +540,13 @@ pub struct IndexedBucketInner {
/// Keeps track of the unique identifier for each row that was generated by the clients.
pub col_row_id: RowIdVec,

/// Keeps track of the latest/newest [`RowId`] present in this bucket.
///
/// Useful to batch GC buckets.
///
/// `RowId::ZERO` for empty buckets.
pub max_row_id: RowId,

/// The entire column of `num_instances`.
///
/// Keeps track of the expected number of instances in each row.
Expand Down Expand Up @@ -539,6 +576,7 @@ impl Default for IndexedBucketInner {
col_time: Default::default(),
col_insert_id: Default::default(),
col_row_id: Default::default(),
max_row_id: RowId::ZERO,
col_num_instances: Default::default(),
columns: Default::default(),
size_bytes: 0, // NOTE: computed below
Expand Down
1 change: 1 addition & 0 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl IndexedBucket {
col_time,
col_insert_id,
col_row_id,
max_row_id: _,
col_num_instances,
columns,
size_bytes: _,
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/store_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl DataStore {
col_time,
col_insert_id: _,
col_row_id,
max_row_id: _,
col_num_instances,
columns,
size_bytes: _,
Expand Down Expand Up @@ -169,6 +170,7 @@ impl DataStore {
col_time,
col_insert_id: _,
col_row_id,
max_row_id: _,
col_num_instances,
columns,
size_bytes: _,
Expand Down
3 changes: 2 additions & 1 deletion crates/re_arrow_store/src/store_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ mod tests {
view,
);

view.on_events(&store.gc(&GarbageCollectionOptions::gc_everything()).0);
let events = store.gc(&GarbageCollectionOptions::gc_everything()).0;
view.on_events(&events);

similar_asserts::assert_eq!(
GlobalCounts::new(
Expand Down
Loading

0 comments on commit 8e3cf2e

Please sign in to comment.