Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions encodings/alp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub use alp_rd::*;
use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::nan_count::NanCount;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::arrays::patched::use_experimental_patches;
use vortex_array::session::ArraySessionExt;
Expand All @@ -46,4 +48,14 @@ pub fn initialize(session: &VortexSession) {
Some(NanCount.id()),
&compute::nan_count::ALPNanCountKernel,
);
session.aggregate_fns().register_aggregate_kernel(
ALP.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
session.aggregate_fns().register_aggregate_kernel(
ALPRD.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
}
2 changes: 2 additions & 0 deletions encodings/bytebool/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,6 @@ impl<T: vortex_array::array::typed::TypedArrayRef<vortex_bytebool::ByteBool>> vo

pub fn T::validity(&self) -> vortex_array::validity::Validity

pub fn vortex_bytebool::initialize(&vortex_session::VortexSession)

pub type vortex_bytebool::ByteBoolArray = vortex_array::array::typed::Array<vortex_bytebool::ByteBool>
17 changes: 17 additions & 0 deletions encodings/bytebool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,26 @@
//! [spec]: https://arrow.apache.org/docs/format/CanonicalExtensions.html#bit-boolean

pub use array::*;
use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;

mod array;
mod compute;
mod kernel;
mod rules;
mod slice;

/// Registers the ByteBool encoding with `session`.
///
/// Two registrations are performed, in order:
/// 1. the `ByteBool` array encoding itself, and
/// 2. `FixedWidthUncompressedSizeInBytesKernel` as the aggregate kernel used for
///    `UncompressedSizeInBytes` on ByteBool arrays.
pub fn initialize(session: &VortexSession) {
    session.arrays().register(ByteBool);

    // Name the two ids that key the kernel registration.
    let encoding_id = ByteBool.id();
    let aggregate_fn_id = UncompressedSizeInBytes.id();
    session.aggregate_fns().register_aggregate_kernel(
        encoding_id,
        Some(aggregate_fn_id),
        &FixedWidthUncompressedSizeInBytesKernel,
    );
}
7 changes: 7 additions & 0 deletions encodings/datetime-parts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ mod timestamp;
use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;
Expand All @@ -27,6 +29,11 @@ pub fn initialize(session: &VortexSession) {
Some(IsConstant.id()),
&compute::is_constant::DateTimePartsIsConstantKernel,
);
session.aggregate_fns().register_aggregate_kernel(
DateTimeParts.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
}

#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions encodings/decimal-byte-parts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub use decimal_byte_parts::*;
use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;
Expand All @@ -29,4 +31,9 @@ pub fn initialize(session: &VortexSession) {
Some(IsConstant.id()),
&DecimalBytePartsIsConstantKernel,
);
session.aggregate_fns().register_aggregate_kernel(
DecimalByteParts.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
}
22 changes: 22 additions & 0 deletions encodings/fastlanes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::arrays::patched::use_experimental_patches;
use vortex_array::session::ArraySessionExt;
Expand Down Expand Up @@ -64,6 +66,26 @@ pub fn initialize(session: &VortexSession) {
Some(IsSorted.id()),
&FoRIsSortedKernel,
);
session.aggregate_fns().register_aggregate_kernel(
BitPacked.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
session.aggregate_fns().register_aggregate_kernel(
Delta.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
session.aggregate_fns().register_aggregate_kernel(
FoR.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
session.aggregate_fns().register_aggregate_kernel(
RLE.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
}

/// Fill-forward null values in a buffer, replacing each null with the last valid value seen.
Expand Down
2 changes: 2 additions & 0 deletions encodings/fsst/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,6 @@ pub fn vortex_fsst::fsst_compress_iter<'a, I>(I, usize, vortex_array::dtype::DTy

pub fn vortex_fsst::fsst_train_compressor<A: vortex_array::accessor::ArrayAccessor<[u8]>>(&A) -> fsst::Compressor

pub fn vortex_fsst::initialize(&vortex_session::VortexSession)

pub type vortex_fsst::FSSTArray = vortex_array::array::typed::Array<vortex_fsst::FSST>
1 change: 1 addition & 0 deletions encodings/fsst/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod cast;
mod compare;
mod filter;
mod like;
pub(crate) mod uncompressed_size;

use vortex_array::ArrayRef;
use vortex_array::ArrayView;
Expand Down
104 changes: 104 additions & 0 deletions encodings/fsst/src/compute/uncompressed_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::mem::size_of;

use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::aggregate_fn::AggregateFnRef;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::kernels::DynAggregateKernel;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::varbinview::build_views::BinaryView;
use vortex_array::dtype::IntegerPType;
use vortex_array::match_each_integer_ptype;
use vortex_array::scalar::Scalar;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_mask::Mask;

use crate::FSST;
use crate::FSSTArrayExt;

/// Aggregate kernel computing `UncompressedSizeInBytes` for FSST-encoded arrays.
///
/// The reported size is the checked sum of three parts:
/// - one `BinaryView` per element,
/// - the sum of the array's recorded uncompressed lengths, and
/// - the size attributed to the array's validity.
#[derive(Debug)]
pub(crate) struct FSSTUncompressedSizeInBytesKernel;

impl DynAggregateKernel for FSSTUncompressedSizeInBytesKernel {
    /// Computes the uncompressed size of `batch` as a `u64` scalar.
    ///
    /// Returns `Ok(None)` when `aggregate_fn` is not `UncompressedSizeInBytes` or when
    /// `batch` is not an FSST array. Returns an error if executing a child fails, if a
    /// recorded length cannot be converted to `u64`, or if any sum overflows `u64`.
    fn aggregate(
        &self,
        aggregate_fn: &AggregateFnRef,
        batch: &ArrayRef,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<Scalar>> {
        // This kernel only answers the one aggregate function it was written for.
        if !aggregate_fn.is::<UncompressedSizeInBytes>() {
            return Ok(None);
        }

        let array = match batch.as_opt::<FSST>() {
            Some(fsst) => fsst,
            None => return Ok(None),
        };

        // Part 1: a fixed-size view per element.
        let views_size = checked_len_mul(array.len(), size_of::<BinaryView>(), "binary view")?;

        // Part 2: the decoded payload, taken from the stored per-element lengths.
        let lengths = array
            .uncompressed_lengths()
            .clone()
            .execute::<PrimitiveArray>(ctx)?;
        let data_size = uncompressed_lengths_size(&lengths)?;

        // Part 3: whatever the validity mask is sized at.
        let validity_mask = array
            .as_ref()
            .validity()?
            .execute_mask(array.as_ref().len(), ctx)?;
        let validity_size = validity_uncompressed_size_in_bytes(validity_mask)?;

        // Overflow-checked total of the three parts.
        let total = [views_size, data_size, validity_size]
            .into_iter()
            .try_fold(0u64, |acc, part| acc.checked_add(part))
            .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?;

        Ok(Some(Scalar::from(total)))
    }
}

/// Sums every value in `lengths` as a `u64`.
///
/// Dispatches on the array's integer ptype and hands the typed slice to
/// `uncompressed_lengths_size_typed`, whose errors are returned unchanged.
fn uncompressed_lengths_size(lengths: &PrimitiveArray) -> VortexResult<u64> {
    let ptype = lengths.ptype();
    match_each_integer_ptype!(ptype, |P| {
        let values = lengths.as_slice::<P>();
        uncompressed_lengths_size_typed(values)
    })
}

/// Adds up `lengths` into a single `u64` total.
///
/// # Errors
///
/// Fails on the first element whose `to_u64()` conversion returns `None` (reported as a
/// negative length), or as soon as the running total would overflow `u64`.
fn uncompressed_lengths_size_typed<P: IntegerPType>(lengths: &[P]) -> VortexResult<u64> {
    lengths.iter().try_fold(0u64, |total, len| {
        len.to_u64()
            .ok_or_else(|| vortex_err!("uncompressed length cannot be negative"))
            .and_then(|len| {
                total
                    .checked_add(len)
                    .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))
            })
    })
}

/// Returns the number of bytes attributed to `validity`.
///
/// - `Mask::AllTrue`: zero bytes.
/// - `Mask::AllFalse(len)`: the `nbytes()` of a constant `false` array of length `len`.
/// - `Mask::Values`: the value count divided by 8, rounded up.
///
/// # Errors
///
/// Fails if the `Mask::Values` length does not fit in a `u64`.
fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult<u64> {
    match validity {
        Mask::AllTrue(_) => Ok(0),
        Mask::AllFalse(len) => {
            let all_false = ConstantArray::new(false, len).into_array();
            Ok(all_false.nbytes())
        }
        Mask::Values(values) => match u64::try_from(values.len()) {
            Ok(bit_count) => Ok(bit_count.div_ceil(8)),
            Err(e) => Err(vortex_err!(
                "Failed to convert bit buffer length to u64: {e}"
            )),
        },
    }
}

/// Multiplies an element count by a per-element byte width, checking for overflow.
///
/// `name` is only used to label conversion errors.
///
/// # Errors
///
/// Fails if `len` or `width` does not fit in a `u64`, or if their product overflows `u64`.
fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult<u64> {
    let count = match u64::try_from(len) {
        Ok(count) => count,
        Err(e) => return Err(vortex_err!("Failed to convert {name} length to u64: {e}")),
    };
    let bytes_each = match u64::try_from(width) {
        Ok(bytes_each) => bytes_each,
        Err(e) => {
            return Err(vortex_err!(
                "Failed to convert {name} byte width to u64: {e}"
            ));
        }
    };

    match count.checked_mul(bytes_each) {
        Some(total) => Ok(total),
        None => Err(vortex_err!("uncompressed size in bytes overflowed u64")),
    }
}
16 changes: 16 additions & 0 deletions encodings/fsst/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,19 @@ mod tests;

pub use array::*;
pub use compress::*;
use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;

/// Registers the FSST encoding with `session`.
///
/// Two registrations are performed, in order:
/// 1. the `FSST` array encoding itself, and
/// 2. the crate's `FSSTUncompressedSizeInBytesKernel` as the aggregate kernel used for
///    `UncompressedSizeInBytes` on FSST arrays.
pub fn initialize(session: &VortexSession) {
    session.arrays().register(FSST);

    // Name the two ids that key the kernel registration.
    let encoding_id = FSST.id();
    let aggregate_fn_id = UncompressedSizeInBytes.id();
    session.aggregate_fns().register_aggregate_kernel(
        encoding_id,
        Some(aggregate_fn_id),
        &compute::uncompressed_size::FSSTUncompressedSizeInBytesKernel,
    );
}
2 changes: 2 additions & 0 deletions encodings/pco/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,6 @@ pub fn vortex_pco::PcoPageInfo::clear(&mut self)

pub fn vortex_pco::PcoPageInfo::encoded_len(&self) -> usize

pub fn vortex_pco::initialize(&vortex_session::VortexSession)

pub type vortex_pco::PcoArray = vortex_array::array::typed::Array<vortex_pco::Pco>
17 changes: 17 additions & 0 deletions encodings/pco/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ mod rules;
mod slice;

pub use array::*;
use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;

/// Registers the Pco encoding with `session`.
///
/// Two registrations are performed, in order:
/// 1. the `Pco` array encoding itself, and
/// 2. `FixedWidthUncompressedSizeInBytesKernel` as the aggregate kernel used for
///    `UncompressedSizeInBytes` on Pco arrays.
pub fn initialize(session: &VortexSession) {
    session.arrays().register(Pco);

    // Name the two ids that key the kernel registration.
    let encoding_id = Pco.id();
    let aggregate_fn_id = UncompressedSizeInBytes.id();
    session.aggregate_fns().register_aggregate_kernel(
        encoding_id,
        Some(aggregate_fn_id),
        &FixedWidthUncompressedSizeInBytesKernel,
    );
}

#[derive(Clone, prost::Message)]
pub struct PcoPageInfo {
Expand Down
7 changes: 7 additions & 0 deletions encodings/runend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
use vortex_array::aggregate_fn::fns::min_max::MinMax;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;
Expand All @@ -55,6 +57,11 @@ pub fn initialize(session: &VortexSession) {
Some(IsSorted.id()),
&compute::is_sorted::RunEndIsSortedKernel,
);
session.aggregate_fns().register_aggregate_kernel(
RunEnd.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
}

#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions encodings/sequence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use vortex_array::ArrayVTable;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::is_sorted::IsSorted;
use vortex_array::aggregate_fn::fns::min_max::MinMax;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::session::ArraySessionExt;
use vortex_session::VortexSession;
Expand All @@ -39,6 +41,11 @@ pub fn initialize(session: &VortexSession) {
Some(IsSorted.id()),
&compute::is_sorted::SequenceIsSortedKernel,
);
session.aggregate_fns().register_aggregate_kernel(
Sequence.id(),
Some(UncompressedSizeInBytes.id()),
&FixedWidthUncompressedSizeInBytesKernel,
);
}

// TODO(joe): hook up to the compressor
Expand Down
2 changes: 2 additions & 0 deletions encodings/sparse/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,6 @@ pub fn vortex_array::array::view::ArrayView<'_, vortex_sparse::Sparse>::patches(

pub fn vortex_array::array::view::ArrayView<'_, vortex_sparse::Sparse>::resolved_patches(&self) -> vortex_error::VortexResult<vortex_array::patches::Patches>

pub fn vortex_sparse::initialize(&vortex_session::VortexSession)

pub type vortex_sparse::SparseArray = vortex_array::array::typed::Array<vortex_sparse::Sparse>
15 changes: 15 additions & 0 deletions encodings/sparse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::aggregate_fn::AggregateFnVTable;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel;
use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes;
use vortex_array::aggregate_fn::session::AggregateFnSessionExt;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Primitive;
Expand All @@ -42,6 +46,7 @@ use vortex_array::scalar::Scalar;
use vortex_array::scalar::ScalarValue;
use vortex_array::scalar_fn::fns::operators::Operator;
use vortex_array::serde::ArrayChildren;
use vortex_array::session::ArraySessionExt;
use vortex_array::validity::Validity;
use vortex_array::vtable::VTable;
use vortex_array::vtable::ValidityVTable;
Expand All @@ -68,6 +73,16 @@ mod ops;
mod rules;
mod slice;

/// Registers the Sparse encoding with `session`.
///
/// Two registrations are performed, in order:
/// 1. the `Sparse` array encoding itself, and
/// 2. `FixedWidthUncompressedSizeInBytesKernel` as the aggregate kernel used for
///    `UncompressedSizeInBytes` on Sparse arrays.
pub fn initialize(session: &VortexSession) {
    session.arrays().register(Sparse);

    // Name the two ids that key the kernel registration.
    let encoding_id = Sparse.id();
    let aggregate_fn_id = UncompressedSizeInBytes.id();
    session.aggregate_fns().register_aggregate_kernel(
        encoding_id,
        Some(aggregate_fn_id),
        &FixedWidthUncompressedSizeInBytesKernel,
    );
}

/// A [`Sparse`]-encoded Vortex array.
pub type SparseArray = Array<Sparse>;

Expand Down
2 changes: 2 additions & 0 deletions encodings/zigzag/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ pub fn T::encoded(&self) -> &vortex_array::array::erased::ArrayRef

pub fn T::ptype(&self) -> vortex_array::dtype::ptype::PType

pub fn vortex_zigzag::initialize(&vortex_session::VortexSession)

pub fn vortex_zigzag::zigzag_decode(vortex_array::arrays::primitive::vtable::PrimitiveArray) -> vortex_array::arrays::primitive::vtable::PrimitiveArray

pub fn vortex_zigzag::zigzag_encode(vortex_array::array::view::ArrayView<'_, vortex_array::arrays::primitive::vtable::Primitive>) -> vortex_error::VortexResult<vortex_zigzag::ZigZagArray>
Expand Down
Loading
Loading