Fix missing data for "bridged" range queries (#6177)
A cheap fix for #5686, both in terms of added code and runtime
performance, as it only kicks in under very particular circumstances.
The test added in this PR explains the situation better than words ever
could.

The proper solution here would be to implement actual bucketing in the
range cache (#4810), but that's a much bigger change that I don't want
to get into until after the upcoming drastic changes to the datastore.
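
In short: when an incoming range query would bridge a gap between the ranges
already cached, the fix asks the store whether that gap actually contains
data, and if so coarsely drops the whole cache. A distilled sketch of that
check, using hypothetical standalone types rather than the actual crate types:

    // Sketch only: `TimeInt`/`TimeRange` stand in for the real re_log_types
    // types, and `latest_at` stands in for a real datastore lookup.
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct TimeInt(i64);

    struct TimeRange {
        min: TimeInt,
        max: TimeInt,
    }

    /// Is there a hole between `cached` and `queried` containing data that
    /// the cache has never seen?
    fn must_invalidate(
        cached: &TimeRange,
        queried: &TimeRange,
        latest_at: impl Fn(TimeInt) -> Option<TimeInt>,
    ) -> bool {
        let holes = [
            (cached.max, TimeInt(queried.min.0.saturating_sub(1))), // hole after the cached range
            (queried.max, TimeInt(cached.min.0.saturating_sub(1))), // hole before the cached range
        ];
        holes.iter().any(|&(hole_start, hole_end)| {
            // A latest-at lookup at the end of the hole yields the most recent
            // data time at or before it; if that lands inside the hole, the
            // cache is missing data.
            hole_start < hole_end
                && latest_at(hole_end).is_some_and(|data_time| data_time > hole_start)
        })
    }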

- Fixes #5686
teh-cmc committed May 3, 2024
1 parent 0ac38a4 commit a90b3a8
Showing 3 changed files with 159 additions and 6 deletions.
5 changes: 0 additions & 5 deletions crates/re_query/src/latest_at/query.rs
@@ -216,7 +216,6 @@ impl LatestAtCache {
std::collections::btree_map::Entry::Occupied(entry) => {
// Fastest path: we have an entry for this exact query time, no need to look any
// further.
re_log::trace!(query_time=?query.at(), "cache hit (query time)");
return Some(Arc::clone(entry.get()));
}
std::collections::btree_map::Entry::Vacant(entry) => entry,
@@ -229,8 +228,6 @@ impl LatestAtCache {
// Fast path: we've run the query and realized that we already have the data for the resulting
// _data_ time, so let's use that to avoid join & deserialization costs.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
re_log::trace!(query_time=?query.at(), ?data_time, "cache hit (data time)");

query_time_bucket_at_query_time.insert(Arc::clone(data_time_bucket_at_data_time));

// We now know for a fact that a query at that data time would yield the same
@@ -260,8 +257,6 @@ impl LatestAtCache {

// Slowest path: this is a complete cache miss.
{
re_log::trace!(query_time=?query.at(), ?data_time, "cache miss");

let query_time_bucket_at_query_time =
query_time_bucket_at_query_time.insert(Arc::clone(&bucket));

59 changes: 59 additions & 0 deletions crates/re_query/src/range/query.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;

use parking_lot::RwLock;

use re_data_store::LatestAtQuery;
use re_data_store::{DataStore, RangeQuery, TimeInt};
use re_log_types::{EntityPath, ResolvedTimeRange};
use re_types_core::ComponentName;
@@ -47,7 +48,65 @@ impl Caches {
};

let mut cache = cache.write();

// TODO(#4810): Get rid of this once we have proper bucketing in place.
//
// Detects the case where the user loads a piece of data at the end of the time range, then a piece
// at the beginning of the range, and finally a piece right in the middle.
//
// DATA = ###################################################
// | | | | \_____/
// \______/ | | query #1
// query #2 \_______/
// query #3
//
// and coarsely invalidates the whole cache in that case, to avoid the kind of bugs
// showcased in <https://github.com/rerun-io/rerun/issues/5686>.
{
let time_range = cache.per_data_time.read_recursive().time_range();
if let Some(time_range) = time_range {
{
let hole_start = time_range.max();
let hole_end =
TimeInt::new_temporal(query.range().min().as_i64().saturating_sub(1));
if hole_start < hole_end {
if let Some((data_time, _, _)) = store.latest_at(
&LatestAtQuery::new(query.timeline(), hole_end),
entity_path,
component_name,
&[component_name],
) {
if data_time > hole_start {
re_log::trace!(%entity_path, %component_name, "coarsely invalidated because of bridged queries");
cache.pending_invalidation = Some(TimeInt::MIN);
}
}
}
}

{
let hole_start = query.range().max();
let hole_end =
TimeInt::new_temporal(time_range.min().as_i64().saturating_sub(1));
if hole_start < hole_end {
if let Some((data_time, _, _)) = store.latest_at(
&LatestAtQuery::new(query.timeline(), hole_end),
entity_path,
component_name,
&[component_name],
) {
if data_time > hole_start {
re_log::trace!(%entity_path, %component_name, "coarsely invalidated because of bridged queries");
cache.pending_invalidation = Some(TimeInt::MIN);
}
}
}
}
}
}

cache.handle_pending_invalidation();

let cached = cache.range(store, query, entity_path, component_name);
results.add(component_name, cached);
}
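
For context, `handle_pending_invalidation` itself is not part of this diff:
judging from its use above, it presumably drops every cached bucket at or
after the pending invalidation time, so `Some(TimeInt::MIN)` wipes the cache
entirely. A rough sketch of that mechanism, under that assumption and with
hypothetical names:

    // Assumption: buckets are keyed by data time in a BTreeMap.
    use std::collections::BTreeMap;

    struct RangeCacheSketch {
        per_data_time: BTreeMap<i64, Vec<u64>>, // hypothetical bucket payloads
        pending_invalidation: Option<i64>,
    }

    impl RangeCacheSketch {
        fn handle_pending_invalidation(&mut self) {
            if let Some(t) = self.pending_invalidation.take() {
                // `split_off(&t)` keeps keys < t in `self` and returns the
                // rest, which we drop; with `i64::MIN` that clears everything.
                drop(self.per_data_time.split_off(&t));
            }
        }
    }

Coarse, but cheap: per the commit message, this only triggers in the
particular back-and-forth query pattern shown above.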
101 changes: 100 additions & 1 deletion crates/re_query/tests/range.rs
@@ -4,7 +4,7 @@ use re_data_store::{DataStore, RangeQuery, ResolvedTimeRange, StoreSubscriber as
use re_log_types::{
build_frame_nr,
example_components::{MyColor, MyPoint, MyPoints},
DataRow, EntityPath, RowId, TimePoint,
DataRow, EntityPath, RowId, TimePoint, Timeline,
};
use re_query::{Caches, PromiseResolver, PromiseResult};
use re_types::Archetype;
@@ -272,6 +272,105 @@ fn static_range() {
);
}

// Test the case where the user loads a piece of data at the end of the time range, then a piece at
// the beginning of the range, and finally a piece right in the middle.
//
// DATA = ###################################################
// | | | | \_____/
// \______/ | | query #1
// query #2 \_______/
// query #3
//
// There is no data invalidation involved, which is what makes this case tricky: the cache must
// properly keep track of the fact that there are holes in the data -- on purpose.
#[test]
fn time_back_and_forth() {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
Default::default(),
);
let mut caches = Caches::new(&store);

let entity_path: EntityPath = "point".into();

let (rows, points): (Vec<_>, Vec<_>) = (0..10)
.map(|i| {
let timepoint = [build_frame_nr(i)];
let points = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
let row = DataRow::from_cells1_sized(
RowId::new(),
entity_path.clone(),
timepoint,
points.clone(),
)
.unwrap();

insert_and_react(&mut store, &mut caches, &row);

(row, points)
})
.unzip();

// --- Query #1: `[8, 10]` ---

let query =
re_data_store::RangeQuery::new(Timeline::new_sequence("frame_nr"), ResolvedTimeRange::new(8, 10));

let expected_points = &[
(
(TimeInt::new_temporal(8), rows[8].row_id()), //
points[8].as_slice(),
), //
(
(TimeInt::new_temporal(9), rows[9].row_id()), //
points[9].as_slice(),
), //
];
query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);

// --- Query #2: `[1, 3]` ---

let query =
re_data_store::RangeQuery::new(Timeline::new_sequence("frame_nr"), ResolvedTimeRange::new(1, 3));

let expected_points = &[
(
(TimeInt::new_temporal(1), rows[1].row_id()), //
points[1].as_slice(),
), //
(
(TimeInt::new_temporal(2), rows[2].row_id()), //
points[2].as_slice(),
), //
(
(TimeInt::new_temporal(3), rows[3].row_id()), //
points[3].as_slice(),
), //
];
query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);

// --- Query #3: `[5, 7]` ---

let query =
re_data_store::RangeQuery::new(Timeline::new_sequence("frame_nr"), ResolvedTimeRange::new(5, 7));

let expected_points = &[
(
(TimeInt::new_temporal(5), rows[5].row_id()), //
points[5].as_slice(),
), //
(
(TimeInt::new_temporal(6), rows[6].row_id()), //
points[6].as_slice(),
), //
(
(TimeInt::new_temporal(7), rows[7].row_id()), //
points[7].as_slice(),
), //
];
query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);
}
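
A natural follow-up (hypothetical, not part of this commit) is a fourth query
spanning the full range, checking that nothing goes missing once the coarse
invalidation has kicked in:

    // --- Hypothetical query #4: `[0, 9]`, i.e. the full range ---
    let query = re_data_store::RangeQuery::new(
        Timeline::new_sequence("frame_nr"),
        ResolvedTimeRange::new(0, 9),
    );
    let expected_points: Vec<_> = (0..10)
        .map(|i| {
            (
                (TimeInt::new_temporal(i as i64), rows[i].row_id()),
                points[i].as_slice(),
            )
        })
        .collect();
    query_and_compare(&caches, &store, &query, &entity_path, &expected_points, &[]);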

#[test]
fn invalidation() {
let entity_path = "point";