Skip to content

Commit

Permalink
Automagic flush when take()ing a MemorySinkStorage (#2632)
Browse files Browse the repository at this point in the history
Fixes #2627 
Should supersede #2630 

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/2632) (if
applicable)

- [PR Build Summary](https://build.rerun.io/pr/2632)
- [Docs
preview](https://rerun.io/preview/pr%3Acmc%2Fautoflushmagix/docs)
- [Examples
preview](https://rerun.io/preview/pr%3Acmc%2Fautoflushmagix/examples)
  • Loading branch information
teh-cmc committed Jul 7, 2023
1 parent d4e05c5 commit 9ccaabf
Show file tree
Hide file tree
Showing 34 changed files with 112 additions and 148 deletions.
18 changes: 14 additions & 4 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,35 @@ impl LogSink for MemorySink {

/// The storage used by [`MemorySink`].
#[derive(Default, Clone)]
pub struct MemorySinkStorage(Arc<RwLock<Vec<LogMsg>>>);
pub struct MemorySinkStorage {
msgs: Arc<RwLock<Vec<LogMsg>>>,
pub(crate) rec_stream: Option<crate::RecordingStream>,
}

impl MemorySinkStorage {
/// Write access to the inner array of [`LogMsg`].
#[inline]
fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Vec<LogMsg>> {
self.0.write()
self.msgs.write()
}

/// Read access to the inner array of [`LogMsg`].
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Vec<LogMsg>> {
self.0.read()
self.msgs.read()
}

/// Consumes and returns the inner array of [`LogMsg`].
///
/// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
#[inline]
pub fn take(&self) -> Vec<LogMsg> {
std::mem::take(&mut *self.0.write())
if let Some(rec_stream) = self.rec_stream.as_ref() {
// NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
// in this flush; it's just a matter of making the table batcher tick early.
rec_stream.flush_blocking();
}
std::mem::take(&mut *self.msgs.write())
}
}

Expand Down
24 changes: 6 additions & 18 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,14 @@ impl RecordingStreamBuilder {
self,
) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
let sink = crate::log_sink::MemorySink::default();
let storage = sink.buffer();
let mut storage = sink.buffer();

let (enabled, store_info, batcher_config) = self.into_args();
if enabled {
RecordingStream::new(store_info, batcher_config, Box::new(sink))
.map(|rec_stream| (rec_stream, storage))
RecordingStream::new(store_info, batcher_config, Box::new(sink)).map(|rec_stream| {
storage.rec_stream = Some(rec_stream.clone());
(rec_stream, storage)
})
} else {
re_log::debug!("Rerun disabled - call to memory() ignored");
Ok((RecordingStream::disabled(), Default::default()))
Expand Down Expand Up @@ -1142,21 +1144,7 @@ mod tests {
_ => panic!("expected SetStoreInfo"),
}

// The underlying batcher is never flushing: there's nothing else.
assert!(msgs.pop().is_none());
}

// The underlying batcher is never flushing: there's nothing else.
assert!(storage.take().is_empty());

rec_stream.flush_blocking(); // flush the entire hierarchy

{
let mut msgs = {
let mut msgs = storage.take();
msgs.reverse();
msgs
};
// MemorySinkStorage transparently handles flushing during `take()`!

// The batched table itself, which was sent as a result of the explicit flush above.
match msgs.pop().unwrap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ table __AffixFuzzer3Vec (transparent, order: 0) {

union AffixFuzzer4 (
"attr.rust.derive": "PartialEq",
order: 300
order: 400
) {
single_required: __AffixFuzzer3 (transparent, order: 100),
many_required: __AffixFuzzer3Vec (transparent, order: 101),
Expand Down
2 changes: 1 addition & 1 deletion crates/re_types/source_hash.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This is a sha256 hash for all direct and indirect dependencies of this crate's build script.
# It can be safely removed at anytime to force the build script to run again.
# Check out build.rs to see how it's computed.
31e5d16dbe17e37a95f5c0c135cbf181e43d216f995f96c6dcda7f27959256b8
4ae47b009f4946234e8d7edd37ce7815ef9f77dd35cfdd83bf328be612b4d54e
2 changes: 0 additions & 2 deletions crates/re_types/src/archetypes/points2d.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
#[doc = " .with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 4.0, 3.0]))])?"]
#[doc = " .send(&rec_stream)?;"]
#[doc = ""]
#[doc = " rec_stream.flush_blocking();"]
#[doc = ""]
#[doc = " rerun::native_viewer::show(storage.take())?;"]
#[doc = ""]
#[doc = " Ok(())"]
Expand Down
2 changes: 0 additions & 2 deletions docs/code-examples/annotation_context_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_splat(ClassId(0))?
.send(&rec_stream)?;

rec_stream.flush_blocking();

rerun::native_viewer::show(storage.take())?;

Ok(())
Expand Down
1 change: 0 additions & 1 deletion docs/code-examples/annotation_context_rects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 5.0, 5.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/annotation_context_segmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[image])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/arrow3d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Radius(0.05)])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/box3d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Box3D::new(2.0, 2.0, 1.0)])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/depth_image_3d.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[tensor])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/depth_image_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[tensor])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/image_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Tensor::try_from(image.as_standard_layout().view())?])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/line_segments2d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([3.0, 0.0, 8.0, 6.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/line_segments3d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/line_strip2d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([3.0, 0.0, 8.0, 6.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/line_strip3d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[LineStrip3D(points.into_iter().map(Into::into).collect())])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/mesh_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Mesh3D::Raw(mesh)])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/pinhole_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Tensor::try_from(image.as_standard_layout().view())?])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/point2d_random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 8.0, 6.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/point2d_random_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 8.0, 6.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
2 changes: 0 additions & 2 deletions docs/code-examples/point2d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 4.0, 3.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();

rerun::native_viewer::show(storage.take())?;

Ok(())
Expand Down
2 changes: 0 additions & 2 deletions docs/code-examples/point2d_simple_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 4.0, 3.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();

rerun::native_viewer::show(storage.take())?;

Ok(())
Expand Down
1 change: 0 additions & 1 deletion docs/code-examples/point3d_random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&radii)?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/point3d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 4.0, 3.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/rect2d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Rect2D::XCYCWH(Vec4D([0.0, 0.0, 4.0, 3.0]))])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/scalar_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
value += thread_rng().sample::<f64, _>(StandardNormal);
}

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/segmentation_image_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[tensor])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/tensor_one_dim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[Tensor::try_from(data.as_standard_layout().view())?])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/tensor_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[tensor])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/text_entry_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
1 change: 0 additions & 1 deletion docs/code-examples/transform3d_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_component(&[arrow])?
.send(&rec_stream)?;

rec_stream.flush_blocking();
rerun::native_viewer::show(storage.take())?;
Ok(())
}
2 changes: 0 additions & 2 deletions examples/rust/minimal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_splat(Radius(0.5))?
.send(&rec_stream)?;

rec_stream.flush_blocking();

rerun::native_viewer::show(storage.take())?;

Ok(())
Expand Down
Loading

0 comments on commit 9ccaabf

Please sign in to comment.