Skip to content

Commit

Permalink
GC improvements 0: use-case driven benchmarks (#4394)
Browse files Browse the repository at this point in the history
Introduce 2 new benchmark suites that drive the development of this PR
series:
1. Logging a tons of scalars, in order, across a bunch of series,
themselves scattered across a bunch of plots.
2. Logging a tons of timeless data, across a bunch of entities.

### Benchmarks

Hint: it's bad.

```
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    1.00   1084.0±4.47ms 54.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    1.00       2.1±0.02s 27.6 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     1.00    465.8±2.50ms 125.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     1.00    655.3±2.61ms 89.4 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          1.00    652.8±4.12ms 89.8 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         1.00       2.4±0.05s 24.2 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         1.00       2.4±0.03s 24.1 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          1.00       2.5±0.08s 23.5 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          1.00       2.4±0.02s 24.5 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               1.00       2.4±0.03s 24.4 KElem/sec
```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401
  • Loading branch information
teh-cmc committed Dec 2, 2023
1 parent 6fe6d11 commit 9a9d990
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 9 deletions.
15 changes: 7 additions & 8 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ all-features = true
[features]
default = []

## Enables access to re_types' test components/datatypes.
testing = ["re_log_types/testing", "re_types/testing"]

## Enables `parking_lot`'s deadlock detection background thread.
deadlock_detection = ["parking_lot/deadlock_detection"]

Expand Down Expand Up @@ -73,7 +70,7 @@ polars-ops = { workspace = true, optional = true, features = [

[dev-dependencies]
re_log_types = { workspace = true, features = ["testing"] }
re_types = { workspace = true, features = ["datagen"] }
re_types = { workspace = true, features = ["datagen", "testing"] }

anyhow.workspace = true
criterion.workspace = true
Expand All @@ -98,7 +95,7 @@ bench = false
[[example]]
name = "dump_dataframe"
path = "examples/dump_dataframe.rs"
required-features = ["polars", "testing"]
required-features = ["polars"]

[[example]]
name = "latest_component"
Expand All @@ -116,15 +113,17 @@ path = "examples/range_components.rs"
required-features = ["polars"]


[[bench]]
name = "arrow2"
harness = false

[[bench]]
name = "data_store"
harness = false
required-features = ["testing"]

[[bench]]
name = "arrow2"
name = "gc"
harness = false
required-features = ["testing"]

[[bench]]
name = "vectors"
Expand Down
265 changes: 265 additions & 0 deletions crates/re_arrow_store/benches/gc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};

use itertools::Itertools;
use re_arrow_store::{
DataStore, DataStoreConfig, GarbageCollectionOptions, GarbageCollectionTarget,
};
use re_log_types::{
build_frame_nr, build_log_time, DataRow, DataTable, EntityPath, RowId, TableId, Time, TimePoint,
};
use re_types::components::InstanceKey;
use re_types_core::{AsComponents, ComponentBatch, ComponentName, Loggable as _};

criterion_group!(benches, plotting_dashboard, timeless_logs);
criterion_main!(benches);

// ---

#[cfg(not(debug_assertions))]
mod constants {
pub const NUM_ENTITY_PATHS: usize = 20;
pub const NUM_ROWS_PER_ENTITY_PATH: usize = 10_000;
}

// `cargo test` also runs the benchmark setup code, so make sure they run quickly:
#[cfg(debug_assertions)]
mod constants {
pub const NUM_ENTITY_PATHS: usize = 1;
pub const NUM_ROWS_PER_ENTITY_PATH: usize = 1;
}

use constants::{NUM_ENTITY_PATHS, NUM_ROWS_PER_ENTITY_PATH};

fn num_rows_per_bucket() -> &'static [u64] {
#[cfg(feature = "core_benchmarks_only")]
{
&[]
}
#[cfg(not(feature = "core_benchmarks_only"))]
{
&[256, 512, 1024, 2048]
}
}

// --- Benchmarks ---

fn plotting_dashboard(c: &mut Criterion) {
const DROP_AT_LEAST: f64 = 0.3;

let mut group = c.benchmark_group(format!(
"datastore/num_entities={NUM_ENTITY_PATHS}/num_rows_per_entity={NUM_ROWS_PER_ENTITY_PATH}/plotting_dashboard/drop_at_least={DROP_AT_LEAST}"
));
group.throughput(criterion::Throughput::Elements(
((NUM_ENTITY_PATHS * NUM_ROWS_PER_ENTITY_PATH) as f64 * DROP_AT_LEAST) as _,
));
group.sample_size(10);

let gc_settings = GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(DROP_AT_LEAST),
gc_timeless: true,
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
};

let mut timegen = |i| {
[
build_log_time(Time::from_seconds_since_epoch(i as _)),
build_frame_nr((i as i64).into()),
]
.into()
};

let mut datagen = |i| {
Box::new(re_types::archetypes::TimeSeriesScalar::new(i as f64)) as Box<dyn AsComponents>
};

// Default config
group.bench_function("default", |b| {
let store = build_store(
Default::default(),
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
});

// Emulate more or less bucket
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
});
}
}

fn timeless_logs(c: &mut Criterion) {
const DROP_AT_LEAST: f64 = 0.3;

let mut group = c.benchmark_group(format!(
"datastore/num_entities={NUM_ENTITY_PATHS}/num_rows_per_entity={NUM_ROWS_PER_ENTITY_PATH}/timeless_logs/drop_at_least={DROP_AT_LEAST}"
));
group.throughput(criterion::Throughput::Elements(
((NUM_ENTITY_PATHS * NUM_ROWS_PER_ENTITY_PATH) as f64 * DROP_AT_LEAST) as _,
));
group.sample_size(10);

let gc_settings = GarbageCollectionOptions {
target: GarbageCollectionTarget::DropAtLeastFraction(DROP_AT_LEAST),
gc_timeless: true,
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
};

let mut timegen = |_| TimePoint::timeless();

let mut datagen = |i: usize| {
Box::new(re_types::archetypes::TextLog::new(i.to_string())) as Box<dyn AsComponents>
};

// Default config
group.bench_function("default", |b| {
let store = build_store(
Default::default(),
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
});

// Emulate more or less bucket
for &num_rows_per_bucket in num_rows_per_bucket() {
group.bench_function(format!("bucketsz={num_rows_per_bucket}"), |b| {
let store = build_store(
DataStoreConfig {
indexed_bucket_num_rows: num_rows_per_bucket,
..Default::default()
},
InstanceKey::name(),
false,
&mut timegen,
&mut datagen,
);
b.iter_batched(
|| store.clone(),
|mut store| {
let (_, stats_diff) = store.gc(&gc_settings);
stats_diff
},
BatchSize::LargeInput,
);
});
}
}

// --- Helpers ---

fn build_store<FT, FD>(
config: DataStoreConfig,
cluster_key: ComponentName,
packed: bool,
timegen: &mut FT,
datagen: &mut FD,
) -> DataStore
where
FT: FnMut(usize) -> TimePoint,
FD: FnMut(usize) -> Box<dyn AsComponents>,
{
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
cluster_key,
config,
);

let tables = (0..NUM_ENTITY_PATHS)
.map(|i| build_table(format!("entity_path_{i}").into(), packed, timegen, datagen))
.collect_vec();
let mut rows_per_table = tables.iter().map(|table| table.to_rows()).collect_vec();

// NOTE: interleave insertions between entities to more closely match real world scenarios.
for _ in 0..NUM_ROWS_PER_ENTITY_PATH {
#[allow(clippy::needless_range_loop)] // readability
for i in 0..NUM_ENTITY_PATHS {
let row = rows_per_table[i].next().unwrap();
store.insert_row(&row.unwrap()).unwrap();
}
}

store
}

fn build_table<FT, FD>(
entity_path: EntityPath,
packed: bool,
timegen: &mut FT,
datagen: &mut FD,
) -> DataTable
where
FT: FnMut(usize) -> TimePoint,
FD: FnMut(usize) -> Box<dyn AsComponents>,
{
let mut table = DataTable::from_rows(
TableId::ZERO,
(0..NUM_ROWS_PER_ENTITY_PATH).map(move |i| {
DataRow::from_component_batches(
RowId::random(),
// NOTE: insert in multiple timelines to more closely match real world scenarios.
timegen(i),
entity_path.clone(),
datagen(i)
.as_component_batches()
.iter()
.map(|batch| batch as &dyn ComponentBatch),
)
.unwrap()
}),
);

// Do a serialization roundtrip to pack everything in contiguous memory.
if packed {
let (schema, columns) = table.serialize().unwrap();
table = DataTable::deserialize(TableId::ZERO, &schema, &columns).unwrap();
}

table.compute_all_size_bytes();

table
}
2 changes: 1 addition & 1 deletion crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ webbrowser = { workspace = true, optional = true }


[dev-dependencies]
re_arrow_store = { workspace = true, features = ["testing"] }
re_arrow_store.workspace = true

itertools.workspace = true
ndarray-rand.workspace = true
Expand Down
26 changes: 26 additions & 0 deletions crates/re_types_core/src/loggable_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,32 @@ impl<'a> std::ops::Deref for MaybeOwnedComponentBatch<'a> {
}
}

impl<'a> LoggableBatch for MaybeOwnedComponentBatch<'a> {
type Name = ComponentName;

#[inline]
fn name(&self) -> Self::Name {
self.as_ref().name()
}

#[inline]
fn num_instances(&self) -> usize {
self.as_ref().num_instances()
}

#[inline]
fn arrow_field(&self) -> arrow2::datatypes::Field {
self.as_ref().arrow_field()
}

#[inline]
fn to_arrow(&self) -> SerializationResult<Box<dyn ::arrow2::array::Array>> {
self.as_ref().to_arrow()
}
}

impl<'a> ComponentBatch for MaybeOwnedComponentBatch<'a> {}

// --- Unary ---

impl<L: Clone + Loggable> LoggableBatch for L {
Expand Down

0 comments on commit 9a9d990

Please sign in to comment.