From 4d40f10f953c020d1205849fce443d2ba03db8d3 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 19 May 2026 23:35:32 +0000 Subject: [PATCH 1/3] Add Sparse pushdown kernels for is_constant, sum, and compare MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sparse arrays previously had no aggregate or compare pushdown, so `is_constant`, `sum`, and ` op ` on a Sparse column all fell through to full canonical materialization — O(N) work regardless of patch density. Each new kernel pushes the operation into the patches: - `SparseIsConstantKernel` checks `is_constant(patch_values)` and whether the common patch value equals the fill scalar. - `SparseSumKernel` folds `fill * (N - P) + sum(patch_values)` through the existing `Sum` accumulator so overflow saturation is preserved. - `CompareKernel for Sparse` maps a constant-RHS comparison through `patches.map_values` and rebuilds a `Sparse` with `scalar_cmp` applied to the fill, preserving downstream sparsity (the filter parent kernel already handles `Sparse` masks). All three are O(P) instead of O(N). Benchmarks on a 1M-element Sparse i32 with non-null fill show: - `is_constant`: 78-93x speedup (137us -> 1.7us at P=10..1000) - `sum`: 109-581x speedup (768us -> 1.3us at P=10) - `compare`: 19-84x speedup (777us -> 9us at P=10 with downstream canonicalization; bigger when consumers stay sparse) Aggregate kernels are wired through the session-scoped registry via a new `vortex_sparse::initialize` (called from `vortex-file`'s default encodings). Compare is wired through `PARENT_KERNELS` so it fires during `execute_parent` on `ScalarFn(Binary, cmp)` nodes whose child is Sparse. Signed-off-by: Claude --- encodings/sparse/Cargo.toml | 4 + encodings/sparse/benches/sparse_pushdown.rs | 196 ++++++++++++++++++++ encodings/sparse/public-api.lock | 6 + encodings/sparse/src/compute/compare.rs | 122 ++++++++++++ encodings/sparse/src/compute/is_constant.rs | 160 ++++++++++++++++ encodings/sparse/src/compute/mod.rs | 3 + encodings/sparse/src/compute/sum.rs | 151 +++++++++++++++ encodings/sparse/src/kernel.rs | 2 + encodings/sparse/src/lib.rs | 26 +++ vortex-file/src/lib.rs | 3 +- 10 files changed, 671 insertions(+), 2 deletions(-) create mode 100644 encodings/sparse/benches/sparse_pushdown.rs create mode 100644 encodings/sparse/src/compute/compare.rs create mode 100644 encodings/sparse/src/compute/is_constant.rs create mode 100644 encodings/sparse/src/compute/sum.rs diff --git a/encodings/sparse/Cargo.toml b/encodings/sparse/Cargo.toml index b24f64e37e6..acf99a74ccf 100644 --- a/encodings/sparse/Cargo.toml +++ b/encodings/sparse/Cargo.toml @@ -35,3 +35,7 @@ vortex-array = { workspace = true, features = ["_test-harness"] } [[bench]] name = "sparse_canonical" harness = false + +[[bench]] +name = "sparse_pushdown" +harness = false diff --git a/encodings/sparse/benches/sparse_pushdown.rs b/encodings/sparse/benches/sparse_pushdown.rs new file mode 100644 index 00000000000..97324f41ba7 --- /dev/null +++ b/encodings/sparse/benches/sparse_pushdown.rs @@ -0,0 +1,196 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Benchmarks measuring the wins from pushdown kernels on Sparse arrays. +//! +//! For each kernel we compare the registered Sparse-aware path (`with_kernel`) against +//! the baseline canonical-fallback path (`canonical`) using the same input. The session +//! difference is the only knob: the canonical baseline runs against a session in which +//! Sparse has no aggregate/compare kernel registered, forcing the accumulator to +//! materialize the full array. + +#![expect(clippy::cast_possible_truncation)] + +use std::sync::LazyLock; + +use divan::Bencher; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::aggregate_fn::fns::is_constant::is_constant; +use vortex_array::aggregate_fn::fns::sum::sum; +use vortex_array::arrays::ConstantArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::scalar::Scalar; +use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_array::session::ArraySession; +use vortex_array::session::ArraySessionExt; +use vortex_buffer::Buffer; +use vortex_error::VortexExpect; +use vortex_session::VortexSession; +use vortex_sparse::Sparse; + +fn main() { + divan::main(); +} + +/// Session with Sparse encoding registered but no Sparse-specific kernels. +/// This is the "before" path: dispatch falls through to canonical materialization. +static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session +}); + +/// Session with Sparse encoding *and* its pushdown kernels registered. +/// This is the "after" path. +static KERNEL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + vortex_sparse::initialize(&session); + session +}); + +/// (array_len, num_patches) +const ARGS: &[(usize, usize)] = &[ + (1_000_000, 10), // 0.001% patches → 10⁵× upside ceiling + (1_000_000, 1_000), // 0.1% patches + (1_000_000, 10_000), // 1% patches + (100_000, 10), // 0.01% patches +]; + +/// Build a sparse i32 array of `len` with `num_patches` uniformly-spaced patches. +/// Fill is a non-null constant (1), patches are increasing values (2, 3, …) so the +/// array is NOT constant — exercises the full-comparison path of the kernel. +fn make_sparse_i32(len: usize, num_patches: usize) -> ArrayRef { + assert!(num_patches > 0 && num_patches <= len); + let stride = len / num_patches; + let indices: Buffer = (0..num_patches).map(|i| (i * stride) as u32).collect(); + let values: Buffer = (0..num_patches as i32).map(|i| 2 + i).collect(); + Sparse::try_new( + indices.into_array(), + values.into_array(), + len, + Scalar::from(1i32), + ) + .vortex_expect("valid sparse") + .into_array() +} + +// ---------- is_constant ---------- + +#[divan::bench(args = ARGS)] +fn is_constant_canonical(bencher: Bencher, (len, np): (usize, usize)) { + bencher + .with_inputs(|| { + ( + make_sparse_i32(len, np), + CANONICAL_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| { + divan::black_box(is_constant(&array, &mut ctx).vortex_expect("is_constant")) + }); +} + +#[divan::bench(args = ARGS)] +fn is_constant_with_kernel(bencher: Bencher, (len, np): (usize, usize)) { + bencher + .with_inputs(|| { + ( + make_sparse_i32(len, np), + KERNEL_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| { + divan::black_box(is_constant(&array, &mut ctx).vortex_expect("is_constant")) + }); +} + +// ---------- sum ---------- + +#[divan::bench(args = ARGS)] +fn sum_canonical(bencher: Bencher, (len, np): (usize, usize)) { + bencher + .with_inputs(|| { + ( + make_sparse_i32(len, np), + CANONICAL_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| { + divan::black_box(sum(&array, &mut ctx).vortex_expect("sum")) + }); +} + +#[divan::bench(args = ARGS)] +fn sum_with_kernel(bencher: Bencher, (len, np): (usize, usize)) { + bencher + .with_inputs(|| { + ( + make_sparse_i32(len, np), + KERNEL_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| { + divan::black_box(sum(&array, &mut ctx).vortex_expect("sum")) + }); +} + +// ---------- compare (Sparse == constant) ---------- +// +// NOTE: `CompareExecuteAdaptor(Sparse)` is registered in `PARENT_KERNELS`, which is +// statically attached to the Sparse encoding vtable (not session-scoped). To benchmark +// the "no-kernel" baseline we explicitly canonicalize the input first so the comparison +// runs against a `PrimitiveArray`. The kernel path lets the comparison push through. + +fn compare_with_pushdown(array: ArrayRef, mut ctx: vortex_array::ExecutionCtx) { + let rhs = ConstantArray::new(Scalar::from(1i32), array.len()).into_array(); + let result = array + .binary(rhs, Operator::Eq) + .vortex_expect("binary build"); + divan::black_box( + result + .execute::(&mut ctx) + .vortex_expect("execute"), + ); +} + +fn compare_after_canonicalize(array: ArrayRef, mut ctx: vortex_array::ExecutionCtx) { + let canonical = array + .execute::(&mut ctx) + .vortex_expect("canonicalize") + .into_array(); + let rhs = ConstantArray::new(Scalar::from(1i32), canonical.len()).into_array(); + let result = canonical + .binary(rhs, Operator::Eq) + .vortex_expect("binary build"); + divan::black_box( + result + .execute::(&mut ctx) + .vortex_expect("execute"), + ); +} + +#[divan::bench(args = ARGS)] +fn compare_canonical(bencher: Bencher, (len, np): (usize, usize)) { + bencher + .with_inputs(|| { + ( + make_sparse_i32(len, np), + KERNEL_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, ctx)| compare_after_canonicalize(array, ctx)); +} + +#[divan::bench(args = ARGS)] +fn compare_with_kernel(bencher: Bencher, (len, np): (usize, usize)) { + bencher + .with_inputs(|| { + ( + make_sparse_i32(len, np), + KERNEL_SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, ctx)| compare_with_pushdown(array, ctx)); +} diff --git a/encodings/sparse/public-api.lock b/encodings/sparse/public-api.lock index 33ec9fc9361..5c82c1c1636 100644 --- a/encodings/sparse/public-api.lock +++ b/encodings/sparse/public-api.lock @@ -68,6 +68,10 @@ impl vortex_array::arrays::slice::SliceKernel for vortex_sparse::Sparse pub fn vortex_sparse::Sparse::slice(vortex_array::array::view::ArrayView<'_, Self>, core::ops::range::Range, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::scalar_fn::fns::binary::compare::CompareKernel for vortex_sparse::Sparse + +pub fn vortex_sparse::Sparse::compare(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::array::erased::ArrayRef, vortex_array::scalar_fn::fns::operators::CompareOperator, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + impl vortex_array::scalar_fn::fns::cast::kernel::CastReduce for vortex_sparse::Sparse pub fn vortex_sparse::Sparse::cast(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::dtype::DType) -> vortex_error::VortexResult> @@ -222,4 +226,6 @@ pub fn vortex_array::array::view::ArrayView<'_, vortex_sparse::Sparse>::patches( pub fn vortex_array::array::view::ArrayView<'_, vortex_sparse::Sparse>::resolved_patches(&self) -> vortex_error::VortexResult +pub fn vortex_sparse::initialize(&vortex_session::VortexSession) + pub type vortex_sparse::SparseArray = vortex_array::array::typed::Array diff --git a/encodings/sparse/src/compute/compare.rs b/encodings/sparse/src/compute/compare.rs new file mode 100644 index 00000000000..fb6cadece0f --- /dev/null +++ b/encodings/sparse/src/compute/compare.rs @@ -0,0 +1,122 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::scalar_fn::fns::binary::CompareKernel; +use vortex_array::scalar_fn::fns::binary::scalar_cmp; +use vortex_array::scalar_fn::fns::operators::CompareOperator; +use vortex_array::scalar_fn::fns::operators::Operator; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific compare kernel. +/// +/// When the RHS is a constant scalar, the result of any comparison is itself sparse: +/// every unpatched position resolves to `compare(fill, rhs)`, and every patched position +/// to `compare(patch, rhs)`. We push the comparison into the patches and rebuild a +/// `Sparse` with the new fill, preserving downstream sparsity (filter masks, etc.). +/// +/// For non-constant RHS we decline and let the canonical fallback handle it. +impl CompareKernel for Sparse { + fn compare( + lhs: ArrayView<'_, Self>, + rhs: &ArrayRef, + operator: CompareOperator, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let Some(rhs_scalar) = rhs.as_constant() else { + return Ok(None); + }; + + let fill_bool = scalar_cmp(lhs.fill_scalar(), &rhs_scalar, operator)?; + let patches = lhs.patches(); + + let new_patches = patches.map_values(|values| { + let len = values.len(); + values.binary( + ConstantArray::new(rhs_scalar.clone(), len).into_array(), + Operator::from(operator), + ) + })?; + + Ok(Some( + Sparse::try_new_from_patches(new_patches, fill_bool)?.into_array(), + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::Canonical; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::ConstantArray; + use vortex_array::assert_arrays_eq; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::scalar::Scalar; + use vortex_array::scalar_fn::fns::operators::Operator; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::SparseArray; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session + }); + + #[rstest] + #[case::eq_fill(Scalar::from(1i32), Operator::Eq)] + #[case::eq_patch(Scalar::from(10i32), Operator::Eq)] + #[case::gt(Scalar::from(5i32), Operator::Gt)] + #[case::lte(Scalar::from(10i32), Operator::Lte)] + #[case::neq(Scalar::from(1i32), Operator::NotEq)] + fn compare_matches_canonical(#[case] rhs: Scalar, #[case] op: Operator) { + let array: SparseArray = Sparse::try_new( + buffer![1u64, 3, 5].into_array(), + buffer![10i32, 20, 30].into_array(), + 8, + Scalar::from(1i32), + ) + .unwrap(); + let arr = array.into_array(); + + let len = arr.len(); + let mut k_ctx = SESSION.create_execution_ctx(); + let kernel_bool = arr + .binary(ConstantArray::new(rhs.clone(), len).into_array(), op) + .unwrap() + .execute::(&mut k_ctx) + .unwrap(); + + let mut c_ctx = CANONICAL_SESSION.create_execution_ctx(); + let canonical_bool = arr + .binary(ConstantArray::new(rhs, len).into_array(), op) + .unwrap() + .execute::(&mut c_ctx) + .unwrap(); + + assert_arrays_eq!(kernel_bool, canonical_bool); + } +} diff --git a/encodings/sparse/src/compute/is_constant.rs b/encodings/sparse/src/compute/is_constant.rs new file mode 100644 index 00000000000..cfd31c6698d --- /dev/null +++ b/encodings/sparse/src/compute/is_constant.rs @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::fns::is_constant::is_constant; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific `is_constant` kernel. +/// +/// A `SparseArray` of length `N` with `P` patches and a fill value `F` is constant iff: +/// - `P == 0`: all positions hold `F`. +/// - `0 < P < N`: every patch equals `F`, i.e. `is_constant(patch_values)` AND the common +/// patch value equals `F`. +/// - `P == N`: every position is patched, so the answer is `is_constant(patch_values)`. +/// +/// In all cases the work is `O(P)` instead of `O(N)`. +#[derive(Debug)] +pub(crate) struct SparseIsConstantKernel; + +impl DynAggregateKernel for SparseIsConstantKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(sparse) = batch.as_opt::() else { + return Ok(None); + }; + + let patches = sparse.patches(); + let num_patches = patches.num_patches(); + let len = sparse.len(); + + let result = if num_patches == 0 { + // Whole array is the fill value. + true + } else if num_patches < len { + // Mixed: needs all patches equal AND equal to fill. + if !is_constant(patches.values(), ctx)? { + false + } else { + let first_patch = patches.values().execute_scalar(0, ctx)?; + &first_patch == sparse.fill_scalar() + } + } else { + // Every position is patched; answer depends purely on patch_values. + is_constant(patches.values(), ctx)? + }; + + Ok(Some(IsConstant::make_partial(batch, result, ctx)?)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::fns::is_constant::is_constant; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::SparseArray; + use crate::initialize; + + /// Session with Sparse + its pushdown kernels. + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + /// Baseline session: Sparse registered but no pushdown kernels. + static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session + }); + + fn check(array: SparseArray) -> VortexResult { + let arr = array.into_array(); + let kernel_result = is_constant(&arr, &mut SESSION.create_execution_ctx())?; + let canonical_result = is_constant(&arr, &mut CANONICAL_SESSION.create_execution_ctx())?; + assert_eq!( + kernel_result, canonical_result, + "kernel and canonical paths disagree" + ); + Ok(kernel_result) + } + + #[rstest] + #[case::all_patches_equal_fill( + Sparse::try_new( + buffer![1u64, 3, 5].into_array(), + buffer![7i32, 7, 7].into_array(), + 10, + Scalar::from(7i32), + ).unwrap(), + true, + )] + #[case::mixed_patches_but_unequal_fill( + Sparse::try_new( + buffer![1u64, 3].into_array(), + buffer![9i32, 9].into_array(), + 5, + Scalar::from(7i32), + ).unwrap(), + false, + )] + #[case::single_patch_differs( + Sparse::try_new( + buffer![1u64].into_array(), + buffer![3i32].into_array(), + 5, + Scalar::from(7i32), + ).unwrap(), + false, + )] + #[case::all_patched_constant( + Sparse::try_new( + buffer![0u64, 1, 2, 3].into_array(), + buffer![5i32, 5, 5, 5].into_array(), + 4, + Scalar::from(99i32), // fill is unreachable + ).unwrap(), + true, + )] + #[case::all_patched_not_constant( + Sparse::try_new( + buffer![0u64, 1, 2].into_array(), + buffer![1i32, 2, 3].into_array(), + 3, + Scalar::from(99i32), + ).unwrap(), + false, + )] + fn is_constant_kernel(#[case] array: SparseArray, #[case] expected: bool) { + assert_eq!(check(array).unwrap(), expected); + } +} diff --git a/encodings/sparse/src/compute/mod.rs b/encodings/sparse/src/compute/mod.rs index 28e8c366b51..f4d79579906 100644 --- a/encodings/sparse/src/compute/mod.rs +++ b/encodings/sparse/src/compute/mod.rs @@ -2,7 +2,10 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors mod cast; +mod compare; mod filter; +pub(crate) mod is_constant; +pub(crate) mod sum; mod take; #[cfg(test)] diff --git a/encodings/sparse/src/compute/sum.rs b/encodings/sparse/src/compute/sum.rs new file mode 100644 index 00000000000..4b068aa57b4 --- /dev/null +++ b/encodings/sparse/src/compute/sum.rs @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::aggregate_fn::Accumulator; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::DynAccumulator; +use vortex_array::aggregate_fn::EmptyOptions; +use vortex_array::aggregate_fn::fns::sum::Sum; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::arrays::ConstantArray; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific `sum` kernel. +/// +/// `sum(Sparse{ F, patches }) = sum(patches.values) + F * (N - patches.num_patches())`. +/// +/// The constant contribution is computed via the existing `Sum` accumulator's constant +/// short-circuit (`multiply_constant`), so overflow saturates to null exactly as in the +/// baseline. The work is `O(P)` instead of `O(N)`. +#[derive(Debug)] +pub(crate) struct SparseSumKernel; + +impl DynAggregateKernel for SparseSumKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(sparse) = batch.as_opt::() else { + return Ok(None); + }; + + let patches = sparse.patches(); + let n_fill = sparse.len() - patches.num_patches(); + + // Build a fresh Sum accumulator over the array dtype and fold in the fill and patch + // contributions. The accumulator's existing semantics (checked overflow → null + // partial) are preserved. + let mut acc = Accumulator::try_new(Sum, EmptyOptions, batch.dtype().clone())?; + + if n_fill > 0 { + let fill_array = ConstantArray::new(sparse.fill_scalar().clone(), n_fill).into_array(); + acc.accumulate(&fill_array, ctx)?; + } + + if !patches.values().is_empty() { + acc.accumulate(patches.values(), ctx)?; + } + + Ok(Some(acc.partial_scalar()?)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::fns::sum::sum; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::SparseArray; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session + }); + + fn check(array: SparseArray) -> VortexResult { + let arr = array.into_array(); + let kernel_result = sum(&arr, &mut SESSION.create_execution_ctx())?; + let canonical_result = sum(&arr, &mut CANONICAL_SESSION.create_execution_ctx())?; + assert_eq!( + kernel_result, canonical_result, + "kernel and canonical sum paths disagree" + ); + Ok(kernel_result) + } + + #[rstest] + #[case::positive_fill( + Sparse::try_new( + buffer![0u64, 2].into_array(), + buffer![10i32, 20].into_array(), + 5, + Scalar::from(1i32), + ).unwrap(), + // 10 + 1 + 20 + 1 + 1 = 33 + 33i64, + )] + #[case::zero_fill( + Sparse::try_new( + buffer![1u64, 4].into_array(), + buffer![7i32, 8].into_array(), + 10, + Scalar::from(0i32), + ).unwrap(), + 15i64, + )] + fn sum_kernel_i32(#[case] array: SparseArray, #[case] expected: i64) { + let result = check(array).unwrap(); + assert_eq!(result.as_primitive().typed_value::(), Some(expected)); + } + + #[rstest] + #[case::null_fill_no_overflow( + Sparse::try_new( + buffer![0u64, 3].into_array(), + vortex_array::arrays::PrimitiveArray::from_option_iter([Some(5i64), Some(11)]) + .into_array(), + 6, + Scalar::null(vortex_array::dtype::DType::Primitive( + vortex_array::dtype::PType::I64, + vortex_array::dtype::Nullability::Nullable, + )), + ).unwrap(), + 16i64, + )] + fn sum_kernel_nullable(#[case] array: SparseArray, #[case] expected: i64) { + let result = check(array).unwrap(); + assert_eq!(result.as_primitive().typed_value::(), Some(expected)); + } +} diff --git a/encodings/sparse/src/kernel.rs b/encodings/sparse/src/kernel.rs index 18928ea0142..3b5634a0473 100644 --- a/encodings/sparse/src/kernel.rs +++ b/encodings/sparse/src/kernel.rs @@ -5,10 +5,12 @@ use vortex_array::arrays::dict::TakeExecuteAdaptor; use vortex_array::arrays::filter::FilterExecuteAdaptor; use vortex_array::arrays::slice::SliceExecuteAdaptor; use vortex_array::kernel::ParentKernelSet; +use vortex_array::scalar_fn::fns::binary::CompareExecuteAdaptor; use crate::Sparse; pub(crate) static PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ + ParentKernelSet::lift(&CompareExecuteAdaptor(Sparse)), ParentKernelSet::lift(&FilterExecuteAdaptor(Sparse)), ParentKernelSet::lift(&SliceExecuteAdaptor(Sparse)), ParentKernelSet::lift(&TakeExecuteAdaptor(Sparse)), diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index 74b137fd1c7..a69b7130842 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -68,6 +68,32 @@ mod ops; mod rules; mod slice; +use vortex_array::aggregate_fn::AggregateFnVTable as _; +use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::fns::sum::Sum; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; + +/// Initialize Sparse encoding in the given session. +/// +/// Registers the Sparse array vtable and its aggregate kernels (`IsConstant`, `Sum`). +/// Compare pushdown is wired through `PARENT_KERNELS` (see `kernel.rs`) and does not +/// require registration here. +pub fn initialize(session: &VortexSession) { + session.arrays().register(Sparse); + + session.aggregate_fns().register_aggregate_kernel( + Sparse.id(), + Some(IsConstant.id()), + &compute::is_constant::SparseIsConstantKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + Sparse.id(), + Some(Sum.id()), + &compute::sum::SparseSumKernel, + ); +} + /// A [`Sparse`]-encoded Vortex array. pub type SparseArray = Array; diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index ce6598173a6..e69b5848de2 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -117,7 +117,6 @@ use vortex_bytebool::ByteBool; use vortex_fsst::FSST; use vortex_pco::Pco; use vortex_session::VortexSession; -use vortex_sparse::Sparse; use vortex_zigzag::ZigZag; pub use writer::*; @@ -164,7 +163,6 @@ pub fn register_default_encodings(session: &VortexSession) { arrays.register(Dict); arrays.register(FSST); arrays.register(Pco); - arrays.register(Sparse); arrays.register(ZigZag); #[cfg(feature = "zstd")] arrays.register(vortex_zstd::Zstd); @@ -183,6 +181,7 @@ pub fn register_default_encodings(session: &VortexSession) { vortex_fastlanes::initialize(session); vortex_runend::initialize(session); vortex_sequence::initialize(session); + vortex_sparse::initialize(session); #[cfg(feature = "unstable_encodings")] vortex_tensor::initialize(session); From 76661fd4a0afc516776eea51c88022e249cc1543 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 07:34:50 +0000 Subject: [PATCH 2/3] Trim Sparse pushdown benches to three single-config cases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodSpeed tracked 24 entries (canonical+kernel × 4 args × 3 ops). Collapse to exactly three benchmarks — one per kernel, single config each, sized so each lands in the ~10-100µs range: - sparse_is_constant: ~87µs (150k constant patches, worst case: full scan) - sparse_sum: ~33µs (100k patches) - sparse_compare: ~41µs (10k patches, materialized result) The canonical baselines are dropped; CodSpeed only needs to track the kernel path going forward. Signed-off-by: Claude --- encodings/sparse/benches/sparse_pushdown.rs | 168 ++++---------------- 1 file changed, 35 insertions(+), 133 deletions(-) diff --git a/encodings/sparse/benches/sparse_pushdown.rs b/encodings/sparse/benches/sparse_pushdown.rs index 97324f41ba7..2fff0b4efaa 100644 --- a/encodings/sparse/benches/sparse_pushdown.rs +++ b/encodings/sparse/benches/sparse_pushdown.rs @@ -1,13 +1,11 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! Benchmarks measuring the wins from pushdown kernels on Sparse arrays. +//! Benchmarks for the Sparse pushdown kernels (`is_constant`, `sum`, compare). //! -//! For each kernel we compare the registered Sparse-aware path (`with_kernel`) against -//! the baseline canonical-fallback path (`canonical`) using the same input. The session -//! difference is the only knob: the canonical baseline runs against a session in which -//! Sparse has no aggregate/compare kernel registered, forcing the accumulator to -//! materialize the full array. +//! Each benchmark exercises the registered kernel path on a single representative +//! sparse `i32` array. All three are `O(num_patches)`; the patch counts below are +//! sized so each lands in the ~10-100µs range for a stable CodSpeed signal. #![expect(clippy::cast_possible_truncation)] @@ -15,6 +13,8 @@ use std::sync::LazyLock; use divan::Bencher; use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::aggregate_fn::fns::is_constant::is_constant; @@ -24,7 +24,6 @@ use vortex_array::builtins::ArrayBuiltins; use vortex_array::scalar::Scalar; use vortex_array::scalar_fn::fns::operators::Operator; use vortex_array::session::ArraySession; -use vortex_array::session::ArraySessionExt; use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_session::VortexSession; @@ -34,163 +33,66 @@ fn main() { divan::main(); } -/// Session with Sparse encoding registered but no Sparse-specific kernels. -/// This is the "before" path: dispatch falls through to canonical materialization. -static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { - let session = VortexSession::empty().with::(); - session.arrays().register(Sparse); - session -}); +const LEN: usize = 1_000_000; -/// Session with Sparse encoding *and* its pushdown kernels registered. -/// This is the "after" path. -static KERNEL_SESSION: LazyLock = LazyLock::new(|| { +/// Session with Sparse and its pushdown kernels registered. +static SESSION: LazyLock = LazyLock::new(|| { let session = VortexSession::empty().with::(); vortex_sparse::initialize(&session); session }); -/// (array_len, num_patches) -const ARGS: &[(usize, usize)] = &[ - (1_000_000, 10), // 0.001% patches → 10⁵× upside ceiling - (1_000_000, 1_000), // 0.1% patches - (1_000_000, 10_000), // 1% patches - (100_000, 10), // 0.01% patches -]; - -/// Build a sparse i32 array of `len` with `num_patches` uniformly-spaced patches. -/// Fill is a non-null constant (1), patches are increasing values (2, 3, …) so the -/// array is NOT constant — exercises the full-comparison path of the kernel. -fn make_sparse_i32(len: usize, num_patches: usize) -> ArrayRef { - assert!(num_patches > 0 && num_patches <= len); - let stride = len / num_patches; +/// Build a sparse `i32` array of `LEN` with `num_patches` uniformly-spaced patches and +/// fill value 1. When `constant` is true every patch also equals 1, so the whole array +/// is constant (the worst case for `is_constant`: it must scan all patches to confirm). +fn make_sparse(num_patches: usize, constant: bool) -> ArrayRef { + let stride = LEN / num_patches; let indices: Buffer = (0..num_patches).map(|i| (i * stride) as u32).collect(); - let values: Buffer = (0..num_patches as i32).map(|i| 2 + i).collect(); + let values: Buffer = (0..num_patches) + .map(|i| if constant { 1 } else { 2 + i as i32 }) + .collect(); Sparse::try_new( indices.into_array(), values.into_array(), - len, + LEN, Scalar::from(1i32), ) .vortex_expect("valid sparse") .into_array() } -// ---------- is_constant ---------- - -#[divan::bench(args = ARGS)] -fn is_constant_canonical(bencher: Bencher, (len, np): (usize, usize)) { - bencher - .with_inputs(|| { - ( - make_sparse_i32(len, np), - CANONICAL_SESSION.create_execution_ctx(), - ) - }) - .bench_values(|(array, mut ctx)| { - divan::black_box(is_constant(&array, &mut ctx).vortex_expect("is_constant")) - }); -} - -#[divan::bench(args = ARGS)] -fn is_constant_with_kernel(bencher: Bencher, (len, np): (usize, usize)) { +#[divan::bench] +fn sparse_is_constant(bencher: Bencher) { bencher - .with_inputs(|| { - ( - make_sparse_i32(len, np), - KERNEL_SESSION.create_execution_ctx(), - ) - }) + .with_inputs(|| (make_sparse(150_000, true), SESSION.create_execution_ctx())) .bench_values(|(array, mut ctx)| { divan::black_box(is_constant(&array, &mut ctx).vortex_expect("is_constant")) }); } -// ---------- sum ---------- - -#[divan::bench(args = ARGS)] -fn sum_canonical(bencher: Bencher, (len, np): (usize, usize)) { +#[divan::bench] +fn sparse_sum(bencher: Bencher) { bencher - .with_inputs(|| { - ( - make_sparse_i32(len, np), - CANONICAL_SESSION.create_execution_ctx(), - ) - }) + .with_inputs(|| (make_sparse(100_000, false), SESSION.create_execution_ctx())) .bench_values(|(array, mut ctx)| { divan::black_box(sum(&array, &mut ctx).vortex_expect("sum")) }); } -#[divan::bench(args = ARGS)] -fn sum_with_kernel(bencher: Bencher, (len, np): (usize, usize)) { +#[divan::bench] +fn sparse_compare(bencher: Bencher) { bencher - .with_inputs(|| { - ( - make_sparse_i32(len, np), - KERNEL_SESSION.create_execution_ctx(), - ) - }) + .with_inputs(|| (make_sparse(10_000, false), SESSION.create_execution_ctx())) .bench_values(|(array, mut ctx)| { - divan::black_box(sum(&array, &mut ctx).vortex_expect("sum")) + let rhs = ConstantArray::new(Scalar::from(1i32), array.len()).into_array(); + let result = array.binary(rhs, Operator::Eq).vortex_expect("binary"); + divan::black_box(materialize(result, &mut ctx)) }); } -// ---------- compare (Sparse == constant) ---------- -// -// NOTE: `CompareExecuteAdaptor(Sparse)` is registered in `PARENT_KERNELS`, which is -// statically attached to the Sparse encoding vtable (not session-scoped). To benchmark -// the "no-kernel" baseline we explicitly canonicalize the input first so the comparison -// runs against a `PrimitiveArray`. The kernel path lets the comparison push through. - -fn compare_with_pushdown(array: ArrayRef, mut ctx: vortex_array::ExecutionCtx) { - let rhs = ConstantArray::new(Scalar::from(1i32), array.len()).into_array(); - let result = array - .binary(rhs, Operator::Eq) - .vortex_expect("binary build"); - divan::black_box( - result - .execute::(&mut ctx) - .vortex_expect("execute"), - ); -} - -fn compare_after_canonicalize(array: ArrayRef, mut ctx: vortex_array::ExecutionCtx) { - let canonical = array - .execute::(&mut ctx) - .vortex_expect("canonicalize") - .into_array(); - let rhs = ConstantArray::new(Scalar::from(1i32), canonical.len()).into_array(); - let result = canonical - .binary(rhs, Operator::Eq) - .vortex_expect("binary build"); - divan::black_box( - result - .execute::(&mut ctx) - .vortex_expect("execute"), - ); -} - -#[divan::bench(args = ARGS)] -fn compare_canonical(bencher: Bencher, (len, np): (usize, usize)) { - bencher - .with_inputs(|| { - ( - make_sparse_i32(len, np), - KERNEL_SESSION.create_execution_ctx(), - ) - }) - .bench_values(|(array, ctx)| compare_after_canonicalize(array, ctx)); -} - -#[divan::bench(args = ARGS)] -fn compare_with_kernel(bencher: Bencher, (len, np): (usize, usize)) { - bencher - .with_inputs(|| { - ( - make_sparse_i32(len, np), - KERNEL_SESSION.create_execution_ctx(), - ) - }) - .bench_values(|(array, ctx)| compare_with_pushdown(array, ctx)); +fn materialize(array: ArrayRef, ctx: &mut ExecutionCtx) -> ArrayRef { + array + .execute::(ctx) + .vortex_expect("execute") + .into_array() } From 56d5689296e04f64245e1b983994ed06878e2db8 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 08:22:20 +0000 Subject: [PATCH 3/3] Add MinMax, NullCount, NanCount, Between, FillNull pushdown to Sparse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the Sparse pushdown set with the remaining high-value kernels, all O(num_patches) instead of O(N): Aggregates (session-registered in `initialize`): - SparseMinMaxKernel: folds min/max(patch_values) with the fill scalar when the fill is reachable (P < N) and non-null. - SparseNullCountKernel: null_count(patch_values) + (fill null ? N-P : 0); O(1) when the patch null-count stat is cached. - SparseNanCountKernel: nan_count(patch_values) + (fill NaN ? N-P : 0); declines for non-float dtypes. Filter pushdowns (wired via PARENT_KERNELS): - BetweenKernel: range predicate with constant bounds → Sparse, same shape as the compare kernel. - FillNullKernel: replaces null fill/patches with the constant, stays sparse. MinMax and NullCount in particular are the zone-map/pruning kernels that Dict and RunEnd already had and Sparse lacked. Deliberately skipped: Mask (a dense mask masks unpatched fill positions, so the result can't stay sparse), IsSorted (rarely true for sparse, position-dependent and error-prone), Like/Zip/ListContainsElement (niche string/list cases), and Mean (already free via Combined). Benches: added sparse_min_max and sparse_null_count alongside the existing three (skipping between/fill_null/nan_count, which mirror compare/null_count cost profiles). All five single-config, ~50-80µs. Tests compare each kernel against the canonical baseline (aggregates via an unregistered session; parent kernels by canonicalizing the input first). Signed-off-by: Claude --- encodings/sparse/benches/sparse_pushdown.rs | 54 +++++++- encodings/sparse/public-api.lock | 8 ++ encodings/sparse/src/compute/between.rs | 137 ++++++++++++++++++++ encodings/sparse/src/compute/compare.rs | 21 ++- encodings/sparse/src/compute/fill_null.rs | 111 ++++++++++++++++ encodings/sparse/src/compute/min_max.rs | 110 ++++++++++++++++ encodings/sparse/src/compute/mod.rs | 5 + encodings/sparse/src/compute/nan_count.rs | 114 ++++++++++++++++ encodings/sparse/src/compute/null_count.rs | 118 +++++++++++++++++ encodings/sparse/src/kernel.rs | 4 + encodings/sparse/src/lib.rs | 29 ++++- 11 files changed, 689 insertions(+), 22 deletions(-) create mode 100644 encodings/sparse/src/compute/between.rs create mode 100644 encodings/sparse/src/compute/fill_null.rs create mode 100644 encodings/sparse/src/compute/min_max.rs create mode 100644 encodings/sparse/src/compute/nan_count.rs create mode 100644 encodings/sparse/src/compute/null_count.rs diff --git a/encodings/sparse/benches/sparse_pushdown.rs b/encodings/sparse/benches/sparse_pushdown.rs index 2fff0b4efaa..5abf0905f6e 100644 --- a/encodings/sparse/benches/sparse_pushdown.rs +++ b/encodings/sparse/benches/sparse_pushdown.rs @@ -1,11 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! Benchmarks for the Sparse pushdown kernels (`is_constant`, `sum`, compare). +//! Benchmarks for the Sparse pushdown kernels (`is_constant`, `sum`, `min_max`, +//! `null_count`, compare). //! //! Each benchmark exercises the registered kernel path on a single representative -//! sparse `i32` array. All three are `O(num_patches)`; the patch counts below are -//! sized so each lands in the ~10-100µs range for a stable CodSpeed signal. +//! sparse `i32` array. All are `O(num_patches)`; the patch counts below are sized so +//! each lands in the ~10-100µs range for a stable CodSpeed signal. `between`/`fill_null`/ +//! `nan_count` are omitted since they mirror the compare/null_count cost profiles. #![expect(clippy::cast_possible_truncation)] @@ -18,9 +20,15 @@ use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::aggregate_fn::fns::is_constant::is_constant; +use vortex_array::aggregate_fn::fns::min_max::min_max; +use vortex_array::aggregate_fn::fns::null_count::null_count; use vortex_array::aggregate_fn::fns::sum::sum; use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::PrimitiveArray; use vortex_array::builtins::ArrayBuiltins; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; use vortex_array::scalar::Scalar; use vortex_array::scalar_fn::fns::operators::Operator; use vortex_array::session::ArraySession; @@ -61,10 +69,25 @@ fn make_sparse(num_patches: usize, constant: bool) -> ArrayRef { .into_array() } +/// Build a sparse `i32` array of `LEN` with a null fill and `num_patches` nullable patches +/// (every third patch null), so `null_count` does real `O(P)` work over the patch validity. +fn make_sparse_nullable(num_patches: usize) -> ArrayRef { + let stride = LEN / num_patches; + let indices: Buffer = (0..num_patches).map(|i| (i * stride) as u32).collect(); + let values = PrimitiveArray::from_option_iter( + (0..num_patches).map(|i| if i % 3 == 0 { None } else { Some(i as i32) }), + ) + .into_array(); + let nullable = DType::Primitive(PType::I32, Nullability::Nullable); + Sparse::try_new(indices.into_array(), values, LEN, Scalar::null(nullable)) + .vortex_expect("valid sparse") + .into_array() +} + #[divan::bench] fn sparse_is_constant(bencher: Bencher) { bencher - .with_inputs(|| (make_sparse(150_000, true), SESSION.create_execution_ctx())) + .with_inputs(|| (make_sparse(100_000, true), SESSION.create_execution_ctx())) .bench_values(|(array, mut ctx)| { divan::black_box(is_constant(&array, &mut ctx).vortex_expect("is_constant")) }); @@ -79,6 +102,29 @@ fn sparse_sum(bencher: Bencher) { }); } +#[divan::bench] +fn sparse_min_max(bencher: Bencher) { + bencher + .with_inputs(|| (make_sparse(40_000, false), SESSION.create_execution_ctx())) + .bench_values(|(array, mut ctx)| { + divan::black_box(min_max(&array, &mut ctx).vortex_expect("min_max")) + }); +} + +#[divan::bench] +fn sparse_null_count(bencher: Bencher) { + bencher + .with_inputs(|| { + ( + make_sparse_nullable(130_000), + SESSION.create_execution_ctx(), + ) + }) + .bench_values(|(array, mut ctx)| { + divan::black_box(null_count(&array, &mut ctx).vortex_expect("null_count")) + }); +} + #[divan::bench] fn sparse_compare(bencher: Bencher) { bencher diff --git a/encodings/sparse/public-api.lock b/encodings/sparse/public-api.lock index 5c82c1c1636..4b1badddae4 100644 --- a/encodings/sparse/public-api.lock +++ b/encodings/sparse/public-api.lock @@ -68,6 +68,10 @@ impl vortex_array::arrays::slice::SliceKernel for vortex_sparse::Sparse pub fn vortex_sparse::Sparse::slice(vortex_array::array::view::ArrayView<'_, Self>, core::ops::range::Range, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::scalar_fn::fns::between::kernel::BetweenKernel for vortex_sparse::Sparse + +pub fn vortex_sparse::Sparse::between(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::array::erased::ArrayRef, &vortex_array::array::erased::ArrayRef, &vortex_array::scalar_fn::fns::between::BetweenOptions, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + impl vortex_array::scalar_fn::fns::binary::compare::CompareKernel for vortex_sparse::Sparse pub fn vortex_sparse::Sparse::compare(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::array::erased::ArrayRef, vortex_array::scalar_fn::fns::operators::CompareOperator, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> @@ -76,6 +80,10 @@ impl vortex_array::scalar_fn::fns::cast::kernel::CastReduce for vortex_sparse::S pub fn vortex_sparse::Sparse::cast(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::dtype::DType) -> vortex_error::VortexResult> +impl vortex_array::scalar_fn::fns::fill_null::kernel::FillNullKernel for vortex_sparse::Sparse + +pub fn vortex_sparse::Sparse::fill_null(vortex_array::array::view::ArrayView<'_, Self>, &vortex_array::scalar::Scalar, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + impl vortex_array::scalar_fn::fns::not::kernel::NotReduce for vortex_sparse::Sparse pub fn vortex_sparse::Sparse::invert(vortex_array::array::view::ArrayView<'_, Self>) -> vortex_error::VortexResult> diff --git a/encodings/sparse/src/compute/between.rs b/encodings/sparse/src/compute/between.rs new file mode 100644 index 00000000000..053a6694131 --- /dev/null +++ b/encodings/sparse/src/compute/between.rs @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::scalar_fn::fns::between::BetweenKernel; +use vortex_array::scalar_fn::fns::between::BetweenOptions; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific between kernel. +/// +/// `lower <= x <= upper` (with per-bound strictness) over a Sparse column with constant +/// bounds is itself sparse: every unpatched position resolves to `between(F, lo, hi)` and +/// every patched position to `between(patch, lo, hi)`. We push the range check into the +/// patches and rebuild a `Sparse` with the new fill, preserving downstream sparsity. +/// +/// Declines (falls back to canonical) unless both bounds are constants. +impl BetweenKernel for Sparse { + fn between( + array: ArrayView<'_, Self>, + lower: &ArrayRef, + upper: &ArrayRef, + options: &BetweenOptions, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let (Some(lo), Some(hi)) = (lower.as_constant(), upper.as_constant()) else { + return Ok(None); + }; + + let patches = array.patches(); + + let fill_bool = ConstantArray::new(array.fill_scalar().clone(), 1) + .into_array() + .between( + ConstantArray::new(lo.clone(), 1).into_array(), + ConstantArray::new(hi.clone(), 1).into_array(), + options.clone(), + )? + .execute_scalar(0, ctx)?; + + let new_patches = patches.map_values(|values| { + let len = values.len(); + values.between( + ConstantArray::new(lo.clone(), len).into_array(), + ConstantArray::new(hi.clone(), len).into_array(), + options.clone(), + ) + })?; + + Ok(Some( + Sparse::try_new_from_patches(new_patches, fill_bool)?.into_array(), + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::Canonical; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::ConstantArray; + use vortex_array::assert_arrays_eq; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::scalar::Scalar; + use vortex_array::scalar_fn::fns::between::BetweenOptions; + use vortex_array::scalar_fn::fns::between::StrictComparison; + use vortex_array::session::ArraySession; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + #[rstest] + #[case(0i32, 100i32, StrictComparison::NonStrict, StrictComparison::NonStrict)] + #[case(5i32, 25i32, StrictComparison::Strict, StrictComparison::Strict)] + #[case(1i32, 20i32, StrictComparison::NonStrict, StrictComparison::Strict)] + fn between_matches_canonical( + #[case] lo: i32, + #[case] hi: i32, + #[case] lower_strict: StrictComparison, + #[case] upper_strict: StrictComparison, + ) { + let array = Sparse::try_new( + buffer![1u64, 3, 5].into_array(), + buffer![10i32, 20, 30].into_array(), + 8, + Scalar::from(1i32), + ) + .unwrap() + .into_array(); + let len = array.len(); + let options = BetweenOptions { + lower_strict, + upper_strict, + }; + + let lower = ConstantArray::new(Scalar::from(lo), len).into_array(); + let upper = ConstantArray::new(Scalar::from(hi), len).into_array(); + + let mut ctx = SESSION.create_execution_ctx(); + + // Kernel path: between pushes through the Sparse encoding. + let kernel = array + .clone() + .between(lower.clone(), upper.clone(), options.clone()) + .unwrap() + .execute::(&mut ctx) + .unwrap(); + + // Baseline: canonicalize the input first so between runs on a PrimitiveArray. + let canonical_input = array.execute::(&mut ctx).unwrap().into_array(); + let baseline = canonical_input + .between(lower, upper, options) + .unwrap() + .execute::(&mut ctx) + .unwrap(); + + assert_arrays_eq!(kernel, baseline); + } +} diff --git a/encodings/sparse/src/compute/compare.rs b/encodings/sparse/src/compute/compare.rs index fb6cadece0f..c64e8142298 100644 --- a/encodings/sparse/src/compute/compare.rs +++ b/encodings/sparse/src/compute/compare.rs @@ -66,7 +66,6 @@ mod tests { use vortex_array::scalar::Scalar; use vortex_array::scalar_fn::fns::operators::Operator; use vortex_array::session::ArraySession; - use vortex_array::session::ArraySessionExt; use vortex_buffer::buffer; use vortex_session::VortexSession; @@ -80,12 +79,6 @@ mod tests { session }); - static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { - let session = VortexSession::empty().with::(); - session.arrays().register(Sparse); - session - }); - #[rstest] #[case::eq_fill(Scalar::from(1i32), Operator::Eq)] #[case::eq_patch(Scalar::from(10i32), Operator::Eq)] @@ -101,20 +94,22 @@ mod tests { ) .unwrap(); let arr = array.into_array(); - let len = arr.len(); - let mut k_ctx = SESSION.create_execution_ctx(); + let mut ctx = SESSION.create_execution_ctx(); + + // Kernel path: compare pushes through the Sparse encoding. let kernel_bool = arr .binary(ConstantArray::new(rhs.clone(), len).into_array(), op) .unwrap() - .execute::(&mut k_ctx) + .execute::(&mut ctx) .unwrap(); - let mut c_ctx = CANONICAL_SESSION.create_execution_ctx(); - let canonical_bool = arr + // Baseline: canonicalize first, then compare on the PrimitiveArray. + let canonical_input = arr.execute::(&mut ctx).unwrap().into_array(); + let canonical_bool = canonical_input .binary(ConstantArray::new(rhs, len).into_array(), op) .unwrap() - .execute::(&mut c_ctx) + .execute::(&mut ctx) .unwrap(); assert_arrays_eq!(kernel_bool, canonical_bool); diff --git a/encodings/sparse/src/compute/fill_null.rs b/encodings/sparse/src/compute/fill_null.rs new file mode 100644 index 00000000000..fd62926b2f3 --- /dev/null +++ b/encodings/sparse/src/compute/fill_null.rs @@ -0,0 +1,111 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::scalar::Scalar; +use vortex_array::scalar_fn::fns::fill_null::FillNullKernel; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific fill_null kernel. +/// +/// `fill_null(Sparse{ F, patches }, v)` replaces nulls in the fill and in each patch value +/// with the (non-null) `v`, staying sparse: the new fill is `v` if `F` was null, else `F` +/// cast to the non-nullable result dtype. The work is `O(P)`. +impl FillNullKernel for Sparse { + fn fill_null( + array: ArrayView<'_, Self>, + fill_value: &Scalar, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let new_fill = if array.fill_scalar().is_null() { + fill_value.clone() + } else { + array.fill_scalar().cast(fill_value.dtype())? + }; + + let new_patches = array + .patches() + .map_values(|values| values.fill_null(fill_value.clone()))?; + + Ok(Some( + Sparse::try_new_from_patches(new_patches, new_fill)?.into_array(), + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::Canonical; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::assert_arrays_eq; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + fn nullable_i32() -> DType { + DType::Primitive(PType::I32, Nullability::Nullable) + } + + #[rstest] + // null fill, some null patches + #[case(Sparse::try_new( + buffer![1u64, 3, 5].into_array(), + PrimitiveArray::from_option_iter([Some(10i32), None, Some(30)]).into_array().cast(nullable_i32()).unwrap(), + 8, + Scalar::null(nullable_i32()), + ).unwrap().into_array())] + // non-null fill, nullable patches with a null + #[case(Sparse::try_new( + buffer![0u64, 2].into_array(), + PrimitiveArray::from_option_iter([Some(7i32), None]).into_array().cast(nullable_i32()).unwrap(), + 4, + Scalar::from(1i32).cast(&nullable_i32()).unwrap(), + ).unwrap().into_array())] + fn fill_null_matches_canonical(#[case] array: vortex_array::ArrayRef) { + let mut ctx = SESSION.create_execution_ctx(); + let fill = Scalar::from(0i32); + + // Kernel path: fill_null pushes through the Sparse encoding. + let kernel = array + .fill_null(fill.clone()) + .unwrap() + .execute::(&mut ctx) + .unwrap(); + + // Baseline: canonicalize first, then fill_null on the PrimitiveArray. + let canonical_input = array.execute::(&mut ctx).unwrap().into_array(); + let baseline = canonical_input + .fill_null(fill) + .unwrap() + .execute::(&mut ctx) + .unwrap(); + + assert_arrays_eq!(kernel, baseline); + } +} diff --git a/encodings/sparse/src/compute/min_max.rs b/encodings/sparse/src/compute/min_max.rs new file mode 100644 index 00000000000..22ff3e86c53 --- /dev/null +++ b/encodings/sparse/src/compute/min_max.rs @@ -0,0 +1,110 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::aggregate_fn::Accumulator; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::DynAccumulator; +use vortex_array::aggregate_fn::EmptyOptions; +use vortex_array::aggregate_fn::fns::min_max::MinMax; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::arrays::ConstantArray; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific min/max kernel. +/// +/// `min/max(Sparse{ F, patches })` folds the min/max of `patch_values` together with the +/// fill scalar `F` — but only when `F` is reachable (`P < N`) and valid. The work is +/// `O(P)` instead of `O(N)`. +#[derive(Debug)] +pub(crate) struct SparseMinMaxKernel; + +impl DynAggregateKernel for SparseMinMaxKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(sparse) = batch.as_opt::() else { + return Ok(None); + }; + + let patches = sparse.patches(); + + let mut acc = Accumulator::try_new(MinMax, EmptyOptions, batch.dtype().clone())?; + + if !patches.values().is_empty() { + acc.accumulate(patches.values(), ctx)?; + } + + // Fold the fill value in only when at least one position is unpatched and the fill + // is non-null (null fill never participates in min/max). + if patches.num_patches() < sparse.len() && sparse.fill_scalar().is_valid() { + let fill_array = ConstantArray::new(sparse.fill_scalar().clone(), 1).into_array(); + acc.accumulate(&fill_array, ctx)?; + } + + Ok(Some(acc.partial_scalar()?)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::fns::min_max::MinMaxResult; + use vortex_array::aggregate_fn::fns::min_max::min_max; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::SparseArray; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session + }); + + #[rstest] + // fill below all patches + #[case(Sparse::try_new(buffer![1u64, 3, 5].into_array(), buffer![10i32, 20, 30].into_array(), 8, Scalar::from(1i32)).unwrap())] + // fill above all patches + #[case(Sparse::try_new(buffer![1u64, 3, 5].into_array(), buffer![10i32, 20, 30].into_array(), 8, Scalar::from(99i32)).unwrap())] + // fill in the middle + #[case(Sparse::try_new(buffer![1u64, 3, 5].into_array(), buffer![10i32, 20, 30].into_array(), 8, Scalar::from(15i32)).unwrap())] + // every position patched (fill unreachable) + #[case(Sparse::try_new(buffer![0u64, 1, 2].into_array(), buffer![7i32, 3, 9].into_array(), 3, Scalar::from(99i32)).unwrap())] + fn min_max_matches_canonical(#[case] array: SparseArray) { + let arr = array.into_array(); + let kernel: Option = + min_max(&arr, &mut SESSION.create_execution_ctx()).unwrap(); + let canonical: Option = + min_max(&arr, &mut CANONICAL_SESSION.create_execution_ctx()).unwrap(); + assert_eq!(kernel, canonical); + } +} diff --git a/encodings/sparse/src/compute/mod.rs b/encodings/sparse/src/compute/mod.rs index f4d79579906..fac33edf47e 100644 --- a/encodings/sparse/src/compute/mod.rs +++ b/encodings/sparse/src/compute/mod.rs @@ -1,10 +1,15 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +mod between; mod cast; mod compare; +mod fill_null; mod filter; pub(crate) mod is_constant; +pub(crate) mod min_max; +pub(crate) mod nan_count; +pub(crate) mod null_count; pub(crate) mod sum; mod take; diff --git a/encodings/sparse/src/compute/nan_count.rs b/encodings/sparse/src/compute/nan_count.rs new file mode 100644 index 00000000000..7fe03e21103 --- /dev/null +++ b/encodings/sparse/src/compute/nan_count.rs @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::aggregate_fn::Accumulator; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::AggregateFnVTable as _; +use vortex_array::aggregate_fn::DynAccumulator; +use vortex_array::aggregate_fn::EmptyOptions; +use vortex_array::aggregate_fn::fns::nan_count::NanCount; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::arrays::ConstantArray; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific NaN-count kernel. +/// +/// `nan_count(Sparse{ F, patches }) = nan_count(patch_values) + (F is NaN ? N - P : 0)`. +/// +/// Declines for non-float dtypes. The work is `O(P)` instead of `O(N)`. +#[derive(Debug)] +pub(crate) struct SparseNanCountKernel; + +impl DynAggregateKernel for SparseNanCountKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(sparse) = batch.as_opt::() else { + return Ok(None); + }; + + // NaN count is only defined for floating-point dtypes. + if NanCount + .return_dtype(&EmptyOptions, batch.dtype()) + .is_none() + { + return Ok(None); + } + + let patches = sparse.patches(); + + let mut acc = Accumulator::try_new(NanCount, EmptyOptions, batch.dtype().clone())?; + + let n_fill = sparse.len() - patches.num_patches(); + if n_fill > 0 { + // The Constant accumulate path checks `is_nan` once and multiplies by length. + let fill_array = ConstantArray::new(sparse.fill_scalar().clone(), n_fill).into_array(); + acc.accumulate(&fill_array, ctx)?; + } + + if !patches.values().is_empty() { + acc.accumulate(patches.values(), ctx)?; + } + + Ok(Some(acc.partial_scalar()?)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::fns::nan_count::nan_count; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::SparseArray; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session + }); + + #[rstest] + // NaN fill value → all unpatched positions are NaN + #[case(Sparse::try_new(buffer![1u64, 3].into_array(), buffer![1.0f32, 2.0].into_array(), 6, Scalar::from(f32::NAN)).unwrap())] + // NaN patch values, finite fill + #[case(Sparse::try_new(buffer![1u64, 3].into_array(), buffer![f32::NAN, 2.0].into_array(), 6, Scalar::from(0.0f32)).unwrap())] + // no NaNs anywhere + #[case(Sparse::try_new(buffer![1u64, 3].into_array(), buffer![1.0f32, 2.0].into_array(), 6, Scalar::from(0.0f32)).unwrap())] + fn nan_count_matches_canonical(#[case] array: SparseArray) { + let arr = array.into_array(); + let kernel = nan_count(&arr, &mut SESSION.create_execution_ctx()).unwrap(); + let canonical = nan_count(&arr, &mut CANONICAL_SESSION.create_execution_ctx()).unwrap(); + assert_eq!(kernel, canonical); + } +} diff --git a/encodings/sparse/src/compute/null_count.rs b/encodings/sparse/src/compute/null_count.rs new file mode 100644 index 00000000000..ad5e875ea6c --- /dev/null +++ b/encodings/sparse/src/compute/null_count.rs @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::null_count::NullCount; +use vortex_array::aggregate_fn::fns::null_count::null_count; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::dtype::Nullability::NonNullable; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; + +use crate::Sparse; +use crate::SparseExt as _; + +/// Sparse-specific null-count kernel. +/// +/// `null_count(Sparse{ F, patches }) = null_count(patch_values) + (F is null ? N - P : 0)`. +/// +/// When the fill is non-null this is just the patches' null count (often a cached `O(1)` +/// statistic); either way the work is `O(P)` instead of `O(N)`. +#[derive(Debug)] +pub(crate) struct SparseNullCountKernel; + +impl DynAggregateKernel for SparseNullCountKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(sparse) = batch.as_opt::() else { + return Ok(None); + }; + + let patches = sparse.patches(); + let fill_nulls = if sparse.fill_scalar().is_null() { + (sparse.len() - patches.num_patches()) as u64 + } else { + 0 + }; + let patch_nulls = null_count(patches.values(), ctx)? as u64; + + Ok(Some(Scalar::primitive( + fill_nulls + patch_nulls, + NonNullable, + ))) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::aggregate_fn::fns::null_count::null_count; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::builtins::ArrayBuiltins; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::scalar::Scalar; + use vortex_array::session::ArraySession; + use vortex_array::session::ArraySessionExt; + use vortex_buffer::buffer; + use vortex_session::VortexSession; + + use crate::Sparse; + use crate::SparseArray; + use crate::initialize; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + initialize(&session); + session + }); + + static CANONICAL_SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + session.arrays().register(Sparse); + session + }); + + fn nullable_i32() -> DType { + DType::Primitive(PType::I32, Nullability::Nullable) + } + + #[rstest] + // non-null fill, no null patches → 0 + #[case(Sparse::try_new(buffer![1u64, 3].into_array(), buffer![10i32, 20].into_array(), 5, Scalar::from(1i32)).unwrap())] + // null fill (8 - 2 = 6 fill nulls), patches non-null + #[case(Sparse::try_new( + buffer![1u64, 3].into_array(), + PrimitiveArray::from_option_iter([Some(10i32), Some(20)]).into_array().cast(nullable_i32()).unwrap(), + 8, + Scalar::null(nullable_i32()), + ).unwrap())] + // null fill + some null patches + #[case(Sparse::try_new( + buffer![0u64, 2, 4].into_array(), + PrimitiveArray::from_option_iter([Some(10i32), None, Some(30)]).into_array().cast(nullable_i32()).unwrap(), + 6, + Scalar::null(nullable_i32()), + ).unwrap())] + fn null_count_matches_canonical(#[case] array: SparseArray) { + let arr = array.into_array(); + let kernel = null_count(&arr, &mut SESSION.create_execution_ctx()).unwrap(); + let canonical = null_count(&arr, &mut CANONICAL_SESSION.create_execution_ctx()).unwrap(); + assert_eq!(kernel, canonical); + } +} diff --git a/encodings/sparse/src/kernel.rs b/encodings/sparse/src/kernel.rs index 3b5634a0473..0f5d9fd51c0 100644 --- a/encodings/sparse/src/kernel.rs +++ b/encodings/sparse/src/kernel.rs @@ -5,12 +5,16 @@ use vortex_array::arrays::dict::TakeExecuteAdaptor; use vortex_array::arrays::filter::FilterExecuteAdaptor; use vortex_array::arrays::slice::SliceExecuteAdaptor; use vortex_array::kernel::ParentKernelSet; +use vortex_array::scalar_fn::fns::between::BetweenExecuteAdaptor; use vortex_array::scalar_fn::fns::binary::CompareExecuteAdaptor; +use vortex_array::scalar_fn::fns::fill_null::FillNullExecuteAdaptor; use crate::Sparse; pub(crate) static PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ + ParentKernelSet::lift(&BetweenExecuteAdaptor(Sparse)), ParentKernelSet::lift(&CompareExecuteAdaptor(Sparse)), + ParentKernelSet::lift(&FillNullExecuteAdaptor(Sparse)), ParentKernelSet::lift(&FilterExecuteAdaptor(Sparse)), ParentKernelSet::lift(&SliceExecuteAdaptor(Sparse)), ParentKernelSet::lift(&TakeExecuteAdaptor(Sparse)), diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index a69b7130842..42b5cd46724 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -70,28 +70,47 @@ mod slice; use vortex_array::aggregate_fn::AggregateFnVTable as _; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::fns::min_max::MinMax; +use vortex_array::aggregate_fn::fns::nan_count::NanCount; +use vortex_array::aggregate_fn::fns::null_count::NullCount; use vortex_array::aggregate_fn::fns::sum::Sum; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::session::ArraySessionExt; /// Initialize Sparse encoding in the given session. /// -/// Registers the Sparse array vtable and its aggregate kernels (`IsConstant`, `Sum`). -/// Compare pushdown is wired through `PARENT_KERNELS` (see `kernel.rs`) and does not -/// require registration here. +/// Registers the Sparse array vtable and its aggregate kernels (`IsConstant`, `Sum`, +/// `MinMax`, `NullCount`, `NanCount`). Compare/between/fill_null pushdown is wired +/// through `PARENT_KERNELS` (see `kernel.rs`) and does not require registration here. pub fn initialize(session: &VortexSession) { session.arrays().register(Sparse); - session.aggregate_fns().register_aggregate_kernel( + let aggregate_fns = session.aggregate_fns(); + aggregate_fns.register_aggregate_kernel( Sparse.id(), Some(IsConstant.id()), &compute::is_constant::SparseIsConstantKernel, ); - session.aggregate_fns().register_aggregate_kernel( + aggregate_fns.register_aggregate_kernel( Sparse.id(), Some(Sum.id()), &compute::sum::SparseSumKernel, ); + aggregate_fns.register_aggregate_kernel( + Sparse.id(), + Some(MinMax.id()), + &compute::min_max::SparseMinMaxKernel, + ); + aggregate_fns.register_aggregate_kernel( + Sparse.id(), + Some(NullCount.id()), + &compute::null_count::SparseNullCountKernel, + ); + aggregate_fns.register_aggregate_kernel( + Sparse.id(), + Some(NanCount.id()), + &compute::nan_count::SparseNanCountKernel, + ); } /// A [`Sparse`]-encoded Vortex array.