Datastore revamp 3: efficient incremental stats (#1739)
teh-cmc committed Apr 12, 2023
1 parent 893138b commit 698e51b
Showing 28 changed files with 755 additions and 378 deletions.
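The unifying pattern across the diffs below: row counts and byte sizes stop being recomputed from scratch (or stubbed out as `TODO(#1619)` zeros) and become cached statistics, maintained incrementally on every write. A minimal sketch of that pattern, using hypothetical stand-in types rather than rerun's actual `DataStore` internals:

```rust
// Hypothetical sketch of incremental stats bookkeeping; `Bucket` and `Table`
// are stand-ins, not rerun's real types. Each bucket keeps its own cached
// size, updated on every write, so store-wide totals become a cheap sum over
// buckets instead of a walk over every arrow array.

struct Bucket {
    rows: Vec<Vec<u8>>,
    size_bytes: u64, // cached; kept in sync on every mutation
}

impl Bucket {
    fn insert(&mut self, row: Vec<u8>) {
        self.size_bytes += row.len() as u64; // O(1) incremental update
        self.rows.push(row);
    }
}

struct Table {
    buckets: Vec<Bucket>,
}

impl Table {
    /// O(number of buckets), never O(number of rows or cells).
    fn size_bytes(&self) -> u64 {
        self.buckets.iter().map(|b| b.size_bytes).sum()
    }
}
```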
2 changes: 1 addition & 1 deletion crates/re_arrow_store/benches/arrow2.rs
@@ -241,7 +241,7 @@ fn estimated_size_bytes(c: &mut Criterion) {
 
     {
         let cells = generate_cells(kind);
-        let arrays = cells.iter().map(|cell| cell.as_arrow()).collect_vec();
+        let arrays = cells.iter().map(|cell| cell.to_arrow()).collect_vec();
         let total_instances = arrays.iter().map(|array| array.len() as u32).sum::<u32>();
         assert_eq!(total_instances, (NUM_ROWS * NUM_INSTANCES) as u32);
 
2 changes: 1 addition & 1 deletion crates/re_arrow_store/benches/arrow2_convert.rs
@@ -95,7 +95,7 @@ fn deserialize(c: &mut Criterion) {
     group.throughput(criterion::Throughput::Elements(NUM_INSTANCES as _));
 
     let cell = DataCell::from_component::<InstanceKey>(0..NUM_INSTANCES as u64);
-    let data = cell.as_arrow();
+    let data = cell.to_arrow();
 
     {
        group.bench_function("arrow2_convert", |b| {
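Both benchmark fixes chase the same API rename: `as_arrow()` became `to_arrow()`. Under the Rust API naming guidelines, `as_*` implies an essentially free borrow while `to_*` signals a conversion with real cost, so the rename reads as a cost annotation rather than a behavior change. The convention in miniature (hypothetical type, not rerun's `DataCell`):

```rust
struct Cell {
    data: String,
}

impl Cell {
    /// `as_*`: a cheap view; borrows, never allocates.
    fn as_str(&self) -> &str {
        &self.data
    }

    /// `to_*`: a conversion the caller should expect to pay for.
    fn to_owned_string(&self) -> String {
        self.data.clone()
    }
}
```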
68 changes: 35 additions & 33 deletions crates/re_arrow_store/src/store.rs
@@ -262,7 +262,7 @@ fn datastore_internal_repr() {
         },
     );
 
-    let timeless = DataTable::example(false);
+    let timeless = DataTable::example(true);
     eprintln!("{timeless}");
     store.insert_table(&timeless).unwrap();
 
@@ -317,38 +317,31 @@ pub struct IndexedTable {
     /// to free up space.
     pub all_components: IntSet<ComponentName>,
 
-    /// The total number of rows in this indexed table, accounting for all buckets.
-    pub total_rows: u64,
+    /// The number of rows stored in this table, across all of its buckets.
+    pub buckets_num_rows: u64,
 
-    /// The size of this table in bytes across all of its buckets, accounting for both data and
-    /// metadata.
+    /// The size of both the control & component data stored in this table, across all of its
+    /// buckets, in bytes.
     ///
-    /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
-    /// cache this.
-    /// Also: there are many buckets.
-    pub total_size_bytes: u64,
+    /// This is a best-effort approximation, adequate for most purposes (stats,
+    /// triggering GCs, ...).
+    pub buckets_size_bytes: u64,
 }
 
 impl IndexedTable {
     pub fn new(cluster_key: ComponentName, timeline: Timeline, ent_path: EntityPath) -> Self {
+        let bucket = IndexedBucket::new(cluster_key, timeline);
+        let buckets_size_bytes = bucket.size_bytes();
         Self {
             timeline,
             ent_path,
-            buckets: [(i64::MIN.into(), IndexedBucket::new(cluster_key, timeline))].into(),
+            buckets: [(i64::MIN.into(), bucket)].into(),
             cluster_key,
             all_components: Default::default(),
-            total_rows: 0,
-            total_size_bytes: 0, // TODO(#1619)
+            buckets_num_rows: 0,
+            buckets_size_bytes,
         }
     }
-
-    /// Returns a read-only iterator over the raw buckets.
-    ///
-    /// Do _not_ use this to try and test the internal state of the datastore.
-    #[doc(hidden)]
-    pub fn iter_buckets(&self) -> impl ExactSizeIterator<Item = &IndexedBucket> {
-        self.buckets.values()
-    }
 }
 
 /// An `IndexedBucket` holds a chunk of rows from an [`IndexedTable`]
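Two things happen in the hunk above: the table-level totals are renamed (`buckets_num_rows`, `buckets_size_bytes`) to make their bucket-derived nature explicit, and `new()` now creates the initial bucket first so the table's cached size can be seeded from the bucket's real footprint instead of the old `TODO(#1619)` zero. The constructor shape, reduced to a hypothetical stand-in:

```rust
struct IndexedBucketStandIn {
    size_bytes: u64,
}

impl IndexedBucketStandIn {
    fn new() -> Self {
        Self { size_bytes: 64 } // even an empty bucket has a nonzero footprint
    }

    fn size_bytes(&self) -> u64 {
        self.size_bytes
    }
}

struct IndexedTableStandIn {
    buckets: Vec<IndexedBucketStandIn>,
    buckets_num_rows: u64,
    buckets_size_bytes: u64,
}

impl IndexedTableStandIn {
    fn new() -> Self {
        // Build the bucket first, then seed the cached total from it.
        let bucket = IndexedBucketStandIn::new();
        let buckets_size_bytes = bucket.size_bytes();
        Self {
            buckets: vec![bucket],
            buckets_num_rows: 0,
            buckets_size_bytes,
        }
    }
}
```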
@@ -414,25 +407,29 @@ pub struct IndexedBucketInner {
     /// (i.e. the table is sparse).
     pub columns: IntMap<ComponentName, DataCellColumn>,
 
-    /// The size of this bucket in bytes, accounting for both data and metadata.
+    /// The size of both the control & component data stored in this bucket, in bytes.
     ///
-    /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
-    /// cache this.
-    pub total_size_bytes: u64,
+    /// This is a best-effort approximation, adequate for most purposes (stats,
+    /// triggering GCs, ...).
+    ///
+    /// We cache this because there can be many, many buckets.
+    pub size_bytes: u64,
 }
 
 impl Default for IndexedBucketInner {
     fn default() -> Self {
-        Self {
+        let mut this = Self {
             is_sorted: true,
             time_range: TimeRange::new(i64::MAX.into(), i64::MIN.into()),
             col_time: Default::default(),
             col_insert_id: Default::default(),
             col_row_id: Default::default(),
             col_num_instances: Default::default(),
             columns: Default::default(),
-            total_size_bytes: 0, // TODO(#1619)
-        }
+            size_bytes: 0, // NOTE: computed below
+        };
+        this.compute_size_bytes();
+        this
     }
 }
 
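The `Default` impl above switches to a construct-then-finalize shape: build the value with a placeholder, then let `compute_size_bytes()` own all sizing logic, so the empty-bucket baseline and any later from-scratch recomputation can never drift apart. A sketch with a stand-in type:

```rust
struct InnerStandIn {
    col_time: Vec<i64>,
    size_bytes: u64,
}

impl InnerStandIn {
    /// The from-scratch computation: the single source of truth for sizing.
    fn compute_size_bytes(&mut self) {
        self.size_bytes = std::mem::size_of::<Self>() as u64
            + (self.col_time.len() * std::mem::size_of::<i64>()) as u64;
    }
}

impl Default for InnerStandIn {
    fn default() -> Self {
        let mut this = Self {
            col_time: Vec::new(),
            size_bytes: 0, // placeholder, overwritten below
        };
        this.compute_size_bytes(); // an empty bucket still has a fixed footprint
        this
    }
}
```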
@@ -476,15 +473,20 @@ pub struct PersistentIndexedTable {
     /// The cells are optional since not all rows will have data for every single component
     /// (i.e. the table is sparse).
     pub columns: IntMap<ComponentName, DataCellColumn>,
-
-    /// The size of this indexed table in bytes, accounting for both data and metadata.
-    ///
-    /// Accurately computing the size of arrow arrays is surprisingly costly, which is why we
-    /// cache this.
-    pub total_size_bytes: u64,
 }
 
 impl PersistentIndexedTable {
+    pub fn new(cluster_key: ComponentName, ent_path: EntityPath) -> Self {
+        Self {
+            cluster_key,
+            ent_path,
+            col_insert_id: Default::default(),
+            col_row_id: Default::default(),
+            col_num_instances: Default::default(),
+            columns: Default::default(),
+        }
+    }
+
     pub fn is_empty(&self) -> bool {
         self.col_num_instances.is_empty()
     }
3 changes: 1 addition & 2 deletions crates/re_arrow_store/src/store_arrow.rs
@@ -38,7 +38,7 @@ impl IndexedBucket {
             col_row_id,
             col_num_instances,
             columns,
-            total_size_bytes: _,
+            size_bytes: _,
         } = &*inner.read();
 
         serialize(
@@ -72,7 +72,6 @@ impl PersistentIndexedTable {
             col_row_id,
             col_num_instances,
             columns,
-            total_size_bytes: _,
         } = self;
 
         serialize(
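Worth noting in these call sites (and the ones below): the structs are consumed by exhaustive destructuring with no `..` catch-all, so renaming `total_size_bytes` to `size_bytes` (or dropping it entirely) is a compile error at every read site rather than a silently stale statistic. In miniature:

```rust
struct Stats {
    num_rows: u64,
    size_bytes: u64,
}

fn summarize(stats: &Stats) -> String {
    // No `..` catch-all: adding, renaming, or removing a field breaks this
    // pattern loudly, forcing every consumer to acknowledge the change.
    let Stats { num_rows, size_bytes } = stats;
    format!("{num_rows} rows, {size_bytes} bytes")
}
```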
17 changes: 8 additions & 9 deletions crates/re_arrow_store/src/store_format.rs
@@ -34,8 +34,8 @@ impl std::fmt::Display for DataStore {
             format!(
                 "{} timeless indexed tables, for a total of {} across {} total rows\n",
                 timeless_tables.len(),
-                format_bytes(self.total_timeless_index_size_bytes() as _),
-                format_number(self.total_timeless_index_rows() as _)
+                format_bytes(self.total_timeless_size_bytes() as _),
+                format_number(self.total_timeless_rows() as _)
             ),
         ))?;
         f.write_str(&indent::indent_all_by(4, "timeless_tables: [\n"))?;
@@ -53,8 +53,8 @@ impl std::fmt::Display for DataStore {
             format!(
                 "{} indexed tables, for a total of {} across {} total rows\n",
                 tables.len(),
-                format_bytes(self.total_temporal_index_size_bytes() as _),
-                format_number(self.total_temporal_index_rows() as _)
+                format_bytes(self.total_temporal_size_bytes() as _),
+                format_number(self.total_temporal_rows() as _)
             ),
         ))?;
         f.write_str(&indent::indent_all_by(4, "tables: [\n"))?;
@@ -83,8 +83,8 @@ impl std::fmt::Display for IndexedTable {
             buckets,
             cluster_key: _,
             all_components: _,
-            total_rows: _,
-            total_size_bytes: _,
+            buckets_num_rows: _,
+            buckets_size_bytes: _,
         } = self;
 
         f.write_fmt(format_args!("timeline: {}\n", timeline.name()))?;
@@ -116,8 +116,8 @@ impl std::fmt::Display for IndexedBucket {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.write_fmt(format_args!(
             "size: {} across {} rows\n",
-            format_bytes(self.total_size_bytes() as _),
-            format_number(self.total_rows() as _),
+            format_bytes(self.size_bytes() as _),
+            format_number(self.num_rows() as _),
         ))?;
 
         let time_range = {
@@ -156,7 +156,6 @@ impl std::fmt::Display for PersistentIndexedTable {
             col_row_id: _,
             col_num_instances: _,
             columns: _,
-            total_size_bytes: _,
         } = self;
 
         f.write_fmt(format_args!("entity: {ent_path}\n"))?;
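The `Display` impls only had to chase the renames (`total_size_bytes()` to `size_bytes()`, `total_rows()` to `num_rows()`); the human-readable formatting itself is untouched. For orientation, a hypothetical stand-in for what a `format_bytes` helper typically does (rerun's real one lives in `re_format` and may differ):

```rust
/// Hypothetical stand-in for a human-readable byte formatter.
fn format_bytes(bytes: f64) -> String {
    const UNITS: [&str; 4] = ["B", "KiB", "MiB", "GiB"];
    let mut value = bytes;
    let mut unit = 0;
    while value >= 1024.0 && unit + 1 < UNITS.len() {
        value /= 1024.0;
        unit += 1;
    }
    format!("{value:.1} {}", UNITS[unit])
}
```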
3 changes: 1 addition & 2 deletions crates/re_arrow_store/src/store_polars.rs
@@ -176,7 +176,6 @@ impl PersistentIndexedTable {
             col_row_id,
             col_num_instances,
             columns,
-            total_size_bytes: _,
         } = self;
 
         let num_rows = self.total_rows() as usize;
@@ -217,7 +216,7 @@ impl IndexedBucket {
             col_row_id,
             col_num_instances,
             columns,
-            total_size_bytes: _,
+            size_bytes: _,
         } = &*self.inner.read();
 
         let (_, times) = DataTable::serialize_primitive_column(
10 changes: 5 additions & 5 deletions crates/re_arrow_store/src/store_read.rs
@@ -173,7 +173,7 @@ impl DataStore {
 ///     .map(|cell| {
 ///         Series::try_from((
 ///             cell.component_name().as_str(),
-///             cell.as_arrow(),
+///             cell.to_arrow(),
 ///         ))
 ///     })
 ///     .collect();
@@ -332,7 +332,7 @@ impl DataStore {
 /// #     .map(|cell| {
 /// #         Series::try_from((
 /// #             cell.component_name().as_str(),
-/// #             cell.as_arrow(),
+/// #             cell.to_arrow(),
 /// #         ))
 /// #     })
 /// #     .collect();
@@ -672,7 +672,7 @@ impl IndexedBucket {
             col_row_id: _,
             col_num_instances: _,
             columns,
-            total_size_bytes: _, // TODO(#1619)
+            size_bytes: _,
         } = &*self.inner.read();
         debug_assert!(is_sorted);
 
@@ -787,7 +787,7 @@
             col_row_id,
             col_num_instances: _,
             columns,
-            total_size_bytes: _, // TODO(#1619)
+            size_bytes: _,
         } = &*self.inner.read();
         debug_assert!(is_sorted);
 
@@ -899,7 +899,7 @@ impl IndexedBucketInner {
             col_row_id,
             col_num_instances,
             columns,
-            total_size_bytes: _,
+            size_bytes: _,
         } = self;
 
         if *is_sorted {
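The first two hunks in this file touch code living inside doc comments: the `///` block (around old line 173) is the example rendered in the docs, while the `/// #`-prefixed block (around old line 332) is hidden from rendered docs but still compiled and executed as a doc-test, which is why the rename had to land in both. The mechanism, sketched on a hypothetical `my_crate::add_one`:

```rust
/// Adds one to the input.
///
/// ```
/// # // Lines prefixed with `#` are compiled and run as part of the doc-test,
/// # // but hidden from the rendered documentation.
/// # let input = 41;
/// assert_eq!(my_crate::add_one(input), 42);
/// ```
pub fn add_one(x: i32) -> i32 {
    x + 1
}
```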