Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix premature pausing when reaching end of still-streaming stream #2106

Merged
merged 5 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod web_event_listener {
/// var rrd = new Uint8Array(...); // Get an RRD from somewhere
/// window.postMessage(rrd, "*")
/// ```
pub fn stream_rrd_from_event_listener(on_msg: Arc<dyn Fn(LogMsg) + Send>) {
pub fn stream_rrd_from_event_listener(mut on_msg: Option<Arc<dyn Fn(LogMsg) + Send>>) {
let window = web_sys::window().expect("no global `window` exists");
let closure =
Closure::wrap(Box::new(
Expand All @@ -62,7 +62,14 @@ mod web_event_listener {
let uint8_array = Uint8Array::new(&message_event.data());
let result: Vec<u8> = uint8_array.to_vec();

crate::stream_rrd_from_http::decode_rrd(result, on_msg.clone());
// On the first incoming message_event, take the on_msg callback
// so that the channel drops dropped when we are done. This is
// necessary to allow the viewer to know that no more data is coming.
// TODO(jleibs): In live-streaming mode we don't want to do this and
// will instead want to clone the arc.
if let Some(on_msg) = on_msg.take() {
crate::stream_rrd_from_http::decode_rrd(result, on_msg);
}
}
Err(js_val) => {
re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val);
Expand Down
47 changes: 42 additions & 5 deletions crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A channel that keeps track of latency and queue length.

use std::sync::{
atomic::{AtomicU64, Ordering::Relaxed},
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
Arc,
};

Expand Down Expand Up @@ -64,7 +64,12 @@ fn smart_channel_with_stats<T: Send>(
tx,
stats: stats.clone(),
};
let receiver = Receiver { rx, stats, source };
let receiver = Receiver {
rx,
stats,
source,
connected: AtomicBool::new(true),
};
(sender, receiver)
}

Expand Down Expand Up @@ -121,25 +126,57 @@ pub struct Receiver<T: Send> {
rx: crossbeam::channel::Receiver<(Instant, T)>,
stats: Arc<SharedStats>,
source: Source,
connected: AtomicBool,
}

impl<T: Send> Receiver<T> {
/// Are we still connected?
///
/// Once false, we will never be connected again: the source has run dry.
///
/// This is only updated once one of the receive methods fails.
pub fn is_connected(&self) -> bool {
self.connected.load(Relaxed)
}

pub fn recv(&self) -> Result<T, RecvError> {
let (sent, msg) = self.rx.recv()?;
let (sent, msg) = match self.rx.recv() {
Ok(x) => x,
Err(RecvError) => {
self.connected.store(false, Relaxed);
return Err(RecvError);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

pub fn try_recv(&self) -> Result<T, TryRecvError> {
let (sent, msg) = self.rx.try_recv()?;
let (sent, msg) = match self.rx.try_recv() {
Ok(x) => x,
Err(err) => {
if err == TryRecvError::Disconnected {
self.connected.store(false, Relaxed);
}
return Err(err);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

pub fn recv_timeout(&self, timeout: std::time::Duration) -> Result<T, RecvTimeoutError> {
let (sent, msg) = self.rx.recv_timeout(timeout)?;
let (sent, msg) = match self.rx.recv_timeout(timeout) {
Ok(x) => x,
Err(err) => {
if err == RecvTimeoutError::Disconnected {
self.connected.store(false, Relaxed);
}
return Err(err);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
Expand Down
24 changes: 15 additions & 9 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl eframe::App for App {
log_db,
&self.re_ui,
&self.component_ui_registry,
self.rx.source(),
&self.rx,
);
}

Expand Down Expand Up @@ -964,7 +964,7 @@ impl AppState {
log_db: &LogDb,
re_ui: &re_ui::ReUi,
component_ui_registry: &ComponentUiRegistry,
data_source: &re_smart_channel::Source,
rx: &Receiver<LogMsg>,
) {
crate::profile_function!();

Expand All @@ -984,7 +984,7 @@ impl AppState {
let rec_cfg = recording_config_entry(
recording_configs,
selected_rec_id.clone(),
data_source,
rx.source(),
log_db,
);
let selected_app_id = log_db
Expand Down Expand Up @@ -1024,12 +1024,18 @@ impl AppState {
.blueprint_panel_and_viewport(&mut ctx, ui),
});

// move time last, so we get to see the first data first!
ctx.rec_cfg
.time_ctrl
.move_time(log_db.times_per_timeline(), ui.ctx().input(|i| i.stable_dt));
if ctx.rec_cfg.time_ctrl.play_state() == PlayState::Playing {
ui.ctx().request_repaint();
{
// We move the time at the very end of the frame,
// so we have one frame to see the first data before we move the time.
let dt = ui.ctx().input(|i| i.stable_dt);
let more_data_is_coming = rx.is_connected();
let needs_repaint =
ctx.rec_cfg
.time_ctrl
.update(log_db.times_per_timeline(), dt, more_data_is_coming);
if needs_repaint == re_viewer_context::NeedsRepaint::Yes {
ui.ctx().request_repaint();
}
}

if WATERMARK {
Expand Down
4 changes: 2 additions & 2 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ impl WebHandle {
);
let egui_ctx = cc.egui_ctx.clone();
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(
Arc::new(move |msg| {
Some(Arc::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
})),
);

Box::new(crate::App::from_receiver(
Expand Down
6 changes: 6 additions & 0 deletions crates/re_viewer_context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ slotmap::new_key_type! {
pub struct DataBlueprintGroupHandle;
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NeedsRepaint {
Yes,
No,
}

// ---------------------------------------------------------------------------

/// Profiling macro for feature "puffin"
Expand Down
37 changes: 27 additions & 10 deletions crates/re_viewer_context/src/time_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::collections::{BTreeMap, BTreeSet};
use re_data_store::TimesPerTimeline;
use re_log_types::{Duration, TimeInt, TimeRange, TimeRangeF, TimeReal, TimeType, Timeline};

use crate::NeedsRepaint;

/// The time range we are currently zoomed in on.
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
pub struct TimeView {
Expand Down Expand Up @@ -111,12 +113,18 @@ impl Default for TimeControl {
}

impl TimeControl {
/// Update the current time
pub fn move_time(&mut self, times_per_timeline: &TimesPerTimeline, stable_dt: f32) {
/// Move the time forward (if playing), and perhaps pause if we've reached the end.
#[must_use]
pub fn update(
&mut self,
times_per_timeline: &TimesPerTimeline,
stable_dt: f32,
more_data_is_coming: bool,
) -> NeedsRepaint {
self.select_a_valid_timeline(times_per_timeline);

let Some(full_range) = self.full_range(times_per_timeline) else {
return;
return NeedsRepaint::No; // we have no data on this timeline yet, so bail
};

match self.play_state() {
Expand All @@ -132,6 +140,7 @@ impl TimeControl {
full_range.min
})
});
NeedsRepaint::No
}
PlayState::Playing => {
let dt = stable_dt.min(0.1) * self.speed;
Expand All @@ -141,11 +150,17 @@ impl TimeControl {
.entry(self.timeline)
.or_insert_with(|| TimeState::new(full_range.min));

if self.looping == Looping::Off && state.time >= full_range.max {
// We've reached the end - stop playing.
if self.looping == Looping::Off && full_range.max <= state.time {
// We've reached the end of the data
state.time = full_range.max.into();
self.pause();
return;

if more_data_is_coming {
// then let's wait for it without pausing!
return NeedsRepaint::No; // ui will wake up when more data arrives
} else {
self.pause();
return NeedsRepaint::No;
}
}

let loop_range = match self.looping {
Expand All @@ -166,10 +181,12 @@ impl TimeControl {
}

if let Some(loop_range) = loop_range {
if state.time > loop_range.max {
state.time = loop_range.min;
if loop_range.max < state.time {
state.time = loop_range.min; // loop!
}
}

NeedsRepaint::Yes
}
PlayState::Following => {
// Set the time to the max:
Expand All @@ -181,7 +198,7 @@ impl TimeControl {
entry.get_mut().time = full_range.max.into();
}
}
// no need for request_repaint - we already repaint when new data arrives
NeedsRepaint::No // no need for request_repaint - we already repaint when new data arrives
}
}
}
Expand Down