Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion encodings/fastlanes/src/delta/array/delta_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ use vortex_buffer::BufferMut;
use vortex_error::VortexResult;

use crate::bit_transpose::transpose_validity;
use crate::fill_forward_nulls;

pub fn delta_compress(
array: &PrimitiveArray,
ctx: &mut ExecutionCtx,
) -> VortexResult<(PrimitiveArray, PrimitiveArray)> {
let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| {
let (bases, deltas) = compress_primitive::<T, { T::LANES }>(array.as_slice::<T>());
// Fill-forward null values so that transposed deltas at null positions remain
// small. Without this, bitpacking may skip patches for null positions, and the
// corrupted delta values propagate through the cumulative sum during decompression.
let filled = fill_forward_nulls(array.to_buffer::<T>(), array.validity());
let (bases, deltas) = compress_primitive::<T, { T::LANES }>(&filled);
// TODO(robert): This can be avoided if we add TransposedBoolArray that performs index translation when necessary.
let validity = transpose_validity(array.validity(), ctx)?;
(
Expand Down Expand Up @@ -124,4 +129,30 @@ mod tests {
assert_arrays_eq!(decompressed, array);
Ok(())
}

/// Regression test: delta + bitpacked encoding must correctly round-trip nullable arrays
/// where null positions contain arbitrary values. Without fill-forward, the delta cumulative
/// sum propagates corrupted values from null positions.
#[test]
fn delta_bitpacked_trailing_nulls() {
use vortex_array::IntoArray;
use vortex_array::ToCanonical;

use crate::bitpack_compress::bitpack_encode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use crate::bitpack_compress::bitpack_encode;
use crate::delta_compress;

Move up?

use crate::delta_compress;

let array = PrimitiveArray::from_option_iter(
(0u8..200).map(|i| (!(50..100).contains(&i)).then_some(i)),
);
let (bases, deltas) = delta_compress(&array, &mut SESSION.create_execution_ctx()).unwrap();
let bitpacked_deltas = bitpack_encode(&deltas, 1, None).unwrap();
let packed_delta = DeltaArray::try_new(
bases.into_array(),
bitpacked_deltas.into_array(),
0,
array.len(),
)
.unwrap();
assert_arrays_eq!(packed_delta.to_primitive(), array);
}
}
50 changes: 50 additions & 0 deletions encodings/fastlanes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ pub use bitpacking::*;
pub use delta::*;
pub use r#for::*;
pub use rle::*;
use vortex_array::ToCanonical;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;

pub mod bit_transpose;
mod bitpacking;
Expand Down Expand Up @@ -51,6 +55,52 @@ pub fn initialize(session: &mut VortexSession) {
);
}

/// Fill-forward null values in a buffer, replacing each null with the last valid value seen.
///
/// Nulls that precede the first valid value are replaced with `T::default()`, since there is
/// no earlier valid value to carry forward.
///
/// Behavior per validity variant:
/// - `NonNullable` / `AllValid`: the original buffer is returned as-is, avoiding any
///   allocation or copy.
/// - `AllInvalid`: a zeroed buffer of the same length is returned.
/// - `Array`: the values are overwritten in place when `try_into_mut` succeeds; otherwise a
///   new buffer of the same length is allocated and filled.
///
/// # Panics
///
/// Panics on the copying path if fewer elements were produced than `values.len()`, i.e. when
/// the validity mask yields fewer entries than there are values. Exposing the buffer in that
/// state would read uninitialized memory.
pub(crate) fn fill_forward_nulls<T: Copy + Default>(
    values: Buffer<T>,
    validity: &Validity,
) -> Buffer<T> {
    match validity {
        // No nulls: nothing to fill, hand the buffer back untouched.
        Validity::NonNullable | Validity::AllValid => values,
        // Every position is null, so there is never a valid value to carry forward.
        Validity::AllInvalid => Buffer::zeroed(values.len()),
        Validity::Array(validity_array) => {
            let bit_buffer = validity_array.to_bool().to_bit_buffer();
            // Value written into null slots; stays at the default until a valid value is seen.
            let mut last_valid = T::default();
            match values.try_into_mut() {
                // The buffer could be made mutable: overwrite null slots in place.
                Ok(mut to_fill_mut) => {
                    for (v, is_valid) in to_fill_mut.iter_mut().zip(bit_buffer.iter()) {
                        if is_valid {
                            last_valid = *v;
                        } else {
                            *v = last_valid;
                        }
                    }
                    to_fill_mut.freeze()
                }
                // The buffer could not be made mutable: write the filled values into a
                // freshly allocated buffer instead.
                Err(to_fill) => {
                    let len = to_fill.len();
                    let mut to_fill_mut = BufferMut::<T>::with_capacity(len);
                    // Number of output slots that have actually been initialized.
                    let mut initialized = 0usize;
                    for (v, (out, is_valid)) in to_fill.iter().zip(
                        to_fill_mut
                            .spare_capacity_mut()
                            .iter_mut()
                            .zip(bit_buffer.iter()),
                    ) {
                        if is_valid {
                            last_valid = *v;
                        }
                        out.write(last_valid);
                        initialized += 1;
                    }
                    // The zipped iteration stops at the shortest input, so a short validity
                    // mask would leave the tail of the buffer uninitialized. Refuse to expose
                    // it rather than invoke undefined behaviour.
                    assert_eq!(
                        initialized, len,
                        "validity mask must cover every value when filling forward nulls"
                    );
                    // SAFETY: the buffer was allocated with capacity for `len` elements and
                    // the assertion above guarantees the first `len` slots were written.
                    unsafe { to_fill_mut.set_len(len) };
                    to_fill_mut.freeze()
                }
            }
        }
    }
}

#[cfg(test)]
mod test {
use std::sync::LazyLock;
Expand Down
Loading