Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ harness = false
name = "bool_zip"
harness = false

[[bench]]
name = "primitive_zip"
harness = false

[[bench]]
name = "take_primitive"
harness = false
Expand Down
80 changes: 80 additions & 0 deletions vortex-array/benches/primitive_zip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![expect(clippy::unwrap_used)]
#![expect(
clippy::cast_possible_truncation,
reason = "benchmark fixtures use indices that fit in the chosen widths"
)]

use divan::Bencher;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::RecursiveCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::builtins::ArrayBuiltins;
use vortex_buffer::BufferMut;
use vortex_mask::Mask;

/// Benchmark entry point: delegates to divan, which discovers and runs the `#[divan::bench]`
/// functions declared in this file.
fn main() {
divan::main();
}

// Number of rows in every benchmark input array and in the selection mask.
//
// Sized so the bench stays well under a few hundred microseconds under CodSpeed's instruction-count
// simulation, which runs ~10x the local walltime; the branchless value blend is still exercised.
const LEN: usize = 16_384;

/// Builds the selection mask shared by every benchmark: `LEN` rows that alternate between set
/// and unset, starting with a set bit at row 0.
///
/// A fully fragmented mask is the worst case for the generic run/slice copy path this kernel
/// replaces. The branchless per-row blend does not depend on mask shape, so a single shape is
/// enough.
fn mask() -> Mask {
    let alternating = [true, false].into_iter().cycle().take(LEN);
    Mask::from_iter(alternating)
}

/// Benchmarks zipping two non-nullable `i64` arrays whose value ranges start at 0 and 1_000_000.
#[divan::bench]
fn nonnull(bencher: Bencher) {
    run(
        bencher,
        nonnull_array(0).into_array(),
        nonnull_array(1_000_000).into_array(),
    );
}

/// Benchmarks zipping two nullable `i64` arrays: the `if_true` side is null on every 7th row and
/// the `if_false` side on every 5th row.
#[divan::bench]
fn nullable(bencher: Bencher) {
    run(
        bencher,
        nullable_array(0, 7).into_array(),
        nullable_array(1_000_000, 5).into_array(),
    );
}

/// Shared benchmark driver: zips `if_true` and `if_false` under the alternating mask and executes
/// the result to `RecursiveCanonical`.
///
/// Each set of inputs (array handles, mask array, and a fresh execution context) is produced by
/// the `with_inputs` generator; the closure passed to `bench_refs` builds and executes the zip.
fn run(bencher: Bencher, if_true: vortex_array::ArrayRef, if_false: vortex_array::ArrayRef) {
    let selection = mask();

    let fresh_inputs = || {
        (
            if_true.clone(),
            if_false.clone(),
            selection.clone().into_array(),
            LEGACY_SESSION.create_execution_ctx(),
        )
    };

    bencher
        .with_inputs(fresh_inputs)
        .bench_refs(|(on_true, on_false, selector, ctx)| {
            let zipped = selector.zip(on_true.clone(), on_false.clone()).unwrap();
            zipped.execute::<RecursiveCanonical>(ctx).unwrap();
        });
}

/// Builds a non-nullable `i64` array of `LEN` rows holding `base, base + 1, ...`.
fn nonnull_array(base: i64) -> PrimitiveArray {
    let row_count = LEN as i64;

    let mut buffer = BufferMut::<i64>::with_capacity(LEN);
    buffer.extend((0..row_count).map(|offset| base + offset));
    let frozen = buffer.freeze();

    PrimitiveArray::new(frozen, vortex_array::validity::Validity::NonNullable)
}

/// Builds a nullable `i64` array of `LEN` rows where row `i` is null when `i` is a multiple of
/// `null_every` and holds `base + i` otherwise.
fn nullable_array(base: i64, null_every: usize) -> PrimitiveArray {
    let rows = (0..LEN as i64).map(|row| {
        // Compute the value up front, as the eager `then_some` form did.
        let value = base + row;
        if (row as usize).is_multiple_of(null_every) {
            None
        } else {
            Some(value)
        }
    });
    PrimitiveArray::from_option_iter(rows)
}
1 change: 1 addition & 0 deletions vortex-array/src/arrays/primitive/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod mask;
pub(crate) mod rules;
mod slice;
mod take;
mod zip;

#[cfg(test)]
mod tests {
Expand Down
216 changes: 216 additions & 0 deletions vortex-array/src/arrays/primitive/compute/zip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::mem::MaybeUninit;

use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_mask::Mask;

use crate::ArrayRef;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::array::ArrayView;
use crate::arrays::Primitive;
use crate::arrays::PrimitiveArray;
use crate::dtype::NativePType;
use crate::match_each_native_ptype;
use crate::scalar_fn::fns::zip::ZipKernel;
use crate::scalar_fn::fns::zip::zip_validity;

/// Primitive-specific zip kernel that chooses each output value with a per-row blend.
///
/// The generic zip path copies runs of `if_true`/`if_false` between mask boundaries; that is fast
/// for clustered masks but degrades to per-element work on fragmented ones. This kernel walks the
/// mask in 64-bit chunks and blends both inputs row by row without a data-dependent branch, so the
/// inner loop stays branch-free and auto-vectorizable whatever the mask looks like.
///
/// The kernel declines (returns `Ok(None)`) when `if_false` is not a `Primitive` array or when the
/// mask is all-true / all-false, and fails when the two sides have different primitive types.
impl ZipKernel for Primitive {
    fn zip(
        if_true: ArrayView<'_, Primitive>,
        if_false: &ArrayRef,
        mask: &ArrayRef,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        // Only primitive/primitive pairs are handled by this kernel.
        let if_false = match if_false.as_opt::<Primitive>() {
            Some(primitive) => primitive,
            None => return Ok(None),
        };

        let ptype = if_true.ptype();
        let other_ptype = if_false.ptype();
        if ptype != other_ptype {
            vortex_bail!(
                "zip requires if_true and if_false to share a primitive type, got {} and {}",
                ptype,
                other_ptype
            );
        }

        // Null mask entries select `if_false`, matching `Zip`'s SQL ELSE semantics.
        let mask = mask.try_to_mask_fill_null_false(ctx)?;

        // Trivial masks are left to the generic zip, which just casts the surviving side.
        if matches!(&mask, Mask::AllTrue(_) | Mask::AllFalse(_)) {
            return Ok(None);
        }

        let validity = zip_validity(if_true.validity()?, if_false.validity()?, &mask)?;

        let array = match_each_native_ptype!(ptype, |T| {
            let blended =
                select_values::<T>(if_true.as_slice::<T>(), if_false.as_slice::<T>(), &mask);
            PrimitiveArray::new(blended.freeze(), validity).into_array()
        });
        Ok(Some(array))
    }
}

/// Branchlessly blend `if_true` and `if_false` per row into a fresh value buffer.
///
/// Row `i` of the result is `true_values[i]` when bit `i` of `mask` is set and `false_values[i]`
/// otherwise. The mask is consumed as full 64-bit chunks followed by one shorter trailing
/// remainder, each handed to `select_block`.
///
/// # Panics
///
/// Panics if `false_values` or `mask` does not have the same length as `true_values`, or if
/// `mask` is not `Mask::Values`.
fn select_values<T: NativePType>(
    true_values: &[T],
    false_values: &[T],
    mask: &Mask,
) -> BufferMut<T> {
    let len = true_values.len();

    // The `set_len` at the end is only sound when the mask chunks cover exactly `len` rows. Check
    // the lengths here so that soundness does not depend on every caller validating them: a
    // shorter mask would otherwise expose uninitialized memory.
    assert_eq!(
        false_values.len(),
        len,
        "zip value slices must have equal lengths"
    );
    assert_eq!(
        mask.len(),
        len,
        "zip mask length must match the value length"
    );

    let mut out = BufferMut::<T>::with_capacity(len);
    {
        let out_slice = out.spare_capacity_mut();

        let mask_bits = mask
            .values()
            .vortex_expect("mask is Mask::Values")
            .bit_buffer();
        let chunks = mask_bits.chunks();

        // Full 64-row blocks first.
        let mut base = 0usize;
        for word in chunks.iter() {
            let end = base + 64;
            select_block(
                word,
                &true_values[base..end],
                &false_values[base..end],
                &mut out_slice[base..end],
            );
            base = end;
        }

        // Then the trailing partial block, if the length is not a multiple of 64.
        let remainder = chunks.remainder_len();
        if remainder > 0 {
            let end = base + remainder;
            select_block(
                chunks.remainder_bits(),
                &true_values[base..end],
                &false_values[base..end],
                &mut out_slice[base..end],
            );
        }
    }

    // SAFETY: the mask has exactly `len` bits (asserted above), so the full chunks plus the
    // remainder cover rows `0..len`, and `select_block` initialized every slot it was given.
    unsafe { out.set_len(len) };
    out
}

/// Blend the rows covered by a single 64-bit mask chunk.
///
/// Bit `j` of `word` (counting from the least significant bit) decides row `j`: a set bit writes
/// `true_values[j]` into `out[j]`, an unset bit writes `false_values[j]`. Exactly `out.len()` rows
/// are written. Both inputs are trimmed to that length before the loop, so a too-short input
/// panics up front and the loop body has no per-row bounds checks left for the compiler to keep.
#[inline]
fn select_block<T: NativePType>(
    word: u64,
    true_values: &[T],
    false_values: &[T],
    out: &mut [MaybeUninit<T>],
) {
    let rows = out.len();
    let (on_set, on_clear) = (&true_values[..rows], &false_values[..rows]);

    let candidates = on_set.iter().zip(on_clear);
    for (bit, (slot, (&when_set, &when_clear))) in out.iter_mut().zip(candidates).enumerate() {
        let chosen = if (word >> bit) & 1 != 0 {
            when_set
        } else {
            when_clear
        };
        slot.write(chosen);
    }
}

#[cfg(test)]
mod tests {
    #![allow(
        clippy::cast_possible_truncation,
        reason = "test fixtures use small indices that fit the target widths"
    )]

    use vortex_error::VortexResult;
    use vortex_mask::Mask;

    use crate::ArrayRef;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::Primitive;
    use crate::arrays::PrimitiveArray;
    use crate::assert_arrays_eq;
    use crate::builtins::ArrayBuiltins;

    /// Zips `if_true`/`if_false` under the mask described by `bits`, executes the expression, and
    /// checks that the result is a `Primitive` array before handing it back.
    fn zip_with_bits(
        bits: &[bool],
        if_true: ArrayRef,
        if_false: ArrayRef,
    ) -> VortexResult<ArrayRef> {
        let selector = Mask::from_iter(bits.iter().copied()).into_array();
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let zipped = selector
            .zip(if_true, if_false)?
            .execute::<ArrayRef>(&mut ctx)?;
        assert!(zipped.is::<Primitive>());
        Ok(zipped)
    }

    /// Non-nullable inputs: the kernel output must match a scalar reference on both sides of the
    /// 64-bit chunk boundary (rows 63/64) and in the trailing remainder.
    #[test]
    fn zip_nonnull_spans_mask_chunks() -> VortexResult<()> {
        const ROWS: usize = 150;
        let bits: Vec<bool> = (0..ROWS)
            .map(|row| row == 64 || row.is_multiple_of(3))
            .collect();

        let if_true = PrimitiveArray::from_iter(0..ROWS as i64).into_array();
        let if_false =
            PrimitiveArray::from_iter((0..ROWS as i64).map(|value| 1_000 + value)).into_array();

        let result = zip_with_bits(&bits, if_true, if_false)?;

        let expected = PrimitiveArray::from_iter(
            bits.iter()
                .zip(0i64..)
                .map(|(&keep, value)| if keep { value } else { 1_000 + value }),
        )
        .into_array();
        assert_arrays_eq!(result, expected);
        Ok(())
    }

    /// Nullable inputs on both sides: each output row must take both its value and its validity
    /// from the side the mask selects, including across the chunk boundary.
    #[test]
    fn zip_nullable_selects_values_and_validity() -> VortexResult<()> {
        const ROWS: usize = 130;
        let bits: Vec<bool> = (0..ROWS).map(|row| row.is_multiple_of(2)).collect();

        let if_true = PrimitiveArray::from_option_iter(
            (0..ROWS as i64).map(|value| (value % 4 != 0).then_some(value)),
        )
        .into_array();
        let if_false = PrimitiveArray::from_option_iter(
            (0..ROWS as i64).map(|value| (value % 5 != 0).then_some(1_000 + value)),
        )
        .into_array();

        let result = zip_with_bits(&bits, if_true, if_false)?;

        let expected =
            PrimitiveArray::from_option_iter(bits.iter().zip(0i64..).map(|(&keep, value)| {
                if keep {
                    (value % 4 != 0).then_some(value)
                } else {
                    (value % 5 != 0).then_some(1_000 + value)
                }
            }))
            .into_array();
        assert_arrays_eq!(result, expected);
        Ok(())
    }
}
2 changes: 2 additions & 0 deletions vortex-array/src/arrays/primitive/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use crate::kernel::ParentKernelSet;
use crate::scalar_fn::fns::between::BetweenExecuteAdaptor;
use crate::scalar_fn::fns::cast::CastExecuteAdaptor;
use crate::scalar_fn::fns::fill_null::FillNullExecuteAdaptor;
use crate::scalar_fn::fns::zip::ZipExecuteAdaptor;

/// Parent kernels registered for `Primitive` arrays: the between, cast, fill-null, take and zip
/// execute adaptors, each lifted into the set.
// NOTE(review): presumably the execution layer consults this set when a `Primitive` array is the
// child of one of these scalar functions — confirm against `ParentKernelSet`.
pub(super) const PARENT_KERNELS: ParentKernelSet<Primitive> = ParentKernelSet::new(&[
ParentKernelSet::lift(&BetweenExecuteAdaptor(Primitive)),
ParentKernelSet::lift(&CastExecuteAdaptor(Primitive)),
ParentKernelSet::lift(&FillNullExecuteAdaptor(Primitive)),
ParentKernelSet::lift(&TakeExecuteAdaptor(Primitive)),
ParentKernelSet::lift(&ZipExecuteAdaptor(Primitive)),
]);
Loading