Rust SDK: introduce MsgSender builder-like interface for logging data #1037

Merged · 12 commits · Feb 6, 2023
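This PR introduces a builder-like `MsgSender` interface for logging data from the Rust SDK; the diffs shown in this section cover the storage- and type-level groundwork rather than the new interface itself. As a rough, hypothetical sketch of the style of call such a builder enables (method and type names here are assumptions based on the PR title, not taken from this diff):

    // Hypothetical builder-style logging call: stage an entity path, a time point,
    // and some component batches, then assemble and send the bundle in one go.
    MsgSender::new("world/points")
        .with_timepoint(timepoint)
        .with_component(&points)?
        .with_component(&colors)?
        .send(&mut session)?;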
2 changes: 2 additions & 0 deletions Cargo.lock


8 changes: 4 additions & 4 deletions crates/re_arrow_store/src/arrow_util.rs
@@ -266,8 +266,8 @@ fn test_clean_for_polars_nomodify() {

// Colors don't need polars cleaning
let bundle: ComponentBundle = build_some_colors(5).try_into().unwrap();
let cleaned = bundle.value.clean_for_polars();
assert_eq!(bundle.value, cleaned);
let cleaned = bundle.value().clean_for_polars();
assert_eq!(bundle.value(), &*cleaned);
}

#[test]
@@ -282,7 +282,7 @@ fn test_clean_for_polars_modify() {

let bundle: ComponentBundle = transforms.try_into().unwrap();
assert_eq!(
*bundle.value.data_type(),
*bundle.value().data_type(),
DataType::List(Box::new(Field::new(
"item",
DataType::Union(
@@ -340,7 +340,7 @@ fn test_clean_for_polars_modify() {
)))
);

let cleaned = bundle.value.clean_for_polars();
let cleaned = bundle.value().clean_for_polars();

assert_eq!(
*cleaned.data_type(),
56 changes: 24 additions & 32 deletions crates/re_arrow_store/src/store_write.rs
@@ -66,51 +66,44 @@ impl DataStore {
/// If the bundle doesn't carry a payload for the cluster key, one will be auto-generated
/// based on the length of the components in the payload, in the form of an array of
/// monotonically increasing u64s going from `0` to `N-1`.
pub fn insert(&mut self, bundle: &MsgBundle) -> WriteResult<()> {
pub fn insert(&mut self, msg: &MsgBundle) -> WriteResult<()> {
// TODO(cmc): kind & insert_id need to somehow propagate through the span system.
self.insert_id += 1;

let MsgBundle {
msg_id,
entity_path: ent_path,
time_point,
components,
} = bundle;
components: bundles,
Member: isn't `components` a more descriptive name?

Member (Author): Not really, see above.

} = msg;

if components.is_empty() {
if bundles.is_empty() {
return Ok(());
}

crate::profile_function!();

let ent_path_hash = ent_path.hash();
let nb_rows = components[0].value.len();
let nb_rows = bundles[0].nb_rows();

// Effectively the same thing as having a non-unit length batch, except it's really not
// worth more than an assertion since:
// - A) `MsgBundle` should already guarantee this
// - B) this limitation should be gone soon enough
debug_assert!(
bundle
.components
.iter()
.map(|bundle| bundle.name)
.all_unique(),
msg.components.iter().map(|bundle| bundle.name).all_unique(),
Member: it's a bit weird to refer both to `msg.components` and to its alias `bundles`.

Member (Author): TL;DR: this is all in dire need of a terminology pass; at the moment I don't know of any good solution here.

  • msg.components.iter().map(|components| ...) is technically correct but plain confusing.
  • msg.components.iter().map(|component| ...) looks like you'd expect but is plain wrong.
  • msg.components.iter().map(|bundle| ...) looks awful but at least it makes you look twice, which you should because there's more going on than meets the eye.

The root of the issue is that msg.components is too vague: this is in fact an array (component types) of arrays (rows) of actual components (instances/columns).

I guess we'll clean all of this when we implement support for batch insertions.
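To make that nesting concrete, here is a minimal sketch using the helpers that appear elsewhere in this diff (imports and trait scoping glossed over; purely illustrative):

    // `components` holds one ComponentBundle per component *type* in the message.
    // Each bundle's value is a ListArray: the outer list is the rows, and each
    // row's inner values are the component instances for that row.
    let ids = wrap_in_listarray(UInt64Array::from_vec(vec![0, 1, 2]).boxed());
    let bundle = ComponentBundle::new(InstanceKey::name(), ids.boxed());
    assert_eq!(bundle.nb_rows(), 1);                    // one row in this bundle...
    assert_eq!(bundle.value().get_child_length(0), 3);  // ...holding three instances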

"cannot insert same component multiple times, this is equivalent to multiple rows",
);
// Batches cannot contain more than 1 row at the moment.
if nb_rows != 1 {
return Err(WriteError::MoreThanOneRow(nb_rows));
}
// Components must share the same number of rows.
if !components
.iter()
.all(|bundle| bundle.value.len() == nb_rows)
{
if !bundles.iter().all(|bundle| bundle.nb_rows() == nb_rows) {
return Err(WriteError::MismatchedRows(
components
bundles
.iter()
.map(|bundle| (bundle.name, bundle.value.len()))
.map(|bundle| (bundle.name, bundle.nb_rows()))
.collect(),
));
}
@@ -123,12 +116,12 @@ impl DataStore {
.map(|(timeline, time)| (timeline.name(), timeline.typ().format(*time)))
.collect::<Vec<_>>(),
entity = %ent_path,
components = ?components.iter().map(|bundle| &bundle.name).collect::<Vec<_>>(),
components = ?bundles.iter().map(|bundle| &bundle.name).collect::<Vec<_>>(),
nb_rows,
"insertion started..."
);

let cluster_comp_pos = components
let cluster_comp_pos = bundles
.iter()
.find_position(|bundle| bundle.name == self.cluster_key)
.map(|(pos, _)| pos);
@@ -138,7 +131,7 @@ impl DataStore {

// TODO(#589): support for batched row component insertions
for row_nr in 0..nb_rows {
self.insert_timeless_row(row_nr, cluster_comp_pos, components, &mut row_indices)?;
self.insert_timeless_row(row_nr, cluster_comp_pos, bundles, &mut row_indices)?;
}

let index = self
@@ -155,7 +148,7 @@ impl DataStore {
time_point,
row_nr,
cluster_comp_pos,
components,
bundles,
&mut row_indices,
)?;
}
@@ -212,7 +205,7 @@ impl DataStore {
.iter()
.filter(|bundle| bundle.name != self.cluster_key)
{
let ComponentBundle { name, value: rows } = bundle;
let (name, rows) = (bundle.name, bundle.value());

// Unwrapping a ListArray is somewhat costly, especially considering we're just
// gonna rewrap it again in a minute... so we'd rather just slice it to a list of
@@ -224,13 +217,12 @@ impl DataStore {
// So use the fact that `rows` is always of unit-length for now.
let rows_single = rows;

// TODO(#440): support for splats
let nb_instances = rows_single.get_child_length(0);
if nb_instances != cluster_len {
return Err(WriteError::MismatchedInstances {
cluster_comp: self.cluster_key,
cluster_comp_nb_instances: cluster_len,
key: *name,
key: name,
nb_instances,
});
}
@@ -240,13 +232,13 @@ impl DataStore {
.entry(bundle.name)
.or_insert_with(|| {
PersistentComponentTable::new(
*name,
name,
ListArray::<i32>::get_child_type(rows_single.data_type()),
)
});

let row_idx = table.push(rows_single.as_ref());
row_indices.insert(*name, row_idx);
row_indices.insert(name, row_idx);
}

Ok(())
@@ -285,7 +277,7 @@ impl DataStore {
.iter()
.filter(|bundle| bundle.name != self.cluster_key)
{
let ComponentBundle { name, value: rows } = bundle;
let (name, rows) = (bundle.name, bundle.value());

// Unwrapping a ListArray is somewhat costly, especially considering we're just
// gonna rewrap it again in a minute... so we'd rather just slice it to a list of
@@ -303,20 +295,20 @@ impl DataStore {
return Err(WriteError::MismatchedInstances {
cluster_comp: self.cluster_key,
cluster_comp_nb_instances: cluster_len,
key: *name,
key: name,
nb_instances,
});
}

let table = self.components.entry(bundle.name).or_insert_with(|| {
ComponentTable::new(
*name,
name,
ListArray::<i32>::get_child_type(rows_single.data_type()),
)
});

let row_idx = table.push(&self.config, time_point, rows_single.as_ref());
row_indices.insert(*name, row_idx);
row_indices.insert(name, row_idx);
}

Ok(())
@@ -349,7 +341,7 @@ impl DataStore {

let cluster_comp = &components[cluster_comp_pos];
let data = cluster_comp
.value
.value()
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
@@ -367,7 +359,7 @@ impl DataStore {

(
len,
ClusterData::UserData(cluster_comp.value.clone() /* shallow */),
ClusterData::UserData(cluster_comp.value().to_boxed() /* shallow */),
)
} else {
// The caller has not specified any cluster component, and so we'll have to generate
@@ -378,7 +370,7 @@ impl DataStore {
// share the same length at this point anyway.
let len = components
.first()
.map_or(0, |comp| comp.value.get_child_length(0));
.map_or(0, |comp| comp.value().get_child_length(0));

if let Some(row_idx) = self.cluster_comp_cache.get(&len) {
// Cache hit! Re-use that row index.
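As the doc comment on `insert` above describes, when the payload doesn't carry the cluster key a column of monotonically increasing u64s is generated from the instance count. A minimal sketch of that generated column, mirroring the helper used in the tests below (illustrative only):

    // For components holding `len` instances, the auto-generated cluster column
    // is simply the instance indices 0..len, wrapped as a single row.
    let len = 3u64;
    let ids = UInt64Array::from_vec((0..len).collect());
    let generated = wrap_in_listarray(ids.boxed()); // one row: [0, 1, 2]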
27 changes: 11 additions & 16 deletions crates/re_arrow_store/tests/correctness.rs
@@ -45,8 +45,10 @@ fn write_errors() {
]);

// make instances 2 rows long
bundle.components[0].value =
concatenate(&[&*bundle.components[0].value, &*bundle.components[0].value]).unwrap();
bundle.components[0] = ComponentBundle::new(
bundle.components[0].name,
concatenate(&[bundle.components[0].value(), bundle.components[0].value()]).unwrap(),
);

// The first component of the bundle determines the number of rows for all other
// components in there (since it has to match for all of them), so in this case we get a
@@ -67,8 +69,10 @@ fn write_errors() {
]);

// make component 2 rows long
bundle.components[1].value =
concatenate(&[&*bundle.components[1].value, &*bundle.components[1].value]).unwrap();
bundle.components[1] = ComponentBundle::new(
bundle.components[1].name,
concatenate(&[bundle.components[1].value(), bundle.components[1].value()]).unwrap(),
);

// The first component of the bundle determines the number of rows for all other
// components in there (since it has to match for all of them), so in this case we get a
@@ -82,10 +86,7 @@ fn write_errors() {
{
pub fn build_sparse_instances() -> ComponentBundle {
let ids = wrap_in_listarray(UInt64Array::from(vec![Some(1), None, Some(3)]).boxed());
ComponentBundle {
name: InstanceKey::name(),
value: ids.boxed(),
}
ComponentBundle::new(InstanceKey::name(), ids.boxed())
}

let mut store = DataStore::new(InstanceKey::name(), Default::default());
@@ -102,17 +103,11 @@ fn write_errors() {
{
pub fn build_unsorted_instances() -> ComponentBundle {
let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 3, 2]).boxed());
ComponentBundle {
name: InstanceKey::name(),
value: ids.boxed(),
}
ComponentBundle::new(InstanceKey::name(), ids.boxed())
}
pub fn build_duped_instances() -> ComponentBundle {
let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 2, 2]).boxed());
ComponentBundle {
name: InstanceKey::name(),
value: ids.boxed(),
}
ComponentBundle::new(InstanceKey::name(), ids.boxed())
}

let mut store = DataStore::new(InstanceKey::name(), Default::default());
7 changes: 4 additions & 3 deletions crates/re_arrow_store/tests/data_store.rs
@@ -868,14 +868,15 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)])
let cluster_comp = if let Some(idx) = bundle.find_component(&cluster_key) {
Series::try_from((
cluster_key.as_str(),
bundle.components[idx].value.to_boxed(),
bundle.components[idx].value().to_boxed(),
))
.unwrap()
} else {
let nb_instances = bundle.nb_instances(0).unwrap_or(0);
Series::try_from((
cluster_key.as_str(),
wrap_in_listarray(
UInt64Array::from_vec((0..bundle.row_len(0) as u64).collect()).to_boxed(),
UInt64Array::from_vec((0..nb_instances as u64).collect()).to_boxed(),
)
.to_boxed(),
))
@@ -887,7 +888,7 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)])
cluster_comp,
Series::try_from((
component.as_str(),
bundle.components[comp_idx].value.to_boxed(),
bundle.components[comp_idx].value().to_boxed(),
))
.unwrap(),
])
11 changes: 11 additions & 0 deletions crates/re_log_types/src/component_types/quaternion.rs
@@ -32,6 +32,17 @@ pub struct Quaternion {
pub w: f32,
}

impl Default for Quaternion {
fn default() -> Self {
Self {
x: 0.0,
y: 0.0,
z: 0.0,
w: 1.0,
}
}
}

impl Component for Quaternion {
fn name() -> crate::ComponentName {
"rerun.quaternion".into()
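With the new impl above, `Quaternion::default()` is the identity rotation (zero vector part, unit scalar part), which in turn lets `Rigid3` below derive `Default`. A quick illustrative check:

    // Default quaternion is the identity rotation: x = y = z = 0, w = 1.
    let q = Quaternion::default();
    assert_eq!((q.x, q.y, q.z, q.w), (0.0, 0.0, 0.0, 1.0));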
2 changes: 1 addition & 1 deletion crates/re_log_types/src/component_types/transform.rs
@@ -35,7 +35,7 @@ use super::{mat::Mat3x3, Quaternion, Vec2D, Vec3D};
/// ]),
/// );
/// ```
#[derive(Copy, Clone, Debug, PartialEq, ArrowField, ArrowSerialize, ArrowDeserialize)]
#[derive(Default, Copy, Clone, Debug, PartialEq, ArrowField, ArrowSerialize, ArrowDeserialize)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Rigid3 {
/// How is the child rotated?
Expand Down