Implement recording/last-modified-at aware garbage collection #4183

Merged · 6 commits · Nov 9, 2023

Changes from all commits:
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions crates/re_data_store/Cargo.toml
@@ -40,6 +40,7 @@ itertools.workspace = true
nohash-hasher.workspace = true
serde = { workspace = true, features = ["derive", "rc"], optional = true }
thiserror.workspace = true
web-time.workspace = true

[dev-dependencies]
re_log_encoding = { workspace = true, features = ["decoder", "encoder"] }
11 changes: 11 additions & 0 deletions crates/re_data_store/src/store_db.rs
@@ -318,6 +318,9 @@ pub struct StoreDb {

/// Where we store the entities.
entity_db: EntityDb,

/// Keeps track of the last time data was inserted into this store (viewer wall-clock).
last_modified_at: web_time::Instant,
}

impl StoreDb {
@@ -327,6 +330,7 @@ impl StoreDb {
data_source: None,
set_store_info: None,
entity_db: Default::default(),
last_modified_at: web_time::Instant::now(),
}
}

@@ -408,6 +412,10 @@ impl StoreDb {
self.entity_db.data_store.generation()
}

pub fn last_modified_at(&self) -> web_time::Instant {
self.last_modified_at
}

pub fn is_empty(&self) -> bool {
self.set_store_info.is_none() && self.num_rows() == 0
}
@@ -437,6 +445,8 @@ impl StoreDb {
self.add_data_row(row?)?;
}

self.last_modified_at = web_time::Instant::now();

Ok(())
}

@@ -489,6 +499,7 @@ impl StoreDb {
data_source: _,
set_store_info: _,
entity_db,
last_modified_at: _,
} = self;

entity_db.purge(&deleted);
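Taken together, the new field, its initialization, and the update in the ingestion path give every `StoreDb` a wall-clock notion of recency (`web_time::Instant` behaves like `std::time::Instant` on native and uses the browser clock on wasm, so the same code works in the web viewer). As a minimal sketch of the ordering this enables — not part of this diff, and assuming only an iterator over the bundle's `StoreDb`s plus the types used elsewhere in this PR:

// Sketch only, not part of this diff: pick the least-recently-modified
// recording as the next garbage-collection candidate.
fn oldest_modified_recording<'a>(
    store_dbs: impl Iterator<Item = &'a StoreDb>,
) -> Option<&'a StoreDb> {
    store_dbs
        .filter(|db| db.store_kind() == StoreKind::Recording)
        .min_by_key(|db| db.last_modified_at())
}

The PR's own `StoreBundle::find_oldest_modified_recording` further down does the equivalent by sorting the filtered recordings and taking the first one.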
21 changes: 21 additions & 0 deletions crates/re_memory/src/memory_use.rs
@@ -18,12 +18,33 @@ pub struct MemoryUse {
}

impl MemoryUse {
#[inline]
pub fn capture() -> Self {
Self {
resident: bytes_resident(),
counted: crate::accounting_allocator::global_allocs().map(|c| c.size as _),
}
}

/// Bytes used by the application according to our best estimate.
///
/// This is [`Self::counted`] if available, otherwise falls back to
/// [`Self::resident`] if that's available, otherwise `None`.
#[inline]
pub fn used(&self) -> Option<i64> {
self.counted.or(self.resident)
}
}

impl std::ops::Mul<f32> for MemoryUse {
type Output = Self;

fn mul(self, factor: f32) -> Self::Output {
Self {
resident: self.resident.map(|v| (v as f32 * factor) as i64),
counted: self.counted.map(|v| (v as f32 * factor) as i64),
}
}
}

impl std::ops::Sub for MemoryUse {
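A quick illustration of the new helpers — a sketch, not code from this PR, assuming `MemoryUse` is in scope: `used()` prefers the tracking allocator's exact count and falls back to the OS-reported resident set, while the new `Mul<f32>` impl scales both fields of a snapshot at once, which is convenient when computing purge targets.

// Sketch only, not part of this diff.
fn log_memory_estimate() {
    let mem_use = MemoryUse::capture();

    // Best estimate: counted allocations if the tracking allocator is active,
    // otherwise the resident set size, otherwise nothing.
    if let Some(used) = mem_use.used() {
        println!("~{used} bytes in use");
    }

    // Scale the whole snapshot, e.g. to express a target of 80% of current usage:
    let target = mem_use * 0.8_f32;
    println!("target: resident={:?} counted={:?}", target.resident, target.counted);
}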
113 changes: 83 additions & 30 deletions crates/re_viewer/src/store_hub.rs
@@ -27,7 +27,7 @@ pub struct StoreHub {
selected_rec_id: Option<StoreId>,
selected_application_id: Option<ApplicationId>,
blueprint_by_app_id: HashMap<ApplicationId, StoreId>,
store_dbs: StoreBundle,
store_bundle: StoreBundle,

/// Was a recording ever activated? Used by the heuristic controlling the welcome screen.
was_recording_active: bool,
@@ -59,7 +59,7 @@ impl StoreHub {
pub fn new() -> Self {
re_tracing::profile_function!();
let mut blueprint_by_app_id = HashMap::new();
let mut store_dbs = StoreBundle::default();
let mut store_bundle = StoreBundle::default();

let welcome_screen_store_id = StoreId::from_string(
StoreKind::Blueprint,
@@ -70,14 +70,14 @@
welcome_screen_store_id.clone(),
);

let welcome_screen_blueprint = store_dbs.blueprint_entry(&welcome_screen_store_id);
let welcome_screen_blueprint = store_bundle.blueprint_entry(&welcome_screen_store_id);
crate::app_blueprint::setup_welcome_screen_blueprint(welcome_screen_blueprint);

Self {
selected_rec_id: None,
selected_application_id: None,
blueprint_by_app_id,
store_dbs,
store_bundle,

was_recording_active: false,

@@ -100,22 +100,22 @@
// As long as we have a blueprint-id, create the blueprint.
blueprint_id
.as_ref()
.map(|id| self.store_dbs.blueprint_entry(id));
.map(|id| self.store_bundle.blueprint_entry(id));

// If we have a blueprint, we can return the `StoreContext`. In most
// cases it should have already existed or been created above.
blueprint_id
.and_then(|id| self.store_dbs.blueprint(id))
.and_then(|id| self.store_bundle.blueprint(id))
.map(|blueprint| {
let recording = self
.selected_rec_id
.as_ref()
.and_then(|id| self.store_dbs.recording(id));
.and_then(|id| self.store_bundle.recording(id));

StoreContext {
blueprint,
recording,
all_recordings: self.store_dbs.recordings().collect_vec(),
all_recordings: self.store_bundle.recordings().collect_vec(),
}
})
}
@@ -132,7 +132,7 @@
pub fn set_recording_id(&mut self, recording_id: StoreId) {
// If this recording corresponds to an app that we know about, then update the app-id.
if let Some(app_id) = self
.store_dbs
.store_bundle
.recording(&recording_id)
.as_ref()
.and_then(|recording| recording.app_id())
@@ -146,15 +146,15 @@

pub fn remove_recording_id(&mut self, recording_id: &StoreId) {
if self.selected_rec_id.as_ref() == Some(recording_id) {
if let Some(new_selection) = self.store_dbs.find_closest_recording(recording_id) {
if let Some(new_selection) = self.store_bundle.find_closest_recording(recording_id) {
self.set_recording_id(new_selection.clone());
} else {
self.selected_application_id = None;
self.selected_rec_id = None;
}
}

self.store_dbs.remove(recording_id);
self.store_bundle.remove(recording_id);
}

/// Change the selected [`ApplicationId`]
@@ -187,7 +187,7 @@
pub fn clear_blueprint(&mut self) {
if let Some(app_id) = &self.selected_application_id {
if let Some(blueprint_id) = self.blueprint_by_app_id.remove(app_id) {
self.store_dbs.remove(&blueprint_id);
self.store_bundle.remove(&blueprint_id);
}
}
}
@@ -197,34 +197,82 @@
/// Note that the recording is not automatically made active. Use [`StoreHub::set_recording_id`]
/// if needed.
pub fn insert_recording(&mut self, store_db: StoreDb) {
self.store_dbs.insert_recording(store_db);
self.store_bundle.insert_recording(store_db);
}

/// Mutable access to a [`StoreDb`] by id
pub fn store_db_mut(&mut self, store_id: &StoreId) -> &mut StoreDb {
self.store_dbs.store_db_entry(store_id)
self.store_bundle.store_db_entry(store_id)
}

/// Remove any empty [`StoreDb`]s from the hub
pub fn purge_empty(&mut self) {
self.store_dbs.purge_empty();
self.store_bundle.purge_empty();
}

/// Call [`StoreDb::purge_fraction_of_ram`] on the oldest-modified recording, dropping it
/// entirely if the GC cannot free anything from it.
//
// NOTE: If you touch any of this, make sure to play around with our GC stress test scripts
// available under `$WORKSPACE_ROOT/tests/python/gc_stress`.
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
self.store_dbs.purge_fraction_of_ram(fraction_to_purge);
re_tracing::profile_function!();

let Some(store_id) = self.store_bundle.find_oldest_modified_recording().cloned() else {
return;
};

let store_dbs = &mut self.store_bundle.store_dbs;

let Some(store_db) = store_dbs.get_mut(&store_id) else {
if cfg!(debug_assertions) {
unreachable!();
}
return; // unreachable
};

let store_size_before =
store_db.store().timeless_size_bytes() + store_db.store().temporal_size_bytes();
store_db.purge_fraction_of_ram(fraction_to_purge);
let store_size_after =
store_db.store().timeless_size_bytes() + store_db.store().temporal_size_bytes();

// No point keeping an empty recording around.
if store_db.is_empty() {
self.remove_recording_id(&store_id);
return;
}

// Running the GC didn't do anything.
//
// That's because all that's left in that store is protected rows: it's time to remove it
// entirely, unless it's the last recording still standing, in which case we're better off
// keeping some data around to show the user rather than a blank screen.
//
// If the user needs the memory for something else, they will get it back as soon as they
// log new things anyhow.
if store_size_before == store_size_after && store_dbs.len() > 1 {
self.remove_recording_id(&store_id);
}

// At this point we've either reached our target goal, or we couldn't fetch memory stats;
// in both cases we consider ourselves done here.

// NOTE: It'd be tempting to loop through recordings here, as long as we haven't reached
// our actual target goal.
// We cannot do that though: there are other subsystems that need to release memory before
// we can get an accurate reading of the current memory used and decide if we should go on.
}

/// Directly access the [`StoreDb`] for the selected recording
pub fn current_recording(&self) -> Option<&StoreDb> {
self.selected_rec_id
.as_ref()
.and_then(|id| self.store_dbs.recording(id))
.and_then(|id| self.store_bundle.recording(id))
}

/// Check whether the [`StoreHub`] contains the referenced recording
pub fn contains_recording(&self, id: &StoreId) -> bool {
self.store_dbs.contains_recording(id)
self.store_bundle.contains_recording(id)
}

/// Persist any in-use blueprints to durable storage.
@@ -237,7 +285,7 @@
// there may be other Blueprints in the Hub.

for (app_id, blueprint_id) in &self.blueprint_by_app_id {
if let Some(blueprint) = self.store_dbs.blueprint_mut(blueprint_id) {
if let Some(blueprint) = self.store_bundle.blueprint_mut(blueprint_id) {
if self.blueprint_last_save.get(blueprint_id) != Some(&blueprint.generation()) {
blueprint.gc_everything_but_the_latest_row();

@@ -291,7 +339,7 @@
.insert(app_id.clone(), store.store_id().clone());
self.blueprint_last_save
.insert(store.store_id().clone(), store.generation());
self.store_dbs.insert_blueprint(store);
self.store_bundle.insert_blueprint(store);
} else {
anyhow::bail!(
"Found unexpected store while loading blueprint: {:?}",
@@ -313,7 +361,7 @@
.selected_application_id
.as_ref()
.and_then(|app_id| self.blueprint_by_app_id.get(app_id))
.and_then(|blueprint_id| self.store_dbs.blueprint(blueprint_id));
.and_then(|blueprint_id| self.store_bundle.blueprint(blueprint_id));

let blueprint_stats = blueprint
.map(|store_db| DataStoreStats::from_store(store_db.store()))
@@ -326,7 +374,7 @@
let recording = self
.selected_rec_id
.as_ref()
.and_then(|rec_id| self.store_dbs.recording(rec_id));
.and_then(|rec_id| self.store_bundle.recording(rec_id));

let recording_stats = recording
.map(|store_db| DataStoreStats::from_store(store_db.store()))
@@ -433,6 +481,19 @@ impl StoreBundle {
}
}

/// Returns the [`StoreId`] of the oldest modified recording, according to [`StoreDb::last_modified_at`].
pub fn find_oldest_modified_recording(&self) -> Option<&StoreId> {
let mut store_dbs = self
.store_dbs
.values()
.filter(|db| db.store_kind() == StoreKind::Recording)
.collect_vec();

store_dbs.sort_by_key(|db| db.last_modified_at());

store_dbs.first().map(|db| db.store_id())
}

// --

pub fn contains_recording(&self, id: &StoreId) -> bool {
@@ -530,14 +591,6 @@ impl StoreBundle {
self.store_dbs.retain(|_, store_db| !store_db.is_empty());
}

pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
re_tracing::profile_function!();

for store_db in self.store_dbs.values_mut() {
store_db.purge_fraction_of_ram(fraction_to_purge);
}
}

pub fn drain_store_dbs(&mut self) -> impl Iterator<Item = StoreDb> + '_ {
self.store_dbs.drain().map(|(_, store)| store)
}
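For orientation, here is a sketch of the call pattern this design expects — the `check_memory_budget` helper and the limit plumbing are illustrative, not part of this diff, and `MemoryUse` and `StoreHub` are assumed to be in scope. The idea is to measure once per frame and only then ask `StoreHub::purge_fraction_of_ram` for the overshoot, so other subsystems can release memory before the next measurement, as the NOTE in that function explains.

// Sketch only, not part of this diff: run once per frame.
fn check_memory_budget(store_hub: &mut StoreHub, limit_in_bytes: i64) {
    let Some(used) = MemoryUse::capture().used() else {
        return; // no memory stats available on this platform
    };
    if used > limit_in_bytes {
        // Ask the GC for the overshoot, expressed as a fraction of current usage.
        // If that turns out to be a no-op (only protected rows are left), the
        // oldest-modified recording is dropped instead, as implemented above.
        let fraction = (used - limit_in_bytes) as f32 / used as f32;
        store_hub.purge_fraction_of_ram(fraction.clamp(0.0, 1.0));
    }
}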
26 changes: 26 additions & 0 deletions tests/python/gc_stress/many_large_many_rows_recordings.py
@@ -0,0 +1,26 @@
"""
Stress test for cross-recording garbage collection.

Logs many large recordings that contain a lot of large rows.

Usage:
- Start a Rerun Viewer in release mode with a 2GiB memory limit:
`cargo r -p rerun-cli --release --no-default-features --features native_viewer -- --memory-limit 2GiB`
- Open the memory panel to see what's going on.
- Run this script.
- You should see recordings coming in and going out in a ringbuffer-like rolling fashion.
"""
from __future__ import annotations

import rerun as rr
from numpy.random import default_rng

rng = default_rng(12345)

for i in range(0, 20000000):
    rr.init("rerun_example_recording_gc", recording_id=f"image-rec-{i}", spawn=True)
    for j in range(0, 10000):
        positions = rng.uniform(-5, 5, size=[10000000, 3])
        colors = rng.uniform(0, 255, size=[10000000, 3])
        radii = rng.uniform(0, 1, size=[10000000])
        rr.log("points", rr.Points3D(positions, colors=colors, radii=radii))