Skip to content

Commit

Permalink
Fix crsh on decoding old .rrd files (#1579)
Browse files Browse the repository at this point in the history
* Replace unwrap with logging an error when loading .rrd file

* Less anyhow
  • Loading branch information
emilk authored Mar 14, 2023
1 parent 1792207 commit ba131ac
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn log_messages() {
fn decode_log_msg(mut bytes: &[u8]) -> LogMsg {
let mut messages = re_log_types::encoding::Decoder::new(&mut bytes)
.unwrap()
.collect::<anyhow::Result<Vec<LogMsg>>>()
.collect::<Result<Vec<LogMsg>, _>>()
.unwrap();
assert!(bytes.is_empty());
assert_eq!(messages.len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn encode_log_msgs(messages: &[LogMsg]) -> Vec<u8> {
fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
let messages = re_log_types::encoding::Decoder::new(&mut bytes)
.unwrap()
.collect::<anyhow::Result<Vec<LogMsg>>>()
.collect::<Result<Vec<LogMsg>, _>>()
.unwrap();
assert!(bytes.is_empty());
messages
Expand Down
69 changes: 49 additions & 20 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,34 @@ fn warn_on_version_mismatch(encoded_version: [u8; 4]) {
}
}

// ----------------------------------------------------------------------------

/// On failure to encode or serialize a [`LogMsg`].
#[cfg(feature = "load")]
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
#[error("Not an .rrd file")]
NotAnRrd,

#[error("Failed to read: {0}")]
Read(std::io::Error),

#[cfg(not(target_arch = "wasm32"))]
#[error("Zstd error: {0}")]
Zstd(std::io::Error),

#[cfg(target_arch = "wasm32")]
#[error("Zstd error: {0}")]
RuzstdInit(ruzstd::frame_decoder::FrameDecoderError),

#[cfg(target_arch = "wasm32")]
#[error("Zstd read error: {0}")]
RuzstdRead(std::io::Error),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::decode::Error),
}

// ----------------------------------------------------------------------------
// native decode:

Expand All @@ -121,17 +149,18 @@ pub struct Decoder<'r, R: std::io::BufRead> {
#[cfg(feature = "load")]
#[cfg(not(target_arch = "wasm32"))]
impl<'r, R: std::io::Read> Decoder<'r, std::io::BufReader<R>> {
pub fn new(mut read: R) -> anyhow::Result<Self> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();
use anyhow::Context as _;

let mut header = [0_u8; 4];
read.read_exact(&mut header).context("missing header")?;
anyhow::ensure!(&header == b"RRF0", "Not a rerun file");
read.read_exact(&mut header).context("missing header")?;
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header != b"RRF0" {
return Err(DecodeError::NotAnRrd);
}
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let zdecoder = zstd::stream::read::Decoder::new(read).context("zstd")?;
let zdecoder = zstd::stream::read::Decoder::new(read).map_err(DecodeError::Zstd)?;
Ok(Self {
zdecoder,
buffer: vec![],
Expand All @@ -142,7 +171,7 @@ impl<'r, R: std::io::Read> Decoder<'r, std::io::BufReader<R>> {
#[cfg(feature = "load")]
#[cfg(not(target_arch = "wasm32"))]
impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
type Item = anyhow::Result<LogMsg>;
type Item = Result<LogMsg, DecodeError>;

fn next(&mut self) -> Option<Self::Item> {
crate::profile_function!();
Expand All @@ -157,14 +186,14 @@ impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
{
crate::profile_scope!("zstd");
if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) {
return Some(Err(anyhow::anyhow!("zstd: {err}")));
return Some(Err(DecodeError::Zstd(err)));
}
}

crate::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(anyhow::anyhow!("MessagePack: {err}"))),
Err(err) => Some(Err(err.into())),
}
}
}
Expand All @@ -182,18 +211,18 @@ pub struct Decoder<R: std::io::Read> {
#[cfg(feature = "load")]
#[cfg(target_arch = "wasm32")]
impl<R: std::io::Read> Decoder<R> {
pub fn new(mut read: R) -> anyhow::Result<Self> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();
use anyhow::Context as _;

let mut header = [0_u8; 4];
read.read_exact(&mut header).context("missing header")?;
anyhow::ensure!(&header == b"RRF0", "Not a rerun file");
read.read_exact(&mut header).context("missing header")?;
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header != b"RRF0" {
return Err(DecodeError::NotAnRrd);
}
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let zdecoder =
ruzstd::StreamingDecoder::new(read).map_err(|err| anyhow::anyhow!("ruzstd: {err}"))?;
let zdecoder = ruzstd::StreamingDecoder::new(read).map_err(DecodeError::RuzstdInit)?;
Ok(Self {
zdecoder,
buffer: vec![],
Expand All @@ -204,7 +233,7 @@ impl<R: std::io::Read> Decoder<R> {
#[cfg(feature = "load")]
#[cfg(target_arch = "wasm32")]
impl<R: std::io::Read> Iterator for Decoder<R> {
type Item = anyhow::Result<LogMsg>;
type Item = Result<LogMsg, DecodeError>;

fn next(&mut self) -> Option<Self::Item> {
crate::profile_function!();
Expand All @@ -219,14 +248,14 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
{
crate::profile_scope!("ruzstd");
if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) {
return Some(Err(anyhow::anyhow!("ruzstd: {err}")));
return Some(Err(DecodeError::RuzstdRead(err)));
}
}

crate::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(anyhow::anyhow!("MessagePack: {err}"))),
Err(err) => Some(Err(err.into())),
}
}
}
Expand Down Expand Up @@ -257,7 +286,7 @@ fn test_encode_decode() {

let decoded_messages = Decoder::new(&mut file.as_slice())
.unwrap()
.collect::<anyhow::Result<Vec<LogMsg>>>()
.collect::<Result<Vec<LogMsg>, DecodeError>>()
.unwrap();

assert_eq!(messages, decoded_messages);
Expand Down
10 changes: 9 additions & 1 deletion crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,19 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result<Receiver<LogMs
path: path.to_owned(),
});

let path = path.to_owned();
std::thread::Builder::new()
.name("rrd_file_reader".into())
.spawn(move || {
for msg in decoder {
tx.send(msg.unwrap()).ok();
match msg {
Ok(msg) => {
tx.send(msg).ok();
}
Err(err) => {
re_log::warn_once!("Failed to decode message in {path:?}: {err}");
}
}
}
})
.expect("Failed to spawn thread");
Expand Down

1 comment on commit ba131ac

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: ba131ac Previous: 1792207 Ratio
datastore/insert/batch/rects/insert 561999 ns/iter (± 31262) 550683 ns/iter (± 3116) 1.02
datastore/latest_at/batch/rects/query 1832 ns/iter (± 59) 1859 ns/iter (± 1) 0.99
datastore/latest_at/missing_components/primary 286 ns/iter (± 0) 286 ns/iter (± 0) 1
datastore/latest_at/missing_components/secondaries 432 ns/iter (± 0) 440 ns/iter (± 1) 0.98
datastore/range/batch/rects/query 151838 ns/iter (± 649) 151542 ns/iter (± 668) 1.00
mono_points_arrow/generate_message_bundles 48476144 ns/iter (± 554883) 50303635 ns/iter (± 936482) 0.96
mono_points_arrow/generate_messages 128738372 ns/iter (± 1019899) 136618493 ns/iter (± 1071045) 0.94
mono_points_arrow/encode_log_msg 154611712 ns/iter (± 1502070) 167944922 ns/iter (± 788902) 0.92
mono_points_arrow/encode_total 329992505 ns/iter (± 2447529) 355510814 ns/iter (± 1545110) 0.93
mono_points_arrow/decode_log_msg 176334821 ns/iter (± 847285) 187438049 ns/iter (± 1029803) 0.94
mono_points_arrow/decode_message_bundles 65905957 ns/iter (± 802450) 73438704 ns/iter (± 1034209) 0.90
mono_points_arrow/decode_total 239902655 ns/iter (± 1524889) 259027478 ns/iter (± 2585415) 0.93
batch_points_arrow/generate_message_bundles 332332 ns/iter (± 579) 332638 ns/iter (± 1256) 1.00
batch_points_arrow/generate_messages 6538 ns/iter (± 25) 6292 ns/iter (± 22) 1.04
batch_points_arrow/encode_log_msg 371253 ns/iter (± 1126) 355178 ns/iter (± 3185) 1.05
batch_points_arrow/encode_total 731713 ns/iter (± 3212) 708875 ns/iter (± 3171) 1.03
batch_points_arrow/decode_log_msg 346768 ns/iter (± 1695) 347362 ns/iter (± 2355) 1.00
batch_points_arrow/decode_message_bundles 2175 ns/iter (± 9) 2110 ns/iter (± 13) 1.03
batch_points_arrow/decode_total 363354 ns/iter (± 2898) 354341 ns/iter (± 1521) 1.03
arrow_mono_points/insert 6099542708 ns/iter (± 37818498) 6904330112 ns/iter (± 14106181) 0.88
arrow_mono_points/query 1762514 ns/iter (± 13452) 1760917 ns/iter (± 25829) 1.00
arrow_batch_points/insert 2674834 ns/iter (± 20195) 2672911 ns/iter (± 18939) 1.00
arrow_batch_points/query 17001 ns/iter (± 20) 16957 ns/iter (± 88) 1.00
arrow_batch_vecs/insert 42053 ns/iter (± 146) 41988 ns/iter (± 257) 1.00
arrow_batch_vecs/query 389089 ns/iter (± 6468) 388422 ns/iter (± 2971) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.