diff --git a/Cargo.lock b/Cargo.lock index af23fc79bcc6..254a3c709274 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4012,6 +4012,7 @@ dependencies = [ "re_log", "re_log_types", "re_smart_channel", + "tokio", ] [[package]] @@ -4086,7 +4087,6 @@ dependencies = [ "bytemuck", "cocoa", "console_error_panic_hook", - "ctrlc", "eframe", "egui", "egui-wgpu", @@ -4148,6 +4148,7 @@ version = "0.3.1" dependencies = [ "anyhow", "cargo_metadata", + "ctrlc", "document-features", "futures-util", "glob", @@ -4237,6 +4238,7 @@ dependencies = [ "backtrace", "clap 4.1.4", "crossbeam", + "ctrlc", "document-features", "egui", "itertools", diff --git a/Cargo.toml b/Cargo.toml index 1b1e05810147..bcec71d3438c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ arrow2 = "0.16" arrow2_convert = "0.4.2" clap = "4.0" comfy-table = { version = "6.1", default-features = false } +ctrlc = { version = "3.0", features = ["termination"] } ecolor = "0.21.0" eframe = { version = "0.21.3", default-features = false } egui = "0.21.0" diff --git a/crates/re_sdk_comms/Cargo.toml b/crates/re_sdk_comms/Cargo.toml index b8ecd31eea9b..9aa9cce5d4cc 100644 --- a/crates/re_sdk_comms/Cargo.toml +++ b/crates/re_sdk_comms/Cargo.toml @@ -35,3 +35,4 @@ bincode = "1.3" crossbeam = "0.8" document-features = "0.2" rand = { version = "0.8.5", features = ["small_rng"] } +tokio.workspace = true diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index b9d93a0b8030..5c92a15f0135 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -7,6 +7,7 @@ use rand::{Rng as _, SeedableRng}; use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName}; use re_smart_channel::{Receiver, Sender}; +use tokio::net::{TcpListener, TcpStream}; #[derive(Clone, Copy, Debug, PartialEq)] pub struct ServerOptions { @@ -27,37 +28,18 @@ impl Default for ServerOptions { } } -/// Listen to multiple SDK:s connecting to us over TCP. -/// -/// ``` no_run -/// # use re_sdk_comms::{serve, ServerOptions}; -/// let log_msg_rx = serve(80, ServerOptions::default())?; -/// # Ok::<(), anyhow::Error>(()) -/// ``` -pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result> { +async fn listen_for_new_clients( + port: u16, + options: ServerOptions, + tx: Sender, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) { let bind_addr = format!("0.0.0.0:{port}"); - let listener = std::net::TcpListener::bind(&bind_addr) - .with_context(|| format!("Failed to bind TCP address {bind_addr:?} for our WS server."))?; - - let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port }); - - std::thread::Builder::new() - .name("sdk-server".into()) - .spawn(move || { - for stream in listener.incoming() { - match stream { - Ok(stream) => { - let tx = tx.clone(); - spawn_client(stream, tx, options); - } - Err(err) => { - re_log::warn!("Failed to accept incoming SDK client: {err}"); - } - } - } - }) - .expect("Failed to spawn thread"); + let listener = TcpListener::bind(&bind_addr) + .await + .with_context(|| format!("Failed to bind TCP address {bind_addr:?}")) + .unwrap(); if options.quiet { re_log::debug!( @@ -69,43 +51,72 @@ pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result res, + _ = shutdown_rx.recv() => { + return; + } + }; + match incoming { + Ok((stream, _)) => { + let tx = tx.clone(); + spawn_client(stream, tx, options); + } + Err(err) => { + re_log::warn!("Failed to accept incoming SDK client: {err}"); + } + } + } } -fn spawn_client(stream: std::net::TcpStream, tx: Sender, options: ServerOptions) { - std::thread::Builder::new() - .name(format!( - "sdk-server-client-handler-{:?}", - stream.peer_addr() - )) - .spawn(move || { - let addr_string = stream - .peer_addr() - .map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string()); - if options.quiet { - re_log::debug!("New SDK client connected: {addr_string}"); - } else { - re_log::info!("New SDK client connected: {addr_string}"); - } +/// Listen to multiple SDK:s connecting to us over TCP. +/// +/// ``` no_run +/// # use re_sdk_comms::{serve, ServerOptions}; +/// let (sender, receiver) = tokio::sync::broadcast::channel(1); +/// let log_msg_rx = serve(80, ServerOptions::default(), receiver)?; +/// # Ok::<(), anyhow::Error>(()) +/// ``` +pub fn serve( + port: u16, + options: ServerOptions, + shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) -> anyhow::Result> { + let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port }); - if let Err(err) = run_client(stream, &tx, options) { - re_log::warn!("Closing connection to client: {err}"); - } - }) - .expect("Failed to spawn thread"); + tokio::spawn(listen_for_new_clients(port, options, tx, shutdown_rx)); + + Ok(rx) +} + +fn spawn_client(stream: TcpStream, tx: Sender, options: ServerOptions) { + tokio::spawn(async move { + let addr_string = stream + .peer_addr() + .map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string()); + if options.quiet { + re_log::debug!("New SDK client connected: {addr_string}"); + } else { + re_log::info!("New SDK client connected: {addr_string}"); + } + if let Err(err) = run_client(stream, &tx, options).await { + re_log::warn!("Closing connection to client: {err}"); + } + }); } -fn run_client( - mut stream: std::net::TcpStream, +async fn run_client( + mut stream: TcpStream, tx: &Sender, options: ServerOptions, ) -> anyhow::Result<()> { #![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274 - use std::io::Read as _; + use tokio::io::AsyncReadExt as _; let mut client_version = [0_u8; 2]; - stream.read_exact(&mut client_version)?; + stream.read_exact(&mut client_version).await?; let client_version = u16::from_le_bytes(client_version); match client_version.cmp(&crate::PROTOCOL_VERSION) { @@ -132,11 +143,11 @@ fn run_client( loop { let mut packet_size = [0_u8; 4]; - stream.read_exact(&mut packet_size)?; + stream.read_exact(&mut packet_size).await?; let packet_size = u32::from_le_bytes(packet_size); packet.resize(packet_size as usize, 0_u8); - stream.read_exact(&mut packet)?; + stream.read_exact(&mut packet).await?; re_log::trace!("Received log message of size {packet_size}."); diff --git a/crates/re_viewer/Cargo.toml b/crates/re_viewer/Cargo.toml index 5f147f4121f0..33d7c73da078 100644 --- a/crates/re_viewer/Cargo.toml +++ b/crates/re_viewer/Cargo.toml @@ -110,7 +110,6 @@ wgpu.workspace = true arboard = { version = "3.2", default-features = false, features = [ "image-data", ] } -ctrlc = { version = "3.0", features = ["termination"] } puffin_http = "0.11" puffin.workspace = true diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index 362f0e1ce33f..33f23822e3ee 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -73,8 +73,7 @@ pub struct App { state: AppState, /// Set to `true` on Ctrl-C. - #[cfg(not(target_arch = "wasm32"))] - ctrl_c: std::sync::Arc, + shutdown: std::sync::Arc, /// Pending background tasks, using `poll_promise`. pending_promises: HashMap>>, @@ -109,28 +108,11 @@ impl App { re_ui: re_ui::ReUi, storage: Option<&dyn eframe::Storage>, rx: Receiver, + shutdown: std::sync::Arc, ) -> Self { - #[cfg(not(target_arch = "wasm32"))] - let ctrl_c = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); - let (logger, text_log_rx) = re_log::ChannelLogger::new(re_log::LevelFilter::Info); re_log::add_boxed_logger(Box::new(logger)); - #[cfg(not(target_arch = "wasm32"))] - { - // Close viewer on Ctrl-C. TODO(emilk): maybe add to `eframe`? - - let ctrl_c = ctrl_c.clone(); - let egui_ctx = re_ui.egui_ctx.clone(); - - ctrlc::set_handler(move || { - re_log::debug!("Ctrl-C detected - Closing viewer."); - ctrl_c.store(true, std::sync::atomic::Ordering::SeqCst); - egui_ctx.request_repaint(); // so that we notice that we should close - }) - .expect("Error setting Ctrl-C handler"); - } - let state: AppState = storage .and_then(|storage| eframe::get_value(storage, eframe::APP_KEY)) .unwrap_or_default(); @@ -147,8 +129,7 @@ impl App { rx, log_dbs: Default::default(), state, - #[cfg(not(target_arch = "wasm32"))] - ctrl_c, + shutdown, pending_promises: Default::default(), toasts: toasts::Toasts::new(), latest_memory_purge: instant::Instant::now(), // TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it. @@ -436,8 +417,8 @@ impl eframe::App for App { self.icon_status = setup_app_icon(); } - #[cfg(not(target_arch = "wasm32"))] - if self.ctrl_c.load(std::sync::atomic::Ordering::Relaxed) { + if self.shutdown.load(std::sync::atomic::Ordering::Relaxed) { + #[cfg(not(target_arch = "wasm32"))] frame.close(); return; } diff --git a/crates/re_viewer/src/native.rs b/crates/re_viewer/src/native.rs index 2d772bcfefaa..c365c5829dc3 100644 --- a/crates/re_viewer/src/native.rs +++ b/crates/re_viewer/src/native.rs @@ -58,6 +58,7 @@ pub fn run_native_viewer_with_messages( re_ui, cc.storage, rx, + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), )) })) } diff --git a/crates/re_viewer/src/remote_viewer_app.rs b/crates/re_viewer/src/remote_viewer_app.rs index 78a079555584..7d76449c654d 100644 --- a/crates/re_viewer/src/remote_viewer_app.rs +++ b/crates/re_viewer/src/remote_viewer_app.rs @@ -71,6 +71,7 @@ impl RemoteViewerApp { self.re_ui.clone(), storage, rx, + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), ); self.app = Some((connection, app)); diff --git a/crates/re_viewer/src/web.rs b/crates/re_viewer/src/web.rs index ec5102a35293..98101571f61c 100644 --- a/crates/re_viewer/src/web.rs +++ b/crates/re_viewer/src/web.rs @@ -60,6 +60,7 @@ pub async fn start( re_ui, cc.storage, rx, + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), )) } EndpointCategory::WebSocket(url) => { diff --git a/crates/re_web_viewer_server/Cargo.toml b/crates/re_web_viewer_server/Cargo.toml index a26f9a3a5d66..36a55936eb33 100644 --- a/crates/re_web_viewer_server/Cargo.toml +++ b/crates/re_web_viewer_server/Cargo.toml @@ -39,6 +39,7 @@ analytics = ["dep:re_analytics"] re_log.workspace = true anyhow.workspace = true +ctrlc.workspace = true document-features = "0.2" futures-util = "0.3" hyper = { version = "0.14", features = ["full"] } diff --git a/crates/re_web_viewer_server/src/lib.rs b/crates/re_web_viewer_server/src/lib.rs index 904596702c06..dc28aaddea1f 100644 --- a/crates/re_web_viewer_server/src/lib.rs +++ b/crates/re_web_viewer_server/src/lib.rs @@ -159,8 +159,15 @@ impl WebViewerServer { Self { server } } - pub async fn serve(self) -> anyhow::Result<()> { - self.server.await?; + pub async fn serve( + self, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, + ) -> anyhow::Result<()> { + self.server + .with_graceful_shutdown(async { + shutdown_rx.recv().await.ok(); + }) + .await?; Ok(()) } } diff --git a/crates/re_web_viewer_server/src/main.rs b/crates/re_web_viewer_server/src/main.rs index 52a4930c521a..84bb08a376d6 100644 --- a/crates/re_web_viewer_server/src/main.rs +++ b/crates/re_web_viewer_server/src/main.rs @@ -6,8 +6,17 @@ async fn main() { re_log::setup_native_logging(); let port = 9090; eprintln!("Hosting web-viewer on http://127.0.0.1:{port}"); + + // Shutdown server via Ctrl+C + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected - Closing web server."); + shutdown_tx.send(()).unwrap(); + }) + .expect("Error setting Ctrl-C handler"); + re_web_viewer_server::WebViewerServer::new(port) - .serve() + .serve(shutdown_rx) .await .unwrap(); } diff --git a/crates/re_ws_comms/src/server.rs b/crates/re_ws_comms/src/server.rs index 626d11b3b75d..997b0a13c3ed 100644 --- a/crates/re_ws_comms/src/server.rs +++ b/crates/re_ws_comms/src/server.rs @@ -40,15 +40,26 @@ impl Server { Ok(Self { listener }) } - /// Accept new connections forever - pub async fn listen(self, rx: Receiver) -> anyhow::Result<()> { + /// Accept new connections until we get a message on `shutdown_rx` + pub async fn listen( + self, + rx: Receiver, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, + ) -> anyhow::Result<()> { use anyhow::Context as _; let history = Arc::new(Mutex::new(Vec::new())); let log_stream = to_broadcast_stream(rx, history.clone()); - while let Ok((tcp_stream, _)) = self.listener.accept().await { + loop { + let (tcp_stream, _) = tokio::select! { + res = self.listener.accept() => res?, + _ = shutdown_rx.recv() => { + return Ok(()); + } + }; + let peer = tcp_stream .peer_addr() .context("connected streams should have a peer address")?; @@ -59,8 +70,6 @@ impl Server { history.clone(), )); } - - Ok(()) } } diff --git a/crates/rerun/Cargo.toml b/crates/rerun/Cargo.toml index 253e54f81e8e..7074903844b0 100644 --- a/crates/rerun/Cargo.toml +++ b/crates/rerun/Cargo.toml @@ -97,6 +97,7 @@ backtrace = "0.3" clap = { workspace = true, features = ["derive"] } mimalloc.workspace = true puffin_http = "0.11" +ctrlc.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } # Native unix dependencies: diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index 415659f2a3f0..29ba016d81ca 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -74,6 +74,18 @@ impl RerunArgs { default_enabled: bool, run: impl FnOnce(Session) + Send + 'static, ) -> anyhow::Result<()> { + // Ensure we have a running tokio runtime. + #[allow(unused_assignments)] + let mut tokio_runtime = None; + let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle + } else { + tokio_runtime = + Some(tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); + tokio_runtime.as_ref().unwrap().handle().clone() + }; + let _tokio_runtime_guard = tokio_runtime_handle.enter(); + let (rerun_enabled, recording_info) = crate::SessionBuilder::new(application_id) .default_enabled(default_enabled) .finalize(); @@ -102,12 +114,17 @@ impl RerunArgs { }; let session = Session::new(recording_info, sink); + let _sink = session.sink().clone(); // Keep sink (and potential associated servers) alive until the end of this function scope. run(session); #[cfg(feature = "web_viewer")] if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) { - eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C"); - std::thread::sleep(std::time::Duration::from_secs(1_000_000_000)); + use anyhow::Context as _; + + let (mut shutdown_rx, _) = crate::run::setup_ctrl_c_handler(); + return tokio_runtime_handle + .block_on(async { shutdown_rx.recv().await }) + .context("Failed to wait for shutdown signal."); } Ok(()) diff --git a/crates/rerun/src/native_viewer.rs b/crates/rerun/src/native_viewer.rs index 7122d327c03d..e1d5fc1de4e0 100644 --- a/crates/rerun/src/native_viewer.rs +++ b/crates/rerun/src/native_viewer.rs @@ -44,6 +44,7 @@ where re_ui, cc.storage, rx, + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), )) })) } diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 89ab851bc6b2..1994e424e139 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -1,9 +1,14 @@ +use std::sync::{atomic::AtomicBool, Arc}; + use re_log_types::{LogMsg, PythonVersion}; use re_smart_channel::Receiver; use anyhow::Context as _; use clap::Subcommand; +#[cfg(feature = "web_viewer")] +use crate::web_viewer::host_web_viewer; + // Note the extra blank lines between the point-lists below: it is required by `clap`. /// The Rerun Viewer and Server @@ -248,6 +253,8 @@ async fn run_impl( }), }; + let (shutdown_rx, shutdown_bool) = setup_ctrl_c_handler(); + // Where do we get the data from? let rx = if let Some(url_or_path) = args.url_or_path.clone() { match categorize_argument(url_or_path) { @@ -262,7 +269,16 @@ async fn run_impl( // We are connecting to a server at a websocket address: if args.web_viewer { - return host_web_viewer(rerun_server_ws_url).await; + #[cfg(feature = "web_viewer")] + { + let web_viewer = + host_web_viewer(true, rerun_server_ws_url, shutdown_rx.resubscribe()); + return web_viewer.await; + } + #[cfg(not(feature = "web_viewer"))] + { + panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature"); + } } else { #[cfg(feature = "native_viewer")] return native_viewer_connect_to_ws_url( @@ -290,7 +306,7 @@ async fn run_impl( // `rerun.spawn()` doesn't need to log that a connection has been made quiet: call_source.is_python(), }; - re_sdk_comms::serve(args.port, server_options)? + re_sdk_comms::serve(args.port, server_options, shutdown_rx.resubscribe())? } #[cfg(not(feature = "server"))] @@ -311,13 +327,21 @@ async fn run_impl( ); } + // Make it possible to gracefully shutdown the servers on ctrl-c. + let shutdown_ws_server = shutdown_rx.resubscribe(); + let shutdown_web_viewer = shutdown_rx.resubscribe(); + // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?; - let server_handle = tokio::spawn(ws_server.listen(rx)); + let server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server)); - let rerun_ws_server_url = re_ws_comms::default_server_url(); - host_web_viewer(rerun_ws_server_url).await?; + // This is the server that serves the Wasm+HTML: + let ws_server_url = re_ws_comms::default_server_url(); + let ws_server_handle = + tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer)); + // Wait for both servers to shutdown. + ws_server_handle.await?.ok(); return server_handle.await?; } @@ -339,6 +363,7 @@ async fn run_impl( re_ui, cc.storage, rx, + shutdown_bool, ); app.set_profiler(profiler); Box::new(app) @@ -439,29 +464,6 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result anyhow::Result<()> { - let web_port = 9090; - let viewer_url = format!("http://127.0.0.1:{web_port}?url={rerun_ws_server_url}"); - - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(web_server.serve()); - - let open = true; - if open { - webbrowser::open(&viewer_url).ok(); - } else { - println!("Hosting Rerun Web Viewer at {viewer_url}."); - } - - web_server_handle.await? -} - -#[cfg(not(feature = "web_viewer"))] -async fn host_web_viewer(_rerun_ws_server_url: String) -> anyhow::Result<()> { - panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature"); -} - #[cfg(feature = "server")] fn parse_max_latency(max_latency: Option<&String>) -> f32 { max_latency.as_ref().map_or(f32::INFINITY, |time| { @@ -469,3 +471,16 @@ fn parse_max_latency(max_latency: Option<&String>) -> f32 { .unwrap_or_else(|err| panic!("Failed to parse max_latency ({max_latency:?}): {err}")) }) } + +pub fn setup_ctrl_c_handler() -> (tokio::sync::broadcast::Receiver<()>, Arc) { + let (sender, receiver) = tokio::sync::broadcast::channel(1); + let shutdown_return = Arc::new(AtomicBool::new(false)); + let shutdown = shutdown_return.clone(); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected, shutting down."); + sender.send(()).unwrap(); + shutdown.store(true, std::sync::atomic::Ordering::Relaxed); + }) + .expect("Error setting Ctrl-C handler"); + (receiver, shutdown_return) +} diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index 7535098ebfd2..1c91170b0f07 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -4,54 +4,76 @@ use re_log_types::LogMsg; /// * A web-server, serving the web-viewer /// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). struct RemoteViewerServer { - web_server_join_handle: tokio::task::JoinHandle<()>, sender: re_smart_channel::Sender, + shutdown_tx: tokio::sync::broadcast::Sender<()>, } impl Drop for RemoteViewerServer { fn drop(&mut self) { re_log::info!("Shutting down web server."); - self.web_server_join_handle.abort(); + self.shutdown_tx.send(()).ok(); } } impl RemoteViewerServer { - pub fn new(tokio_rt: &tokio::runtime::Runtime, open_browser: bool) -> Self { + pub fn new(open_browser: bool) -> Self { let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk); + let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1); + let shutdown_rx_web_server = shutdown_tx.subscribe(); - let web_server_join_handle = tokio_rt.spawn(async move { + tokio::spawn(async move { // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT) .await .unwrap(); - let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx)); // TODO(emilk): use tokio_rt ? + let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); // This is the server that serves the Wasm+HTML: - let web_port = 9090; - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(async move { - web_server.serve().await.unwrap(); - }); - let ws_server_url = re_ws_comms::default_server_url(); - let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}"); - if open_browser { - webbrowser::open(&viewer_url).ok(); - } else { - re_log::info!("Web server is running - view it at {viewer_url}"); - } + let web_server_handle = tokio::spawn(host_web_viewer( + open_browser, + ws_server_url, + shutdown_rx_web_server, + )); ws_server_handle.await.unwrap().unwrap(); - web_server_handle.await.unwrap(); + web_server_handle.await.unwrap().unwrap(); }); Self { - web_server_join_handle, sender: rerun_tx, + shutdown_tx, } } } +/// Hosts two servers: +/// * A web-server, serving the web-viewer +/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). +/// +/// Optionally opens a browser with the web-viewer. +#[cfg(feature = "web_viewer")] +pub async fn host_web_viewer( + open_browser: bool, + ws_server_url: String, + shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) -> anyhow::Result<()> { + let web_port = 9090; + let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}"); + + let web_server = re_web_viewer_server::WebViewerServer::new(web_port); + let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx)); + + re_log::info!("Web server is running - view it at {viewer_url}"); + if open_browser { + webbrowser::open(&viewer_url).ok(); + } else { + re_log::info!("Web server is running - view it at {viewer_url}"); + } + + web_server_handle.await? +} + impl crate::sink::LogSink for RemoteViewerServer { fn send(&self, msg: LogMsg) { if let Err(err) = self.sender.send(msg) { @@ -72,14 +94,9 @@ impl crate::sink::LogSink for RemoteViewerServer { /// NOTE: you can not connect one `Session` to another. /// /// This function returns immediately. +/// +/// The caller needs to ensure that there is a `tokio` runtime running. #[must_use] pub fn new_sink(open_browser: bool) -> Box { - // TODO(emilk): creating a tokio runtime on-demand like this is not great. Not sure how this interacts with `#[tokio::main]`, for instance. - use once_cell::sync::Lazy; - use parking_lot::Mutex; - static TOKIO_RUNTIME: Lazy> = Lazy::new(|| { - Mutex::new(tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")) - }); - - Box::new(RemoteViewerServer::new(&TOKIO_RUNTIME.lock(), open_browser)) + Box::new(RemoteViewerServer::new(open_browser)) } diff --git a/rerun_py/rerun_sdk/rerun/script_helpers.py b/rerun_py/rerun_sdk/rerun/script_helpers.py index 36c091b85e32..8b59133dd090 100644 --- a/rerun_py/rerun_sdk/rerun/script_helpers.py +++ b/rerun_py/rerun_sdk/rerun/script_helpers.py @@ -18,9 +18,7 @@ ``` """ -import contextlib from argparse import ArgumentParser, Namespace -from time import sleep import rerun as rr @@ -93,6 +91,10 @@ def script_teardown(args: Namespace) -> None: """ if args.serve: + import signal + from threading import Event + + exit = Event() + signal.signal(signal.SIGINT, lambda sig, frame: exit.set()) print("Sleeping while serving the web viewer. Abort with Ctrl-C") - with contextlib.suppress(Exception): - sleep(1_000_000_000) + exit.wait() diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 77c805024fc4..43c70d572bb0 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -295,7 +295,12 @@ fn serve(open_browser: bool) -> PyResult<()> { return Ok(()); } + use once_cell::sync::Lazy; + static TOKIO_RUNTIME: Lazy = + Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); + let _guard = TOKIO_RUNTIME.enter(); session.set_sink(rerun::web_viewer::new_sink(open_browser)); + Ok(()) }