Skip to content

Commit

Permalink
introduce DataRow and DataTable
Browse files (browse the repository at this point in the history)
  • Loading branch information
teh-cmc committed Mar 24, 2023
1 parent d6dede4 commit e647f07
Show file tree
Hide file tree
Showing 29 changed files with 1,482 additions and 800 deletions.
51 changes: 26 additions & 25 deletions crates/re_arrow_store/benches/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use re_arrow_store::{DataStore, DataStoreConfig, LatestAtQuery, RangeQuery, Time
use re_log_types::{
component_types::{InstanceKey, Rect2D},
datagen::{build_frame_nr, build_some_instances, build_some_rects},
msg_bundle::{try_build_msg_bundle2, MsgBundle},
Component as _, ComponentName, EntityPath, MsgId, TimeType, Timeline,
Component as _, ComponentName, DataRow, EntityPath, MsgId, TimeType, Timeline,
};

// ---
Expand All @@ -27,28 +26,30 @@ const NUM_RECTS: i64 = 1;

// --- Benchmarks ---

// TODO(cmc): need additional benches for full tables

fn insert(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let rows = build_rows(NUM_RECTS as usize);
let mut group = c.benchmark_group("datastore/insert/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("insert", |b| {
b.iter(|| insert_messages(Default::default(), InstanceKey::name(), msgs.iter()));
b.iter(|| insert_rows(Default::default(), InstanceKey::name(), rows.iter()));
});
}
}

fn latest_at_batch(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter());
let rows = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), rows.iter());
let mut group = c.benchmark_group("datastore/latest_at/batch/rects");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("query", |b| {
b.iter(|| {
let results = latest_messages_at(&store, Rect2D::name(), &[Rect2D::name()]);
let results = latest_data_at(&store, Rect2D::name(), &[Rect2D::name()]);
let rects = results[0]
.as_ref()
.unwrap()
Expand All @@ -70,27 +71,27 @@ fn latest_at_missing_components(c: &mut Criterion) {
};

{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(config.clone(), InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config.clone(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("primary", |b| {
b.iter(|| {
let results =
latest_messages_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
latest_data_at(&store, "non_existing_component".into(), &[Rect2D::name()]);
assert!(results[0].is_none());
});
});
}

{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(config, InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(config, InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/latest_at/missing_components");
group.throughput(criterion::Throughput::Elements(NUM_RECTS as _));
group.bench_function("secondaries", |b| {
b.iter(|| {
let results = latest_messages_at(
let results = latest_data_at(
&store,
Rect2D::name(),
&[
Expand All @@ -109,15 +110,15 @@ fn latest_at_missing_components(c: &mut Criterion) {

fn range_batch(c: &mut Criterion) {
{
let msgs = build_messages(NUM_RECTS as usize);
let store = insert_messages(Default::default(), InstanceKey::name(), msgs.iter());
let msgs = build_rows(NUM_RECTS as usize);
let store = insert_rows(Default::default(), InstanceKey::name(), msgs.iter());
let mut group = c.benchmark_group("datastore/range/batch/rects");
group.throughput(criterion::Throughput::Elements(
(NUM_RECTS * NUM_FRAMES) as _,
));
group.bench_function("query", |b| {
b.iter(|| {
let msgs = range_messages(&store, [Rect2D::name()]);
let msgs = range_data(&store, [Rect2D::name()]);
for (cur_time, (time, results)) in msgs.enumerate() {
let time = time.unwrap();
assert_eq!(cur_time as i64, time.as_i64());
Expand Down Expand Up @@ -146,31 +147,31 @@ criterion_main!(benches);

// --- Helpers ---

fn build_messages(n: usize) -> Vec<MsgBundle> {
fn build_rows(n: usize) -> Vec<DataRow> {
(0..NUM_FRAMES)
.map(move |frame_idx| {
try_build_msg_bundle2(
MsgId::ZERO,
DataRow::from_cells2(
MsgId::random(),
"rects",
[build_frame_nr(frame_idx.into())],
n as _,
(build_some_instances(n), build_some_rects(n)),
)
.unwrap()
})
.collect()
}

fn insert_messages<'a>(
fn insert_rows<'a>(
config: DataStoreConfig,
cluster_key: ComponentName,
msgs: impl Iterator<Item = &'a MsgBundle>,
rows: impl Iterator<Item = &'a DataRow>,
) -> DataStore {
let mut store = DataStore::new(cluster_key, config);
msgs.for_each(|msg_bundle| store.insert_row(msg_bundle).unwrap());
rows.for_each(|row| store.insert_row(row).unwrap());
store
}

fn latest_messages_at<const N: usize>(
fn latest_data_at<const N: usize>(
store: &DataStore,
primary: ComponentName,
secondaries: &[ComponentName; N],
Expand All @@ -185,7 +186,7 @@ fn latest_messages_at<const N: usize>(
store.get(secondaries, &row_indices)
}

fn range_messages<const N: usize>(
fn range_data<const N: usize>(
store: &DataStore,
components: [ComponentName; N],
) -> impl Iterator<Item = (Option<TimeInt>, [Option<Box<dyn Array>>; N])> + '_ {
Expand Down
67 changes: 33 additions & 34 deletions crates/re_arrow_store/examples/dump_dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_arrow_store --example dump_dataframe
//! ```

use re_arrow_store::{test_bundle, DataStore};
use re_arrow_store::{test_row, DataStore};
use re_log_types::{
component_types::InstanceKey,
datagen::{
Expand All @@ -25,51 +25,50 @@ fn main() {
];

for ent_path in &ent_paths {
let bundle1 = test_bundle!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => [build_some_instances(2), build_some_rects(2)]);
store.insert_row(&bundle1).unwrap();
let row1 = test_row!(ent_path @ [
build_frame_nr(1.into()), build_log_time(Time::now()),
] => 2; [build_some_instances(2), build_some_rects(2)]);
store.insert_row(&row1).unwrap();
}

for ent_path in &ent_paths {
let bundle2 = test_bundle!(ent_path @ [
build_frame_nr(2.into())
] => [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&bundle2).unwrap();
let row2 = test_row!(ent_path @ [
build_frame_nr(2.into())
] => 2; [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&row2).unwrap();
// Insert timelessly too!
let bundle2 =
test_bundle!(ent_path @ [] => [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&bundle2).unwrap();
let row2 = test_row!(ent_path @ [] => 2; [build_some_instances(2), build_some_point2d(2)]);
store.insert_row(&row2).unwrap();

let bundle3 = test_bundle!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&bundle3).unwrap();
let row3 = test_row!(ent_path @ [
build_frame_nr(3.into()), build_log_time(Time::now()),
] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&row3).unwrap();
// Insert timelessly too!
let bundle3 = test_bundle!(ent_path @ [] => [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&bundle3).unwrap();
let row3 = test_row!(ent_path @ [] => 4; [build_some_instances_from(25..29), build_some_point2d(4)]);
store.insert_row(&row3).unwrap();
}

for ent_path in &ent_paths {
let bundle4_1 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert_row(&bundle4_1).unwrap();
let row4_1 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(20..23), build_some_rects(3)]);
store.insert_row(&row4_1).unwrap();

let bundle4_15 = test_bundle!(ent_path @ [
build_frame_nr(4.into()),
] => [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert_row(&bundle4_15).unwrap();
let row4_15 = test_row!(ent_path @ [
build_frame_nr(4.into()),
] => 3; [build_some_instances_from(20..23), build_some_point2d(3)]);
store.insert_row(&row4_15).unwrap();

let bundle4_2 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert_row(&bundle4_2).unwrap();
let row4_2 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(25..28), build_some_rects(3)]);
store.insert_row(&row4_2).unwrap();

let bundle4_25 = test_bundle!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert_row(&bundle4_25).unwrap();
let row4_25 = test_row!(ent_path @ [
build_frame_nr(4.into()), build_log_time(Time::now()),
] => 3; [build_some_instances_from(25..28), build_some_point2d(3)]);
store.insert_row(&row4_25).unwrap();
}

let df = store.to_dataframe();
Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/examples/latest_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```

use re_arrow_store::polars_util::latest_component;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::component_types::Rect2D;
use re_log_types::datagen::build_some_rects;
use re_log_types::{
Expand All @@ -19,11 +19,11 @@ fn main() {

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);

Expand Down
10 changes: 5 additions & 5 deletions crates/re_arrow_store/examples/latest_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use polars_core::prelude::*;
use re_arrow_store::polars_util::latest_components;
use re_arrow_store::{test_bundle, DataStore, LatestAtQuery, TimeType, Timeline};
use re_arrow_store::{test_row, DataStore, LatestAtQuery, TimeType, Timeline};
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
Expand All @@ -18,11 +18,11 @@ fn main() {

let ent_path = EntityPath::from("my/entity");

let bundle = test_bundle!(ent_path @ [build_frame_nr(2.into())] => [build_some_rects(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(2.into())] => 4; [build_some_rects(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(3.into())] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(3.into())] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let df = latest_components(
Expand Down
30 changes: 15 additions & 15 deletions crates/re_arrow_store/examples/range_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! ```

use polars_core::prelude::JoinType;
use re_arrow_store::{polars_util, test_bundle, DataStore, RangeQuery, TimeRange};
use re_arrow_store::{polars_util, test_row, DataStore, RangeQuery, TimeRange};
use re_log_types::{
component_types::{InstanceKey, Point2D, Rect2D},
datagen::{build_frame_nr, build_some_point2d, build_some_rects},
Expand All @@ -22,26 +22,26 @@ fn main() {
let frame3 = 3.into();
let frame4 = 4.into();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame1)] => [build_some_rects(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame1)] => 2; [build_some_rects(2)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame2)] => [build_some_point2d(2)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame2)] => 2; [build_some_point2d(2)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame3)] => [build_some_point2d(4)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame3)] => 4; [build_some_point2d(4)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(1)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 1; [build_some_point2d(1)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_rects(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_rects(3)]);
store.insert_row(&row).unwrap();

let bundle = test_bundle!(ent_path @ [build_frame_nr(frame4)] => [build_some_point2d(3)]);
store.insert_row(&bundle).unwrap();
let row = test_row!(ent_path @ [build_frame_nr(frame4)] => 3; [build_some_point2d(3)]);
store.insert_row(&row).unwrap();

let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(timeline_frame_nr, TimeRange::new(2.into(), 4.into()));
Expand Down
2 changes: 2 additions & 0 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::{
PersistentIndexTable, RowIndex,
};

// TODO(cmc): all of this stuff should be defined by `DataCell`, `DataRow` and `DataTable`, not by the store

// ---

impl DataStore {
Expand Down

0 comments on commit e647f07

Please sign in to comment.