Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 90 additions & 36 deletions vortex-array/src/arrow/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::buffer::OffsetBuffer;
use arrow_schema::DataType;
use arrow_schema::TimeUnit as ArrowTimeUnit;
use itertools::Itertools;
use vortex_buffer::Alignment;
use vortex_buffer::BitBuffer;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_ensure_eq;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;

use crate::ArrayRef;
Expand Down Expand Up @@ -140,7 +141,7 @@ macro_rules! impl_from_arrow_primitive {
impl FromArrowArray<&ArrowPrimitiveArray<$T>> for ArrayRef {
fn from_arrow(value: &ArrowPrimitiveArray<$T>, nullable: bool) -> VortexResult<Self> {
let buffer = Buffer::from_arrow_scalar_buffer(value.values().clone());
let validity = nulls(value.nulls(), nullable);
let validity = nulls(value.nulls(), nullable)?;
Ok(PrimitiveArray::new(buffer, validity).into_array())
}
}
Expand All @@ -166,7 +167,7 @@ impl FromArrowArray<&ArrowPrimitiveArray<Decimal32Type>> for ArrayRef {
) -> VortexResult<Self> {
let decimal_type = DecimalDType::new(array.precision(), array.scale());
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
let validity = nulls(array.nulls(), nullable);
let validity = nulls(array.nulls(), nullable)?;
Ok(DecimalArray::new(buffer, decimal_type, validity).into_array())
}
}
Expand All @@ -178,7 +179,7 @@ impl FromArrowArray<&ArrowPrimitiveArray<Decimal64Type>> for ArrayRef {
) -> VortexResult<Self> {
let decimal_type = DecimalDType::new(array.precision(), array.scale());
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
let validity = nulls(array.nulls(), nullable);
let validity = nulls(array.nulls(), nullable)?;
Ok(DecimalArray::new(buffer, decimal_type, validity).into_array())
}
}
Expand All @@ -190,7 +191,7 @@ impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
) -> VortexResult<Self> {
let decimal_type = DecimalDType::new(array.precision(), array.scale());
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
let validity = nulls(array.nulls(), nullable);
let validity = nulls(array.nulls(), nullable)?;
Ok(DecimalArray::new(buffer, decimal_type, validity).into_array())
}
}
Expand All @@ -207,7 +208,7 @@ impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
// of either type.
let buffer =
unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
let validity = nulls(array.nulls(), nullable);
let validity = nulls(array.nulls(), nullable)?;
Ok(DecimalArray::new(buffer, decimal_type, validity).into_array())
}
}
Expand All @@ -219,7 +220,7 @@ macro_rules! impl_from_arrow_temporal {
value: &ArrowPrimitiveArray<$T>,
nullable: bool,
) -> vortex_error::VortexResult<Self> {
Ok(temporal_array(value, nullable))
temporal_array(value, nullable)
}
}
};
Expand All @@ -241,17 +242,20 @@ impl_from_arrow_temporal!(Time64NanosecondType);
impl_from_arrow_temporal!(Date32Type);
impl_from_arrow_temporal!(Date64Type);

fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
fn temporal_array<T: ArrowPrimitiveType>(
value: &ArrowPrimitiveArray<T>,
nullable: bool,
) -> VortexResult<ArrayRef>
where
T::Native: NativePType,
{
let arr = PrimitiveArray::new(
Buffer::from_arrow_scalar_buffer(value.values().clone()),
nulls(value.nulls(), nullable),
nulls(value.nulls(), nullable)?,
)
.into_array();

match value.data_type() {
Ok(match value.data_type() {
DataType::Timestamp(time_unit, tz) => {
TemporalArray::new_timestamp(arr, time_unit.into(), tz.clone()).into()
}
Expand All @@ -262,7 +266,7 @@ where
DataType::Duration(_) => unimplemented!(),
DataType::Interval(_) => unimplemented!(),
_ => vortex_panic!("Invalid temporal type: {}", value.data_type()),
}
})
}

impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
Expand All @@ -281,7 +285,7 @@ where
value.offsets().clone().into_array(),
ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
dtype,
nulls(value.nulls(), nullable),
nulls(value.nulls(), nullable)?,
)
}
.into_array())
Expand Down Expand Up @@ -313,7 +317,7 @@ impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
.collect::<Vec<_>>(),
),
dtype,
nulls(value.nulls(), nullable),
nulls(value.nulls(), nullable)?,
)
.into_array()
})
Expand All @@ -324,17 +328,17 @@ impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> VortexResult<Self> {
Ok(BoolArray::new(
value.values().clone().into(),
nulls(value.nulls(), nullable),
nulls(value.nulls(), nullable)?,
)
.into_array())
}
}

/// Strip out the nulls from this array and return a new array without nulls.
pub(crate) fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData {
pub(crate) fn remove_nulls(data: arrow_data::ArrayData) -> VortexResult<arrow_data::ArrayData> {
if data.null_count() == 0 {
// No nulls to remove, return the array as is
return data;
return Ok(data);
}

let children = match data.data_type() {
Expand All @@ -344,12 +348,12 @@ pub(crate) fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData
.zip(data.child_data().iter())
.map(|(field, child_data)| {
if field.is_nullable() {
child_data.clone()
Ok(child_data.clone())
} else {
remove_nulls(child_data.clone())
}
})
.collect_vec(),
.collect::<VortexResult<Vec<_>>>()?,
),
DataType::List(f)
| DataType::LargeList(f)
Expand All @@ -359,12 +363,12 @@ pub(crate) fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData
if !f.is_nullable() =>
{
// All list types only have one child
assert_eq!(
vortex_ensure_eq!(
data.child_data().len(),
1,
"List types should have one child"
);
Some(vec![remove_nulls(data.child_data()[0].clone())])
Some(vec![remove_nulls(data.child_data()[0].clone())?])
}
_ => None,
};
Expand All @@ -375,7 +379,7 @@ pub(crate) fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData
}
builder
.build()
.vortex_expect("reconstructing array without nulls")
.map_err(|e| vortex_err!("Failed to reconstruct Arrow array without nulls: {e}"))
}

impl FromArrowArray<&ArrowStructArray> for ArrayRef {
Expand All @@ -390,15 +394,15 @@ impl FromArrowArray<&ArrowStructArray> for ArrayRef {
// Arrow pushes down nulls, even into non-nullable fields. So we strip them
// out here because Vortex is a little more strict.
if c.null_count() > 0 && !field.is_nullable() {
let stripped = make_array(remove_nulls(c.into_data()));
let stripped = make_array(remove_nulls(c.into_data())?);
Self::from_arrow(stripped.as_ref(), false)
} else {
Self::from_arrow(c.as_ref(), field.is_nullable())
}
})
.collect::<VortexResult<Vec<_>>>()?,
value.len(),
nulls(value.nulls(), nullable),
nulls(value.nulls(), nullable)?,
)?
.into_array())
}
Expand All @@ -417,7 +421,7 @@ impl<O: IntegerPType + OffsetSizeTrait> FromArrowArray<&GenericListArray<O>> for

// `offsets` are always non-nullable.
let offsets = value.offsets().clone().into_array();
let nulls = nulls(value.nulls(), nullable);
let nulls = nulls(value.nulls(), nullable)?;

Ok(ListArray::try_new(elements, offsets, nulls)?.into_array())
}
Expand All @@ -437,7 +441,7 @@ impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListViewArray<O>>
// `offsets` and `sizes` are always non-nullable.
let offsets = array.offsets().clone().into_array();
let sizes = array.sizes().clone().into_array();
let nulls = nulls(array.nulls(), nullable);
let nulls = nulls(array.nulls(), nullable)?;

Ok(ListViewArray::try_new(elements, offsets, sizes, nulls)?.into_array())
}
Expand All @@ -452,7 +456,7 @@ impl FromArrowArray<&ArrowFixedSizeListArray> for ArrayRef {
Ok(FixedSizeListArray::try_new(
Self::from_arrow(array.values().as_ref(), field.is_nullable())?,
*list_size as u32,
nulls(array.nulls(), nullable),
nulls(array.nulls(), nullable)?,
array.len(),
)?
.into_array())
Expand All @@ -461,7 +465,10 @@ impl FromArrowArray<&ArrowFixedSizeListArray> for ArrayRef {

impl FromArrowArray<&ArrowNullArray> for ArrayRef {
fn from_arrow(value: &ArrowNullArray, nullable: bool) -> VortexResult<Self> {
assert!(nullable);
vortex_ensure!(
nullable,
"Cannot convert an Arrow NullArray into a non-nullable Vortex array"
);
Ok(NullArray::new(value.len()).into_array())
}
}
Expand All @@ -476,20 +483,25 @@ impl<K: ArrowDictionaryKeyType> FromArrowArray<&DictionaryArray<K>> for DictArra
}
}

pub(crate) fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
pub(crate) fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> VortexResult<Validity> {
if nullable {
nulls
Ok(nulls
.map(|nulls| {
if nulls.null_count() == nulls.len() {
Validity::AllInvalid
} else {
Validity::from(BitBuffer::from(nulls.inner().clone()))
}
})
.unwrap_or_else(|| Validity::AllValid)
.unwrap_or(Validity::AllValid))
} else {
assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
Validity::NonNullable
let null_count = nulls.map(NullBuffer::null_count).unwrap_or(0);
vortex_ensure_eq!(
null_count,
0,
"Cannot convert an Arrow array containing {null_count} nulls into a non-nullable Vortex array"
);
Ok(Validity::NonNullable)
Comment on lines +498 to +504
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the main reason for this change, it doesn't make much sense to assert when basically all callers return results

}
}

Expand Down Expand Up @@ -1488,8 +1500,48 @@ mod tests {
}

#[test]
#[should_panic]
pub fn cannot_handle_nullable_struct_containing_non_nullable_dictionary() {
fn non_nullable_request_rejects_nulls() {
// Requesting `nullable = false` on an Arrow array that physically contains nulls is a
// contradiction and must surface as an error, not a panic.
let arrow_array = Int32Array::from(vec![Some(1), None, Some(3)]);
assert!(ArrayRef::from_arrow(&arrow_array, false).is_err());
}

#[test]
fn non_nullable_request_rejects_null_array() {
// An Arrow NullArray is entirely null, so it cannot be converted to a non-nullable
// Vortex array.
let arrow_array = NullArray::new(5);
assert!(ArrayRef::from_arrow(&arrow_array, false).is_err());
}

#[test]
fn non_nullable_struct_with_nulls_errors() {
// A struct array carrying top-level nulls cannot be converted to a non-nullable Vortex
// struct; the struct-level validity reconciliation must error rather than panic.
let struct_array = new_null_array(
&DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3,
);
assert!(ArrayRef::from_arrow(struct_array.as_ref(), false).is_err());
}

#[test]
fn non_nullable_list_with_nulls_errors() {
// Likewise for a list array with a null entry: requesting a non-nullable list must error
// rather than panic.
let mut builder = ListBuilder::new(Int32Builder::new());
builder.append_value([Some(1), Some(2)]);
builder.append_null();
let list = builder.finish();
assert!(ArrayRef::from_arrow(&list, false).is_err());
}

#[test]
pub fn nullable_struct_containing_non_nullable_dictionary_with_nulls_errors() {
// `remove_nulls` cannot strip pushed-down nulls out of a non-nullable dictionary field,
// so the values end up converted with `nullable = false` while still containing nulls.
// This must surface as an error rather than panicking.
let null_struct_array_with_non_nullable_field = new_null_array(
&DataType::Struct(Fields::from(vec![Field::new(
"non_nullable_deeper_inner",
Expand All @@ -1499,6 +1551,8 @@ mod tests {
1,
);

ArrayRef::from_arrow(null_struct_array_with_non_nullable_field.as_ref(), true).unwrap();
assert!(
ArrayRef::from_arrow(null_struct_array_with_non_nullable_field.as_ref(), true).is_err()
);
}
}
21 changes: 21 additions & 0 deletions vortex-array/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,28 @@ use crate::ArrayRef;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;

/// Construct a Vortex array from an Arrow array (or other Arrow container) of type `A`.
///
/// Implementations reuse the underlying Arrow buffers without copying wherever the Arrow and
/// Vortex memory layouts allow it.
pub trait FromArrowArray<A> {
/// Convert `array` into a Vortex array whose [`DType`](crate::dtype::DType) has the requested
/// `nullable` [`Nullability`](crate::dtype::Nullability).
///
/// An Arrow array can carry a validity (null) buffer regardless of whether its schema declares
/// the field nullable, so the desired nullability is supplied separately by the caller
/// (typically from the corresponding Arrow `Field`'s `is_nullable`). This flag is reconciled
/// with the array's physical nulls as follows:
///
/// - `nullable == true`: the resulting validity is derived from the array's null buffer, or
/// all-valid when the array has none.
/// - `nullable == false`: the array must contain no nulls, and the result is non-nullable.
///
/// # Errors
///
/// Returns an error if `nullable` is `false` but `array` physically contains one or more nulls
/// (including an Arrow `NullArray`, which is entirely null), or if the Arrow data type is not
/// supported.
fn from_arrow(array: A, nullable: bool) -> VortexResult<Self>
where
Self: Sized;
Expand Down
Loading
Loading