From 92fc8ae3391a8432bdb5f2edbe1120c8d7c75c8c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 14 Apr 2026 21:13:52 -0400 Subject: [PATCH 1/8] GPU kernel for sorted patches with chunk_offsets Signed-off-by: Andrew Duffy --- Cargo.lock | 1 + vortex-cuda/Cargo.toml | 5 +- vortex-cuda/benches/transpose_patches.rs | 81 ---- vortex-cuda/kernels/src/bit_unpack_16.cu | 19 +- vortex-cuda/kernels/src/bit_unpack_32.cu | 19 +- vortex-cuda/kernels/src/bit_unpack_64.cu | 19 +- vortex-cuda/kernels/src/bit_unpack_8.cu | 19 +- vortex-cuda/kernels/src/patches.cuh | 69 ++- vortex-cuda/kernels/src/patches.h | 18 +- vortex-cuda/src/bit_unpack_gen.rs | 19 +- vortex-cuda/src/kernel/encodings/bitpacked.rs | 32 +- vortex-cuda/src/kernel/mod.rs | 2 +- vortex-cuda/src/kernel/patches/types.rs | 445 +++++++----------- vortex-cuda/src/lib.rs | 2 +- 14 files changed, 313 insertions(+), 437 deletions(-) delete mode 100644 vortex-cuda/benches/transpose_patches.rs diff --git a/Cargo.lock b/Cargo.lock index deacfaf7977..92b13e859c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10392,6 +10392,7 @@ dependencies = [ "futures", "itertools 0.14.0", "kanal", + "num-traits", "object_store 0.13.2", "parking_lot", "prost 0.14.3", diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 9d21976f92b..edf91b00def 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -30,6 +30,7 @@ cudarc = { workspace = true, features = ["f16"] } futures = { workspace = true, features = ["executor"] } itertools = { workspace = true } kanal = { workspace = true } +num-traits = { workspace = true } object_store = { workspace = true, features = ["fs"] } parking_lot = { workspace = true } prost = { workspace = true } @@ -89,7 +90,3 @@ harness = false [[bench]] name = "throughput_cuda" harness = false - -[[bench]] -name = "transpose_patches" -harness = false diff --git a/vortex-cuda/benches/transpose_patches.rs b/vortex-cuda/benches/transpose_patches.rs deleted file mode 100644 index d713a304a07..00000000000 --- a/vortex-cuda/benches/transpose_patches.rs +++ /dev/null @@ -1,81 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -#![expect(clippy::unwrap_used)] - -use std::time::Duration; - -use criterion::BenchmarkId; -use criterion::Criterion; -use criterion::Throughput; -use futures::executor::block_on; -use vortex::array::IntoArray; -use vortex::array::arrays::PrimitiveArray; -use vortex::array::dtype::PType; -use vortex::array::patches::Patches; -use vortex::buffer::Buffer; -use vortex::buffer::buffer; -use vortex::session::VortexSession; -use vortex_array::validity::Validity; -use vortex_cuda::CudaSession; -use vortex_cuda::transpose_patches; -use vortex_cuda_macros::cuda_available; -use vortex_cuda_macros::cuda_not_available; -use vortex_error::VortexExpect; - -fn benchmark_transpose(c: &mut Criterion) { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create execution context"); - - let patches = block_on(async { - // Assume that we have 64k values, and we have 1024 patches evenly disbursed across - // the range. - let indices = (0..1024).map(|x| x * 64).collect::>(); - - let values = buffer![-1.0f32; 1024]; - - let device_indices = cuda_ctx.copy_to_device(indices)?.await?; - let device_values = cuda_ctx.copy_to_device(values)?.await?; - - Patches::new( - 64 * 1024, - 0, - PrimitiveArray::from_buffer_handle(device_indices, PType::U32, Validity::NonNullable) - .into_array(), - PrimitiveArray::from_buffer_handle(device_values, PType::F32, Validity::NonNullable) - .into_array(), - None, - ) - }) - .unwrap(); - - let mut group = c.benchmark_group("transpose"); - group.sample_size(100); - group.measurement_time(Duration::from_secs(10)); - - group.throughput(Throughput::Bytes( - patches.indices().nbytes() + patches.values().nbytes(), - )); - - group.bench_with_input( - BenchmarkId::new("transpose_patches", 0), - &patches, - |b, patches| { - b.iter(|| block_on(async { transpose_patches(patches, &mut cuda_ctx).await.unwrap() })) - }, - ); -} - -criterion::criterion_group! { - name = benches; - config = Criterion::default().without_plots() - .warm_up_time(Duration::from_nanos(1)) - .nresamples(10); - targets = benchmark_transpose -} - -#[cuda_available] -criterion::criterion_main!(benches); - -#[cuda_not_available] -fn main() {} diff --git a/vortex-cuda/kernels/src/bit_unpack_16.cu b/vortex-cuda/kernels/src/bit_unpack_16.cu index 63eb5f19bf9..3c05baf2011 100644 --- a/vortex-cuda/kernels/src/bit_unpack_16.cu +++ b/vortex-cuda/kernels/src/bit_unpack_16.cu @@ -5,21 +5,28 @@ template __device__ void _bit_unpack_16_device(const uint16_t *__restrict in, uint16_t *__restrict out, uint16_t reference, int thread_idx, GPUPatches& patches) { __shared__ uint16_t shared_out[1024]; + + // Step 1: Unpack into shared memory #pragma unroll for (int i = 0; i < 2; i++) { _bit_unpack_16_lane(in, shared_out, reference, thread_idx * 2 + i); } __syncwarp(); + + // Step 2: Apply patches to shared memory in parallel PatchesCursor cursor(patches, blockIdx.x, thread_idx, 32); auto patch = cursor.next(); + while (patch.index != 1024) { + shared_out[patch.index] = patch.value; + patch = cursor.next(); + } + __syncwarp(); + + // Step 3: Copy to global memory + #pragma unroll for (int i = 0; i < 32; i++) { auto idx = i * 32 + thread_idx; - if (idx == patch.index) { - out[idx] = patch.value; - patch = cursor.next(); - } else { - out[idx] = shared_out[idx]; - } + out[idx] = shared_out[idx]; } } diff --git a/vortex-cuda/kernels/src/bit_unpack_32.cu b/vortex-cuda/kernels/src/bit_unpack_32.cu index 521183eee7c..97906f612c4 100644 --- a/vortex-cuda/kernels/src/bit_unpack_32.cu +++ b/vortex-cuda/kernels/src/bit_unpack_32.cu @@ -5,21 +5,28 @@ template __device__ void _bit_unpack_32_device(const uint32_t *__restrict in, uint32_t *__restrict out, uint32_t reference, int thread_idx, GPUPatches& patches) { __shared__ uint32_t shared_out[1024]; + + // Step 1: Unpack into shared memory #pragma unroll for (int i = 0; i < 1; i++) { _bit_unpack_32_lane(in, shared_out, reference, thread_idx * 1 + i); } __syncwarp(); + + // Step 2: Apply patches to shared memory in parallel PatchesCursor cursor(patches, blockIdx.x, thread_idx, 32); auto patch = cursor.next(); + while (patch.index != 1024) { + shared_out[patch.index] = patch.value; + patch = cursor.next(); + } + __syncwarp(); + + // Step 3: Copy to global memory + #pragma unroll for (int i = 0; i < 32; i++) { auto idx = i * 32 + thread_idx; - if (idx == patch.index) { - out[idx] = patch.value; - patch = cursor.next(); - } else { - out[idx] = shared_out[idx]; - } + out[idx] = shared_out[idx]; } } diff --git a/vortex-cuda/kernels/src/bit_unpack_64.cu b/vortex-cuda/kernels/src/bit_unpack_64.cu index 9be1262f0f4..6270f4f8261 100644 --- a/vortex-cuda/kernels/src/bit_unpack_64.cu +++ b/vortex-cuda/kernels/src/bit_unpack_64.cu @@ -5,21 +5,28 @@ template __device__ void _bit_unpack_64_device(const uint64_t *__restrict in, uint64_t *__restrict out, uint64_t reference, int thread_idx, GPUPatches& patches) { __shared__ uint64_t shared_out[1024]; + + // Step 1: Unpack into shared memory #pragma unroll for (int i = 0; i < 1; i++) { _bit_unpack_64_lane(in, shared_out, reference, thread_idx * 1 + i); } __syncwarp(); + + // Step 2: Apply patches to shared memory in parallel PatchesCursor cursor(patches, blockIdx.x, thread_idx, 16); auto patch = cursor.next(); + while (patch.index != 1024) { + shared_out[patch.index] = patch.value; + patch = cursor.next(); + } + __syncwarp(); + + // Step 3: Copy to global memory + #pragma unroll for (int i = 0; i < 64; i++) { auto idx = i * 16 + thread_idx; - if (idx == patch.index) { - out[idx] = patch.value; - patch = cursor.next(); - } else { - out[idx] = shared_out[idx]; - } + out[idx] = shared_out[idx]; } } diff --git a/vortex-cuda/kernels/src/bit_unpack_8.cu b/vortex-cuda/kernels/src/bit_unpack_8.cu index c933eda4df2..064970d650f 100644 --- a/vortex-cuda/kernels/src/bit_unpack_8.cu +++ b/vortex-cuda/kernels/src/bit_unpack_8.cu @@ -5,21 +5,28 @@ template __device__ void _bit_unpack_8_device(const uint8_t *__restrict in, uint8_t *__restrict out, uint8_t reference, int thread_idx, GPUPatches& patches) { __shared__ uint8_t shared_out[1024]; + + // Step 1: Unpack into shared memory #pragma unroll for (int i = 0; i < 4; i++) { _bit_unpack_8_lane(in, shared_out, reference, thread_idx * 4 + i); } __syncwarp(); + + // Step 2: Apply patches to shared memory in parallel PatchesCursor cursor(patches, blockIdx.x, thread_idx, 32); auto patch = cursor.next(); + while (patch.index != 1024) { + shared_out[patch.index] = patch.value; + patch = cursor.next(); + } + __syncwarp(); + + // Step 3: Copy to global memory + #pragma unroll for (int i = 0; i < 32; i++) { auto idx = i * 32 + thread_idx; - if (idx == patch.index) { - out[idx] = patch.value; - patch = cursor.next(); - } else { - out[idx] = shared_out[idx]; - } + out[idx] = shared_out[idx]; } } diff --git a/vortex-cuda/kernels/src/patches.cuh b/vortex-cuda/kernels/src/patches.cuh index 24b1705164f..c9910e74ae9 100644 --- a/vortex-cuda/kernels/src/patches.cuh +++ b/vortex-cuda/kernels/src/patches.cuh @@ -5,6 +5,18 @@ #include "patches.h" +/// Load a chunk offset value, dispatching on the runtime type. +__device__ inline uint32_t load_chunk_offset(const GPUPatches &patches, uint32_t idx) { + switch (patches.chunk_offset_type) { + case CO_U8: return reinterpret_cast(patches.chunk_offsets)[idx]; + case CO_U16: return reinterpret_cast(patches.chunk_offsets)[idx]; + case CO_U32: return reinterpret_cast(patches.chunk_offsets)[idx]; + case CO_U64: return static_cast( + reinterpret_cast(patches.chunk_offsets)[idx]); + } + return 0; +} + /// A single patch: a within-chunk index and its replacement value. /// A sentinel patch has index == 1024, which can never match a valid /// within-chunk position (0–1023). @@ -14,46 +26,58 @@ struct Patch { T value; }; -/// Cursor for iterating over a single lane's patches within a chunk. +/// Cursor for iterating over a thread's portion of patches within a chunk. /// -/// Usage in the generated merge-loop: +/// Patches are divided evenly among threads. Each thread applies its patches +/// to shared memory, then all threads sync and copy to global memory. +/// +/// Usage in the generated kernel: /// /// PatchesCursor cursor(patches, blockIdx.x, thread_idx, 32); /// auto patch = cursor.next(); -/// for (int i = 0; i < 32; i++) { -/// auto idx = i * 32 + thread_idx; -/// if (idx == patch.index) { -/// out[idx] = patch.value; -/// patch = cursor.next(); -/// } else { -/// out[idx] = shared_out[idx]; -/// } +/// while (patch.index != 1024) { +/// shared_out[patch.index] = patch.value; +/// patch = cursor.next(); /// } template class PatchesCursor { public: - /// Construct a cursor positioned at the patches for the given (chunk, lane). - /// n_lanes is a compile-time constant emitted by the code generator (16 or 32). - __device__ PatchesCursor(const GPUPatches &patches, uint32_t chunk, uint32_t lane, uint32_t n_lanes) { - if (patches.lane_offsets == nullptr) { + /// Construct a cursor for this thread's portion of patches in the chunk. + __device__ PatchesCursor(const GPUPatches &patches, uint32_t chunk, uint32_t thread_idx, uint32_t n_threads) { + if (patches.chunk_offsets == nullptr) { indices = nullptr; values = nullptr; remaining = 0; return; } - auto slot = chunk * n_lanes + lane; - auto start = patches.lane_offsets[slot]; - remaining = patches.lane_offsets[slot + 1] - start; + + // Get patch range for this chunk + uint32_t chunk_start = load_chunk_offset(patches, chunk); + uint32_t chunk_end = load_chunk_offset(patches, chunk + 1); + uint32_t num_patches = chunk_end - chunk_start; + + // Divide patches among threads (ceil division) + uint32_t patches_per_thread = (num_patches + n_threads - 1) / n_threads; + uint32_t my_start = min(thread_idx * patches_per_thread, num_patches); + uint32_t my_end = min((thread_idx + 1) * patches_per_thread, num_patches); + + uint32_t start = chunk_start + my_start; + remaining = my_end - my_start; indices = patches.indices + start; values = reinterpret_cast(patches.values) + start; + + // Precompute base for within-chunk index calculation + chunk_base = patches.offset + chunk * 1024; } - /// Return the current patch and advance, or a sentinel {1024, 0} if exhausted. + /// Return the current patch (with within-chunk index) and advance, + /// or a sentinel {1024, 0} if exhausted. __device__ Patch next() { if (remaining == 0) { - return {1024, T {}}; + return {1024, T{}}; } - Patch patch = {*indices, *values}; + uint16_t within_chunk = static_cast(*indices - chunk_base); + Patch patch = {within_chunk, *values}; indices++; values++; remaining--; @@ -61,7 +85,8 @@ public: } private: - const uint16_t *indices; + const uint32_t *indices; const T *values; uint8_t remaining; -}; \ No newline at end of file + uint32_t chunk_base; +}; diff --git a/vortex-cuda/kernels/src/patches.h b/vortex-cuda/kernels/src/patches.h index acc628ae595..3347abd5dc9 100644 --- a/vortex-cuda/kernels/src/patches.h +++ b/vortex-cuda/kernels/src/patches.h @@ -9,18 +9,22 @@ extern "C" { #endif +/// Type tag for chunk_offsets pointer. +typedef enum { CO_U8 = 0, CO_U16 = 1, CO_U32 = 2, CO_U64 = 3 } ChunkOffsetType; + /// GPU-resident patches for fused exception patching during bit-unpacking. /// -/// Patches are stored in a lane-wise transposed layout: for each (chunk, lane) pair, -/// the corresponding patch indices and values are stored contiguously. The lane_offsets -/// array is a CSR-style offset array of size (n_chunks * n_lanes + 1) that maps each -/// (chunk, lane) slot to its range in the indices and values arrays. +/// Patches are stored in sorted order within each chunk. The chunk_offsets +/// array is a CSR-style offset array of size (n_chunks + 1) that maps each +/// chunk to its range in the indices and values arrays. /// -/// A NULL lane_offsets pointer indicates no patches are present. +/// A NULL chunk_offsets pointer indicates no patches are present. typedef struct { - uint32_t *lane_offsets; - uint16_t *indices; + void *chunk_offsets; + ChunkOffsetType chunk_offset_type; + uint32_t *indices; void *values; + uint32_t offset; } GPUPatches; #ifdef __cplusplus diff --git a/vortex-cuda/src/bit_unpack_gen.rs b/vortex-cuda/src/bit_unpack_gen.rs index e114f319de8..8d5eda920cd 100644 --- a/vortex-cuda/src/bit_unpack_gen.rs +++ b/vortex-cuda/src/bit_unpack_gen.rs @@ -153,21 +153,28 @@ fn generate_device_kernel_template( r#"template __device__ void _bit_unpack_{bits}_device(const uint{bits}_t *__restrict in, uint{bits}_t *__restrict out, uint{bits}_t reference, int thread_idx, GPUPatches& patches) {{ __shared__ uint{bits}_t shared_out[1024]; + + // Step 1: Unpack into shared memory #pragma unroll for (int i = 0; i < {per_thread_loop_count}; i++) {{ _bit_unpack_{bits}_lane(in, shared_out, reference, thread_idx * {per_thread_loop_count} + i); }} __syncwarp(); + + // Step 2: Apply patches to shared memory in parallel PatchesCursor cursor(patches, blockIdx.x, thread_idx, {thread_count}); auto patch = cursor.next(); + while (patch.index != 1024) {{ + shared_out[patch.index] = patch.value; + patch = cursor.next(); + }} + __syncwarp(); + + // Step 3: Copy to global memory + #pragma unroll for (int i = 0; i < {shared_copy_ncount}; i++) {{ auto idx = i * {thread_count} + thread_idx; - if (idx == patch.index) {{ - out[idx] = patch.value; - patch = cursor.next(); - }} else {{ - out[idx] = shared_out[idx]; - }} + out[idx] = shared_out[idx]; }} }} "# diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 5fb576b5089..c5eec806617 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -21,6 +21,7 @@ use vortex::encodings::fastlanes::BitPackedArray; use vortex::encodings::fastlanes::BitPackedDataParts; use vortex::encodings::fastlanes::unpack_iter::BitPacked as BitPackedUnpack; use vortex::error::VortexResult; +use vortex::error::vortex_bail; use vortex::error::vortex_ensure; use vortex::error::vortex_err; @@ -28,8 +29,13 @@ use crate::CudaBufferExt; use crate::CudaDeviceBuffer; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; +use crate::kernel::patches::gpu::ChunkOffsetType; +use crate::kernel::patches::gpu::ChunkOffsetType_CO_U8; +use crate::kernel::patches::gpu::ChunkOffsetType_CO_U16; +use crate::kernel::patches::gpu::ChunkOffsetType_CO_U32; +use crate::kernel::patches::gpu::ChunkOffsetType_CO_U64; use crate::kernel::patches::gpu::GPUPatches; -use crate::kernel::patches::types::transpose_patches; +use crate::kernel::patches::types::load_patches; /// CUDA decoder for bit-packed arrays. #[derive(Debug)] @@ -86,6 +92,17 @@ pub fn bitpacked_cuda_launch_config(output_width: usize, len: usize) -> VortexRe unsafe impl DeviceRepr for GPUPatches {} +/// Convert a PType to the corresponding ChunkOffsetType for GPU patches. +fn ptype_to_chunk_offset_type(ptype: vortex::dtype::PType) -> VortexResult { + match ptype { + vortex::dtype::PType::U8 => Ok(ChunkOffsetType_CO_U8), + vortex::dtype::PType::U16 => Ok(ChunkOffsetType_CO_U16), + vortex::dtype::PType::U32 => Ok(ChunkOffsetType_CO_U32), + vortex::dtype::PType::U64 => Ok(ChunkOffsetType_CO_U64), + _ => vortex_bail!("Invalid PType for chunk_offsets: {:?}", ptype), + } +} + #[instrument(skip_all)] pub(crate) async fn decode_bitpacked( array: BitPackedArray, @@ -124,23 +141,28 @@ where // We hold this here to keep the device buffers alive. let device_patches = if let Some(patches) = patches { - Some(transpose_patches(&patches, ctx).await?) + Some(load_patches(&patches, ctx).await?) } else { None }; + #[expect(clippy::cast_possible_truncation)] let patches_arg = if let Some(p) = &device_patches { GPUPatches { - lane_offsets: p.lane_offsets.cuda_device_ptr()? as _, + chunk_offsets: p.chunk_offsets.cuda_device_ptr()? as _, + chunk_offset_type: ptype_to_chunk_offset_type(p.chunk_offset_ptype)?, indices: p.indices.cuda_device_ptr()? as _, values: p.values.cuda_device_ptr()? as _, + offset: p.offset as u32, } } else { - // NULL lane_offsets signals no patches to the kernel + // NULL chunk_offsets signals no patches to the kernel GPUPatches { - lane_offsets: std::ptr::null_mut(), + chunk_offsets: std::ptr::null_mut(), + chunk_offset_type: ChunkOffsetType_CO_U32, indices: std::ptr::null_mut(), values: std::ptr::null_mut(), + offset: 0, } }; diff --git a/vortex-cuda/src/kernel/mod.rs b/vortex-cuda/src/kernel/mod.rs index 93ffd768df5..22dc81e1e28 100644 --- a/vortex-cuda/src/kernel/mod.rs +++ b/vortex-cuda/src/kernel/mod.rs @@ -34,7 +34,7 @@ pub use encodings::ZstdKernelPrep; pub use encodings::zstd_kernel_prepare; pub(crate) use encodings::*; pub(crate) use filter::FilterExecutor; -pub use patches::types::transpose_patches; +pub use patches::types::load_patches; pub(crate) use slice::SliceExecutor; use crate::CudaKernelEvents; diff --git a/vortex-cuda/src/kernel/patches/types.rs b/vortex-cuda/src/kernel/patches/types.rs index 380fefb2d94..ab4811bba5e 100644 --- a/vortex-cuda/src/kernel/patches/types.rs +++ b/vortex-cuda/src/kernel/patches/types.rs @@ -1,354 +1,227 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! An implementation of lane-wise patches instead of linear patches. This layout for exception -//! patching enables fully parallel GPU execution, as outlined by Hepkema et al. in -//! "G-ALP: Rethinking Light-weight Encodings for GPUs" +//! GPU patches loading for fused exception patching during bit-unpacking. -use vortex::array::Canonical; +use num_traits::ToPrimitive; use vortex::array::buffer::BufferHandle; -use vortex::array::dtype::IntegerPType; -use vortex::array::dtype::NativePType; use vortex::buffer::Buffer; use vortex::buffer::BufferMut; -use vortex_array::match_each_native_ptype; +use vortex::dtype::PType; +use vortex::dtype::UnsignedPType; use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::patches::Patches; use vortex_error::VortexResult; use crate::CudaExecutionCtx; +use crate::executor::CudaArrayExt; -/// A set of device-resident patches that live in the GPU. -/// -/// These are dynamically typed. -#[repr(C)] +/// A set of device-resident patches. pub struct DevicePatches { - pub(crate) lane_offsets: BufferHandle, + pub(crate) chunk_offsets: BufferHandle, + pub(crate) chunk_offset_ptype: PType, pub(crate) indices: BufferHandle, pub(crate) values: BufferHandle, + pub(crate) offset: usize, } -/// Number of lanes used at patch time for a value of type `V`. +/// Load patches for GPU use. /// -/// This is *NOT* equal to the number of FastLanes lanes for the type `V`, rather this will -/// correspond with the number of CUDA threads dedicated to executing each 1024-element vector. -const fn patch_lanes() -> usize { - // For types 32-bits or smaller, we use a 32 lane configuration, and for 64-bit we use 16 lanes. - // This matches up with the number of lanes we use to execute copying results from bit-unpacking - // from shared to global memory. - if size_of::() < 8 { 32 } else { 16 } -} - -/// A set of patches of values `V` existing in host buffers. -#[allow(dead_code)] -pub struct HostPatches { - n_chunks: usize, - n_lanes: usize, - lane_offsets: Buffer, - indices: Buffer, - /// Values. This is a buffer handle which might live on the new buffer type here - values: Buffer, -} - -#[cfg(test)] -struct LanePatches<'a, V> { - indices: &'a [u16], - values: &'a [V], -} - -impl HostPatches { - /// Get number of patches for a specific lane. - #[cfg(test)] - fn patch_count(&self, chunk: usize, lane: usize) -> usize { - let start = chunk * self.n_lanes + lane; - let end = start + 1; - let count = self.lane_offsets[end] - self.lane_offsets[start]; - - count as usize - } - - /// Get an ordered list of patches for the given chunk/lane. - #[cfg(test)] - fn patches(&self, chunk: usize, lane: usize) -> LanePatches<'_, V> { - let start = chunk * self.n_lanes + lane; - let end = start + 1; - - let lane_start = self.lane_offsets[start] as usize; - let lane_stop = self.lane_offsets[end] as usize; - - LanePatches { - indices: &self.indices[lane_start..lane_stop], - values: &self.values[lane_start..lane_stop], - } - } - - /// Apply the patches on top of the other buffer. - #[cfg(test)] - fn apply(&self, output: &mut BufferMut) { - for chunk in 0..self.n_chunks { - for lane in 0..self.n_lanes { - let patches = self.patches(chunk, lane); - for (&index, &value) in std::iter::zip(patches.indices, patches.values) { - let full_index = chunk * 1024 + (index as usize); - output[full_index] = value; - } - } - } - } - - /// Export the patches for use on the device associated with the provided execution context. - pub async fn export_to_device( - mut self, - ctx: &mut CudaExecutionCtx, - ) -> VortexResult { - let lane_offsets = std::mem::take(&mut self.lane_offsets); - let indices = std::mem::take(&mut self.indices); - let values = std::mem::take(&mut self.values); - - // Convert each into a handle that can be passed around. - let lane_offsets_handle = BufferHandle::new_host(lane_offsets.into_byte_buffer()); - let indices_handle = BufferHandle::new_host(indices.into_byte_buffer()); - let values_handle = BufferHandle::new_host(values.into_byte_buffer()); - - let lane_offsets_handle = ctx.ensure_on_device(lane_offsets_handle).await?; - let indices_handle = ctx.ensure_on_device(indices_handle).await?; - let values_handle = ctx.ensure_on_device(values_handle).await?; - - Ok(DevicePatches { - lane_offsets: lane_offsets_handle, - indices: indices_handle, - values: values_handle, - }) - } -} - -/// Transpose a set of patches from the default sorted layout into the data parallel layout. -#[expect(clippy::cognitive_complexity)] -pub async fn transpose_patches( +/// If chunk_offsets is not present in the patches, computes it by scanning indices. +/// Indices are kept as-is (u32); the kernel computes within-chunk offsets at runtime. +pub async fn load_patches( patches: &Patches, ctx: &mut CudaExecutionCtx, ) -> VortexResult { - let array_len = patches.array_len(); let offset = patches.offset(); + let array_len = patches.array_len(); + // Get or compute chunk_offsets + let (chunk_offsets, chunk_offset_ptype) = if let Some(co) = patches.chunk_offsets() { + let co_canonical = co.clone().execute_cuda(ctx).await?.into_primitive(); + let ptype = co_canonical.ptype(); + (co_canonical.buffer_handle().clone(), ptype) + } else { + // Build chunk_offsets by scanning indices + let chunk_offsets = build_chunk_offsets(patches, array_len, ctx).await?; + ( + BufferHandle::new_host(chunk_offsets.freeze().into_byte_buffer()), + PType::U32, + ) + }; + + // Load indices - must be converted to u32 for GPU use let indices = patches .indices() .clone() - .execute::(ctx.execution_ctx())? + .execute_cuda(ctx) + .await? .into_primitive(); + let indices_ptype = indices.ptype(); + #[expect(clippy::expect_used)] + let indices = if indices_ptype == PType::U32 { + indices.buffer_handle().clone() + } else { + // Convert indices to u32 + let indices_buf = indices.buffer_handle().to_host().await; + let indices_u32 = match_each_unsigned_integer_ptype!(indices_ptype, |I| { + let src: Buffer = Buffer::from_byte_buffer(indices_buf); + let mut dst: BufferMut = BufferMut::with_capacity(src.len()); + for &idx in src.as_slice() { + // Indices are limited to u32 range for GPU + dst.push(idx.to_u32().expect("index should fit in u32")); + } + dst.freeze() + }); + BufferHandle::new_host(indices_u32.into_byte_buffer()) + }; + // Load values let values = patches .values() .clone() - .execute::(ctx.execution_ctx())? + .execute_cuda(ctx) + .await? .into_primitive(); - let indices_ptype = indices.ptype(); - let values_ptype = values.ptype(); - - let indices = indices.buffer_handle().to_host().await; - let values = values.buffer_handle().to_host().await; - - match_each_unsigned_integer_ptype!(indices_ptype, |I| { - match_each_native_ptype!(values_ptype, |V| { - let indices: Buffer = Buffer::from_byte_buffer(indices); - let values: Buffer = Buffer::from_byte_buffer(values); - - let host_patches = transpose(indices.as_slice(), values.as_slice(), offset, array_len); - - host_patches.export_to_device(ctx).await - }) + // Ensure all on device + let chunk_offsets = ctx.ensure_on_device(chunk_offsets).await?; + let indices = ctx.ensure_on_device(indices).await?; + let values = ctx.ensure_on_device(values.buffer_handle().clone()).await?; + + Ok(DevicePatches { + chunk_offsets, + chunk_offset_ptype, + indices, + values, + offset, }) } -#[expect(clippy::cast_possible_truncation)] -fn transpose( - indices_in: &[I], - values_in: &[V], - offset: usize, +/// Build chunk_offsets by scanning indices when not provided. +async fn build_chunk_offsets( + patches: &Patches, array_len: usize, -) -> HostPatches { - // Total number of slots is number of chunks times number of lanes. - let n_chunks = array_len.div_ceil(1024); - assert!( - n_chunks <= u32::MAX as usize, - "Cannot transpose patches for array with >= 4 trillion elements" - ); - - let n_lanes = patch_lanes::(); - - // We know upfront how many indices and values we'll have. - let mut indices_buffer = BufferMut::with_capacity(indices_in.len()); - let mut values_buffer = BufferMut::with_capacity(values_in.len()); - - // number of patches in each chunk. - let mut lane_offsets: BufferMut = BufferMut::zeroed(n_chunks * n_lanes + 1); - - // Scan the index/values once to get chunk/lane counts - for index in indices_in { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; - - lane_offsets[chunk * n_lanes + lane + 1] += 1; - } - - // Prefix-sum sizes -> offsets - for index in 1..lane_offsets.len() { - lane_offsets[index] += lane_offsets[index - 1]; - } - - // Loop over patches, writing them to final positions - let indices_out = indices_buffer.spare_capacity_mut(); - let values_out = values_buffer.spare_capacity_mut(); - for (index, &value) in std::iter::zip(indices_in, values_in) { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; + ctx: &mut CudaExecutionCtx, +) -> VortexResult> { + let indices = patches + .indices() + .clone() + .execute_cuda(ctx) + .await? + .into_primitive(); - let position = &mut lane_offsets[chunk * n_lanes + lane]; - indices_out[*position as usize].write((index % 1024) as u16); - values_out[*position as usize].write(value); - *position += 1; - } + let offset = patches.offset(); + let n_chunks = array_len.div_ceil(1024); - // SAFETY: we know there are exactly indices_in.len() indices/values, and we just - // set them to the appropriate values in the loop above. - unsafe { - indices_buffer.set_len(indices_in.len()); - values_buffer.set_len(values_in.len()); - } + let indices_ptype = indices.ptype(); + let indices_buf = indices.buffer_handle().to_host().await; - // Now, pass over all the indices and values again and subtract out the position increments. - for index in indices_in { - let index = index.as_() - offset; - let chunk = index / 1024; - let lane = index % n_lanes; + match_each_unsigned_integer_ptype!(indices_ptype, |I| { + let indices_slice: Buffer = Buffer::from_byte_buffer(indices_buf); + Ok(compute_chunk_offsets( + indices_slice.as_slice(), + offset, + n_chunks, + )) + }) +} - lane_offsets[chunk * n_lanes + lane] -= 1; +#[expect(clippy::cast_possible_truncation, clippy::expect_used)] +fn compute_chunk_offsets( + indices: &[I], + offset: usize, + n_chunks: usize, +) -> BufferMut { + let mut chunk_offsets: BufferMut = BufferMut::zeroed(n_chunks + 1); + + // For each patch, determine which chunk it belongs to + for (i, &idx) in indices.iter().enumerate() { + let absolute_idx: usize = idx.to_usize().expect("index should fit in usize"); + let chunk = (absolute_idx - offset) / 1024; + // Update offsets for all chunks after this patch's chunk + // Since indices are sorted, we can just set the end offset for subsequent chunks + for c in (chunk + 1)..chunk_offsets.len() { + if chunk_offsets[c] < (i + 1) as u32 { + chunk_offsets[c] = (i + 1) as u32; + } + } } - HostPatches { - n_chunks, - n_lanes, - lane_offsets: lane_offsets.freeze(), - indices: indices_buffer.freeze(), - values: values_buffer.freeze(), - } + chunk_offsets } #[cfg(test)] mod tests { - use vortex::array::ExecutionCtx; - use vortex::buffer::BufferMut; - use vortex::buffer::buffer; - use vortex::buffer::buffer_mut; use vortex_array::IntoArray; - use vortex_array::LEGACY_SESSION; use vortex_array::arrays::PrimitiveArray; - use vortex_array::assert_arrays_eq; - use vortex_array::dtype::NativePType; use vortex_array::patches::Patches; use vortex_error::VortexResult; - use crate::kernel::patches::types::transpose; - - #[crate::test] - fn test_transpose_patches() { - let patch_values = buffer![0u32, 10, 20, 30, 40, 50, 60, 70, 80]; - - let mut patch_indices = BufferMut::empty(); - // CHUNK 0. patch_values have value type i32, which means there will be 32 lanes. - patch_indices.extend_from_slice(&[0, 31, 63, 64]); - - // CHUNK 1. - patch_indices.extend_from_slice(&[1024, 1056, 1058]); - - // CHUNK 2: empty - patch_indices.extend_from_slice(&[]); + use super::compute_chunk_offsets; - // CHUNK 3 - patch_indices.extend_from_slice(&[3073, 3076]); - - let patch_indices = patch_indices.freeze(); - - let transposed = transpose( - patch_indices.as_slice(), - patch_values.as_slice(), - 0, - 1024 * 5, - ); - - // Chunk 0 should have patches in lanes 0, 31 - assert_eq!(transposed.patches(0, 0).values, &[0, 30]); - assert_eq!(transposed.patches(0, 0).indices, &[0, 64]); - - assert_eq!(transposed.patches(0, 31).values, &[10, 20]); - assert_eq!(transposed.patches(0, 31).indices, &[31, 63]); - - // Chunk 1 should have patches in lanes 0, 2 - assert_eq!(transposed.patches(1, 0).values, &[40, 50]); - assert_eq!(transposed.patches(1, 0).indices, &[0, 32]); - assert_eq!(transposed.patches(1, 2).values, &[60]); - assert_eq!(transposed.patches(1, 2).indices, &[34]); - - // Chunk 2 should be empty - for lane in 0..transposed.n_lanes { - assert_eq!(transposed.patch_count(2, lane), 0); - } - - // Chunk 3 contains patches at lanes 1, 4 - assert_eq!(transposed.patches(3, 1).values, &[70]); - assert_eq!(transposed.patches(3, 1).indices, &[1]); - assert_eq!(transposed.patches(3, 4).values, &[80]); - assert_eq!(transposed.patches(3, 4).indices, &[4]); + #[test] + fn test_compute_chunk_offsets_single_chunk() { + // All patches in chunk 0 + let indices: &[u32] = &[0, 100, 500, 1000]; + let offsets = compute_chunk_offsets(indices, 0, 2); + assert_eq!(offsets.as_slice(), &[0, 4, 4]); } #[test] - #[expect(clippy::cast_possible_truncation)] - fn test_transpose_complex() -> VortexResult<()> { - test_case(1024, 0, &[0], &[0f32])?; - test_case(512, 512, &[512, 513, 514], &[10i8, 20, 30])?; - test_case(10_000, 100, &[500, 1_000, 1_001, 1_002], &[1i16, 2, 3, 4])?; - - for len in (1..4096).step_by(10) { - let offset = len / 2; - - let indices: Vec = (offset..len).map(|x| x as u32).collect(); + fn test_compute_chunk_offsets_multiple_chunks() { + // Patches spread across chunks + let indices: &[u32] = &[0, 500, 1024, 1500, 2048, 3072]; + let offsets = compute_chunk_offsets(indices, 0, 4); + // Chunk 0: indices 0, 500 (positions 0..2) + // Chunk 1: indices 1024, 1500 (positions 2..4) + // Chunk 2: indices 2048 (positions 4..5) + // Chunk 3: indices 3072 (positions 5..6) + assert_eq!(offsets.as_slice(), &[0, 2, 4, 5, 6]); + } - test_case(len, offset, &indices, &indices)?; - } + #[test] + fn test_compute_chunk_offsets_with_offset() { + // Patches with array offset + let indices: &[u32] = &[1024, 1500, 2048]; + let offsets = compute_chunk_offsets(indices, 1024, 2); + // After subtracting offset 1024: + // Chunk 0: indices 0, 476 (positions 0..2) + // Chunk 1: index 1024 (positions 2..3) + assert_eq!(offsets.as_slice(), &[0, 2, 3]); + } - Ok(()) + #[test] + fn test_compute_chunk_offsets_empty_chunks() { + // Patches skip some chunks + let indices: &[u32] = &[0, 3072]; + let offsets = compute_chunk_offsets(indices, 0, 4); + // Chunk 0: index 0 (positions 0..1) + // Chunk 1: empty (positions 1..1) + // Chunk 2: empty (positions 1..1) + // Chunk 3: index 3072 (positions 1..2) + assert_eq!(offsets.as_slice(), &[0, 1, 1, 1, 2]); } - fn test_case( - len: usize, - offset: usize, - patch_indices: &[u32], - patch_values: &[V], - ) -> VortexResult<()> { - let mut data = buffer_mut![V::default(); len]; - let array = PrimitiveArray::from_iter(data.iter().copied()); + #[test] + fn test_patches_with_chunk_offsets() -> VortexResult<()> { + // Test creating patches with pre-computed chunk_offsets + let indices = PrimitiveArray::from_iter([0u32, 500, 1024, 2000]); + let values = PrimitiveArray::from_iter([10u32, 20, 30, 40]); + let chunk_offsets = PrimitiveArray::from_iter([0u32, 2, 3, 4]); let patches = Patches::new( - len, - offset, - PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array(), - PrimitiveArray::from_iter(patch_values.iter().copied()).into_array(), - None, + 3072, + 0, + indices.into_array(), + values.into_array(), + Some(chunk_offsets.into_array()), )?; - // Verify that the outputs match between Patches and transpose_patches(). - let mut ctx = ExecutionCtx::new(LEGACY_SESSION.clone()); - let patched = array.patch(&patches, &mut ctx)?.into_array(); - - let transposed = transpose(patch_indices, patch_values, offset, len); - transposed.apply(&mut data); - - let patched_transposed = data.freeze().into_array(); - - assert_arrays_eq!(patched, patched_transposed); + assert!(patches.chunk_offsets().is_some()); + assert_eq!(patches.chunk_offset_at(0)?, 0); + assert_eq!(patches.chunk_offset_at(1)?, 2); + assert_eq!(patches.chunk_offset_at(2)?, 3); Ok(()) } diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 7ce8fa09b84..23799665d94 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -47,7 +47,7 @@ use kernel::ZigZagExecutor; use kernel::ZstdBuffersExecutor; use kernel::ZstdExecutor; pub use kernel::ZstdKernelPrep; -pub use kernel::transpose_patches; +pub use kernel::load_patches; pub use kernel::zstd_kernel_prepare; pub use pinned::PinnedByteBufferPool; pub use pinned::PinnedPoolStats; From 26544ebe645da791d2511194ae323affdf1f615b Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 15 Apr 2026 15:37:55 -0400 Subject: [PATCH 2/8] fix struct Signed-off-by: Andrew Duffy --- vortex-cuda/kernels/src/patches.cuh | 7 +++++-- vortex-cuda/kernels/src/patches.h | 7 +++++-- vortex-cuda/src/kernel/encodings/bitpacked.rs | 4 ++++ vortex-cuda/src/kernel/patches/types.rs | 7 +++++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/vortex-cuda/kernels/src/patches.cuh b/vortex-cuda/kernels/src/patches.cuh index c9910e74ae9..e09a82dffeb 100644 --- a/vortex-cuda/kernels/src/patches.cuh +++ b/vortex-cuda/kernels/src/patches.cuh @@ -51,9 +51,12 @@ public: return; } - // Get patch range for this chunk + // Get patch range for this chunk. + // chunk_offsets has n_chunks elements; the final offset is implicit (num_patches). uint32_t chunk_start = load_chunk_offset(patches, chunk); - uint32_t chunk_end = load_chunk_offset(patches, chunk + 1); + uint32_t chunk_end = (chunk + 1 < patches.n_chunks) + ? load_chunk_offset(patches, chunk + 1) + : patches.num_patches; uint32_t num_patches = chunk_end - chunk_start; // Divide patches among threads (ceil division) diff --git a/vortex-cuda/kernels/src/patches.h b/vortex-cuda/kernels/src/patches.h index 3347abd5dc9..ecdc02f6f7b 100644 --- a/vortex-cuda/kernels/src/patches.h +++ b/vortex-cuda/kernels/src/patches.h @@ -15,8 +15,9 @@ typedef enum { CO_U8 = 0, CO_U16 = 1, CO_U32 = 2, CO_U64 = 3 } ChunkOffsetType; /// GPU-resident patches for fused exception patching during bit-unpacking. /// /// Patches are stored in sorted order within each chunk. The chunk_offsets -/// array is a CSR-style offset array of size (n_chunks + 1) that maps each -/// chunk to its range in the indices and values arrays. +/// array maps each chunk to the start of its range in the indices/values arrays. +/// The array has n_chunks elements (not n_chunks+1); the final offset is implicit +/// and equals num_patches. /// /// A NULL chunk_offsets pointer indicates no patches are present. typedef struct { @@ -25,6 +26,8 @@ typedef struct { uint32_t *indices; void *values; uint32_t offset; + uint32_t num_patches; + uint32_t n_chunks; } GPUPatches; #ifdef __cplusplus diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index c5eec806617..79ac0a96d34 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -154,6 +154,8 @@ where indices: p.indices.cuda_device_ptr()? as _, values: p.values.cuda_device_ptr()? as _, offset: p.offset as u32, + num_patches: p.num_patches as u32, + n_chunks: p.n_chunks as u32, } } else { // NULL chunk_offsets signals no patches to the kernel @@ -163,6 +165,8 @@ where indices: std::ptr::null_mut(), values: std::ptr::null_mut(), offset: 0, + num_patches: 0, + n_chunks: 0, } }; diff --git a/vortex-cuda/src/kernel/patches/types.rs b/vortex-cuda/src/kernel/patches/types.rs index ab4811bba5e..cdf6be4ddb1 100644 --- a/vortex-cuda/src/kernel/patches/types.rs +++ b/vortex-cuda/src/kernel/patches/types.rs @@ -23,6 +23,8 @@ pub struct DevicePatches { pub(crate) indices: BufferHandle, pub(crate) values: BufferHandle, pub(crate) offset: usize, + pub(crate) num_patches: usize, + pub(crate) n_chunks: usize, } /// Load patches for GPU use. @@ -89,12 +91,17 @@ pub async fn load_patches( let indices = ctx.ensure_on_device(indices).await?; let values = ctx.ensure_on_device(values.buffer_handle().clone()).await?; + let num_patches = patches.num_patches(); + let n_chunks = array_len.div_ceil(1024); + Ok(DevicePatches { chunk_offsets, chunk_offset_ptype, indices, values, offset, + num_patches, + n_chunks, }) } From 0385083a0aaa9c130e453bc0f72b0bd97f3191a1 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 15 Apr 2026 15:45:22 -0400 Subject: [PATCH 3/8] format Signed-off-by: Andrew Duffy --- vortex-cuda/kernels/src/patches.cuh | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/vortex-cuda/kernels/src/patches.cuh b/vortex-cuda/kernels/src/patches.cuh index e09a82dffeb..ddb3f6b0545 100644 --- a/vortex-cuda/kernels/src/patches.cuh +++ b/vortex-cuda/kernels/src/patches.cuh @@ -8,11 +8,14 @@ /// Load a chunk offset value, dispatching on the runtime type. __device__ inline uint32_t load_chunk_offset(const GPUPatches &patches, uint32_t idx) { switch (patches.chunk_offset_type) { - case CO_U8: return reinterpret_cast(patches.chunk_offsets)[idx]; - case CO_U16: return reinterpret_cast(patches.chunk_offsets)[idx]; - case CO_U32: return reinterpret_cast(patches.chunk_offsets)[idx]; - case CO_U64: return static_cast( - reinterpret_cast(patches.chunk_offsets)[idx]); + case CO_U8: + return reinterpret_cast(patches.chunk_offsets)[idx]; + case CO_U16: + return reinterpret_cast(patches.chunk_offsets)[idx]; + case CO_U32: + return reinterpret_cast(patches.chunk_offsets)[idx]; + case CO_U64: + return static_cast(reinterpret_cast(patches.chunk_offsets)[idx]); } return 0; } @@ -43,7 +46,8 @@ template class PatchesCursor { public: /// Construct a cursor for this thread's portion of patches in the chunk. - __device__ PatchesCursor(const GPUPatches &patches, uint32_t chunk, uint32_t thread_idx, uint32_t n_threads) { + __device__ + PatchesCursor(const GPUPatches &patches, uint32_t chunk, uint32_t thread_idx, uint32_t n_threads) { if (patches.chunk_offsets == nullptr) { indices = nullptr; values = nullptr; @@ -54,9 +58,8 @@ public: // Get patch range for this chunk. // chunk_offsets has n_chunks elements; the final offset is implicit (num_patches). uint32_t chunk_start = load_chunk_offset(patches, chunk); - uint32_t chunk_end = (chunk + 1 < patches.n_chunks) - ? load_chunk_offset(patches, chunk + 1) - : patches.num_patches; + uint32_t chunk_end = + (chunk + 1 < patches.n_chunks) ? load_chunk_offset(patches, chunk + 1) : patches.num_patches; uint32_t num_patches = chunk_end - chunk_start; // Divide patches among threads (ceil division) @@ -77,7 +80,7 @@ public: /// or a sentinel {1024, 0} if exhausted. __device__ Patch next() { if (remaining == 0) { - return {1024, T{}}; + return {1024, T {}}; } uint16_t within_chunk = static_cast(*indices - chunk_base); Patch patch = {within_chunk, *values}; From 18eadc96de0fb184dd266b85590f586220ef8978 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 16 Apr 2026 15:22:32 -0400 Subject: [PATCH 4/8] cleanup Signed-off-by: Andrew Duffy --- vortex-cuda/kernels/src/patches.cuh | 10 +- vortex-cuda/kernels/src/patches.h | 1 + vortex-cuda/src/kernel/encodings/bitpacked.rs | 151 ++++++++++++++++++ vortex-cuda/src/kernel/mod.rs | 1 - vortex-cuda/src/kernel/patches/types.rs | 124 ++------------ vortex-cuda/src/lib.rs | 1 - 6 files changed, 174 insertions(+), 114 deletions(-) diff --git a/vortex-cuda/kernels/src/patches.cuh b/vortex-cuda/kernels/src/patches.cuh index ddb3f6b0545..81b5d76ad29 100644 --- a/vortex-cuda/kernels/src/patches.cuh +++ b/vortex-cuda/kernels/src/patches.cuh @@ -55,9 +55,15 @@ public: return; } - // Get patch range for this chunk. - // chunk_offsets has n_chunks elements; the final offset is implicit (num_patches). + // get index into patch indices/values for `chunk`. uint32_t chunk_start = load_chunk_offset(patches, chunk); + + // IF this is chunk 0, we need to apply the offset_within_chunk to + // the iteration start. This does not affect where the iteration ends. + if (chunk == 0) { + chunk_start += patches.offset_within_chunk; + } + uint32_t chunk_end = (chunk + 1 < patches.n_chunks) ? load_chunk_offset(patches, chunk + 1) : patches.num_patches; uint32_t num_patches = chunk_end - chunk_start; diff --git a/vortex-cuda/kernels/src/patches.h b/vortex-cuda/kernels/src/patches.h index ecdc02f6f7b..ed7240821e6 100644 --- a/vortex-cuda/kernels/src/patches.h +++ b/vortex-cuda/kernels/src/patches.h @@ -26,6 +26,7 @@ typedef struct { uint32_t *indices; void *values; uint32_t offset; + uint32_t offset_within_chunk; uint32_t num_patches; uint32_t n_chunks; } GPUPatches; diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 79ac0a96d34..854fdac8e07 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -154,6 +154,7 @@ where indices: p.indices.cuda_device_ptr()? as _, values: p.values.cuda_device_ptr()? as _, offset: p.offset as u32, + offset_within_chunk: p.offset_within_chunk as u32, num_patches: p.num_patches as u32, n_chunks: p.n_chunks as u32, } @@ -165,6 +166,7 @@ where indices: std::ptr::null_mut(), values: std::ptr::null_mut(), offset: 0, + offset_within_chunk: 0, num_patches: 0, n_chunks: 0, } @@ -561,4 +563,153 @@ mod tests { Ok(()) } + + /// Test slicing a bitpacked array with patches where the slice boundary + /// falls in the middle of a chunk's patch range, creating a non-zero + /// offset_within_chunk. + #[crate::test] + fn test_cuda_bitunpack_sliced_patches_offset_within_chunk() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create an array with values that will generate patches. + // We use values 0-511 (fits in 9 bits) but include some larger values + // that will become patches. + let mut values: Vec = Vec::with_capacity(3072); + for i in 0u16..3072 { + if i == 100 || i == 200 || i == 300 || i == 1100 || i == 1200 || i == 2100 { + // These will be patches (values > 511) + values.push(600); + } else { + values.push(i % 512); + } + } + + let primitive_array = + PrimitiveArray::new(Buffer::from_iter(values.iter().copied()), NonNullable); + + // Encode with bit width 9 (max value 511) + let bitpacked_array = BitPacked::encode(&primitive_array.into_array(), 9)?; + assert!( + bitpacked_array.patches().is_some(), + "Expected patches to be present" + ); + + // Slice to create non-zero offset_within_chunk. + // The first chunk (0-1023) has patches at indices 100, 200, 300. + // Slicing from 150 should skip the patch at 100, creating offset_within_chunk=1. + let sliced_array = bitpacked_array.into_array().slice(150..2500)?; + assert!(sliced_array.is::()); + + let cpu_result = sliced_array.to_canonical()?; + let gpu_result = block_on(async { + BitPackedExecutor + .execute(sliced_array, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result.into_array(), gpu_result); + + Ok(()) + } + + /// Test slicing a bitpacked array multiple times, accumulating offset_within_chunk. + #[crate::test] + fn test_cuda_bitunpack_double_sliced_patches() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create an array with values that will generate patches. + let mut values: Vec = Vec::with_capacity(3072); + for i in 0u16..3072 { + if i == 50 || i == 100 || i == 200 || i == 300 || i == 400 || i == 1100 || i == 2100 { + values.push(600); + } else { + values.push(i % 512); + } + } + + let primitive_array = + PrimitiveArray::new(Buffer::from_iter(values.iter().copied()), NonNullable); + + let bitpacked_array = BitPacked::encode(&primitive_array.into_array(), 9)?; + assert!( + bitpacked_array.patches().is_some(), + "Expected patches to be present" + ); + + // First slice: drop the patch at index 50 from the front of chunk 0. + let first_slice = bitpacked_array.into_array().slice(75..3000)?; + // Second slice (relative to first): drop patch at original index 100. + // The second slice's range is kept wide enough that num_blocks still + // covers every chunk in the packed buffer. + let second_slice = first_slice.slice(50..2900)?; + assert!(second_slice.is::()); + + let cpu_result = second_slice.to_canonical()?; + let gpu_result = block_on(async { + BitPackedExecutor + .execute(second_slice, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result.into_array(), gpu_result); + + Ok(()) + } + + /// Test slicing to skip an entire chunk's worth of patches. + #[crate::test] + fn test_cuda_bitunpack_sliced_skip_first_chunk_patches() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create patches in first chunk only, then slice past them all. + let mut values: Vec = Vec::with_capacity(3072); + for i in 0u16..3072 { + if i == 100 || i == 200 || i == 300 { + values.push(600); + } else if i == 1500 || i == 2500 { + values.push(700); + } else { + values.push(i % 512); + } + } + + let primitive_array = + PrimitiveArray::new(Buffer::from_iter(values.iter().copied()), NonNullable); + + let bitpacked_array = BitPacked::encode(&primitive_array.into_array(), 9)?; + assert!( + bitpacked_array.patches().is_some(), + "Expected patches to be present" + ); + + // Slice to skip past all first chunk patches + let sliced_array = bitpacked_array.into_array().slice(1024..3072)?; + assert!(sliced_array.is::()); + + let cpu_result = sliced_array.to_canonical()?; + let gpu_result = block_on(async { + BitPackedExecutor + .execute(sliced_array, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result.into_array(), gpu_result); + + Ok(()) + } } diff --git a/vortex-cuda/src/kernel/mod.rs b/vortex-cuda/src/kernel/mod.rs index 22dc81e1e28..68f40a15005 100644 --- a/vortex-cuda/src/kernel/mod.rs +++ b/vortex-cuda/src/kernel/mod.rs @@ -34,7 +34,6 @@ pub use encodings::ZstdKernelPrep; pub use encodings::zstd_kernel_prepare; pub(crate) use encodings::*; pub(crate) use filter::FilterExecutor; -pub use patches::types::load_patches; pub(crate) use slice::SliceExecutor; use crate::CudaKernelEvents; diff --git a/vortex-cuda/src/kernel/patches/types.rs b/vortex-cuda/src/kernel/patches/types.rs index cdf6be4ddb1..350c5210e8a 100644 --- a/vortex-cuda/src/kernel/patches/types.rs +++ b/vortex-cuda/src/kernel/patches/types.rs @@ -8,10 +8,10 @@ use vortex::array::buffer::BufferHandle; use vortex::buffer::Buffer; use vortex::buffer::BufferMut; use vortex::dtype::PType; -use vortex::dtype::UnsignedPType; use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::patches::Patches; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use crate::CudaExecutionCtx; use crate::executor::CudaArrayExt; @@ -23,33 +23,34 @@ pub struct DevicePatches { pub(crate) indices: BufferHandle, pub(crate) values: BufferHandle, pub(crate) offset: usize, + pub(crate) offset_within_chunk: usize, pub(crate) num_patches: usize, pub(crate) n_chunks: usize, } /// Load patches for GPU use. /// -/// If chunk_offsets is not present in the patches, computes it by scanning indices. -/// Indices are kept as-is (u32); the kernel computes within-chunk offsets at runtime. -pub async fn load_patches( +/// # Errors +/// +/// If the patches do not have `chunk_offsets`. They have been written by +/// default in new Vortex files since 0.54.0. +pub(crate) async fn load_patches( patches: &Patches, ctx: &mut CudaExecutionCtx, ) -> VortexResult { let offset = patches.offset(); + let offset_within_chunk = patches.offset_within_chunk().unwrap_or_default(); let array_len = patches.array_len(); // Get or compute chunk_offsets - let (chunk_offsets, chunk_offset_ptype) = if let Some(co) = patches.chunk_offsets() { + let Some(co) = patches.chunk_offsets() else { + vortex_bail!("cannot execute_cuda for patched BitPacked array without chunk_offsets") + }; + + let (chunk_offsets, chunk_offset_ptype) = { let co_canonical = co.clone().execute_cuda(ctx).await?.into_primitive(); let ptype = co_canonical.ptype(); (co_canonical.buffer_handle().clone(), ptype) - } else { - // Build chunk_offsets by scanning indices - let chunk_offsets = build_chunk_offsets(patches, array_len, ctx).await?; - ( - BufferHandle::new_host(chunk_offsets.freeze().into_byte_buffer()), - PType::U32, - ) }; // Load indices - must be converted to u32 for GPU use @@ -100,64 +101,12 @@ pub async fn load_patches( indices, values, offset, + offset_within_chunk, num_patches, n_chunks, }) } -/// Build chunk_offsets by scanning indices when not provided. -async fn build_chunk_offsets( - patches: &Patches, - array_len: usize, - ctx: &mut CudaExecutionCtx, -) -> VortexResult> { - let indices = patches - .indices() - .clone() - .execute_cuda(ctx) - .await? - .into_primitive(); - - let offset = patches.offset(); - let n_chunks = array_len.div_ceil(1024); - - let indices_ptype = indices.ptype(); - let indices_buf = indices.buffer_handle().to_host().await; - - match_each_unsigned_integer_ptype!(indices_ptype, |I| { - let indices_slice: Buffer = Buffer::from_byte_buffer(indices_buf); - Ok(compute_chunk_offsets( - indices_slice.as_slice(), - offset, - n_chunks, - )) - }) -} - -#[expect(clippy::cast_possible_truncation, clippy::expect_used)] -fn compute_chunk_offsets( - indices: &[I], - offset: usize, - n_chunks: usize, -) -> BufferMut { - let mut chunk_offsets: BufferMut = BufferMut::zeroed(n_chunks + 1); - - // For each patch, determine which chunk it belongs to - for (i, &idx) in indices.iter().enumerate() { - let absolute_idx: usize = idx.to_usize().expect("index should fit in usize"); - let chunk = (absolute_idx - offset) / 1024; - // Update offsets for all chunks after this patch's chunk - // Since indices are sorted, we can just set the end offset for subsequent chunks - for c in (chunk + 1)..chunk_offsets.len() { - if chunk_offsets[c] < (i + 1) as u32 { - chunk_offsets[c] = (i + 1) as u32; - } - } - } - - chunk_offsets -} - #[cfg(test)] mod tests { use vortex_array::IntoArray; @@ -165,51 +114,6 @@ mod tests { use vortex_array::patches::Patches; use vortex_error::VortexResult; - use super::compute_chunk_offsets; - - #[test] - fn test_compute_chunk_offsets_single_chunk() { - // All patches in chunk 0 - let indices: &[u32] = &[0, 100, 500, 1000]; - let offsets = compute_chunk_offsets(indices, 0, 2); - assert_eq!(offsets.as_slice(), &[0, 4, 4]); - } - - #[test] - fn test_compute_chunk_offsets_multiple_chunks() { - // Patches spread across chunks - let indices: &[u32] = &[0, 500, 1024, 1500, 2048, 3072]; - let offsets = compute_chunk_offsets(indices, 0, 4); - // Chunk 0: indices 0, 500 (positions 0..2) - // Chunk 1: indices 1024, 1500 (positions 2..4) - // Chunk 2: indices 2048 (positions 4..5) - // Chunk 3: indices 3072 (positions 5..6) - assert_eq!(offsets.as_slice(), &[0, 2, 4, 5, 6]); - } - - #[test] - fn test_compute_chunk_offsets_with_offset() { - // Patches with array offset - let indices: &[u32] = &[1024, 1500, 2048]; - let offsets = compute_chunk_offsets(indices, 1024, 2); - // After subtracting offset 1024: - // Chunk 0: indices 0, 476 (positions 0..2) - // Chunk 1: index 1024 (positions 2..3) - assert_eq!(offsets.as_slice(), &[0, 2, 3]); - } - - #[test] - fn test_compute_chunk_offsets_empty_chunks() { - // Patches skip some chunks - let indices: &[u32] = &[0, 3072]; - let offsets = compute_chunk_offsets(indices, 0, 4); - // Chunk 0: index 0 (positions 0..1) - // Chunk 1: empty (positions 1..1) - // Chunk 2: empty (positions 1..1) - // Chunk 3: index 3072 (positions 1..2) - assert_eq!(offsets.as_slice(), &[0, 1, 1, 1, 2]); - } - #[test] fn test_patches_with_chunk_offsets() -> VortexResult<()> { // Test creating patches with pre-computed chunk_offsets diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 23799665d94..7d9caf6aa99 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -47,7 +47,6 @@ use kernel::ZigZagExecutor; use kernel::ZstdBuffersExecutor; use kernel::ZstdExecutor; pub use kernel::ZstdKernelPrep; -pub use kernel::load_patches; pub use kernel::zstd_kernel_prepare; pub use pinned::PinnedByteBufferPool; pub use pinned::PinnedPoolStats; From ecf4d55703f055faa5f7511bcdb04783a3477708 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 16 Apr 2026 15:31:16 -0400 Subject: [PATCH 5/8] simplify unit test Signed-off-by: Andrew Duffy --- vortex-cuda/src/kernel/encodings/bitpacked.rs | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 854fdac8e07..3f4cf993a22 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -202,7 +202,7 @@ mod tests { use vortex::array::assert_arrays_eq; use vortex::array::dtype::NativePType; use vortex::array::validity::Validity::NonNullable; - use vortex::buffer::Buffer; + use vortex::buffer::{Buffer, buffer}; use vortex::encodings::fastlanes::BitPackedArrayExt; use vortex::error::VortexExpect; use vortex::session::VortexSession; @@ -575,30 +575,16 @@ mod tests { // Create an array with values that will generate patches. // We use values 0-511 (fits in 9 bits) but include some larger values // that will become patches. - let mut values: Vec = Vec::with_capacity(3072); - for i in 0u16..3072 { - if i == 100 || i == 200 || i == 300 || i == 1100 || i == 1200 || i == 2100 { - // These will be patches (values > 511) - values.push(600); - } else { - values.push(i % 512); - } - } + let primitive_array = PrimitiveArray::new(buffer![100u8, 101, 102, 3, 4, 5], NonNullable); - let primitive_array = - PrimitiveArray::new(Buffer::from_iter(values.iter().copied()), NonNullable); - - // Encode with bit width 9 (max value 511) - let bitpacked_array = BitPacked::encode(&primitive_array.into_array(), 9)?; + // Encode with bit width 4. First 3 elements patched, remainder will pack. + let bitpacked_array = BitPacked::encode(&primitive_array.into_array(), 4)?; assert!( bitpacked_array.patches().is_some(), "Expected patches to be present" ); - // Slice to create non-zero offset_within_chunk. - // The first chunk (0-1023) has patches at indices 100, 200, 300. - // Slicing from 150 should skip the patch at 100, creating offset_within_chunk=1. - let sliced_array = bitpacked_array.into_array().slice(150..2500)?; + let sliced_array = bitpacked_array.into_array().slice(2..6)?; assert!(sliced_array.is::()); let cpu_result = sliced_array.to_canonical()?; From a8ca2f1c9ba607f955c4294ec665e840fed9d592 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 16 Apr 2026 15:59:14 -0400 Subject: [PATCH 6/8] try Signed-off-by: Andrew Duffy --- vortex-cuda/kernels/src/patches.cuh | 28 ++++++++++++------- vortex-cuda/src/kernel/encodings/bitpacked.rs | 3 +- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/vortex-cuda/kernels/src/patches.cuh b/vortex-cuda/kernels/src/patches.cuh index 81b5d76ad29..2245e87bb13 100644 --- a/vortex-cuda/kernels/src/patches.cuh +++ b/vortex-cuda/kernels/src/patches.cuh @@ -55,25 +55,33 @@ public: return; } - // get index into patch indices/values for `chunk`. - uint32_t chunk_start = load_chunk_offset(patches, chunk); + // mirrors the logic from vortex-array/src/arrays/primitive/array/patch.rs - // IF this is chunk 0, we need to apply the offset_within_chunk to - // the iteration start. This does not affect where the iteration ends. - if (chunk == 0) { - chunk_start += patches.offset_within_chunk; + // Compute base_offset from the first chunk offset. + uint32_t base_offset = load_chunk_offset(patches, 0); + + uint32_t patches_start_idx = load_chunk_offset(patches, chunk) - base_offset; + patches_start_idx -= min(patches_start_idx, patches.offset_within_chunk); + + // calculate the ending index. + uint32_t patches_end_idx; + if ((chunk + 1) < patches.n_chunks) { + patches_end_idx = load_chunk_offset(patches, chunk + 1) - base_offset; + // if this is the end of times, we should drop it out here... + patches_end_idx -= min(patches_end_idx, patches.offset_within_chunk); + } else { + patches_end_idx = patches.num_patches; } - uint32_t chunk_end = - (chunk + 1 < patches.n_chunks) ? load_chunk_offset(patches, chunk + 1) : patches.num_patches; - uint32_t num_patches = chunk_end - chunk_start; + // calculate how many patches are in the chunk + uint32_t num_patches = patches_end_idx - patches_start_idx; // Divide patches among threads (ceil division) uint32_t patches_per_thread = (num_patches + n_threads - 1) / n_threads; uint32_t my_start = min(thread_idx * patches_per_thread, num_patches); uint32_t my_end = min((thread_idx + 1) * patches_per_thread, num_patches); - uint32_t start = chunk_start + my_start; + uint32_t start = patches_start_idx + my_start; remaining = my_end - my_start; indices = patches.indices + start; values = reinterpret_cast(patches.values) + start; diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 3f4cf993a22..00acbf83dc5 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -202,7 +202,8 @@ mod tests { use vortex::array::assert_arrays_eq; use vortex::array::dtype::NativePType; use vortex::array::validity::Validity::NonNullable; - use vortex::buffer::{Buffer, buffer}; + use vortex::buffer::Buffer; + use vortex::buffer::buffer; use vortex::encodings::fastlanes::BitPackedArrayExt; use vortex::error::VortexExpect; use vortex::session::VortexSession; From 82f8911cdae3f1a533ea2c5aeefc15f7b8038b1c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 16 Apr 2026 17:08:47 -0400 Subject: [PATCH 7/8] fix Signed-off-by: Andrew Duffy --- vortex-cuda/kernels/src/patches.cuh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/vortex-cuda/kernels/src/patches.cuh b/vortex-cuda/kernels/src/patches.cuh index 2245e87bb13..66a97b935e9 100644 --- a/vortex-cuda/kernels/src/patches.cuh +++ b/vortex-cuda/kernels/src/patches.cuh @@ -86,8 +86,11 @@ public: indices = patches.indices + start; values = reinterpret_cast(patches.values) + start; - // Precompute base for within-chunk index calculation - chunk_base = patches.offset + chunk * 1024; + // The iterator returns indices relative to the start of the chunk. + // `chunk_base` is the index of the first element within a chunk, accounting + // for the slice offset. + chunk_base = chunk * 1024 + patches.offset; + chunk_base -= min(chunk_base, patches.offset % 1024); } /// Return the current patch (with within-chunk index) and advance, From 1a1c67250a9d9e5dffc2922caa01041d0fa5ffd8 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 16 Apr 2026 17:15:32 -0400 Subject: [PATCH 8/8] Replace deprecated to_canonical with execute:: CI clippy was failing on deprecation warnings in three CUDA bitpacked tests using `ArrayRef::to_canonical`. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Andrew Duffy --- vortex-cuda/src/kernel/encodings/bitpacked.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 00acbf83dc5..d29f6115889 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -588,7 +588,9 @@ mod tests { let sliced_array = bitpacked_array.into_array().slice(2..6)?; assert!(sliced_array.is::()); - let cpu_result = sliced_array.to_canonical()?; + let cpu_result = sliced_array + .clone() + .execute::(cuda_ctx.execution_ctx())?; let gpu_result = block_on(async { BitPackedExecutor .execute(sliced_array, &mut cuda_ctx) @@ -637,7 +639,9 @@ mod tests { let second_slice = first_slice.slice(50..2900)?; assert!(second_slice.is::()); - let cpu_result = second_slice.to_canonical()?; + let cpu_result = second_slice + .clone() + .execute::(cuda_ctx.execution_ctx())?; let gpu_result = block_on(async { BitPackedExecutor .execute(second_slice, &mut cuda_ctx) @@ -684,7 +688,9 @@ mod tests { let sliced_array = bitpacked_array.into_array().slice(1024..3072)?; assert!(sliced_array.is::()); - let cpu_result = sliced_array.to_canonical()?; + let cpu_result = sliced_array + .clone() + .execute::(cuda_ctx.execution_ctx())?; let gpu_result = block_on(async { BitPackedExecutor .execute(sliced_array, &mut cuda_ctx)