From 93e517d265a5c8b82c80415ec2433454f641a21f Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 20 May 2026 15:52:23 +0100 Subject: [PATCH] deser array tree from a vortex array Signed-off-by: Onur Satici --- vortex-array/public-api.lock | 186 +++ vortex-array/src/serde/columnar.rs | 1124 +++++++++++++++++++ vortex-array/src/{serde.rs => serde/mod.rs} | 12 + 3 files changed, 1322 insertions(+) create mode 100644 vortex-array/src/serde/columnar.rs rename vortex-array/src/{serde.rs => serde/mod.rs} (98%) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 51733789a4b..3dbab310aca 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -19938,6 +19938,100 @@ pub fn S::search_sorted_by(&self, F, N, vortex_array::search_sorted::Searc pub mod vortex_array::serde +pub mod vortex_array::serde::columnar + +pub struct vortex_array::serde::columnar::ColumnarArrayTree + +pub vortex_array::serde::columnar::ColumnarArrayTree::buffers: vortex_array::arrays::StructArray + +pub vortex_array::serde::columnar::ColumnarArrayTree::nodes: vortex_array::arrays::StructArray + +impl vortex_array::serde::columnar::ColumnarArrayTree + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::nnodes(&self) -> usize + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::try_new(vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::VarBinViewArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::serde::columnar::StatsColumns, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray) -> vortex_error::VortexResult + +impl core::clone::Clone for vortex_array::serde::columnar::ColumnarArrayTree + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::clone(&self) -> vortex_array::serde::columnar::ColumnarArrayTree + +impl core::fmt::Debug for vortex_array::serde::columnar::ColumnarArrayTree + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_array::serde::columnar::ColumnarSerializedArray + +impl vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::child(&self, usize) -> vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::decode(&self, &vortex_array::dtype::DType, usize, &vortex_session::registry::ReadContext, &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::encoding_id(&self) -> u16 + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::from_segment_and_tree(vortex_array::buffer::BufferHandle, alloc::sync::Arc) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::metadata(&self) -> vortex_buffer::ByteBuffer + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::nbuffers(&self) -> usize + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::nchildren(&self) -> usize + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::new(alloc::sync::Arc, alloc::sync::Arc<[vortex_array::buffer::BufferHandle]>) -> vortex_error::VortexResult + +impl core::clone::Clone for vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::clone(&self) -> vortex_array::serde::columnar::ColumnarSerializedArray + +impl core::fmt::Debug for vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_array::serde::columnar::StatsColumns(_) + +impl vortex_array::serde::columnar::StatsColumns + +pub fn vortex_array::serde::columnar::StatsColumns::as_struct(&self) -> &vortex_array::arrays::StructArray + +pub fn vortex_array::serde::columnar::StatsColumns::into_struct(self) -> vortex_array::arrays::StructArray + +pub fn vortex_array::serde::columnar::StatsColumns::new(vortex_array::arrays::StructArray) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::StatsColumns::nrows(&self) -> usize + +pub fn vortex_array::serde::columnar::StatsColumns::read(&self, usize, &vortex_array::dtype::DType, &mut vortex_array::ExecutionCtx, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +impl core::clone::Clone for vortex_array::serde::columnar::StatsColumns + +pub fn vortex_array::serde::columnar::StatsColumns::clone(&self) -> vortex_array::serde::columnar::StatsColumns + +impl core::fmt::Debug for vortex_array::serde::columnar::StatsColumns + +pub fn vortex_array::serde::columnar::StatsColumns::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_array::serde::columnar::StatsColumnsBuilder + +impl vortex_array::serde::columnar::StatsColumnsBuilder + +pub fn vortex_array::serde::columnar::StatsColumnsBuilder::finish(self) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::StatsColumnsBuilder::push(&mut self, core::option::Option<&vortex_array::stats::StatsSet>) + +pub fn vortex_array::serde::columnar::StatsColumnsBuilder::with_capacity(usize) -> Self + +pub static vortex_array::serde::columnar::BUFFER_COLUMNS_DTYPE: std::sync::lazy_lock::LazyLock + +pub static vortex_array::serde::columnar::NODES_COLUMNS_DTYPE: std::sync::lazy_lock::LazyLock + +pub static vortex_array::serde::columnar::STATS_COLUMNS_DTYPE: std::sync::lazy_lock::LazyLock + +pub fn vortex_array::serde::columnar::compute_buffer_offsets(&[u16]) -> vortex_buffer::buffer::Buffer + +pub fn vortex_array::serde::columnar::compute_subtree_sizes(&[u8]) -> vortex_buffer::buffer::Buffer + +pub fn vortex_array::serde::columnar::serialize_to_columnar_tree(&vortex_array::ArrayRef, &vortex_array::ArrayContext, &vortex_session::VortexSession, &vortex_array::serde::SerializeOptions) -> vortex_error::VortexResult<(alloc::vec::Vec, vortex_array::serde::columnar::ColumnarArrayTree)> + pub struct vortex_array::serde::ArrayNodeFlatBuffer<'a> impl<'a> vortex_array::serde::ArrayNodeFlatBuffer<'a> @@ -19946,6 +20040,54 @@ pub fn vortex_array::serde::ArrayNodeFlatBuffer<'a>::try_new(&'a vortex_array::A pub fn vortex_array::serde::ArrayNodeFlatBuffer<'a>::try_write_flatbuffer<'fb>(&self, &mut flatbuffers::builder::FlatBufferBuilder<'fb>) -> vortex_error::VortexResult>> +pub struct vortex_array::serde::ColumnarArrayTree + +pub vortex_array::serde::ColumnarArrayTree::buffers: vortex_array::arrays::StructArray + +pub vortex_array::serde::ColumnarArrayTree::nodes: vortex_array::arrays::StructArray + +impl vortex_array::serde::columnar::ColumnarArrayTree + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::nnodes(&self) -> usize + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::try_new(vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::VarBinViewArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::serde::columnar::StatsColumns, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray, vortex_array::arrays::PrimitiveArray) -> vortex_error::VortexResult + +impl core::clone::Clone for vortex_array::serde::columnar::ColumnarArrayTree + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::clone(&self) -> vortex_array::serde::columnar::ColumnarArrayTree + +impl core::fmt::Debug for vortex_array::serde::columnar::ColumnarArrayTree + +pub fn vortex_array::serde::columnar::ColumnarArrayTree::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_array::serde::ColumnarSerializedArray + +impl vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::child(&self, usize) -> vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::decode(&self, &vortex_array::dtype::DType, usize, &vortex_session::registry::ReadContext, &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::encoding_id(&self) -> u16 + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::from_segment_and_tree(vortex_array::buffer::BufferHandle, alloc::sync::Arc) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::metadata(&self) -> vortex_buffer::ByteBuffer + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::nbuffers(&self) -> usize + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::nchildren(&self) -> usize + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::new(alloc::sync::Arc, alloc::sync::Arc<[vortex_array::buffer::BufferHandle]>) -> vortex_error::VortexResult + +impl core::clone::Clone for vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::clone(&self) -> vortex_array::serde::columnar::ColumnarSerializedArray + +impl core::fmt::Debug for vortex_array::serde::columnar::ColumnarSerializedArray + +pub fn vortex_array::serde::columnar::ColumnarSerializedArray::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + pub struct vortex_array::serde::SerializeOptions pub vortex_array::serde::SerializeOptions::include_padding: bool @@ -20008,6 +20150,44 @@ impl core::fmt::Debug for vortex_array::serde::SerializedArray pub fn vortex_array::serde::SerializedArray::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub struct vortex_array::serde::StatsColumns(_) + +impl vortex_array::serde::columnar::StatsColumns + +pub fn vortex_array::serde::columnar::StatsColumns::as_struct(&self) -> &vortex_array::arrays::StructArray + +pub fn vortex_array::serde::columnar::StatsColumns::into_struct(self) -> vortex_array::arrays::StructArray + +pub fn vortex_array::serde::columnar::StatsColumns::new(vortex_array::arrays::StructArray) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::StatsColumns::nrows(&self) -> usize + +pub fn vortex_array::serde::columnar::StatsColumns::read(&self, usize, &vortex_array::dtype::DType, &mut vortex_array::ExecutionCtx, &vortex_session::VortexSession) -> vortex_error::VortexResult> + +impl core::clone::Clone for vortex_array::serde::columnar::StatsColumns + +pub fn vortex_array::serde::columnar::StatsColumns::clone(&self) -> vortex_array::serde::columnar::StatsColumns + +impl core::fmt::Debug for vortex_array::serde::columnar::StatsColumns + +pub fn vortex_array::serde::columnar::StatsColumns::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub struct vortex_array::serde::StatsColumnsBuilder + +impl vortex_array::serde::columnar::StatsColumnsBuilder + +pub fn vortex_array::serde::columnar::StatsColumnsBuilder::finish(self) -> vortex_error::VortexResult + +pub fn vortex_array::serde::columnar::StatsColumnsBuilder::push(&mut self, core::option::Option<&vortex_array::stats::StatsSet>) + +pub fn vortex_array::serde::columnar::StatsColumnsBuilder::with_capacity(usize) -> Self + +pub static vortex_array::serde::BUFFER_COLUMNS_DTYPE: std::sync::lazy_lock::LazyLock + +pub static vortex_array::serde::NODES_COLUMNS_DTYPE: std::sync::lazy_lock::LazyLock + +pub static vortex_array::serde::STATS_COLUMNS_DTYPE: std::sync::lazy_lock::LazyLock + pub trait vortex_array::serde::ArrayChildren pub fn vortex_array::serde::ArrayChildren::get(&self, usize, &vortex_array::dtype::DType, usize) -> vortex_error::VortexResult @@ -20024,6 +20204,12 @@ pub fn T::is_empty(&self) -> bool pub fn T::len(&self) -> usize +pub fn vortex_array::serde::compute_buffer_offsets(&[u16]) -> vortex_buffer::buffer::Buffer + +pub fn vortex_array::serde::compute_subtree_sizes(&[u8]) -> vortex_buffer::buffer::Buffer + +pub fn vortex_array::serde::serialize_to_columnar_tree(&vortex_array::ArrayRef, &vortex_array::ArrayContext, &vortex_session::VortexSession, &vortex_array::serde::SerializeOptions) -> vortex_error::VortexResult<(alloc::vec::Vec, vortex_array::serde::columnar::ColumnarArrayTree)> + pub mod vortex_array::session pub struct vortex_array::session::ArraySession diff --git a/vortex-array/src/serde/columnar.rs b/vortex-array/src/serde/columnar.rs new file mode 100644 index 00000000000..f69e214ee4d --- /dev/null +++ b/vortex-array/src/serde/columnar.rs @@ -0,0 +1,1124 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Columnar serialization of array trees — a parallel to [`SerializedArray`] that sources +//! its encoding-tree metadata from a columnar representation rather than a flatbuffer. +//! +//! `SerializedArray` parses a per-array-tree flatbuffer (`fba::Array`) and navigates it via +//! offsets baked into the fb. That format lives in the trailing buffer of a `FlatLayout` +//! segment. `ColumnarSerializedArray` is the parallel decode entry point used when the +//! encoding tree is stored as a struct-of-Lists vortex array. +//! The plugin contract — [`ArrayChildren`] plus `plugin.deserialize(dtype, len, metadata, +//! buffers, children, session)` — doesn't care which source the metadata/buffers/children +//! come from, so this module implements the same decode flow without ever constructing or +//! parsing a flatbuffer. +//! +//! The writer entry point [`serialize_to_columnar_tree`] walks an [`ArrayRef`] in +//! pre-order and produces both the data-segment buffer list (no trailing array node flatbuffer) +//! and a [`ColumnarArrayTree`] capturing the encoding tree, per-node stats, and per-buffer +//! descriptors. + +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; +use std::sync::LazyLock; + +use vortex_buffer::Alignment; +use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use vortex_session::registry::ReadContext; + +use crate::ArrayContext; +use crate::ArrayRef; +use crate::IntoArray; +use crate::VortexSessionExecute; +use crate::arrays::PrimitiveArray; +use crate::arrays::StructArray; +use crate::arrays::VarBinViewArray; +use crate::arrays::struct_::StructArrayExt; +use crate::buffer::BufferHandle; +use crate::builders::ArrayBuilder; +use crate::builders::BoolBuilder; +use crate::builders::PrimitiveBuilder; +use crate::builders::VarBinViewBuilder; +use crate::dtype::DType; +use crate::dtype::FieldName; +use crate::dtype::Nullability; +use crate::dtype::PType; +use crate::dtype::StructFields; +use crate::executor::ExecutionCtx; +use crate::expr::stats::Precision; +use crate::expr::stats::Stat; +use crate::scalar::ScalarValue; +use crate::serde::ArrayChildren; +use crate::serde::SerializeOptions; +use crate::session::ArraySessionExt; +use crate::stats::StatsSet; +use crate::validity::Validity; + +/// Canonical column names for [`STATS_COLUMNS_DTYPE`]. +const STATS_FIELDS: [&str; 11] = [ + "min", + "min_exact", + "max", + "max_exact", + "sum", + "null_count", + "nan_count", + "uncompressed_size_in_bytes", + "is_constant", + "is_sorted", + "is_strict_sorted", +]; + +/// The canonical dtype of a [`StatsColumns`] struct: 11 nullable typed columns mirroring +/// `fba::ArrayStats`. Binary columns hold `ScalarValue::to_proto_bytes` blobs (decoded +/// using the array's dtype at read time); the `*_exact` bools tag whether `min`/`max` +/// were exact. `sum` is exact-only by construction so there is no `sum_exact` column. +pub static STATS_COLUMNS_DTYPE: LazyLock = LazyLock::new(|| { + let binary = || DType::Binary(Nullability::Nullable); + let bool_ = || DType::Bool(Nullability::Nullable); + let u64_ = || DType::Primitive(PType::U64, Nullability::Nullable); + DType::Struct( + StructFields::new( + STATS_FIELDS + .iter() + .map(|n| FieldName::from(*n)) + .collect::>() + .into(), + vec![ + binary(), + bool_(), + binary(), + bool_(), + binary(), + u64_(), + u64_(), + u64_(), + bool_(), + bool_(), + bool_(), + ], + ), + Nullability::NonNullable, + ) +}); + +/// Per-node statistics stored as a [`StructArray`] with the canonical +/// [`STATS_COLUMNS_DTYPE`] schema — 11 sibling typed nullable columns, one per stat slot. +/// +/// Hydration into a typed [`StatsSet`] happens at decode time via [`Self::read`], when +/// the array's dtype is known. +#[derive(Debug, Clone)] +pub struct StatsColumns(StructArray); + +impl StatsColumns { + /// Wrap a [`StructArray`], validating its dtype matches [`STATS_COLUMNS_DTYPE`]. + pub fn new(inner: StructArray) -> VortexResult { + if inner.as_ref().dtype() != &*STATS_COLUMNS_DTYPE { + vortex_bail!( + "StatsColumns dtype mismatch: got {}, expected {}", + inner.as_ref().dtype(), + *STATS_COLUMNS_DTYPE, + ); + } + Ok(Self(inner)) + } + + /// Reference to underlying [`StructArray`]. + pub fn as_struct(&self) -> &StructArray { + &self.0 + } + + /// Consume into the underlying [`StructArray`]. + pub fn into_struct(self) -> StructArray { + self.0 + } + + /// Number of node-rows held by these columns. + pub fn nrows(&self) -> usize { + self.0.as_ref().len() + } + + /// Read node `idx` from the columns into a typed [`StatsSet`]. + /// + /// Returns `None` if every column is null at that row (no stats were persisted for + /// this node). Proto-encoded `min` / `max` / `sum` are deserialized using + /// `array_dtype` to recover the typed `ScalarValue`. + /// + /// `ctx` is reused across the per-stat `execute_scalar` calls for this row. + pub fn read( + &self, + idx: usize, + array_dtype: &DType, + ctx: &mut ExecutionCtx, + session: &VortexSession, + ) -> VortexResult> { + let mut field = |name: &str| -> VortexResult { + self.0 + .unmasked_field_by_name_opt(name) + .ok_or_else(|| vortex_err!("StatsColumns missing field {}", name))? + .execute_scalar(idx, ctx) + }; + + let mut set = StatsSet::default(); + + // Binary stats: bytes + (for min/max) an exact tag. + if let Some(bytes) = field("min")?.as_binary().value().cloned() + && let Some(stat_dtype) = Stat::Min.dtype(array_dtype) + && let Some(value) = + ScalarValue::from_proto_bytes(bytes.as_slice(), &stat_dtype, session)? + { + let exact = field("min_exact")?.as_bool().value().unwrap_or(true); + let kind = if exact { + Precision::Exact + } else { + Precision::Inexact + }; + set.set(Stat::Min, kind(value)); + } + if let Some(bytes) = field("max")?.as_binary().value().cloned() + && let Some(stat_dtype) = Stat::Max.dtype(array_dtype) + && let Some(value) = + ScalarValue::from_proto_bytes(bytes.as_slice(), &stat_dtype, session)? + { + let exact = field("max_exact")?.as_bool().value().unwrap_or(true); + let kind = if exact { + Precision::Exact + } else { + Precision::Inexact + }; + set.set(Stat::Max, kind(value)); + } + if let Some(bytes) = field("sum")?.as_binary().value().cloned() + && let Some(stat_dtype) = Stat::Sum.dtype(array_dtype) + && let Some(value) = + ScalarValue::from_proto_bytes(bytes.as_slice(), &stat_dtype, session)? + { + set.set(Stat::Sum, Precision::Exact(value)); + } + + // Typed primitive counts. + if let Some(v) = field("null_count")?.as_primitive().as_::() { + set.set(Stat::NullCount, Precision::Exact(ScalarValue::from(v))); + } + if let Some(v) = field("nan_count")?.as_primitive().as_::() { + set.set(Stat::NaNCount, Precision::Exact(ScalarValue::from(v))); + } + if let Some(v) = field("uncompressed_size_in_bytes")? + .as_primitive() + .as_::() + { + set.set( + Stat::UncompressedSizeInBytes, + Precision::Exact(ScalarValue::from(v)), + ); + } + + // Typed bool predicates. + if let Some(v) = field("is_constant")?.as_bool().value() { + set.set(Stat::IsConstant, Precision::Exact(ScalarValue::from(v))); + } + if let Some(v) = field("is_sorted")?.as_bool().value() { + set.set(Stat::IsSorted, Precision::Exact(ScalarValue::from(v))); + } + if let Some(v) = field("is_strict_sorted")?.as_bool().value() { + set.set(Stat::IsStrictSorted, Precision::Exact(ScalarValue::from(v))); + } + + Ok((!set.is_empty()).then_some(set)) + } +} + +/// Streaming accumulator for [`StatsColumns`]. Push one [`StatsSet`] per node (or `None` +/// for "no stats persisted"); call [`Self::finish`] to assemble a [`StructArray`] with +/// the canonical [`STATS_COLUMNS_DTYPE`] schema. +pub struct StatsColumnsBuilder { + n: usize, + min: VarBinViewBuilder, + min_exact: BoolBuilder, + max: VarBinViewBuilder, + max_exact: BoolBuilder, + sum: VarBinViewBuilder, + null_count: PrimitiveBuilder, + nan_count: PrimitiveBuilder, + uncompressed_size_in_bytes: PrimitiveBuilder, + is_constant: BoolBuilder, + is_sorted: BoolBuilder, + is_strict_sorted: BoolBuilder, +} + +impl StatsColumnsBuilder { + pub fn with_capacity(n: usize) -> Self { + let nullable = Nullability::Nullable; + let bin = || VarBinViewBuilder::with_capacity(DType::Binary(nullable), n); + let bool_ = || BoolBuilder::with_capacity(nullable, n); + let u64_ = || PrimitiveBuilder::::with_capacity(nullable, n); + Self { + n: 0, + min: bin(), + min_exact: bool_(), + max: bin(), + max_exact: bool_(), + sum: bin(), + null_count: u64_(), + nan_count: u64_(), + uncompressed_size_in_bytes: u64_(), + is_constant: bool_(), + is_sorted: bool_(), + is_strict_sorted: bool_(), + } + } + + /// Append one node's stats. `None` writes all columns as null at this row. + pub fn push(&mut self, stats: Option<&StatsSet>) { + self.n += 1; + let Some(stats) = stats else { + return self.push_all_nulls(); + }; + + let bool_dtype = DType::Bool(Nullability::NonNullable); + let u64_dtype: DType = PType::U64.into(); + + // min/max: serialize value + record exact flag. + match stats.get(Stat::Min) { + Some(p) => { + let exact = p.is_exact(); + let bytes = ScalarValue::to_proto_bytes::>(Some(&p.into_inner())); + self.min.append_value(&bytes); + self.min_exact.append_value(exact); + } + None => { + self.min.append_null(); + self.min_exact.append_null(); + } + } + match stats.get(Stat::Max) { + Some(p) => { + let exact = p.is_exact(); + let bytes = ScalarValue::to_proto_bytes::>(Some(&p.into_inner())); + self.max.append_value(&bytes); + self.max_exact.append_value(exact); + } + None => { + self.max.append_null(); + self.max_exact.append_null(); + } + } + // sum: exact-only; inexact sums are dropped. + match stats.get(Stat::Sum).and_then(|p| p.as_exact()) { + Some(v) => { + let bytes = ScalarValue::to_proto_bytes::>(Some(&v)); + self.sum.append_value(&bytes); + } + None => self.sum.append_null(), + } + push_opt_u64( + &mut self.null_count, + stats + .get_as::(Stat::NullCount, &u64_dtype) + .and_then(Precision::as_exact), + ); + push_opt_u64( + &mut self.nan_count, + stats + .get_as::(Stat::NaNCount, &u64_dtype) + .and_then(Precision::as_exact), + ); + push_opt_u64( + &mut self.uncompressed_size_in_bytes, + stats + .get_as::(Stat::UncompressedSizeInBytes, &u64_dtype) + .and_then(Precision::as_exact), + ); + push_opt_bool( + &mut self.is_constant, + stats + .get_as::(Stat::IsConstant, &bool_dtype) + .and_then(Precision::as_exact), + ); + push_opt_bool( + &mut self.is_sorted, + stats + .get_as::(Stat::IsSorted, &bool_dtype) + .and_then(Precision::as_exact), + ); + push_opt_bool( + &mut self.is_strict_sorted, + stats + .get_as::(Stat::IsStrictSorted, &bool_dtype) + .and_then(Precision::as_exact), + ); + } + + fn push_all_nulls(&mut self) { + self.min.append_null(); + self.min_exact.append_null(); + self.max.append_null(); + self.max_exact.append_null(); + self.sum.append_null(); + self.null_count.append_null(); + self.nan_count.append_null(); + self.uncompressed_size_in_bytes.append_null(); + self.is_constant.append_null(); + self.is_sorted.append_null(); + self.is_strict_sorted.append_null(); + } + + /// Materialize the accumulated rows into a [`StatsColumns`]. + pub fn finish(mut self) -> VortexResult { + let n = self.n; + let fields: Vec = vec![ + self.min.finish_into_varbinview().into_array(), + self.min_exact.finish_into_bool().into_array(), + self.max.finish_into_varbinview().into_array(), + self.max_exact.finish_into_bool().into_array(), + self.sum.finish_into_varbinview().into_array(), + self.null_count.finish_into_primitive().into_array(), + self.nan_count.finish_into_primitive().into_array(), + self.uncompressed_size_in_bytes + .finish_into_primitive() + .into_array(), + self.is_constant.finish_into_bool().into_array(), + self.is_sorted.finish_into_bool().into_array(), + self.is_strict_sorted.finish_into_bool().into_array(), + ]; + let names = STATS_FIELDS + .iter() + .map(|s| FieldName::from(*s)) + .collect::>(); + let inner = StructArray::try_new(names.into(), fields, n, Validity::NonNullable)?; + StatsColumns::new(inner) + } +} + +fn push_opt_u64(builder: &mut PrimitiveBuilder, v: Option) { + match v { + Some(v) => builder.append_value(v), + None => builder.append_null(), + } +} + +fn push_opt_bool(builder: &mut BoolBuilder, v: Option) { + match v { + Some(v) => builder.append_value(v), + None => builder.append_null(), + } +} + +/// Canonical schema of the `nodes` struct that backs a [`ColumnarArrayTree`] +/// (one row per `ArrayNode` in pre-order). +/// +/// Carries everything a `ColumnarSerializedArray` needs to navigate and decode a node: +/// encoding id, child count, plugin-specific metadata, buffer count, the precomputed +/// `subtree_size` and `buffer_offset` nav values, and a nested `stats` struct (see +/// [`STATS_COLUMNS_DTYPE`]). +pub static NODES_COLUMNS_DTYPE: LazyLock = LazyLock::new(|| { + let prim = |p: PType| DType::Primitive(p, Nullability::NonNullable); + DType::Struct( + StructFields::new( + NODE_FIELDS + .iter() + .map(|n| FieldName::from(*n)) + .collect::>() + .into(), + vec![ + prim(PType::U16), // encoding_id + prim(PType::U8), // child_count + DType::Binary(Nullability::NonNullable), // metadata + prim(PType::U16), // buffers_per_node + prim(PType::U32), // subtree_size + prim(PType::U32), // buffer_offset + STATS_COLUMNS_DTYPE.clone(), // stats + ], + ), + Nullability::NonNullable, + ) +}); + +const NODE_FIELDS: [&str; 7] = [ + "encoding_id", + "child_count", + "metadata", + "buffers_per_node", + "subtree_size", + "buffer_offset", + "stats", +]; + +/// Canonical schema of the `buffers` struct that backs a [`ColumnarArrayTree`] +/// (one row per buffer descriptor, concatenated across all nodes in pre-order). +pub static BUFFER_COLUMNS_DTYPE: LazyLock = LazyLock::new(|| { + let nn = Nullability::NonNullable; + let prim = |p: PType| DType::Primitive(p, nn); + DType::Struct( + StructFields::new( + BUFFER_FIELDS + .iter() + .map(|n| FieldName::from(*n)) + .collect::>() + .into(), + vec![prim(PType::U16), prim(PType::U8), prim(PType::U32)], + ), + nn, + ) +}); + +const BUFFER_FIELDS: [&str; 3] = ["padding", "alignment_exponent", "length"]; + +/// Columnar representation of one `ArrayNode` tree, shared by all +/// `ColumnarSerializedArray` nodes that navigate it via `Arc`. +/// +/// The private typed-field handles below are zero-copy [`Arc`] clones of the underlying +/// struct fields, supplied to [`Self::try_new`] so per-node decode access doesn't +/// pay a field-name lookup + downcast per call. +/// +/// `subtree_size` and `buffer_offset` give O(1) child navigation / buffer slicing. +#[derive(Debug, Clone)] +pub struct ColumnarArrayTree { + /// Canonical `NODES_COLUMNS_DTYPE` struct, one row per array node. + pub nodes: StructArray, + /// Canonical `BUFFERS_COLUMNS_DTYPE` struct, one row per buffer descriptor. + pub buffers: StructArray, + + // Cached references to the columns of the struct arrays above + encoding_ids: PrimitiveArray, + child_counts: PrimitiveArray, + node_metadata: VarBinViewArray, + buffers_per_node: PrimitiveArray, + subtree_sizes: PrimitiveArray, + buffer_offsets: PrimitiveArray, + buffer_padding: PrimitiveArray, + buffer_alignment_exponent: PrimitiveArray, + buffer_length: PrimitiveArray, + stats: StatsColumns, +} + +/// Compute `subtree_sizes` from `child_counts` via a single right-to-left pass. +/// +/// In pre-order traversal, a node's subtree occupies a contiguous range of indices +/// starting at the node, so a node's subtree size is `1 + sum of its children's subtree +/// sizes`. Iterating right-to-left guarantees each node's children have already been +/// visited when we reach it. +pub fn compute_subtree_sizes(child_counts: &[u8]) -> Buffer { + let n = child_counts.len(); + let mut sizes = vec![0u32; n]; + for i in (0..n).rev() { + let mut total = 1u32; + let mut cursor = i + 1; + for _ in 0..child_counts[i] { + let child_size = sizes[cursor]; + total += child_size; + cursor += child_size as usize; + } + sizes[i] = total; + } + Buffer::from(sizes) +} + +/// Compute `buffer_offsets` as a prefix sum of `buffers_per_node`. +pub fn compute_buffer_offsets(buffers_per_node: &[u16]) -> Buffer { + buffers_per_node + .iter() + .scan(0u32, |acc, &n| { + let out = *acc; + *acc += n as u32; + Some(out) + }) + .collect::>() + .into() +} + +impl ColumnarArrayTree { + /// Construct a `ColumnarArrayTree` from typed per-node and per-buffer columns, + /// assembling the canonical `nodes` and `buffers` [`StructArray`]s in the process. + /// + /// The input types — `PrimitiveArray` / `VarBinViewArray` / [`StatsColumns`] — match + /// the outer slots of [`NODES_COLUMNS_DTYPE`] and [`BUFFER_COLUMNS_DTYPE`], so dtype + /// validation collapses to length consistency, which [`StructArray::try_new`] checks. + /// `PrimitiveArray::ptype` per column is trusted: writer callers always go through + /// the typed helpers, and reader callers must downcast field-by-field before calling. + #[allow(clippy::too_many_arguments)] + pub fn try_new( + encoding_ids: PrimitiveArray, + child_counts: PrimitiveArray, + node_metadata: VarBinViewArray, + buffers_per_node: PrimitiveArray, + subtree_sizes: PrimitiveArray, + buffer_offsets: PrimitiveArray, + stats: StatsColumns, + buffer_padding: PrimitiveArray, + buffer_alignment_exponent: PrimitiveArray, + buffer_length: PrimitiveArray, + ) -> VortexResult { + let n_nodes = encoding_ids.as_ref().len(); + let n_buffers = buffer_padding.as_ref().len(); + + let nodes = StructArray::try_new( + NODE_FIELDS + .iter() + .map(|s| FieldName::from(*s)) + .collect::>() + .into(), + vec![ + encoding_ids.clone().into_array(), + child_counts.clone().into_array(), + node_metadata.clone().into_array(), + buffers_per_node.clone().into_array(), + subtree_sizes.clone().into_array(), + buffer_offsets.clone().into_array(), + stats.clone().into_struct().into_array(), + ], + n_nodes, + Validity::NonNullable, + )?; + + let buffers = StructArray::try_new( + BUFFER_FIELDS + .iter() + .map(|s| FieldName::from(*s)) + .collect::>() + .into(), + vec![ + buffer_padding.clone().into_array(), + buffer_alignment_exponent.clone().into_array(), + buffer_length.clone().into_array(), + ], + n_buffers, + Validity::NonNullable, + )?; + + Ok(Self { + nodes, + buffers, + encoding_ids, + child_counts, + node_metadata, + buffers_per_node, + subtree_sizes, + buffer_offsets, + buffer_padding, + buffer_alignment_exponent, + buffer_length, + stats, + }) + } + + /// Number of nodes in the tree. + pub fn nnodes(&self) -> usize { + self.nodes.as_ref().len() + } +} + +/// Parallel to [`crate::serde::SerializedArray`] but sourced from a columnar representation +/// of the encoding tree rather than a flatbuffer. +/// +/// Holds an `Arc` plus a `node_index` that identifies the current +/// node within the tree. `child(idx)` returns a new `ColumnarSerializedArray` pointing +/// at the requested child by computing the child's pre-order index from +/// `subtree_sizes`. +/// +/// `decode()` performs the same plugin dispatch as `SerializedArray::decode`, just sourcing +/// metadata/buffers/stats from the columnar tree. +#[derive(Clone)] +pub struct ColumnarSerializedArray { + tree: Arc, + node_index: usize, + buffers: Arc<[BufferHandle]>, +} + +impl Debug for ColumnarSerializedArray { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ColumnarSerializedArray") + .field("encoding_id", &self.encoding_id()) + .field("node_index", &self.node_index) + .field("nchildren", &self.nchildren()) + .field("nbuffers", &self.nbuffers()) + .finish() + } +} + +impl ColumnarSerializedArray { + /// Construct a new root-level `ColumnarSerializedArray` for the given tree. + pub fn new(tree: Arc, buffers: Arc<[BufferHandle]>) -> VortexResult { + if tree.nnodes() == 0 { + vortex_bail!("ColumnarArrayTree must have at least one node"); + } + Ok(Self { + tree, + node_index: 0, + buffers, + }) + } + + /// Slice the data segment into per-buffer handles using the descriptors in `tree`, + /// then construct a root-level `ColumnarSerializedArray`. + /// + /// The segment is expected to be data-only — no trailing flatbuffer or length suffix — + /// as produced by [`serialize_to_columnar_tree`]. + pub fn from_segment_and_tree( + segment: BufferHandle, + tree: Arc, + ) -> VortexResult { + let segment = segment.ensure_aligned(Alignment::none())?; + let n_buffers = tree.buffer_length.as_ref().len(); + let padding = tree.buffer_padding.as_slice::(); + let lengths = tree.buffer_length.as_slice::(); + let alignments = tree.buffer_alignment_exponent.as_slice::(); + let mut handles: Vec = Vec::with_capacity(n_buffers); + let mut offset = 0; + for i in 0..n_buffers { + offset += padding[i] as usize; + let buffer_len = lengths[i] as usize; + let alignment = Alignment::from_exponent(alignments[i]); + let buffer = segment.slice(offset..(offset + buffer_len)); + handles.push(buffer.ensure_aligned(alignment)?); + offset += buffer_len; + } + Self::new(tree, Arc::from(handles)) + } + + /// Returns the encoding id (as the interned `u16` in the file's `ArrayContext`) of the + /// current node. + pub fn encoding_id(&self) -> u16 { + self.tree.encoding_ids.as_slice::()[self.node_index] + } + + /// Returns the metadata bytes for the current node. + pub fn metadata(&self) -> ByteBuffer { + self.tree.node_metadata.bytes_at(self.node_index) + } + + /// Returns the number of direct children of the current node. + pub fn nchildren(&self) -> usize { + self.tree.child_counts.as_slice::()[self.node_index] as usize + } + + /// Returns a `ColumnarSerializedArray` pointing at the `idx`th direct child of the + /// current node. + pub fn child(&self, idx: usize) -> ColumnarSerializedArray { + let n_children = self.nchildren(); + if idx >= n_children { + vortex_panic!( + "Invalid child index {} for node with {} children", + idx, + n_children + ); + } + // Children are laid out in pre-order immediately after the current node. The first + // child is at node_index + 1; each subsequent child sits at the previous child's + // index + that child's subtree size. + let subtree_sizes = self.tree.subtree_sizes.as_slice::(); + let mut cursor = self.node_index + 1; + for _ in 0..idx { + cursor += subtree_sizes[cursor] as usize; + } + Self { + tree: Arc::clone(&self.tree), + node_index: cursor, + buffers: Arc::clone(&self.buffers), + } + } + + /// Number of buffers owned by the current node. + pub fn nbuffers(&self) -> usize { + self.tree.buffers_per_node.as_slice::()[self.node_index] as usize + } + + /// Return the slice of buffer handles owned by the current node. + fn node_buffers(&self) -> VortexResult<&[BufferHandle]> { + let start = self.tree.buffer_offsets.as_slice::()[self.node_index] as usize; + let count = self.nbuffers(); + self.buffers.get(start..start + count).ok_or_else(|| { + vortex_err!( + "buffer indices {}..{} out of range for {} buffers", + start, + start + count, + self.buffers.len(), + ) + }) + } + + /// Decode this node into an `ArrayRef` using the same plugin contract as + /// [`crate::serde::SerializedArray::decode`]. + pub fn decode( + &self, + dtype: &DType, + len: usize, + ctx: &ReadContext, + session: &VortexSession, + ) -> VortexResult { + let encoding_idx = self.encoding_id(); + let encoding_id = ctx + .resolve(encoding_idx) + .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?; + let plugin = session + .arrays() + .registry() + .find(&encoding_id) + .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?; + + let buffers = self.node_buffers()?; + let children = ColumnarSerializedArrayChildren { + ser: self, + ctx, + session, + }; + + let metadata = self.metadata(); + let decoded = + plugin.deserialize(dtype, len, metadata.as_slice(), buffers, &children, session)?; + + assert_eq!( + decoded.len(), + len, + "Array decoded from {} has incorrect length {}, expected {}", + encoding_id, + decoded.len(), + len + ); + assert_eq!( + decoded.dtype(), + dtype, + "Array decoded from {} has incorrect dtype {}, expected {}", + encoding_id, + decoded.dtype(), + dtype, + ); + assert!( + plugin.is_supported_encoding(&decoded.encoding_id()), + "Array decoded from {} has incorrect encoding {}", + encoding_id, + decoded.encoding_id(), + ); + + // Populate statistics from the columnar tree. `StatsColumns::read` walks + // the 11 stat columns at this node's row and rehydrates a `StatsSet`, decoding + // min/max/sum proto bytes using the now-known dtype. We create a temporary + // `ExecutionCtx` per node decode — see the discussion in the columnar module + // header about why we don't thread one through the recursive children machinery. + let mut stats_ctx = session.create_execution_ctx(); + if let Some(stats_set) = + self.tree + .stats + .read(self.node_index, dtype, &mut stats_ctx, session)? + { + decoded.statistics().set_iter(stats_set.into_iter()); + } + + Ok(decoded) + } +} + +struct ColumnarSerializedArrayChildren<'a> { + ser: &'a ColumnarSerializedArray, + ctx: &'a ReadContext, + session: &'a VortexSession, +} + +impl ArrayChildren for ColumnarSerializedArrayChildren<'_> { + fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult { + self.ser + .child(index) + .decode(dtype, len, self.ctx, self.session) + } + + fn len(&self) -> usize { + self.ser.nchildren() + } +} + +/// Writer-side entry point. Walks `array` in pre-order once and produces: +/// +/// 1. The data-segment buffer list (data buffers only, no trailing flatbuffer or length +/// suffix — segments are not self-contained and must be paired with the +/// [`ColumnarArrayTree`] to decode). +/// 2. A [`ColumnarArrayTree`] capturing the encoding tree, per-node stats, per-buffer +/// descriptors, and the precomputed `subtree_sizes` / `buffer_offsets` nav columns. +pub fn serialize_to_columnar_tree( + array: &ArrayRef, + ctx: &ArrayContext, + session: &VortexSession, + options: &SerializeOptions, +) -> VortexResult<(Vec, ColumnarArrayTree)> { + // Per-node columns collected during the DFS walk. + let mut encoding_ids = Vec::new(); + let mut child_counts = Vec::new(); + let mut node_metadata = Vec::new(); + let mut buffers_per_node = Vec::new(); + let mut stats_builder = StatsColumnsBuilder::with_capacity(0); + // Flat list of all data buffers across all nodes, in pre-order. + let mut array_buffers = Vec::new(); + + for node in array.depth_first_traversal() { + let encoding_idx = ctx.intern(&node.encoding_id()).ok_or_else(|| { + vortex_err!("Array encoding {} not permitted by ctx", node.encoding_id()) + })?; + encoding_ids.push(encoding_idx); + + let n_children = u8::try_from(node.nchildren()) + .map_err(|_| vortex_err!("Array node has more than u8::MAX children"))?; + child_counts.push(n_children); + + let metadata_bytes = session.array_serialize(&node)?.ok_or_else(|| { + vortex_err!( + "Array {} does not support serialization", + node.encoding_id() + ) + })?; + node_metadata.push(ByteBuffer::from(metadata_bytes)); + + let node_bufs = node.buffers(); + let n_buffers = u16::try_from(node_bufs.len()) + .map_err(|_| vortex_err!("Array node has more than u16::MAX buffers"))?; + buffers_per_node.push(n_buffers); + + // Snapshot the current StatsSet straight into the per-stat columns. Empty sets + // push all-nulls — semantically identical to "no stats persisted" since the read + // side treats all-null as `None` from `StatsColumns::read`. + let stats_set = node.statistics().to_owned(); + stats_builder.push(if stats_set.is_empty() { + None + } else { + Some(&stats_set) + }); + + array_buffers.extend(node_bufs); + } + + // Emit the data buffer list and per-buffer descriptor columns in one pass. Padding + // math is the same rule the inline flatbuffer path uses: each buffer is padded to + // its required alignment, tracked through a running `pos` cursor. + let max_alignment = array_buffers + .iter() + .map(|buf| buf.alignment()) + .max() + .unwrap_or(Alignment::none()); + let zeros = ByteBuffer::zeroed(*max_alignment); + + let mut buffers = vec![ByteBuffer::zeroed_aligned(0, max_alignment)]; + let mut buffer_padding = Vec::::with_capacity(array_buffers.len()); + let mut buffer_alignment_exponent = Vec::::with_capacity(array_buffers.len()); + let mut buffer_length = Vec::::with_capacity(array_buffers.len()); + let mut pos = options.offset; + + for buffer in &array_buffers { + let padding = if options.include_padding { + let padding = pos.next_multiple_of(*buffer.alignment()) - pos; + if padding > 0 { + pos += padding; + buffers.push(zeros.slice(0..padding)); + } + padding + } else { + 0 + }; + buffer_padding + .push(u16::try_from(padding).map_err(|_| vortex_err!("buffer padding overflows u16"))?); + buffer_alignment_exponent.push(buffer.alignment().exponent()); + buffer_length.push( + u32::try_from(buffer.len()).map_err(|_| vortex_err!("buffer length overflows u32"))?, + ); + + pos += buffer.len(); + buffers.push(buffer.clone().aligned(Alignment::none())); + } + + // these two precomputed columns help O(1) child access on read + let subtree_sizes = compute_subtree_sizes(&child_counts); + let buffer_offsets = compute_buffer_offsets(&buffers_per_node); + + let node_metadata = VarBinViewArray::from_iter_bin(node_metadata.iter().map(|b| b.as_slice())); + let stats = stats_builder.finish()?; + + let tree = ColumnarArrayTree::try_new( + primitive_array_u16(encoding_ids), + primitive_array_u8(child_counts), + node_metadata, + primitive_array_u16(buffers_per_node), + primitive_array_u32_buffer(subtree_sizes), + primitive_array_u32_buffer(buffer_offsets), + stats, + primitive_array_u16(buffer_padding), + primitive_array_u8(buffer_alignment_exponent), + primitive_array_u32(buffer_length), + )?; + + Ok((buffers, tree)) +} + +fn primitive_array_u16(v: Vec) -> PrimitiveArray { + PrimitiveArray::new(Buffer::from(v), Validity::NonNullable) +} +fn primitive_array_u8(v: Vec) -> PrimitiveArray { + PrimitiveArray::new(Buffer::from(v), Validity::NonNullable) +} +fn primitive_array_u32(v: Vec) -> PrimitiveArray { + PrimitiveArray::new(Buffer::from(v), Validity::NonNullable) +} +fn primitive_array_u32_buffer(b: Buffer) -> PrimitiveArray { + PrimitiveArray::new(b, Validity::NonNullable) +} + +#[cfg(test)] +mod tests { + use std::iter; + + use super::*; + + /// Tree shape: + /// 0 (root, 2 children) + /// ├── 1 (leaf) + /// └── 2 (1 child) + /// └── 3 (leaf) + /// Subtree sizes: [4, 1, 2, 1]. + #[test] + fn subtree_sizes_basic() -> VortexResult<()> { + let child_counts = vec![2u8, 0, 1, 0]; + let sizes = compute_subtree_sizes(&child_counts); + assert_eq!(sizes.as_slice(), &[4, 1, 2, 1]); + Ok(()) + } + + /// Single-node tree. + #[test] + fn subtree_sizes_leaf() -> VortexResult<()> { + let sizes = compute_subtree_sizes(&[0u8]); + assert_eq!(sizes.as_slice(), &[1]); + Ok(()) + } + + /// Deeply nested tree (left-skewed): + /// 0 -> 1 -> 2 -> 3 (leaf) + /// Subtree sizes: [4, 3, 2, 1]. + #[test] + fn subtree_sizes_skewed() -> VortexResult<()> { + let sizes = compute_subtree_sizes(&[1u8, 1, 1, 0]); + assert_eq!(sizes.as_slice(), &[4, 3, 2, 1]); + Ok(()) + } + + #[test] + fn buffer_offsets_basic() { + let offsets = compute_buffer_offsets(&[2u16, 0, 3, 1]); + assert_eq!(offsets.as_slice(), &[0, 2, 2, 5]); + } + + /// Round-trip a populated `StatsSet` through the `StatsColumnsBuilder` -> + /// `StatsColumns::read` path to confirm the per-stat columns preserve the same + /// selection of stats and their values across the columnar wire format. + #[test] + fn stats_columns_roundtrip_i32() -> VortexResult<()> { + use crate::LEGACY_SESSION; + + let dtype = DType::Primitive(PType::I32, Nullability::NonNullable); + let mut set = StatsSet::default(); + set.set(Stat::Min, Precision::Exact(ScalarValue::from(-3i32))); + set.set(Stat::Max, Precision::Inexact(ScalarValue::from(42i32))); + set.set(Stat::Sum, Precision::Exact(ScalarValue::from(100i64))); + set.set(Stat::NullCount, Precision::Exact(ScalarValue::from(7u64))); + set.set(Stat::IsConstant, Precision::Exact(ScalarValue::from(false))); + set.set(Stat::IsSorted, Precision::Exact(ScalarValue::from(true))); + + let mut builder = StatsColumnsBuilder::with_capacity(1); + builder.push(Some(&set)); + let cols = builder.finish()?; + + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let back = cols + .read(0, &dtype, &mut ctx, &LEGACY_SESSION)? + .expect("non-empty"); + assert_eq!( + back.get_as::(Stat::Min, &dtype), + Some(Precision::Exact(-3)) + ); + assert_eq!( + back.get_as::(Stat::Max, &dtype), + Some(Precision::Inexact(42)) + ); + assert_eq!( + back.get_as::(Stat::NullCount, &PType::U64.into()), + Some(Precision::Exact(7)) + ); + assert_eq!( + back.get_as::(Stat::IsConstant, &DType::Bool(Nullability::NonNullable)), + Some(Precision::Exact(false)) + ); + assert_eq!( + back.get_as::(Stat::IsSorted, &DType::Bool(Nullability::NonNullable)), + Some(Precision::Exact(true)) + ); + assert!( + back.get(Stat::IsStrictSorted).is_none(), + "unset stats stay unset" + ); + Ok(()) + } + + /// Empty stats round-trip to `None` (every column at the row is null). + #[test] + fn stats_columns_empty() -> VortexResult<()> { + use crate::LEGACY_SESSION; + + let dtype = DType::Primitive(PType::I32, Nullability::NonNullable); + let mut builder = StatsColumnsBuilder::with_capacity(1); + builder.push(None); + let cols = builder.finish()?; + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + assert!(cols.read(0, &dtype, &mut ctx, &LEGACY_SESSION)?.is_none()); + Ok(()) + } + + /// Child navigation: from root (idx 0) of a tree + /// 0 [2 children] + /// ├── 1 [leaf] + /// └── 2 [1 child] + /// └── 3 [leaf] + /// expect child(0) -> node 1, child(1) -> node 2. Then from node 2, child(0) -> node 3. + #[test] + fn child_navigation() -> VortexResult<()> { + let child_counts = vec![2u8, 0, 1, 0]; + let buffers_per_node = vec![0u16; 4]; + let subtree_sizes = compute_subtree_sizes(&child_counts); + let buffer_offsets = compute_buffer_offsets(&buffers_per_node); + + let stats = { + let mut b = StatsColumnsBuilder::with_capacity(4); + for _ in 0..4 { + b.push(None); + } + b.finish()? + }; + + let tree = Arc::new(ColumnarArrayTree::try_new( + primitive_array_u16(vec![0u16, 1, 2, 3]), + primitive_array_u8(child_counts), + VarBinViewArray::from_iter_bin(iter::repeat_n(b"".as_slice(), 4)), + primitive_array_u16(buffers_per_node), + primitive_array_u32_buffer(subtree_sizes), + primitive_array_u32_buffer(buffer_offsets), + stats, + primitive_array_u16(Vec::new()), + primitive_array_u8(Vec::new()), + primitive_array_u32(Vec::new()), + )?); + let root = ColumnarSerializedArray::new(tree, Arc::new([]))?; + assert_eq!(root.encoding_id(), 0); + assert_eq!(root.nchildren(), 2); + let c0 = root.child(0); + assert_eq!(c0.encoding_id(), 1); + assert_eq!(c0.nchildren(), 0); + let c1 = root.child(1); + assert_eq!(c1.encoding_id(), 2); + assert_eq!(c1.nchildren(), 1); + let c1c0 = c1.child(0); + assert_eq!(c1c0.encoding_id(), 3); + assert_eq!(c1c0.nchildren(), 0); + Ok(()) + } +} diff --git a/vortex-array/src/serde.rs b/vortex-array/src/serde/mod.rs similarity index 98% rename from vortex-array/src/serde.rs rename to vortex-array/src/serde/mod.rs index 637b57324c0..f36321c1d7e 100644 --- a/vortex-array/src/serde.rs +++ b/vortex-array/src/serde/mod.rs @@ -1,12 +1,24 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +pub mod columnar; + use std::borrow::Cow; use std::fmt::Debug; use std::fmt::Formatter; use std::iter; use std::sync::Arc; +pub use columnar::BUFFER_COLUMNS_DTYPE; +pub use columnar::ColumnarArrayTree; +pub use columnar::ColumnarSerializedArray; +pub use columnar::NODES_COLUMNS_DTYPE; +pub use columnar::STATS_COLUMNS_DTYPE; +pub use columnar::StatsColumns; +pub use columnar::StatsColumnsBuilder; +pub use columnar::compute_buffer_offsets; +pub use columnar::compute_subtree_sizes; +pub use columnar::serialize_to_columnar_tree; use flatbuffers::FlatBufferBuilder; use flatbuffers::Follow; use flatbuffers::WIPOffset;