Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,17 @@ pub struct DecimalBytePartsArray {
// NOTE: the lower_parts is currently unused, we reserve this field so that it is properly
// read/written during serde, but provide no constructor to initialize this to anything
// other than the empty Vec.
// Must update `DecimalBytePartsArrayParts` too.
_lower_parts: Vec<ArrayRef>,
dtype: DType,
stats_set: ArrayStats,
}

pub struct DecimalBytePartsArrayParts {
pub msp: ArrayRef,
pub dtype: DType,
}

impl DecimalBytePartsArray {
pub fn try_new(msp: ArrayRef, decimal_dtype: DecimalDType) -> VortexResult<Self> {
if !msp.dtype().is_signed_int() {
Expand All @@ -184,6 +190,14 @@ impl DecimalBytePartsArray {
}
}

/// If `_lower_parts` is supported check all calls use this correctly.
pub fn into_parts(self) -> DecimalBytePartsArrayParts {
DecimalBytePartsArrayParts {
msp: self.msp,
dtype: self.dtype,
}
}

pub fn decimal_dtype(&self) -> &DecimalDType {
self.dtype
.as_decimal_opt()
Expand Down
18 changes: 16 additions & 2 deletions vortex-array/src/arrays/primitive/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ pub struct PrimitiveArray {
pub(super) stats_set: ArrayStats,
}

pub struct PrimitiveArrayParts {
pub ptype: PType,
pub nullability: Nullability,
pub buffer: BufferHandle,
pub validity: Validity,
}

// TODO(connor): There are a lot of places where we could be using `new_unchecked` in the codebase.
impl PrimitiveArray {
/// Create a new array from a buffer handle.
Expand Down Expand Up @@ -173,8 +180,15 @@ impl PrimitiveArray {

impl PrimitiveArray {
/// Consume the primitive array and returns its component parts.
pub fn into_parts(self) -> (DType, BufferHandle, Validity, ArrayStats) {
(self.dtype, self.buffer, self.validity, self.stats_set)
pub fn into_parts(self) -> PrimitiveArrayParts {
let ptype = self.ptype();
let nullability = self.dtype.nullability();
PrimitiveArrayParts {
ptype,
nullability,
buffer: self.buffer,
validity: self.validity,
}
}
}

Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

mod array;
pub use array::PrimitiveArray;
pub use array::PrimitiveArrayParts;
pub use array::chunk_range;
pub use array::patch_chunk;

Expand Down
2 changes: 2 additions & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tracing = { workspace = true }
vortex-alp = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-decimal-byte-parts = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-fastlanes = { workspace = true }
Expand All @@ -39,6 +40,7 @@ vortex-zigzag = { workspace = true }

[dev-dependencies]
criterion = { package = "codspeed-criterion-compat-walltime", version = "4.3.0" }
rstest = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros"] }
vortex-array = { workspace = true, features = ["_test-harness"] }

Expand Down
13 changes: 8 additions & 5 deletions vortex-cuda/src/kernel/encodings/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use vortex_array::Array;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_dtype::NativePType;
use vortex_error::VortexResult;
Expand All @@ -30,16 +31,16 @@ use crate::launch_cuda_kernel_impl;

/// CUDA decoder for ALP (Adaptive Lossless floating-Point) decompression.
#[derive(Debug)]
pub struct ALPDecoder;
pub struct ALPExecutor;

impl ALPDecoder {
impl ALPExecutor {
fn try_specialize(array: ArrayRef) -> Option<ALPArray> {
array.try_into::<ALPVTable>().ok()
}
}

#[async_trait]
impl CudaExecute for ALPDecoder {
impl CudaExecute for ALPExecutor {
async fn execute(
&self,
array: ArrayRef,
Expand Down Expand Up @@ -67,7 +68,9 @@ where
// Execute child and copy to device
let canonical = array.encoded().clone().execute_cuda(ctx).await?;
let primitive = canonical.into_primitive();
let (_, buffer, validity, ..) = primitive.into_parts();
let PrimitiveArrayParts {
buffer, validity, ..
} = primitive.into_parts();

let device_input: BufferHandle = if buffer.is_on_device() {
buffer
Expand Down Expand Up @@ -149,7 +152,7 @@ mod tests {
)
.vortex_expect("failed to create ALP array");

let result = ALPDecoder
let result = ALPExecutor
.execute(alp_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decompression failed");
Expand Down
103 changes: 103 additions & 0 deletions vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt::Debug;

use async_trait::async_trait;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::arrays::DecimalArray;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_decimal_byte_parts::DecimalBytePartsArrayParts;
use vortex_decimal_byte_parts::DecimalBytePartsVTable;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;

use crate::CudaExecutionCtx;
use crate::executor::CudaArrayExt;
use crate::executor::CudaExecute;

// See `DecimalBytePartsArray`
#[derive(Debug)]
pub struct DecimalBytePartsExecutor;

#[async_trait]
impl CudaExecute for DecimalBytePartsExecutor {
async fn execute(
&self,
array: ArrayRef,
ctx: &mut CudaExecutionCtx,
) -> VortexResult<Canonical> {
let Ok(array) = array.try_into::<DecimalBytePartsVTable>() else {
vortex_bail!("cannot downcast to DecimalBytePartsArray")
};

let decimal_dtype = *array.decimal_dtype();
let DecimalBytePartsArrayParts { msp, .. } = array.into_parts();
let PrimitiveArrayParts {
buffer,
ptype,
validity,
..
} = msp.execute_cuda(ctx).await?.into_primitive().into_parts();

// SAFETY: The primitive array's buffer is already validated with correct type.
// The decimal dtype matches the array's dtype, and validity is preserved.
Ok(Canonical::Decimal(unsafe {
DecimalArray::new_unchecked_handle(buffer, ptype.try_into()?, decimal_dtype, validity)
}))
}
}

#[cfg(test)]
mod tests {
use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::assert_arrays_eq;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_decimal_byte_parts::DecimalBytePartsArray;
use vortex_dtype::DecimalDType;
use vortex_error::VortexExpect;
use vortex_session::VortexSession;

use super::*;
use crate::has_nvcc;
use crate::session::CudaSession;

#[rstest]
#[case::i8_p5_s2(Buffer::from(vec![100i8, 101, 102, -1, -100]), 5, 2)]
#[case::i16_p10_s2(Buffer::from(vec![100i16, 200, 300, 400, 500]), 10, 2)]
#[case::i32_p18_s4(Buffer::from(vec![100i32, 200, 300, 400, 500]), 18, 4)]
#[case::i64_p38_s6(Buffer::from(vec![100i64, 200, 300, 400, 500]), 38, 6)]
#[tokio::test]
async fn test_decimal_byte_parts_gpu_decode<T: vortex_dtype::NativePType>(
#[case] encoded: Buffer<T>,
#[case] precision: u8,
#[case] scale: i8,
) {
if !has_nvcc() {
return;
}

let mut cuda_ctx = CudaSession::create_execution_ctx(VortexSession::empty())
.vortex_expect("create execution context");

let decimal_dtype = DecimalDType::new(precision, scale);
let dbp_array = DecimalBytePartsArray::try_new(
PrimitiveArray::new(encoded, Validity::NonNullable).into_array(),
decimal_dtype,
)
.vortex_expect("create DecimalBytePartsArray");

let cpu_result = dbp_array.to_canonical().vortex_expect("CPU canonicalize");

let gpu_result = DecimalBytePartsExecutor
.execute(dbp_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decode");

assert_arrays_eq!(cpu_result.into_array(), gpu_result.into_array());
}
}
19 changes: 11 additions & 8 deletions vortex-cuda/src/kernel/encodings/for_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_dtype::NativePType;
use vortex_dtype::match_each_native_simd_ptype;
Expand All @@ -27,16 +28,16 @@ use crate::launch_cuda_kernel_impl;

/// CUDA decoder for frame-of-reference.
#[derive(Debug)]
pub struct FoRDecoder;
pub struct FoRExecutor;

impl FoRDecoder {
impl FoRExecutor {
fn try_specialize(array: ArrayRef) -> Option<FoRArray> {
array.try_into::<FoRVTable>().ok()
}
}

#[async_trait]
impl CudaExecute for FoRDecoder {
impl CudaExecute for FoRExecutor {
async fn execute(
&self,
array: ArrayRef,
Expand Down Expand Up @@ -64,7 +65,9 @@ where
// Execute child and copy to device
let canonical = array.encoded().clone().execute_cuda(ctx).await?;
let primitive = canonical.into_primitive();
let (_, buffer, validity, ..) = primitive.into_parts();
let PrimitiveArrayParts {
buffer, validity, ..
} = primitive.into_parts();

let device_buffer: BufferHandle = if buffer.is_on_device() {
buffer
Expand Down Expand Up @@ -137,7 +140,7 @@ mod tests {
.vortex_expect("CPU canonicalize failed");

// Decode on GPU
let gpu_result = FoRDecoder
let gpu_result = FoRExecutor
.execute(for_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decompression failed");
Expand Down Expand Up @@ -174,7 +177,7 @@ mod tests {
.vortex_expect("CPU canonicalize failed");

// Decode on GPU
let gpu_result = FoRDecoder
let gpu_result = FoRExecutor
.execute(for_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decompression failed");
Expand Down Expand Up @@ -211,7 +214,7 @@ mod tests {
.vortex_expect("CPU canonicalize failed");

// Decode on GPU
let gpu_result = FoRDecoder
let gpu_result = FoRExecutor
.execute(for_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decompression failed");
Expand Down Expand Up @@ -248,7 +251,7 @@ mod tests {
.vortex_expect("CPU canonicalize failed");

// Decode on GPU
let gpu_result = FoRDecoder
let gpu_result = FoRExecutor
.execute(for_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decompression failed");
Expand Down
8 changes: 5 additions & 3 deletions vortex-cuda/src/kernel/encodings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

mod alp;
mod decimal_byte_parts;
mod for_;
mod zigzag;

pub use alp::ALPDecoder;
pub use for_::FoRDecoder;
pub use zigzag::ZigZagDecoder;
pub use alp::ALPExecutor;
pub use decimal_byte_parts::DecimalBytePartsExecutor;
pub use for_::FoRExecutor;
pub use zigzag::ZigZagExecutor;
13 changes: 8 additions & 5 deletions vortex-cuda/src/kernel/encodings/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::PrimitiveArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_dtype::NativePType;
use vortex_dtype::PType;
Expand All @@ -27,16 +28,16 @@ use crate::launch_cuda_kernel_impl;

/// CUDA decoder for ZigZag decoding.
#[derive(Debug)]
pub struct ZigZagDecoder;
pub struct ZigZagExecutor;

impl ZigZagDecoder {
impl ZigZagExecutor {
fn try_specialize(array: ArrayRef) -> Option<ZigZagArray> {
array.try_into::<ZigZagVTable>().ok()
}
}

#[async_trait]
impl CudaExecute for ZigZagDecoder {
impl CudaExecute for ZigZagExecutor {
async fn execute(
&self,
array: ArrayRef,
Expand Down Expand Up @@ -69,7 +70,9 @@ where
// Execute child and copy to device
let canonical = array.encoded().clone().execute_cuda(ctx).await?;
let primitive = canonical.into_primitive();
let (_, buffer, validity, ..) = primitive.into_parts();
let PrimitiveArrayParts {
buffer, validity, ..
} = primitive.into_parts();

let device_buffer: BufferHandle = if buffer.is_on_device() {
buffer
Expand Down Expand Up @@ -141,7 +144,7 @@ mod tests {
.vortex_expect("CPU canonicalize failed");

// Decode on GPU
let gpu_result = ZigZagDecoder
let gpu_result = ZigZagExecutor
.execute(zigzag_array.to_array(), &mut cuda_ctx)
.await
.vortex_expect("GPU decompression failed");
Expand Down
Loading
Loading