diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs
index 797a4a23b34e..a6063e884ddf 100644
--- a/crates/re_sdk_comms/src/buffered_client.rs
+++ b/crates/re_sdk_comms/src/buffered_client.rs
@@ -202,6 +202,10 @@ fn msg_encode(
         re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition.");
         return;
     }
+    // TODO: this is incorrect and dangerous: flush() can return before this
+    // thread is done with its workload, which means the python process might be
+    // dead before this thread is dead, which means we call a C callback that has
+    // been unloaded.
     if msg_drop_tx.send(msg_msg).is_err() {
         re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition");
         return;
diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs
index 27d1d84ad314..ad75215f1985 100644
--- a/crates/rerun/src/clap.rs
+++ b/crates/rerun/src/clap.rs
@@ -93,9 +93,10 @@ impl RerunArgs {
         };
         let _tokio_runtime_guard = tokio_runtime_handle.enter();
 
-        let (rerun_enabled, recording_info, batcher_config) = crate::RecordingContextBuilder::new(application_id)
-            .default_enabled(default_enabled)
-            .finalize();
+        let (rerun_enabled, recording_info, batcher_config) =
+            crate::RecordingContextBuilder::new(application_id)
+                .default_enabled(default_enabled)
+                .finalize();
 
         if !rerun_enabled {
             run(RecordingContext::disabled());
diff --git a/examples/python/clock/main.py b/examples/python/clock/main.py
index 4501a0ad23db..a62fa123d63b 100755
--- a/examples/python/clock/main.py
+++ b/examples/python/clock/main.py
@@ -59,9 +59,9 @@ def rotate(angle: float, len: float) -> Tuple[float, float, float]:
         scaled_h = (t_secs % 43200) / 43200.0
         point_h = np.array(rotate(math.tau * scaled_h, LENGTH_H))
-        color_h = (int(255 - (scaled_h * 255)), int(scaled_h * 255), 255, 255)
+        color_h = (int(255 - (scaled_h * 255)), int(scaled_h * 255), 255, 128)
         rr.log_point("world/hours_pt", position=point_h, color=color_h)
-        rr.log_arrow("world/hours_hand", origin=[0.0, 0.0, 0.0], vector=point_h, color=color_h, width_scale=WIDTH_M)
+        rr.log_arrow("world/hours_hand", origin=[0.0, 0.0, 0.0], vector=point_h, color=color_h, width_scale=WIDTH_H)
 
 if __name__ == "__main__":
diff --git a/examples/python/minimal/main.py b/examples/python/minimal/main.py
index 92e45a01b000..3c2d8573a600 100755
--- a/examples/python/minimal/main.py
+++ b/examples/python/minimal/main.py
@@ -9,7 +9,7 @@
 _, unknown = __import__("argparse").ArgumentParser().parse_known_args()
 [__import__("logging").warning(f"unknown arg: {arg}") for arg in unknown]
 
-rr.spawn()
+rr.init("minimal", spawn=True)
 
 positions = np.vstack([xyz.ravel() for xyz in np.mgrid[3 * [slice(-5, 5, 10j)]]]).T
 colors = np.vstack([rgb.ravel() for rgb in np.mgrid[3 * [slice(0, 255, 10j)]]]).astype(np.uint8).T
diff --git a/examples/python/multiprocessing/main.py b/examples/python/multiprocessing/main.py
index 4c849bdbf8b0..4cc319b39405 100755
--- a/examples/python/multiprocessing/main.py
+++ b/examples/python/multiprocessing/main.py
@@ -14,6 +14,7 @@ def task(title: str) -> None:
     # All processes spawned with `multiprocessing` will automatically
     # be assigned the same default recording_id.
     # We just need to connect each process to the rerun viewer:
+    rr.init("multiprocessing")
     rr.connect()
 
     rr.log_text_entry(
diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py
index 799ec2ccfd50..59033afb00d9 100644
--- a/rerun_py/rerun_sdk/rerun/__init__.py
+++ b/rerun_py/rerun_sdk/rerun/__init__.py
@@ -29,8 +29,6 @@
     "ClassDescription",
     "LoggingHandler",
     "bindings",
-    "components",
-    "inline_show",
     "ImageFormat",
     "log_annotation_context",
     "log_arrow",
@@ -58,7 +56,6 @@
     "log_text_entry",
     "log_unknown_transform",
     "log_view_coordinates",
-    "notebook",
     "LogLevel",
     "MeshFormat",
     "RectFormat",
@@ -74,7 +71,9 @@
 
 
 def rerun_shutdown() -> None:
+    print("shutdown start")
     bindings.shutdown()
+    print("shutdown end")
 
 
 atexit.register(rerun_shutdown)
@@ -109,29 +108,13 @@ def get_recording_id() -> str:
     return str(bindings.get_recording_id())
 
 
-def set_recording_id(value: str) -> None:
-    """
-    Set the recording ID that this process is logging to, as a UUIDv4.
-
-    The default recording_id is based on `multiprocessing.current_process().authkey`
-    which means that all processes spawned with `multiprocessing`
-    will have the same default recording_id.
-
-    If you are not using `multiprocessing` and still want several different Python
-    processes to log to the same Rerun instance (and be part of the same recording),
-    you will need to manually assign them all the same recording_id.
-    Any random UUIDv4 will work, or copy the recording id for the parent process.
-
-    Parameters
-    ----------
-    value : str
-        The recording ID to use for this process.
-
-    """
-    bindings.set_recording_id(value)
-
-
-def init(application_id: str, spawn: bool = False, default_enabled: bool = True, strict: bool = False) -> None:
+def init(
+    application_id: str,
+    recording_id: Optional[str] = None,
+    spawn: bool = False,
+    default_enabled: bool = True,
+    strict: bool = False,
+) -> None:
     """
     Initialize the Rerun SDK with a user-chosen application id (name).
 
@@ -144,6 +127,17 @@ def init(application_id: str, spawn: bool = False, default_enabled: bool = True,
         For example, if you have one application doing object detection
         and another doing camera calibration, you could have
         `rerun.init("object_detector")` and `rerun.init("calibrator")`.
+    recording_id : Optional[str]
+        Set the recording ID that this process is logging to, as a UUIDv4.
+
+        The default recording_id is based on `multiprocessing.current_process().authkey`
+        which means that all processes spawned with `multiprocessing`
+        will have the same default recording_id.
+
+        If you are not using `multiprocessing` and still want several different Python
+        processes to log to the same Rerun instance (and be part of the same recording),
+        you will need to manually assign them all the same recording_id.
+        Any random UUIDv4 will work, or copy the recording id for the parent process.
     spawn : bool
         Spawn a Rerun Viewer and stream logging data to it.
         Short for calling `spawn` separately.
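Editorial note: a minimal sketch of how the new `recording_id` parameter is meant to be used. The parent/child split below is hypothetical; only `init`, `get_recording_id`, and `connect` are APIs touched by this diff:

```python
import rerun as rr

# Parent process: the recording id defaults to a hash of
# `multiprocessing.current_process().authkey`.
rr.init("shared_recording", spawn=True)
shared_id = rr.get_recording_id()

# A child process that is NOT spawned via `multiprocessing` (e.g. a plain
# subprocess that received `shared_id` on its command line) can now join
# the same recording explicitly, instead of calling the removed
# set_recording_id():
rr.init("shared_recording", recording_id=shared_id)
rr.connect()
```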
@@ -189,6 +183,7 @@ def init(application_id: str, spawn: bool = False, default_enabled: bool = True,
 
     bindings.init(
         application_id=application_id,
+        recording_id=recording_id,
         application_path=application_path,
         default_enabled=default_enabled,
     )
diff --git a/rerun_py/src/lib.rs b/rerun_py/src/lib.rs
index 587c7422a28f..73c8e4a368a5 100644
--- a/rerun_py/src/lib.rs
+++ b/rerun_py/src/lib.rs
@@ -15,4 +15,3 @@ static GLOBAL: AccountingAllocator =
 
 mod arrow;
 mod python_bridge;
-mod python_session;
diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs
index d7485517047c..5841ea815cbf 100644
--- a/rerun_py/src/python_bridge.rs
+++ b/rerun_py/src/python_bridge.rs
@@ -6,17 +6,17 @@ use std::{borrow::Cow, io::Cursor, path::PathBuf};
 use itertools::izip;
 use pyo3::{
-    exceptions::{PyRuntimeError, PyTypeError, PyValueError},
+    exceptions::{PyRuntimeError, PyTypeError},
     prelude::*,
     types::{PyBytes, PyDict},
 };
 
-use re_log_types::{ArrowMsg, DataRow, DataTableError};
+use re_log_types::DataRow;
 use rerun::{
     log::{PathOp, RowId},
     sink::MemorySinkStorage,
     time::{Time, TimeInt, TimePoint, TimeType, Timeline},
-    ApplicationId, EntityPath, RecordingId,
+    EntityPath, RecordingContext, RecordingContextBuilder, RecordingId,
 };
 
 pub use rerun::{
@@ -35,88 +35,34 @@ use re_web_viewer_server::WebViewerServerPort;
 #[cfg(feature = "web_viewer")]
 use re_ws_comms::RerunServerPort;
 
-use crate::{arrow::get_registered_component_names, python_session::PythonSession};
+use crate::arrow::get_registered_component_names;
 
-// ----------------------------------------------------------------------------
+// --- FFI ---
 
-/// The global [`PythonSession`] object used by the Python API.
-fn python_session() -> parking_lot::MutexGuard<'static, PythonSession> {
+// TODO
+/// The global [`RecordingContext`] object used by the Python API.
+fn global_recording_context() -> parking_lot::MutexGuard<'static, Option<RecordingContext>> {
     use once_cell::sync::OnceCell;
     use parking_lot::Mutex;
-    static PYTHON_SESSION: OnceCell<Mutex<PythonSession>> = OnceCell::new();
-    PYTHON_SESSION.get_or_init(Default::default).lock()
+    static REC_CTX: OnceCell<Mutex<Option<RecordingContext>>> = OnceCell::new();
+    REC_CTX.get_or_init(Default::default).lock()
 }
 
-// ----------------------------------------------------------------------------
-
-/// Thread-local info
-#[derive(Default)]
-struct ThreadInfo {
-    /// The current time, which can be set by users.
-    time_point: TimePoint,
-}
-
-impl ThreadInfo {
-    pub fn thread_now() -> TimePoint {
-        Self::with(|ti| ti.now())
-    }
-
-    pub fn set_thread_time(timeline: Timeline, time_int: Option<TimeInt>) {
-        Self::with(|ti| ti.set_time(timeline, time_int));
-    }
-
-    pub fn reset_thread_time() {
-        Self::with(|ti| ti.reset_time());
-    }
-
-    /// Get access to the thread-local [`ThreadInfo`].
-    fn with<R>(f: impl FnOnce(&mut ThreadInfo) -> R) -> R {
-        use std::cell::RefCell;
-        thread_local! {
-            static THREAD_INFO: RefCell<Option<ThreadInfo>> = RefCell::new(None);
-        }
-
-        THREAD_INFO.with(|thread_info| {
-            let mut thread_info = thread_info.borrow_mut();
-            let thread_info = thread_info.get_or_insert_with(ThreadInfo::default);
-            f(thread_info)
-        })
-    }
-
-    fn now(&self) -> TimePoint {
-        let mut time_point = self.time_point.clone();
-        time_point.insert(Timeline::log_time(), Time::now().into());
-        time_point
-    }
-
-    fn set_time(&mut self, timeline: Timeline, time_int: Option<TimeInt>) {
-        if let Some(time_int) = time_int {
-            self.time_point.insert(timeline, time_int);
-        } else {
-            self.time_point.remove(&timeline);
-        }
-    }
-
-    fn reset_time(&mut self) {
-        self.time_point = TimePoint::default();
-    }
-}
-
-// ----------------------------------------------------------------------------
-
-fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
-    let py_version = py.version_info();
-    re_log_types::PythonVersion {
-        major: py_version.major,
-        minor: py_version.minor,
-        patch: py_version.patch,
-        suffix: py_version.suffix.map(|s| s.to_owned()).unwrap_or_default(),
-    }
+#[pyfunction]
+fn main(py: Python<'_>, argv: Vec<String>) -> PyResult<u8> {
+    let build_info = re_build_info::build_info!();
+    let call_src = rerun::CallSource::Python(python_version(py));
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(rerun::run(build_info, call_src, argv))
+        .map_err(|err| PyRuntimeError::new_err(re_error::format(err)))
 }
 
 /// The python module is called "rerun_bindings".
 #[pymodule]
-fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> {
+fn rerun_bindings(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
     // NOTE: We do this here because some of the inner init methods don't respond too kindly to being
     // called more than once.
     re_log::setup_native_logging();
@@ -137,53 +83,122 @@ fn rerun_bindings(py: Python<'_>, m: &PyModule) -> PyResult<()> {
         return Ok(());
     }
 
-    python_session().set_python_version(python_version(py));
-
-    // NOTE: We do this here because we want child processes to share the same recording-id,
-    // whether the user has called `init` or not.
-    // See `default_recording_id` for extra information.
-    python_session().set_recording_id(default_recording_id(py));
-
-    m.add_function(wrap_pyfunction!(get_recording_id, m)?)?;
-    m.add_function(wrap_pyfunction!(set_recording_id, m)?)?;
+    // init
+    m.add_function(wrap_pyfunction!(init, m)?)?;
+    m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
+    m.add_function(wrap_pyfunction!(shutdown, m)?)?;
 
+    // sinks
     m.add_function(wrap_pyfunction!(connect, m)?)?;
+    m.add_function(wrap_pyfunction!(save, m)?)?;
+    m.add_function(wrap_pyfunction!(memory_recording, m)?)?;
+    m.add_function(wrap_pyfunction!(serve, m)?)?;
     m.add_function(wrap_pyfunction!(disconnect, m)?)?;
     m.add_function(wrap_pyfunction!(flush, m)?)?;
-    m.add_function(wrap_pyfunction!(get_app_url, m)?)?;
-    m.add_function(wrap_pyfunction!(init, m)?)?;
-    m.add_function(wrap_pyfunction!(is_enabled, m)?)?;
-    m.add_function(wrap_pyfunction!(memory_recording, m)?)?;
-    m.add_function(wrap_pyfunction!(save, m)?)?;
     m.add_function(wrap_pyfunction!(start_web_viewer_server, m)?)?;
-    m.add_function(wrap_pyfunction!(serve, m)?)?;
-    m.add_function(wrap_pyfunction!(set_enabled, m)?)?;
-    m.add_function(wrap_pyfunction!(shutdown, m)?)?;
 
+    // time
     m.add_function(wrap_pyfunction!(set_time_sequence, m)?)?;
     m.add_function(wrap_pyfunction!(set_time_seconds, m)?)?;
     m.add_function(wrap_pyfunction!(set_time_nanos, m)?)?;
     m.add_function(wrap_pyfunction!(reset_time, m)?)?;
 
+    // log transforms
     m.add_function(wrap_pyfunction!(log_unknown_transform, m)?)?;
     m.add_function(wrap_pyfunction!(log_rigid3, m)?)?;
     m.add_function(wrap_pyfunction!(log_pinhole, m)?)?;
-    m.add_function(wrap_pyfunction!(log_meshes, m)?)?;
-
+    // log view coordinates
     m.add_function(wrap_pyfunction!(log_view_coordinates_xyz, m)?)?;
     m.add_function(wrap_pyfunction!(log_view_coordinates_up_handedness, m)?)?;
 
+    // log segmentation
     m.add_function(wrap_pyfunction!(log_annotation_context, m)?)?;
 
+    // log assets
+    m.add_function(wrap_pyfunction!(log_meshes, m)?)?;
     m.add_function(wrap_pyfunction!(log_mesh_file, m)?)?;
     m.add_function(wrap_pyfunction!(log_image_file, m)?)?;
+
+    // log special
     m.add_function(wrap_pyfunction!(log_cleared, m)?)?;
     m.add_function(wrap_pyfunction!(log_arrow_msg, m)?)?;
 
+    // misc
+    m.add_function(wrap_pyfunction!(get_app_url, m)?)?;
+    m.add_function(wrap_pyfunction!(get_recording_id, m)?)?;
+
     Ok(())
 }
 
+fn no_active_recording(origin: &str) {
+    re_log::debug!("No active recording - call to {origin}() ignored (have you called rr.init()?)",);
+}
+
+// --- Init ---
+
+// TODO: actual python object for RecordingContext + make_default logic etc
+#[pyfunction]
+#[pyo3(signature = (
+    application_id,
+    recording_id=None,
+    application_path=None,
+    default_enabled=true,
+))]
+fn init(
+    py: Python<'_>,
+    application_id: String,
+    recording_id: Option<String>,
+    application_path: Option<PathBuf>,
+    default_enabled: bool,
+) -> PyResult<()> {
+    // TODO: pass this to the builder and let it do that logic
+    // The sentinel file we use to identify the official examples directory.
+    const SENTINEL_FILENAME: &str = ".rerun_examples";
+    let is_official_example = application_path.map_or(false, |mut path| {
+        // more than 4 layers would be really pushing it
+        for _ in 0..4 {
+            path.pop(); // first iteration is always a file path in our examples
+            if path.join(SENTINEL_FILENAME).exists() {
+                return true;
+            }
+        }
+        false
+    });
 
+    let recording_id = if let Some(recording_id) = recording_id {
+        recording_id.parse().map_err(|_err| {
+            PyTypeError::new_err(format!(
+                "Invalid recording id - expected a UUID, got {recording_id:?}"
+            ))
+        })?
+    } else {
+        default_recording_id(py)
+    };
+
+    let mut rec_ctx = global_recording_context();
+    *rec_ctx = RecordingContextBuilder::new(application_id)
+        .is_official_example(is_official_example)
+        .recording_id(recording_id)
+        .recording_source(re_log_types::RecordingSource::PythonSdk(python_version(py)))
+        .default_enabled(default_enabled)
+        .buffered()
+        .map_err(|err| PyRuntimeError::new_err(err.to_string()))?
+        .into();
+
+    Ok(())
+}
+
+fn python_version(py: Python<'_>) -> re_log_types::PythonVersion {
+    let py_version = py.version_info();
+    re_log_types::PythonVersion {
+        major: py_version.major,
+        minor: py_version.minor,
+        patch: py_version.patch,
+        suffix: py_version.suffix.map(|s| s.to_owned()).unwrap_or_default(),
+    }
+}
+
 fn default_recording_id(py: Python<'_>) -> RecordingId {
     use rand::{Rng as _, SeedableRng as _};
     use std::hash::{Hash as _, Hasher as _};
@@ -224,104 +239,80 @@ authkey = multiprocessing.current_process().authkey
     authkey.as_bytes().to_vec()
 }
 
-// ----------------------------------------------------------------------------
-
-fn parse_entity_path(entity_path: &str) -> PyResult<EntityPath> {
-    let components = re_log_types::parse_entity_path(entity_path)
-        .map_err(|err| PyTypeError::new_err(err.to_string()))?;
-    if components.is_empty() {
-        Err(PyTypeError::new_err(
-            "You cannot log to the root {entity_path:?}",
-        ))
-    } else {
-        Ok(EntityPath::from(components))
-    }
-}
-
-fn time(timeless: bool) -> TimePoint {
-    if timeless {
-        TimePoint::timeless()
-    } else {
-        ThreadInfo::thread_now()
-    }
+/// Is logging enabled in the global recording?
+#[pyfunction]
+fn is_enabled() -> bool {
+    global_recording_context()
+        .as_ref()
+        .map_or(false, |rec_ctx| rec_ctx.is_enabled())
 }
 
-// ----------------------------------------------------------------------------
-
 #[pyfunction]
-fn main(py: Python<'_>, argv: Vec<String>) -> PyResult<u8> {
-    let build_info = re_build_info::build_info!();
-    let call_src = rerun::CallSource::Python(python_version(py));
-    tokio::runtime::Builder::new_multi_thread()
-        .enable_all()
-        .build()
-        .unwrap()
-        .block_on(rerun::run(build_info, call_src, argv))
-        .map_err(|err| PyRuntimeError::new_err(re_error::format(err)))
+fn shutdown(py: Python<'_>) {
+    re_log::debug!("Shutting down the Rerun SDK");
+    // Disconnect the current sink which ensures that
+    // it flushes and cleans up.
+    disconnect(py);
 }
 
+// --- Sinks ---
+
 #[pyfunction]
-fn get_recording_id() -> PyResult<String> {
-    let recording_id = python_session().recording_id();
+fn connect(addr: Option<String>) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("connect");
+        return Ok(());
+    };
 
-    if recording_id == RecordingId::ZERO {
-        Err(PyTypeError::new_err("module has not been initialized"))
+    let addr = if let Some(addr) = addr {
+        addr.parse()?
     } else {
-        Ok(recording_id.to_string())
-    }
-}
+        rerun::default_server_addr()
+    };
 
-#[pyfunction]
-fn set_recording_id(recording_id: &str) -> PyResult<()> {
-    if let Ok(recording_id) = recording_id.parse() {
-        python_session().set_recording_id(recording_id);
-        Ok(())
-    } else {
-        Err(PyTypeError::new_err(format!(
-            "Invalid recording id - expected a UUID, got {recording_id:?}"
-        )))
-    }
+    rec_ctx.connect(addr);
+
+    Ok(())
 }
 
 #[pyfunction]
-#[pyo3(signature = (application_id, application_path=None, default_enabled=true))]
-fn init(application_id: String, application_path: Option<PathBuf>, default_enabled: bool) {
-    // The sentinel file we use to identify the official examples directory.
-    const SENTINEL_FILENAME: &str = ".rerun_examples";
-    let is_official_example = application_path.map_or(false, |mut path| {
-        // more than 4 layers would be really pushing it
-        for _ in 0..4 {
-            path.pop(); // first iteration is always a file path in our examples
-            if path.join(SENTINEL_FILENAME).exists() {
-                return true;
-            }
-        }
-        false
-    });
+fn save(path: &str) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("save");
+        return Ok(());
+    };
 
-    let mut session = python_session();
-    session.set_default_enabled(default_enabled);
-    session.set_application_id(ApplicationId(application_id), is_official_example);
+    rec_ctx
+        .save(path)
+        .map_err(|err| PyRuntimeError::new_err(err.to_string()))
 }
 
+/// Create an in-memory rrd file
 #[pyfunction]
-fn connect(addr: Option<String>) -> PyResult<()> {
-    let addr = if let Some(addr) = addr {
-        addr.parse()?
-    } else {
-        rerun::default_server_addr()
+fn memory_recording() -> PyMemorySinkStorage {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("memory_recording");
+        return Default::default();
     };
-    python_session().connect(addr);
-    Ok(())
+
+    PyMemorySinkStorage(rec_ctx.memory())
 }
 
-#[must_use = "the tokio_runtime guard must be kept alive while using tokio"]
-#[cfg(feature = "web_viewer")]
-fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> {
-    use once_cell::sync::Lazy;
-    static TOKIO_RUNTIME: Lazy<tokio::runtime::Runtime> =
-        Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"));
-    TOKIO_RUNTIME.enter()
+#[pyclass]
+#[derive(Default)]
+struct PyMemorySinkStorage(MemorySinkStorage);
+
+#[pymethods]
+impl PyMemorySinkStorage {
+    fn get_rrd_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
+        self.0
+            .rrd_as_bytes()
+            .map(|bytes| PyBytes::new(py, bytes.as_slice()))
+            .map_err(|err| PyRuntimeError::new_err(err.to_string()))
+    }
 }
 
 /// Serve a web-viewer.
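Editorial note: every sink entry point above now no-ops with a debug message when `init` has not been called, instead of mutating an implicit default session. Assuming the Python-side wrappers forward straight to these bindings, the observable behavior is roughly:

```python
import rerun as rr

rr.save("out.rrd")  # before init: "No active recording - call to save() ignored"

rr.init("sink_demo")  # installs the global RecordingContext (buffered sink)
rr.save("out.rrd")    # swaps the sink to a file; buffered rows drain into it
```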
@@ -330,16 +321,24 @@ fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> {
 fn serve(open_browser: bool, web_port: Option<u16>, ws_port: Option<u16>) -> PyResult<()> {
     #[cfg(feature = "web_viewer")]
     {
-        let mut session = python_session();
+        #[must_use = "the tokio_runtime guard must be kept alive while using tokio"]
+        fn enter_tokio_runtime() -> tokio::runtime::EnterGuard<'static> {
+            use once_cell::sync::Lazy;
+            static TOKIO_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
+                tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")
+            });
+            TOKIO_RUNTIME.enter()
+        }
 
-        if !session.is_enabled() {
-            re_log::debug!("Rerun disabled - call to serve() ignored");
+        let rec_ctx = global_recording_context();
+        let Some(rec_ctx) = rec_ctx.as_ref() else {
+            no_active_recording("serve");
             return Ok(());
-        }
+        };
 
         let _guard = enter_tokio_runtime();
 
-        session.set_sink(
+        rec_ctx.set_sink(
             rerun::web_viewer::new_sink(
                 open_browser,
                 web_port.map(WebViewerServerPort).unwrap_or_default(),
@@ -362,108 +361,78 @@
     }
 }
 
-#[pyfunction]
-// TODO(jleibs) expose this as a python type
-fn start_web_viewer_server(port: u16) -> PyResult<()> {
-    #[cfg(feature = "web_viewer")]
-    {
-        let mut session = python_session();
-        let _guard = enter_tokio_runtime();
-        session
-            .start_web_viewer_server(WebViewerServerPort(port))
-            .map_err(|err| PyRuntimeError::new_err(err.to_string()))
-    }
-
-    #[cfg(not(feature = "web_viewer"))]
-    {
-        _ = port;
-        Err(PyRuntimeError::new_err(
-            "The Rerun SDK was not compiled with the 'web_viewer' feature",
-        ))
-    }
-}
-
-#[pyfunction]
-fn get_app_url() -> String {
-    let session = python_session();
-    session.get_app_url()
-}
-
-#[pyfunction]
-fn shutdown(py: Python<'_>) {
-    re_log::debug!("Shutting down the Rerun SDK");
-    // Disconnect the current sink which ensures that
-    // it flushes and cleans up.
-    disconnect(py);
-}
-
-/// Is logging enabled in the global session?
-#[pyfunction]
-fn is_enabled() -> bool {
-    python_session().is_enabled()
-}
-
-/// Enable or disable logging in the global session.
+/// Disconnect from remote server (if any).
 ///
-/// This is a global setting that affects all threads.
-/// By default logging is enabled.
-#[pyfunction]
-fn set_enabled(enabled: bool) {
-    python_session().set_enabled(enabled);
-}
-
-/// Block until outstanding data has been flushed to the sink
+/// Subsequent log messages will be buffered and either sent on the next call to `connect`,
+/// or shown with `show`.
 #[pyfunction]
-fn flush(py: Python<'_>) {
+fn disconnect(py: Python<'_>) {
     // Release the GIL in case any flushing behavior needs to
     // cleanup a python object.
     py.allow_threads(|| {
-        python_session().flush();
+        let rec_ctx = global_recording_context();
+        let Some(rec_ctx) = rec_ctx.as_ref() else {
+            no_active_recording("disconnect");
+            return;
+        };
+        rec_ctx.disconnect();
     });
 }
 
-/// Disconnect from remote server (if any).
-///
-/// Subsequent log messages will be buffered and either sent on the next call to `connect`,
-/// or shown with `show`.
+// TODO
+/// Block until outstanding data has been flushed to the sink
 #[pyfunction]
-fn disconnect(py: Python<'_>) {
+fn flush(py: Python<'_>) {
     // Release the GIL in case any flushing behavior needs to
     // cleanup a python object.
     py.allow_threads(|| {
-        python_session().disconnect();
+        let rec_ctx = global_recording_context();
+        let Some(rec_ctx) = rec_ctx.as_ref() else {
+            no_active_recording("flush");
+            return;
+        };
+        rec_ctx.flush_blocking();
     });
 }
 
 #[pyfunction]
-fn save(path: &str) -> PyResult<()> {
-    let mut session = python_session();
-    session
-        .save(path)
-        .map_err(|err| PyRuntimeError::new_err(err.to_string()))
-}
-
-#[pyclass]
-struct PyMemorySinkStorage(MemorySinkStorage);
-
-#[pymethods]
-impl PyMemorySinkStorage {
-    fn get_rrd_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
-        self.0
-            .rrd_as_bytes()
-            .map(|bytes| PyBytes::new(py, bytes.as_slice()))
-            .map_err(|err| PyRuntimeError::new_err(err.to_string()))
-    }
-}
+// TODO(jleibs) expose this as a python type
+fn start_web_viewer_server(port: u16) -> PyResult<()> {
+    Ok(())
 
-/// Create an in-memory rrd file
-#[pyfunction]
-fn memory_recording() -> PyMemorySinkStorage {
-    let mut session = python_session();
-    PyMemorySinkStorage(session.memory_recording())
+    // /// Start a web server to host the run web-assets
+    // ///
+    // /// The caller needs to ensure that there is a `tokio` runtime running.
+    // #[allow(clippy::unnecessary_wraps)]
+    // #[cfg(feature = "web_viewer")]
+    // pub fn start_web_viewer_server(
+    //     &mut self,
+    //     _web_port: WebViewerServerPort,
+    // ) -> Result<(), PythonSessionError> {
+    //     self.web_viewer_server = Some(re_web_viewer_server::WebViewerServerHandle::new(_web_port)?);
 
+    //     Ok(())
+    // }
 
+    // #[cfg(feature = "web_viewer")]
+    // {
+    //     let mut session = python_session();
+    //     let _guard = enter_tokio_runtime();
+    //     session
+    //         .start_web_viewer_server(WebViewerServerPort(port))
+    //         .map_err(|err| PyRuntimeError::new_err(err.to_string()))
+    // }
 
+    // #[cfg(not(feature = "web_viewer"))]
+    // {
+    //     _ = port;
+    //     Err(PyRuntimeError::new_err(
+    //         "The Rerun SDK was not compiled with the 'web_viewer' feature",
+    //     ))
+    // }
 }
 
-// ----------------------------------------------------------------------------
+// --- Time ---
 
 /// Set the current time globally. Used for all subsequent logging,
 /// until the next call to `set_time_sequence`.
@@ -495,21 +464,66 @@ fn set_time_nanos(timeline: &str, ns: Option<i64>) {
     );
 }
 
-#[pyfunction]
-fn reset_time() {
-    ThreadInfo::reset_thread_time();
-}
+#[pyfunction]
+fn reset_time() {
+    ThreadInfo::reset_thread_time();
+}
+
+/// Thread-local info
+#[derive(Default)]
+struct ThreadInfo {
+    /// The current time, which can be set by users.
+    time_point: TimePoint,
+}
+
+impl ThreadInfo {
+    pub fn thread_now() -> TimePoint {
+        Self::with(|ti| ti.now())
+    }
+
+    pub fn set_thread_time(timeline: Timeline, time_int: Option<TimeInt>) {
+        Self::with(|ti| ti.set_time(timeline, time_int));
+    }
+
+    pub fn reset_thread_time() {
+        Self::with(|ti| ti.reset_time());
+    }
+
+    /// Get access to the thread-local [`ThreadInfo`].
+    fn with<R>(f: impl FnOnce(&mut ThreadInfo) -> R) -> R {
+        use std::cell::RefCell;
+        thread_local! {
+            static THREAD_INFO: RefCell<Option<ThreadInfo>> = RefCell::new(None);
+        }
+
+        THREAD_INFO.with(|thread_info| {
+            let mut thread_info = thread_info.borrow_mut();
+            let thread_info = thread_info.get_or_insert_with(ThreadInfo::default);
+            f(thread_info)
+        })
+    }
+
+    fn now(&self) -> TimePoint {
+        let mut time_point = self.time_point.clone();
+        time_point.insert(Timeline::log_time(), Time::now().into());
+        time_point
+    }
+
+    fn set_time(&mut self, timeline: Timeline, time_int: Option<TimeInt>) {
+        if let Some(time_int) = time_int {
+            self.time_point.insert(timeline, time_int);
+        } else {
+            self.time_point.remove(&timeline);
+        }
+    }
 
-fn convert_color(color: Vec<u8>) -> PyResult<[u8; 4]> {
-    match &color[..] {
-        [r, g, b] => Ok([*r, *g, *b, 255]),
-        [r, g, b, a] => Ok([*r, *g, *b, *a]),
-        _ => Err(PyTypeError::new_err(format!(
-            "Expected color to be of length 3 or 4, got {color:?}"
-        ))),
+    fn reset_time(&mut self) {
+        self.time_point = TimePoint::default();
     }
 }
 
+// --- Log transforms ---
+
 #[pyfunction]
 fn log_unknown_transform(entity_path: &str, timeless: bool) -> PyResult<()> {
     let transform = re_log_types::Transform::Unknown;
@@ -559,11 +573,16 @@ fn log_transform(
     transform: re_log_types::Transform,
     timeless: bool,
 ) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("log_transform");
+        return Ok(());
+    };
+
     let entity_path = parse_entity_path(entity_path)?;
     if entity_path.is_root() {
         return Err(PyTypeError::new_err("Transforms are between a child entity and its parent, so the root cannot have a transform"));
     }
-    let mut session = python_session();
     let time_point = time(timeless);
 
     // We currently log arrow transforms from inside the bridge because we are
@@ -580,10 +599,12 @@ fn log_transform(
         [transform].as_slice(),
     );
 
-    session.send_row(row)
+    record_row(rec_ctx, row);
+
+    Ok(())
 }
 
-// ----------------------------------------------------------------------------
+// --- Log view coordinates ---
 
 #[pyfunction]
 #[pyo3(signature = (entity_path, xyz, right_handed = None, timeless = false))]
@@ -630,6 +651,12 @@ fn log_view_coordinates(
     coordinates: ViewCoordinates,
     timeless: bool,
 ) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        re_log::debug!("No active recording - call to log_view_coordinates() ignored");
+        return Ok(());
+    };
+
     if coordinates.handedness() == Some(Handedness::Left) {
         re_log::warn_once!("Left-handed coordinate systems are not yet fully supported by Rerun");
     }
@@ -641,7 +668,6 @@ fn log_view_coordinates(
         parse_entity_path(entity_path_str)?
     };
 
-    let mut session = python_session();
     let time_point = time(timeless);
 
     // We currently log view coordinates from inside the bridge because the code
@@ -658,22 +684,93 @@ fn log_view_coordinates(
         [coordinates].as_slice(),
     );
 
-    session.send_row(row)
+    record_row(rec_ctx, row);
+
+    Ok(())
 }
 
-// ----------------------------------------------------------------------------
+// --- Log segmentation ---
 
-// TODO(jleibs): This shadows [`re_log_types::TensorDataMeaning`]
-//
-#[pyclass]
-#[derive(Clone, Debug)]
-enum TensorDataMeaning {
-    Unknown,
-    ClassId,
-    Depth,
+#[derive(FromPyObject)]
+struct AnnotationInfoTuple(u16, Option<String>, Option<Vec<u8>>);
+
+impl From<AnnotationInfoTuple> for AnnotationInfo {
+    fn from(tuple: AnnotationInfoTuple) -> Self {
+        let AnnotationInfoTuple(id, label, color) = tuple;
+        Self {
+            id,
+            label: label.map(Label),
+            color: color
+                .as_ref()
+                .map(|color| convert_color(color.clone()).unwrap())
+                .map(|bytes| bytes.into()),
+        }
+    }
+}
+
+type ClassDescriptionTuple = (AnnotationInfoTuple, Vec<AnnotationInfoTuple>, Vec<u16>);
+
+#[pyfunction]
+fn log_annotation_context(
+    entity_path_str: &str,
+    class_descriptions: Vec<ClassDescriptionTuple>,
+    timeless: bool,
+) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        re_log::debug!("No active recording - call to log_annotation_context() ignored");
+        return Ok(());
+    };
+
+    // We normally disallow logging to root, but we make an exception for class_descriptions
+    let entity_path = if entity_path_str == "/" {
+        EntityPath::root()
+    } else {
+        parse_entity_path(entity_path_str)?
+    };
+
+    let mut annotation_context = AnnotationContext::default();
+
+    for (info, keypoint_annotations, keypoint_skeleton_edges) in class_descriptions {
+        annotation_context
+            .class_map
+            .entry(ClassId(info.0))
+            .or_insert_with(|| ClassDescription {
+                info: info.into(),
+                keypoint_map: keypoint_annotations
+                    .into_iter()
+                    .map(|k| (KeypointId(k.0), k.into()))
+                    .collect(),
+                keypoint_connections: keypoint_skeleton_edges
+                    .chunks_exact(2)
+                    .map(|pair| (KeypointId(pair[0]), KeypointId(pair[1])))
+                    .collect(),
+            });
+    }
+
+    let time_point = time(timeless);
+
+    // We currently log AnnotationContext from inside the bridge because it's a
+    // fairly complex type with a need for a fair amount of data-validation. We
+    // already have the serialization implemented in rust so we start with this
+    // implementation.
+    //
+    // TODO(jleibs) replace with python-native implementation
+
+    let row = DataRow::from_cells1(
+        RowId::random(),
+        entity_path,
+        time_point,
+        1,
+        [annotation_context].as_slice(),
+    );
+
+    record_row(rec_ctx, row);
+
+    Ok(())
 }
 
-// ----------------------------------------------------------------------------
+// --- Log assets ---
 
 #[pyfunction]
 fn log_meshes(
@@ -685,6 +782,12 @@ fn log_meshes(
     albedo_factors: Vec<Option<numpy::PyReadonlyArray1<'_, f32>>>,
     timeless: bool,
 ) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("log_meshes");
+        return Ok(());
+    };
+
     let entity_path = parse_entity_path(entity_path_str)?;
 
     // Make sure we have as many position buffers as index buffers, etc.
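Editorial note: since `AnnotationInfoTuple` and `ClassDescriptionTuple` are destructured positionally on the Rust side, the Python caller has to match their shape exactly. A hedged illustration of the expected payload (the high-level SDK may wrap this in richer types before it reaches the binding):

```python
import rerun as rr

# AnnotationInfoTuple is (id, label, color); color may be RGB or RGBA bytes.
class_info = (1, "person", (255, 0, 0))

# Keypoints reuse the same tuple shape; skeleton edges are a flat list of
# keypoint ids consumed pairwise via chunks_exact(2) on the Rust side.
keypoints = [(0, "head", None), (1, "torso", None)]
skeleton_edges = [0, 1]  # one edge: head -> torso

# Logging to "/" is explicitly allowed for annotation contexts.
rr.log_annotation_context("/", [(class_info, keypoints, skeleton_edges)])
```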
@@ -704,8 +807,6 @@ fn log_meshes(
         )));
     }
 
-    let mut session = python_session();
-
     let time_point = time(timeless);
 
     let mut meshes = Vec::with_capacity(position_buffers.len());
@@ -786,7 +887,9 @@ fn log_meshes(
         meshes,
     );
 
-    session.send_row(row)
+    record_row(rec_ctx, row);
+
+    Ok(())
 }
 
 #[pyfunction]
@@ -797,6 +900,12 @@ fn log_mesh_file(
     transform: numpy::PyReadonlyArray2<'_, f32>,
     timeless: bool,
 ) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("log_mesh_file");
+        return Ok(());
+    };
+
     let entity_path = parse_entity_path(entity_path_str)?;
     let format = match mesh_format {
         "GLB" => MeshFormat::Glb,
@@ -835,8 +944,6 @@ fn log_mesh_file(
         ]
     };
 
-    let mut session = python_session();
-
     let time_point = time(timeless);
 
     let mesh3d = Mesh3D::Encoded(EncodedMesh3D {
@@ -861,7 +968,9 @@ fn log_mesh_file(
         [mesh3d].as_slice(),
     );
 
-    session.send_row(row)
+    record_row(rec_ctx, row);
+
+    Ok(())
 }
 
 /// Log an image file given its contents or path on disk.
@@ -876,6 +985,12 @@ fn log_image_file(
     img_format: Option<&str>,
     timeless: bool,
 ) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("log_image_file");
+        return Ok(());
+    };
+
     let entity_path = parse_entity_path(entity_path)?;
 
     let img_bytes = match (img_bytes, img_path) {
@@ -923,8 +1038,6 @@ fn log_image_file(
         }
     };
 
-    let mut session = python_session();
-
     let time_point = time(timeless);
 
     let tensor = re_log_types::component_types::Tensor {
@@ -947,90 +1060,34 @@ fn log_image_file(
         [tensor].as_slice(),
     );
 
-    session.send_row(row)
-}
+    record_row(rec_ctx, row);
 
-#[derive(FromPyObject)]
-struct AnnotationInfoTuple(u16, Option<String>, Option<Vec<u8>>);
+    Ok(())
+}
 
-impl From<AnnotationInfoTuple> for AnnotationInfo {
-    fn from(tuple: AnnotationInfoTuple) -> Self {
-        let AnnotationInfoTuple(id, label, color) = tuple;
-        Self {
-            id,
-            label: label.map(Label),
-            color: color
-                .as_ref()
-                .map(|color| convert_color(color.clone()).unwrap())
-                .map(|bytes| bytes.into()),
-        }
-    }
+// TODO(jleibs): This shadows [`re_log_types::TensorDataMeaning`]
+#[pyclass]
+#[derive(Clone, Debug)]
+enum TensorDataMeaning {
+    Unknown,
+    ClassId,
+    Depth,
 }
 
-type ClassDescriptionTuple = (AnnotationInfoTuple, Vec<AnnotationInfoTuple>, Vec<u16>);
+// --- Log special ---
 
 #[pyfunction]
-fn log_annotation_context(
-    entity_path_str: &str,
-    class_descriptions: Vec<ClassDescriptionTuple>,
-    timeless: bool,
-) -> PyResult<()> {
-    let mut session = python_session();
-
-    // We normally disallow logging to root, but we make an exception for class_descriptions
-    let entity_path = if entity_path_str == "/" {
-        EntityPath::root()
-    } else {
-        parse_entity_path(entity_path_str)?
+fn log_cleared(entity_path: &str, recursive: bool) -> PyResult<()> {
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("log_cleared");
+        return Ok(());
     };
 
-    let mut annotation_context = AnnotationContext::default();
-
-    for (info, keypoint_annotations, keypoint_skeleton_edges) in class_descriptions {
-        annotation_context
-            .class_map
-            .entry(ClassId(info.0))
-            .or_insert_with(|| ClassDescription {
-                info: info.into(),
-                keypoint_map: keypoint_annotations
-                    .into_iter()
-                    .map(|k| (KeypointId(k.0), k.into()))
-                    .collect(),
-                keypoint_connections: keypoint_skeleton_edges
-                    .chunks_exact(2)
-                    .map(|pair| (KeypointId(pair[0]), KeypointId(pair[1])))
-                    .collect(),
-            });
-    }
-
-    let time_point = time(timeless);
-
-    // We currently log AnnotationContext from inside the bridge because it's a
-    // fairly complex type with a need for a fair amount of data-validation. We
-    // already have the serialization implemented in rust so we start with this
-    // implementation.
-    //
-    // TODO(jleibs) replace with python-native implementation
-
-    let row = DataRow::from_cells1(
-        RowId::random(),
-        entity_path,
-        time_point,
-        1,
-        [annotation_context].as_slice(),
-    );
-
-    session.send_row(row)
-}
-
-#[pyfunction]
-fn log_cleared(entity_path: &str, recursive: bool) -> PyResult<()> {
     let entity_path = parse_entity_path(entity_path)?;
 
-    let mut session = python_session();
+    let timepoint = time(false);
 
-    let time_point = time(false);
-
-    session.send_path_op(&time_point, PathOp::clear(recursive, entity_path));
+    rec_ctx.record_path_op(timepoint, PathOp::clear(recursive, entity_path));
 
     Ok(())
 }
@@ -1045,18 +1102,68 @@ fn log_arrow_msg(entity_path: &str, components: &PyDict, timeless: bool) -> PyRe
     let data_table =
         crate::arrow::build_data_table_from_components(&entity_path, components, &time(timeless))?;
 
-    let mut session = python_session();
-
-    let msg: ArrowMsg = data_table
-        .to_arrow_msg()
-        .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?;
+    let rec_ctx = global_recording_context();
+    let Some(rec_ctx) = rec_ctx.as_ref() else {
+        no_active_recording("log_arrow_msg");
+        return Ok(());
+    };
 
-    session.send_arrow_msg(msg);
+    // TODO: do this well
+    for row in data_table.to_rows() {
+        record_row(rec_ctx, row);
+    }
 
     Ok(())
 }
 
-// ----------------------------------------------------------------------------
+// --- Misc ---
+
+#[pyfunction]
+fn get_recording_id() -> PyResult<String> {
+    let recording_id = global_recording_context()
+        .as_ref()
+        .and_then(|rec_ctx| rec_ctx.recording_info().map(|info| info.recording_id));
+
+    match recording_id {
+        Some(id) => Ok(id.to_string()),
+        None => Err(PyTypeError::new_err("module has not been initialized")),
+    }
+}
+
+#[pyfunction]
+fn get_app_url() -> String {
+    // TODO
+    "TODO".to_owned()
+
+    // /// Get a url to an instance of the web-viewer
+    // ///
+    // /// This may point to app.rerun.io or localhost depending on
+    // /// whether `host_assets` was called.
+    // pub fn get_app_url(&self) -> String {
+    //     #[cfg(feature = "web_viewer")]
+    //     if let Some(hosted_assets) = &self.web_viewer_server {
+    //         return format!("http://localhost:{}", hosted_assets.port());
+    //     }
+
+    //     let short_git_hash = &self.build_info.git_hash[..7];
+    //     format!("https://app.rerun.io/commit/{short_git_hash}")
+    // }
+
+    // let session = python_session();
+    // session.get_app_url()
+}
+
+// --- Helpers ---
+
+fn convert_color(color: Vec<u8>) -> PyResult<[u8; 4]> {
+    match &color[..] {
+        [r, g, b] => Ok([*r, *g, *b, 255]),
+        [r, g, b, a] => Ok([*r, *g, *b, *a]),
+        _ => Err(PyTypeError::new_err(format!(
+            "Expected color to be of length 3 or 4, got {color:?}"
+        ))),
+    }
+}
 
 fn slice_from_np_array<'a, T: numpy::Element, D: numpy::ndarray::Dimension>(
     array: &'a numpy::PyReadonlyArray<'_, T, D>,
@@ -1074,3 +1181,27 @@
 
     Cow::Owned(array.iter().cloned().collect())
 }
+
+fn record_row(rec_ctx: &RecordingContext, row: DataRow) {
+    rec_ctx.record_row(row);
+}
+
+fn parse_entity_path(entity_path: &str) -> PyResult<EntityPath> {
+    let components = re_log_types::parse_entity_path(entity_path)
+        .map_err(|err| PyTypeError::new_err(err.to_string()))?;
+    if components.is_empty() {
+        Err(PyTypeError::new_err(format!(
+            "You cannot log to the root {entity_path:?}"
+        )))
+    } else {
+        Ok(EntityPath::from(components))
+    }
+}
+
+fn time(timeless: bool) -> TimePoint {
+    if timeless {
+        TimePoint::timeless()
+    } else {
+        ThreadInfo::thread_now()
+    }
+}
diff --git a/rerun_py/src/python_session.rs b/rerun_py/src/python_session.rs
deleted file mode 100644
index fe56aa0e9a7a..000000000000
--- a/rerun_py/src/python_session.rs
+++ /dev/null
@@ -1,341 +0,0 @@
-use std::net::SocketAddr;
-
-use pyo3::{exceptions::PyValueError, PyResult};
-use re_log_types::{
-    ApplicationId, ArrowMsg, BeginRecordingMsg, DataRow, DataTableError, LogMsg, PathOp,
-    RecordingId, RecordingInfo, RecordingSource, RowId, Time, TimePoint,
-};
-
-#[cfg(feature = "web_viewer")]
-use re_web_viewer_server::WebViewerServerPort;
-use rerun::sink::LogSink;
-// ----------------------------------------------------------------------------
-
-#[derive(thiserror::Error, Debug)]
-pub enum PythonSessionError {
-    #[allow(dead_code)]
-    #[error("The Rerun SDK was not compiled with the '{0}' feature")]
-    FeatureNotEnabled(&'static str),
-
-    #[cfg(feature = "web_viewer")]
-    #[error("Could not start the WebViewerServer: '{0}'")]
-    WebViewerServerError(#[from] re_web_viewer_server::WebViewerServerError),
-}
-
-/// Used to construct a [`RecordingInfo`]:
-struct RecordingMetaData {
-    recording_source: RecordingSource,
-    application_id: Option<ApplicationId>,
-    recording_id: RecordingId,
-    is_official_example: Option<bool>,
-}
-
-impl Default for RecordingMetaData {
-    fn default() -> Self {
-        Self {
-            // Will be filled in when we initialize the `rerun` python module.
-            recording_source: RecordingSource::Unknown,
-            application_id: Default::default(),
-            // TODO(https://github.com/rerun-io/rerun/issues/1792): ZERO is not a great choice
-            // here. Ideally we would use `default_recording_id(py)` instead.
-            recording_id: RecordingId::ZERO,
-            is_official_example: Default::default(),
-        }
-    }
-}
-
-impl RecordingMetaData {
-    pub fn to_recording_info(&self) -> RecordingInfo {
-        let recording_id = self.recording_id;
-
-        let application_id = self
-            .application_id
-            .clone()
-            .unwrap_or_else(ApplicationId::unknown);
-
-        RecordingInfo {
-            application_id,
-            recording_id,
-            is_official_example: self.is_official_example.unwrap_or(false),
-            started: Time::now(),
-            recording_source: self.recording_source.clone(),
-        }
-    }
-}
-
-// ----------------------------------------------------------------------------
-
-/// The python API bindings create a single [`PythonSession`]
-/// which is used to send log messages.
-///
-/// This mirrors the Python API to a certain extent, allowing users
-/// to set enable/disable logging, set application id, switch log sinks, etc.
-pub struct PythonSession {
-    /// Is this session enabled?
-    /// If not, all calls into it are ignored!
-    enabled: bool,
-
-    has_sent_begin_recording_msg: bool,
-    recording_meta_data: RecordingMetaData,
-
-    /// Where we put the log messages.
-    sink: Box<dyn LogSink>,
-
-    build_info: re_build_info::BuildInfo,
-
-    /// Used to serve the web viewer assets.
-    /// TODO(jleibs): Potentially use this for serve as well
-    #[cfg(feature = "web_viewer")]
-    web_viewer_server: Option<re_web_viewer_server::WebViewerServerHandle>,
-}
-
-impl Default for PythonSession {
-    fn default() -> Self {
-        let default_enabled = true;
-        Self {
-            enabled: rerun::decide_logging_enabled(default_enabled),
-            has_sent_begin_recording_msg: false,
-            recording_meta_data: Default::default(),
-            sink: Box::new(rerun::sink::BufferedSink::new()),
-            build_info: re_build_info::build_info!(),
-            #[cfg(feature = "web_viewer")]
-            web_viewer_server: None,
-        }
-    }
-}
-
-impl PythonSession {
-    pub fn set_python_version(&mut self, python_version: re_log_types::PythonVersion) {
-        self.recording_meta_data.recording_source =
-            re_log_types::RecordingSource::PythonSdk(python_version);
-    }
-
-    /// Check if logging is enabled on this `Session`.
-    pub fn is_enabled(&self) -> bool {
-        self.enabled
-    }
-
-    /// Enable or disable logging on this `Session`.
-    pub fn set_enabled(&mut self, enabled: bool) {
-        self.enabled = enabled;
-    }
-
-    /// Set whether or not logging is enabled by default.
-    /// This will be overridden by the `RERUN` environment variable, if found.
-    pub fn set_default_enabled(&mut self, default_enabled: bool) {
-        self.enabled = rerun::decide_logging_enabled(default_enabled);
-    }
-
-    /// Set the [`ApplicationId`] to use for the following stream of log messages.
-    ///
-    /// This should be called once before anything else.
-    /// If you don't call this, the resulting application id will be [`ApplicationId::unknown`].
-    ///
-    /// Note that many recordings can share the same [`ApplicationId`], but
-    /// they all have unique [`RecordingId`]s.
-    pub fn set_application_id(&mut self, application_id: ApplicationId, is_official_example: bool) {
-        if self.recording_meta_data.application_id.as_ref() != Some(&application_id) {
-            self.recording_meta_data.application_id = Some(application_id);
-            self.recording_meta_data.is_official_example = Some(is_official_example);
-            self.has_sent_begin_recording_msg = false;
-        }
-    }
-
-    /// The current [`RecordingId`].
-    pub fn recording_id(&self) -> RecordingId {
-        self.recording_meta_data.recording_id
-    }
-
-    /// Set the [`RecordingId`] of this message stream.
-    ///
-    /// If you're logging from multiple processes and want all the messages
-    /// to end up as the same recording, you must make sure they all set the same
-    /// [`RecordingId`] using this function.
-    ///
-    /// Note that many recordings can share the same [`ApplicationId`], but
-    /// they all have unique [`RecordingId`]s.
-    pub fn set_recording_id(&mut self, recording_id: RecordingId) {
-        if self.recording_meta_data.recording_id != recording_id {
-            self.recording_meta_data.recording_id = recording_id;
-            self.has_sent_begin_recording_msg = false;
-        }
-    }
-
-    /// Set the [`LogSink`] to use. This is where the log messages will be sent.
-    ///
-    /// If the previous sink is [`rerun::sink::BufferedSink`] (the default),
-    /// it will be drained and sent to the new sink.
-    pub fn set_sink(&mut self, sink: Box<dyn LogSink>) {
-        // Capture the backlog (should only apply if this was a `BufferedSink`)
-        let backlog = self.sink.drain_backlog();
-
-        // Before changing the sink, we set drop_if_disconnected and
-        // flush. This ensures that any messages that are currently
-        // buffered will be sent.
-        self.sink.drop_if_disconnected();
-        self.sink.flush_blocking();
-        self.sink = sink;
-
-        if backlog.is_empty() {
-            // If we had no backlog, we need to send the `BeginRecording` message to the new sink.
-            self.has_sent_begin_recording_msg = false;
-        } else {
-            // Otherwise the backlog should have had the `BeginRecording` message in it already.
-            self.sink.send_all(backlog);
-        }
-    }
-
-    /// Send log data to a remote viewer/server.
-    ///
-    /// Usually this is done by running the `rerun` binary (`cargo install rerun`) without arguments,
-    /// and then connecting to it.
-    ///
-    /// Send all currently buffered messages.
-    /// If we are already connected, we will re-connect to this new address.
-    ///
-    /// This function returns immediately.
-    /// Disconnect with [`Self::disconnect`].
-    pub fn connect(&mut self, addr: SocketAddr) {
-        if !self.enabled {
-            re_log::debug!("Rerun disabled - call to connect() ignored");
-            return;
-        }
-
-        re_log::debug!("Connecting to remote {addr}…");
-        self.set_sink(Box::new(rerun::sink::TcpSink::new(addr)));
-    }
-
-    /// Send all pending and future log messages to disk as an rrd file
-    pub fn save(
-        &mut self,
-        path: impl Into<std::path::PathBuf>,
-    ) -> Result<(), rerun::sink::FileSinkError> {
-        if !self.enabled {
-            re_log::debug!("Rerun disabled - call to save() ignored");
-            return Ok(());
-        }
-
-        self.set_sink(Box::new(rerun::sink::FileSink::new(path)?));
-        Ok(())
-    }
-
-    /// Send all pending and future log messages to an in-memory store
-    pub fn memory_recording(&mut self) -> rerun::sink::MemorySinkStorage {
-        if !self.enabled {
-            re_log::debug!("Rerun disabled - call to memory_recording() ignored");
-            return Default::default();
-        }
-
-        let memory_sink = rerun::sink::MemorySink::default();
-        let buffer = memory_sink.buffer();
-
-        self.set_sink(Box::new(memory_sink));
-        self.has_sent_begin_recording_msg = false;
-
-        buffer
-    }
-
-    /// Disconnects any TCP connection, shuts down any server, and closes any file.
-    pub fn disconnect(&mut self) {
-        self.set_sink(Box::new(rerun::sink::BufferedSink::new()));
-        self.has_sent_begin_recording_msg = false;
-    }
-
-    /// Wait until all logged data have been sent to the remove server (if any).
-    pub fn flush(&mut self) {
-        self.sink.flush_blocking();
-    }
-
-    /// Send a single [`DataRow`].
-    pub fn send_row(&mut self, row: DataRow) -> PyResult<()> {
-        let msg = row
-            .into_table()
-            .to_arrow_msg()
-            .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?;
-
-        self.send(LogMsg::ArrowMsg(self.recording_id(), msg));
-
-        Ok(())
-    }
-
-    /// Send a [`LogMsg`].
-    pub fn send(&mut self, log_msg: LogMsg) {
-        if !self.enabled {
-            // It's intended that the logging SDK should drop messages earlier than this if logging is disabled. This
-            // check here is just a safety net.
-            re_log::debug_once!("Logging is disabled, dropping message.");
-            return;
-        }
-
-        if !self.has_sent_begin_recording_msg {
-            let info = self.recording_meta_data.to_recording_info();
-
-            // This shouldn't happen, but at least log an error if it does.
-            // See: https://github.com/rerun-io/rerun/issues/1792
-            if info.recording_id == RecordingId::ZERO {
-                re_log::error_once!("RecordingId was still ZERO when sent to server. This is a python initialization bug.");
-            }
-
-            re_log::debug!(
-                "Beginning new recording with application_id {:?} and recording id {}",
-                info.application_id.0,
-                info.recording_id
-            );
-
-            self.sink.send(
-                BeginRecordingMsg {
-                    row_id: RowId::random(),
-                    info,
-                }
-                .into(),
-            );
-            self.has_sent_begin_recording_msg = true;
-        }
-
-        self.sink.send(log_msg);
-    }
-
-    pub fn send_arrow_msg(&mut self, arrow_msg: ArrowMsg) {
-        self.send(LogMsg::ArrowMsg(self.recording_id(), arrow_msg));
-    }
-
-    /// Send a [`PathOp`].
-    pub fn send_path_op(&mut self, time_point: &TimePoint, path_op: PathOp) {
-        self.send(LogMsg::EntityPathOpMsg(
-            self.recording_id(),
-            re_log_types::EntityPathOpMsg {
-                row_id: RowId::random(),
-                time_point: time_point.clone(),
-                path_op,
-            },
-        ));
-    }
-
-    /// Get a url to an instance of the web-viewer
-    ///
-    /// This may point to app.rerun.io or localhost depending on
-    /// whether `host_assets` was called.
-    pub fn get_app_url(&self) -> String {
-        #[cfg(feature = "web_viewer")]
-        if let Some(hosted_assets) = &self.web_viewer_server {
-            return format!("http://localhost:{}", hosted_assets.port());
-        }
-
-        let short_git_hash = &self.build_info.git_hash[..7];
-        format!("https://app.rerun.io/commit/{short_git_hash}")
-    }
-
-    /// Start a web server to host the run web-asserts
-    ///
-    /// The caller needs to ensure that there is a `tokio` runtime running.
-    #[allow(clippy::unnecessary_wraps)]
-    #[cfg(feature = "web_viewer")]
-    pub fn start_web_viewer_server(
-        &mut self,
-        _web_port: WebViewerServerPort,
-    ) -> Result<(), PythonSessionError> {
-        self.web_viewer_server = Some(re_web_viewer_server::WebViewerServerHandle::new(_web_port)?);
-
-        Ok(())
-    }
-}
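Editorial note: to close the loop on the session-to-context migration, a small end-to-end sketch of the new lifecycle, assuming the Python-side `flush` wrapper forwards to `bindings.flush` (and thus to `flush_blocking` on the context):

```python
import rerun as rr

rr.init("lifecycle_demo", spawn=True)
rr.log_point("world/point", position=[0.0, 0.0, 0.0])

# Block until the sink has drained. Per the TODO in buffered_client.rs at the
# top of this diff, flush() returning is not yet a guarantee that the encoder
# thread itself has finished.
rr.flush()

# bindings.shutdown() then runs at interpreter exit via the atexit handler
# registered in rerun/__init__.py.
```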