Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK batching/revamp 3: sunset PythonSession #1985

Merged
merged 42 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
68d8b0c
version crossbeam at the workspace level
teh-cmc Apr 17, 2023
cb74038
more DataRow size helpers
teh-cmc Apr 26, 2023
a0d9d39
DataTableBatcher
teh-cmc Apr 26, 2023
f46ac72
lints
teh-cmc Apr 26, 2023
5440f76
lints
teh-cmc Apr 26, 2023
c1088c5
self review
teh-cmc Apr 26, 2023
cbf17be
don't expose shutdown to make errors impossible
teh-cmc Apr 26, 2023
e7b42bf
doc
teh-cmc Apr 26, 2023
573de98
backport
teh-cmc Apr 26, 2023
67dc616
backport
teh-cmc Apr 27, 2023
14130c5
introduce RecordingStream
teh-cmc Apr 27, 2023
b872b63
sunset PythonSession, introduce global daata RecordStream
teh-cmc Apr 27, 2023
71a31bd
clean up old stuff from the before time
teh-cmc Apr 27, 2023
649bbe8
self-review
teh-cmc Apr 28, 2023
2b74c3b
ordered data columns in data tables
teh-cmc Apr 28, 2023
34be0a7
tests
teh-cmc Apr 28, 2023
72685fa
even more tests
teh-cmc Apr 28, 2023
067168f
rogue todo
teh-cmc Apr 28, 2023
b8e0065
batching is now a reality
teh-cmc Apr 28, 2023
0e69707
some extra peace of mind
teh-cmc Apr 28, 2023
7a75eee
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc Apr 28, 2023
a7f84c8
revert
teh-cmc Apr 28, 2023
ead5883
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc Apr 28, 2023
3f0ec73
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc Apr 28, 2023
2b8d93f
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc Apr 28, 2023
6e348db
lock shenanigans
teh-cmc Apr 28, 2023
482f2a6
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc Apr 28, 2023
a31285b
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc May 3, 2023
ecb7ce5
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc May 3, 2023
51e8d92
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc May 3, 2023
8580773
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/2_rust…
teh-cmc May 3, 2023
4af3342
merge shenanigans
teh-cmc May 3, 2023
d1e5c19
address PR comments
teh-cmc May 3, 2023
c825cd5
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc May 3, 2023
c60ab2c
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/3_py_r…
teh-cmc May 4, 2023
a9feba4
Restore `start_web_viewer_server` functionality
jleibs Apr 30, 2023
9eba5c3
clean up
teh-cmc May 4, 2023
8a6f14a
per-thread per-recording stateful time tracking
teh-cmc May 4, 2023
aac28aa
just build rows directly, thereby _not_ prevent size computation
teh-cmc May 4, 2023
7c1e97a
get_recording_id might return nothing now
teh-cmc May 4, 2023
57e0594
make a lack of active recording a warn_once situation across both lan…
teh-cmc May 4, 2023
b3f6a80
not an issue anymore
teh-cmc May 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
Expand Down
167 changes: 157 additions & 10 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;

use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};
use re_log_types::{
ApplicationId, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig,
DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time,
DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time, TimeInt,
TimePoint, TimeType, Timeline, TimelineName,
};

use crate::sink::{LogSink, MemorySinkStorage};
Expand Down Expand Up @@ -325,9 +327,6 @@ struct RecordingStreamInner {

batcher: DataTableBatcher,
batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,
//
// TODO(emilk): add convenience `TimePoint` here so that users can
// do things like `session.set_time_sequence("frame", frame_idx);`
}

impl Drop for RecordingStreamInner {
Expand Down Expand Up @@ -573,7 +572,7 @@ impl RecordingStream {
#[inline]
pub fn record_msg(&self, msg: LogMsg) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to record_msg() ignored");
re_log::warn_once!("Recording disabled - call to record_msg() ignored");
return;
};

Expand All @@ -593,7 +592,7 @@ impl RecordingStream {
path_op: re_log_types::PathOp,
) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to record_path_op() ignored");
re_log::warn_once!("Recording disabled - call to record_path_op() ignored");
return;
};

Expand All @@ -614,7 +613,7 @@ impl RecordingStream {
#[inline]
pub fn record_row(&self, row: DataRow) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to record_row() ignored");
re_log::warn_once!("Recording disabled - call to record_row() ignored");
return;
};

Expand All @@ -637,7 +636,7 @@ impl RecordingStream {
/// cannot be repaired), all pending data in its buffers will be dropped.
pub fn set_sink(&self, sink: Box<dyn LogSink>) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to set_sink() ignored");
re_log::warn_once!("Recording disabled - call to set_sink() ignored");
return;
};

Expand Down Expand Up @@ -666,7 +665,7 @@ impl RecordingStream {
/// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
pub fn flush_async(&self) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to flush_async() ignored");
re_log::warn_once!("Recording disabled - call to flush_async() ignored");
return;
};

Expand All @@ -693,7 +692,7 @@ impl RecordingStream {
/// See [`RecordingStream`] docs for ordering semantics and multithreading guarantees.
pub fn flush_blocking(&self) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to flush_blocking() ignored");
re_log::warn_once!("Recording disabled - call to flush_blocking() ignored");
return;
};

Expand Down Expand Up @@ -761,6 +760,154 @@ impl RecordingStream {
}
}

// --- Stateful time ---

/// Thread-local data.
#[derive(Default)]
struct ThreadInfo {
/// The current time per-thread per-recording, which can be set by users.
timepoints: HashMap<RecordingId, TimePoint>,
}

impl ThreadInfo {
fn thread_now(rid: RecordingId) -> TimePoint {
Self::with(|ti| ti.now(rid))
}

fn set_thread_time(rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
Self::with(|ti| ti.set_time(rid, timeline, time_int));
}

fn reset_thread_time(rid: RecordingId) {
Self::with(|ti| ti.reset_time(rid));
}

/// Get access to the thread-local [`ThreadInfo`].
fn with<R>(f: impl FnOnce(&mut ThreadInfo) -> R) -> R {
use std::cell::RefCell;
thread_local! {
static THREAD_INFO: RefCell<Option<ThreadInfo>> = RefCell::new(None);
}

THREAD_INFO.with(|thread_info| {
let mut thread_info = thread_info.borrow_mut();
let thread_info = thread_info.get_or_insert_with(ThreadInfo::default);
f(thread_info)
})
}

fn now(&self, rid: RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(&rid).cloned().unwrap_or_default();
timepoint.insert(Timeline::log_time(), Time::now().into());
timepoint
}

fn set_time(&mut self, rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
if let Some(time_int) = time_int {
self.timepoints
.entry(rid)
.or_default()
.insert(timeline, time_int);
} else if let Some(timepoint) = self.timepoints.get_mut(&rid) {
timepoint.remove(&timeline);
}
}

fn reset_time(&mut self, rid: RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(&rid) {
*timepoint = TimePoint::default();
}
}
}

impl RecordingStream {
/// Returns the current time of the recording on the current thread.
pub fn now(&self) -> TimePoint {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to now() ignored");
return TimePoint::default();
};

ThreadInfo::thread_now(this.info.recording_id)
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_sequence`].
///
/// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
///
/// You can remove a timeline again using `set_time_sequence("frame_nr", None)`.
pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: Option<i64>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_sequence() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
);
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_seconds`].
///
/// For example: `rec.set_time_seconds("sim_time", sim_time_secs)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_seconds(&self, timeline: &str, seconds: Option<f64>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_seconds() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_nanos`].
///
/// For example: `rec.set_time_seconds("sim_time", sim_time_nanos)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_nanos(&self, timeline: &str, ns: Option<i64>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_nanos() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
);
}

/// Clears out the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_sequence`]/[`Self::set_time_seconds`]/[`Self::set_time_nanos`].
///
/// For example: `rec.reset_time()`.
pub fn reset_time(&self) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to reset_time() ignored");
return;
};

ThreadInfo::reset_thread_time(this.info.recording_id);
}
}

// ---

#[cfg(test)]
mod tests {
use re_log_types::RowId;
Expand Down
2 changes: 1 addition & 1 deletion examples/python/minimal/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
_, unknown = __import__("argparse").ArgumentParser().parse_known_args()
[__import__("logging").warning(f"unknown arg: {arg}") for arg in unknown]

rr.spawn()
rr.init("minimal", spawn=True)
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved

positions = np.vstack([xyz.ravel() for xyz in np.mgrid[3 * [slice(-5, 5, 10j)]]]).T
colors = np.vstack([rgb.ravel() for rgb in np.mgrid[3 * [slice(0, 255, 10j)]]]).astype(np.uint8).T
Expand Down
1 change: 1 addition & 0 deletions examples/python/multiprocessing/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def task(title: str) -> None:
# All processes spawned with `multiprocessing` will automatically
# be assigned the same default recording_id.
# We just need to connect each process to the the rerun viewer:
rr.init("multiprocessing")
rr.connect()

rr.log_text_entry(
Expand Down
3 changes: 1 addition & 2 deletions rerun_py/docs/gen_common_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Function | Description
-------- | -----------
[rerun.init()](initialization/#rerun.init) | Initialize the Rerun SDK ...
[rerun.set_recording_id()](initialization/#rerun.set_recording_id) | Set the recording ID ...
[rerun.connect()](initialization/#rerun.connect) | Connect to a remote Rerun Viewer on the ...
[rerun.spawn()](initialization/#rerun.spawn) | Spawn a Rerun Viewer ...
...
Expand Down Expand Up @@ -54,7 +53,7 @@ class Section:
Section(
title="Viewer Control",
module_summary=None,
func_list=["set_recording_id", "save"],
func_list=["save"],
),
Section(
title="Time",
Expand Down
Loading