Skip to content

Commit

Permalink
Spawn via $PATH 1: Rust implementation (#3996)
Browse files Browse the repository at this point in the history
Introduces standalone `rerun::spawn(SpawnOptions)` API that allows one
to start a Rerun Viewer process ready to listen for TCP connections, as
well as the associated `RecordingStream` integration.



https://github.com/rerun-io/rerun/assets/2910679/24eb5647-38c1-4049-b249-dc6e00e4ff54



---

Spawn via `$PATH` series:
- #3996
- #3997
- #3998
  • Loading branch information
teh-cmc committed Oct 26, 2023
1 parent 2def0c2 commit c7d26a1
Show file tree
Hide file tree
Showing 73 changed files with 511 additions and 187 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
mod global;
mod log_sink;
mod recording_stream;
mod spawn;

#[cfg(feature = "log")]
mod log_integration;

// -------------
// Public items:

pub use spawn::{spawn, SpawnError, SpawnOptions};

pub use self::recording_stream::{
RecordingStream, RecordingStreamBuilder, RecordingStreamError, RecordingStreamResult,
};
Expand Down
148 changes: 148 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub enum RecordingStreamError {
err: std::io::Error,
},

/// Error spawning a Rerun Viewer process.
#[error(transparent)] // makes bubbling all the way up to main look nice
SpawnViewer(#[from] crate::SpawnError),

/// Failure to host a web viewer and/or Rerun server.
#[cfg(feature = "web_viewer")]
#[error(transparent)]
Expand Down Expand Up @@ -311,6 +315,67 @@ impl RecordingStreamBuilder {
}
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then creates a new
/// [`RecordingStream`] that is pre-configured to stream the data through to that viewer over TCP.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// ## Example
///
/// ```no_run
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app").spawn(re_sdk::default_flush_timeout())?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn spawn(
self,
flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<RecordingStream> {
self.spawn_opts(&Default::default(), flush_timeout)
}

/// Starts a Rerun Viewer process from an executable found in PATH, then builds a new
/// [`RecordingStream`] that is pre-configured to stream its data through to that viewer over TCP.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// How the Viewer gets spawned is controlled by `opts`.
/// If the defaults are good enough for you, prefer the simpler [`Self::spawn`].
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// ## Example
///
/// ```no_run
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
///     .spawn_opts(&re_sdk::SpawnOptions::default(), re_sdk::default_flush_timeout())?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn spawn_opts(
    self,
    opts: &crate::SpawnOptions,
    flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<RecordingStream> {
    // NOTE: If `_RERUN_TEST_FORCE_SAVE` is set, all recording streams will write to disk no matter
    // what, thus spawning a viewer is pointless (and probably not intended).
    let sink_is_forced = forced_sink_path().is_some();
    if !sink_is_forced {
        spawn(opts)?;
    }

    // Either way the builder is finalized through `connect`, targeting the resolved address.
    self.connect(opts.connect_addr(), flush_timeout)
}

/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// web-based Rerun viewer via WebSockets.
///
Expand Down Expand Up @@ -1104,6 +1169,58 @@ impl RecordingStream {
self.set_sink(Box::new(crate::log_sink::TcpSink::new(addr, flush_timeout)));
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
/// underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to send data to that
/// new process.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn spawn(&self, flush_timeout: Option<std::time::Duration>) -> RecordingStreamResult<()> {
self.spawn_opts(&Default::default(), flush_timeout)
}

/// Spawns a new Rerun Viewer process from an executable available in PATH, then swaps the
/// underlying sink for a [`crate::log_sink::TcpSink`] sink pre-configured to send data to that
/// new process.
///
/// If a Rerun Viewer is already listening on this TCP port, the stream will be redirected to
/// that viewer instead of starting a new one.
///
/// The behavior of the spawned Viewer can be configured via `opts`.
/// If you're fine with the default behavior, refer to the simpler [`Self::spawn`].
///
/// `flush_timeout` is the minimum time the [`TcpSink`][`crate::log_sink::TcpSink`] will
/// wait during a flush before potentially dropping data. Note: Passing `None` here can cause a
/// call to `flush` to block indefinitely if a connection cannot be established.
///
/// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
///
/// If a forced sink path is configured, this is a no-op that returns `Ok(())`: no viewer is
/// spawned and the current sink is left untouched.
///
/// # Errors
///
/// Propagates any error raised while spawning the Viewer process.
pub fn spawn_opts(
    &self,
    opts: &crate::SpawnOptions,
    flush_timeout: Option<std::time::Duration>,
) -> RecordingStreamResult<()> {
    // NOTE(review): the variable named in this message is assumed to be `_RERUN_TEST_FORCE_SAVE`
    // (the name used elsewhere in this file for the forced-sink case) -- confirm against
    // `forced_sink_path`.
    if forced_sink_path().is_some() {
        re_log::debug!("Ignored setting new TcpSink since _RERUN_TEST_FORCE_SAVE is set");
        return Ok(());
    }

    spawn(opts)?;

    self.connect(opts.connect_addr(), flush_timeout);

    Ok(())
}

/// Swaps the underlying sink for a [`crate::sink::MemorySink`] sink and returns the associated
/// [`MemorySinkStorage`].
///
Expand Down Expand Up @@ -1177,6 +1294,37 @@ impl fmt::Debug for RecordingStream {
}
}

/// Shared spawn logic for [`RecordingStreamBuilder`] & [`RecordingStream`].
///
/// If something already accepts TCP connections at `opts.connect_addr()`, nothing is spawned.
/// Otherwise a Viewer is started through [`crate::spawn`] and the address is probed a few times
/// to give the new process a chance to bind. The probing is best-effort: `Ok(())` is returned
/// even if none of the probes succeed.
fn spawn(opts: &crate::SpawnOptions) -> RecordingStreamResult<()> {
    use std::{net::TcpStream, time::Duration};

    let probe_timeout = Duration::from_secs(1);
    let retry_delay = Duration::from_millis(100);

    let viewer_addr = opts.connect_addr();
    let is_reachable = || TcpStream::connect_timeout(&viewer_addr, probe_timeout).is_ok();

    // TODO(#4019): application-level handshake
    if is_reachable() {
        re_log::info!(
            addr = %opts.listen_addr(),
            "A process is already listening at this address. Trying to connect instead."
        );
        return Ok(());
    }

    crate::spawn(opts)?;

    // Give the newly spawned Rerun Viewer some time to bind.
    //
    // NOTE: The timeout only covers the TCP handshake: if no process is bound to that address
    // at all, the connection will fail immediately, irrelevant of the timeout configuration.
    // For that reason we use an extra loop.
    for _attempt in 0..5 {
        if is_reachable() {
            break;
        }
        std::thread::sleep(retry_delay);
    }

    Ok(())
}

// --- Stateful time ---

/// Thread-local data.
Expand Down
190 changes: 190 additions & 0 deletions crates/re_sdk/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/// Options to control the behavior of [`spawn`].
///
/// Refer to the field-level documentation for more information about each individual option.
///
/// The defaults are ok for most use cases: `SpawnOptions::default()`.
/// Use the partial-default pattern to customize them further:
/// ```no_run
/// let opts = re_sdk::SpawnOptions {
///     port: 1234,
///     memory_limit: "25%".into(),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct SpawnOptions {
/// The port to listen on.
///
/// [`spawn`] forwards it to the Viewer as `--port=<port>`, and it is also the port used by
/// [`Self::connect_addr`] and [`Self::listen_addr`].
///
/// Defaults to `9876`.
pub port: u16,

/// An upper limit on how much memory the Rerun Viewer should use.
/// When this limit is reached, Rerun will drop the oldest data.
/// Example: `16GB` or `50%` (of system total).
///
/// [`spawn`] forwards the string as-is to the Viewer as `--memory-limit=<memory_limit>`.
///
/// Defaults to `75%`.
pub memory_limit: String,

/// Specifies the name of the Rerun executable.
///
/// Only used when [`Self::executable_path`] is unspecified.
///
/// You can omit the `.exe` suffix on Windows.
///
/// Defaults to `rerun`.
pub executable_name: String,

/// Enforce a specific executable to use instead of searching through PATH
/// for [`Self::executable_name`].
///
/// Unspecified by default.
pub executable_path: Option<String>,

/// Extra arguments that will be passed as-is to the Rerun Viewer process.
///
/// [`spawn`] appends them after the arguments it generates itself.
///
/// Empty by default.
pub extra_args: Vec<String>,
}

// NOTE: No need for .exe extension on windows.
const RERUN_BINARY: &str = "rerun";

impl Default for SpawnOptions {
    /// Builds the default options: the port of `crate::default_server_addr()`, a `75%` memory
    /// limit, the `rerun` executable name, no explicit executable path and no extra arguments.
    fn default() -> Self {
        let port = crate::default_server_addr().port();

        Self {
            port,
            memory_limit: String::from("75%"),
            executable_name: String::from(RERUN_BINARY),
            executable_path: None,
            extra_args: vec![],
        }
    }
}

impl SpawnOptions {
    /// Resolves the final connect address value.
    ///
    /// This is the IPv4 loopback address (`127.0.0.1`) combined with [`Self::port`].
    pub fn connect_addr(&self) -> std::net::SocketAddr {
        // The std constant replaces runtime parsing of a literal, so no `unwrap` is needed.
        std::net::SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), self.port)
    }

    /// Resolves the final listen address value.
    ///
    /// This is the IPv4 unspecified address (`0.0.0.0`) combined with [`Self::port`].
    pub fn listen_addr(&self) -> std::net::SocketAddr {
        std::net::SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), self.port)
    }

    /// Resolves the final executable path.
    ///
    /// Returns [`Self::executable_path`] when it is set, and falls back to
    /// [`Self::executable_name`] otherwise.
    pub fn executable_path(&self) -> String {
        self.executable_path
            .as_deref()
            .unwrap_or(self.executable_name.as_str())
            .to_owned()
    }
}

/// Errors that can occur when [`spawn`]ing a Rerun Viewer.
///
/// `Display` is generated by `thiserror` from the `#[error(...)]` attributes below.
#[derive(thiserror::Error)]
pub enum SpawnError {
/// Failed to find Rerun Viewer executable in PATH.
///
/// The rendered message contains `message` and `search_path`; `executable_name` is carried
/// for programmatic use only and is not part of the rendered text.
#[error("Failed to find Rerun Viewer executable in PATH.\n{message}\nPATH={search_path:?}")]
ExecutableNotFoundInPath {
/// High-level error message meant to be printed to the user (install tips etc).
message: String,

/// Name used for the executable search.
executable_name: String,

/// Value of the `PATH` environment variable, if any.
search_path: String,
},

/// Failed to find Rerun Viewer executable at explicit path.
#[error("Failed to find Rerun Viewer executable at {executable_path:?}")]
ExecutableNotFound {
/// Explicit path of the executable (specified by the caller).
executable_path: String,
},

/// Other I/O error.
///
/// Any [`std::io::Error`] converts into this variant through the generated `From` impl.
#[error("Failed to spawn the Rerun Viewer process: {0}")]
Io(#[from] std::io::Error),
}

impl std::fmt::Debug for SpawnError {
    /// Renders exactly what the `Display` implementation renders.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `SpawnError`s usually bubble all the way up to `main`, which prints them through
        // `Debug` when the program crashes.
        //
        // These errors embed a user guide, which has to stay readable: forward to the
        // `Display` impl generated by `thiserror` instead of a derived, struct-like dump.
        std::fmt::Display::fmt(self, f)
    }
}

/// Spawns a new Rerun Viewer process ready to listen for TCP connections.
///
/// If there is already a process listening on this port (Rerun or not), this function returns `Ok`
/// WITHOUT spawning a `rerun` process (!).
///
/// Refer to [`SpawnOptions`]'s documentation for configuration options.
///
/// This only starts a Viewer process: if you'd like to connect to it and start sending data, refer
/// to [`crate::RecordingStream::connect`] or use [`crate::RecordingStream::spawn`] directly.
///
/// # Errors
///
/// * [`SpawnError::ExecutableNotFound`] if `opts.executable_path` is set and starting the process
///   fails with [`std::io::ErrorKind::NotFound`].
/// * [`SpawnError::ExecutableNotFoundInPath`] if `opts.executable_path` is unset and starting the
///   process fails with [`std::io::ErrorKind::NotFound`].
/// * [`SpawnError::Io`] for any other I/O error raised while starting the process.
pub fn spawn(opts: &SpawnOptions) -> Result<(), SpawnError> {
    use std::{net::TcpStream, process::Command, time::Duration};

    // NOTE: The line breaks and leading whitespace of this literal are part of the user-facing
    // message: keep its lines verbatim when reformatting the surrounding code.
    const EXECUTABLE_NOT_FOUND: &str = //
"
You can install binary releases of the Rerun Viewer:
* Using `cargo`: `cargo binstall rerun-cli` (see https://github.com/cargo-bins/cargo-binstall)
* Via direct download from our release assets: https://github.com/rerun-io/rerun/releases/latest/
* Using `pip`: `pip3 install rerun-sdk` (warning: pip version has slower start times!)
For more information, refer to our complete install documentation over at:
https://rerun.io/docs/getting-started/installing-viewer
";

    let port = opts.port;
    let connect_addr = opts.connect_addr();
    let memory_limit = &opts.memory_limit;
    let executable_path = opts.executable_path();

    // TODO(#4019): application-level handshake
    if TcpStream::connect_timeout(&connect_addr, Duration::from_secs(1)).is_ok() {
        re_log::info!(
            addr = %opts.listen_addr(),
            "A process is already listening at this address. Assuming it's a Rerun Viewer."
        );
        return Ok(());
    }

    let res = Command::new(executable_path)
        .arg(format!("--port={port}"))
        .arg(format!("--memory-limit={memory_limit}"))
        .arg("--skip-welcome-screen")
        // Borrowing is enough here: `Command::args` only needs `AsRef<OsStr>` items.
        .args(&opts.extra_args)
        .spawn();

    let rerun_bin = match res {
        Ok(rerun_bin) => rerun_bin,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            // Which "not found" error is reported depends on whether the caller enforced an
            // explicit path or relied on the PATH lookup of the executable name.
            return Err(match opts.executable_path.as_ref() {
                Some(explicit_path) => SpawnError::ExecutableNotFound {
                    executable_path: explicit_path.clone(),
                },
                None => SpawnError::ExecutableNotFoundInPath {
                    message: EXECUTABLE_NOT_FOUND.to_owned(),
                    executable_name: opts.executable_name.clone(),
                    // A missing or non-unicode `PATH` is reported as an empty string.
                    search_path: std::env::var("PATH").unwrap_or_default(),
                },
            });
        }
        Err(err) => return Err(err.into()),
    };

    // Simply forget about the child process, we want it to outlive the parent process if needed.
    drop(rerun_bin);

    Ok(())
}
Loading

0 comments on commit c7d26a1

Please sign in to comment.