Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Feb 5, 2023
1 parent f7b57da commit 527eb79
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 192 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/re_arrow_store/src/arrow_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ fn test_clean_for_polars_nomodify() {

// Colors don't need polars cleaning
let bundle: ComponentBundle = build_some_colors(5).try_into().unwrap();
let cleaned = bundle.value.clean_for_polars();
assert_eq!(bundle.value, cleaned);
let cleaned = bundle.value().clean_for_polars();
assert_eq!(bundle.value(), &*cleaned);
}

#[test]
Expand All @@ -282,7 +282,7 @@ fn test_clean_for_polars_modify() {

let bundle: ComponentBundle = transforms.try_into().unwrap();
assert_eq!(
*bundle.value.data_type(),
*bundle.data_type(),
DataType::List(Box::new(Field::new(
"item",
DataType::Union(
Expand Down Expand Up @@ -340,7 +340,7 @@ fn test_clean_for_polars_modify() {
)))
);

let cleaned = bundle.value.clean_for_polars();
let cleaned = bundle.value().clean_for_polars();

assert_eq!(
*cleaned.data_type(),
Expand Down
56 changes: 24 additions & 32 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,51 +66,44 @@ impl DataStore {
/// If the bundle doesn't carry a payload for the cluster key, one will be auto-generated
/// based on the length of the components in the payload, in the form of an array of
/// monotonically increasing u64s going from `0` to `N-1`.
pub fn insert(&mut self, bundle: &MsgBundle) -> WriteResult<()> {
pub fn insert(&mut self, msg: &MsgBundle) -> WriteResult<()> {
// TODO(cmc): kind & insert_id need to somehow propagate through the span system.
self.insert_id += 1;

let MsgBundle {
msg_id,
entity_path: ent_path,
time_point,
components,
} = bundle;
components: bundles,
} = msg;

if components.is_empty() {
if bundles.is_empty() {
return Ok(());
}

crate::profile_function!();

let ent_path_hash = ent_path.hash();
let nb_rows = components[0].value.len();
let nb_rows = bundles[0].nb_rows();

// Effectively the same thing as having a non-unit length batch, except it's really not
// worth more than an assertion since:
// - A) `MsgBundle` should already guarantee this
// - B) this limitation should be gone soon enough
debug_assert!(
bundle
.components
.iter()
.map(|bundle| bundle.name)
.all_unique(),
msg.components.iter().map(|bundle| bundle.name).all_unique(),
"cannot insert same component multiple times, this is equivalent to multiple rows",
);
// Batches cannot contain more than 1 row at the moment.
if nb_rows != 1 {
return Err(WriteError::MoreThanOneRow(nb_rows));
}
// Components must share the same number of rows.
if !components
.iter()
.all(|bundle| bundle.value.len() == nb_rows)
{
if !bundles.iter().all(|bundle| bundle.nb_rows() == nb_rows) {
return Err(WriteError::MismatchedRows(
components
bundles
.iter()
.map(|bundle| (bundle.name, bundle.value.len()))
.map(|bundle| (bundle.name, bundle.nb_rows()))
.collect(),
));
}
Expand All @@ -123,12 +116,12 @@ impl DataStore {
.map(|(timeline, time)| (timeline.name(), timeline.typ().format(*time)))
.collect::<Vec<_>>(),
entity = %ent_path,
components = ?components.iter().map(|bundle| &bundle.name).collect::<Vec<_>>(),
components = ?bundles.iter().map(|bundle| &bundle.name).collect::<Vec<_>>(),
nb_rows,
"insertion started..."
);

let cluster_comp_pos = components
let cluster_comp_pos = bundles
.iter()
.find_position(|bundle| bundle.name == self.cluster_key)
.map(|(pos, _)| pos);
Expand All @@ -138,7 +131,7 @@ impl DataStore {

// TODO(#589): support for batched row component insertions
for row_nr in 0..nb_rows {
self.insert_timeless_row(row_nr, cluster_comp_pos, components, &mut row_indices)?;
self.insert_timeless_row(row_nr, cluster_comp_pos, bundles, &mut row_indices)?;
}

let index = self
Expand All @@ -155,7 +148,7 @@ impl DataStore {
time_point,
row_nr,
cluster_comp_pos,
components,
bundles,
&mut row_indices,
)?;
}
Expand Down Expand Up @@ -212,7 +205,7 @@ impl DataStore {
.iter()
.filter(|bundle| bundle.name != self.cluster_key)
{
let ComponentBundle { name, value: rows } = bundle;
let (name, rows) = (bundle.name, bundle.value());

// Unwrapping a ListArray is somewhat costly, especially considering we're just
// gonna rewrap it again in a minute... so we'd rather just slice it to a list of
Expand All @@ -224,13 +217,12 @@ impl DataStore {
// So use the fact that `rows` is always of unit-length for now.
let rows_single = rows;

// TODO(#440): support for splats
let nb_instances = rows_single.get_child_length(0);
if nb_instances != cluster_len {
return Err(WriteError::MismatchedInstances {
cluster_comp: self.cluster_key,
cluster_comp_nb_instances: cluster_len,
key: *name,
key: name,
nb_instances,
});
}
Expand All @@ -240,13 +232,13 @@ impl DataStore {
.entry(bundle.name)
.or_insert_with(|| {
PersistentComponentTable::new(
*name,
name,
ListArray::<i32>::get_child_type(rows_single.data_type()),
)
});

let row_idx = table.push(rows_single.as_ref());
row_indices.insert(*name, row_idx);
row_indices.insert(name, row_idx);
}

Ok(())
Expand Down Expand Up @@ -285,7 +277,7 @@ impl DataStore {
.iter()
.filter(|bundle| bundle.name != self.cluster_key)
{
let ComponentBundle { name, value: rows } = bundle;
let (name, rows) = (bundle.name, bundle.value());

// Unwrapping a ListArray is somewhat costly, especially considering we're just
// gonna rewrap it again in a minute... so we'd rather just slice it to a list of
Expand All @@ -303,20 +295,20 @@ impl DataStore {
return Err(WriteError::MismatchedInstances {
cluster_comp: self.cluster_key,
cluster_comp_nb_instances: cluster_len,
key: *name,
key: name,
nb_instances,
});
}

let table = self.components.entry(bundle.name).or_insert_with(|| {
ComponentTable::new(
*name,
name,
ListArray::<i32>::get_child_type(rows_single.data_type()),
)
});

let row_idx = table.push(&self.config, time_point, rows_single.as_ref());
row_indices.insert(*name, row_idx);
row_indices.insert(name, row_idx);
}

Ok(())
Expand Down Expand Up @@ -349,7 +341,7 @@ impl DataStore {

let cluster_comp = &components[cluster_comp_pos];
let data = cluster_comp
.value
.value()
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
Expand All @@ -367,7 +359,7 @@ impl DataStore {

(
len,
ClusterData::UserData(cluster_comp.value.clone() /* shallow */),
ClusterData::UserData(cluster_comp.value().to_boxed() /* shallow */),
)
} else {
// The caller has not specified any cluster component, and so we'll have to generate
Expand All @@ -378,7 +370,7 @@ impl DataStore {
// share the same length at this point anyway.
let len = components
.first()
.map_or(0, |comp| comp.value.get_child_length(0));
.map_or(0, |comp| comp.value().get_child_length(0));

if let Some(row_idx) = self.cluster_comp_cache.get(&len) {
// Cache hit! Re-use that row index.
Expand Down
27 changes: 11 additions & 16 deletions crates/re_arrow_store/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ fn write_errors() {
]);

// make instances 2 rows long
bundle.components[0].value =
concatenate(&[&*bundle.components[0].value, &*bundle.components[0].value]).unwrap();
bundle.components[0] = ComponentBundle::new(
bundle.components[0].name,
concatenate(&[bundle.components[0].value(), bundle.components[0].value()]).unwrap(),
);

// The first component of the bundle determines the number of rows for all other
// components in there (since it has to match for all of them), so in this case we get a
Expand All @@ -67,8 +69,10 @@ fn write_errors() {
]);

// make component 2 rows long
bundle.components[1].value =
concatenate(&[&*bundle.components[1].value, &*bundle.components[1].value]).unwrap();
bundle.components[1] = ComponentBundle::new(
bundle.components[1].name,
concatenate(&[bundle.components[1].value(), bundle.components[1].value()]).unwrap(),
);

// The first component of the bundle determines the number of rows for all other
// components in there (since it has to match for all of them), so in this case we get a
Expand All @@ -82,10 +86,7 @@ fn write_errors() {
{
pub fn build_sparse_instances() -> ComponentBundle {
let ids = wrap_in_listarray(UInt64Array::from(vec![Some(1), None, Some(3)]).boxed());
ComponentBundle {
name: InstanceKey::name(),
value: ids.boxed(),
}
ComponentBundle::new(InstanceKey::name(), ids.boxed())
}

let mut store = DataStore::new(InstanceKey::name(), Default::default());
Expand All @@ -102,17 +103,11 @@ fn write_errors() {
{
pub fn build_unsorted_instances() -> ComponentBundle {
let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 3, 2]).boxed());
ComponentBundle {
name: InstanceKey::name(),
value: ids.boxed(),
}
ComponentBundle::new(InstanceKey::name(), ids.boxed())
}
pub fn build_duped_instances() -> ComponentBundle {
let ids = wrap_in_listarray(UInt64Array::from_vec(vec![1, 2, 2]).boxed());
ComponentBundle {
name: InstanceKey::name(),
value: ids.boxed(),
}
ComponentBundle::new(InstanceKey::name(), ids.boxed())
}

let mut store = DataStore::new(InstanceKey::name(), Default::default());
Expand Down
7 changes: 4 additions & 3 deletions crates/re_arrow_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,14 +868,15 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)])
let cluster_comp = if let Some(idx) = bundle.find_component(&cluster_key) {
Series::try_from((
cluster_key.as_str(),
bundle.components[idx].value.to_boxed(),
bundle.components[idx].value().to_boxed(),
))
.unwrap()
} else {
let nb_instances = bundle.nb_instances(0).unwrap_or(0);
Series::try_from((
cluster_key.as_str(),
wrap_in_listarray(
UInt64Array::from_vec((0..bundle.row_len(0) as u64).collect()).to_boxed(),
UInt64Array::from_vec((0..nb_instances as u64).collect()).to_boxed(),
)
.to_boxed(),
))
Expand All @@ -887,7 +888,7 @@ fn joint_df(cluster_key: ComponentName, bundles: &[(ComponentName, &MsgBundle)])
cluster_comp,
Series::try_from((
component.as_str(),
bundle.components[comp_idx].value.to_boxed(),
bundle.components[comp_idx].value().to_boxed(),
))
.unwrap(),
])
Expand Down
Loading

0 comments on commit 527eb79

Please sign in to comment.