Commit e3672e9

riiiight, kinda forgot about the whole timeline thing
teh-cmc committed Apr 6, 2023
1 parent 49766ab commit e3672e9
Showing 2 changed files with 153 additions and 117 deletions.
crates/re_arrow_store/src/store_dump.rs (107 additions, 51 deletions)
@@ -1,7 +1,8 @@
 use ahash::HashMapExt;
+use arrow2::Either;
 use nohash_hasher::IntMap;
 use re_log_types::{
-    DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeInt, TimeRange,
+    DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
 };
 
 use crate::{
@@ -17,14 +18,16 @@ impl DataStore {
     // TODO(#1794): Implement simple recompaction.
     pub fn to_data_tables(
         &self,
-        time_filter: Option<TimeRange>,
+        time_filter: Option<(Timeline, TimeRange)>,
     ) -> impl Iterator<Item = DataTable> + '_ {
         crate::profile_function!();
 
-        let time_filter = time_filter.unwrap_or_else(|| TimeRange::new(TimeInt::MIN, TimeInt::MAX));
-
         let timeless = self.dump_timeless_tables();
-        let temporal = self.dump_temporal_tables(time_filter);
+        let temporal = if let Some(time_filter) = time_filter {
+            Either::Left(self.dump_temporal_tables_filtered(time_filter))
+        } else {
+            Either::Right(self.dump_temporal_tables())
+        };
 
         timeless.chain(temporal)
     }
@@ -60,11 +63,11 @@ impl DataStore {
         })
     }
 
-    fn dump_temporal_tables(&self, time_filter: TimeRange) -> impl Iterator<Item = DataTable> + '_ {
-        self.tables.values().flat_map(move |table| {
+    fn dump_temporal_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
+        self.tables.values().flat_map(|table| {
             crate::profile_function!();
 
-            table.buckets.values().filter_map(move |bucket| {
+            table.buckets.values().map(move |bucket| {
                 crate::profile_function!();
 
                 bucket.sort_indices_if_needed();
@@ -77,7 +80,7 @@ impl DataStore {
 
                 let IndexedBucketInner {
                     is_sorted,
-                    time_range,
+                    time_range: _,
                     col_time,
                     col_insert_id: _,
                     col_row_id,
@@ -87,55 +90,108 @@ impl DataStore {
                 } = &*inner.read();
                 debug_assert!(is_sorted);
 
-                if time_range.min > time_filter.max || time_range.max < time_filter.min {
-                    return None;
-                }
-
-                let col_row_id: RowIdVec =
-                    filter_column(col_time, col_row_id.iter(), time_filter).collect();
-
-                // NOTE: Shouldn't ever happen due to check above, but better safe than sorry...
-                debug_assert!(!col_row_id.is_empty());
-                if col_row_id.is_empty() {
-                    return None;
-                }
-
-                let col_timelines = [(
-                    *timeline,
-                    filter_column(col_time, col_time.iter(), time_filter)
-                        .map(Some)
-                        .collect(),
-                )]
-                .into();
-
-                let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone())
-                    .take(col_row_id.len())
-                    .collect();
-
-                let col_num_instances =
-                    filter_column(col_time, col_num_instances.iter(), time_filter).collect();
-
-                let mut columns2 = IntMap::with_capacity(columns.len());
-                for (component, column) in columns {
-                    let column = filter_column(col_time, column.iter(), time_filter).collect();
-                    columns2.insert(*component, DataCellColumn(column));
-                }
-
-                Some(DataTable {
-                    // NOTE: The `TableId` is only used for debug purposes so this doesn't matter
-                    // much... also we don't support saving to file on the web anyhow.
-                    #[cfg(not(target_arch = "wasm32"))]
-                    table_id: TableId::random(),
-                    #[cfg(target_arch = "wasm32")]
-                    table_id: TableId::ZERO,
-                    col_row_id,
-                    col_timelines,
-                    col_entity_path,
-                    col_num_instances,
-                    columns: columns2,
-                })
+                DataTable {
+                    table_id: TableId::random(),
+                    col_row_id: col_row_id.clone(),
+                    col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())]
+                        .into(),
+                    col_entity_path: std::iter::repeat_with(|| table.ent_path.clone())
+                        .take(table.total_rows() as _)
+                        .collect(),
+                    col_num_instances: col_num_instances.clone(),
+                    columns: columns.clone(), // shallow
+                }
             })
         })
     }
+
+    fn dump_temporal_tables_filtered(
+        &self,
+        (timeline_filter, time_filter): (Timeline, TimeRange),
+    ) -> impl Iterator<Item = DataTable> + '_ {
+        self.tables
+            .values()
+            .filter_map(move |table| {
+                crate::profile_function!();
+
+                if table.timeline != timeline_filter {
+                    return None;
+                }
+
+                Some(table.buckets.values().filter_map(move |bucket| {
+                    crate::profile_function!();
+
+                    bucket.sort_indices_if_needed();
+
+                    let IndexedBucket {
+                        timeline,
+                        cluster_key: _,
+                        inner,
+                    } = bucket;
+
+                    let IndexedBucketInner {
+                        is_sorted,
+                        time_range,
+                        col_time,
+                        col_insert_id: _,
+                        col_row_id,
+                        col_num_instances,
+                        columns,
+                        size_bytes: _,
+                    } = &*inner.read();
+                    debug_assert!(is_sorted);
+
+                    if time_range.min > time_filter.max || time_range.max < time_filter.min {
+                        return None;
+                    }
+
+                    let col_row_id: RowIdVec =
+                        filter_column(col_time, col_row_id.iter(), time_filter).collect();
+
+                    // NOTE: Shouldn't ever happen due to check above, but better safe than
+                    // sorry...
+                    debug_assert!(!col_row_id.is_empty());
+                    if col_row_id.is_empty() {
+                        return None;
+                    }
+
+                    let col_timelines = [(
+                        *timeline,
+                        filter_column(col_time, col_time.iter(), time_filter)
+                            .map(Some)
+                            .collect(),
+                    )]
+                    .into();
+
+                    let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone())
+                        .take(col_row_id.len())
+                        .collect();
+
+                    let col_num_instances =
+                        filter_column(col_time, col_num_instances.iter(), time_filter).collect();
+
+                    let mut columns2 = IntMap::with_capacity(columns.len());
+                    for (component, column) in columns {
+                        let column = filter_column(col_time, column.iter(), time_filter).collect();
+                        columns2.insert(*component, DataCellColumn(column));
+                    }
+
+                    Some(DataTable {
+                        // NOTE: The `TableId` is only used for debug purposes so this doesn't
+                        // matter much... also we don't support saving to file on the web anyhow.
+                        #[cfg(not(target_arch = "wasm32"))]
+                        table_id: TableId::random(),
+                        #[cfg(target_arch = "wasm32")]
+                        table_id: TableId::ZERO,
+                        col_row_id,
+                        col_timelines,
+                        col_entity_path,
+                        col_num_instances,
+                        columns: columns2,
+                    })
+                }))
+            })
+            .flatten()
+    }
 }

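For callers, the practical upshot of the `store_dump.rs` change: `to_data_tables` now takes `Option<(Timeline, TimeRange)>` instead of `Option<TimeRange>`, so a dump is either completely unfiltered or pinned to a single timeline. A minimal sketch of both call shapes; the `dump_both_ways` function and its bindings are hypothetical, but the API calls mirror the ones exercised by the updated tests below:

use re_arrow_store::{DataStore, TimeInt, TimeRange, Timeline};
use re_log_types::DataTable;

fn dump_both_ways(store: &DataStore) {
    // Unfiltered: timeless tables plus every temporal bucket on every timeline.
    let everything: Vec<DataTable> = store.to_data_tables(None).collect();

    // Filtered: only rows on `frame_nr` within [frame1, frame2]; buckets on
    // other timelines are skipped entirely by `dump_temporal_tables_filtered`.
    let timeline_frame_nr = Timeline::new_sequence("frame_nr");
    let frame1: TimeInt = 1.into();
    let frame2: TimeInt = 2.into();
    let filtered: Vec<DataTable> = store
        .to_data_tables(Some((timeline_frame_nr, TimeRange::new(frame1, frame2))))
        .collect();

    assert!(filtered.len() <= everything.len());
}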
crates/re_arrow_store/tests/dump.rs (46 additions, 66 deletions)
@@ -3,13 +3,11 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 
 use itertools::Itertools;
-use re_arrow_store::{test_row, DataStore, DataStoreStats, TimeInt, TimeRange};
+use re_arrow_store::{test_row, DataStore, DataStoreStats, TimeInt, TimeRange, Timeline};
 use re_log_types::{
     component_types::InstanceKey,
-    datagen::{
-        build_frame_nr, build_log_time, build_some_colors, build_some_instances, build_some_point2d,
-    },
-    Component as _, DataTable, EntityPath, TableId, Time,
+    datagen::{build_frame_nr, build_some_colors, build_some_instances, build_some_point2d},
+    Component as _, DataTable, EntityPath, TableId,
 };
 
 // --- Dump ---
@@ -31,40 +29,6 @@ fn data_store_dump() {
 }
 
 fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) {
-    let frame1: TimeInt = 1.into();
-    let frame2: TimeInt = 2.into();
-    let frame3: TimeInt = 3.into();
-    let frame4: TimeInt = 4.into();
-
-    let create_insert_table = |ent_path| {
-        let ent_path = EntityPath::from(ent_path);
-
-        let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
-        let row1 = test_row!(ent_path @ [
-            build_log_time(Time::now()), build_frame_nr(frame1),
-        ] => 3; [instances1.clone(), colors1]);
-
-        let points2 = build_some_point2d(3);
-        let row2 = test_row!(ent_path @ [
-            build_log_time(Time::now()), build_frame_nr(frame2),
-        ] => 3; [instances1, points2]);
-
-        let points3 = build_some_point2d(10);
-        let row3 = test_row!(ent_path @ [
-            build_log_time(Time::now()), build_frame_nr(frame3),
-        ] => 10; [points3]);
-
-        let colors4 = build_some_colors(5);
-        let row4 = test_row!(ent_path @ [
-            build_log_time(Time::now()), build_frame_nr(frame4),
-        ] => 5; [colors4]);
-
-        let mut table = DataTable::from_rows(TableId::random(), [row1, row2, row3, row4]);
-        table.compute_all_size_bytes();
-
-        table
-    };
-
     // helper to insert a table both as a temporal and timeless payload
     let insert_table = |store: &mut DataStore, table: &DataTable| {
         // insert temporal
@@ -157,33 +121,11 @@ fn data_store_dump_filtered() {
 }
 
 fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) {
+    let timeline_frame_nr = Timeline::new_sequence("frame_nr");
     let frame1: TimeInt = 1.into();
     let frame2: TimeInt = 2.into();
     let frame3: TimeInt = 3.into();
-
-    let create_insert_table = |ent_path| {
-        let ent_path = EntityPath::from(ent_path);
-
-        let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
-        let row1 = test_row!(ent_path @ [
-            build_frame_nr(frame1),
-        ] => 3; [instances1.clone(), colors1]);
-
-        let points2 = build_some_point2d(3);
-        let row2 = test_row!(ent_path @ [
-            build_frame_nr(frame2),
-        ] => 3; [instances1, points2]);
-
-        let points3 = build_some_point2d(10);
-        let row3 = test_row!(ent_path @ [
-            build_frame_nr(frame3),
-        ] => 10; [points3]);
-
-        let mut table = DataTable::from_rows(TableId::random(), [row1, row2, row3]);
-        table.compute_all_size_bytes();
-
-        table
-    };
+    let frame4: TimeInt = 4.into();
 
     let ent_paths = ["this/that", "other", "yet/another/one"];
     let tables = ent_paths
@@ -202,15 +144,19 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore)
     }
 
     // Dump frame1 from the first store into the second one.
-    for table in store1.to_data_tables(TimeRange::new(frame1, frame1).into()) {
+    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) {
         store2.insert_table(&table).unwrap();
     }
     // Dump frame2 from the first store into the second one.
-    for table in store1.to_data_tables(TimeRange::new(frame2, frame2).into()) {
+    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()) {
         store2.insert_table(&table).unwrap();
     }
     // Dump frame3 from the first store into the second one.
-    for table in store1.to_data_tables(TimeRange::new(frame3, frame3).into()) {
+    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()) {
         store2.insert_table(&table).unwrap();
     }
+    // Dump frame4 from the first store into the second one.
+    for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) {
+        store2.insert_table(&table).unwrap();
+    }
 
     if let err @ Err(_) = store2.sanity_check() {
@@ -247,3 +193,37 @@ pub fn init_logs() {
         re_log::setup_native_logging();
     }
 }
+
+fn create_insert_table(ent_path: impl Into<EntityPath>) -> DataTable {
+    let ent_path = ent_path.into();
+
+    let frame1: TimeInt = 1.into();
+    let frame2: TimeInt = 2.into();
+    let frame3: TimeInt = 3.into();
+    let frame4: TimeInt = 4.into();
+
+    let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
+    let row1 = test_row!(ent_path @ [
+        build_frame_nr(frame1),
+    ] => 3; [instances1.clone(), colors1]);
+
+    let points2 = build_some_point2d(3);
+    let row2 = test_row!(ent_path @ [
+        build_frame_nr(frame2),
+    ] => 3; [instances1, points2]);
+
+    let points3 = build_some_point2d(10);
+    let row3 = test_row!(ent_path @ [
+        build_frame_nr(frame3),
+    ] => 10; [points3]);
+
+    let colors4 = build_some_colors(5);
+    let row4 = test_row!(ent_path @ [
+        build_frame_nr(frame4),
+    ] => 5; [colors4]);
+
+    let mut table = DataTable::from_rows(TableId::random(), [row1, row2, row3, row4]);
+    table.compute_all_size_bytes();
+
+    table
+}
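One behavior worth noting next to these tests: the new filter is keyed on the timeline first, so asking for a range on a timeline the store never saw yields no temporal tables at all, only the timeless payload. A hedged sketch of that edge case; `Timeline::new_temporal`, the `dump_on_missing_timeline` function, and the `store` binding are assumptions for illustration, not part of this diff:

use re_arrow_store::{DataStore, TimeInt, TimeRange, Timeline};

fn dump_on_missing_timeline(store: &DataStore) -> usize {
    // The test data above lives exclusively on `frame_nr`, so a filter keyed
    // on `log_time` matches no indexed table: each table fails the
    // `table.timeline != timeline_filter` check in
    // `dump_temporal_tables_filtered` and is skipped. Only the timeless
    // tables (if any) come out of the iterator.
    let timeline_log_time = Timeline::new_temporal("log_time");
    let (min, max): (TimeInt, TimeInt) = (1.into(), 4.into());
    store
        .to_data_tables(Some((timeline_log_time, TimeRange::new(min, max))))
        .count()
}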
