diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 8d5cc75c895..fdeb34c6dd2 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -52,7 +52,7 @@ impl vortex_array::aggr pub type vortex_array::aggregate_fn::combined::Combined::Options = vortex_array::aggregate_fn::combined::PairOptions<<::Left as vortex_array::aggregate_fn::AggregateFnVTable>::Options, <::Right as vortex_array::aggregate_fn::AggregateFnVTable>::Options> -pub type vortex_array::aggregate_fn::combined::Combined::Partial = (<::Left as vortex_array::aggregate_fn::AggregateFnVTable>::Partial, <::Right as vortex_array::aggregate_fn::AggregateFnVTable>::Partial) +pub type vortex_array::aggregate_fn::combined::Combined::Partial = (alloc::boxed::Box, alloc::boxed::Box) pub fn vortex_array::aggregate_fn::combined::Combined::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> @@ -84,6 +84,8 @@ pub fn vortex_array::aggregate_fn::combined::Combined::to_scalar(&self, &Self pub fn vortex_array::aggregate_fn::combined::Combined::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::combined::Combined::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::combined::PairOptions(pub L, pub R) impl core::marker::StructuralPartialEq for vortex_array::aggregate_fn::combined::PairOptions @@ -222,6 +224,8 @@ pub fn vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::to_sca pub fn vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinctPartial pub fn vortex_array::aggregate_fn::fns::all_non_distinct::all_non_distinct(&vortex_array::ArrayRef, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult @@ -274,6 +278,8 @@ pub fn vortex_array::aggregate_fn::fns::count::Count::to_scalar(&self, &Self::Pa pub fn vortex_array::aggregate_fn::fns::count::Count::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::count::Count::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub mod vortex_array::aggregate_fn::fns::first pub struct vortex_array::aggregate_fn::fns::first::First @@ -322,6 +328,8 @@ pub fn vortex_array::aggregate_fn::fns::first::First::to_scalar(&self, &Self::Pa pub fn vortex_array::aggregate_fn::fns::first::First::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::first::First::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::first::FirstPartial pub fn vortex_array::aggregate_fn::fns::first::first(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult @@ -384,6 +392,8 @@ pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::to_scalar(&self pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::is_constant::IsConstantPartial pub fn vortex_array::aggregate_fn::fns::is_constant::is_constant(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult @@ -442,6 +452,8 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::to_scalar(&self, &S pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::is_sorted::IsSortedOptions pub vortex_array::aggregate_fn::fns::is_sorted::IsSortedOptions::strict: bool @@ -534,6 +546,8 @@ pub fn vortex_array::aggregate_fn::fns::last::Last::to_scalar(&self, &Self::Part pub fn vortex_array::aggregate_fn::fns::last::Last::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::last::Last::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::last::LastPartial pub fn vortex_array::aggregate_fn::fns::last::last(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult @@ -634,6 +648,8 @@ pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::to_scalar(&self, &Self: pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::min_max::MinMaxPartial pub struct vortex_array::aggregate_fn::fns::min_max::MinMaxResult @@ -714,6 +730,8 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::to_scalar(&self, &S pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub fn vortex_array::aggregate_fn::fns::nan_count::nan_count(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub mod vortex_array::aggregate_fn::fns::sum @@ -778,6 +796,8 @@ pub fn vortex_array::aggregate_fn::fns::sum::Sum::to_scalar(&self, &Self::Partia pub fn vortex_array::aggregate_fn::fns::sum::Sum::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::sum::Sum::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::sum::SumPartial pub fn vortex_array::aggregate_fn::fns::sum::sum(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult @@ -830,6 +850,8 @@ pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::Uncompressed pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes(&vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub mod vortex_array::aggregate_fn::kernels @@ -892,12 +914,20 @@ impl vortex_array::aggregate_f pub fn vortex_array::aggregate_fn::Accumulator::accumulate(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> +pub fn vortex_array::aggregate_fn::Accumulator::combine_partials(&mut self, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_array::aggregate_fn::Accumulator::final_scalar(&self) -> vortex_error::VortexResult + pub fn vortex_array::aggregate_fn::Accumulator::finish(&mut self) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::Accumulator::flush(&mut self) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::Accumulator::is_saturated(&self) -> bool +pub fn vortex_array::aggregate_fn::Accumulator::partial_scalar(&self) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::Accumulator::reset(&mut self) + pub struct vortex_array::aggregate_fn::AggregateFn(_) impl vortex_array::aggregate_fn::AggregateFn @@ -1080,6 +1110,8 @@ pub fn vortex_array::aggregate_fn::AggregateFnVTable::to_scalar(&self, &Self::Pa pub fn vortex_array::aggregate_fn::AggregateFnVTable::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::AggregateFnVTable::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct pub type vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1116,6 +1148,8 @@ pub fn vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::to_sca pub fn vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::all_non_distinct::AllNonDistinct::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::count::Count pub type vortex_array::aggregate_fn::fns::count::Count::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1152,6 +1186,8 @@ pub fn vortex_array::aggregate_fn::fns::count::Count::to_scalar(&self, &Self::Pa pub fn vortex_array::aggregate_fn::fns::count::Count::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::count::Count::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::first::First pub type vortex_array::aggregate_fn::fns::first::First::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1188,6 +1224,8 @@ pub fn vortex_array::aggregate_fn::fns::first::First::to_scalar(&self, &Self::Pa pub fn vortex_array::aggregate_fn::fns::first::First::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::first::First::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::is_constant::IsConstant pub type vortex_array::aggregate_fn::fns::is_constant::IsConstant::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1224,6 +1262,8 @@ pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::to_scalar(&self pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::is_constant::IsConstant::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::is_sorted::IsSorted pub type vortex_array::aggregate_fn::fns::is_sorted::IsSorted::Options = vortex_array::aggregate_fn::fns::is_sorted::IsSortedOptions @@ -1260,6 +1300,8 @@ pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::to_scalar(&self, &S pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::is_sorted::IsSorted::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::last::Last pub type vortex_array::aggregate_fn::fns::last::Last::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1296,6 +1338,8 @@ pub fn vortex_array::aggregate_fn::fns::last::Last::to_scalar(&self, &Self::Part pub fn vortex_array::aggregate_fn::fns::last::Last::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::last::Last::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::min_max::MinMax pub type vortex_array::aggregate_fn::fns::min_max::MinMax::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1332,6 +1376,8 @@ pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::to_scalar(&self, &Self: pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::min_max::MinMax::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::nan_count::NanCount pub type vortex_array::aggregate_fn::fns::nan_count::NanCount::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1368,6 +1414,8 @@ pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::to_scalar(&self, &S pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::nan_count::NanCount::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::sum::Sum pub type vortex_array::aggregate_fn::fns::sum::Sum::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1404,6 +1452,8 @@ pub fn vortex_array::aggregate_fn::fns::sum::Sum::to_scalar(&self, &Self::Partia pub fn vortex_array::aggregate_fn::fns::sum::Sum::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::sum::Sum::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes pub type vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::Options = vortex_array::aggregate_fn::EmptyOptions @@ -1440,11 +1490,13 @@ pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::Uncompressed pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + impl vortex_array::aggregate_fn::AggregateFnVTable for vortex_array::aggregate_fn::combined::Combined pub type vortex_array::aggregate_fn::combined::Combined::Options = vortex_array::aggregate_fn::combined::PairOptions<<::Left as vortex_array::aggregate_fn::AggregateFnVTable>::Options, <::Right as vortex_array::aggregate_fn::AggregateFnVTable>::Options> -pub type vortex_array::aggregate_fn::combined::Combined::Partial = (<::Left as vortex_array::aggregate_fn::AggregateFnVTable>::Partial, <::Right as vortex_array::aggregate_fn::AggregateFnVTable>::Partial) +pub type vortex_array::aggregate_fn::combined::Combined::Partial = (alloc::boxed::Box, alloc::boxed::Box) pub fn vortex_array::aggregate_fn::combined::Combined::accumulate(&self, &mut Self::Partial, &vortex_array::Columnar, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> @@ -1476,6 +1528,8 @@ pub fn vortex_array::aggregate_fn::combined::Combined::to_scalar(&self, &Self pub fn vortex_array::aggregate_fn::combined::Combined::try_accumulate(&self, &mut Self::Partial, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::aggregate_fn::combined::Combined::try_partial_from_stats(&self, &vortex_array::ArrayRef) -> vortex_error::VortexResult> + pub trait vortex_array::aggregate_fn::AggregateFnVTableExt: vortex_array::aggregate_fn::AggregateFnVTable pub fn vortex_array::aggregate_fn::AggregateFnVTableExt::bind(&self, Self::Options) -> vortex_array::aggregate_fn::AggregateFnRef @@ -1488,22 +1542,38 @@ pub trait vortex_array::aggregate_fn::DynAccumulator: 'static + core::marker::Se pub fn vortex_array::aggregate_fn::DynAccumulator::accumulate(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> +pub fn vortex_array::aggregate_fn::DynAccumulator::combine_partials(&mut self, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_array::aggregate_fn::DynAccumulator::final_scalar(&self) -> vortex_error::VortexResult + pub fn vortex_array::aggregate_fn::DynAccumulator::finish(&mut self) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::DynAccumulator::flush(&mut self) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::DynAccumulator::is_saturated(&self) -> bool +pub fn vortex_array::aggregate_fn::DynAccumulator::partial_scalar(&self) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::DynAccumulator::reset(&mut self) + impl vortex_array::aggregate_fn::DynAccumulator for vortex_array::aggregate_fn::Accumulator pub fn vortex_array::aggregate_fn::Accumulator::accumulate(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> +pub fn vortex_array::aggregate_fn::Accumulator::combine_partials(&mut self, vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()> + +pub fn vortex_array::aggregate_fn::Accumulator::final_scalar(&self) -> vortex_error::VortexResult + pub fn vortex_array::aggregate_fn::Accumulator::finish(&mut self) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::Accumulator::flush(&mut self) -> vortex_error::VortexResult pub fn vortex_array::aggregate_fn::Accumulator::is_saturated(&self) -> bool +pub fn vortex_array::aggregate_fn::Accumulator::partial_scalar(&self) -> vortex_error::VortexResult + +pub fn vortex_array::aggregate_fn::Accumulator::reset(&mut self) + pub trait vortex_array::aggregate_fn::DynGroupedAccumulator: 'static + core::marker::Send pub fn vortex_array::aggregate_fn::DynGroupedAccumulator::accumulate_list(&mut self, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult<()> diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 9c864eea6ca..a52fa5575fa 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -5,7 +5,6 @@ use vortex_error::VortexResult; use vortex_error::vortex_ensure; use vortex_error::vortex_err; -use crate::AnyCanonical; use crate::ArrayRef; use crate::Columnar; use crate::ExecutionCtx; @@ -13,6 +12,7 @@ use crate::aggregate_fn::AggregateFn; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::session::AggregateFnSessionExt; +use crate::columnar::AnyColumnar; use crate::dtype::DType; use crate::executor::max_iterations; use crate::scalar::Scalar; @@ -72,9 +72,26 @@ pub trait DynAccumulator: 'static + Send { /// Accumulate a new array into the accumulator's state. fn accumulate(&mut self, batch: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<()>; + /// Fold an external partial-state scalar into this accumulator's state. + /// + /// The scalar must have the dtype reported by the vtable's `partial_dtype` for the + /// options and input dtype used to construct this accumulator. + fn combine_partials(&mut self, other: Scalar) -> VortexResult<()>; + /// Whether the accumulator's result is fully determined. fn is_saturated(&self) -> bool; + /// Reset the accumulator's state to the empty group. + fn reset(&mut self); + + /// Read the current partial state as a scalar without resetting it. + /// + /// The returned scalar has the dtype reported by the vtable's `partial_dtype`. + fn partial_scalar(&self) -> VortexResult; + + /// Compute the final aggregate result as a scalar without resetting state. + fn final_scalar(&self) -> VortexResult; + /// Flush the accumulation state and return the partial aggregate result as a scalar. /// /// Resets the accumulator state back to the initial state. @@ -99,31 +116,75 @@ impl DynAccumulator for Accumulator { batch.dtype() ); - // Allow the vtable to short-circuit on the raw array before decompression. - if self.vtable.try_accumulate(&mut self.partial, batch, ctx)? { + // 0. Stats-driven shortcut: if the aggregate can be derived directly from the batch's + // cached statistics, use that and skip both kernel dispatch and decode. This is the + // only layer that consults `batch.statistics()`; encoding kernels must not. + if let Some(result) = self.vtable.try_partial_from_stats(batch)? { + vortex_ensure!( + result.dtype() == &self.partial_dtype, + "Aggregate try_partial_from_stats returned {}, expected {}", + result.dtype(), + self.partial_dtype, + ); + self.vtable.combine_partials(&mut self.partial, result)?; return Ok(()); } let session = ctx.session().clone(); let kernels = &session.aggregate_fns().kernels; + // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly + // more specific than the vtable's `try_accumulate` short-circuit. Checking the + // registry first gives kernels for `Combined` aggregates a chance to fire — + // `Combined::try_accumulate` always returns true, so a later kernel check would be + // unreachable. + { + let kernels_r = kernels.read(); + let batch_id = batch.encoding_id(); + let kernel = kernels_r + .get(&(batch_id, Some(self.aggregate_fn.id()))) + .or_else(|| kernels_r.get(&(batch_id, None))) + .copied(); + drop(kernels_r); + if let Some(kernel) = kernel + && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? + { + vortex_ensure!( + result.dtype() == &self.partial_dtype, + "Aggregate kernel returned {}, expected {}", + result.dtype(), + self.partial_dtype, + ); + self.vtable.combine_partials(&mut self.partial, result)?; + return Ok(()); + } + } + + // 2. Allow the vtable to short-circuit on the raw array before decompression. + if self.vtable.try_accumulate(&mut self.partial, batch, ctx)? { + return Ok(()); + } + + // 3. Iteratively check the registry against each intermediate encoding, executing one + // step between checks. Mirrors the loop in `GroupedAccumulator::accumulate_list_view`. + // Iteration 0 re-checks the initial encoding — a redundant HashMap miss, the price of + // keeping the loop body uniform. Terminates on `AnyColumnar` (Canonical or Constant) + // since the vtable's `accumulate(&Columnar)` handles both cases directly. let mut batch = batch.clone(); for _ in 0..max_iterations() { - if batch.is::() { + if batch.is::() { break; } let kernels_r = kernels.read(); let batch_id = batch.encoding_id(); - if let Some(result) = kernels_r + let kernel = kernels_r .get(&(batch_id, Some(self.aggregate_fn.id()))) .or_else(|| kernels_r.get(&(batch_id, None))) - .and_then(|kernel| { - kernel - .aggregate(&self.aggregate_fn, &batch, ctx) - .transpose() - }) - .transpose()? + .copied(); + drop(kernels_r); + if let Some(kernel) = kernel + && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { vortex_ensure!( result.dtype() == &self.partial_dtype, @@ -135,29 +196,35 @@ impl DynAccumulator for Accumulator { return Ok(()); } - // Execute one step and try again batch = batch.execute(ctx)?; } - // Otherwise, execute the batch until it is columnar and accumulate it into the state. + // 4. Otherwise, execute the batch until it is columnar and accumulate it into the state. let columnar = batch.execute::(ctx)?; self.vtable.accumulate(&mut self.partial, &columnar, ctx) } + fn combine_partials(&mut self, other: Scalar) -> VortexResult<()> { + self.vtable.combine_partials(&mut self.partial, other) + } + fn is_saturated(&self) -> bool { self.vtable.is_saturated(&self.partial) } - fn flush(&mut self) -> VortexResult { - let partial = self.vtable.to_scalar(&self.partial)?; + fn reset(&mut self) { self.vtable.reset(&mut self.partial); + } + + fn partial_scalar(&self) -> VortexResult { + let partial = self.vtable.to_scalar(&self.partial)?; #[cfg(debug_assertions)] { vortex_ensure!( partial.dtype() == &self.partial_dtype, - "Aggregate kernel returned incorrect DType on flush: expected {}, got {}", + "Aggregate returned incorrect DType on partial_scalar: expected {}, got {}", self.partial_dtype, partial.dtype(), ); @@ -166,17 +233,216 @@ impl DynAccumulator for Accumulator { Ok(partial) } - fn finish(&mut self) -> VortexResult { + fn final_scalar(&self) -> VortexResult { let result = self.vtable.finalize_scalar(&self.partial)?; - self.vtable.reset(&mut self.partial); vortex_ensure!( result.dtype() == &self.return_dtype, - "Aggregate kernel returned incorrect DType on finalize: expected {}, got {}", + "Aggregate returned incorrect DType on final_scalar: expected {}, got {}", self.return_dtype, result.dtype(), ); Ok(result) } + + fn flush(&mut self) -> VortexResult { + let partial = self.partial_scalar()?; + self.reset(); + Ok(partial) + } + + fn finish(&mut self) -> VortexResult { + let result = self.final_scalar()?; + self.reset(); + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::SessionExt; + use vortex_session::VortexSession; + + use crate::ArrayRef; + use crate::ExecutionCtx; + use crate::IntoArray; + use crate::VortexSessionExecute; + use crate::aggregate_fn::Accumulator; + use crate::aggregate_fn::AggregateFnRef; + use crate::aggregate_fn::AggregateFnVTable; + use crate::aggregate_fn::DynAccumulator; + use crate::aggregate_fn::EmptyOptions; + use crate::aggregate_fn::combined::Combined; + use crate::aggregate_fn::combined::PairOptions; + use crate::aggregate_fn::fns::mean::Mean; + use crate::aggregate_fn::fns::sum::Sum; + use crate::aggregate_fn::kernels::DynAggregateKernel; + use crate::aggregate_fn::session::AggregateFnSession; + use crate::array::VTable; + use crate::arrays::Dict; + use crate::arrays::DictArray; + use crate::dtype::DType; + use crate::dtype::Nullability; + use crate::dtype::PType; + use crate::scalar::Scalar; + use crate::session::ArraySession; + + /// Mean partial sentinel `{sum: 42.0, count: 1}` — distinguishable from the + /// natural fan-out result `{sum: 7.0, count: 1}` that `Combined::try_accumulate` + /// would produce for `dict_of_seven()`. + #[derive(Debug)] + struct SentinelMeanPartialKernel; + impl DynAggregateKernel for SentinelMeanPartialKernel { + fn aggregate( + &self, + _aggregate_fn: &AggregateFnRef, + _batch: &ArrayRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + Ok(Some(sentinel_partial())) + } + } + + /// Returns `Ok(None)` => kernel declined, dispatch falls through. + #[derive(Debug)] + struct DeclineKernel; + impl DynAggregateKernel for DeclineKernel { + fn aggregate( + &self, + _aggregate_fn: &AggregateFnRef, + _batch: &ArrayRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + Ok(None) + } + } + + /// Sum partial sentinel `42.0` — distinguishable from the natural Sum of + /// `dict_of_seven()` which is `7.0`. + #[derive(Debug)] + struct SentinelSumPartialKernel; + impl DynAggregateKernel for SentinelSumPartialKernel { + fn aggregate( + &self, + _aggregate_fn: &AggregateFnRef, + _batch: &ArrayRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + Ok(Some(Scalar::primitive(42.0f64, Nullability::Nullable))) + } + } + + fn fresh_session() -> VortexSession { + VortexSession::empty().with::() + } + + fn dict_of_seven() -> ArrayRef { + DictArray::try_new(buffer![0u32].into_array(), buffer![7.0f64].into_array()) + .expect("valid dictionary") + .into_array() + } + + fn mean_f64_accumulator() -> VortexResult>> { + let dtype = DType::Primitive(PType::F64, Nullability::NonNullable); + Accumulator::try_new( + Mean::combined(), + PairOptions(EmptyOptions, EmptyOptions), + dtype, + ) + } + + fn sentinel_partial() -> Scalar { + let acc = mean_f64_accumulator().expect("build accumulator"); + let sum = Scalar::primitive(42.0f64, Nullability::Nullable); + let count = Scalar::primitive(1u64, Nullability::NonNullable); + Scalar::struct_(acc.partial_dtype, vec![sum, count]) + } + + /// Kernel registered for `(Dict, Combined)` fires in preference to + /// `Combined::try_accumulate`'s fan-out path — proves the dispatch reorder. + #[test] + fn combined_kernel_fires() -> VortexResult<()> { + static KERNEL: SentinelMeanPartialKernel = SentinelMeanPartialKernel; + let session = fresh_session(); + session + .get::() + .register_aggregate_kernel(Dict.id(), Some(Mean::combined().id()), &KERNEL); + let mut ctx = session.create_execution_ctx(); + + let mut acc = mean_f64_accumulator()?; + acc.accumulate(&dict_of_seven(), &mut ctx)?; + let partial = acc.flush()?; + + let s = partial.as_struct(); + assert_eq!( + s.field("sum").unwrap().as_primitive().as_::(), + Some(42.0) + ); + assert_eq!( + s.field("count").unwrap().as_primitive().as_::(), + Some(1) + ); + Ok(()) + } + + /// Kernel returns `Ok(None)` => dispatch falls through to `Combined::try_accumulate`'s + /// natural fan-out. The natural partial is `{sum: 7.0, count: 1}`. + #[test] + fn fallback_when_kernel_declines() -> VortexResult<()> { + static KERNEL: DeclineKernel = DeclineKernel; + let session = fresh_session(); + session + .get::() + .register_aggregate_kernel(Dict.id(), Some(Mean::combined().id()), &KERNEL); + let mut ctx = session.create_execution_ctx(); + + let mut acc = mean_f64_accumulator()?; + acc.accumulate(&dict_of_seven(), &mut ctx)?; + let partial = acc.flush()?; + + let s = partial.as_struct(); + assert_eq!( + s.field("sum").unwrap().as_primitive().as_::(), + Some(7.0) + ); + assert_eq!( + s.field("count").unwrap().as_primitive().as_::(), + Some(1) + ); + Ok(()) + } + + /// A kernel registered for the inner `(Dict, Sum)` child fires when accumulating a + /// Dict batch through `Combined`. This is the reusable-primitive case the + /// refactor enables: no `(Dict, Combined)` kernel is needed. + #[test] + fn child_kernel_fires_through_combined() -> VortexResult<()> { + static KERNEL: SentinelSumPartialKernel = SentinelSumPartialKernel; + let session = fresh_session(); + session + .get::() + .register_aggregate_kernel(Dict.id(), Some(Sum.id()), &KERNEL); + let mut ctx = session.create_execution_ctx(); + + let mut acc = mean_f64_accumulator()?; + acc.accumulate(&dict_of_seven(), &mut ctx)?; + let partial = acc.flush()?; + + let s = partial.as_struct(); + // `Sum` child returned the sentinel 42.0 — proves the (Dict, Sum) kernel fired + // via `Combined`'s fan-out. `Count`'s native `try_accumulate` reads the + // batch's valid_count, so count is the real 1. + assert_eq!( + s.field("sum").unwrap().as_primitive().as_::(), + Some(42.0) + ); + assert_eq!( + s.field("count").unwrap().as_primitive().as_::(), + Some(1) + ); + Ok(()) + } } diff --git a/vortex-array/src/aggregate_fn/combined.rs b/vortex-array/src/aggregate_fn/combined.rs index ab0ebca5785..76ad9877314 100644 --- a/vortex-array/src/aggregate_fn/combined.rs +++ b/vortex-array/src/aggregate_fn/combined.rs @@ -18,6 +18,8 @@ use vortex_session::VortexSession; use crate::ArrayRef; use crate::Columnar; use crate::ExecutionCtx; +use crate::aggregate_fn::Accumulator; +use crate::aggregate_fn::AccumulatorRef; use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnVTable; use crate::builtins::ArrayBuiltins; @@ -44,8 +46,6 @@ impl Display for PairOptions { // Convenience aliases so signatures stay readable. type LeftOptions = <::Left as AggregateFnVTable>::Options; type RightOptions = <::Right as AggregateFnVTable>::Options; -type LeftPartial = <::Left as AggregateFnVTable>::Partial; -type RightPartial = <::Right as AggregateFnVTable>::Partial; /// Combined options for a [`BinaryCombined`] aggregate. pub type CombinedOptions = PairOptions, RightOptions>; @@ -140,7 +140,10 @@ impl Combined { impl AggregateFnVTable for Combined { type Options = CombinedOptions; - type Partial = (LeftPartial, RightPartial); + // Each child is held as a fully-fledged `AccumulatorRef` so that batches dispatched through + // `try_accumulate` consult the kernel registry per-child (e.g. a `(Dict, Sum)` kernel fires + // for the inner `Sum` child of `Combined`). + type Partial = (AccumulatorRef, AccumulatorRef); fn id(&self) -> AggregateFnId { self.0.id() @@ -173,9 +176,11 @@ impl AggregateFnVTable for Combined { options: &Self::Options, input_dtype: &DType, ) -> VortexResult { + let left = Accumulator::try_new(self.0.left(), options.0.clone(), input_dtype.clone())?; + let right = Accumulator::try_new(self.0.right(), options.1.clone(), input_dtype.clone())?; Ok(( - self.0.left().empty_partial(&options.0, input_dtype)?, - self.0.right().empty_partial(&options.1, input_dtype)?, + Box::new(left) as AccumulatorRef, + Box::new(right) as AccumulatorRef, )) } @@ -192,14 +197,14 @@ impl AggregateFnVTable for Combined { let r_field = s .field(rname) .ok_or_else(|| vortex_err!("BinaryCombined partial missing `{}` field", rname))?; - self.0.left().combine_partials(&mut partial.0, l_field)?; - self.0.right().combine_partials(&mut partial.1, r_field)?; + partial.0.combine_partials(l_field)?; + partial.1.combine_partials(r_field)?; Ok(()) } fn to_scalar(&self, partial: &Self::Partial) -> VortexResult { - let l_scalar = self.0.left().to_scalar(&partial.0)?; - let r_scalar = self.0.right().to_scalar(&partial.1)?; + let l_scalar = partial.0.partial_scalar()?; + let r_scalar = partial.1.partial_scalar()?; let dtype = self .0 .partial_struct_dtype(l_scalar.dtype().clone(), r_scalar.dtype().clone()); @@ -207,36 +212,27 @@ impl AggregateFnVTable for Combined { } fn reset(&self, partial: &mut Self::Partial) { - self.0.left().reset(&mut partial.0); - self.0.right().reset(&mut partial.1); + partial.0.reset(); + partial.1.reset(); } fn is_saturated(&self, partial: &Self::Partial) -> bool { - self.0.left().is_saturated(&partial.0) && self.0.right().is_saturated(&partial.1) + partial.0.is_saturated() && partial.1.is_saturated() } - /// Fans out to each child's `try_accumulate`, falling back to `accumulate` - /// against a lazily-canonicalized batch. We always claim to handle the - /// batch ourselves so [`Self::accumulate`] is unreachable — this is the - /// same trick `Count` uses to opt out of the canonicalization path. + /// Delegate the batch to each child's `Accumulator::accumulate`, which consults the + /// kernel registry against the child's `aggregate_fn` id. This is what makes + /// `(encoding, Child)` kernels reachable through `Combined` — without it, a + /// `(Dict, Sum)` kernel would be dead code for `Combined`. We always return + /// `true` so [`Self::accumulate`] is unreachable. fn try_accumulate( &self, state: &mut Self::Partial, batch: &ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult { - let mut canonical: Option = None; - if !self.0.left().try_accumulate(&mut state.0, batch, ctx)? { - let c = canonical.insert(batch.clone().execute::(ctx)?); - self.0.left().accumulate(&mut state.0, c, ctx)?; - } - if !self.0.right().try_accumulate(&mut state.1, batch, ctx)? { - let c = match canonical.as_ref() { - Some(c) => c, - None => canonical.insert(batch.clone().execute::(ctx)?), - }; - self.0.right().accumulate(&mut state.1, c, ctx)?; - } + state.0.accumulate(batch, ctx)?; + state.1.accumulate(batch, ctx)?; Ok(true) } @@ -258,8 +254,8 @@ impl AggregateFnVTable for Combined { } fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult { - let l_scalar = self.0.left().finalize_scalar(&partial.0)?; - let r_scalar = self.0.right().finalize_scalar(&partial.1)?; + let l_scalar = partial.0.final_scalar()?; + let r_scalar = partial.1.final_scalar()?; BinaryCombined::finalize_scalar(&self.0, l_scalar, r_scalar) } } diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index 848ed8c0850..1118ce40cf2 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -150,24 +150,15 @@ impl AggregateFnVTable for UncompressedSizeInBytes { false } - fn try_accumulate( - &self, - partial: &mut Self::Partial, - batch: &ArrayRef, - _ctx: &mut ExecutionCtx, - ) -> VortexResult { + fn try_partial_from_stats(&self, batch: &ArrayRef) -> VortexResult> { let Some(Precision::Exact(size_scalar)) = batch.statistics().get(Stat::UncompressedSizeInBytes) else { - return Ok(false); + return Ok(None); }; - let size = u64::try_from(&size_scalar) .map_err(|e| vortex_err!("Failed to convert uncompressed size stat to u64: {e}"))?; - *partial = partial - .checked_add(size) - .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; - Ok(true) + Ok(Some(Scalar::primitive(size, NonNullable))) } fn accumulate( diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 3b8c30b758a..be6e1bb515b 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -28,10 +28,8 @@ use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; use crate::array::ArrayId; use crate::array::VTable; use crate::arrays::Chunked; -use crate::arrays::Constant; use crate::arrays::Dict; use crate::arrays::chunked::compute::aggregate::ChunkedArrayAggregate; -use crate::arrays::constant::compute::uncompressed_size::ConstantUncompressedSizeKernel; use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; @@ -81,11 +79,6 @@ impl Default for AggregateFnSession { // Register the built-in aggregate kernels. this.register_aggregate_kernel(Chunked.id(), None::, &ChunkedArrayAggregate); - this.register_aggregate_kernel( - Constant.id(), - Some(UncompressedSizeInBytes.id()), - &ConstantUncompressedSizeKernel, - ); this.register_aggregate_kernel(Dict.id(), Some(MinMax.id()), &DictMinMaxKernel); this.register_aggregate_kernel(Dict.id(), Some(IsConstant.id()), &DictIsConstantKernel); this.register_aggregate_kernel(Dict.id(), Some(IsSorted.id()), &DictIsSortedKernel); diff --git a/vortex-array/src/aggregate_fn/vtable.rs b/vortex-array/src/aggregate_fn/vtable.rs index da6fcbc4165..324f869e512 100644 --- a/vortex-array/src/aggregate_fn/vtable.rs +++ b/vortex-array/src/aggregate_fn/vtable.rs @@ -102,6 +102,25 @@ pub trait AggregateFnVTable: 'static + Sized + Clone + Send + Sync { /// final result is fully determined. fn is_saturated(&self, state: &Self::Partial) -> bool; + /// Try to derive a partial scalar from the batch's cached statistics, before any + /// kernel dispatch or canonicalization. + /// + /// Returns `Some(partial_scalar)` if the answer can be read directly from `batch.statistics()`, + /// otherwise `Ok(None)` to fall through to the rest of dispatch. The returned scalar must + /// have the dtype reported by `partial_dtype`. + /// + /// This is the single place stats-based shortcuts live; encoding kernels must not consult + /// stats themselves. Runs first so that an upstream producer who pre-populates the relevant + /// stat (e.g. a layout reader hydrating `Stat::UncompressedSizeInBytes` from file metadata) + /// can skip both kernel dispatch and decode. + /// + /// TODO: this hook may be removed once `ArrayStats` stores aggregate partials internally — + /// at that point stat-driven shortcuts can be resolved automatically by the dispatch layer + /// without each aggregate vtable opting in. + fn try_partial_from_stats(&self, _batch: &ArrayRef) -> VortexResult> { + Ok(None) + } + /// Try to accumulate the raw array before decompression. /// /// Returns `true` if the array was handled, `false` to fall through to diff --git a/vortex-array/src/arrays/constant/compute/uncompressed_size.rs b/vortex-array/src/arrays/constant/compute/uncompressed_size.rs index 53c7a366357..b9654d5840c 100644 --- a/vortex-array/src/arrays/constant/compute/uncompressed_size.rs +++ b/vortex-array/src/arrays/constant/compute/uncompressed_size.rs @@ -1,42 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_error::VortexResult; - -use crate::ArrayRef; -use crate::ExecutionCtx; -use crate::aggregate_fn::AggregateFnRef; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::constant_uncompressed_size_in_bytes; -use crate::aggregate_fn::kernels::DynAggregateKernel; -use crate::arrays::Constant; -use crate::dtype::Nullability; -use crate::scalar::Scalar; - -#[derive(Debug)] -pub(crate) struct ConstantUncompressedSizeKernel; - -impl DynAggregateKernel for ConstantUncompressedSizeKernel { - fn aggregate( - &self, - aggregate_fn: &AggregateFnRef, - batch: &ArrayRef, - ctx: &mut ExecutionCtx, - ) -> VortexResult> { - if !aggregate_fn.is::() { - return Ok(None); - } - - let Some(array) = batch.as_opt::() else { - return Ok(None); - }; - - let size = constant_uncompressed_size_in_bytes(array, ctx)?; - - Ok(Some(Scalar::primitive(size, Nullability::NonNullable))) - } -} - #[cfg(test)] mod tests { use vortex_buffer::Buffer;