SDK batching/revamp 3: sunset PythonSession #1985

Merged
merged 42 commits into from
May 4, 2023
Changes from 40 commits
42 commits
68d8b0c
version crossbeam at the workspace level
teh-cmc Apr 17, 2023
cb74038
more DataRow size helpers
teh-cmc Apr 26, 2023
a0d9d39
DataTableBatcher
teh-cmc Apr 26, 2023
f46ac72
lints
teh-cmc Apr 26, 2023
5440f76
lints
teh-cmc Apr 26, 2023
c1088c5
self review
teh-cmc Apr 26, 2023
cbf17be
don't expose shutdown to make errors impossible
teh-cmc Apr 26, 2023
e7b42bf
doc
teh-cmc Apr 26, 2023
573de98
backport
teh-cmc Apr 26, 2023
67dc616
backport
teh-cmc Apr 27, 2023
14130c5
introduce RecordingStream
teh-cmc Apr 27, 2023
b872b63
sunset PythonSession, introduce global data RecordingStream
teh-cmc Apr 27, 2023
71a31bd
clean up old stuff from the before time
teh-cmc Apr 27, 2023
649bbe8
self-review
teh-cmc Apr 28, 2023
2b74c3b
ordered data columns in data tables
teh-cmc Apr 28, 2023
34be0a7
tests
teh-cmc Apr 28, 2023
72685fa
even more tests
teh-cmc Apr 28, 2023
067168f
rogue todo
teh-cmc Apr 28, 2023
b8e0065
batching is now a reality
teh-cmc Apr 28, 2023
0e69707
some extra peace of mind
teh-cmc Apr 28, 2023
7a75eee
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc Apr 28, 2023
a7f84c8
revert
teh-cmc Apr 28, 2023
ead5883
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc Apr 28, 2023
3f0ec73
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc Apr 28, 2023
2b8d93f
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc Apr 28, 2023
6e348db
lock shenanigans
teh-cmc Apr 28, 2023
482f2a6
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc Apr 28, 2023
a31285b
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc May 3, 2023
ecb7ce5
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc May 3, 2023
51e8d92
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc May 3, 2023
8580773
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/2_rust…
teh-cmc May 3, 2023
4af3342
merge shenanigans
teh-cmc May 3, 2023
d1e5c19
address PR comments
teh-cmc May 3, 2023
c825cd5
Merge branch 'cmc/sdk_revamp/2_rust_revamp' into cmc/sdk_revamp/3_py_…
teh-cmc May 3, 2023
c60ab2c
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/3_py_r…
teh-cmc May 4, 2023
a9feba4
Restore `start_web_viewer_server` functionality
jleibs Apr 30, 2023
9eba5c3
clean up
teh-cmc May 4, 2023
8a6f14a
per-thread per-recording stateful time tracking
teh-cmc May 4, 2023
aac28aa
just build rows directly, thereby _not_ preventing size computation
teh-cmc May 4, 2023
7c1e97a
get_recording_id might return nothing now
teh-cmc May 4, 2023
57e0594
make a lack of active recording a warn_once situation across both lan…
teh-cmc May 4, 2023
b3f6a80
not an issue anymore
teh-cmc May 4, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
@@ -41,6 +41,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

ahash.workspace = true
crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
155 changes: 151 additions & 4 deletions crates/re_sdk/src/recording_stream.rs
@@ -1,9 +1,11 @@
use std::sync::Arc;

use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};
use re_log_types::{
ApplicationId, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig,
DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time,
DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time, TimeInt,
TimePoint, TimeType, Timeline, TimelineName,
};

use crate::sink::{LogSink, MemorySinkStorage};
@@ -325,9 +327,6 @@ struct RecordingStreamInner {

batcher: DataTableBatcher,
batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,
//
// TODO(emilk): add convenience `TimePoint` here so that users can
// do things like `session.set_time_sequence("frame", frame_idx);`
}

impl Drop for RecordingStreamInner {
@@ -761,6 +760,154 @@ impl RecordingStream {
}
}

// --- Stateful time ---

/// Thread-local data.
#[derive(Default)]
struct ThreadInfo {
/// The current time per-thread per-recording, which can be set by users.
timepoints: HashMap<RecordingId, TimePoint>,
}

impl ThreadInfo {
fn thread_now(rid: RecordingId) -> TimePoint {
Self::with(|ti| ti.now(rid))
}

fn set_thread_time(rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
Self::with(|ti| ti.set_time(rid, timeline, time_int));
}

fn reset_thread_time(rid: RecordingId) {
Self::with(|ti| ti.reset_time(rid));
}

/// Get access to the thread-local [`ThreadInfo`].
fn with<R>(f: impl FnOnce(&mut ThreadInfo) -> R) -> R {
use std::cell::RefCell;
thread_local! {
static THREAD_INFO: RefCell<Option<ThreadInfo>> = RefCell::new(None);
}

THREAD_INFO.with(|thread_info| {
let mut thread_info = thread_info.borrow_mut();
let thread_info = thread_info.get_or_insert_with(ThreadInfo::default);
f(thread_info)
})
}

fn now(&self, rid: RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(&rid).cloned().unwrap_or_default();
timepoint.insert(Timeline::log_time(), Time::now().into());
timepoint
}

fn set_time(&mut self, rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
if let Some(time_int) = time_int {
self.timepoints
.entry(rid)
.or_default()
.insert(timeline, time_int);
} else if let Some(timepoint) = self.timepoints.get_mut(&rid) {
timepoint.remove(&timeline);
}
}

fn reset_time(&mut self, rid: RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(&rid) {
*timepoint = TimePoint::default();
}
}
}

impl RecordingStream {
/// Returns the current time of the recording on the current thread.
pub fn now(&self) -> TimePoint {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to now() ignored");
return TimePoint::default();
};

ThreadInfo::thread_now(this.info.recording_id)
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_sequence`].
///
/// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
///
/// You can remove a timeline again using `set_time_sequence("frame_nr", None)`.
pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: Option<i64>) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to set_time_sequence() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
);
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_seconds`].
///
/// For example: `rec.set_time_seconds("sim_time", sim_time_secs)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_seconds(&self, timeline: &str, seconds: Option<f64>) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to set_time_seconds() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
}

/// Set the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_nanos`].
///
/// For example: `rec.set_time_nanos("sim_time", sim_time_nanos)`.
///
/// You can remove a timeline again using `rec.set_time_nanos("sim_time", None)`.
pub fn set_time_nanos(&self, timeline: &str, ns: Option<i64>) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to set_time_nanos() ignored");
return;
};

ThreadInfo::set_thread_time(
this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
);
}

/// Clears out the current time of the recording, for the current calling thread.
/// Used for all subsequent logging performed from this same thread, until the next call to
/// [`Self::set_time_sequence`]/[`Self::set_time_seconds`]/[`Self::set_time_nanos`].
///
/// For example: `rec.reset_time()`.
pub fn reset_time(&self) {
let Some(this) = &*self.inner else {
re_log::debug!("Recording disabled - call to reset_time() ignored");
return;
};

ThreadInfo::reset_thread_time(this.info.recording_id);
}
}

// ---

#[cfg(test)]
mod tests {
use re_log_types::RowId;
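The hunk above implements the per-thread, per-recording stateful time tracking that replaces `PythonSession`'s time state. As a rough sketch of how this surfaces for Python users once wired through the bindings (the application id, timeline values, and `log_points` data below are illustrative, not taken from this PR):

```python
import numpy as np
import rerun as rr

rr.init("stateful_time_sketch", spawn=True)

for frame_nr in range(10):
    # Setting a time applies to all subsequent log calls made from this
    # thread, for this recording, until it is changed or cleared.
    rr.set_time_sequence("frame_nr", frame_nr)
    rr.set_time_seconds("sim_time", frame_nr / 30.0)

    # Both timelines set above are attached to this call implicitly.
    rr.log_points("points", positions=np.random.rand(100, 3).astype(np.float32))

# Passing None drops a single timeline from this thread's current time;
# reset_time() clears all of them at once.
rr.set_time_sequence("frame_nr", None)
rr.reset_time()
```

Note that each of the new `RecordingStream` methods degrades to a logged no-op when no recording is active, matching the `re_log::debug!` guards in the diff.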
4 changes: 4 additions & 0 deletions crates/re_sdk_comms/src/buffered_client.rs
@@ -202,6 +202,10 @@ fn msg_encode(
re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition.");
return;
}
// TODO: this is incorrect and dangerous: flush() can return before this
// thread is done with its workload, which means the python process might be
// dead before this thread is dead, which means we call a C callback that has
// been unloaded.
if msg_drop_tx.send(msg_msg).is_err() {
re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition");
return;
2 changes: 1 addition & 1 deletion examples/python/minimal/main.py
@@ -9,7 +9,7 @@
_, unknown = __import__("argparse").ArgumentParser().parse_known_args()
[__import__("logging").warning(f"unknown arg: {arg}") for arg in unknown]

rr.spawn()
rr.init("minimal", spawn=True)

positions = np.vstack([xyz.ravel() for xyz in np.mgrid[3 * [slice(-5, 5, 10j)]]]).T
colors = np.vstack([rgb.ravel() for rgb in np.mgrid[3 * [slice(0, 255, 10j)]]]).astype(np.uint8).T
1 change: 1 addition & 0 deletions examples/python/multiprocessing/main.py
@@ -14,6 +14,7 @@ def task(title: str) -> None:
# All processes spawned with `multiprocessing` will automatically
# be assigned the same default recording_id.
# We just need to connect each process to the rerun viewer:
rr.init("multiprocessing")
rr.connect()

rr.log_text_entry(
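For context, a condensed sketch of the full pattern this one-line change implies for the multiprocessing example; entity paths and messages here are illustrative rather than copied from the example file:

```python
import multiprocessing

import rerun as rr


def task(title: str) -> None:
    # Child processes spawned with `multiprocessing` share the parent's
    # default recording_id, so they all log into the same recording, but
    # each child must now call init() itself before connecting.
    rr.init("multiprocessing")
    rr.connect()
    rr.log_text_entry("logs", f"hello from {title}")


if __name__ == "__main__":
    rr.init("multiprocessing", spawn=True)
    rr.log_text_entry("logs", "hello from the main process")

    for i in range(3):
        p = multiprocessing.Process(target=task, args=(f"task {i}",))
        p.start()
        p.join()
```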
3 changes: 1 addition & 2 deletions rerun_py/docs/gen_common_index.py
@@ -7,7 +7,6 @@
Function | Description
-------- | -----------
[rerun.init()](initialization/#rerun.init) | Initialize the Rerun SDK ...
[rerun.set_recording_id()](initialization/#rerun.set_recording_id) | Set the recording ID ...
[rerun.connect()](initialization/#rerun.connect) | Connect to a remote Rerun Viewer on the ...
[rerun.spawn()](initialization/#rerun.spawn) | Spawn a Rerun Viewer ...
...
@@ -54,7 +53,7 @@ class Section:
Section(
title="Viewer Control",
module_summary=None,
func_list=["set_recording_id", "save"],
func_list=["save"],
),
Section(
title="Time",
59 changes: 30 additions & 29 deletions rerun_py/rerun_sdk/rerun/__init__.py
@@ -29,8 +29,6 @@
"ClassDescription",
"LoggingHandler",
"bindings",
"components",
"inline_show",
"ImageFormat",
"log_annotation_context",
"log_arrow",
@@ -58,7 +56,6 @@
"log_text_entry",
"log_unknown_transform",
"log_view_coordinates",
"notebook",
"LogLevel",
"MeshFormat",
"RectFormat",
@@ -87,9 +84,11 @@ def unregister_shutdown() -> None:
# -----------------------------------------------------------------------------


def get_recording_id() -> str:
def get_recording_id() -> Optional[str]:
"""
Get the recording ID that this process is logging to, as a UUIDv4.
Get the recording ID that this process is logging to, as a UUIDv4, if any.

You must have called [`rr.init`][] first in order to have an active recording.

The default recording_id is based on `multiprocessing.current_process().authkey`
which means that all processes spawned with `multiprocessing`
@@ -106,35 +105,25 @@ def get_recording_id() -> str:
The recording ID that this process is logging to.

"""
return str(bindings.get_recording_id())
rid = bindings.get_recording_id()
if rid:
return str(rid)
return None


def set_recording_id(value: str) -> None:
"""
Set the recording ID that this process is logging to, as a UUIDv4.

The default recording_id is based on `multiprocessing.current_process().authkey`
which means that all processes spawned with `multiprocessing`
will have the same default recording_id.

If you are not using `multiprocessing` and still want several different Python
processes to log to the same Rerun instance (and be part of the same recording),
you will need to manually assign them all the same recording_id.
Any random UUIDv4 will work, or copy the recording id for the parent process.

Parameters
----------
value : str
The recording ID to use for this process.

"""
bindings.set_recording_id(value)


def init(application_id: str, spawn: bool = False, default_enabled: bool = True, strict: bool = False) -> None:
def init(
application_id: str,
recording_id: Optional[str] = None,
spawn: bool = False,
default_enabled: bool = True,
strict: bool = False,
) -> None:
"""
Initialize the Rerun SDK with a user-chosen application id (name).

You must call this function first in order to initialize a global recording.
Without an active recording, all methods of the SDK will turn into no-ops.

Parameters
----------
application_id : str
@@ -144,6 +133,17 @@ def init(application_id: str, spawn: bool = False, default_enabled: bool = True,
For example, if you have one application doing object detection
and another doing camera calibration, you could have
`rerun.init("object_detector")` and `rerun.init("calibrator")`.
recording_id : Optional[str]
Set the recording ID that this process is logging to, as a UUIDv4.

The default recording_id is based on `multiprocessing.current_process().authkey`
which means that all processes spawned with `multiprocessing`
will have the same default recording_id.

If you are not using `multiprocessing` and still want several different Python
processes to log to the same Rerun instance (and be part of the same recording),
you will need to manually assign them all the same recording_id.
Any random UUIDv4 will work, or copy the recording id for the parent process.
spawn : bool
Spawn a Rerun Viewer and stream logging data to it.
Short for calling `spawn` separately.
@@ -189,6 +189,7 @@ def init(

bindings.init(
application_id=application_id,
recording_id=recording_id,
application_path=application_path,
default_enabled=default_enabled,
)
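With `set_recording_id()` gone, sharing a recording between otherwise unrelated processes now goes through the new `recording_id` argument to `init()`. A minimal sketch, assuming the id is distributed out-of-band (environment variable, CLI flag, config file) and using a made-up application id:

```python
import uuid

import rerun as rr

# One process (or an orchestrator) picks the recording id...
shared_recording_id = str(uuid.uuid4())

# ...and every participating process passes that same id to init(),
# instead of calling the removed set_recording_id().
rr.init("shared_recording_sketch", recording_id=shared_recording_id)
rr.connect()

print("logging to recording:", rr.get_recording_id())
```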
10 changes: 4 additions & 6 deletions rerun_py/src/arrow.rs
@@ -80,12 +80,12 @@ pub fn get_registered_component_names(py: pyo3::Python<'_>) -> PyResult<&PyDict>
Ok(fields.into_py_dict(py))
}

/// Build a [`DataTable`] given a '**kwargs'-style dictionary of component arrays.
pub fn build_data_table_from_components(
/// Build a [`DataRow`] given a '**kwargs'-style dictionary of component arrays.
pub fn build_data_row_from_components(
entity_path: &EntityPath,
components: &PyDict,
time_point: &TimePoint,
) -> PyResult<DataTable> {
) -> PyResult<DataRow> {
let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
components.iter().map(|(name, array)| {
let name = name.downcast::<PyString>()?.to_str()?;
@@ -109,7 +109,5 @@ pub fn build_data_table_from_components(
cells,
);

let data_table = row.into_table();

Ok(data_table)
Ok(row)
}
1 change: 0 additions & 1 deletion rerun_py/src/lib.rs
@@ -15,4 +15,3 @@ static GLOBAL: AccountingAllocator<mimalloc::MiMalloc> =

mod arrow;
mod python_bridge;
mod python_session;