Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Mar 29, 2023
1 parent d84d2d1 commit dfeeec4
Show file tree
Hide file tree
Showing 28 changed files with 2,025 additions and 3,256 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ arrow2 = { workspace = true, features = [
"compute_concatenate",
"compute_aggregate",
] }
arrow2_convert.workspace = true
document-features = "0.2"
indent = "0.1"
itertools = { workspace = true }
nohash-hasher = "0.2"
parking_lot.workspace = true
smallvec = { version = "1.0", features = ["const_generics"]}
static_assertions = "1.1"
thiserror.workspace = true

Expand Down
28 changes: 14 additions & 14 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline,
Component as _, ComponentName, DataCell, DataRow, EntityPath, MsgId, TimeType, Timeline,
};

// ---
Expand Down Expand Up @@ -49,10 +49,11 @@ fn latest_at_batch(c: &mut Criterion) {
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("query", |b| {
b.iter(|| {
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
let cells = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = cells[0]
.as_ref()
.unwrap()
.as_arrow_ref()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
Expand All @@ -65,8 +66,7 @@ fn latest_at_batch(c: &mut Criterion) {
fn latest_at_missing_components(c: &mut Criterion) {
// Simulate the worst possible case: many many buckets.
let config = DataStoreConfig {
index_bucket_size_bytes: 0,
index_bucket_nb_rows: 0,
indexed_bucket_num_rows: 0,
..Default::default()
};

Expand Down Expand Up @@ -118,14 +118,15 @@ fn range_batch(c: &mut Criterion) {
));
group.bench_function("query", |b| {
b.iter(|| {
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let rows = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, cells)) in rows.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());

let rects = results[0]
let rects = cells[0]
.as_ref()
.unwrap()
.as_arrow_ref()
.as_any()
.downcast_ref::<UnionArray>()
.unwrap();
Expand Down Expand Up @@ -175,21 +176,20 @@ fn latest_data_at<const N: usize>(
store: &DataStore,
primary: ComponentName,
secondaries: &[ComponentName; N],
) -> [Option<Box<dyn Array>>; N] {
) -> [Option<DataCell>; N] {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let timeline_query = LatestAtQuery::new(timeline_frame_nr, (NUM_FRAMES / 2).into());
let ent_path = EntityPath::from("rects");

let row_indices = store
store
.latest_at(&timeline_query, &ent_path, primary, secondaries)
.unwrap_or_else(|| [(); N].map(|_| None));
store.get(secondaries, &row_indices)
.unwrap_or_else(|| [(); N].map(|_| None))
}

fn range_data<const N: usize>(
store: &DataStore,
components: [ComponentName; N],
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
) -> impl Iterator<Item = (Option<TimeInt>, [Option<DataCell>; N])> + '_ {
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(
timeline_frame_nr,
Expand All @@ -199,5 +199,5 @@ fn range_data<const N: usize>(

store
.range(&query, &ent_path, components)
.map(move |(time, _, row_indices)| (time, store.get(&components, &row_indices)))
.map(move |(time, _, cells)| (time, cells))
}
8 changes: 3 additions & 5 deletions crates/re_arrow_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

mod arrow_util;
mod store;
mod store_arrow;
mod store_format;
mod store_gc;
mod store_read;
Expand All @@ -33,17 +34,14 @@ pub mod polars_util;
pub mod test_util;

pub use self::arrow_util::ArrayExt;
pub use self::store::{
DataStore, DataStoreConfig, IndexBucket, IndexRowNr, IndexTable, RowIndex, RowIndexKind,
};
pub use self::store::{DataStore, DataStoreConfig};
pub use self::store_gc::GarbageCollectionTarget;
pub use self::store_read::{LatestAtQuery, RangeQuery};
pub use self::store_stats::DataStoreStats;
pub use self::store_write::{WriteError, WriteResult};

pub(crate) use self::store::{
ComponentBucket, ComponentTable, IndexBucketIndices, PersistentComponentTable,
PersistentIndexTable, SecondaryIndex, TimeIndex,
IndexedBucket, IndexedBucketInner, IndexedTable, InsertIdVec, PersistentIndexedTable,
};

// Re-exports
Expand Down
36 changes: 18 additions & 18 deletions crates/re_arrow_store/src/polars_util.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use arrow2::array::Array;
use itertools::Itertools;
use polars_core::{prelude::*, series::Series};
use polars_ops::prelude::*;
use re_log_types::{ComponentName, EntityPath, TimeInt};
use re_log_types::{ComponentName, DataCell, EntityPath, TimeInt};

use crate::{ArrayExt, DataStore, LatestAtQuery, RangeQuery};

Expand Down Expand Up @@ -38,12 +37,11 @@ pub fn latest_component(
let cluster_key = store.cluster_key();

let components = &[cluster_key, primary];
let row_indices = store
let cells = store
.latest_at(query, ent_path, primary, components)
.unwrap_or([None; 2]);
let results = store.get(components, &row_indices);
.unwrap_or([(); 2].map(|_| None));

dataframe_from_results(components, results)
dataframe_from_cells(cells)
}

/// Queries any number of components and their cluster keys from their respective point-of-views,
Expand Down Expand Up @@ -161,12 +159,11 @@ pub fn range_components<'a, const N: usize>(
.chain(
store
.range(query, ent_path, components)
.map(move |(time, _, row_indices)| {
let results = store.get(&components, &row_indices);
.map(move |(time, _, cells)| {
(
time,
row_indices[primary_col].is_some(), // is_primary
dataframe_from_results(&components, results),
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(cells),
)
}),
)
Expand Down Expand Up @@ -200,16 +197,19 @@ pub fn range_components<'a, const N: usize>(

// --- Joins ---

pub fn dataframe_from_results<const N: usize>(
components: &[ComponentName; N],
results: [Option<Box<dyn Array>>; N],
// TODO: none of this mess should be here

pub fn dataframe_from_cells<const N: usize>(
cells: [Option<DataCell>; N],
) -> SharedResult<DataFrame> {
let series: Result<Vec<_>, _> = components
let series: Result<Vec<_>, _> = cells
.iter()
.zip(results)
.filter_map(|(component, col)| col.map(|col| (component, col)))
.map(|(&component, col)| {
Series::try_from((component.as_str(), col.as_ref().clean_for_polars()))
.flatten()
.map(|cell| {
Series::try_from((
cell.component_name().as_str(),
cell.as_arrow_ref().clean_for_polars(),
))
})
.collect();

Expand Down

0 comments on commit dfeeec4

Please sign in to comment.