From c21ae9babf184bc2e17a01e7c2ed3fe681d308f5 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 9 May 2023 15:09:36 +0200 Subject: [PATCH 1/2] Use the same RRD encoding for the SDK comms as for everything else This comes with two benefits: * We get nice error messages on version mismatch * We get compression of the TCP stream There is also a downside: we need to pay for the slow zstd encoding and decoding. Closes https://github.com/rerun-io/rerun/issues/2003 --- Cargo.lock | 1 + crates/re_log_encoding/src/decoder.rs | 11 ++++++++ crates/re_log_encoding/src/encoder.rs | 20 +++++++++++++++ crates/re_sdk_comms/Cargo.toml | 1 + crates/re_sdk_comms/src/buffered_client.rs | 23 ++++++++++++----- crates/re_sdk_comms/src/lib.rs | 25 ------------------ crates/re_sdk_comms/src/server.rs | 30 +++++++++++----------- 7 files changed, 64 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e1e128217fe..8f338429e46e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4131,6 +4131,7 @@ dependencies = [ "document-features", "rand", "re_log", + "re_log_encoding", "re_log_types", "re_smart_channel", "tokio", diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index a4ba2592b757..ee6976a54dbf 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -48,6 +48,17 @@ pub enum DecodeError { MsgPack(#[from] rmp_serde::decode::Error), } +// ---------------------------------------------------------------------------- + +pub fn decode_bytes(bytes: &[u8]) -> Result, DecodeError> { + let decoder = Decoder::new(std::io::Cursor::new(bytes))?; + let mut msgs = vec![]; + for msg in decoder { + msgs.push(msg?); + } + Ok(msgs) +} + // ---------------------------------------------------------------------------- // native decode: diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs index 6d444746b6e1..06aa629887b9 100644 --- a/crates/re_log_encoding/src/encoder.rs +++ b/crates/re_log_encoding/src/encoder.rs @@ -1,5 +1,7 @@ //! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network. +// TODO(1316): switch to using lz4flex - see https://github.com/rerun-io/rerun/issues/1316#issuecomment-1510967893 + use std::io::Write as _; use re_log_types::LogMsg; @@ -20,6 +22,24 @@ pub enum EncodeError { AlreadyFinished, } +// ---------------------------------------------------------------------------- + +pub fn encode_to_bytes<'a>( + msgs: impl IntoIterator, +) -> Result, EncodeError> { + let mut bytes: Vec = vec![]; + { + let mut encoder = Encoder::new(std::io::Cursor::new(&mut bytes))?; + for msg in msgs { + encoder.append(msg)?; + } + encoder.finish()?; + } + Ok(bytes) +} + +// ---------------------------------------------------------------------------- + /// Encode a stream of [`LogMsg`] into an `.rrd` file. pub struct Encoder { /// Set to None when finished. diff --git a/crates/re_sdk_comms/Cargo.toml b/crates/re_sdk_comms/Cargo.toml index 485b13488d72..a7c5842c9107 100644 --- a/crates/re_sdk_comms/Cargo.toml +++ b/crates/re_sdk_comms/Cargo.toml @@ -26,6 +26,7 @@ server = [] [dependencies] re_log.workspace = true +re_log_encoding.workspace = true re_log_types = { workspace = true, features = ["serde"] } re_smart_channel.workspace = true diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 797a4a23b34e..13f62e267b9f 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -191,16 +191,25 @@ fn msg_encode( if let Ok(msg_msg) = msg_msg { let packet_msg = match &msg_msg { MsgMsg::LogMsg(log_msg) => { - let packet = crate::encode_log_msg(log_msg); - re_log::trace!("Encoded message of size {}", packet.len()); - PacketMsg::Packet(packet) + match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) { + Ok(packet) => { + re_log::trace!("Encoded message of size {}", packet.len()); + Some(PacketMsg::Packet(packet)) + } + Err(err) => { + re_log::error_once!("Failed to encode log message: {err}"); + None + } + } } - MsgMsg::Flush => PacketMsg::Flush, + MsgMsg::Flush => Some(PacketMsg::Flush), }; - if packet_tx.send(packet_msg).is_err() { - re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition."); - return; + if let Some(packet_msg) = packet_msg { + if packet_tx.send(packet_msg).is_err() { + re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition."); + return; + } } if msg_drop_tx.send(msg_msg).is_err() { re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); diff --git a/crates/re_sdk_comms/src/lib.rs b/crates/re_sdk_comms/src/lib.rs index 210c2881069f..44c61432af9b 100644 --- a/crates/re_sdk_comms/src/lib.rs +++ b/crates/re_sdk_comms/src/lib.rs @@ -19,8 +19,6 @@ mod server; #[cfg(feature = "server")] pub use server::{serve, ServerOptions}; -use re_log_types::LogMsg; - pub type Result = anyhow::Result; pub const PROTOCOL_VERSION: u16 = 0; @@ -31,26 +29,3 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876; pub fn default_server_addr() -> std::net::SocketAddr { std::net::SocketAddr::from(([127, 0, 0, 1], DEFAULT_SERVER_PORT)) } - -const PREFIX: [u8; 4] = *b"RR00"; - -pub fn encode_log_msg(log_msg: &LogMsg) -> Vec { - use bincode::Options as _; - let mut bytes = PREFIX.to_vec(); - bincode::DefaultOptions::new() - .serialize_into(&mut bytes, log_msg) - .unwrap(); - bytes -} - -pub fn decode_log_msg(data: &[u8]) -> Result { - let payload = data - .strip_prefix(&PREFIX) - .ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?; - - use anyhow::Context as _; - use bincode::Options as _; - bincode::DefaultOptions::new() - .deserialize(payload) - .context("bincode") -} diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index 75f766d40b7c..682bddbe05a1 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -151,25 +151,25 @@ async fn run_client( packet.resize(packet_size as usize, 0_u8); stream.read_exact(&mut packet).await?; - re_log::trace!("Received log message of size {packet_size}."); + re_log::trace!("Received packet of size {packet_size}."); congestion_manager.register_latency(tx.latency_sec()); - let msg = crate::decode_log_msg(&packet)?; - - if matches!(msg, LogMsg::Goodbye(_)) { - re_log::debug!("Received goodbye message."); - tx.send(msg)?; - return Ok(()); - } + for msg in re_log_encoding::decoder::decode_bytes(&packet)? { + if matches!(msg, LogMsg::Goodbye(_)) { + re_log::debug!("Received goodbye message."); + tx.send(msg)?; + return Ok(()); + } - if congestion_manager.should_send(&msg) { - tx.send(msg)?; - } else { - re_log::warn_once!( - "Input latency is over the max ({} s) - dropping packets.", - options.max_latency_sec - ); + if congestion_manager.should_send(&msg) { + tx.send(msg)?; + } else { + re_log::warn_once!( + "Input latency is over the max ({} s) - dropping packets.", + options.max_latency_sec + ); + } } } } From 08b0605da04840a0d79dc74d3307eb56c073ff52 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 9 May 2023 17:01:51 +0200 Subject: [PATCH 2/2] Use let-else to reduce rightward drift --- crates/re_sdk_comms/src/buffered_client.rs | 48 +++++++++++----------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 13f62e267b9f..d12b49dd6974 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -188,35 +188,35 @@ fn msg_encode( loop { select! { recv(msg_rx) -> msg_msg => { - if let Ok(msg_msg) = msg_msg { - let packet_msg = match &msg_msg { - MsgMsg::LogMsg(log_msg) => { - match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) { - Ok(packet) => { - re_log::trace!("Encoded message of size {}", packet.len()); - Some(PacketMsg::Packet(packet)) - } - Err(err) => { - re_log::error_once!("Failed to encode log message: {err}"); - None - } + let Ok(msg_msg) = msg_msg else { + return; // channel has closed + }; + + let packet_msg = match &msg_msg { + MsgMsg::LogMsg(log_msg) => { + match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) { + Ok(packet) => { + re_log::trace!("Encoded message of size {}", packet.len()); + Some(PacketMsg::Packet(packet)) + } + Err(err) => { + re_log::error_once!("Failed to encode log message: {err}"); + None } - } - MsgMsg::Flush => Some(PacketMsg::Flush), - }; - - if let Some(packet_msg) = packet_msg { - if packet_tx.send(packet_msg).is_err() { - re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition."); - return; } } - if msg_drop_tx.send(msg_msg).is_err() { - re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); + MsgMsg::Flush => Some(PacketMsg::Flush), + }; + + if let Some(packet_msg) = packet_msg { + if packet_tx.send(packet_msg).is_err() { + re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition."); return; } - } else { - return; // channel has closed + } + if msg_drop_tx.send(msg_msg).is_err() { + re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); + return; } } recv(quit_rx) -> _quit_msg => {