This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read and write extension types to and from parquet #396

Merged · 2 commits · Sep 12, 2021
36 changes: 19 additions & 17 deletions src/datatypes/mod.rs
@@ -9,26 +9,17 @@ pub use schema::Schema;

pub(crate) use field::{get_extension, Extension, Metadata};

/// The set of datatypes that are supported by this implementation of Apache Arrow.
///
/// The Arrow specification on data types includes some more types.
/// See also [`Schema.fbs`](https://github.com/apache/arrow/blob/master/format/Schema.fbs)
/// for Arrow's specification.
///
/// The variants of this enum include primitive fixed size types as well as parametric or
/// nested types.
/// Currently the Rust implementation supports the following nested types:
/// - `List<T>`
/// - `Struct<T, U, V, ...>`
///
/// Nested types can themselves be nested within other arrays.
/// For more information on these types please see
/// [the physical memory layout of Apache Arrow](https://arrow.apache.org/docs/format/Columnar.html#physical-memory-layout).
/// The set of supported logical types.
/// Each variant uniquely identifies a logical type, which defines specific semantics for the data (e.g. how it should be represented).
/// A [`DataType`] has a unique corresponding [`PhysicalType`], obtained via [`DataType::to_physical_type`],
/// which uniquely identifies an in-memory representation of data.
/// [`DataType::Extension`] is special in that it augments a [`DataType`] with metadata to support custom types.
/// Use [`DataType::to_logical_type`] to desugar such a type and return its corresponding logical type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DataType {
/// Null type, representing an array without values or validity, only a length.
/// Null type
Null,
/// A boolean datatype representing the values `true` and `false`.
/// `true` and `false`.
Boolean,
/// A signed 8-bit integer.
Int8,
@@ -223,6 +214,17 @@ impl DataType {
Extension(_, key, _) => key.to_physical_type(),
}
}

/// Returns `&self` for all but [`DataType::Extension`]. For [`DataType::Extension`],
/// (recursively) returns the inner [`DataType`].
/// Never returns the variant [`DataType::Extension`].
pub fn to_logical_type(&self) -> &DataType {
use DataType::*;
match self {
Extension(_, key, _) => key.to_logical_type(),
_ => self,
}
}
}

fn to_dictionary_index_type(data_type: &DataType) -> DictionaryIndexType {
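A quick sketch of how the new method behaves, assuming the `DataType::Extension(name, inner, metadata)` layout shown above (the "pineapple" extension name is purely illustrative):

```rust
use arrow2::datatypes::DataType;

fn main() {
    // A custom extension type wrapping UInt8; name and metadata are illustrative.
    let custom = DataType::Extension(
        "pineapple".to_string(),
        Box::new(DataType::UInt8),
        None, // optional serialized extension metadata
    );

    // `to_logical_type` strips the extension wrapper (recursively), so IO code
    // can dispatch on the underlying logical type.
    assert_eq!(custom.to_logical_type(), &DataType::UInt8);

    // Non-extension types are returned unchanged.
    assert_eq!(DataType::Boolean.to_logical_type(), &DataType::Boolean);
}
```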
67 changes: 39 additions & 28 deletions src/io/ipc/convert.rs
@@ -333,6 +333,29 @@ fn write_metadata<'a>(
}
}

fn write_extension<'a>(
fbb: &mut FlatBufferBuilder<'a>,
name: &str,
metadata: &Option<String>,
kv_vec: &mut Vec<WIPOffset<ipc::KeyValue<'a>>>,
) {
// metadata
if let Some(metadata) = metadata {
let kv_args = ipc::KeyValueArgs {
key: Some(fbb.create_string("ARROW:extension:metadata")),
value: Some(fbb.create_string(metadata.as_str())),
};
kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
}

// name
let kv_args = ipc::KeyValueArgs {
key: Some(fbb.create_string("ARROW:extension:name")),
value: Some(fbb.create_string(name)),
};
kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
}

/// Create an IPC Field from an Arrow Field
pub(crate) fn build_field<'a>(
fbb: &mut FlatBufferBuilder<'a>,
@@ -341,39 +364,16 @@ pub(crate) fn build_field<'a>(
// custom metadata.
let mut kv_vec = vec![];
if let DataType::Extension(name, _, metadata) = field.data_type() {
// append extension information.

// metadata
if let Some(metadata) = metadata {
let kv_args = ipc::KeyValueArgs {
key: Some(fbb.create_string("ARROW:extension:metadata")),
value: Some(fbb.create_string(metadata.as_str())),
};
kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
}

// name
let kv_args = ipc::KeyValueArgs {
key: Some(fbb.create_string("ARROW:extension:name")),
value: Some(fbb.create_string(name.as_str())),
};
kv_vec.push(ipc::KeyValue::create(fbb, &kv_args));
write_extension(fbb, name, metadata, &mut kv_vec);
}
if let Some(metadata) = field.metadata() {
if !metadata.is_empty() {
write_metadata(fbb, metadata, &mut kv_vec);
}
};
let fb_metadata = if !kv_vec.is_empty() {
Some(fbb.create_vector(&kv_vec))
} else {
None
};

let fb_field_name = fbb.create_string(field.name().as_str());
let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb);

let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
let fb_dictionary = if let Dictionary(index_type, inner) = field.data_type() {
if let DataType::Extension(name, _, metadata) = inner.as_ref() {
write_extension(fbb, name, metadata, &mut kv_vec);
}
Some(get_fb_dictionary(
index_type,
field
@@ -388,6 +388,17 @@ pub(crate) fn build_field<'a>(
None
};

if let Some(metadata) = field.metadata() {
if !metadata.is_empty() {
write_metadata(fbb, metadata, &mut kv_vec);
}
};
let fb_metadata = if !kv_vec.is_empty() {
Some(fbb.create_vector(&kv_vec))
} else {
None
};

let mut field_builder = ipc::FieldBuilder::new(fbb);
field_builder.add_name(fb_field_name);
if let Some(dictionary) = fb_dictionary {
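For context on the refactor above: the Arrow spec serializes extension types in IPC as field-level custom metadata under two canonical keys. A minimal sketch of the pairs `write_extension` emits; the helper below is illustrative, not part of the crate's API:

```rust
/// Illustrative only: the key-value pairs written for an extension field,
/// mirroring `write_extension` above (metadata first, then name).
fn extension_key_values(name: &str, metadata: &Option<String>) -> Vec<(String, String)> {
    let mut kvs = Vec::new();
    if let Some(metadata) = metadata {
        kvs.push(("ARROW:extension:metadata".to_string(), metadata.clone()));
    }
    kvs.push(("ARROW:extension:name".to_string(), name.to_string()));
    kvs
}

fn main() {
    let kvs = extension_key_values("pineapple", &None);
    assert_eq!(
        kvs,
        vec![("ARROW:extension:name".to_string(), "pineapple".to_string())]
    );
}
```

Factoring the logic into `write_extension` is what lets the `Dictionary` branch reuse it, so extension types nested inside dictionary fields are also round-tripped.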
6 changes: 5 additions & 1 deletion src/io/parquet/read/binary/dictionary.rs
@@ -32,7 +32,7 @@ fn read_dict_optional<K, O>(
{
let length = indices.len() + additional;
values.extend_from_slice(dict.values());
offsets.extend(
offsets.extend_from_trusted_len_iter(
dict.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap()),
@@ -152,6 +152,10 @@ where
)?
}

if offsets.is_empty() {
// the array is empty and thus we need to push the first offset ourselves.
offsets.push(O::zero());
};
let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into());
let data_type = DictionaryArray::<K>::get_child(&data_type).clone();
let values = Arc::new(Utf8Array::from_data(
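The empty-array fix relies on an invariant of Arrow's variable-size layout: an array of length `n` stores `n + 1` offsets, so even an empty array needs the leading `0`. A plain-Rust illustration, independent of the crate:

```rust
fn main() {
    // Element `i` of a Utf8/Binary array spans
    // `values[offsets[i] as usize..offsets[i + 1] as usize]`,
    // so an array of length n stores n + 1 offsets.
    let offsets: Vec<i32> = vec![0, 2, 5]; // two elements: bytes 0..2 and 2..5
    assert_eq!(offsets.len() - 1, 2);

    // An empty array still stores the single leading offset `0`; this is
    // exactly what the fix above pushes when no page contributed any values.
    let empty_offsets: Vec<i32> = vec![0];
    assert_eq!(empty_offsets.len() - 1, 0);
}
```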
28 changes: 15 additions & 13 deletions src/io/parquet/read/fixed_size_binary.rs
@@ -22,12 +22,11 @@ pub(crate) fn read_dict_buffer(
validity_buffer: &[u8],
indices_buffer: &[u8],
additional: usize,
size: i32,
size: usize,
dict: &FixedLenByteArrayPageDict,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
) {
let size = size as usize;
let length = values.len() * size + additional;
let dict_values = dict.values();

@@ -75,11 +74,10 @@ pub(crate) fn read_optional(
validity_buffer: &[u8],
values_buffer: &[u8],
additional: usize,
size: i32,
size: usize,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
) {
let size = size as usize;
let length = values.len() * size + additional;

assert_eq!(values_buffer.len() % size, 0);
@@ -122,25 +120,27 @@ pub(crate) fn read_optional(
pub(crate) fn read_required(
buffer: &[u8],
additional: usize,
size: i32,
size: usize,
values: &mut MutableBuffer<u8>,
) {
assert_eq!(buffer.len(), additional * size as usize);
assert_eq!(buffer.len(), additional * size);
values.extend_from_slice(buffer);
}

pub fn iter_to_array<I, E>(
mut iter: I,
size: i32,
data_type: DataType,
metadata: &ColumnChunkMetaData,
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;

let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
extend_from_page(
@@ -153,24 +153,26 @@ where
}

Ok(FixedSizeBinaryArray::from_data(
DataType::FixedSizeBinary(size),
data_type,
values.into(),
validity.into(),
))
}

pub async fn stream_to_array<I, E>(
pages: I,
size: i32,
data_type: DataType,
metadata: &ColumnChunkMetaData,
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: Stream<Item = std::result::Result<DataPage, E>>,
{
let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;

let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size);
let mut validity = MutableBitmap::with_capacity(capacity);

pin_mut!(pages); // needed for iteration
@@ -186,15 +188,15 @@ where
}

Ok(FixedSizeBinaryArray::from_data(
DataType::FixedSizeBinary(size),
data_type,
values.into(),
validity.into(),
))
}

pub(crate) fn extend_from_page(
page: &DataPage,
size: i32,
size: usize,
descriptor: &ColumnDescriptor,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
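The signature change threads the full `DataType` through instead of a bare `size`, so the returned array keeps any extension annotation. A minimal sketch of the size recovery, assuming `FixedSizeBinaryArray::get_size` as used in the diff above:

```rust
use arrow2::array::FixedSizeBinaryArray;
use arrow2::datatypes::DataType;

fn main() {
    let data_type = DataType::FixedSizeBinary(16);

    // The readers now derive `size` from the data type itself rather than
    // taking it as a separate parameter, and hand `data_type` unchanged to
    // `FixedSizeBinaryArray::from_data`.
    let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
    assert_eq!(size, 16);
}
```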
14 changes: 7 additions & 7 deletions src/io/parquet/read/mod.rs
@@ -93,7 +93,7 @@ fn dict_read<
panic!()
};

match values_data_type {
match values_data_type.to_logical_type() {
UInt8 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
@@ -169,7 +169,7 @@ pub fn page_iter_to_array<
data_type: DataType,
) -> Result<Box<dyn Array>> {
use DataType::*;
match data_type {
match data_type.to_logical_type() {
// INT32
UInt8 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u8),
UInt16 => primitive::iter_to_array(iter, metadata, data_type, |x: i32| x as u16),
@@ -207,7 +207,7 @@ pub fn page_iter_to_array<
Binary | Utf8 => binary::iter_to_array::<i32, _, _>(iter, metadata, &data_type),
LargeBinary | LargeUtf8 => binary::iter_to_array::<i64, _, _>(iter, metadata, &data_type),
FixedSizeBinary(size) => Ok(Box::new(fixed_size_binary::iter_to_array(
iter, size, metadata,
iter, data_type, metadata,
)?)),

List(ref inner) => match inner.data_type() {
@@ -247,7 +247,7 @@ pub fn page_iter_to_array<
binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
}
other => Err(ArrowError::NotYetImplemented(format!(
"The conversion of {:?} to arrow still not implemented",
"Reading {:?} from parquet still not implemented",
other
))),
},
@@ -265,7 +265,7 @@ pub fn page_iter_to_array<
},

other => Err(ArrowError::NotYetImplemented(format!(
"The conversion of {:?} to arrow still not implemented",
"Reading {:?} from parquet still not implemented",
other
))),
}
Expand All @@ -278,7 +278,7 @@ pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage,
data_type: DataType,
) -> Result<Box<dyn Array>> {
use DataType::*;
match data_type {
match data_type.to_logical_type() {
// INT32
UInt8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u8).await,
UInt16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u16).await,
@@ -321,7 +321,7 @@ pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage,
binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
}
FixedSizeBinary(size) => Ok(Box::new(
fixed_size_binary::stream_to_array(pages, size, metadata).await?,
fixed_size_binary::stream_to_array(pages, data_type, metadata).await?,
)),
other => Err(ArrowError::NotYetImplemented(format!(
"Async conversion of {:?}",
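The recurring `match data_type.to_logical_type()` is the heart of this PR: the readers above (and the writers below) dispatch on the desugared logical type, while the original, possibly extension-annotated, `data_type` is what ends up on the resulting array. A sketch of the idea (the "uuid" extension name is hypothetical):

```rust
use arrow2::datatypes::DataType;

// Illustrative dispatch in the style of `page_iter_to_array` above.
fn describe(data_type: &DataType) -> &'static str {
    match data_type.to_logical_type() {
        DataType::UInt8 => "stored as parquet INT32, cast back to u8",
        DataType::FixedSizeBinary(_) => "stored as fixed-size binary",
        _ => "not covered by this sketch",
    }
}

fn main() {
    let ext = DataType::Extension(
        "uuid".to_string(),
        Box::new(DataType::FixedSizeBinary(16)),
        None,
    );
    // The extension wrapper is transparent for dispatch...
    assert_eq!(describe(&ext), "stored as fixed-size binary");
    // ...while `ext` itself is what gets attached to the array being built.
}
```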
2 changes: 1 addition & 1 deletion src/io/parquet/write/dictionary.rs
@@ -136,7 +136,7 @@
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
let dict_page = match array.values().data_type() {
let dict_page = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array),
DataType::Int16 => dyn_prim!(i16, i32, array),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
2 changes: 1 addition & 1 deletion src/io/parquet/write/mod.rs
@@ -141,7 +141,7 @@ pub fn array_to_page(
)));
}

match data_type {
match data_type.to_logical_type() {
DataType::Boolean => {
boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, descriptor)
}
2 changes: 1 addition & 1 deletion src/io/parquet/write/schema.rs
@@ -45,7 +45,7 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
Repetition::Required
};
// create type from field
match field.data_type() {
match field.data_type().to_logical_type() {
DataType::Null => Ok(ParquetType::try_from_primitive(
name,
PhysicalType::Int32,
6 changes: 6 additions & 0 deletions tests/it/io/ipc/write/file.rs
@@ -162,6 +162,12 @@ fn write_100_decimal() -> Result<()> {
test_file("1.0.0-bigendian", "generated_decimal")
}

#[test]
fn write_100_extension() -> Result<()> {
test_file("1.0.0-littleendian", "generated_extension")?;
test_file("1.0.0-bigendian", "generated_extension")
}

#[test]
fn write_100_union() -> Result<()> {
test_file("1.0.0-littleendian", "generated_union")?;
6 changes: 6 additions & 0 deletions tests/it/io/parquet/mod.rs
@@ -458,6 +458,12 @@ fn roundtrip_100_dict() -> Result<()> {
test_file("1.0.0-bigendian", "generated_dictionary")
}

#[test]
fn roundtrip_100_extension() -> Result<()> {
test_file("1.0.0-littleendian", "generated_extension")?;
test_file("1.0.0-bigendian", "generated_extension")
}

/// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can
/// roundtrip their logical types.
#[test]