Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for FFI of dictionary-encoded arrays.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 10, 2021
1 parent 522c7c7 commit ef2bc28
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 26 deletions.
16 changes: 15 additions & 1 deletion arrow-pyarrow-integration-testing/tests/test_sql.py
Expand Up @@ -104,7 +104,7 @@ def test_decimal_roundtrip(self):
data = [
round(decimal.Decimal(722.82), 2),
round(decimal.Decimal(-934.11), 2),
None
None,
]
a = pyarrow.array(data, pyarrow.decimal128(5, 2))
b = arrow_pyarrow_integration_testing.round_trip(a)
Expand Down Expand Up @@ -179,3 +179,17 @@ def test_list_list_array(self):
b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type

def test_dict(self):
"""
Python -> Rust -> Python
"""
a = pyarrow.array(
["a", "a", "b", None, "c"],
pyarrow.dictionary(pyarrow.int64(), pyarrow.utf8()),
)
b = arrow_pyarrow_integration_testing.round_trip(a)

b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type
37 changes: 26 additions & 11 deletions src/array/ffi.rs
Expand Up @@ -29,13 +29,28 @@ pub unsafe trait FromFfi<T: ffi::ArrowArrayRef>: Sized {
macro_rules! ffi_dyn {
($array:expr, $ty:ty) => {{
let array = $array.as_any().downcast_ref::<$ty>().unwrap();
(array.buffers(), array.children())
(array.buffers(), array.children(), None)
}};
}

type BuffersChildren = (Vec<Option<std::ptr::NonNull<u8>>>, Vec<Arc<dyn Array>>);
macro_rules! ffi_dict_dyn {
($array:expr, $ty:ty) => {{
let array = $array.as_any().downcast_ref::<$ty>().unwrap();
(
array.buffers(),
array.children(),
Some(array.values().clone()),
)
}};
}

type BuffersChildren = (
Vec<Option<std::ptr::NonNull<u8>>>,
Vec<Arc<dyn Array>>,
Option<Arc<dyn Array>>,
);

pub fn buffers_children(array: &dyn Array) -> BuffersChildren {
pub fn buffers_children_dictionary(array: &dyn Array) -> BuffersChildren {
match array.data_type() {
DataType::Null => ffi_dyn!(array, NullArray),
DataType::Boolean => ffi_dyn!(array, BooleanArray),
Expand Down Expand Up @@ -72,14 +87,14 @@ pub fn buffers_children(array: &dyn Array) -> BuffersChildren {
DataType::Struct(_) => ffi_dyn!(array, StructArray),
DataType::Union(_) => unimplemented!(),
DataType::Dictionary(key_type, _) => match key_type.as_ref() {
DataType::Int8 => ffi_dyn!(array, DictionaryArray::<i8>),
DataType::Int16 => ffi_dyn!(array, DictionaryArray::<i16>),
DataType::Int32 => ffi_dyn!(array, DictionaryArray::<i32>),
DataType::Int64 => ffi_dyn!(array, DictionaryArray::<i64>),
DataType::UInt8 => ffi_dyn!(array, DictionaryArray::<u8>),
DataType::UInt16 => ffi_dyn!(array, DictionaryArray::<u16>),
DataType::UInt32 => ffi_dyn!(array, DictionaryArray::<u32>),
DataType::UInt64 => ffi_dyn!(array, DictionaryArray::<u64>),
DataType::Int8 => ffi_dict_dyn!(array, DictionaryArray::<i8>),
DataType::Int16 => ffi_dict_dyn!(array, DictionaryArray::<i16>),
DataType::Int32 => ffi_dict_dyn!(array, DictionaryArray::<i32>),
DataType::Int64 => ffi_dict_dyn!(array, DictionaryArray::<i64>),
DataType::UInt8 => ffi_dict_dyn!(array, DictionaryArray::<u8>),
DataType::UInt16 => ffi_dict_dyn!(array, DictionaryArray::<u16>),
DataType::UInt32 => ffi_dict_dyn!(array, DictionaryArray::<u32>),
DataType::UInt64 => ffi_dict_dyn!(array, DictionaryArray::<u64>),
_ => unreachable!(),
},
}
Expand Down
2 changes: 1 addition & 1 deletion src/array/mod.rs
Expand Up @@ -401,7 +401,7 @@ pub use specification::Offset;
pub use struct_::StructArray;
pub use utf8::{MutableUtf8Array, Utf8Array, Utf8ValuesIter};

pub(crate) use self::ffi::buffers_children;
pub(crate) use self::ffi::buffers_children_dictionary;
pub use self::ffi::FromFfi;
pub use self::ffi::ToFfi;

Expand Down
17 changes: 8 additions & 9 deletions src/ffi/array.rs
Expand Up @@ -67,13 +67,15 @@ pub fn try_from<A: ArrowArrayRef>(array: A) -> Result<Box<dyn Array>> {
DataType::LargeList(_) => Box::new(ListArray::<i64>::try_from_ffi(array)?),
DataType::Struct(_) => Box::new(StructArray::try_from_ffi(array)?),
DataType::Dictionary(keys, _) => match keys.as_ref() {
DataType::Int8 => Box::new(DictionaryArray::<i8>::try_from_ffi(array)?),
DataType::Int16 => Box::new(DictionaryArray::<i16>::try_from_ffi(array)?),
DataType::Int32 => Box::new(DictionaryArray::<i32>::try_from_ffi(array)?),
DataType::Int64 => Box::new(DictionaryArray::<i64>::try_from_ffi(array)?),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Reading dictionary of keys \"{}\" is not yet supported.",
other
)))
}
DataType::UInt8 => Box::new(DictionaryArray::<u8>::try_from_ffi(array)?),
DataType::UInt16 => Box::new(DictionaryArray::<u16>::try_from_ffi(array)?),
DataType::UInt32 => Box::new(DictionaryArray::<u32>::try_from_ffi(array)?),
DataType::UInt64 => Box::new(DictionaryArray::<u64>::try_from_ffi(array)?),
_ => unreachable!(),
},
data_type => {
return Err(ArrowError::NotYetImplemented(format!(
Expand All @@ -89,7 +91,6 @@ pub fn try_from<A: ArrowArrayRef>(array: A) -> Result<Box<dyn Array>> {
#[cfg(test)]
mod tests {
use super::*;
use crate::array::*;
use crate::datatypes::TimeUnit;
use crate::{error::Result, ffi};
use std::sync::Arc;
Expand Down Expand Up @@ -209,7 +210,6 @@ mod tests {
test_round_trip(array)
}

/*
#[test]
fn test_dict() -> Result<()> {
let data = vec![Some("a"), Some("a"), None, Some("b")];
Expand All @@ -221,5 +221,4 @@ mod tests {

test_round_trip(array)
}
*/
}
15 changes: 12 additions & 3 deletions src/ffi/ffi.rs
Expand Up @@ -19,7 +19,7 @@ use std::{ptr::NonNull, sync::Arc};

use super::schema::{to_field, Ffi_ArrowSchema};
use crate::{
array::{buffers_children, Array},
array::{buffers_children_dictionary, Array},
bitmap::{utils::bytes_for, Bitmap},
buffer::{
bytes::{Bytes, Deallocation},
Expand Down Expand Up @@ -75,6 +75,10 @@ unsafe extern "C" fn c_release_array(array: *mut Ffi_ArrowArray) {
let _ = Box::from_raw(*child);
}

if let Some(ptr) = private.dictionary_ptr {
let _ = Box::from_raw(ptr);
}

array.release = None;
}

Expand All @@ -83,6 +87,7 @@ struct PrivateData {
array: Arc<dyn Array>,
buffers_ptr: Box<[*const std::os::raw::c_void]>,
children_ptr: Box<[*mut Ffi_ArrowArray]>,
dictionary_ptr: Option<*mut Ffi_ArrowArray>,
}

impl Ffi_ArrowArray {
Expand All @@ -91,7 +96,7 @@ impl Ffi_ArrowArray {
/// This method releases `buffers`. Consumers of this struct *must* call `release` before
/// releasing this struct, or contents in `buffers` leak.
fn new(array: Arc<dyn Array>) -> Self {
let (buffers, children) = buffers_children(array.as_ref());
let (buffers, children, dictionary) = buffers_children_dictionary(array.as_ref());

let buffers_ptr = buffers
.iter()
Expand All @@ -109,13 +114,17 @@ impl Ffi_ArrowArray {
.collect::<Box<_>>();
let n_children = children_ptr.len() as i64;

let dictionary_ptr =
dictionary.map(|array| Box::into_raw(Box::new(Ffi_ArrowArray::new(array))));

let length = array.len() as i64;
let null_count = array.null_count() as i64;

let mut private_data = Box::new(PrivateData {
array,
buffers_ptr,
children_ptr,
dictionary_ptr,
});

Self {
Expand All @@ -126,7 +135,7 @@ impl Ffi_ArrowArray {
n_children,
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: private_data.children_ptr.as_mut_ptr(),
dictionary: std::ptr::null_mut(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(c_release_array),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
Expand Down
16 changes: 15 additions & 1 deletion src/ffi/schema.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{
struct SchemaPrivateData {
field: Field,
children_ptr: Box<[*mut Ffi_ArrowSchema]>,
dictionary: Option<*mut Ffi_ArrowSchema>,
}

/// ABI-compatible struct for `ArrowSchema` from C Data Interface
Expand Down Expand Up @@ -43,6 +44,10 @@ unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) {
let _ = Box::from_raw(*child);
}

if let Some(ptr) = private.dictionary {
let _ = Box::from_raw(ptr);
}

schema.release = None;
}

Expand Down Expand Up @@ -75,9 +80,18 @@ impl Ffi_ArrowSchema {

let flags = field.is_nullable() as i64 * 2;

let dictionary = if let DataType::Dictionary(_, values) = field.data_type() {
// we do not store field info in the dict values, so can't recover it all :(
let field = Field::new("item", values.as_ref().clone(), true);
Some(Box::new(Ffi_ArrowSchema::try_new(field)?))
} else {
None
};

let mut private = Box::new(SchemaPrivateData {
field,
children_ptr,
dictionary: dictionary.map(Box::into_raw),
});

// <https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema>
Expand All @@ -88,7 +102,7 @@ impl Ffi_ArrowSchema {
flags,
n_children,
children: private.children_ptr.as_mut_ptr(),
dictionary: std::ptr::null_mut(),
dictionary: private.dictionary.unwrap_or(std::ptr::null_mut()),
release: Some(c_release_schema),
private_data: Box::into_raw(private) as *mut ::std::os::raw::c_void,
})
Expand Down

0 comments on commit ef2bc28

Please sign in to comment.