Skip to content

Commit

Permalink
SDK batching/revamp 3: sunset PythonSession (#1985)
Browse files Browse the repository at this point in the history
* version crossbeam at the workspace level

* more DataRow size helpers

* DataTableBatcher

* lints

* lints

* self review

* don't expose shutdown to make errors impossible

* doc

* backport

* backport

* introduce RecordingStream

* sunset PythonSession, introduce global daata RecordStream

* clean up old stuff from the before time

* self-review

* ordered data columns in data tables

* tests

* even more tests

* rogue todo

* batching is now a reality

* some extra peace of mind

* revert

* lock shenanigans

* merge shenanigans

* address PR comments

* Restore `start_web_viewer_server` functionality

* clean up

* per-thread per-recording stateful time tracking

* just build rows directly, thereby _not_ prevent size computation

* get_recording_id might return nothing now

* make a lack of active recording a warn_once situation across both languages and all weird situations

* not an issue anymore

---------

Co-authored-by: Jeremy Leibs <jeremy@rerun.io>
  • Loading branch information
teh-cmc and jleibs committed May 4, 2023
1 parent ec8333e commit 256edfb
Show file tree
Hide file tree
Showing 12 changed files with 685 additions and 786 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
Expand Down
167 changes: 157 additions & 10 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;

use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};
use re_log_types::{
ApplicationId, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig,
DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time,
DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time, TimeInt,
TimePoint, TimeType, Timeline, TimelineName,
};

use crate::sink::{LogSink, MemorySinkStorage};
Expand Down Expand Up @@ -325,9 +327,6 @@ struct RecordingStreamInner {

batcher: DataTableBatcher,
batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,
//
// TODO(emilk): add convenience `TimePoint` here so that users can
// do things like `session.set_time_sequence("frame", frame_idx);`
}

impl Drop for RecordingStreamInner {
Expand Down Expand Up @@ -573,7 +572,7 @@ impl RecordingStream {
#[inline]
pub fn record_msg(&self, msg: LogMsg) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to record_msg() ignored");
re_log::warn_once!("Recording disabled - call to record_msg() ignored");
return;
};

Expand All @@ -593,7 +592,7 @@ impl RecordingStream {
path_op: re_log_types::PathOp,
) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to record_path_op() ignored");
re_log::warn_once!("Recording disabled - call to record_path_op() ignored");
return;
};

Expand All @@ -614,7 +613,7 @@ impl RecordingStream {
#[inline]
pub fn record_row(&self, row: DataRow) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to record_row() ignored");
re_log::warn_once!("Recording disabled - call to record_row() ignored");
return;
};

Expand All @@ -637,7 +636,7 @@ impl RecordingStream {
/// cannot be repaired), all pending data in its buffers will be dropped.
pub fn set_sink(&self, sink: Box<dyn LogSink>) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to set_sink() ignored");
re_log::warn_once!("Recording disabled - call to set_sink() ignored");
return;
};

Expand Down Expand Up @@ -666,7 +665,7 @@ impl RecordingStream {
/// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
pub fn flush_async(&self) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to flush_async() ignored");
re_log::warn_once!("Recording disabled - call to flush_async() ignored");
return;
};

Expand All @@ -693,7 +692,7 @@ impl RecordingStream {
/// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
pub fn flush_blocking(&self) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to flush_blocking() ignored");
re_log::warn_once!("Recording disabled - call to flush_blocking() ignored");
return;
};

Expand Down Expand Up @@ -761,6 +760,154 @@ impl RecordingStream {
}
}

// --- Stateful time ---

/// Thread-local data.
#[derive(Default)]
struct ThreadInfo {
/// The current time per-thread per-recording, which can be set by users.
timepoints: HashMap<RecordingId, TimePoint>,
}

impl ThreadInfo {
fn thread_now(rid: RecordingId) -> TimePoint {
Self::with(|ti| ti.now(rid))
}

fn set_thread_time(rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
Self::with(|ti| ti.set_time(rid, timeline, time_int));
}

fn reset_thread_time(rid: RecordingId) {
Self::with(|ti| ti.reset_time(rid));
}

/// Get access to the thread-local [`ThreadInfo`].
fn with<R>(f: impl FnOnce(&mut ThreadInfo) -> R) -> R {
use std::cell::RefCell;
thread_local! {
static THREAD_INFO: RefCell<Option<ThreadInfo>> = RefCell::new(None);
}

THREAD_INFO.with(|thread_info| {
let mut thread_info = thread_info.borrow_mut();
let thread_info = thread_info.get_or_insert_with(ThreadInfo::default);
f(thread_info)
})
}

fn now(&self, rid: RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(&rid).cloned().unwrap_or_default();
timepoint.insert(Timeline::log_time(), Time::now().into());
timepoint
}

fn set_time(&mut self, rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
if let Some(time_int) = time_int {
self.timepoints
.entry(rid)
.or_default()
.insert(timeline, time_int);
} else if let Some(timepoint) = self.timepoints.get_mut(&rid) {
timepoint.remove(&timeline);
}
}

fn reset_time(&mut self, rid: RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(&rid) {
*timepoint = TimePoint::default();
}
}
}

impl RecordingStream {
/// Returns the current time of the recording on the current thread.
pub fn now(&self) -> TimePoint {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to now() ignored");
return TimePoint::default();
};

ThreadInfo::thread_now(this.info.recording_id)
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_sequence`].
///
/// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
///
/// You can remove a timeline again using `set_time_sequence("frame_nr", None)`.
pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: Option<i64>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_sequence() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
);
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_seconds`].
///
/// For example: `rec.set_time_seconds("sim_time", sim_time_secs)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_seconds(&self, timeline: &str, seconds: Option<f64>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_seconds() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_nanos`].
///
/// For example: `rec.set_time_seconds("sim_time", sim_time_nanos)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_nanos(&self, timeline: &str, ns: Option<i64>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_nanos() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
);
}

/// Clears out the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_sequence`]/[`Self::set_time_seconds`]/[`Self::set_time_nanos`].
///
/// For example: `rec.reset_time()`.
pub fn reset_time(&self) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to reset_time() ignored");
return;
};

ThreadInfo::reset_thread_time(this.info.recording_id);
}
}

// ---

#[cfg(test)]
mod tests {
use re_log_types::RowId;
Expand Down
2 changes: 1 addition & 1 deletion examples/python/minimal/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
_, unknown = __import__("argparse").ArgumentParser().parse_known_args()
[__import__("logging").warning(f"unknown arg: {arg}") for arg in unknown]

rr.spawn()
rr.init("minimal", spawn=True)

positions = np.vstack([xyz.ravel() for xyz in np.mgrid[3 * [slice(-5, 5, 10j)]]]).T
colors = np.vstack([rgb.ravel() for rgb in np.mgrid[3 * [slice(0, 255, 10j)]]]).astype(np.uint8).T
Expand Down
1 change: 1 addition & 0 deletions examples/python/multiprocessing/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def task(title: str) -> None:
# All processes spawned with `multiprocessing` will automatically
# be assigned the same default recording_id.
# We just need to connect each process to the the rerun viewer:
rr.init("multiprocessing")
rr.connect()

rr.log_text_entry(
Expand Down
3 changes: 1 addition & 2 deletions rerun_py/docs/gen_common_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Function | Description
-------- | -----------
[rerun.init()](initialization/#rerun.init) | Initialize the Rerun SDK ...
[rerun.set_recording_id()](initialization/#rerun.set_recording_id) | Set the recording ID ...
[rerun.connect()](initialization/#rerun.connect) | Connect to a remote Rerun Viewer on the ...
[rerun.spawn()](initialization/#rerun.spawn) | Spawn a Rerun Viewer ...
...
Expand Down Expand Up @@ -54,7 +53,7 @@ class Section:
Section(
title="Viewer Control",
module_summary=None,
func_list=["set_recording_id", "save"],
func_list=["save"],
),
Section(
title="Time",
Expand Down
Loading

0 comments on commit 256edfb

Please sign in to comment.