diff --git a/crates/re_arrow_store/src/store_dump.rs b/crates/re_arrow_store/src/store_dump.rs index d0909da1f392..7c29ae5315ca 100644 --- a/crates/re_arrow_store/src/store_dump.rs +++ b/crates/re_arrow_store/src/store_dump.rs @@ -1,7 +1,8 @@ use ahash::HashMapExt; +use arrow2::Either; use nohash_hasher::IntMap; use re_log_types::{ - DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeInt, TimeRange, + DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline, }; use crate::{ @@ -17,14 +18,16 @@ impl DataStore { // TODO(#1794): Implement simple recompaction. pub fn to_data_tables( &self, - time_filter: Option<TimeRange>, + time_filter: Option<(Timeline, TimeRange)>, ) -> impl Iterator<Item = DataTable> + '_ { crate::profile_function!(); - let time_filter = time_filter.unwrap_or_else(|| TimeRange::new(TimeInt::MIN, TimeInt::MAX)); - let timeless = self.dump_timeless_tables(); - let temporal = self.dump_temporal_tables(time_filter); + let temporal = if let Some(time_filter) = time_filter { + Either::Left(self.dump_temporal_tables_filtered(time_filter)) + } else { + Either::Right(self.dump_temporal_tables()) + }; timeless.chain(temporal) } @@ -60,11 +63,11 @@ impl DataStore { }) } - fn dump_temporal_tables(&self, time_filter: TimeRange) -> impl Iterator<Item = DataTable> + '_ { - self.tables.values().flat_map(move |table| { + fn dump_temporal_tables(&self) -> impl Iterator<Item = DataTable> + '_ { + self.tables.values().flat_map(|table| { crate::profile_function!(); - table.buckets.values().filter_map(move |bucket| { + table.buckets.values().map(move |bucket| { crate::profile_function!(); bucket.sort_indices_if_needed(); @@ -77,7 +80,7 @@ impl DataStore { let IndexedBucketInner { is_sorted, - time_range, + time_range: _, col_time, col_insert_id: _, col_row_id, @@ -87,55 +90,108 @@ impl DataStore { } = &*inner.read(); debug_assert!(is_sorted); - if time_range.min > time_filter.max || time_range.max < time_filter.min { - return None; + DataTable { + table_id: TableId::random(), + col_row_id: 
col_row_id.clone(), + col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())] + .into(), + col_entity_path: std::iter::repeat_with(|| table.ent_path.clone()) + .take(table.total_rows() as _) + .collect(), + col_num_instances: col_num_instances.clone(), + columns: columns.clone(), // shallow } + }) + }) + } - let col_row_id: RowIdVec = - filter_column(col_time, col_row_id.iter(), time_filter).collect(); + fn dump_temporal_tables_filtered( + &self, + (timeline_filter, time_filter): (Timeline, TimeRange), + ) -> impl Iterator<Item = DataTable> + '_ { + self.tables + .values() + .filter_map(move |table| { + crate::profile_function!(); - // NOTE: Shouldn't ever happen due to check above, but better safe than sorry... - debug_assert!(!col_row_id.is_empty()); - if col_row_id.is_empty() { + if table.timeline != timeline_filter { return None; } - let col_timelines = [( - *timeline, - filter_column(col_time, col_time.iter(), time_filter) - .map(Some) - .collect(), - )] - .into(); - - let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone()) - .take(col_row_id.len()) - .collect(); - - let col_num_instances = - filter_column(col_time, col_num_instances.iter(), time_filter).collect(); - - let mut columns2 = IntMap::with_capacity(columns.len()); - for (component, column) in columns { - let column = filter_column(col_time, column.iter(), time_filter).collect(); - columns2.insert(*component, DataCellColumn(column)); - } - - Some(DataTable { - // NOTE: The `TableId` is only used for debug purposes so this doesn't matter - // much... also we don't support saving to file on the web anyhow. 
- #[cfg(not(target_arch = "wasm32"))] - table_id: TableId::random(), - #[cfg(target_arch = "wasm32")] - table_id: TableId::ZERO, - col_row_id, - col_timelines, - col_entity_path, - col_num_instances, - columns: columns2, - }) + Some(table.buckets.values().filter_map(move |bucket| { + crate::profile_function!(); + + bucket.sort_indices_if_needed(); + + let IndexedBucket { + timeline, + cluster_key: _, + inner, + } = bucket; + + let IndexedBucketInner { + is_sorted, + time_range, + col_time, + col_insert_id: _, + col_row_id, + col_num_instances, + columns, + size_bytes: _, + } = &*inner.read(); + debug_assert!(is_sorted); + + if time_range.min > time_filter.max || time_range.max < time_filter.min { + return None; + } + + let col_row_id: RowIdVec = + filter_column(col_time, col_row_id.iter(), time_filter).collect(); + + // NOTE: Shouldn't ever happen due to check above, but better safe than + // sorry... + debug_assert!(!col_row_id.is_empty()); + if col_row_id.is_empty() { + return None; + } + + let col_timelines = [( + *timeline, + filter_column(col_time, col_time.iter(), time_filter) + .map(Some) + .collect(), + )] + .into(); + + let col_entity_path = std::iter::repeat_with(|| table.ent_path.clone()) + .take(col_row_id.len()) + .collect(); + + let col_num_instances = + filter_column(col_time, col_num_instances.iter(), time_filter).collect(); + + let mut columns2 = IntMap::with_capacity(columns.len()); + for (component, column) in columns { + let column = filter_column(col_time, column.iter(), time_filter).collect(); + columns2.insert(*component, DataCellColumn(column)); + } + + Some(DataTable { + // NOTE: The `TableId` is only used for debug purposes so this doesn't + // matter much... also we don't support saving to file on the web anyhow. 
+ #[cfg(not(target_arch = "wasm32"))] + table_id: TableId::random(), + #[cfg(target_arch = "wasm32")] + table_id: TableId::ZERO, + col_row_id, + col_timelines, + col_entity_path, + col_num_instances, + columns: columns2, + }) + })) }) - }) + .flatten() } } diff --git a/crates/re_arrow_store/tests/dump.rs b/crates/re_arrow_store/tests/dump.rs index 3ad42c3da14b..8c059bf80fdc 100644 --- a/crates/re_arrow_store/tests/dump.rs +++ b/crates/re_arrow_store/tests/dump.rs @@ -3,13 +3,11 @@ use std::sync::atomic::{AtomicBool, Ordering}; use itertools::Itertools; -use re_arrow_store::{test_row, DataStore, DataStoreStats, TimeInt, TimeRange}; +use re_arrow_store::{test_row, DataStore, DataStoreStats, TimeInt, TimeRange, Timeline}; use re_log_types::{ component_types::InstanceKey, - datagen::{ - build_frame_nr, build_log_time, build_some_colors, build_some_instances, build_some_point2d, - }, - Component as _, DataTable, EntityPath, TableId, Time, + datagen::{build_frame_nr, build_some_colors, build_some_instances, build_some_point2d}, + Component as _, DataTable, EntityPath, TableId, }; // --- Dump --- @@ -31,40 +29,6 @@ fn data_store_dump() { } fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) { - let frame1: TimeInt = 1.into(); - let frame2: TimeInt = 2.into(); - let frame3: TimeInt = 3.into(); - let frame4: TimeInt = 4.into(); - - let create_insert_table = |ent_path| { - let ent_path = EntityPath::from(ent_path); - - let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); - let row1 = test_row!(ent_path @ [ - build_log_time(Time::now()), build_frame_nr(frame1), - ] => 3; [instances1.clone(), colors1]); - - let points2 = build_some_point2d(3); - let row2 = test_row!(ent_path @ [ - build_log_time(Time::now()), build_frame_nr(frame2), - ] => 3; [instances1, points2]); - - let points3 = build_some_point2d(10); - let row3 = test_row!(ent_path @ [ - build_log_time(Time::now()), build_frame_nr(frame3), - ] => 10; 
[points3]); - - let colors4 = build_some_colors(5); - let row4 = test_row!(ent_path @ [ - build_log_time(Time::now()), build_frame_nr(frame4), - ] => 5; [colors4]); - - let mut table = DataTable::from_rows(TableId::random(), [row1, row2, row3, row4]); - table.compute_all_size_bytes(); - - table - }; - // helper to insert a table both as a temporal and timeless payload let insert_table = |store: &mut DataStore, table: &DataTable| { // insert temporal @@ -157,33 +121,11 @@ fn data_store_dump_filtered() { } fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) { + let timeline_frame_nr = Timeline::new_sequence("frame_nr"); let frame1: TimeInt = 1.into(); let frame2: TimeInt = 2.into(); let frame3: TimeInt = 3.into(); - - let create_insert_table = |ent_path| { - let ent_path = EntityPath::from(ent_path); - - let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); - let row1 = test_row!(ent_path @ [ - build_frame_nr(frame1), - ] => 3; [instances1.clone(), colors1]); - - let points2 = build_some_point2d(3); - let row2 = test_row!(ent_path @ [ - build_frame_nr(frame2), - ] => 3; [instances1, points2]); - - let points3 = build_some_point2d(10); - let row3 = test_row!(ent_path @ [ - build_frame_nr(frame3), - ] => 10; [points3]); - - let mut table = DataTable::from_rows(TableId::random(), [row1, row2, row3]); - table.compute_all_size_bytes(); - - table - }; + let frame4: TimeInt = 4.into(); let ent_paths = ["this/that", "other", "yet/another/one"]; let tables = ent_paths @@ -202,15 +144,19 @@ fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) } // Dump frame1 from the first store into the second one. - for table in store1.to_data_tables(TimeRange::new(frame1, frame1).into()) { + for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) { store2.insert_table(&table).unwrap(); } // Dump frame2 from the first store into the second one. 
- for table in store1.to_data_tables(TimeRange::new(frame2, frame2).into()) { + for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()) { store2.insert_table(&table).unwrap(); } // Dump frame3 from the first store into the second one. - for table in store1.to_data_tables(TimeRange::new(frame3, frame3).into()) { + for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()) { + store2.insert_table(&table).unwrap(); + } + // Dump frame4 from the first store into the second one. + for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) { store2.insert_table(&table).unwrap(); } if let err @ Err(_) = store2.sanity_check() { @@ -247,3 +193,37 @@ pub fn init_logs() { re_log::setup_native_logging(); } } + +fn create_insert_table(ent_path: impl Into<EntityPath>) -> DataTable { + let ent_path = ent_path.into(); + + let frame1: TimeInt = 1.into(); + let frame2: TimeInt = 2.into(); + let frame3: TimeInt = 3.into(); + let frame4: TimeInt = 4.into(); + + let (instances1, colors1) = (build_some_instances(3), build_some_colors(3)); + let row1 = test_row!(ent_path @ [ + build_frame_nr(frame1), + ] => 3; [instances1.clone(), colors1]); + + let points2 = build_some_point2d(3); + let row2 = test_row!(ent_path @ [ + build_frame_nr(frame2), + ] => 3; [instances1, points2]); + + let points3 = build_some_point2d(10); + let row3 = test_row!(ent_path @ [ + build_frame_nr(frame3), + ] => 10; [points3]); + + let colors4 = build_some_colors(5); + let row4 = test_row!(ent_path @ [ + build_frame_nr(frame4), + ] => 5; [colors4]); + + let mut table = DataTable::from_rows(TableId::random(), [row1, row2, row3, row4]); + table.compute_all_size_bytes(); + + table +}