From 6adb7fa06954557d0f6041e3dfc1ab7cb88a47cc Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 10:09:12 +0100 Subject: [PATCH 01/12] Handle ctrl+c to gracefully shutdown the server(s) --- Cargo.lock | 2 + Cargo.toml | 1 + crates/re_viewer/Cargo.toml | 2 +- crates/re_web_viewer_server/Cargo.toml | 1 + crates/re_web_viewer_server/src/lib.rs | 11 ++++- crates/re_web_viewer_server/src/main.rs | 11 ++++- crates/re_ws_comms/src/server.rs | 17 ++++++-- crates/rerun/Cargo.toml | 1 + crates/rerun/src/run.rs | 51 +++++++++++++--------- crates/rerun/src/web_viewer.rs | 48 ++++++++++++++------ rerun_py/rerun_sdk/rerun/script_helpers.py | 10 +++-- 11 files changed, 108 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 563a0cc2b1ff..0a65c1f3b070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4156,6 +4156,7 @@ version = "0.3.1" dependencies = [ "anyhow", "cargo_metadata", + "ctrlc", "document-features", "futures-util", "glob", @@ -4245,6 +4246,7 @@ dependencies = [ "backtrace", "clap 4.1.4", "crossbeam", + "ctrlc", "document-features", "egui", "itertools", diff --git a/Cargo.toml b/Cargo.toml index 1b1e05810147..bcec71d3438c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ arrow2 = "0.16" arrow2_convert = "0.4.2" clap = "4.0" comfy-table = { version = "6.1", default-features = false } +ctrlc = { version = "3.0", features = ["termination"] } ecolor = "0.21.0" eframe = { version = "0.21.3", default-features = false } egui = "0.21.0" diff --git a/crates/re_viewer/Cargo.toml b/crates/re_viewer/Cargo.toml index 7bf553ac6676..d13bf202cce7 100644 --- a/crates/re_viewer/Cargo.toml +++ b/crates/re_viewer/Cargo.toml @@ -111,7 +111,7 @@ wgpu.workspace = true arboard = { version = "3.2", default-features = false, features = [ "image-data", ] } -ctrlc = { version = "3.0", features = ["termination"] } +ctrlc.workspace = true puffin_http = "0.11" puffin.workspace = true diff --git a/crates/re_web_viewer_server/Cargo.toml b/crates/re_web_viewer_server/Cargo.toml index a26f9a3a5d66..36a55936eb33 100644 --- a/crates/re_web_viewer_server/Cargo.toml +++ b/crates/re_web_viewer_server/Cargo.toml @@ -39,6 +39,7 @@ analytics = ["dep:re_analytics"] re_log.workspace = true anyhow.workspace = true +ctrlc.workspace = true document-features = "0.2" futures-util = "0.3" hyper = { version = "0.14", features = ["full"] } diff --git a/crates/re_web_viewer_server/src/lib.rs b/crates/re_web_viewer_server/src/lib.rs index 9b01bf217cd6..103554a13f30 100644 --- a/crates/re_web_viewer_server/src/lib.rs +++ b/crates/re_web_viewer_server/src/lib.rs @@ -157,8 +157,15 @@ impl WebViewerServer { Self { server } } - pub async fn serve(self) -> anyhow::Result<()> { - self.server.await?; + pub async fn serve( + self, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, + ) -> anyhow::Result<()> { + self.server + .with_graceful_shutdown(async { + shutdown_rx.recv().await.ok(); + }) + .await?; Ok(()) } } diff --git a/crates/re_web_viewer_server/src/main.rs b/crates/re_web_viewer_server/src/main.rs index 52a4930c521a..06196f108304 100644 --- a/crates/re_web_viewer_server/src/main.rs +++ b/crates/re_web_viewer_server/src/main.rs @@ -6,8 +6,17 @@ async fn main() { re_log::setup_native_logging(); let port = 9090; eprintln!("Hosting web-viewer on http://127.0.0.1:{port}"); + + // Shutdown server via Ctrl+C + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected - Closing server."); + shutdown_tx.send(()).unwrap(); + }) + .expect("Error setting Ctrl-C handler"); + re_web_viewer_server::WebViewerServer::new(port) - .serve() + .serve(shutdown_rx) .await .unwrap(); } diff --git a/crates/re_ws_comms/src/server.rs b/crates/re_ws_comms/src/server.rs index 626d11b3b75d..d1f98309f1a8 100644 --- a/crates/re_ws_comms/src/server.rs +++ b/crates/re_ws_comms/src/server.rs @@ -41,14 +41,25 @@ impl Server { } /// Accept new connections forever - pub async fn listen(self, rx: Receiver) -> anyhow::Result<()> { + pub async fn listen( + self, + rx: Receiver, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, + ) -> anyhow::Result<()> { use anyhow::Context as _; let history = Arc::new(Mutex::new(Vec::new())); let log_stream = to_broadcast_stream(rx, history.clone()); - while let Ok((tcp_stream, _)) = self.listener.accept().await { + loop { + let (tcp_stream, _) = tokio::select! { + res = self.listener.accept() => res?, + _ = shutdown_rx.recv() => { + return Ok(()); + } + }; + let peer = tcp_stream .peer_addr() .context("connected streams should have a peer address")?; @@ -59,8 +70,6 @@ impl Server { history.clone(), )); } - - Ok(()) } } diff --git a/crates/rerun/Cargo.toml b/crates/rerun/Cargo.toml index 253e54f81e8e..7074903844b0 100644 --- a/crates/rerun/Cargo.toml +++ b/crates/rerun/Cargo.toml @@ -97,6 +97,7 @@ backtrace = "0.3" clap = { workspace = true, features = ["derive"] } mimalloc.workspace = true puffin_http = "0.11" +ctrlc.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } # Native unix dependencies: diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 89ab851bc6b2..1f9ea972afd1 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -262,7 +262,9 @@ async fn run_impl( // We are connecting to a server at a websocket address: if args.web_viewer { - return host_web_viewer(rerun_server_ws_url).await; + let shutdown_rx = setup_ctrl_c_handler(); + let web_viewer = host_web_viewer(true, rerun_server_ws_url, shutdown_rx); + return web_viewer.await; } else { #[cfg(feature = "native_viewer")] return native_viewer_connect_to_ws_url( @@ -311,13 +313,21 @@ async fn run_impl( ); } + // Make it possible to gracefully shutdown the servers on ctrl-c. + let shutdown_ws_server = setup_ctrl_c_handler(); + let shutdown_web_viewer = shutdown_ws_server.resubscribe(); + // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?; - let server_handle = tokio::spawn(ws_server.listen(rx)); + let server_handle = tokio::spawn(ws_server.listen(rx, shutdown_ws_server)); - let rerun_ws_server_url = re_ws_comms::default_server_url(); - host_web_viewer(rerun_ws_server_url).await?; + // This is the server that serves the Wasm+HTML: + let ws_server_url = re_ws_comms::default_server_url(); + let ws_server_handle = + tokio::spawn(host_web_viewer(true, ws_server_url, shutdown_web_viewer)); + // Wait for both servers to shutdown. + ws_server_handle.await?.ok(); return server_handle.await?; } @@ -355,6 +365,16 @@ async fn run_impl( } } +fn setup_ctrl_c_handler() -> tokio::sync::broadcast::Receiver<()> { + let (sender, receiver) = tokio::sync::broadcast::channel(1); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected - Closing server."); + sender.send(()).unwrap(); + }) + .expect("Error setting Ctrl-C handler"); + receiver +} + enum ArgumentCategory { /// A remote RRD file, served over http. RrdHttpUrl(String), @@ -440,25 +460,14 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result anyhow::Result<()> { - let web_port = 9090; - let viewer_url = format!("http://127.0.0.1:{web_port}?url={rerun_ws_server_url}"); - - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(web_server.serve()); - - let open = true; - if open { - webbrowser::open(&viewer_url).ok(); - } else { - println!("Hosting Rerun Web Viewer at {viewer_url}."); - } - - web_server_handle.await? -} +use crate::web_viewer::host_web_viewer; #[cfg(not(feature = "web_viewer"))] -async fn host_web_viewer(_rerun_ws_server_url: String) -> anyhow::Result<()> { +pub async fn host_web_viewer( + _open_browser: bool, + _ws_server_url: String, + _shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) -> anyhow::Result<()> { panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature"); } diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index 7535098ebfd2..b3b38806bfb8 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -6,6 +6,8 @@ use re_log_types::LogMsg; struct RemoteViewerServer { web_server_join_handle: tokio::task::JoinHandle<()>, sender: re_smart_channel::Sender, + #[allow(dead_code)] // Unused currently, but can be used later to cancel a serve. + shutdown_tx: tokio::sync::broadcast::Sender<()>, } impl Drop for RemoteViewerServer { @@ -18,40 +20,58 @@ impl Drop for RemoteViewerServer { impl RemoteViewerServer { pub fn new(tokio_rt: &tokio::runtime::Runtime, open_browser: bool) -> Self { let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk); + let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1); + let shutdown_rx_web_server = shutdown_tx.subscribe(); let web_server_join_handle = tokio_rt.spawn(async move { // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT) .await .unwrap(); - let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx)); // TODO(emilk): use tokio_rt ? + let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); // TODO(emilk): use tokio_rt ? // This is the server that serves the Wasm+HTML: - let web_port = 9090; - let web_server = re_web_viewer_server::WebViewerServer::new(web_port); - let web_server_handle = tokio::spawn(async move { - web_server.serve().await.unwrap(); - }); - let ws_server_url = re_ws_comms::default_server_url(); - let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}"); - if open_browser { - webbrowser::open(&viewer_url).ok(); - } else { - re_log::info!("Web server is running - view it at {viewer_url}"); - } + let web_server_handle = tokio::spawn(host_web_viewer( + open_browser, + ws_server_url, + shutdown_rx_web_server, + )); ws_server_handle.await.unwrap().unwrap(); - web_server_handle.await.unwrap(); + web_server_handle.await.unwrap().unwrap(); }); Self { web_server_join_handle, sender: rerun_tx, + shutdown_tx, } } } +/// Start a web server for localhost and optionally spawns a browser to view it. +#[cfg(feature = "web_viewer")] +pub async fn host_web_viewer( + open_browser: bool, + ws_server_url: String, + shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) -> anyhow::Result<()> { + let web_port = 9090; + let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}"); + + let web_server = re_web_viewer_server::WebViewerServer::new(web_port); + let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx)); + + if open_browser { + webbrowser::open(&viewer_url).ok(); + } else { + re_log::info!("Web server is running - view it at {viewer_url}"); + } + + web_server_handle.await? +} + impl crate::sink::LogSink for RemoteViewerServer { fn send(&self, msg: LogMsg) { if let Err(err) = self.sender.send(msg) { diff --git a/rerun_py/rerun_sdk/rerun/script_helpers.py b/rerun_py/rerun_sdk/rerun/script_helpers.py index 36c091b85e32..38405c0fd88c 100644 --- a/rerun_py/rerun_sdk/rerun/script_helpers.py +++ b/rerun_py/rerun_sdk/rerun/script_helpers.py @@ -18,9 +18,7 @@ ``` """ -import contextlib from argparse import ArgumentParser, Namespace -from time import sleep import rerun as rr @@ -93,6 +91,10 @@ def script_teardown(args: Namespace) -> None: """ if args.serve: + from threading import Event + import signal + + exit = Event() + signal.signal(signal.SIGINT, lambda sig, frame: exit.set()) print("Sleeping while serving the web viewer. Abort with Ctrl-C") - with contextlib.suppress(Exception): - sleep(1_000_000_000) + exit.wait() From bd9891115e55324a21a9df1eebee6aa731f36f89 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 10:29:04 +0100 Subject: [PATCH 02/12] formatting --- rerun_py/rerun_sdk/rerun/script_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rerun_py/rerun_sdk/rerun/script_helpers.py b/rerun_py/rerun_sdk/rerun/script_helpers.py index 38405c0fd88c..8b59133dd090 100644 --- a/rerun_py/rerun_sdk/rerun/script_helpers.py +++ b/rerun_py/rerun_sdk/rerun/script_helpers.py @@ -91,8 +91,8 @@ def script_teardown(args: Namespace) -> None: """ if args.serve: - from threading import Event import signal + from threading import Event exit = Event() signal.signal(signal.SIGINT, lambda sig, frame: exit.set()) From 2efb8d8514d109246e843a06452ef9db04d4cf36 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 14:32:49 +0100 Subject: [PATCH 03/12] move setup_ctrl_c_handler to re_web_viewer_server --- Cargo.lock | 1 - crates/re_web_viewer_server/src/lib.rs | 12 ++++++++ crates/re_web_viewer_server/src/main.rs | 13 ++------ crates/rerun/Cargo.toml | 1 - crates/rerun/src/run.rs | 40 ++++++++++--------------- 5 files changed, 30 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a65c1f3b070..63a7c9551103 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4246,7 +4246,6 @@ dependencies = [ "backtrace", "clap 4.1.4", "crossbeam", - "ctrlc", "document-features", "egui", "itertools", diff --git a/crates/re_web_viewer_server/src/lib.rs b/crates/re_web_viewer_server/src/lib.rs index 103554a13f30..45727381fbf1 100644 --- a/crates/re_web_viewer_server/src/lib.rs +++ b/crates/re_web_viewer_server/src/lib.rs @@ -169,3 +169,15 @@ impl WebViewerServer { Ok(()) } } + +// ---------------------------------------------------------------------------- + +pub fn setup_ctrl_c_handler() -> tokio::sync::broadcast::Receiver<()> { + let (sender, receiver) = tokio::sync::broadcast::channel(1); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected - Closing web server."); + sender.send(()).unwrap(); + }) + .expect("Error setting Ctrl-C handler"); + receiver +} diff --git a/crates/re_web_viewer_server/src/main.rs b/crates/re_web_viewer_server/src/main.rs index 06196f108304..84c37df96e99 100644 --- a/crates/re_web_viewer_server/src/main.rs +++ b/crates/re_web_viewer_server/src/main.rs @@ -1,22 +1,15 @@ #![forbid(unsafe_code)] #![warn(clippy::all, rust_2018_idioms)] +use re_web_viewer_server::setup_ctrl_c_handler; + #[tokio::main] async fn main() { re_log::setup_native_logging(); let port = 9090; eprintln!("Hosting web-viewer on http://127.0.0.1:{port}"); - - // Shutdown server via Ctrl+C - let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); - ctrlc::set_handler(move || { - re_log::debug!("Ctrl-C detected - Closing server."); - shutdown_tx.send(()).unwrap(); - }) - .expect("Error setting Ctrl-C handler"); - re_web_viewer_server::WebViewerServer::new(port) - .serve(shutdown_rx) + .serve(setup_ctrl_c_handler()) .await .unwrap(); } diff --git a/crates/rerun/Cargo.toml b/crates/rerun/Cargo.toml index 7074903844b0..253e54f81e8e 100644 --- a/crates/rerun/Cargo.toml +++ b/crates/rerun/Cargo.toml @@ -97,7 +97,6 @@ backtrace = "0.3" clap = { workspace = true, features = ["derive"] } mimalloc.workspace = true puffin_http = "0.11" -ctrlc.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } # Native unix dependencies: diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 1f9ea972afd1..317c13d0d4f9 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -4,6 +4,11 @@ use re_smart_channel::Receiver; use anyhow::Context as _; use clap::Subcommand; +#[cfg(feature = "web_viewer")] +use crate::web_viewer::host_web_viewer; +#[cfg(feature = "web_viewer")] +use re_web_viewer_server::setup_ctrl_c_handler; + // Note the extra blank lines between the point-lists below: it is required by `clap`. /// The Rerun Viewer and Server @@ -262,9 +267,16 @@ async fn run_impl( // We are connecting to a server at a websocket address: if args.web_viewer { - let shutdown_rx = setup_ctrl_c_handler(); - let web_viewer = host_web_viewer(true, rerun_server_ws_url, shutdown_rx); - return web_viewer.await; + #[cfg(feature = "web_viewer")] + { + let shutdown_rx = setup_ctrl_c_handler(); + let web_viewer = host_web_viewer(true, rerun_server_ws_url, shutdown_rx); + return web_viewer.await; + } + #[cfg(not(feature = "web_viewer"))] + { + panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature"); + } } else { #[cfg(feature = "native_viewer")] return native_viewer_connect_to_ws_url( @@ -365,16 +377,6 @@ async fn run_impl( } } -fn setup_ctrl_c_handler() -> tokio::sync::broadcast::Receiver<()> { - let (sender, receiver) = tokio::sync::broadcast::channel(1); - ctrlc::set_handler(move || { - re_log::debug!("Ctrl-C detected - Closing server."); - sender.send(()).unwrap(); - }) - .expect("Error setting Ctrl-C handler"); - receiver -} - enum ArgumentCategory { /// A remote RRD file, served over http. RrdHttpUrl(String), @@ -459,18 +461,6 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result, -) -> anyhow::Result<()> { - panic!("Can't host web-viewer - rerun was not compiled with the 'web_viewer' feature"); -} - #[cfg(feature = "server")] fn parse_max_latency(max_latency: Option<&String>) -> f32 { max_latency.as_ref().map_or(f32::INFINITY, |time| { From 686f2740a2eb888920bfad163fa7dcc8bf8136b8 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 14:38:49 +0100 Subject: [PATCH 04/12] comment fix, use channel for server shutdown in `RemoteViewerServer` --- crates/rerun/src/web_viewer.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index b3b38806bfb8..32cc384d5271 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -4,16 +4,14 @@ use re_log_types::LogMsg; /// * A web-server, serving the web-viewer /// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). struct RemoteViewerServer { - web_server_join_handle: tokio::task::JoinHandle<()>, sender: re_smart_channel::Sender, - #[allow(dead_code)] // Unused currently, but can be used later to cancel a serve. shutdown_tx: tokio::sync::broadcast::Sender<()>, } impl Drop for RemoteViewerServer { fn drop(&mut self) { re_log::info!("Shutting down web server."); - self.web_server_join_handle.abort(); + self.shutdown_tx.send(()).unwrap(); } } @@ -23,7 +21,7 @@ impl RemoteViewerServer { let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1); let shutdown_rx_web_server = shutdown_tx.subscribe(); - let web_server_join_handle = tokio_rt.spawn(async move { + tokio_rt.spawn(async move { // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT) .await @@ -43,14 +41,17 @@ impl RemoteViewerServer { }); Self { - web_server_join_handle, sender: rerun_tx, shutdown_tx, } } } -/// Start a web server for localhost and optionally spawns a browser to view it. +/// Hosts two servers: +/// * A web-server, serving the web-viewer +/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s). +/// +/// Optionally opens a browser with the web-viewer. #[cfg(feature = "web_viewer")] pub async fn host_web_viewer( open_browser: bool, From 3cd7aef03602a687dfb2aa2275e8a28d63bf125e Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 15:29:35 +0100 Subject: [PATCH 05/12] use tokio in re_web_viewer_server/server in order to use same graceful shutdown mechanism Had to move setup_ctrl_c_handler back to `rerun` because of this --- Cargo.lock | 2 + crates/re_sdk_comms/Cargo.toml | 1 + crates/re_sdk_comms/src/server.rs | 116 +++++++++++++----------- crates/re_web_viewer_server/src/lib.rs | 12 --- crates/re_web_viewer_server/src/main.rs | 13 ++- crates/rerun/Cargo.toml | 1 + crates/rerun/src/run.rs | 24 +++-- 7 files changed, 94 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63a7c9551103..c2030b356b20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4020,6 +4020,7 @@ dependencies = [ "re_log", "re_log_types", "re_smart_channel", + "tokio", ] [[package]] @@ -4246,6 +4247,7 @@ dependencies = [ "backtrace", "clap 4.1.4", "crossbeam", + "ctrlc", "document-features", "egui", "itertools", diff --git a/crates/re_sdk_comms/Cargo.toml b/crates/re_sdk_comms/Cargo.toml index b8ecd31eea9b..9aa9cce5d4cc 100644 --- a/crates/re_sdk_comms/Cargo.toml +++ b/crates/re_sdk_comms/Cargo.toml @@ -35,3 +35,4 @@ bincode = "1.3" crossbeam = "0.8" document-features = "0.2" rand = { version = "0.8.5", features = ["small_rng"] } +tokio.workspace = true diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index 7fa717143213..8e56c03bb4f5 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -7,6 +7,7 @@ use rand::{Rng as _, SeedableRng}; use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName}; use re_smart_channel::{Receiver, Sender}; +use tokio::net::{TcpListener, TcpStream}; #[derive(Clone, Copy, Debug, PartialEq)] pub struct ServerOptions { @@ -27,37 +28,18 @@ impl Default for ServerOptions { } } -/// Listen to multiple SDK:s connecting to us over TCP. -/// -/// ``` no_run -/// # use re_sdk_comms::{serve, ServerOptions}; -/// let log_msg_rx = serve(80, ServerOptions::default())?; -/// # Ok::<(), anyhow::Error>(()) -/// ``` -pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result> { +async fn listen_for_new_clients( + port: u16, + options: ServerOptions, + tx: Sender, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) { let bind_addr = format!("0.0.0.0:{port}"); - let listener = std::net::TcpListener::bind(&bind_addr) - .with_context(|| format!("Failed to bind address {bind_addr:?}"))?; - - let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port }); - - std::thread::Builder::new() - .name("sdk-server".into()) - .spawn(move || { - for stream in listener.incoming() { - match stream { - Ok(stream) => { - let tx = tx.clone(); - spawn_client(stream, tx, options); - } - Err(err) => { - re_log::warn!("Failed to accept incoming SDK client: {err}"); - } - } - } - }) - .expect("Failed to spawn thread"); + let listener = TcpListener::bind(&bind_addr) + .await + .with_context(|| format!("Failed to bind address {bind_addr:?}")) + .unwrap(); if options.quiet { re_log::debug!( @@ -69,40 +51,68 @@ pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result res, + _ = shutdown_rx.recv() => { + return; + } + }; + match incoming { + Ok((stream, _)) => { + let tx = tx.clone(); + spawn_client(stream, tx, options); + } + Err(err) => { + re_log::warn!("Failed to accept incoming SDK client: {err}"); + } + } + } } -fn spawn_client(stream: std::net::TcpStream, tx: Sender, options: ServerOptions) { - std::thread::Builder::new() - .name(format!( - "sdk-server-client-handler-{:?}", - stream.peer_addr() - )) - .spawn(move || { - if options.quiet { - re_log::debug!("New SDK client connected: {:?}", stream.peer_addr()); - } else { - re_log::info!("New SDK client connected: {:?}", stream.peer_addr()); - } +/// Listen to multiple SDK:s connecting to us over TCP. +/// +/// ``` no_run +/// # use re_sdk_comms::{serve, ServerOptions}; +/// let log_msg_rx = serve(80, ServerOptions::default())?; +/// # Ok::<(), anyhow::Error>(()) +/// ``` +pub fn serve( + port: u16, + options: ServerOptions, + shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) -> anyhow::Result> { + let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port }); - if let Err(err) = run_client(stream, &tx, options) { - re_log::warn!("Closing connection to client: {err}"); - } - }) - .expect("Failed to spawn thread"); + tokio::spawn(listen_for_new_clients(port, options, tx, shutdown_rx)); + + Ok(rx) +} + +fn spawn_client(stream: TcpStream, tx: Sender, options: ServerOptions) { + tokio::spawn(async move { + if options.quiet { + re_log::debug!("New SDK client connected: {:?}", stream.peer_addr()); + } else { + re_log::info!("New SDK client connected: {:?}", stream.peer_addr()); + } + if let Err(err) = run_client(stream, &tx, options).await { + re_log::warn!("Closing connection to client: {err}"); + } + }); } -fn run_client( - mut stream: std::net::TcpStream, +async fn run_client( + mut stream: TcpStream, tx: &Sender, options: ServerOptions, ) -> anyhow::Result<()> { #![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274 - use std::io::Read as _; + use tokio::io::AsyncReadExt as _; let mut client_version = [0_u8; 2]; - stream.read_exact(&mut client_version)?; + stream.read_exact(&mut client_version).await?; let client_version = u16::from_le_bytes(client_version); match client_version.cmp(&crate::PROTOCOL_VERSION) { @@ -129,11 +139,11 @@ fn run_client( loop { let mut packet_size = [0_u8; 4]; - stream.read_exact(&mut packet_size)?; + stream.read_exact(&mut packet_size).await?; let packet_size = u32::from_le_bytes(packet_size); packet.resize(packet_size as usize, 0_u8); - stream.read_exact(&mut packet)?; + stream.read_exact(&mut packet).await?; re_log::trace!("Received log message of size {packet_size}."); diff --git a/crates/re_web_viewer_server/src/lib.rs b/crates/re_web_viewer_server/src/lib.rs index 45727381fbf1..103554a13f30 100644 --- a/crates/re_web_viewer_server/src/lib.rs +++ b/crates/re_web_viewer_server/src/lib.rs @@ -169,15 +169,3 @@ impl WebViewerServer { Ok(()) } } - -// ---------------------------------------------------------------------------- - -pub fn setup_ctrl_c_handler() -> tokio::sync::broadcast::Receiver<()> { - let (sender, receiver) = tokio::sync::broadcast::channel(1); - ctrlc::set_handler(move || { - re_log::debug!("Ctrl-C detected - Closing web server."); - sender.send(()).unwrap(); - }) - .expect("Error setting Ctrl-C handler"); - receiver -} diff --git a/crates/re_web_viewer_server/src/main.rs b/crates/re_web_viewer_server/src/main.rs index 84c37df96e99..84bb08a376d6 100644 --- a/crates/re_web_viewer_server/src/main.rs +++ b/crates/re_web_viewer_server/src/main.rs @@ -1,15 +1,22 @@ #![forbid(unsafe_code)] #![warn(clippy::all, rust_2018_idioms)] -use re_web_viewer_server::setup_ctrl_c_handler; - #[tokio::main] async fn main() { re_log::setup_native_logging(); let port = 9090; eprintln!("Hosting web-viewer on http://127.0.0.1:{port}"); + + // Shutdown server via Ctrl+C + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected - Closing web server."); + shutdown_tx.send(()).unwrap(); + }) + .expect("Error setting Ctrl-C handler"); + re_web_viewer_server::WebViewerServer::new(port) - .serve(setup_ctrl_c_handler()) + .serve(shutdown_rx) .await .unwrap(); } diff --git a/crates/rerun/Cargo.toml b/crates/rerun/Cargo.toml index 253e54f81e8e..7074903844b0 100644 --- a/crates/rerun/Cargo.toml +++ b/crates/rerun/Cargo.toml @@ -97,6 +97,7 @@ backtrace = "0.3" clap = { workspace = true, features = ["derive"] } mimalloc.workspace = true puffin_http = "0.11" +ctrlc.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } # Native unix dependencies: diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 317c13d0d4f9..15d22c8c1bd5 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -6,8 +6,6 @@ use clap::Subcommand; #[cfg(feature = "web_viewer")] use crate::web_viewer::host_web_viewer; -#[cfg(feature = "web_viewer")] -use re_web_viewer_server::setup_ctrl_c_handler; // Note the extra blank lines between the point-lists below: it is required by `clap`. @@ -253,6 +251,8 @@ async fn run_impl( }), }; + let shutdown_rx = setup_ctrl_c_handler(); + // Where do we get the data from? let rx = if let Some(url_or_path) = args.url_or_path.clone() { match categorize_argument(url_or_path) { @@ -269,8 +269,8 @@ async fn run_impl( if args.web_viewer { #[cfg(feature = "web_viewer")] { - let shutdown_rx = setup_ctrl_c_handler(); - let web_viewer = host_web_viewer(true, rerun_server_ws_url, shutdown_rx); + let web_viewer = + host_web_viewer(true, rerun_server_ws_url, shutdown_rx.resubscribe()); return web_viewer.await; } #[cfg(not(feature = "web_viewer"))] @@ -304,7 +304,7 @@ async fn run_impl( // `rerun.spawn()` doesn't need to log that a connection has been made quiet: call_source.is_python(), }; - re_sdk_comms::serve(args.port, server_options)? + re_sdk_comms::serve(args.port, server_options, shutdown_rx.resubscribe())? } #[cfg(not(feature = "server"))] @@ -326,8 +326,8 @@ async fn run_impl( } // Make it possible to gracefully shutdown the servers on ctrl-c. - let shutdown_ws_server = setup_ctrl_c_handler(); - let shutdown_web_viewer = shutdown_ws_server.resubscribe(); + let shutdown_ws_server = shutdown_rx.resubscribe(); + let shutdown_web_viewer = shutdown_rx.resubscribe(); // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT).await?; @@ -468,3 +468,13 @@ fn parse_max_latency(max_latency: Option<&String>) -> f32 { .unwrap_or_else(|err| panic!("Failed to parse max_latency ({max_latency:?}): {err}")) }) } + +pub fn setup_ctrl_c_handler() -> tokio::sync::broadcast::Receiver<()> { + let (sender, receiver) = tokio::sync::broadcast::channel(1); + ctrlc::set_handler(move || { + re_log::debug!("Ctrl-C detected, shutting down."); + sender.send(()).unwrap(); + }) + .expect("Error setting Ctrl-C handler"); + receiver +} From e3d20271a46fa74af8b8bfa9e25ea3182c41dbe2 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 16:23:21 +0100 Subject: [PATCH 06/12] don't create tokio runtime if there is already one --- crates/rerun/src/web_viewer.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index 32cc384d5271..b45a7f7fba4e 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -16,17 +16,17 @@ impl Drop for RemoteViewerServer { } impl RemoteViewerServer { - pub fn new(tokio_rt: &tokio::runtime::Runtime, open_browser: bool) -> Self { + pub fn new(open_browser: bool) -> Self { let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk); let (shutdown_tx, shutdown_rx_ws_server) = tokio::sync::broadcast::channel(1); let shutdown_rx_web_server = shutdown_tx.subscribe(); - tokio_rt.spawn(async move { + tokio::spawn(async move { // This is the server which the web viewer will talk to: let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT) .await .unwrap(); - let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); // TODO(emilk): use tokio_rt ? + let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx, shutdown_rx_ws_server)); // This is the server that serves the Wasm+HTML: let ws_server_url = re_ws_comms::default_server_url(); @@ -64,6 +64,7 @@ pub async fn host_web_viewer( let web_server = re_web_viewer_server::WebViewerServer::new(web_port); let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx)); + re_log::info!("Web server is running - view it at {viewer_url}"); if open_browser { webbrowser::open(&viewer_url).ok(); } else { @@ -95,12 +96,17 @@ impl crate::sink::LogSink for RemoteViewerServer { /// This function returns immediately. #[must_use] pub fn new_sink(open_browser: bool) -> Box { - // TODO(emilk): creating a tokio runtime on-demand like this is not great. Not sure how this interacts with `#[tokio::main]`, for instance. use once_cell::sync::Lazy; - use parking_lot::Mutex; - static TOKIO_RUNTIME: Lazy> = Lazy::new(|| { - Mutex::new(tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")) - }); + static TOKIO_RUNTIME: Lazy = + Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); - Box::new(RemoteViewerServer::new(&TOKIO_RUNTIME.lock(), open_browser)) + let server = match tokio::runtime::Handle::try_current() { + Ok(_) => RemoteViewerServer::new(open_browser), + Err(_) => { + let _runtime_guard = TOKIO_RUNTIME.enter(); + RemoteViewerServer::new(open_browser) + } + }; + + Box::new(server) } From 078c92dc899c17c10ae1305abbc21d9b925d76d4 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 16:59:25 +0100 Subject: [PATCH 07/12] move responsibility for alive tokio runtime outside of web_viewer. Don't create a new runtime if there is one already. Gracefully wait on ctrl+c instead of sleeping for `RerunArgs` entrypoint --- crates/rerun/src/clap.rs | 23 ++++++++++++++++++++--- crates/rerun/src/web_viewer.rs | 16 +++------------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index 415659f2a3f0..db2af9bf1679 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -2,7 +2,9 @@ use std::{net::SocketAddr, path::PathBuf}; -use crate::Session; +use anyhow::Context; + +use crate::{run::setup_ctrl_c_handler, Session}; // --- @@ -74,6 +76,18 @@ impl RerunArgs { default_enabled: bool, run: impl FnOnce(Session) + Send + 'static, ) -> anyhow::Result<()> { + // Ensure we have a running tokio runtime. + #[allow(unused_assignments)] + let mut tokio_runtime = None; + let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle + } else { + tokio_runtime = + Some(tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); + tokio_runtime.as_ref().unwrap().handle().clone() + }; + let _tokio_runtime_guard = tokio_runtime_handle.enter(); + let (rerun_enabled, recording_info) = crate::SessionBuilder::new(application_id) .default_enabled(default_enabled) .finalize(); @@ -102,12 +116,15 @@ impl RerunArgs { }; let session = Session::new(recording_info, sink); + let _sink = session.sink().clone(); // Keep sink (and potential associated servers) alive until the end of this function scope. run(session); #[cfg(feature = "web_viewer")] if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) { - eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C"); - std::thread::sleep(std::time::Duration::from_secs(1_000_000_000)); + let mut shutdown_rx = setup_ctrl_c_handler(); + return tokio_runtime_handle + .block_on(async { shutdown_rx.recv().await }) + .context("Failed to wait for shutdown signal."); } Ok(()) diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index b45a7f7fba4e..7c7bb97a798a 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -94,19 +94,9 @@ impl crate::sink::LogSink for RemoteViewerServer { /// NOTE: you can not connect one `Session` to another. /// /// This function returns immediately. +/// +/// The caller needs to ensure that there is a `tokio` runtime running. #[must_use] pub fn new_sink(open_browser: bool) -> Box { - use once_cell::sync::Lazy; - static TOKIO_RUNTIME: Lazy = - Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); - - let server = match tokio::runtime::Handle::try_current() { - Ok(_) => RemoteViewerServer::new(open_browser), - Err(_) => { - let _runtime_guard = TOKIO_RUNTIME.enter(); - RemoteViewerServer::new(open_browser) - } - }; - - Box::new(server) + Box::new(RemoteViewerServer::new(open_browser)) } From b71d15afda6f0a22f023bac2bb896b2de0125383 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 17:07:17 +0100 Subject: [PATCH 08/12] Fix python `serve` lacking a tokio runtime --- rerun_py/src/python_bridge.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 77c805024fc4..43c70d572bb0 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -295,7 +295,12 @@ fn serve(open_browser: bool) -> PyResult<()> { return Ok(()); } + use once_cell::sync::Lazy; + static TOKIO_RUNTIME: Lazy = + Lazy::new(|| tokio::runtime::Runtime::new().expect("Failed to create tokio runtime")); + let _guard = TOKIO_RUNTIME.enter(); session.set_sink(rerun::web_viewer::new_sink(open_browser)); + Ok(()) } From d295ed9424cb9c89c377855657be2ee29b44126e Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Mon, 20 Mar 2023 17:58:05 +0100 Subject: [PATCH 09/12] fix doc test --- crates/re_sdk_comms/src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index f8027c23d856..5c92a15f0135 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -74,7 +74,8 @@ async fn listen_for_new_clients( /// /// ``` no_run /// # use re_sdk_comms::{serve, ServerOptions}; -/// let log_msg_rx = serve(80, ServerOptions::default())?; +/// let (sender, receiver) = tokio::sync::broadcast::channel(1); +/// let log_msg_rx = serve(80, ServerOptions::default(), receiver)?; /// # Ok::<(), anyhow::Error>(()) /// ``` pub fn serve( From 9a15af3fd6569cf9d1a9fad3b7b781400986d09f Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 21 Mar 2023 08:19:33 +0100 Subject: [PATCH 10/12] Fix warnings --- crates/rerun/src/clap.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/rerun/src/clap.rs b/crates/rerun/src/clap.rs index db2af9bf1679..dc03a63d5559 100644 --- a/crates/rerun/src/clap.rs +++ b/crates/rerun/src/clap.rs @@ -2,9 +2,7 @@ use std::{net::SocketAddr, path::PathBuf}; -use anyhow::Context; - -use crate::{run::setup_ctrl_c_handler, Session}; +use crate::Session; // --- @@ -121,7 +119,9 @@ impl RerunArgs { #[cfg(feature = "web_viewer")] if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) { - let mut shutdown_rx = setup_ctrl_c_handler(); + use anyhow::Context as _; + + let mut shutdown_rx = crate::run::setup_ctrl_c_handler(); return tokio_runtime_handle .block_on(async { shutdown_rx.recv().await }) .context("Failed to wait for shutdown signal."); From 236a4f0dd4d8824100b074192365583a31288fa7 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 21 Mar 2023 08:26:23 +0100 Subject: [PATCH 11/12] improve docstring --- crates/re_ws_comms/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/re_ws_comms/src/server.rs b/crates/re_ws_comms/src/server.rs index d1f98309f1a8..997b0a13c3ed 100644 --- a/crates/re_ws_comms/src/server.rs +++ b/crates/re_ws_comms/src/server.rs @@ -40,7 +40,7 @@ impl Server { Ok(Self { listener }) } - /// Accept new connections forever + /// Accept new connections until we get a message on `shutdown_rx` pub async fn listen( self, rx: Receiver, From 96f7556e62439871a121e66613b62ea78a4fac75 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 21 Mar 2023 08:26:40 +0100 Subject: [PATCH 12/12] don't unwrap, especially in drop --- crates/rerun/src/web_viewer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rerun/src/web_viewer.rs b/crates/rerun/src/web_viewer.rs index 7c7bb97a798a..1c91170b0f07 100644 --- a/crates/rerun/src/web_viewer.rs +++ b/crates/rerun/src/web_viewer.rs @@ -11,7 +11,7 @@ struct RemoteViewerServer { impl Drop for RemoteViewerServer { fn drop(&mut self) { re_log::info!("Shutting down web server."); - self.shutdown_tx.send(()).unwrap(); + self.shutdown_tx.send(()).ok(); } }