Skip to content

Commit

Permalink
Immediate compatiblity feedback for external loaders (#4797)
Browse files Browse the repository at this point in the history
External loaders are streaming their data via stdout as expected, but
we're waiting for their exit status code in order to decide whether they
are compatible... which nullifies the whole point of streaming the data
in the first place!

This PR now detects as soon as a child process starts streaming data in,
and considers that as a sure sign of compatibility.

`0.12`:



https://github.com/rerun-io/rerun/assets/2910679/872d7bc3-f907-4dd4-9cbe-c4c85510ffc1



Now:
 


https://github.com/rerun-io/rerun/assets/2910679/4266c194-1eb0-4c34-a79a-c2f4286688ba
  • Loading branch information
teh-cmc committed Jan 15, 2024
1 parent 04f8534 commit da8976d
Showing 1 changed file with 51 additions and 4 deletions.
55 changes: 51 additions & 4 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::io::Read;
use std::{
io::Read,
sync::{atomic::AtomicBool, Arc},
};

use ahash::HashMap;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -152,6 +155,10 @@ impl crate::DataLoader for ExternalLoader {

re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);

// A single value will be sent on this channel as soon as the child process starts
// streaming data to stdout.
let is_sending_data = Arc::new(AtomicBool::new(false));

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Expand All @@ -164,9 +171,8 @@ impl crate::DataLoader for ExternalLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
}
let is_sending_data = Arc::clone(&is_sending_data);
move || decode_and_stream(&filepath, &tx, is_sending_data, decoder)
})
{
re_log::error!(?filepath, loader = ?exe, %err, "Failed to open spawn IO thread");
Expand All @@ -185,6 +191,43 @@ impl crate::DataLoader for ExternalLoader {
}
};

// We have to wait in order to know whether the child process is a compatible loader.
//
// This can manifest itself in two distinct ways:
// 1. If it exits immediately with an INCOMPATIBLE exit code, then we have our
// answer straight away.
// - If it starts streaming data, then we immediately assume it's compatible.
loop {
re_tracing::profile_scope!("waiting for compatibility");

match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
if is_sending_data.load(std::sync::atomic::Ordering::Relaxed) {
// The child process has started streaming data, it is therefore compatible.
// Let's get out ASAP.
re_log::debug!(loader = ?exe, ?filepath, "compatible external loader found");
tx_feedback.send(CompatibleLoaderFound).ok();
break; // we still want to check for errors once it finally exits!
}

// NOTE: This will busy loop if there's no work available in neither
// the rayon threadpool nor the native OS threadpool.
match rayon::yield_now() {
Some(rayon::Yield::Executed) => {}
_ => std::thread::yield_now(),
}

continue;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};
}

// NOTE: `try_wait` and `wait` are idempotent.
let status = match child.wait() {
Ok(output) => output,
Err(err) => {
Expand Down Expand Up @@ -239,14 +282,18 @@ impl crate::DataLoader for ExternalLoader {
}
}

#[allow(clippy::needless_pass_by_value)]
fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
is_sending_data: Arc<AtomicBool>,
decoder: re_log_encoding::decoder::Decoder<R>,
) {
re_tracing::profile_function!(filepath.display().to_string());

for msg in decoder {
is_sending_data.store(true, std::sync::atomic::Ordering::Relaxed);

let msg = match msg {
Ok(msg) => msg,
Err(err) => {
Expand Down

0 comments on commit da8976d

Please sign in to comment.