Fix RERUN_FLUSH_NUM_BYTES and data size estimations #5086

Merged
10 commits merged on Feb 7, 2024
13 changes: 12 additions & 1 deletion crates/re_build_examples/src/rrd.rs
@@ -21,13 +21,24 @@ pub struct Rrd {

#[argh(option, description = "include only examples in this channel")]
channel: Channel,

#[argh(option, description = "run only these examples")]
examples: Vec<String>,
}

impl Rrd {
pub fn run(self) -> anyhow::Result<()> {
create_dir_all(&self.output_dir)?;

let examples = self.channel.examples()?;
let examples = if self.examples.is_empty() {
self.channel.examples()?
} else {
Channel::Nightly
.examples()?
.into_iter()
.filter(|example| self.examples.contains(&example.name))
.collect()
};
let progress = MultiProgress::new();
let results: Vec<anyhow::Result<PathBuf>> = examples
.into_par_iter()
26 changes: 12 additions & 14 deletions crates/re_log_types/src/data_cell.rs
@@ -626,20 +626,18 @@ impl DataCell {
impl SizeBytes for DataCell {
#[inline]
fn heap_size_bytes(&self) -> u64 {
(self.inner.size_bytes > 0)
.then_some(self.inner.size_bytes)
.unwrap_or_else(|| {
// NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
// the viewer when in release mode.
debug_assert!(
false,
"called `DataCell::heap_size_bytes() without computing it first"
);
re_log::warn_once!(
"called `DataCell::heap_size_bytes() without computing it first"
);
0
})
if 0 < self.inner.size_bytes {
self.inner.size_bytes
} else {
// NOTE: Relying on unsized cells is always a mistake, but it isn't worth crashing
// the viewer when in release mode.
debug_assert!(
false,
"called `DataCell::heap_size_bytes() without computing it first"
);
re_log::warn_once!("called `DataCell::heap_size_bytes() without computing it first");
0
}
}
}

41 changes: 27 additions & 14 deletions crates/re_log_types/src/data_table_batcher.rs
@@ -199,13 +199,19 @@ impl DataTableBatcherConfig {
}

if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
new.flush_num_bytes = s
.parse()
.map_err(|err| DataTableBatcherError::ParseConfig {
name: Self::ENV_FLUSH_NUM_BYTES,
value: s.clone(),
err: Box::new(err),
})?;
if let Some(num_bytes) = re_format::parse_bytes(&s) {
// e.g. "10MB"
new.flush_num_bytes = num_bytes.unsigned_abs();
} else {
// Assume it's just an integer
new.flush_num_bytes =
s.parse()
.map_err(|err| DataTableBatcherError::ParseConfig {
name: Self::ENV_FLUSH_NUM_BYTES,
value: s.clone(),
err: Box::new(err),
})?;
}
}

if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
@@ -437,23 +443,20 @@ fn batching_thread(
struct Accumulator {
latest: Instant,
pending_rows: Vec<DataRow>,
pending_num_rows: u64,
pending_num_bytes: u64,
}

impl Accumulator {
fn reset(&mut self) {
self.latest = Instant::now();
self.pending_rows.clear();
self.pending_num_rows = 0;
self.pending_num_bytes = 0;
}
}

let mut acc = Accumulator {
latest: Instant::now(),
pending_rows: Default::default(),
pending_num_rows: Default::default(),
pending_num_bytes: Default::default(),
};

@@ -462,7 +465,6 @@ fn batching_thread(
// it over the wire…
row.compute_all_size_bytes();

acc.pending_num_rows += 1;
acc.pending_num_bytes += row.total_size_bytes();
acc.pending_rows.push(row);
}
@@ -474,7 +476,11 @@ fn batching_thread(
return;
}

re_log::trace!(reason, "flushing tables");
re_log::trace!(
"Flushing {} rows and {} bytes. Reason: {reason}",
rows.len(),
re_format::format_bytes(acc.pending_num_bytes as _)
);

let table = DataTable::from_rows(TableId::new(), rows.drain(..));
// TODO(#1981): efficient table sorting here, following the same rules as the store's.
@@ -487,6 +493,13 @@ fn batching_thread(
acc.reset();
}

re_log::trace!(
"Flushing every: {:.2}s, {} rows, {}",
config.flush_tick.as_secs_f64(),
config.flush_num_rows,
re_format::format_bytes(config.flush_num_bytes as _),
);

use crossbeam::select;
loop {
select! {
@@ -505,7 +518,7 @@ fn batching_thread(
config(&acc.pending_rows);
}

if acc.pending_num_rows >= config.flush_num_rows {
if acc.pending_rows.len() as u64 >= config.flush_num_rows {
do_flush_all(&mut acc, &tx_table, "rows");
} else if acc.pending_num_bytes >= config.flush_num_bytes {
do_flush_all(&mut acc, &tx_table, "bytes");
@@ -519,7 +532,7 @@ fn batching_thread(
};
},
recv(rx_tick) -> _ => {
do_flush_all(&mut acc, &tx_table, "duration");
do_flush_all(&mut acc, &tx_table, "tick");
},
};
}
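For readers of this change, here is a minimal sketch (not part of the PR) of how the two accepted spellings of `RERUN_FLUSH_NUM_BYTES` could be exercised. The import path for `DataTableBatcherConfig` and the exact byte multiplier that `re_format::parse_bytes` assigns to `"10MB"` are assumptions, not confirmed by the diff:

```rust
use re_log_types::DataTableBatcherConfig; // assumed re-export location

fn main() {
    // Human-readable size, now handled via re_format::parse_bytes.
    std::env::set_var("RERUN_FLUSH_NUM_BYTES", "10MB");
    let config = DataTableBatcherConfig::from_env().expect("config should parse");
    println!("flush_num_bytes = {}", config.flush_num_bytes);

    // A plain integer byte count still works through the str::parse fallback.
    std::env::set_var("RERUN_FLUSH_NUM_BYTES", "1048576");
    let config = DataTableBatcherConfig::from_env().expect("config should parse");
    println!("flush_num_bytes = {}", config.flush_num_bytes);
}
```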
10 changes: 8 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
@@ -544,8 +544,14 @@ impl RecordingStreamBuilder {
store_kind,
};

let batcher_config = batcher_config
.unwrap_or_else(|| DataTableBatcherConfig::from_env().unwrap_or_default());
let batcher_config =
batcher_config.unwrap_or_else(|| match DataTableBatcherConfig::from_env() {
Ok(config) => config,
Err(err) => {
re_log::error!("Failed to parse DataTableBatcherConfig from env: {}", err);
DataTableBatcherConfig::default()
}
});

(enabled, store_info, batcher_config)
}
17 changes: 13 additions & 4 deletions crates/re_types_core/src/size_bytes.rs
@@ -97,7 +97,11 @@ impl<K: SizeBytes, V: SizeBytes, S> SizeBytes for HashMap<K, V, S> {
impl<T: SizeBytes, const N: usize> SizeBytes for [T; N] {
#[inline]
fn heap_size_bytes(&self) -> u64 {
0 // it's a const-sized array
if T::is_pod() {
0 // it's a const-sized array
} else {
self.iter().map(SizeBytes::heap_size_bytes).sum::<u64>()
}
}
}

@@ -131,10 +135,15 @@ impl<T: SizeBytes, const N: usize> SizeBytes for SmallVec<[T; N]> {
/// Does not take capacity into account.
#[inline]
fn heap_size_bytes(&self) -> u64 {
if self.len() < N {
if self.len() <= N {
// The `SmallVec` is still smaller than the threshold so no heap data has been
// allocated yet.
0
// allocated yet, beyond the heap data each element might have.

if T::is_pod() {
0 // early-out
} else {
self.iter().map(SizeBytes::heap_size_bytes).sum::<u64>()
}
} else {
// NOTE: It's all on the heap at this point.
if T::is_pod() {
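To make the intent of the `[T; N]` and `SmallVec` changes concrete, a small illustrative sketch follows. It assumes `SizeBytes` is implemented for `String` and importable as shown; with the fix, inline containers of non-POD elements report the heap memory owned by their elements instead of 0:

```rust
use re_types_core::SizeBytes; // assumed import path
use smallvec::{smallvec, SmallVec};

fn main() {
    // Fixed-size array of non-POD elements: previously reported 0 heap bytes.
    let arr: [String; 2] = ["hello".to_owned(), "world".to_owned()];
    assert!(arr.heap_size_bytes() > 0);

    // Inline SmallVec (len <= N): the SmallVec itself owns no heap buffer yet,
    // but each String does, and that is now counted as well.
    let v: SmallVec<[String; 4]> = smallvec!["hello".to_owned(), "world".to_owned()];
    assert!(v.heap_size_bytes() > 0);
}
```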
16 changes: 9 additions & 7 deletions crates/rerun/src/run.rs
@@ -525,17 +525,19 @@ impl PrintCommand {
} else {
table.compute_all_size_bytes();

let column_names = table
.columns
.keys()
.map(|name| name.short_name())
.collect_vec();
let column_names =
table.columns.keys().map(|name| name.short_name()).join(" ");

let entity_paths = if table.col_entity_path.len() == 1 {
format!("{:?}", table.col_entity_path[0])
} else {
format!("{} different entity paths", table.col_entity_path.len())
};

println!(
"Table with {} rows ({}). Columns: {:?}",
"Table with {} rows ({}) - {entity_paths} - columns: [{column_names}]",
table.num_rows(),
re_format::format_bytes(table.heap_size_bytes() as _),
column_names
);
}
}
2 changes: 2 additions & 0 deletions examples/python/nuscenes/README.md
@@ -23,3 +23,5 @@ contains lidar data, radar data, color images, and labeled bounding boxes.
pip install -r examples/python/nuscenes/requirements.txt
python examples/python/nuscenes/main.py
```

Requires at least Python 3.9 to run.
6 changes: 3 additions & 3 deletions examples/python/plots/main.py
@@ -88,13 +88,13 @@ def log_trig() -> None:
rr.log("trig/sin", rr.SeriesLine(color=[255, 0, 0], name="sin(0.01t)"), timeless=True)
rr.log("trig/cos", rr.SeriesLine(color=[0, 255, 0], name="cos(0.01t)"), timeless=True)

for t in range(0, int(tau * 2 * 1000.0)):
for t in range(0, int(tau * 2 * 100.0)):
rr.set_time_sequence("frame_nr", t)

sin_of_t = sin(float(t) / 1000.0)
sin_of_t = sin(float(t) / 100.0)
rr.log("trig/sin", rr.Scalar(sin_of_t))

cos_of_t = cos(float(t) / 1000.0)
cos_of_t = cos(float(t) / 100.0)
rr.log("trig/cos", rr.Scalar(cos_of_t))

