Skip to content

Commit

Permalink
Simpler, sturdier stateful time tracking in both SDKs (#2506)
Browse files Browse the repository at this point in the history
(Probably easier to review commit by commit)

Follow up to my discussion with @abey79 regarding his poor experience
with time tracking, which was summarized in
#2501 (comment):
> - When implementing the `step` timeline of the scalar example, I first
searched for `set_time_sequence` (used in Python API) in the Rust docs,
which I found in `RecordingStream`. Turns out it's not in 0.7 and bugged
on `main` (or rather ignored by `MsgSender`). Again, it compiled and
displayed no error, but no timeline was created.

This PR makes it so that `RecordingStream` is always in charge of
injecting its internal clock into outgoing rows (unless the caller ask
it not to, e.g. because the data is meant to be timeless).
This is pretty similar to what was already in place for `log_tick`,
except it now applies to every timelines, whether they are builtin or
user defined.

- Within the Python SDK, this gets rid of all the existing manual time
injection stuff.
- On the Rust SDK's side, this fixes the issue that `MsgSender` used to
ignore the internal clock altogether (i.e. the stateful time APIs were
not supported at all for Rust users).
- And finally this cleans up the Rust examples a bunch since we now have
access to stateful time.


---

<!-- This line will get updated when the PR build summary job finishes.
-->
PR Build Summary: https://build.rerun.io/pr/2506

<!-- pr-link-docs:start -->
Docs preview: https://rerun.io/preview/178edf5/docs
Examples preview: https://rerun.io/preview/178edf5/examples
<!-- pr-link-docs:end -->
  • Loading branch information
teh-cmc committed Jun 26, 2023
1 parent 8782c16 commit 1b10c67
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 185 deletions.
11 changes: 7 additions & 4 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl MsgSender {
Self {
entity_path: ent_path.into(),

timepoint: [(Timeline::log_time(), Time::now().into())].into(),
timepoint: TimePoint::default(),
timeless: false,

num_instances: None,
Expand All @@ -116,7 +116,7 @@ impl MsgSender {
let ent_path = re_log_types::EntityPath::from_file_path_as_single_string(file_path);
let cell = re_components::data_cell_from_file_path(file_path)?;

let mut timepoint = TimePoint::from([(Timeline::log_time(), Time::now().into())]);
let mut timepoint = TimePoint::default();

// This may sounds like a good idea, but that means `rerun *.jpg` will
// actually act like it is playing a bunch of files over time, perhaps over many years.
Expand Down Expand Up @@ -160,6 +160,7 @@ impl MsgSender {
/// `MsgSender` automatically keeps track of the logging time, which is recorded when
/// [`Self::new`] is first called.
#[inline]
#[doc(hidden)]
pub fn with_timepoint(mut self, timepoint: TimePoint) -> Self {
for (timeline, time) in timepoint {
self.timepoint.insert(timeline, time);
Expand All @@ -176,6 +177,7 @@ impl MsgSender {
/// `MsgSender` automatically keeps track of the logging time, which is recorded when
/// [`Self::new`] is first called.
#[inline]
#[doc(hidden)]
pub fn with_time(mut self, timeline: Timeline, time: impl Into<TimeInt>) -> Self {
self.timepoint.insert(timeline, time.into());
self
Expand Down Expand Up @@ -294,16 +296,17 @@ impl MsgSender {
return Ok(()); // silently drop the message
}

let timeless = self.timeless;
let [row_standard, row_splats] = self.into_rows();

if let Some(row_splats) = row_splats {
rec_stream.record_row(row_splats);
rec_stream.record_row(row_splats, !timeless);
}

// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
rec_stream.record_row(row_standard);
rec_stream.record_row(row_standard, !timeless);
}

Ok(())
Expand Down
52 changes: 33 additions & 19 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,7 @@ impl RecordingStream {
///
/// This is a convenience wrapper for [`Self::record_msg`].
#[inline]
pub fn record_path_op(
&self,
timepoint: re_log_types::TimePoint,
path_op: re_log_types::PathOp,
) {
pub fn record_path_op(&self, path_op: re_log_types::PathOp) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to record_path_op() ignored");
return;
Expand All @@ -613,18 +609,21 @@ impl RecordingStream {
this.info.store_id.clone(),
re_log_types::EntityPathOpMsg {
row_id: re_log_types::RowId::random(),
time_point: timepoint,
time_point: self.now(),
path_op,
},
));
}

/// Records a single [`DataRow`].
///
/// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
/// [`RecordingStream`]'s internal clock.
///
/// Internally, incoming [`DataRow`]s are automatically coalesced into larger [`DataTable`]s to
/// optimize for transport.
#[inline]
pub fn record_row(&self, mut row: DataRow) {
pub fn record_row(&self, mut row: DataRow, inject_time: bool) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to record_row() ignored");
return;
Expand All @@ -635,8 +634,17 @@ impl RecordingStream {
//
// NOTE: We're incrementing the current tick still.
let tick = this.tick.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if !row.timepoint().is_timeless() {
row.timepoint.insert(Timeline::log_tick(), tick.into());
if inject_time {
// Get the current time on all timelines, for the current recording, on the current
// thread...
let mut now = self.now();
// ...and then also inject the current recording tick into it.
now.insert(Timeline::log_tick(), tick.into());

// Inject all these times into the row, overriding conflicting times, if any.
for (timeline, time) in now {
row.timepoint.insert(timeline, time);
}
}

this.batcher.push_row(row);
Expand Down Expand Up @@ -860,7 +868,11 @@ impl RecordingStream {
/// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
///
/// You can remove a timeline again using `set_time_sequence("frame_nr", None)`.
pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: Option<i64>) {
pub fn set_time_sequence(
&self,
timeline: impl Into<TimelineName>,
sequence: impl Into<Option<i64>>,
) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_sequence() ignored");
return;
Expand All @@ -869,7 +881,7 @@ impl RecordingStream {
ThreadInfo::set_thread_time(
&this.info.store_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
sequence.into().map(TimeInt::from),
);
}

Expand All @@ -880,7 +892,7 @@ impl RecordingStream {
/// For example: `rec.set_time_seconds("sim_time", sim_time_secs)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_seconds(&self, timeline: &str, seconds: Option<f64>) {
pub fn set_time_seconds(&self, timeline: &str, seconds: impl Into<Option<f64>>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_seconds() ignored");
return;
Expand All @@ -889,7 +901,9 @@ impl RecordingStream {
ThreadInfo::set_thread_time(
&this.info.store_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
seconds
.into()
.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
}

Expand All @@ -900,7 +914,7 @@ impl RecordingStream {
/// For example: `rec.set_time_seconds("sim_time", sim_time_nanos)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_nanos(&self, timeline: &str, ns: Option<i64>) {
pub fn set_time_nanos(&self, timeline: &str, ns: impl Into<Option<i64>>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_nanos() ignored");
return;
Expand All @@ -909,7 +923,7 @@ impl RecordingStream {
ThreadInfo::set_thread_time(
&this.info.store_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
ns.into().map(|ns| Time::from_ns_since_epoch(ns).into()),
);
}

Expand Down Expand Up @@ -956,7 +970,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let storage = rec_stream.memory();
Expand Down Expand Up @@ -1021,7 +1035,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let storage = rec_stream.memory();
Expand Down Expand Up @@ -1101,7 +1115,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

{
Expand Down Expand Up @@ -1169,7 +1183,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let mut msgs = {
Expand Down
12 changes: 5 additions & 7 deletions docs/content/getting-started/logging-rust.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ Rerun has rich support for time: whether you want concurrent or disjoint timelin

Let's add our custom timeline:
```rust
let stable_time = Timeline::new("stable_time", TimeType::Time);

for i in 0..400 {
let time = i as f32 * 0.01;

rec_stream.set_time_seconds("stable_time", time as f64);

let times = offsets.iter().map(|offset| time + offset).collect_vec();
let (beads, colors): (Vec<_>, Vec<_>) = points1
.iter()
Expand All @@ -226,15 +226,14 @@ for i in 0..400 {
.unzip();

MsgSender::new("dna/structure/scaffolding/beads")
.with_time(stable_time, Time::from_seconds_since_epoch(time as _))
.with_component(&beads)?
.with_component(&colors)?
.with_splat(Radius(0.06))?
.send(&recording)?;
}
```

First we [declare a name and type](https://docs.rs/rerun/latest/rerun/time/struct.Timeline.html#new) for our `Timeline`, then we pass it to [`MsgSender::with_time`](https://docs.rs/rerun/latest/rerun/struct.MsgSender.html#with_time) along with a timestamp for our data.
First we use [`RecordingStream::set_time_seconds`](https://docs.rs/rerun/latest/rerun/struct.RecordingStream.html#method.set_time_seconds) to declare our own custom `Timeline` and set the current timestamp.
You can add as many timelines and timestamps as you want when logging data.

⚠️ If you run this code as is, the result will be.. surprising: the beads are animating as expected, but everything we've logged until that point is gone! ⚠️
Expand All @@ -246,9 +245,9 @@ Enter...
### Latest At semantics

That's because the Rerun Viewer has switched to displaying your custom timeline by default, but the original data was only logged to the *default* timeline (called `log_time`).
To fix this, go back through the previous logging calls we made and add this:
To fix this, add this at the beginning of the main function:
```rust
.with_time(stable_time, 0)
rec_stream.set_time_seconds("stable_time", 0f64);
```

![logging data - latest at](https://static.rerun.io/0182b4795ca2fed2f2097cfa5f5271115dee0aaf_logging_data8_latest_at.png)
Expand All @@ -270,7 +269,6 @@ Expand the previous loop to also include:
for i in 0..400 {
// ...everything else...
MsgSender::new("dna/structure")
.with_time(stable_time, Time::from_seconds_since_epoch(time as _))
.with_component(&[Transform3D::new(transform::RotationAxisAngle::new(
glam::Vec3::Z,
rerun::transform::Angle::Radians(time / 4.0 * TAU),
Expand Down

1 comment on commit 1b10c67

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.25.

Benchmark suite Current: 1b10c67 Previous: 8782c16 Ratio
datastore/num_rows=1000/num_instances=1000/packed=false/insert/default 4061186 ns/iter (± 75194) 2845953 ns/iter (± 6434) 1.43
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/primary/default 295 ns/iter (± 0) 230 ns/iter (± 0) 1.28
datastore/num_rows=1000/num_instances=1000/packed=false/range/default 4100873 ns/iter (± 56406) 2895353 ns/iter (± 8774) 1.42
datastore/num_rows=1000/num_instances=1000/gc/default 2650347 ns/iter (± 4961) 1717175 ns/iter (± 4896) 1.54
mono_points_arrow/generate_message_bundles 38809490 ns/iter (± 715579) 28722835 ns/iter (± 814792) 1.35
mono_points_arrow/encode_log_msg 234970816 ns/iter (± 620274) 177976079 ns/iter (± 1445392) 1.32
mono_points_arrow/decode_message_bundles 85118962 ns/iter (± 731699) 60591469 ns/iter (± 597320) 1.40
mono_points_arrow_batched/generate_message_bundles 28108984 ns/iter (± 1516059) 18314861 ns/iter (± 119012) 1.53
mono_points_arrow_batched/generate_messages 5855670 ns/iter (± 310682) 3582539 ns/iter (± 12982) 1.63
mono_points_arrow_batched/encode_log_msg 683246 ns/iter (± 2260) 490506 ns/iter (± 2256) 1.39
mono_points_arrow_batched/encode_total 36000254 ns/iter (± 1503879) 23428623 ns/iter (± 125052) 1.54
mono_points_arrow_batched/decode_message_bundles 9954621 ns/iter (± 364865) 7450939 ns/iter (± 5194) 1.34
mono_points_arrow_batched/decode_total 10599088 ns/iter (± 382609) 7900853 ns/iter (± 17506) 1.34
batch_points_arrow/decode_total 53640 ns/iter (± 181) 41919 ns/iter (± 110) 1.28
arrow_mono_points/insert 2903686096 ns/iter (± 10609966) 1832526612 ns/iter (± 5686519) 1.58
arrow_mono_points/query 1329145 ns/iter (± 10967) 945380 ns/iter (± 1361) 1.41

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.