Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rerun --save: stream incoming log stream to an rrd file #1662

Merged
merged 3 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,29 @@ mod encoder {

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::encode::Error),

#[error("Called append on already finished encoder")]
AlreadyFinished,
}

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
zstd_encoder: zstd::stream::Encoder<'static, W>,
/// Set to None when finished.
zstd_encoder: Option<zstd::stream::Encoder<'static, W>>,
buffer: Vec<u8>,
}

impl<W: std::io::Write> Drop for Encoder<W> {
fn drop(&mut self) {
if self.zstd_encoder.is_some() {
re_log::warn!("Encoder dropped without calling finish()!");
if let Err(err) = self.finish() {
re_log::error!("Failed to finish encoding: {err}");
}
}
}
}

impl<W: std::io::Write> Encoder<W> {
pub fn new(mut write: W) -> Result<Self, EncodeError> {
let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION"));
Expand All @@ -45,7 +60,7 @@ mod encoder {
zstd::stream::Encoder::new(write, level).map_err(EncodeError::Zstd)?;

Ok(Self {
zstd_encoder,
zstd_encoder: Some(zstd_encoder),
buffer: vec![],
})
}
Expand All @@ -56,20 +71,29 @@ mod encoder {
buffer,
} = self;

buffer.clear();
rmp_serde::encode::write_named(buffer, message)?;
if let Some(zstd_encoder) = zstd_encoder {
buffer.clear();
rmp_serde::encode::write_named(buffer, message)?;

zstd_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.map_err(EncodeError::Zstd)?;
zstd_encoder.write_all(buffer).map_err(EncodeError::Zstd)?;
zstd_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.map_err(EncodeError::Zstd)?;
zstd_encoder.write_all(buffer).map_err(EncodeError::Zstd)?;

Ok(())
Ok(())
} else {
Err(EncodeError::AlreadyFinished)
}
}

pub fn finish(self) -> Result<(), EncodeError> {
self.zstd_encoder.finish().map_err(EncodeError::Zstd)?;
Ok(())
pub fn finish(&mut self) -> Result<(), EncodeError> {
if let Some(zstd_encoder) = self.zstd_encoder.take() {
zstd_encoder.finish().map_err(EncodeError::Zstd)?;
Ok(())
} else {
re_log::warn!("Encoder::finish called twice");
Ok(())
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::sync::{
Arc,
};

use crossbeam::channel::{RecvError, SendError, TryRecvError};
use instant::Instant;

pub use crossbeam::channel::{RecvError, RecvTimeoutError, SendError, TryRecvError};

/// Where is the messages coming from?
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Source {
Expand Down Expand Up @@ -132,6 +133,13 @@ impl<T: Send> Receiver<T> {
Ok(msg)
}

pub fn recv_timeout(&self, timeout: std::time::Duration) -> Result<T, RecvTimeoutError> {
let (sent, msg) = self.rx.recv_timeout(timeout)?;
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

/// Receives without registering the latency.
///
/// This is for use with [`Sender::send_at`] when chaining to another channel
Expand Down
48 changes: 47 additions & 1 deletion crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ struct Args {
#[clap(long)]
web_viewer: bool,

/// Stream incoming log events to an .rrd file at the given path.
#[clap(long)]
save: Option<String>,

/// Start with the puffin profiler running.
#[clap(long)]
profile: bool,
Expand Down Expand Up @@ -315,7 +319,9 @@ async fn run_impl(

// Now what do we do with the data?

if args.web_viewer {
if let Some(rrd_path) = args.save {
Ok(stream_to_rrd(&rx, &rrd_path.into(), &shutdown_bool)?)
} else if args.web_viewer {
#[cfg(feature = "web_viewer")]
{
#[cfg(feature = "server")]
Expand Down Expand Up @@ -472,6 +478,46 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result<Receiver<LogMs
Ok(rx)
}

fn stream_to_rrd(
rx: &re_smart_channel::Receiver<LogMsg>,
path: &std::path::PathBuf,
shutdown_bool: &Arc<AtomicBool>,
) -> Result<(), re_sdk::sink::FileSinkError> {
use re_sdk::sink::FileSinkError;
use re_smart_channel::RecvTimeoutError;

if path.exists() {
re_log::warn!("Overwriting existing file at {path:?}");
}

re_log::info!("Saving incoming log stream to {path:?}. Abort with Ctrl-C.");

let file =
std::fs::File::create(path).map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = re_log_types::encoding::Encoder::new(file)?;

while !shutdown_bool.load(std::sync::atomic::Ordering::Relaxed) {
// We wake up and poll shutdown_bool every now and then.
// This is far from elegant, but good enough.
match rx.recv_timeout(std::time::Duration::from_millis(500)) {
Ok(log_msg) => {
encoder.append(&log_msg)?;
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => {
re_log::info!("Log stream disconnected, stopping.");
break;
}
}
}

encoder.finish()?;

re_log::info!("File saved to {path:?}");

Ok(())
}

#[cfg(feature = "server")]
fn parse_max_latency(max_latency: Option<&String>) -> f32 {
max_latency.as_ref().map_or(f32::INFINITY, |time| {
Expand Down