From 893138b0f6e269c154080e636e922639e63c06a8 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Wed, 12 Apr 2023 17:25:59 +0200
Subject: [PATCH] Datastore revamp 2: serialization & formatting (#1735)

---
 crates/re_arrow_store/Cargo.toml              |   5 +-
 crates/re_arrow_store/src/lib.rs              |   1 +
 crates/re_arrow_store/src/store.rs            |  37 ++--
 crates/re_arrow_store/src/store_arrow.rs      | 197 ++++++++++++++++++
 crates/re_arrow_store/src/store_format.rs     |  38 ++--
 crates/re_arrow_store/src/store_polars.rs     |  17 +-
 crates/re_arrow_store/src/store_write.rs      |  12 +-
 crates/re_log_types/Cargo.toml                |   1 +
 crates/re_log_types/src/data_table.rs         |  43 +++-
 crates/re_log_types/src/lib.rs                |   2 +-
 .../re_log_types/src/time_point/timeline.rs   |   1 +
 11 files changed, 291 insertions(+), 63 deletions(-)
 create mode 100644 crates/re_arrow_store/src/store_arrow.rs

diff --git a/crates/re_arrow_store/Cargo.toml b/crates/re_arrow_store/Cargo.toml
index 9e18e264e8bc..837147d3a01b 100644
--- a/crates/re_arrow_store/Cargo.toml
+++ b/crates/re_arrow_store/Cargo.toml
@@ -38,10 +38,7 @@ re_log.workspace = true
 # External dependencies:
 ahash.workspace = true
-arrow2 = { workspace = true, features = [
-  "compute_concatenate",
-  "compute_aggregate",
-] }
+arrow2 = { workspace = true, features = ["compute_concatenate"] }
 arrow2_convert.workspace = true
 document-features = "0.2"
 indent = "0.1"
diff --git a/crates/re_arrow_store/src/lib.rs b/crates/re_arrow_store/src/lib.rs
index ce913676cd37..978a8b0b4a8b 100644
--- a/crates/re_arrow_store/src/lib.rs
+++ b/crates/re_arrow_store/src/lib.rs
@@ -16,6 +16,7 @@ mod arrow_util;
 mod store;
+mod store_arrow;
 mod store_format;
 mod store_gc;
 mod store_read;
diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs
index 1d6ca1b2e3da..c8824e401671 100644
--- a/crates/re_arrow_store/src/store.rs
+++ b/crates/re_arrow_store/src/store.rs
@@ -2,8 +2,7 @@ use std::collections::BTreeMap;
 use std::sync::atomic::AtomicU64;
 
 use ahash::HashMap;
-use arrow2::array::Int64Array;
-use arrow2::datatypes::{DataType, TimeUnit};
+use arrow2::datatypes::DataType;
 use smallvec::SmallVec;
 
 use nohash_hasher::{IntMap, IntSet};
@@ -272,8 +271,7 @@ fn datastore_internal_repr() {
     store.insert_table(&temporal).unwrap();
     store.sanity_check().unwrap();
 
-    // TODO(#1619): bring back formatting
-    // eprintln!("{store}");
+    eprintln!("{store}");
 }
 
 // --- Temporal ---
@@ -283,7 +281,13 @@
 ///
 /// See also [`IndexedBucket`].
 ///
-// TODO(#1619): show internal structure once formatting is back
+/// Run the following command to display a visualization of the store's internal datastructures and
+/// better understand how everything fits together:
+/// ```text
+/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
+/// ```
+//
+// TODO(#1524): inline visualization once it's back to a manageable state
 #[derive(Debug)]
 pub struct IndexedTable {
     /// The timeline this table operates in, for debugging purposes.
@@ -432,26 +436,17 @@ impl Default for IndexedBucketInner {
     }
 }
 
-impl IndexedBucket {
-    /// Returns a (name, [`Int64Array`]) with a logical type matching the timeline.
-    // TODO(cmc): should be defined in `DataTable` serialization stuff
-    pub fn times(&self) -> (String, Int64Array) {
-        crate::profile_function!();
-
-        let times = Int64Array::from_slice(self.inner.read().col_time.as_slice());
-        let logical_type = match self.timeline.typ() {
-            re_log_types::TimeType::Time => DataType::Timestamp(TimeUnit::Nanosecond, None),
-            re_log_types::TimeType::Sequence => DataType::Int64,
-        };
-        (self.timeline.name().to_string(), times.to(logical_type))
-    }
-}
-
 // --- Timeless ---
 
 /// The timeless specialization of an [`IndexedTable`].
 ///
-// TODO(#1619): show internal structure once formatting is back
+/// Run the following command to display a visualization of the store's internal datastructures and
+/// better understand how everything fits together:
+/// ```text
+/// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
+/// ```
+//
+// TODO(#1524): inline visualization once it's back to a manageable state
 #[derive(Debug)]
 pub struct PersistentIndexedTable {
     /// The entity this table is related to, for debugging purposes.
diff --git a/crates/re_arrow_store/src/store_arrow.rs b/crates/re_arrow_store/src/store_arrow.rs
new file mode 100644
index 000000000000..f9016ecde50e
--- /dev/null
+++ b/crates/re_arrow_store/src/store_arrow.rs
@@ -0,0 +1,197 @@
+use std::collections::BTreeMap;
+
+use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
+use nohash_hasher::IntMap;
+use re_log_types::{
+    ComponentName, DataCellColumn, DataTable, DataTableResult, RowId, Timeline, COLUMN_INSERT_ID,
+    COLUMN_NUM_INSTANCES, COLUMN_ROW_ID,
+};
+
+use crate::store::{IndexedBucket, IndexedBucketInner, PersistentIndexedTable};
+
+// ---
+
+impl IndexedBucket {
+    /// Serializes the entire bucket into an arrow payload and schema.
+    ///
+    /// Column order:
+    /// - `insert_id`
+    /// - `row_id`
+    /// - `time`
+    /// - `num_instances`
+    /// - `$cluster_key`
+    /// - rest of component columns in ascending lexical order
+    pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
+        crate::profile_function!();
+
+        let Self {
+            timeline,
+            cluster_key,
+            inner,
+        } = self;
+
+        let IndexedBucketInner {
+            is_sorted: _,
+            time_range: _,
+            col_time,
+            col_insert_id,
+            col_row_id,
+            col_num_instances,
+            columns,
+            total_size_bytes: _,
+        } = &*inner.read();
+
+        serialize(
+            cluster_key,
+            Some((*timeline, col_time)),
+            col_insert_id,
+            col_row_id,
+            col_num_instances,
+            columns,
+        )
+    }
+}
+
+impl PersistentIndexedTable {
+    /// Serializes the entire table into an arrow payload and schema.
+    ///
+    /// Column order:
+    /// - `insert_id`
+    /// - `row_id`
+    /// - `time`
+    /// - `num_instances`
+    /// - `$cluster_key`
+    /// - rest of component columns in ascending lexical order
+    pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
+        crate::profile_function!();
+
+        let Self {
+            ent_path: _,
+            cluster_key,
+            col_insert_id,
+            col_row_id,
+            col_num_instances,
+            columns,
+            total_size_bytes: _,
+        } = self;
+
+        serialize(
+            cluster_key,
+            None,
+            col_insert_id,
+            col_row_id,
+            col_num_instances,
+            columns,
+        )
+    }
+}
+
+// ---
+
+fn serialize(
+    cluster_key: &ComponentName,
+    col_time: Option<(Timeline, &[i64])>,
+    col_insert_id: &[u64],
+    col_row_id: &[RowId],
+    col_num_instances: &[u32],
+    table: &IntMap<ComponentName, DataCellColumn>,
+) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
+    crate::profile_function!();
+
+    let mut schema = Schema::default();
+    let mut columns = Vec::new();
+
+    {
+        let (control_schema, control_columns) =
+            serialize_control_columns(col_time, col_insert_id, col_row_id, col_num_instances)?;
+        schema.fields.extend(control_schema.fields);
+        schema.metadata.extend(control_schema.metadata);
+        columns.extend(control_columns.into_iter());
+    }
+
+    {
+        let (data_schema, data_columns) = serialize_data_columns(cluster_key, table)?;
+        schema.fields.extend(data_schema.fields);
+        schema.metadata.extend(data_schema.metadata);
+        columns.extend(data_columns.into_iter());
+    }
+
+    Ok((schema, Chunk::new(columns)))
+}
+
+fn serialize_control_columns(
+    col_time: Option<(Timeline, &[i64])>,
+    col_insert_id: &[u64],
+    col_row_id: &[RowId],
+    col_num_instances: &[u32],
+) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
+    crate::profile_function!();
+
+    let mut schema = Schema::default();
+    let mut columns = Vec::new();
+
+    // NOTE: ordering is taken into account!
+    // - insert_id
+    // - row_id
+    // - time
+    // - num_instances
+
+    let (insert_id_field, insert_id_column) =
+        DataTable::serialize_primitive_column(COLUMN_INSERT_ID, col_insert_id, None)?;
+    schema.fields.push(insert_id_field);
+    columns.push(insert_id_column);
+
+    let (row_id_field, row_id_column) =
+        DataTable::serialize_control_column(COLUMN_ROW_ID, col_row_id)?;
+    schema.fields.push(row_id_field);
+    columns.push(row_id_column);
+
+    if let Some((timeline, col_time)) = col_time {
+        let (time_field, time_column) = DataTable::serialize_primitive_column(
+            timeline.name(),
+            col_time,
+            timeline.datatype().into(),
+        )?;
+        schema.fields.push(time_field);
+        columns.push(time_column);
+    }
+
+    let (num_instances_field, num_instances_column) =
+        DataTable::serialize_primitive_column(COLUMN_NUM_INSTANCES, col_num_instances, None)?;
+    schema.fields.push(num_instances_field);
+    columns.push(num_instances_column);
+
+    Ok((schema, columns))
+}
+
+fn serialize_data_columns(
+    cluster_key: &ComponentName,
+    table: &IntMap<ComponentName, DataCellColumn>,
+) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
+    crate::profile_function!();
+
+    let mut schema = Schema::default();
+    let mut columns = Vec::new();
+
+    // NOTE: ordering is taken into account!
+    let mut table: BTreeMap<_, _> = table.iter().collect();
+
+    // Cluster column first and foremost!
+    //
+    // NOTE: cannot fail, the cluster key _has_ to be there by definition
+    let cluster_column = table.remove(&cluster_key).unwrap();
+    {
+        let (field, column) =
+            DataTable::serialize_data_column(cluster_key.as_str(), cluster_column)?;
+        schema.fields.push(field);
+        columns.push(column);
+    }
+
+    for (component, column) in table {
+        let (field, column) = DataTable::serialize_data_column(component.as_str(), column)?;
+        schema.fields.push(field);
+        columns.push(column);
+    }
+
+    Ok((schema, columns))
+}
diff --git a/crates/re_arrow_store/src/store_format.rs b/crates/re_arrow_store/src/store_format.rs
index 0ed827e04d0f..9974f725c228 100644
--- a/crates/re_arrow_store/src/store_format.rs
+++ b/crates/re_arrow_store/src/store_format.rs
@@ -130,16 +130,15 @@ impl std::fmt::Display for IndexedBucket {
         };
         f.write_fmt(format_args!("{time_range}\n"))?;
 
-        // TODO(#1619): bring back formatting
-        // let (schema, columns) = self.serialize().map_err(|err| {
-        //     re_log::error_once!("couldn't display indexed bucket: {err}");
-        //     std::fmt::Error
-        // })?;
-        // re_format::arrow::format_table(
-        //     columns.columns(),
-        //     schema.fields.iter().map(|field| field.name.as_str()),
-        // )
-        // .fmt(f)?;
+        let (schema, columns) = self.serialize().map_err(|err| {
+            re_log::error_once!("couldn't display indexed bucket: {err}");
+            std::fmt::Error
+        })?;
+        re_format::arrow::format_table(
+            columns.columns(),
+            schema.fields.iter().map(|field| field.name.as_str()),
+        )
+        .fmt(f)?;
 
         writeln!(f)
     }
@@ -168,16 +167,15 @@ impl std::fmt::Display for PersistentIndexedTable {
             format_number(self.total_rows() as _),
         ))?;
 
-        // TODO(#1619): bring back formatting
-        // let (schema, columns) = self.serialize().map_err(|err| {
-        //     re_log::error_once!("couldn't display timeless indexed table: {err}");
-        //     std::fmt::Error
-        // })?;
-        // re_format::arrow::format_table(
-        //     columns.columns(),
-        //     schema.fields.iter().map(|field| field.name.as_str()),
-        // )
-        // .fmt(f)?;
+        let (schema, columns) = self.serialize().map_err(|err| {
+            re_log::error_once!("couldn't display timeless indexed table: {err}");
+            std::fmt::Error
+        })?;
+        re_format::arrow::format_table(
+            columns.columns(),
+            schema.fields.iter().map(|field| field.name.as_str()),
+        )
+        .fmt(f)?;
 
         writeln!(f)
     }
diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs
index 94b6ddd1ad06..494b320aad55 100644
--- a/crates/re_arrow_store/src/store_polars.rs
+++ b/crates/re_arrow_store/src/store_polars.rs
@@ -9,7 +9,7 @@ use arrow2::{
     offset::Offsets,
 };
 use polars_core::{functions::diag_concat_df, prelude::*};
-use re_log_types::{ComponentName, DataCell};
+use re_log_types::{ComponentName, DataCell, DataTable};
 
 use crate::{
     store::InsertIdVec, ArrayExt, DataStore, DataStoreConfig, IndexedBucket, IndexedBucketInner,
@@ -209,13 +209,10 @@ impl IndexedBucket {
     pub fn to_dataframe(&self, store: &DataStore, config: &DataStoreConfig) -> DataFrame {
         crate::profile_function!();
 
-        let (_, times) = self.times();
-        let num_rows = times.len();
-
         let IndexedBucketInner {
             is_sorted: _,
             time_range: _,
-            col_time: _,
+            col_time,
             col_insert_id,
             col_row_id,
             col_num_instances,
@@ -223,6 +220,14 @@ impl IndexedBucket {
             total_size_bytes: _,
         } = &*self.inner.read();
 
+        let (_, times) = DataTable::serialize_primitive_column(
+            self.timeline.name(),
+            col_time,
+            self.timeline.datatype().into(),
+        )
+        .unwrap();
+        let num_rows = times.len();
+
         let insert_ids = config
             .store_insert_ids
             .then(|| insert_ids_as_series(&col_insert_id));
@@ -234,7 +239,7 @@ impl IndexedBucket {
             // One column for the time index.
             Some(new_infallible_series(
                 self.timeline.name().as_str(),
-                &times,
+                &*times,
                 num_rows,
             )),
         ]
diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs
index d4ed5489f622..772750f48c97 100644
--- a/crates/re_arrow_store/src/store_write.rs
+++ b/crates/re_arrow_store/src/store_write.rs
@@ -445,11 +445,13 @@ impl IndexedBucket {
     /// This function makes sure to uphold that restriction, which sometimes means splitting the
     /// bucket into two uneven parts, or even not splitting it at all.
     ///
-    /// Here's an example of an indexed tables configured to have a maximum of 2 rows per bucket: one
-    /// can see that the 1st and 2nd buckets exceed this maximum in order to uphold the restriction
-    /// described above:
-    ///
-    /// TODO(#1619): show internal structure once formatting is back
+    /// Run the following command to display a visualization of the store's internal
+    /// datastructures and better understand how everything fits together:
+    /// ```text
+    /// cargo test -p re_arrow_store -- --nocapture datastore_internal_repr
+    /// ```
+    //
+    // TODO(#1524): inline visualization once it's back to a manageable state
     fn split(&self) -> Option<(TimeInt, Self)> {
         let Self {
             timeline,
diff --git a/crates/re_log_types/Cargo.toml b/crates/re_log_types/Cargo.toml
index db17514bde85..5cc0fa31314c 100644
--- a/crates/re_log_types/Cargo.toml
+++ b/crates/re_log_types/Cargo.toml
@@ -56,6 +56,7 @@ array-init = "2.1.0"
 arrow2 = { workspace = true, features = [
   "io_ipc",
   "io_print",
+  "compute_aggregate",
   "compute_concatenate",
 ] }
diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs
index 77b849152688..063fa44f91b1 100644
--- a/crates/re_log_types/src/data_table.rs
+++ b/crates/re_log_types/src/data_table.rs
@@ -44,8 +44,6 @@ pub type DataTableResult<T> = ::std::result::Result<T, DataTableError>;
 // TODO(#1757): The timepoint should be serialized as one column per timeline... that would be both
 // more efficient and yield much better debugging views of our tables.
 
-// TODO(#1712): implement fast ser/deser paths for primitive types, both control & data.
-
 // ---
 
 pub type RowIdVec = SmallVec<[RowId; 4]>;
@@ -440,6 +438,7 @@ use arrow2::{
     chunk::Chunk,
     datatypes::{DataType, Field, Schema, TimeUnit},
     offset::Offsets,
+    types::NativeType,
 };
 use arrow2_convert::{
     deserialize::TryIntoCollection, field::ArrowField, serialize::ArrowSerialize,
 };
 
 // TODO(#1696): Those names should come from the datatypes themselves.
 
+pub const COLUMN_INSERT_ID: &str = "rerun.insert_id";
 pub const COLUMN_ROW_ID: &str = "rerun.row_id";
 pub const COLUMN_TIMEPOINT: &str = "rerun.timepoint";
 pub const COLUMN_ENTITY_PATH: &str = "rerun.entity_path";
@@ -570,9 +570,11 @@ impl DataTable {
         schema.fields.push(entity_path_field);
         columns.push(entity_path_column);
 
-        // TODO(#1712): This is unnecessarily slow...
-        let (num_instances_field, num_instances_column) =
-            Self::serialize_control_column(COLUMN_NUM_INSTANCES, col_num_instances)?;
+        let (num_instances_field, num_instances_column) = Self::serialize_primitive_column(
+            COLUMN_NUM_INSTANCES,
+            col_num_instances.as_slice(),
+            None,
+        )?;
         schema.fields.push(num_instances_field);
         columns.push(num_instances_column);
 
@@ -619,6 +621,31 @@ impl DataTable {
         Ok((field, data))
     }
 
+    /// Serializes a single control column; optimized path for primitive datatypes.
+    pub fn serialize_primitive_column<T: NativeType>(
+        name: &str,
+        values: &[T],
+        datatype: Option<DataType>,
+    ) -> DataTableResult<(Field, Box<dyn Array>)> {
+        let data = PrimitiveArray::from_slice(values);
+
+        let datatype = datatype.unwrap_or(data.data_type().clone());
+        let data = data.to(datatype.clone()).boxed();
+
+        let mut field = Field::new(name, datatype.clone(), false)
+            .with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_CONTROL.to_owned())].into());
+
+        // TODO(cmc): why do we have to do this manually on the way out, but it's done
+        // automatically on our behalf on the way in...?
+        if let DataType::Extension(name, _, _) = datatype {
+            field
+                .metadata
+                .extend([("ARROW:extension:name".to_owned(), name)]);
+        }
+
+        Ok((field, data))
+    }
+
     /// Serializes all data columns into an arrow payload and schema.
     ///
     /// They are optional, potentially sparse, and never deserialized on the server-side (not by
@@ -714,7 +741,11 @@ impl DataTable {
         } else {
             // NOTE: This is a column of cells, it shouldn't ever fail to concatenate since
            // they share the same underlying type.
-            let data = arrow2::compute::concatenate::concatenate(cell_refs.as_slice())?;
+            let data =
+                arrow2::compute::concatenate::concatenate(cell_refs.as_slice()).map_err(|err| {
+                    re_log::warn_once!("failed to concatenate cells for column {name}");
+                    err
+                })?;
             data_to_lists(column, data, ext_name.cloned())
         };
 
diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs
index fb6186c501aa..38f6de4dc615 100644
--- a/crates/re_log_types/src/lib.rs
+++ b/crates/re_log_types/src/lib.rs
@@ -47,7 +47,7 @@ pub use self::data_cell::{DataCell, DataCellError, DataCellInner, DataCellResult};
 pub use self::data_row::{DataRow, DataRowError, DataRowResult};
 pub use self::data_table::{
     DataCellColumn, DataCellOptVec, DataTable, DataTableError, DataTableResult, EntityPathVec,
-    ErasedTimeVec, NumInstancesVec, RowIdVec, TimePointVec, COLUMN_ENTITY_PATH,
+    ErasedTimeVec, NumInstancesVec, RowIdVec, TimePointVec, COLUMN_ENTITY_PATH, COLUMN_INSERT_ID,
     COLUMN_NUM_INSTANCES, COLUMN_ROW_ID, COLUMN_TIMEPOINT, METADATA_KIND, METADATA_KIND_CONTROL,
     METADATA_KIND_DATA,
 };
diff --git a/crates/re_log_types/src/time_point/timeline.rs b/crates/re_log_types/src/time_point/timeline.rs
index 9d9f14a2c9da..20c207365cbc 100644
--- a/crates/re_log_types/src/time_point/timeline.rs
+++ b/crates/re_log_types/src/time_point/timeline.rs
@@ -78,6 +78,7 @@ impl Timeline {
     }
 
     /// Returns a formatted string of `time_range` on this `Timeline`.
+    #[inline]
    pub fn format_time_range(&self, time_range: &TimeRange) -> String {
        format!(
            " - {}: from {} to {} (all inclusive)",
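The fast path added by `DataTable::serialize_primitive_column` boils down to keeping the raw primitive buffer untouched and only swapping in the timeline's logical datatype before wrapping it in a `Field`. Here is a minimal, self-contained sketch of that idea using `arrow2` directly; the `make_time_column` helper is illustrative only and is not part of this patch or of `re_log_types`:

```rust
use arrow2::{
    array::{Array, PrimitiveArray},
    datatypes::{DataType, Field, TimeUnit},
};

/// Illustrative stand-in (not part of re_log_types) for what the primitive fast path does
/// with a time column: the physical `i64` buffer stays as-is, only the logical datatype
/// changes to match the timeline.
fn make_time_column(name: &str, times: &[i64]) -> (Field, Box<dyn Array>) {
    // Physical storage: a plain Int64 buffer built straight from the slice.
    let array = PrimitiveArray::from_slice(times);
    // Logical type: e.g. nanosecond timestamps for a `TimeType::Time` timeline.
    let datatype = DataType::Timestamp(TimeUnit::Nanosecond, None);
    // `to` is a free re-typing: Timestamp(ns) and Int64 share the same physical layout.
    let array = array.to(datatype.clone()).boxed();
    (Field::new(name, datatype, false), array)
}

fn main() {
    let (field, column) = make_time_column("log_time", &[1_000_000_000, 2_000_000_000]);
    println!("{} -> {:?}, {} rows", field.name, field.data_type, column.len());
}
```

This is the same reasoning the removed `IndexedBucket::times` used, now routed through the shared `DataTable` serialization helpers so the store, the polars conversion, and the `Display` impls all produce identical columns.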