From fe23cb2d2122666bba29fd0cfadff2383c1f5bf4 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Mon, 23 Mar 2026 11:41:03 +0000 Subject: [PATCH] fix: delta compress with nulls Signed-off-by: Joe Isaacs --- .../src/delta/array/delta_compress.rs | 33 +++++++++++- encodings/fastlanes/src/lib.rs | 50 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/encodings/fastlanes/src/delta/array/delta_compress.rs b/encodings/fastlanes/src/delta/array/delta_compress.rs index 07174e294fa..8a49d1a7bc8 100644 --- a/encodings/fastlanes/src/delta/array/delta_compress.rs +++ b/encodings/fastlanes/src/delta/array/delta_compress.rs @@ -17,13 +17,18 @@ use vortex_buffer::BufferMut; use vortex_error::VortexResult; use crate::bit_transpose::transpose_validity; +use crate::fill_forward_nulls; pub fn delta_compress( array: &PrimitiveArray, ctx: &mut ExecutionCtx, ) -> VortexResult<(PrimitiveArray, PrimitiveArray)> { let (bases, deltas) = match_each_unsigned_integer_ptype!(array.ptype(), |T| { - let (bases, deltas) = compress_primitive::(array.as_slice::()); + // Fill-forward null values so that transposed deltas at null positions remain + // small. Without this, bitpacking may skip patches for null positions, and the + // corrupted delta values propagate through the cumulative sum during decompression. + let filled = fill_forward_nulls(array.to_buffer::(), array.validity()); + let (bases, deltas) = compress_primitive::(&filled); // TODO(robert): This can be avoided if we add TransposedBoolArray that performs index translation when necessary. let validity = transpose_validity(array.validity(), ctx)?; ( @@ -124,4 +129,30 @@ mod tests { assert_arrays_eq!(decompressed, array); Ok(()) } + + /// Regression test: delta + bitpacked encoding must correctly round-trip nullable arrays + /// where null positions contain arbitrary values. Without fill-forward, the delta cumulative + /// sum propagates corrupted values from null positions. 
+    #[test]
+    fn delta_bitpacked_trailing_nulls() {
+        use vortex_array::IntoArray;
+        use vortex_array::ToCanonical;
+
+        use crate::bitpack_compress::bitpack_encode;
+        use crate::delta_compress;
+
+        let array = PrimitiveArray::from_option_iter(
+            (0u8..200).map(|i| (!(50..100).contains(&i)).then_some(i)), // 50..=99 are null
+        );
+        let (bases, deltas) = delta_compress(&array, &mut SESSION.create_execution_ctx()).unwrap();
+        let bitpacked_deltas = bitpack_encode(&deltas, 1, None).unwrap();
+        let packed_delta = DeltaArray::try_new(
+            bases.into_array(),
+            bitpacked_deltas.into_array(),
+            0,
+            array.len(),
+        )
+        .unwrap();
+        assert_arrays_eq!(packed_delta.to_primitive(), array);
+    }
 }
diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs
index b22e9737365..8f7edbc267e 100644
--- a/encodings/fastlanes/src/lib.rs
+++ b/encodings/fastlanes/src/lib.rs
@@ -7,6 +7,10 @@ pub use bitpacking::*;
 pub use delta::*;
 pub use r#for::*;
 pub use rle::*;
+use vortex_array::ToCanonical;
+use vortex_array::validity::Validity;
+use vortex_buffer::Buffer;
+use vortex_buffer::BufferMut;
 
 pub mod bit_transpose;
 mod bitpacking;
@@ -51,6 +55,52 @@ pub fn initialize(session: &mut VortexSession) {
     );
 }
 
+/// Fill-forward null values in a buffer, replacing each null with the last valid value seen.
+///
+/// Nulls before the first valid value become `T::default()`; `AllInvalid` yields all zeros.
+/// With no nulls (`NonNullable` or `AllValid`) the input buffer is returned without copying.
+pub(crate) fn fill_forward_nulls<T: Copy + Default>(
+    values: Buffer<T>,
+    validity: &Validity,
+) -> Buffer<T> {
+    match validity {
+        Validity::NonNullable | Validity::AllValid => values,
+        Validity::AllInvalid => Buffer::zeroed(values.len()),
+        Validity::Array(validity_array) => {
+            let bit_buffer = validity_array.to_bool().to_bit_buffer();
+            // Both loops below stop at the shorter input, so a short bitmap must be rejected.
+            assert_eq!(bit_buffer.len(), values.len(), "validity/values length mismatch");
+            // Nulls that precede the first valid value are filled with `T::default()`.
+            let mut last_valid = T::default();
+            match values.try_into_mut() {
+                Ok(mut to_fill_mut) => {
+                    for (v, is_valid) in to_fill_mut.iter_mut().zip(bit_buffer.iter()) {
+                        if is_valid {
+                            last_valid = *v;
+                        }
+                        *v = last_valid;
+                    }
+                    to_fill_mut.freeze()
+                }
+                Err(to_fill) => {
+                    let mut to_fill_mut = BufferMut::<T>::with_capacity(to_fill.len());
+                    let spare = to_fill_mut.spare_capacity_mut();
+                    for ((v, out), is_valid) in to_fill.iter().zip(spare).zip(bit_buffer.iter()) {
+                        if is_valid {
+                            last_valid = *v;
+                        }
+                        out.write(last_valid);
+                    }
+                    // SAFETY: `with_capacity` reserved `to_fill.len()` slots and the bitmap length
+                    // was asserted equal to it, so the loop initialised every slot in `0..len`.
+                    unsafe { to_fill_mut.set_len(to_fill.len()) };
+                    to_fill_mut.freeze()
+                }
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::sync::LazyLock;