diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 5b1c84ea796..96382e91068 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -17,12 +17,15 @@ use vortex::array::arrays::primitive::PrimitiveDataParts; use vortex::array::arrays::struct_::StructDataParts; use vortex::array::arrays::varbinview::VarBinViewDataParts; use vortex::array::buffer::BufferHandle; +use vortex::array::validity::Validity; use vortex::buffer::Buffer; +use vortex::buffer::ByteBuffer; use vortex::dtype::DecimalType; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_ensure; use vortex::extension::datetime::AnyTemporal; +use vortex::mask::Mask; use crate::CudaExecutionCtx; use crate::arrow::ARROW_DEVICE_CUDA; @@ -31,7 +34,6 @@ use crate::arrow::ArrowDeviceArray; use crate::arrow::ExportDeviceArray; use crate::arrow::PrivateData; use crate::arrow::SyncEvent; -use crate::arrow::check_validity_empty; use crate::executor::CudaArrayExt; /// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by @@ -74,11 +76,11 @@ fn export_canonical( buffer, validity, .. } = primitive.into_data_parts(); - check_validity_empty(&validity)?; - + let (validity_buffer, null_count) = + export_arrow_validity_buffer(validity, len, 0, ctx).await?; let buffer = ctx.ensure_on_device(buffer).await?; - export_fixed_size(buffer, len, 0, ctx) + export_fixed_size(buffer, len, 0, validity_buffer, null_count, ctx) } Canonical::Null(null_array) => { let len = null_array.len(); @@ -101,18 +103,17 @@ fn export_canonical( .. } = decimal.into_data_parts(); - // verify that there is no null buffer - check_validity_empty(&validity)?; - // TODO(aduffy): GPU kernel for upcasting. vortex_ensure!( values_type >= DecimalType::I32, "cannot export DecimalArray with values type {values_type}. must be i32 or wider." ); + let (validity_buffer, null_count) = + export_arrow_validity_buffer(validity, len, 0, ctx).await?; let buffer = ctx.ensure_on_device(values).await?; - export_fixed_size(buffer, len, 0, ctx) + export_fixed_size(buffer, len, 0, validity_buffer, null_count, ctx) } Canonical::Extension(extension) => { if !extension.ext_dtype().is::() { @@ -129,10 +130,11 @@ fn export_canonical( buffer, validity, .. } = values.into_data_parts(); - check_validity_empty(&validity)?; + let (validity_buffer, null_count) = + export_arrow_validity_buffer(validity, len, 0, ctx).await?; let buffer = ctx.ensure_on_device(buffer).await?; - export_fixed_size(buffer, len, 0, ctx) + export_fixed_size(buffer, len, 0, validity_buffer, null_count, ctx) } Canonical::Bool(bool_array) => { let len = bool_array.len(); @@ -141,10 +143,11 @@ fn export_canonical( bits, offset, len, .. } = bool_array.into_data().into_parts(len); - check_validity_empty(&validity)?; + let (validity_buffer, null_count) = + export_arrow_validity_buffer(validity, len, offset, ctx).await?; let bits = ctx.ensure_on_device(bits).await?; - export_fixed_size(bits, len, offset, ctx) + export_fixed_size(bits, len, offset, validity_buffer, null_count, ctx) } Canonical::VarBinView(varbinview) => { let len = varbinview.len(); @@ -155,11 +158,12 @@ fn export_canonical( .. } = varbinview.into_data_parts(); - check_validity_empty(&validity)?; + let (validity_buffer, null_count) = + export_arrow_validity_buffer(validity, len, 0, ctx).await?; let views = ctx.ensure_on_device(views).await?; let mut buffers = Vec::with_capacity(data_buffers.len() + 3); - buffers.push(None); + buffers.push(validity_buffer); buffers.push(Some(views)); for buffer in data_buffers.iter() { buffers.push(Some(ctx.ensure_on_device(buffer.clone()).await?)); @@ -182,7 +186,7 @@ fn export_canonical( let sync_event = private_data.sync_event(); let arrow_array = ArrowArray { length: len as i64, - null_count: 0, + null_count, offset: 0, // Arrow Utf8View/BinaryView layout: optional null bitmap, views, data buffers, // and trailing variadic buffer sizes. @@ -202,6 +206,30 @@ fn export_canonical( }) } +/// Export Vortex validity as an Arrow validity byte buffer. +/// +/// Returns `None` for the buffer when Arrow can omit validity because all rows are valid. +async fn export_arrow_validity_buffer( + validity: Validity, + len: usize, + arrow_offset: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(Option, i64)> { + let mask = validity.execute_mask(len, ctx.execution_ctx())?; + let null_count = i64::try_from(mask.false_count())?; + + let validity_buffer = match mask { + Mask::AllTrue(_) => return Ok((None, 0)), + Mask::AllFalse(len) => ByteBuffer::zeroed((len + arrow_offset).div_ceil(8)), + values @ Mask::Values(_) => values.into_bit_buffer().into_inner().2, + }; + let validity = ctx + .ensure_on_device(BufferHandle::new_host(validity_buffer)) + .await?; + + Ok((Some(validity), null_count)) +} + async fn export_struct( array: StructArray, ctx: &mut CudaExecutionCtx, @@ -211,7 +239,7 @@ async fn export_struct( validity, fields, .. } = array.into_data_parts(); - check_validity_empty(&validity)?; + let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; // We need the children to be held across await points. let mut children = Vec::with_capacity(fields.len()); @@ -222,17 +250,17 @@ async fn export_struct( children.push(arrow_field); } - let mut private_data = PrivateData::new(vec![None], children, ctx)?; + let mut private_data = PrivateData::new(vec![validity_buffer], children, ctx)?; let sync_event: SyncEvent = private_data.sync_event(); // Populate the ArrowArray with the child arrays. let mut arrow_struct = ArrowArray::empty(); arrow_struct.length = len as i64; + arrow_struct.null_count = null_count; arrow_struct.n_children = fields.len() as i64; arrow_struct.children = private_data.children.as_mut_ptr(); - // StructArray _can_ contain a validity buffer. In this case, we just write a null pointer - // for it. + // StructArray has one buffer slot for its optional validity bitmap. arrow_struct.n_buffers = 1; arrow_struct.buffers = private_data.buffer_ptrs.as_mut_ptr(); arrow_struct.release = Some(release_array); @@ -246,6 +274,8 @@ fn export_fixed_size( buffer: BufferHandle, len: usize, offset: usize, + validity: Option, + null_count: i64, ctx: &mut CudaExecutionCtx, ) -> VortexResult<(ArrowArray, SyncEvent)> { vortex_ensure!( @@ -253,15 +283,13 @@ fn export_fixed_size( "buffer must already be copied to device before calling" ); - // Non-trivial validity is rejected before fixed-size export, so the Arrow null bitmap slot is - // always null for now. Future nullable export support should pass the validity bitmap here. - let mut private_data = PrivateData::new(vec![None, Some(buffer)], vec![], ctx)?; + let mut private_data = PrivateData::new(vec![validity, Some(buffer)], vec![], ctx)?; let sync_event: SyncEvent = private_data.sync_event(); // Return a copy of the CudaEvent let arrow_array = ArrowArray { length: len as i64, - null_count: 0, + null_count, offset: offset as i64, // 1 (optional) buffer for nulls, one buffer for the data n_buffers: 2, @@ -337,9 +365,12 @@ mod tests { use vortex::extension::datetime::TimeUnit; use vortex::session::VortexSession; + use crate::CudaExecutionCtx; use crate::arrow::ARROW_DEVICE_CUDA; use crate::arrow::ArrowArray; + use crate::arrow::ArrowDeviceArray; use crate::arrow::DeviceArrayExt; + use crate::arrow::PrivateData; use crate::session::CudaSession; unsafe fn release_exported_array(array: *mut ArrowArray) { @@ -352,7 +383,7 @@ mod tests { // Assert Arrow Device metadata that consumers use before reading buffers. fn assert_device_metadata( - device_array: &crate::arrow::ArrowDeviceArray, + device_array: &ArrowDeviceArray, expected_device_id: i64, expect_sync_event: bool, ) { @@ -362,20 +393,26 @@ mod tests { assert_eq!(device_array.sync_event.is_null(), !expect_sync_event); } - // Assert nullable exports fail until CUDA null-mask export is implemented. - async fn assert_rejects_non_trivial_validity( + // Assert an exported array has a device null bitmap in buffer slot 0. + fn assert_null_buffer(array: &ArrowArray, expected_null_count: i64) -> VortexResult<()> { + assert_eq!(array.null_count, expected_null_count); + let buffers = + unsafe { std::slice::from_raw_parts(array.buffers, usize::try_from(array.n_buffers)?) }; + assert!(!buffers[0].is_null()); + Ok(()) + } + + // Export a nullable array and assert its null-buffer metadata. + async fn assert_nullable_export( array: ArrayRef, - ctx: &mut crate::CudaExecutionCtx, - ) { - let err = array - .export_device_array(ctx) - .await - .expect_err("nullable Arrow Device export should fail until null masks are supported"); - assert!( - err.to_string() - .contains("Exporting array with non-trivial validity not supported yet"), - "unexpected error: {err}" - ); + expected_n_buffers: i64, + expected_null_count: i64, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let device_array = array.export_device_array(ctx).await?; + assert_eq!(device_array.array.n_buffers, expected_n_buffers); + assert_null_buffer(&device_array.array, expected_null_count)?; + Ok(device_array) } // Build a nested struct fixture with an out-of-line string-view value. @@ -596,37 +633,167 @@ mod tests { Ok(()) } - // Check nullable canonical arrays are rejected rather than exported unsafely. + // Check nullable primitives export Arrow null bitmaps on device. #[crate::test] - async fn test_rejects_nullable_exports_until_null_masks_are_supported() -> VortexResult<()> { + async fn test_export_nullable_primitive() -> VortexResult<()> { let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) .vortex_expect("failed to create execution context"); - assert_rejects_non_trivial_validity( + let mut primitive = assert_nullable_export( PrimitiveArray::from_option_iter([Some(1i32), None, Some(3)]).into_array(), + 2, + 1, &mut ctx, ) - .await; - assert_rejects_non_trivial_validity( - BoolArray::from_iter([Some(true), None, Some(false)]).into_array(), + .await?; + unsafe { release_exported_array(&raw mut primitive.array) }; + + let mut all_null_primitive = assert_nullable_export( + PrimitiveArray::from_option_iter([None::, None]).into_array(), + 2, + 2, &mut ctx, ) - .await; - assert_rejects_non_trivial_validity( - VarBinViewArray::from_iter_nullable_str([Some("one"), None, Some("three")]) - .into_array(), + .await?; + unsafe { release_exported_array(&raw mut all_null_primitive.array) }; + + Ok(()) + } + + // Check nullable bool exports preserve Arrow offset metadata. + #[crate::test] + async fn test_export_nullable_bool() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let mut bools = assert_nullable_export( + BoolArray::from_iter([Some(true), None, Some(false), Some(true)]) + .into_array() + .slice(1..4)?, + 2, + 1, &mut ctx, ) - .await; - - let nullable_struct = StructArray::try_new( - FieldNames::from_iter(["a"]), - vec![PrimitiveArray::from_iter(0u32..3).into_array()], - 3, - Validity::from_iter([true, false, true]), - )? - .into_array(); - assert_rejects_non_trivial_validity(nullable_struct, &mut ctx).await; + .await?; + assert_eq!(bools.array.offset, 1); + unsafe { release_exported_array(&raw mut bools.array) }; + + Ok(()) + } + + // Check synthesized all-null bool validity is large enough for Arrow offset-based reads. + #[crate::test] + async fn test_export_all_null_sliced_bool_validity_covers_arrow_offset() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let mut bools = assert_nullable_export( + BoolArray::from_iter([None; 10]).into_array().slice(7..9)?, + 2, + 2, + &mut ctx, + ) + .await?; + assert_eq!(bools.array.offset, 7); + + let private_data = unsafe { &*bools.array.private_data.cast::() }; + let null_buffer = private_data.buffers[0] + .as_ref() + .vortex_expect("null buffer should be present"); + assert_eq!(null_buffer.len(), 2); + + unsafe { release_exported_array(&raw mut bools.array) }; + + Ok(()) + } + + // Check nullable decimal exports include Arrow null bitmaps. + #[crate::test] + async fn test_export_nullable_decimal() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let mut decimal = assert_nullable_export( + DecimalArray::from_option_iter( + [Some(100i32), None, Some(300)], + DecimalDType::new(10, 2), + ) + .into_array(), + 2, + 1, + &mut ctx, + ) + .await?; + unsafe { release_exported_array(&raw mut decimal.array) }; + + Ok(()) + } + + // Check nullable temporal exports include Arrow null bitmaps. + #[crate::test] + async fn test_export_nullable_temporal() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let mut temporal = assert_nullable_export( + TemporalArray::new_date( + PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(), + TimeUnit::Days, + ) + .into_array(), + 2, + 1, + &mut ctx, + ) + .await?; + unsafe { release_exported_array(&raw mut temporal.array) }; + + Ok(()) + } + + // Check nullable string-view exports include Arrow null bitmaps. + #[crate::test] + async fn test_export_nullable_varbinview() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let mut varbinview = assert_nullable_export( + VarBinViewArray::from_iter_nullable_str([ + Some("one"), + None, + Some("this is a longer string for out-of-line storage"), + ]) + .into_array(), + 4, + 1, + &mut ctx, + ) + .await?; + unsafe { release_exported_array(&raw mut varbinview.array) }; + + Ok(()) + } + + // Check nullable struct exports include Arrow null bitmaps. + #[crate::test] + async fn test_export_nullable_struct() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let mut struct_array = assert_nullable_export( + StructArray::try_new( + FieldNames::from_iter(["a"]), + vec![PrimitiveArray::from_iter(0u32..3).into_array()], + 3, + Validity::from_iter([true, false, true]), + )? + .into_array(), + 1, + 1, + &mut ctx, + ) + .await?; + unsafe { release_exported_array(&raw mut struct_array.array) }; Ok(()) } diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 90e44c2413a..3739a12e6ec 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -24,10 +24,8 @@ use cudarc::runtime::sys::cudaEvent_t; use vortex::array::ArrayRef; use vortex::array::arrow::ArrowSessionExt; use vortex::array::buffer::BufferHandle; -use vortex::array::validity::Validity; use vortex::dtype::DType; use vortex::error::VortexResult; -use vortex::error::vortex_bail; use vortex::error::vortex_err; use crate::CudaBufferExt; @@ -236,12 +234,3 @@ pub trait ExportDeviceArray: Debug + Send + Sync + 'static { ctx: &mut CudaExecutionCtx, ) -> VortexResult; } - -/// Check that the validity buffer is empty and does not need to be copied over the device boundary. -pub(crate) fn check_validity_empty(validity: &Validity) -> VortexResult<()> { - if let Validity::AllInvalid | Validity::Array(_) = validity { - vortex_bail!("Exporting array with non-trivial validity not supported yet") - } - - Ok(()) -}