Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always send recording_id as part of LogMsg #1778

Merged
merged 9 commits into from
Apr 6, 2023
Merged
13 changes: 10 additions & 3 deletions crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn live_bytes() -> usize {

// ----------------------------------------------------------------------------

use re_log_types::{entity_path, DataRow, MsgId};
use re_log_types::{entity_path, DataRow, MsgId, RecordingId};

fn main() {
log_messages();
Expand Down Expand Up @@ -91,6 +91,7 @@ fn log_messages() {

const NUM_POINTS: usize = 1_000;

let recording_id = RecordingId::random();
let timeline = Timeline::new_sequence("frame_nr");
let mut time_point = TimePoint::default();
time_point.insert(timeline, TimeInt::from(0));
Expand All @@ -116,7 +117,10 @@ fn log_messages() {
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
ArrowMsg::try_from(&*table).unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
Expand All @@ -139,7 +143,10 @@ fn log_messages() {
.into_table(),
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(&*table).unwrap()));
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
ArrowMsg::try_from(&*table).unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM");
let encoded = encode_log_msg(&log_msg);
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,15 @@ impl LogDb {

match &msg {
LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(msg) => {
LogMsg::EntityPathOpMsg(_, msg) => {
let EntityPathOpMsg {
msg_id,
time_point,
path_op,
} = msg;
self.entity_db.add_path_op(*msg_id, time_point, path_op);
}
LogMsg::ArrowMsg(inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

Expand Down
29 changes: 16 additions & 13 deletions crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use re_log_types::{
datagen::{build_frame_nr, build_some_colors, build_some_point2d},
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId,
entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, RecordingId,
};

use criterion::{criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -42,18 +42,18 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(tables: &[DataTable]) -> Vec<LogMsg> {
fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table).unwrap()))
.map(|table| LogMsg::ArrowMsg(recording_id, ArrowMsg::try_from(table).unwrap()))
.collect()
}

fn decode_tables(messages: &[LogMsg]) -> Vec<DataTable> {
messages
.iter()
.map(|log_msg| {
if let LogMsg::ArrowMsg(arrow_msg) = log_msg {
if let LogMsg::ArrowMsg(_, arrow_msg) = log_msg {
DataTable::try_from(arrow_msg).unwrap()
} else {
unreachable!()
Expand Down Expand Up @@ -81,21 +81,22 @@ fn mono_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("mono_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_tables);
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -136,21 +137,22 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("mono_points_arrow_batched");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_table);
});
let tables = [generate_table()];
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&[generate_table()])));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()])));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -192,21 +194,22 @@ fn batch_points_arrow(c: &mut Criterion) {
}

{
let recording_id = RecordingId::random();
let mut group = c.benchmark_group("batch_points_arrow");
group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
group.bench_function("generate_message_bundles", |b| {
b.iter(generate_tables);
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(&tables));
b.iter(|| generate_messages(recording_id, &tables));
});
let messages = generate_messages(&tables);
let messages = generate_messages(recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(&generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
41 changes: 34 additions & 7 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ pub enum LogMsg {
BeginRecordingMsg(BeginRecordingMsg),

/// Server-backed operation on an [`EntityPath`].
EntityPathOpMsg(EntityPathOpMsg),
EntityPathOpMsg(RecordingId, EntityPathOpMsg),

/// Log an entity using an [`ArrowMsg`].
ArrowMsg(ArrowMsg),
ArrowMsg(RecordingId, ArrowMsg),

/// Sent when the client shuts down the connection.
Goodbye(MsgId),
Expand All @@ -186,19 +186,46 @@ impl LogMsg {
pub fn id(&self) -> MsgId {
match self {
Self::BeginRecordingMsg(msg) => msg.msg_id,
Self::EntityPathOpMsg(msg) => msg.msg_id,
Self::EntityPathOpMsg(_, msg) => msg.msg_id,
Self::Goodbye(msg_id) => *msg_id,
// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
Self::ArrowMsg(msg) => msg.table_id,
Self::ArrowMsg(_, msg) => msg.table_id,
}
}

pub fn recording_id(&self) -> Option<&RecordingId> {
match self {
Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
Self::EntityPathOpMsg(recording_id, _) | Self::ArrowMsg(recording_id, _) => {
Some(recording_id)
}
Self::Goodbye(_) => None,
}
}
}

impl From<BeginRecordingMsg> for LogMsg {
#[inline]
fn from(value: BeginRecordingMsg) -> Self {
Self::BeginRecordingMsg(value)
}
}

impl From<(RecordingId, EntityPathOpMsg)> for LogMsg {
#[inline]
fn from(value: (RecordingId, EntityPathOpMsg)) -> Self {
Self::EntityPathOpMsg(value.0, value.1)
}
}

impl_into_enum!(BeginRecordingMsg, LogMsg, BeginRecordingMsg);
impl_into_enum!(EntityPathOpMsg, LogMsg, EntityPathOpMsg);
impl_into_enum!(ArrowMsg, LogMsg, ArrowMsg);
impl From<(RecordingId, ArrowMsg)> for LogMsg {
#[inline]
fn from(value: (RecordingId, ArrowMsg)) -> Self {
Self::ArrowMsg(value.0, value.1)
}
}

// ----------------------------------------------------------------------------

Expand Down
28 changes: 21 additions & 7 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError};
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId};

use crate::{
components::Transform,
log::{DataCell, LogMsg, MsgId},
session::RecordingLogSink,
sink::LogSink,
time::{Time, TimeInt, TimePoint, Timeline},
Component, EntityPath, SerializableComponent,
Expand Down Expand Up @@ -229,29 +230,42 @@ impl MsgSender {

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
pub fn send(self, sink: &impl std::borrow::Borrow<dyn LogSink>) -> Result<(), DataTableError> {
self.send_to_sink(sink.borrow())
pub fn send(self, sink: &impl RecordingLogSink) -> Result<(), DataTableError> {
jleibs marked this conversation as resolved.
Show resolved Hide resolved
self.send_to_sink(sink.get_recording_id(), sink.borrow())
}

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> {
fn send_to_sink(
self,
recording_id: RecordingId,
sink: &dyn LogSink,
) -> Result<(), DataTableError> {
if !sink.is_enabled() {
return Ok(()); // silently drop the message
}

let [row_standard, row_transforms, row_splats] = self.into_rows();

if let Some(row_transforms) = row_transforms {
sink.send(LogMsg::ArrowMsg((&row_transforms.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_transforms.into_table()).try_into()?,
));
}
if let Some(row_splats) = row_splats {
sink.send(LogMsg::ArrowMsg((&row_splats.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_splats.into_table()).try_into()?,
));
}
// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
sink.send(LogMsg::ArrowMsg((&row_standard.into_table()).try_into()?));
sink.send(LogMsg::ArrowMsg(
recording_id,
(&row_standard.into_table()).try_into()?,
));
}

Ok(())
Expand Down
40 changes: 33 additions & 7 deletions crates/re_sdk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ impl SessionBuilder {
#[must_use]
#[derive(Clone)]
pub struct Session {
recording_info: Option<RecordingInfo>,
sink: Arc<dyn LogSink>,
// TODO(emilk): add convenience `TimePoint` here so that users can
// do things like `session.set_time_sequence("frame", frame_idx);`
Expand Down Expand Up @@ -222,20 +223,24 @@ impl Session {
sink.send(
re_log_types::BeginRecordingMsg {
msg_id: re_log_types::MsgId::random(),
info: recording_info,
info: recording_info.clone(),
}
.into(),
);
}

Self { sink: sink.into() }
Self {
recording_info: Some(recording_info),
sink: sink.into(),
}
}

/// Construct a new session with a disabled "dummy" sink that drops all logging messages.
///
/// [`Self::is_enabled`] will return `false`.
pub fn disabled() -> Self {
Self {
recording_info: None,
sink: crate::sink::disabled().into(),
}
}
Expand Down Expand Up @@ -272,11 +277,14 @@ impl Session {
time_point: &re_log_types::TimePoint,
path_op: re_log_types::PathOp,
) {
self.send(LogMsg::EntityPathOpMsg(re_log_types::EntityPathOpMsg {
msg_id: re_log_types::MsgId::random(),
time_point: time_point.clone(),
path_op,
}));
self.send(LogMsg::EntityPathOpMsg(
self.get_recording_id(),
re_log_types::EntityPathOpMsg {
msg_id: re_log_types::MsgId::random(),
time_point: time_point.clone(),
path_op,
},
));
}

/// Drain all buffered [`LogMsg`]es and return them.
Expand All @@ -296,3 +304,21 @@ impl std::borrow::Borrow<dyn LogSink> for Session {
self.sink.as_ref()
}
}

/// Trait to expose [`RecordingId`] from a session
pub trait HasRecordingId {
fn get_recording_id(&self) -> RecordingId;
}

impl HasRecordingId for Session {
fn get_recording_id(&self) -> RecordingId {
self.recording_info
.as_ref()
.map_or(RecordingId::default(), |i| i.recording_id)
emilk marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Composite trait for accessing [`LogSink`] and [`RecordingId`] from session-like objects
pub trait RecordingLogSink: std::borrow::Borrow<dyn LogSink> + HasRecordingId {}

impl<T: std::borrow::Borrow<dyn LogSink> + HasRecordingId> RecordingLogSink for T {}
6 changes: 4 additions & 2 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ impl CongestionManager {
#[allow(clippy::match_same_arms)]
match msg {
// we don't want to drop any of these
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true,
LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_, _) | LogMsg::Goodbye(_) => {
true
}

LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
LogMsg::ArrowMsg(_, arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint_max),
}
}

Expand Down
Loading