diff --git a/Cargo.lock b/Cargo.lock index 6f0373b9b7b1..1cdf9f7fe480 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3619,6 +3619,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "clap 4.0.26", "gltf", "mimalloc", "reqwest", @@ -4260,6 +4261,7 @@ dependencies = [ "re_viewer", "re_web_server", "re_ws_comms", + "thiserror", "tokio", "webbrowser", ] diff --git a/crates/re_arrow_store/src/arrow_util.rs b/crates/re_arrow_store/src/arrow_util.rs index 15b6dc72a463..8938f21bc576 100644 --- a/crates/re_arrow_store/src/arrow_util.rs +++ b/crates/re_arrow_store/src/arrow_util.rs @@ -266,8 +266,8 @@ fn test_clean_for_polars_nomodify() { // Colors don't need polars cleaning let bundle: ComponentBundle = build_some_colors(5).try_into().unwrap(); - let cleaned = bundle.value.clean_for_polars(); - assert_eq!(bundle.value, cleaned); + let cleaned = bundle.value().clean_for_polars(); + assert_eq!(bundle.value(), &*cleaned); } #[test] @@ -282,7 +282,7 @@ fn test_clean_for_polars_modify() { let bundle: ComponentBundle = transforms.try_into().unwrap(); assert_eq!( - *bundle.value.data_type(), + *bundle.value().data_type(), DataType::List(Box::new(Field::new( "item", DataType::Union( @@ -340,7 +340,7 @@ fn test_clean_for_polars_modify() { ))) ); - let cleaned = bundle.value.clean_for_polars(); + let cleaned = bundle.value().clean_for_polars(); assert_eq!( *cleaned.data_type(), diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index 7f490abb38fc..6595ffca439c 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -66,7 +66,7 @@ impl DataStore { /// If the bundle doesn't carry a payload for the cluster key, one will be auto-generated /// based on the length of the components in the payload, in the form of an array of /// monotonically increasing u64s going from `0` to `N-1`. - pub fn insert(&mut self, bundle: &MsgBundle) -> WriteResult<()> { + pub fn insert(&mut self, msg: &MsgBundle) -> WriteResult<()> { // TODO(cmc): kind & insert_id need to somehow propagate through the span system. self.insert_id += 1; @@ -74,28 +74,24 @@ impl DataStore { msg_id, entity_path: ent_path, time_point, - components, - } = bundle; + components: bundles, + } = msg; - if components.is_empty() { + if bundles.is_empty() { return Ok(()); } crate::profile_function!(); let ent_path_hash = ent_path.hash(); - let nb_rows = components[0].value.len(); + let nb_rows = bundles[0].nb_rows(); // Effectively the same thing as having a non-unit length batch, except it's really not // worth more than an assertion since: // - A) `MsgBundle` should already guarantee this // - B) this limitation should be gone soon enough debug_assert!( - bundle - .components - .iter() - .map(|bundle| bundle.name) - .all_unique(), + msg.components.iter().map(|bundle| bundle.name).all_unique(), "cannot insert same component multiple times, this is equivalent to multiple rows", ); // Batches cannot contain more than 1 row at the moment. @@ -103,14 +99,11 @@ impl DataStore { return Err(WriteError::MoreThanOneRow(nb_rows)); } // Components must share the same number of rows. 
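The `insert` docstring above describes auto-generating the cluster key column when the caller does not provide one. A minimal sketch of what that synthesized column looks like, for illustration only — it is not the store's internal code, and simply reuses the `wrap_in_listarray` helper and `UInt64Array` constructors that already appear elsewhere in this diff:

```rust
use arrow2::array::{Array, UInt64Array};
use re_log_types::msg_bundle::wrap_in_listarray;

/// For a row with `len` instances: monotonically increasing keys `0..len`,
/// wrapped in the same list layout as any user-provided component column.
fn synthetic_cluster_column(len: u64) -> Box<dyn Array> {
    wrap_in_listarray(UInt64Array::from_vec((0..len).collect()).boxed()).boxed()
}
```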
- if !components - .iter() - .all(|bundle| bundle.value.len() == nb_rows) - { + if !bundles.iter().all(|bundle| bundle.nb_rows() == nb_rows) { return Err(WriteError::MismatchedRows( - components + bundles .iter() - .map(|bundle| (bundle.name, bundle.value.len())) + .map(|bundle| (bundle.name, bundle.nb_rows())) .collect(), )); } @@ -123,12 +116,12 @@ impl DataStore { .map(|(timeline, time)| (timeline.name(), timeline.typ().format(*time))) .collect::>(), entity = %ent_path, - components = ?components.iter().map(|bundle| &bundle.name).collect::>(), + components = ?bundles.iter().map(|bundle| &bundle.name).collect::>(), nb_rows, "insertion started..." ); - let cluster_comp_pos = components + let cluster_comp_pos = bundles .iter() .find_position(|bundle| bundle.name == self.cluster_key) .map(|(pos, _)| pos); @@ -138,7 +131,7 @@ impl DataStore { // TODO(#589): support for batched row component insertions for row_nr in 0..nb_rows { - self.insert_timeless_row(row_nr, cluster_comp_pos, components, &mut row_indices)?; + self.insert_timeless_row(row_nr, cluster_comp_pos, bundles, &mut row_indices)?; } let index = self @@ -155,7 +148,7 @@ impl DataStore { time_point, row_nr, cluster_comp_pos, - components, + bundles, &mut row_indices, )?; } @@ -212,7 +205,7 @@ impl DataStore { .iter() .filter(|bundle| bundle.name != self.cluster_key) { - let ComponentBundle { name, value: rows } = bundle; + let (name, rows) = (bundle.name, bundle.value()); // Unwrapping a ListArray is somewhat costly, especially considering we're just // gonna rewrap it again in a minute... so we'd rather just slice it to a list of @@ -224,13 +217,12 @@ impl DataStore { // So use the fact that `rows` is always of unit-length for now. let rows_single = rows; - // TODO(#440): support for splats let nb_instances = rows_single.get_child_length(0); if nb_instances != cluster_len { return Err(WriteError::MismatchedInstances { cluster_comp: self.cluster_key, cluster_comp_nb_instances: cluster_len, - key: *name, + key: name, nb_instances, }); } @@ -240,13 +232,13 @@ impl DataStore { .entry(bundle.name) .or_insert_with(|| { PersistentComponentTable::new( - *name, + name, ListArray::::get_child_type(rows_single.data_type()), ) }); let row_idx = table.push(rows_single.as_ref()); - row_indices.insert(*name, row_idx); + row_indices.insert(name, row_idx); } Ok(()) @@ -285,7 +277,7 @@ impl DataStore { .iter() .filter(|bundle| bundle.name != self.cluster_key) { - let ComponentBundle { name, value: rows } = bundle; + let (name, rows) = (bundle.name, bundle.value()); // Unwrapping a ListArray is somewhat costly, especially considering we're just // gonna rewrap it again in a minute... 
so we'd rather just slice it to a list of @@ -303,20 +295,20 @@ impl DataStore { return Err(WriteError::MismatchedInstances { cluster_comp: self.cluster_key, cluster_comp_nb_instances: cluster_len, - key: *name, + key: name, nb_instances, }); } let table = self.components.entry(bundle.name).or_insert_with(|| { ComponentTable::new( - *name, + name, ListArray::::get_child_type(rows_single.data_type()), ) }); let row_idx = table.push(&self.config, time_point, rows_single.as_ref()); - row_indices.insert(*name, row_idx); + row_indices.insert(name, row_idx); } Ok(()) @@ -349,7 +341,7 @@ impl DataStore { let cluster_comp = &components[cluster_comp_pos]; let data = cluster_comp - .value + .value() .as_any() .downcast_ref::>() .unwrap() @@ -367,7 +359,7 @@ impl DataStore { ( len, - ClusterData::UserData(cluster_comp.value.clone() /* shallow */), + ClusterData::UserData(cluster_comp.value().to_boxed() /* shallow */), ) } else { // The caller has not specified any cluster component, and so we'll have to generate @@ -378,7 +370,7 @@ impl DataStore { // share the same length at this point anyway. let len = components .first() - .map_or(0, |comp| comp.value.get_child_length(0)); + .map_or(0, |comp| comp.value().get_child_length(0)); if let Some(row_idx) = self.cluster_comp_cache.get(&len) { // Cache hit! Re-use that row index. diff --git a/crates/re_arrow_store/tests/correctness.rs b/crates/re_arrow_store/tests/correctness.rs index db8bc4a13517..406f2807ebef 100644 --- a/crates/re_arrow_store/tests/correctness.rs +++ b/crates/re_arrow_store/tests/correctness.rs @@ -45,8 +45,10 @@ fn write_errors() { ]); // make instances 2 rows long - bundle.components[0].value = - concatenate(&[&*bundle.components[0].value, &*bundle.components[0].value]).unwrap(); + bundle.components[0] = ComponentBundle::new( + bundle.components[0].name, + concatenate(&[bundle.components[0].value(), bundle.components[0].value()]).unwrap(), + ); // The first component of the bundle determines the number of rows for all other // components in there (since it has to match for all of them), so in this case we get a @@ -67,8 +69,10 @@ fn write_errors() { ]); // make component 2 rows long - bundle.components[1].value = - concatenate(&[&*bundle.components[1].value, &*bundle.components[1].value]).unwrap(); + bundle.components[1] = ComponentBundle::new( + bundle.components[1].name, + concatenate(&[bundle.components[1].value(), bundle.components[1].value()]).unwrap(), + ); // The first component of the bundle determines the number of rows for all other // components in there (since it has to match for all of them), so in this case we get a @@ -82,10 +86,7 @@ fn write_errors() { { pub fn build_sparse_instances() -> ComponentBundle { let ids = wrap_in_listarray(UInt64Array::from(vec![Some(1), None, Some(3)]).boxed()); - ComponentBundle { - name: InstanceKey::name(), - value: ids.boxed(), - } + ComponentBundle::new(InstanceKey::name(), ids.boxed()) } let mut store = DataStore::new(InstanceKey::name(), Default::default()); @@ -102,17 +103,11 @@ fn write_errors() { { pub fn build_unsorted_instances() -> ComponentBundle { let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 3, 2]).boxed()); - ComponentBundle { - name: InstanceKey::name(), - value: ids.boxed(), - } + ComponentBundle::new(InstanceKey::name(), ids.boxed()) } pub fn build_duped_instances() -> ComponentBundle { let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 2, 2]).boxed()); - ComponentBundle { - name: InstanceKey::name(), - value: ids.boxed(), - } + 
ComponentBundle::new(InstanceKey::name(), ids.boxed()) } let mut store = DataStore::new(InstanceKey::name(), Default::default()); diff --git a/crates/re_arrow_store/tests/data_store.rs b/crates/re_arrow_store/tests/data_store.rs index 501e40b06976..03f6691da117 100644 --- a/crates/re_arrow_store/tests/data_store.rs +++ b/crates/re_arrow_store/tests/data_store.rs @@ -868,14 +868,15 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) let cluster_comp = if let Some(idx) = bundle.find_component(&cluster_key) { Series::try_from(( cluster_key.as_str(), - bundle.components[idx].value.to_boxed(), + bundle.components[idx].value().to_boxed(), )) .unwrap() } else { + let nb_instances = bundle.nb_instances(0).unwrap_or(0); Series::try_from(( cluster_key.as_str(), wrap_in_listarray( - UInt64Array::from_vec((0..bundle.row_len(0) as u64).collect()).to_boxed(), + UInt64Array::from_vec((0..nb_instances as u64).collect()).to_boxed(), ) .to_boxed(), )) @@ -887,7 +888,7 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)]) cluster_comp, Series::try_from(( component.as_str(), - bundle.components[comp_idx].value.to_boxed(), + bundle.components[comp_idx].value().to_boxed(), )) .unwrap(), ]) diff --git a/crates/re_log_types/src/component_types/quaternion.rs b/crates/re_log_types/src/component_types/quaternion.rs index feb57dd665a4..b79a45c54270 100644 --- a/crates/re_log_types/src/component_types/quaternion.rs +++ b/crates/re_log_types/src/component_types/quaternion.rs @@ -32,6 +32,17 @@ pub struct Quaternion { pub w: f32, } +impl Default for Quaternion { + fn default() -> Self { + Self { + x: 0.0, + y: 0.0, + z: 0.0, + w: 1.0, + } + } +} + impl Component for Quaternion { fn name() -> crate::ComponentName { "rerun.quaternion".into() diff --git a/crates/re_log_types/src/component_types/transform.rs b/crates/re_log_types/src/component_types/transform.rs index e7b1020d319c..b1af3379c027 100644 --- a/crates/re_log_types/src/component_types/transform.rs +++ b/crates/re_log_types/src/component_types/transform.rs @@ -35,7 +35,7 @@ use super::{mat::Mat3x3, Quaternion, Vec2D, Vec3D}; /// ]), /// ); /// ``` -#[derive(Copy, Clone, Debug, PartialEq, ArrowField, ArrowSerialize, ArrowDeserialize)] +#[derive(Default, Copy, Clone, Debug, PartialEq, ArrowField, ArrowSerialize, ArrowDeserialize)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub struct Rigid3 { /// How is the child rotated? diff --git a/crates/re_log_types/src/msg_bundle.rs b/crates/re_log_types/src/msg_bundle.rs index 4311b7aefcf2..3a592d744dae 100644 --- a/crates/re_log_types/src/msg_bundle.rs +++ b/crates/re_log_types/src/msg_bundle.rs @@ -85,6 +85,18 @@ pub trait Component: ArrowField { } } +/// A trait to identify any `Component` that is ready to be collected and subsequently serialized +/// into an Arrow payload. +pub trait SerializableComponent +where + Self: Component + ArrowSerialize + ArrowField + 'static, +{ +} +impl SerializableComponent for C where + C: Component + ArrowSerialize + ArrowField + 'static +{ +} + /// A `ComponentBundle` holds an Arrow component column, and its field name. /// /// A `ComponentBundle` can be created from a collection of any element that implements the @@ -114,31 +126,50 @@ impl ComponentBundle { } } + pub fn new(name: ComponentName, value: Box) -> Self { + Self { name, value } + } + + /// Returns the datatype of the bundled component, discarding the list array that wraps it (!). 
pub fn data_type(&self) -> &DataType { ListArray::::get_child_type(self.value.data_type()) } + + pub fn value(&self) -> &dyn Array { + &*self.value + } + + /// Returns the number of _rows_ in this bundle, i.e. the length of the bundle. + /// + /// Currently always 1 as we don't yet support batch insertions. + pub fn nb_rows(&self) -> usize { + self.value.len() + } + + /// Returns the number of _instances_ for a given `row` in the bundle, i.e. the length of a + /// specific row within the bundle. + pub fn nb_instances(&self, row: usize) -> Option { + self.value + .as_any() + .downcast_ref::>() + .unwrap() + .offsets() + .lengths() + .nth(row) + } } -impl TryFrom<&[C]> for ComponentBundle -where - C: Component + ArrowSerialize + ArrowField + 'static, -{ +impl TryFrom<&[C]> for ComponentBundle { type Error = MsgBundleError; fn try_from(c: &[C]) -> Result { let array: Box = TryIntoArrow::try_into_arrow(c)?; let wrapped = wrap_in_listarray(array).boxed(); - Ok(ComponentBundle { - name: C::name(), - value: wrapped, - }) + Ok(ComponentBundle::new(C::name(), wrapped)) } } -impl TryFrom> for ComponentBundle -where - C: Component + ArrowSerialize + ArrowField + 'static, -{ +impl TryFrom> for ComponentBundle { type Error = MsgBundleError; fn try_from(c: Vec) -> Result { @@ -146,10 +177,7 @@ where } } -impl TryFrom<&Vec> for ComponentBundle -where - C: Component + ArrowSerialize + ArrowField + 'static, -{ +impl TryFrom<&Vec> for ComponentBundle { type Error = MsgBundleError; fn try_from(c: &Vec) -> Result { @@ -157,6 +185,22 @@ where } } +// TODO(cmc): We'd like this, but orphan rules prevent us from having it: +// +// ``` +// = note: conflicting implementation in crate `core`: +// - impl std::convert::TryFrom for T +// where U: std::convert::Into; +// ``` +// +// impl<'a, C: SerializableComponent, I: IntoIterator> TryFrom for ComponentBundle { +// type Error = MsgBundleError; + +// fn try_from(c: I) -> Result { +// c.as_slice().try_into() +// } +// } + /// A `MsgBundle` holds data necessary for composing a single log message. /// /// # Example @@ -230,12 +274,12 @@ impl MsgBundle { components, }; - // Since we don't yet support splats, we need to craft an array of `MsgId`s that matches - // the length of the other components. - // - // TODO(#440): support splats & remove this hack. - this.components - .push(vec![msg_id; this.row_len(0)].try_into().unwrap()); + // TODO(cmc): Since we don't yet support mixing splatted data within instanced rows, + // we need to craft an array of `MsgId`s that matches the length of the other components. + if let Some(nb_instances) = this.nb_instances(0) { + this.try_append_component(&vec![msg_id; nb_instances]) + .unwrap(); + } this } @@ -243,60 +287,49 @@ impl MsgBundle { /// Try to append a collection of `Component` onto the `MessageBundle`. /// /// This first converts the component collection into an Arrow array, and then wraps it in a [`ListArray`]. 
- pub fn try_append_component<'a, Element, Collection>( + pub fn try_append_component<'a, Component, Collection>( &mut self, component: Collection, ) -> Result<()> where - Element: Component + ArrowSerialize + ArrowField + 'static, - Collection: IntoIterator<Item = &'a Element>, + Component: SerializableComponent, + Collection: IntoIterator<Item = &'a Component>, { let array: Box<dyn Array> = TryIntoArrow::try_into_arrow(component)?; let wrapped = wrap_in_listarray(array).boxed(); - let bundle = ComponentBundle { - name: Element::name(), - value: wrapped, - }; + let bundle = ComponentBundle::new(Component::name(), wrapped); self.components.push(bundle); Ok(()) } - /// Returns the length of a specific row within the bundle, i.e. the row's _number of - /// instances_. - /// - /// Panics if `row_nr` is out of bounds. - pub fn row_len(&self, row_nr: usize) -> usize { - // TODO(#440): won't be able to pick any component randomly once we support splats! - self.components.first().map_or(0, |bundle| { - bundle - .value - .as_any() - .downcast_ref::<ListArray<i32>>() - .unwrap() - .offsets() - .lengths() - .nth(row_nr) - .unwrap() - }) + /// Returns the number of component collections in this bundle, i.e. the length of the bundle + /// itself. + pub fn nb_components(&self) -> usize { + self.components.len() } - /// Returns the length of the bundle, i.e. its _number of rows_. - pub fn len(&self) -> usize { - // TODO(#440): won't be able to pick any component randomly once we support splats! - self.components.first().map_or(0, |bundle| { - bundle - .value - .as_any() - .downcast_ref::<ListArray<i32>>() - .unwrap() - .len() - }) + /// Returns the number of _rows_ for each component collection in this bundle, i.e. the + /// length of each component collection. + /// + /// All component collections within a `MsgBundle` must share the same number of rows! + /// + /// Currently always 1 as we don't yet support batch insertions. + pub fn nb_rows(&self) -> usize { + self.components.first().map_or(0, |bundle| bundle.nb_rows()) } - pub fn is_empty(&self) -> bool { - self.len() == 0 + /// Returns the number of _instances_ for a given `row` in the bundle, i.e. the length of a + /// specific row within the bundle. + /// + /// Since we don't yet support batch insertions and all components within a single row must + /// have the same number of instances, we simply pick the value for the first component + /// collection. + pub fn nb_instances(&self, row: usize) -> Option<usize> { + self.components + .first() + .map_or(Some(0), |bundle| bundle.nb_instances(row)) } /// Returns the index of `component` in the bundle, if it exists.
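To make the new accessors concrete, a small illustrative sketch (it assumes the `Point2D` component that `re_log_types` already ships): a bundle built from a single collection always has exactly one row for now, and `nb_instances(0)` reports that row's length.

```rust
use re_log_types::component_types::Point2D;
use re_log_types::msg_bundle::ComponentBundle;

fn bundle_accessors_demo() {
    let points = vec![Point2D { x: 0.0, y: 0.0 }, Point2D { x: 1.0, y: 1.0 }];
    let bundle: ComponentBundle = points.try_into().unwrap();

    assert_eq!(bundle.nb_rows(), 1); // no batching yet: always exactly one row
    assert_eq!(bundle.nb_instances(0), Some(2)); // ...holding the two points above
}
```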
@@ -469,9 +502,8 @@ fn extract_components( .fields() .iter() .zip(components.values()) - .map(|(field, component)| ComponentBundle { - name: ComponentName::from(field.name.as_str()), - value: component.clone(), + .map(|(field, component)| { + ComponentBundle::new(ComponentName::from(field.name.as_str()), component.clone()) }) .collect()) } diff --git a/crates/re_viewer/src/ui/data_ui/mod.rs b/crates/re_viewer/src/ui/data_ui/mod.rs index 56ec80522460..e0b2c9b02231 100644 --- a/crates/re_viewer/src/ui/data_ui/mod.rs +++ b/crates/re_viewer/src/ui/data_ui/mod.rs @@ -96,15 +96,13 @@ impl DataUi for [ComponentBundle] { } } -fn format_component_bundle(component_bundle: &ComponentBundle) -> String { - let ComponentBundle { name, value } = component_bundle; - - use re_arrow_store::ArrayExt as _; - let num_instances = value.get_child_length(0); - +fn format_component_bundle(bundle: &ComponentBundle) -> String { // TODO(emilk): if there's only once instance, and the byte size is small, then deserialize and show the value. - - format!("{}x {}", num_instances, name.short_name()) + format!( + "{}x {}", + bundle.nb_instances(0).unwrap(), // all of our bundles have exactly 1 row as of today + bundle.name.short_name() + ) } impl DataUi for PathOp { diff --git a/crates/rerun_sdk/Cargo.toml b/crates/rerun_sdk/Cargo.toml index 41b533e49249..1122f65a7897 100644 --- a/crates/rerun_sdk/Cargo.toml +++ b/crates/rerun_sdk/Cargo.toml @@ -35,6 +35,7 @@ document-features = "0.2" lazy_static.workspace = true nohash-hasher = "0.2" once_cell = "1.12" +thiserror.workspace = true # Optional dependencies: re_ws_comms = { path = "../re_ws_comms", optional = true, features = [ diff --git a/crates/rerun_sdk/src/lib.rs b/crates/rerun_sdk/src/lib.rs index b589c64b03ea..0e529ce8a5fd 100644 --- a/crates/rerun_sdk/src/lib.rs +++ b/crates/rerun_sdk/src/lib.rs @@ -5,6 +5,7 @@ //! // Work with timestamps +// TODO(cmc): remove traces of previous APIs & examples pub mod time; pub use time::log_time; @@ -12,48 +13,40 @@ pub use time::log_time; mod session; pub use self::session::Session; +mod msg_sender; +pub use self::msg_sender::{MsgSender, MsgSenderError}; + mod global; pub use self::global::global_session; pub mod viewer; -// TODO(cmc): clean all that up? - -pub use re_log_types::msg_bundle::MsgBundle; -pub use re_log_types::{EntityPath, LogMsg, MsgId}; -pub use re_log_types::{Time, TimePoint, TimeType, Timeline}; - -// TODO(cmc): separate datatypes (e.g. Vec3D) from components (e.g. Size3D). 
-pub use re_log_types::component_types::AnnotationContext; -pub use re_log_types::component_types::Arrow3D; -pub use re_log_types::component_types::Box3D; -pub use re_log_types::component_types::ClassId; -pub use re_log_types::component_types::ColorRGBA; -pub use re_log_types::component_types::InstanceKey; -pub use re_log_types::component_types::KeypointId; -pub use re_log_types::component_types::Label; -pub use re_log_types::component_types::Mat3x3; -pub use re_log_types::component_types::Quaternion; -pub use re_log_types::component_types::Radius; -pub use re_log_types::component_types::Rect2D; -pub use re_log_types::component_types::Size3D; -pub use re_log_types::component_types::TextEntry; -pub use re_log_types::component_types::{ - coordinates::{Axis3, Handedness, Sign, SignedAxis3}, - ViewCoordinates, +// --- + +// init +pub use re_log_types::{ApplicationId, RecordingId}; +pub use re_sdk_comms::default_server_addr; + +// messages +pub use re_log_types::{ + msg_bundle::{Component, ComponentBundle, MsgBundle, SerializableComponent}, + ComponentName, EntityPath, LogMsg, MsgId, Time, TimeInt, TimePoint, TimeType, Timeline, }; -pub use re_log_types::component_types::{EncodedMesh3D, Mesh3D, MeshFormat, MeshId, RawMesh3D}; -pub use re_log_types::component_types::{LineStrip2D, LineStrip3D}; -pub use re_log_types::component_types::{Pinhole, Rigid3, Transform}; -pub use re_log_types::component_types::{Point2D, Point3D}; -pub use re_log_types::component_types::{Scalar, ScalarPlotProps}; + +// components pub use re_log_types::component_types::{ - Tensor, TensorData, TensorDataMeaning, TensorDimension, TensorId, TensorTrait, + coordinates::{Axis3, Handedness, Sign, SignedAxis3}, + AnnotationContext, Arrow3D, Box3D, ClassId, ColorRGBA, EncodedMesh3D, InstanceKey, KeypointId, + Label, LineStrip2D, LineStrip3D, Mat3x3, Mesh3D, MeshFormat, MeshId, Pinhole, Point2D, Point3D, + Quaternion, Radius, RawMesh3D, Rect2D, Rigid3, Scalar, ScalarPlotProps, Size3D, Tensor, + TensorData, TensorDataMeaning, TensorDimension, TensorId, TensorTrait, TextEntry, Transform, + Vec2D, Vec3D, Vec4D, ViewCoordinates, }; -pub use re_log_types::component_types::{Vec2D, Vec3D, Vec4D}; -pub mod reexports { +// re-exports +pub mod external { pub use re_log; pub use re_log_types; pub use re_memory; + pub use re_sdk_comms; } diff --git a/crates/rerun_sdk/src/msg_sender.rs b/crates/rerun_sdk/src/msg_sender.rs new file mode 100644 index 000000000000..4ae85158779f --- /dev/null +++ b/crates/rerun_sdk/src/msg_sender.rs @@ -0,0 +1,483 @@ +use arrow2::array::Array; +use nohash_hasher::IntMap; +use re_log_types::external::arrow2_convert::serialize::TryIntoArrow; +use re_log_types::msg_bundle::MsgBundleError; +use re_log_types::{component_types::InstanceKey, msg_bundle::wrap_in_listarray}; + +use crate::{ + Component, ComponentBundle, ComponentName, EntityPath, LogMsg, MsgBundle, MsgId, + SerializableComponent, Session, Time, TimeInt, TimePoint, Timeline, Transform, +}; + +// --- + +#[derive(thiserror::Error, Debug)] +pub enum MsgSenderError { + #[error( + "All component collections must have exactly one row (i.e. no batching), got {0:?} instead" + )] + MoreThanOneRow(Vec<(ComponentName, usize)>), + + #[error( + "All component collections must share the same number of instances (i.e. 
row length) \ + for a given row, got {0:?} instead" + )] + MismatchedRowLengths(Vec<(ComponentName, usize)>), + + #[error("Instance keys cannot be splatted")] + SplattedInstanceKeys, + + #[error("InstanceKey(u64::MAX) is reserved for Rerun internals")] + IllegalInstanceKey, + + #[error(transparent)] + PackingError(#[from] MsgBundleError), +} + +/// Facilitates building and sending component payloads with the Rerun SDK. +/// +/// ```ignore +/// fn log_coordinate_space( +/// session: &mut Session, +/// ent_path: impl Into, +/// axes: &str, +/// ) -> anyhow::Result<()> { +/// let view_coords: ViewCoordinates = axes +/// .parse() +/// .map_err(|err| anyhow!("couldn't parse {axes:?} as ViewCoordinates: {err}"))?; +/// +/// MsgSender::new(ent_path) +/// .with_timeless(true) +/// .with_component(&[view_coords])? +/// .send(session) +/// .map_err(Into::into) +/// } +/// ``` +pub struct MsgSender { + // TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so + // this must be known as soon as we spawn the builder. + // This won't be true anymore once batch insertions land. + entity_path: EntityPath, + + /// All the different timestamps for this message. + /// + /// The logging time is automatically inserted during creation ([`Self::new`]). + timepoint: TimePoint, + /// If true, all timestamp data associated with this message will be dropped right before + /// sending it to Rerun. + /// + /// Timeless data is present on all timelines and behaves as if it was recorded infinitely far + /// into the past. + timeless: bool, + + /// The expected number of instances for each row of each component collections appended to the + /// current message. + /// + /// Since we don't yet support batch insertions, the number of rows for each component + /// collection will always be 1. + /// The number of instances per row, on the other hand, will be decided based upon the first + /// component collection that's appended. + nb_instances: Option, + /// All the instanced component collections that have been appended to this message. + /// + /// As of today, they must have exactly 1 row of data (no batching), which itself must have + /// `Self::nb_instances` instance keys. + instanced: Vec, + + /// All the splatted components that have been appended to this message. + /// + /// By definition, all `ComponentBundle`s in this vector will have 1 row (no batching) and more + /// importantly a single, special instance key for that row. + splatted: Vec, +} + +impl MsgSender { + /// Starts a new `MsgSender` for the given entity path. + /// + /// It is during this call that the logging time for the message is recorded! + pub fn new(ent_path: impl Into) -> Self { + Self { + entity_path: ent_path.into(), + + timepoint: [(Timeline::log_time(), Time::now().into())].into(), + timeless: false, + + nb_instances: None, + instanced: Vec::new(), + splatted: Vec::new(), + } + } + + // --- Time --- + + /// Appends a given `timepoint` to the current message. + /// + /// This can be called any number of times. In case of collisions, last write wins. + /// I.e. if `timepoint` contains a timestamp `ts1` for a timeline `my_time` and the current + /// message already has a timestamp `ts0` for that same timeline, then the new value (`ts1`) + /// will overwrite the existing value (`ts0`). + /// + /// `MsgSender` automatically keeps track of the logging time, which is recorded when + /// [`Self::new`] is first called. 
+ pub fn with_timepoint(mut self, timepoint: TimePoint) -> Self { + for (timeline, time) in timepoint { + self.timepoint.insert(timeline, time); + } + self + } + + /// Appends a given `timeline`/`time` pair to the current message. + /// + /// This can be called any number of times. In case of collisions, last write wins. + /// I.e. if the current message already has a timestamp value for that `timeline`, then the + /// new `time` value that was just passed in will overwrite it. + /// + /// `MsgSender` automatically keeps track of the logging time, which is recorded when + /// [`Self::new`] is first called. + pub fn with_time(mut self, timeline: Timeline, time: impl Into) -> Self { + self.timepoint.insert(timeline, time.into()); + self + } + + /// Specifies whether the current message is timeless. + /// + /// A timeless message will drop all of its timestamp data before being sent to Rerun. + /// Timeless data is present on all timelines and behaves as if it was recorded infinitely far + /// into the past. + /// + /// Always `false` by default. + pub fn with_timeless(mut self, timeless: bool) -> Self { + self.timeless = timeless; + self + } + + // --- Components --- + + /// Appends a component collection to the current message. + /// + /// All component collections stored in the message must have the same row-length (i.e. number + /// of instances)! + /// The row-length of the first appended collection is used as ground truth. + /// + /// ⚠ This can only be called once per type of component! + /// The SDK does not yet support batch insertions, which are semantically identical to adding + /// the same component type multiple times in a single message. + /// Doing so will return an error when trying to `send()` the message. + // + // TODO(#589): batch insertions + pub fn with_component<'a, C: SerializableComponent>( + mut self, + data: impl IntoIterator, + ) -> Result { + let bundle = bundle_from_iter(data)?; + + let nb_instances = bundle.nb_instances(0).unwrap(); // must have exactly 1 row atm + + // If this is the first appended collection, it gets to decide the row-length (i.e. number + // of instances) of all future collections. + if self.nb_instances.is_none() { + self.nb_instances = Some(nb_instances); + } + + // Detect mismatched row-lengths early on... unless it's a Transform bundle: transforms + // behave differently and will be sent in their own message! + if C::name() != Transform::name() && self.nb_instances.unwrap() != nb_instances { + let collections = self + .instanced + .into_iter() + .map(|bundle| (bundle.name, bundle.nb_instances(0).unwrap_or(0))) + .collect(); + return Err(MsgSenderError::MismatchedRowLengths(collections)); + } + + // TODO(cmc): if this is an InstanceKey and it contains u64::MAX, fire IllegalInstanceKey. + + self.instanced.push(bundle); + + Ok(self) + } + + /// Appends a splatted component to the current message. + /// + /// Splatted components apply to all the instance keys of an entity, whatever they may be at + /// that point in time. + /// + /// ⚠ `InstanceKey`s themselves cannot be splatted! Trying to do so will return an error. + /// + /// ⚠ This can only be called once per type of component! + /// The SDK does not yet support batch insertions, which are semantically identical to adding + /// the same component type multiple times in a single message. + /// Doing so will return an error when trying to `send()` the message. 
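The splatting rules documented above are easiest to see from the caller's side. A hedged usage sketch (assuming the `Point2D` and `ColorRGBA` components re-exported by the SDK, and an already-configured `Session`): a single colour is splatted across every point instance of the entity.

```rust
use rerun::{ColorRGBA, MsgSender, MsgSenderError, Point2D, Session};

fn log_points_with_one_color(session: &mut Session) -> Result<(), MsgSenderError> {
    let points = vec![Point2D { x: 0.0, y: 0.0 }, Point2D { x: 1.0, y: 1.0 }];

    MsgSender::new("points")
        .with_component(&points)? // two instances...
        .with_splat(ColorRGBA::from([255, 0, 255, 255]))? // ...sharing one splatted color
        .send(session)
}
```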
+ // + // TODO(#589): batch insertions + pub fn with_splat<C: SerializableComponent>(mut self, data: C) -> Result<Self, MsgSenderError> { + if C::name() == InstanceKey::name() { + return Err(MsgSenderError::SplattedInstanceKeys); + } + + self.splatted.push(bundle_from_iter(&[data])?); + + Ok(self) + } + + /// Helper to make it easier to optionally append splatted components. + /// + /// See [`Self::with_splat`]. + pub fn with_splat_opt<C: SerializableComponent>( + self, + data: Option<C>, + ) -> Result<Self, MsgSenderError> { + if let Some(data) = data { + self.with_splat(data) + } else { + Ok(self) + } + } + + // --- Send --- + + /// Consumes, packs, sanity checks and finally sends the message to the currently configured + /// target of the SDK. + pub fn send(self, session: &mut Session) -> Result<(), MsgSenderError> { + let [msg_standard, msg_transforms, msg_splats] = self.into_messages()?; + + if let Some(msg_standard) = msg_standard { + session.send(LogMsg::ArrowMsg(msg_standard.try_into()?)); + } + if let Some(msg_transforms) = msg_transforms { + session.send(LogMsg::ArrowMsg(msg_transforms.try_into()?)); + } + if let Some(msg_splats) = msg_splats { + session.send(LogMsg::ArrowMsg(msg_splats.try_into()?)); + } + + Ok(()) + } + + fn into_messages(self) -> Result<[Option<MsgBundle>; 3], MsgSenderError> { + let Self { + entity_path, + timepoint, + timeless, + nb_instances: _, + instanced, + mut splatted, + } = self; + + if timeless && timepoint.times().len() > 1 { + re_log::warn_once!("Recorded timepoints in a timeless message, they will be dropped!"); + } + + // clear current timepoint if marked as timeless + let timepoint = if timeless { [].into() } else { timepoint }; + + // separate transforms from the rest + // TODO(cmc): just use `Vec::drain_filter` once it goes stable... + let mut all_bundles: Vec<_> = instanced.into_iter().map(Some).collect(); + let standard_bundles: Vec<_> = all_bundles + .iter_mut() + .filter(|bundle| bundle.as_ref().unwrap().name != Transform::name()) + .map(|bundle| bundle.take().unwrap()) + .collect(); + let transform_bundles: Vec<_> = all_bundles + .iter_mut() + .filter(|bundle| { + bundle + .as_ref() + .map_or(false, |bundle| bundle.name == Transform::name()) + }) + .map(|bundle| bundle.take().unwrap()) + .collect(); + debug_assert!(all_bundles.into_iter().all(|bundle| bundle.is_none())); + + // TODO(cmc): The sanity checks we do in here can (and probably should) be done in + // `MsgBundle` instead so that the python SDK benefits from them too... but one step at a + // time.
+ + // sanity check: no row-level batching + let mut rows_per_comptype: IntMap = IntMap::default(); + for bundle in standard_bundles + .iter() + .chain(&transform_bundles) + .chain(&splatted) + { + *rows_per_comptype.entry(bundle.name).or_default() += bundle.nb_rows(); + } + if rows_per_comptype.values().any(|nb_rows| *nb_rows > 1) { + return Err(MsgSenderError::MoreThanOneRow( + rows_per_comptype.into_iter().collect(), + )); + } + + // sanity check: transforms can't handle multiple instances + let nb_transform_instances = transform_bundles + .get(0) + .and_then(|bundle| bundle.nb_instances(0)) + .unwrap_or(0); + if nb_transform_instances > 1 { + re_log::warn!("detected Transform component with multiple instances"); + } + + let mut msgs = [(); 3].map(|_| None); + + // Standard + msgs[0] = (!standard_bundles.is_empty()).then(|| { + MsgBundle::new( + MsgId::random(), + entity_path.clone(), + timepoint.clone(), + standard_bundles, + ) + }); + + // Transforms + msgs[1] = (!transform_bundles.is_empty()).then(|| { + MsgBundle::new( + MsgId::random(), + entity_path.clone(), + timepoint.clone(), + transform_bundles, + ) + }); + + // Splats + msgs[2] = (!splatted.is_empty()).then(|| { + splatted.push(bundle_from_iter(&[InstanceKey::SPLAT]).unwrap()); + MsgBundle::new(MsgId::random(), entity_path, timepoint, splatted) + }); + + Ok(msgs) + } +} + +fn bundle_from_iter<'a, C: SerializableComponent>( + data: impl IntoIterator, +) -> Result { + // TODO(cmc): Eeeh, that's not ideal to repeat that kind of logic in here, but orphan rules + // kinda force us to :/ + + let array: Box = TryIntoArrow::try_into_arrow(data)?; + let wrapped = wrap_in_listarray(array).boxed(); + + Ok(ComponentBundle::new(C::name(), wrapped)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty() { + let [standard, transforms, splats] = MsgSender::new("some/path").into_messages().unwrap(); + assert!(standard.is_none()); + assert!(transforms.is_none()); + assert!(splats.is_none()); + } + + #[test] + fn full() -> Result<(), MsgSenderError> { + let labels = vec![crate::Label("label1".into()), crate::Label("label2".into())]; + let transform = vec![crate::Transform::Rigid3(crate::Rigid3::default())]; + let color = crate::ColorRGBA::from([255, 0, 255, 255]); + + let [standard, transforms, splats] = MsgSender::new("some/path") + .with_component(&labels)? + .with_component(&transform)? + .with_splat(color)? 
+ .into_messages() + .unwrap(); + + { + let standard = standard.unwrap(); + let idx = standard.find_component(&crate::Label::name()).unwrap(); + let bundle = &standard.components[idx]; + assert!(bundle.nb_rows() == 1); + assert!(bundle.nb_instances(0).unwrap() == 2); + } + + { + let transforms = transforms.unwrap(); + let idx = transforms + .find_component(&crate::Transform::name()) + .unwrap(); + let bundle = &transforms.components[idx]; + assert!(bundle.nb_rows() == 1); + assert!(bundle.nb_instances(0).unwrap() == 1); + } + + { + let splats = splats.unwrap(); + let idx = splats.find_component(&crate::ColorRGBA::name()).unwrap(); + let bundle = &splats.components[idx]; + assert!(bundle.nb_rows() == 1); + assert!(bundle.nb_instances(0).unwrap() == 1); + } + + Ok(()) + } + + #[test] + fn timepoint_last_write_wins() { + let my_timeline = Timeline::new("my_timeline", crate::TimeType::Sequence); + let sender = MsgSender::new("some/path") + .with_time(my_timeline, 0) + .with_time(my_timeline, 1) + .with_time(my_timeline, 2); + assert_eq!( + TimeInt::from(2), + *sender.timepoint.get(&my_timeline).unwrap() + ); + } + + #[test] + fn timepoint_timeless() -> Result<(), MsgSenderError> { + let my_timeline = Timeline::new("my_timeline", crate::TimeType::Sequence); + + let sender = MsgSender::new("some/path") + .with_timeless(true) + .with_component(&vec![crate::Label("label1".into())])? + .with_time(my_timeline, 2); + assert!(!sender.timepoint.is_empty()); // not yet + + let [standard, _, _] = sender.into_messages().unwrap(); + assert!(standard.unwrap().time_point.is_empty()); + + Ok(()) + } + + #[test] + fn attempted_batch() -> Result<(), MsgSenderError> { + let res = MsgSender::new("some/path") + .with_component(&vec![crate::Label("label1".into())])? + .with_component(&vec![crate::Label("label2".into())])? + .into_messages(); + + let Err(MsgSenderError::MoreThanOneRow(err)) = res else { panic!() }; + assert_eq!([(crate::Label::name(), 2)].to_vec(), err); + + Ok(()) + } + + #[test] + fn illegal_instance_key() -> Result<(), MsgSenderError> { + let _ = MsgSender::new("some/path") + .with_component(&vec![crate::Label("label1".into())])? + .with_component(&vec![crate::InstanceKey(u64::MAX)])? + .into_messages()?; + + // TODO(cmc): This is not detected as of today, but it probably should. + + Ok(()) + } + + #[test] + fn splatted_instance_key() -> Result<(), MsgSenderError> { + let res = MsgSender::new("some/path") + .with_component(&vec![crate::Label("label1".into())])? 
+ .with_splat(crate::InstanceKey(42)); + + assert!(matches!(res, Err(MsgSenderError::SplattedInstanceKeys))); + + Ok(()) + } +} diff --git a/examples/raw_mesh/Cargo.toml b/examples/raw_mesh/Cargo.toml index 0701dbede0be..8e7fb6cedc2f 100644 --- a/examples/raw_mesh/Cargo.toml +++ b/examples/raw_mesh/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] anyhow.workspace = true bytes = "1.3" +clap = { version = "4.0", features = ["derive"] } gltf.workspace = true mimalloc = "0.1" reqwest = { workspace = true, features = ["blocking", "rustls-tls"] } diff --git a/examples/raw_mesh/src/main.rs b/examples/raw_mesh/src/main.rs index 1632ecbc7009..d56301981306 100644 --- a/examples/raw_mesh/src/main.rs +++ b/examples/raw_mesh/src/main.rs @@ -8,11 +8,14 @@ #![allow(clippy::doc_markdown)] -use anyhow::{bail, Context}; +use std::path::PathBuf; + +use anyhow::anyhow; use bytes::Bytes; +use clap::Parser; use rerun::{ - reexports::{re_log, re_memory::AccountingAllocator}, - EntityPath, LogMsg, Mesh3D, MeshId, MsgBundle, MsgId, RawMesh3D, Session, Time, TimePoint, + external::{re_log, re_memory::AccountingAllocator}, + ApplicationId, EntityPath, Mesh3D, MeshId, MsgSender, RawMesh3D, RecordingId, Session, TimeType, Timeline, Transform, Vec4D, ViewCoordinates, }; @@ -66,18 +69,9 @@ impl From for Transform { } /// Log a glTF node with Rerun. -fn log_node(session: &mut Session, node: GltfNode) { +fn log_node(session: &mut Session, node: GltfNode) -> anyhow::Result<()> { let ent_path = EntityPath::from(node.name.as_str()); - // What time is it? - let timeline_keyframe = Timeline::new("keyframe", TimeType::Sequence); - let time_point = TimePoint::from([ - // TODO(cmc): this _has_ to be inserted by the SDK - (Timeline::log_time(), Time::now().into()), - // TODO(cmc): animations! - (timeline_keyframe, 0.into()), - ]); - // Convert glTF objects into Rerun components. let transform = node.transform.map(Transform::from); let primitives = node @@ -86,61 +80,36 @@ fn log_node(session: &mut Session, node: GltfNode) { .map(Mesh3D::from) .collect::>(); - // TODO(cmc): Transforms have to be logged separately because they are neither batches nor - // splats... the user shouldn't have to know that though! - // The SDK needs to split things up as needed when it sees a Transform component. - // - // We're going to have the same issue with splats: the SDK needs to automagically detect the - // user intention to use splats and do the necessary (like the python SDK does iirc). - if let Some(transform) = transform { - let bundle = MsgBundle::new( - MsgId::random(), - ent_path.clone(), - time_point.clone(), - // TODO(cmc): need to reproduce the viewer crash I had earlier and fix/log-an-issue for - // it. - vec![vec![transform].try_into().unwrap()], - ); - // TODO(cmc): These last conversion details need to be hidden in the SDK. - let msg = bundle.try_into().unwrap(); - session.send(LogMsg::ArrowMsg(msg)); - } - - // TODO(cmc): Working at the `ComponentBundle`/`TryIntoArrow` layer feels too low-level, - // something like a MsgBuilder kinda thing would probably be quite nice. - let bundle = MsgBundle::new( - MsgId::random(), - ent_path, - time_point, - vec![primitives.try_into().unwrap()], - ); - - // Create and send one message to the sdk - // TODO(cmc): These last conversion details need to be hidden in the SDK. 
- let msg = bundle.try_into().unwrap(); - session.send(LogMsg::ArrowMsg(msg)); + let timeline_keyframe = Timeline::new("keyframe", TimeType::Sequence); + MsgSender::new(ent_path) + .with_time(timeline_keyframe, 0) + .with_component(&primitives)? + .with_component(transform.as_ref())? + .send(session)?; // Recurse through all of the node's children! for mut child in node.children { child.name = [node.name.as_str(), child.name.as_str()].join("/"); - log_node(session, child); + log_node(session, child)?; } -} -// TODO(cmc): The SDK should make this call so trivial that it doesn't require this helper at all. -fn log_axis(session: &mut Session, ent_path: &EntityPath) { - // glTF always uses a right-handed coordinate system when +Y is up and meshes face +Z. - let view_coords: ViewCoordinates = "RUB".parse().unwrap(); - - let bundle = MsgBundle::new( - MsgId::random(), - ent_path.clone(), - [].into(), // TODO(cmc): doing timeless stuff shouldn't be so weird - vec![vec![view_coords].try_into().unwrap()], - ); + Ok(()) +} - let msg = bundle.try_into().unwrap(); - session.send(LogMsg::ArrowMsg(msg)); +fn log_coordinate_space( + session: &mut Session, + ent_path: impl Into, + axes: &str, +) -> anyhow::Result<()> { + let view_coords: ViewCoordinates = axes + .parse() + .map_err(|err| anyhow!("couldn't parse {axes:?} as ViewCoordinates: {err}"))?; + + MsgSender::new(ent_path) + .with_timeless(true) + .with_component(&[view_coords])? + .send(session) + .map_err(Into::into) } // --- Init --- @@ -151,37 +120,62 @@ fn log_axis(session: &mut Session, ent_path: &EntityPath) { static GLOBAL: AccountingAllocator = AccountingAllocator::new(mimalloc::MiMalloc); +#[derive(Debug, clap::Parser)] +#[clap(author, version, about)] +struct Args { + /// If specified, connects and sends the logged data to a remote Rerun viewer. + /// + /// Optionally takes an ip:port, otherwise uses Rerun's defaults. + #[clap(long)] + #[allow(clippy::option_option)] + connect: Option>, + + /// Specifies the path of the glTF scene to load. + #[clap(long)] + scene_path: PathBuf, +} + fn main() -> anyhow::Result<()> { re_log::setup_native_logging(); - // TODO(cmc): Here we shall pass argv to the SDK which will strip it out of all SDK flags, and - // give us back our actual CLI flags. - // The name of the gltf sample to load should then come from there. - - // Read glTF asset - let args = std::env::args().collect::>(); - let bytes = if let Some(path) = args.get(1) { - Bytes::from(std::fs::read(path)?) - } else { - bail!("Usage: {} ", args[0]); + let args = Args::parse(); + let addr = match args.connect.as_ref() { + Some(Some(addr)) => Some(addr.parse()?), + Some(None) => Some(rerun::default_server_addr()), + None => None, }; - // Parse glTF asset - let (doc, buffers, _) = gltf::import_slice(bytes).unwrap(); + let mut session = Session::new(); + // TODO(cmc): The Rust SDK needs a higher-level `init()` method, akin to what the python SDK + // does... which they can probably share. + // This needs to take care of the whole `official_example` thing, and also keeps track of + // whether we're using the rust or python sdk. 
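The TODO above asks for a higher-level `init()` entry point shared with the Python SDK. A sketch of what such a helper could look like, assembled only from the calls this example already makes — the `std::net::SocketAddr` parameter type is an assumption, and the `official_example` bookkeeping mentioned above is deliberately left out:

```rust
use rerun::{ApplicationId, RecordingId, Session};

/// Hypothetical convenience wrapper around the session boilerplate below.
fn init_session(app_id: &str, connect_to: Option<std::net::SocketAddr>) -> Session {
    let mut session = Session::new();
    session.set_application_id(ApplicationId(app_id.into()), true);
    session.set_recording_id(RecordingId::random());
    if let Some(addr) = connect_to {
        session.connect(addr);
    }
    session
}
```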
+ session.set_application_id(ApplicationId("objectron-rs".into()), true); + session.set_recording_id(RecordingId::random()); + if let Some(addr) = addr { + session.connect(addr); + } + + // Read glTF scene + let (doc, buffers, _) = gltf::import_slice(Bytes::from(std::fs::read(args.scene_path)?))?; let nodes = load_gltf(&doc, &buffers); // Log raw glTF nodes and their transforms with Rerun - let mut session = Session::new(); for root in nodes { re_log::info!(scene = root.name, "logging glTF scene"); - log_axis(&mut session, &root.name.as_str().into()); - log_node(&mut session, root); + log_coordinate_space(&mut session, root.name.as_str(), "RUB")?; + log_node(&mut session, root)?; + } + + // TODO(cmc): arg parsing and arg interpretation helpers + // TODO(cmc): missing flags: save, serve + // TODO(cmc): expose an easy to use async local mode. + if args.connect.is_none() { + let log_messages = session.drain_log_messages_buffer(); + rerun::viewer::show(log_messages)?; } - // TODO(cmc): provide high-level tools to pick and handle the different modes. - // TODO(cmc): connect, spawn_and_connect; show() probably doesn't make sense with pure rust - let log_messages = session.drain_log_messages_buffer(); - rerun::viewer::show(log_messages).context("failed to start viewer") + Ok(()) } // --- glTF parsing --- diff --git a/rerun_py/rerun/__init__.py b/rerun_py/rerun/__init__.py index 67e20eb2f652..4e72d42e2cc4 100644 --- a/rerun_py/rerun/__init__.py +++ b/rerun_py/rerun/__init__.py @@ -16,7 +16,7 @@ real_path = pathlib.Path(__file__).parent.parent.joinpath("rerun_sdk").resolve() -print("DEV ENVIRONMENT DETECTED! Re-importing rerun from: {}".format(real_path), file=sys.stderr) +print(f"DEV ENVIRONMENT DETECTED! Re-importing rerun from: {real_path}", file=sys.stderr) sys.path.insert(0, str(real_path)) diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 807d8e83d94d..1feadb321eb9 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -101,9 +101,11 @@ pub fn build_chunk_from_components( let cmp_bundles = arrays .into_iter() .zip(fields.into_iter()) - .map(|(value, field)| ComponentBundle { - name: field.name.into(), - value: msg_bundle::wrap_in_listarray(value).boxed(), + .map(|(value, field)| { + ComponentBundle::new( + field.name.into(), + msg_bundle::wrap_in_listarray(value).boxed(), + ) }) .collect();
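Taken together, the pieces added in this PR (`MsgSender`, the reorganized `rerun` re-exports, and the session plumbing shown in `examples/raw_mesh`) reduce caller code to roughly the following shape. This is an illustrative sketch rather than a shipped example — the entity paths, point data, and `"frame"` timeline are made up — and it mirrors the buffered `viewer::show` fallback used above:

```rust
use anyhow::anyhow;
use rerun::{
    MsgSender, Point2D, RecordingId, Session, TimeType, Timeline, ViewCoordinates,
};

fn main() -> anyhow::Result<()> {
    let mut session = Session::new();
    session.set_recording_id(RecordingId::random());

    // Timeless data: the coordinate system applies at every timestamp.
    let view_coords: ViewCoordinates = "RUB"
        .parse()
        .map_err(|err| anyhow!("couldn't parse RUB as ViewCoordinates: {err}"))?;
    MsgSender::new("world")
        .with_timeless(true)
        .with_component(&[view_coords])?
        .send(&mut session)?;

    // Regular data, stamped on a custom sequence timeline (log time is added automatically).
    let frame = Timeline::new("frame", TimeType::Sequence);
    MsgSender::new("world/points")
        .with_time(frame, 0)
        .with_component(&vec![Point2D { x: 0.0, y: 0.0 }, Point2D { x: 1.0, y: 1.0 }])?
        .send(&mut session)?;

    // No remote viewer configured: hand the buffered messages to the native viewer.
    let log_messages = session.drain_log_messages_buffer();
    rerun::viewer::show(log_messages)?;

    Ok(())
}
```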