Skip to content

Commit

Permalink
Primary caching 14: don't bake LatestAt(T-1) results into low-level…
Browse files Browse the repository at this point in the history
… range queries (#4793)

Our low-level range APIs used to bake the latest-at results at
`range.min - 1` into the range results, which is a big problem in a
multi tenant setting because `range(1, 10)` vs. `latestat(1) + range(2,
10)` are two completely different things.

Side-effect: a plot with a window of len 1 now behaves as expected:



https://github.com/rerun-io/rerun/assets/2910679/957ac367-35a6-4bea-9f40-59d51c556639

---

Part of the primary caching series of PR (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
  • Loading branch information
teh-cmc committed Jan 15, 2024
1 parent bdae240 commit dc8cf2d
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 393 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/range_components.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Demonstrates usage of [`re_data_store::polars_util::range_components`].
//!
//! ```text
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_data_store --example range_components
//! POLARS_FMT_MAX_ROWS=100 cargo r -p re_data_store --all-features --example range_components
//! ```

use polars_core::prelude::JoinType;
Expand Down
51 changes: 9 additions & 42 deletions crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ pub fn latest_components(
/// Iterates over the rows of any number of components and their respective cluster keys, all from
/// the single point-of-view of the `primary` component, returning an iterator of `DataFrame`s.
///
/// An initial dataframe is yielded with the latest-at state at the start of the time range, if
/// there is any.
///
/// The iterator only ever yields dataframes iff the `primary` component has changed.
/// A change affecting only secondary components will not yield a dataframe.
///
Expand Down Expand Up @@ -126,51 +123,21 @@ pub fn range_components<'a, const N: usize>(

let mut state = None;

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut df_latest = None;
if let Some(latest_time) = latest_time {
let df = latest_components(
store,
&LatestAtQuery::new(query.timeline, latest_time),
ent_path,
&components,
join_type,
);

if df.as_ref().map_or(false, |df| {
// We only care about the initial state if it A) isn't empty and B) contains any data
// at all for the primary component.
!df.is_empty() && df.column(primary.as_ref()).is_ok()
}) {
df_latest = Some(df);
}
}

let primary_col = components
.iter()
.find_position(|component| **component == primary)
.map(|(col, _)| col)
.unwrap(); // asserted on entry

// send the latest-at state before anything else
df_latest
.into_iter()
.map(move |df| (latest_time, true, df))
// followed by the range
.chain(
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
}),
)
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
})
.filter_map(move |(time, is_primary, df)| {
state = Some(join_dataframes(
cluster_key,
Expand Down
109 changes: 22 additions & 87 deletions crates/re_data_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@ fn range_impl(store: &mut DataStore) {

let ent_path = EntityPath::from("this/that");

let frame0 = TimeInt::from(0);
let frame1 = TimeInt::from(1);
let frame2 = TimeInt::from(2);
let frame3 = TimeInt::from(3);
Expand Down Expand Up @@ -554,61 +553,40 @@ fn range_impl(store: &mut DataStore) {

// Unit ranges (Color's PoV)

// NOTE: Check out [1] to see what the results would've looked like with latest-at semantics at
// T-1 baked in (like we used to do).
//
// [1]: <https://github.com/rerun-io/rerun/blob/790f391/crates/re_data_store/tests/data_store.rs#L555-L837>

assert_range_components(
TimeRange::new(frame1, frame1),
[Color::name(), Position2D::name()],
&[
(
Some(frame0),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)],
), // timeless
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
),
],
&[(
Some(frame1),
&[(Color::name(), &row1)], //
)],
);
assert_range_components(
TimeRange::new(frame2, frame2),
[Color::name(), Position2D::name()],
&[
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
), //
],
&[],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Color::name(), Position2D::name()],
&[
(
Some(frame2),
&[(Color::name(), &row1), (Position2D::name(), &row2)],
), //
],
&[],
);
assert_range_components(
TimeRange::new(frame4, frame4),
[Color::name(), Position2D::name()],
&[
(
Some(frame3),
&[(Color::name(), &row1), (Position2D::name(), &row3)],
),
(
Some(frame4),
&[(Color::name(), &row4_1), (Position2D::name(), &row3)],
&[(Color::name(), &row4_1)], //
),
(
Some(frame4),
&[(Color::name(), &row4_2), (Position2D::name(), &row3)],
&[(Color::name(), &row4_2)], //
),
(
Some(frame4),
Expand All @@ -619,65 +597,38 @@ fn range_impl(store: &mut DataStore) {
assert_range_components(
TimeRange::new(frame5, frame5),
[Color::name(), Position2D::name()],
&[
(
Some(frame4),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)], // !!!
), //
],
&[],
);

// Unit ranges (Position2D's PoV)

assert_range_components(
TimeRange::new(frame1, frame1),
[Position2D::name(), Color::name()],
&[
(
Some(frame0),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), // timeless
],
&[],
);
assert_range_components(
TimeRange::new(frame2, frame2),
[Position2D::name(), Color::name()],
&[
(
Some(frame1),
&[
(Position2D::name(), &row4_4), // timeless
(Color::name(), &row1),
],
),
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
&[(Position2D::name(), &row2)], //
), //
],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Position2D::name(), Color::name()],
&[
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
),
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
),
],
&[(
Some(frame3),
&[(Position2D::name(), &row3)], //
)],
);
assert_range_components(
TimeRange::new(frame4, frame4),
[Position2D::name(), Color::name()],
&[
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
),
(
Some(frame4),
&[(Position2D::name(), &row4_25), (Color::name(), &row4_2)],
Expand All @@ -691,12 +642,7 @@ fn range_impl(store: &mut DataStore) {
assert_range_components(
TimeRange::new(frame5, frame5),
[Position2D::name(), Color::name()],
&[
(
Some(frame4),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), //
],
&[],
);

// Full range (Color's PoV)
Expand All @@ -705,16 +651,9 @@ fn range_impl(store: &mut DataStore) {
TimeRange::new(frame1, frame5),
[Color::name(), Position2D::name()],
&[
(
Some(frame0),
&[(Color::name(), &row4_3), (Position2D::name(), &row4_4)],
), // timeless
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
&[(Color::name(), &row1)], //
),
(
Some(frame4),
Expand All @@ -737,10 +676,6 @@ fn range_impl(store: &mut DataStore) {
TimeRange::new(frame1, frame5),
[Position2D::name(), Color::name()],
&[
(
Some(frame0),
&[(Position2D::name(), &row4_4), (Color::name(), &row4_3)],
), // timeless
(
Some(frame2),
&[(Position2D::name(), &row2), (Color::name(), &row1)],
Expand Down
80 changes: 24 additions & 56 deletions crates/re_query/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use itertools::Itertools as _;
use re_data_store::{DataStore, LatestAtQuery, RangeQuery};
use re_data_store::{DataStore, RangeQuery};
use re_log_types::EntityPath;
use re_types_core::{Archetype, ComponentName};

use crate::{get_component_with_instances, ArchetypeView, ComponentWithInstances};
use crate::{ArchetypeView, ComponentWithInstances};

// ---

Expand Down Expand Up @@ -61,61 +61,29 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
.take(components.len())
.collect();

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut cwis_latest = None;
if let Some(query_time) = query_time {
let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None)
.take(components.len())
.collect();

// Fetch the latest data for every single component from their respective point-of-views,
// this will allow us to build up the initial state and send an initial latest-at
// entity-view if needed.
for (i, primary) in components.iter().enumerate() {
cwis_latest_raw[i] = get_component_with_instances(
store,
&LatestAtQuery::new(query.timeline, query_time),
ent_path,
*primary,
)
.map(|(_, row_id, cwi)| (row_id, cwi));
}

if cwis_latest_raw[primary_col].is_some() {
cwis_latest = Some(cwis_latest_raw);
}
}

// send the latest-at state before anything else
cwis_latest
.into_iter()
.map(move |cwis| (query_time, true, cwis))
.chain(store.range(query, ent_path, components).map(
move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
store
.range(query, ent_path, components)
.map(move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
},
))
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
})
.filter_map(move |(data_time, is_primary, cwis)| {
for (i, cwi) in cwis
.into_iter()
Expand Down
Loading

0 comments on commit dc8cf2d

Please sign in to comment.