Added support to read Map from parquet

jorgecarleitao committed Jun 3, 2022
1 parent d607c33 commit 1ed9f14
Showing 10 changed files with 344 additions and 8 deletions.
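This commit teaches the parquet reader to produce Arrow MapArrays. In Arrow's model, a map is a list over a struct of (key, value) entries: the entries of all rows are stored contiguously in a single child struct array, and an offsets buffer marks each row's slice. Below is a minimal Rust sketch of the array the new code is expected to assemble for the "map" column of the test fixture further down. The constructor shapes mirror the calls added in this commit; the entries/key/value field names are illustrative assumptions, not dictated by the diff.

use std::sync::Arc;

use arrow2::array::{Array, MapArray, StructArray, Utf8Array};
use arrow2::datatypes::{DataType, Field};

fn main() {
    // All entries across all rows live in one contiguous (key, value) struct array.
    let keys = Utf8Array::<i32>::from_slice(["a1", "a2"]);
    let values = Utf8Array::<i32>::from_slice(["b1", "b2"]);

    let entries_dt = DataType::Struct(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
    ]);
    let entries = StructArray::new(
        entries_dt.clone(),
        vec![
            Arc::new(keys) as Arc<dyn Array>,
            Arc::new(values) as Arc<dyn Array>,
        ],
        None,
    );

    // Offsets [0, 2] delimit a single row holding both entries, as in the fixture.
    let data_type = DataType::Map(Box::new(Field::new("entries", entries_dt, false)), false);
    let array = MapArray::new(data_type, vec![0, 2].into(), Arc::new(entries), None);
    assert_eq!(array.len(), 1);
}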
28 changes: 27 additions & 1 deletion parquet_integration/write_parquet.py
@@ -261,6 +261,32 @@ def case_nested_edge():
)


def case_map() -> Tuple[dict, pa.Schema, str]:
    s1 = ["a1", "a2"]
    s2 = ["b1", "b2"]
    schema = pa.schema(
        [
            pa.field(
                "map",
                pa.map_(pa.string(), pa.string()),
            ),
            pa.field(
                "map_nullable",
                pa.map_(pa.string(), pa.string()),
            ),
        ]
    )

    return (
        {
            "map": pa.MapArray.from_arrays([0, 2], s1, s2),
            "map_nullable": pa.MapArray.from_arrays([0, 2], s1, ["b1", None]),
        },
        schema,
        "map_required_10.parquet",
    )


def write_pyarrow(
    case,
    page_version: int,
@@ -299,7 +325,7 @@ def write_pyarrow(
)


-for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge]:
+for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge, case_map]:
    for version in [1, 2]:
        for use_dict in [True, False]:
            for compression in ["lz4", None, "snappy"]:
1 change: 0 additions & 1 deletion src/ffi/schema.rs
@@ -306,7 +306,6 @@ unsafe fn to_data_type(schema: &ArrowSchema) -> Result<DataType> {
            let size = size_raw
                .parse::<usize>()
                .map_err(|_| Error::OutOfSpec("size is not a valid integer".to_string()))?;
-           println!("schema: {}", size);
            // Arrow C data interface: "w:<size>" denotes FixedSizeBinary(<size>)
            DataType::FixedSizeBinary(size)
        }
        ["+w", size_raw] => {
1 change: 0 additions & 1 deletion src/io/parquet/read/deserialize/binary/nested.rs
@@ -120,7 +120,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
                }
            }
            State::OptionalDictionary(page_validity, page_values) => {
-               println!("optional_dict");
                let dict_values = page_values.dict.values();
                let dict_offsets = page_values.dict.offsets();

32 changes: 29 additions & 3 deletions src/io/parquet/read/deserialize/mod.rs
@@ -11,9 +11,9 @@ mod struct_;
mod utils;

use crate::{
-    array::{Array, BinaryArray, FixedSizeListArray, ListArray, Utf8Array},
+    array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
    datatypes::{DataType, Field},
-    error::Result,
+    error::{Error, Result},
};

use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
@@ -300,7 +300,33 @@
            let columns = columns.into_iter().rev().collect();
            Box::new(struct_::StructIterator::new(columns, fields.clone()))
        }
-       other => todo!("{other:?}"),
        Map(inner, _) => {
            // maps are decoded like lists: push one extra nesting level and
            // decode the inner (key, value) struct recursively
            init.push(InitNested::List(field.is_nullable));
            let iter = columns_to_iter_recursive(
                columns,
                types,
                inner.as_ref().clone(),
                init,
                chunk_size,
            )?;
            Box::new(iter.map(move |x| {
                let (nested, inner) = x?;
                // wrap the decoded entries into a MapArray spanning all of them
                let array = MapArray::new(
                    field.data_type().clone(),
                    vec![0, inner.len() as i32].into(),
                    inner,
                    None,
                );
                Ok((nested, array.arced()))
            }))
        }
        other => {
            return Err(Error::nyi(format!(
                "Deserializing type {other:?} from parquet"
            )))
        }
    })
}

11 changes: 11 additions & 0 deletions src/io/parquet/read/schema/convert.rs
@@ -219,6 +219,10 @@ fn non_repeated_group(
    match (logical_type, converted_type) {
        (Some(GroupLogicalType::List), _) => to_list(fields, parent_name),
        (None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
        (Some(GroupLogicalType::Map), _) => to_map(fields),
        (None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
            to_map(fields)
        }
        _ => to_struct(fields),
    }
}
@@ -234,6 +238,13 @@ fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
}
}

/// Converts a parquet group type to an arrow [`DataType::Map`].
/// Returns [`None`] if its inner field cannot be converted
fn to_map(fields: &[ParquetType]) -> Option<DataType> {
    let inner = to_field(&fields[0])?;
    Some(DataType::Map(Box::new(inner), false))
}

/// Entry point for converting parquet group type.
///
/// This function takes care of logical type and repetition.
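For reference, to_map leans on the shape the parquet format specification mandates for MAP-annotated groups — a single repeated key_value group holding the key and the (optional) value — which is why converting fields[0] alone suffices. The canonical layout from the specification:

<map-repetition> group <name> (MAP) {
  repeated group key_value {
    required <key-type> key;
    <value-repetition> <value-type> value;
  }
}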
64 changes: 64 additions & 0 deletions src/io/parquet/read/statistics/map.rs
@@ -0,0 +1,64 @@
use crate::{
    array::{Array, MapArray, MutableArray},
    datatypes::DataType,
    error::Error,
};

use super::make_mutable;

/// A mutable array used to accumulate statistics of map columns,
/// delegating to the mutable array of its inner entries.
#[derive(Debug)]
pub struct DynMutableMapArray {
    data_type: DataType,
    pub inner: Box<dyn MutableArray>,
}

impl DynMutableMapArray {
    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self, Error> {
        let inner = match data_type.to_logical_type() {
            DataType::Map(inner, _) => inner,
            _ => unreachable!(),
        };
        let inner = make_mutable(inner.data_type(), capacity)?;

        Ok(Self { data_type, inner })
    }
}

impl MutableArray for DynMutableMapArray {
    fn data_type(&self) -> &DataType {
        &self.data_type
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
        None
    }

    fn as_box(&mut self) -> Box<dyn Array> {
        Box::new(MapArray::new(
            self.data_type.clone(),
            vec![0, self.inner.len() as i32].into(),
            self.inner.as_arc(),
            None,
        ))
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn push_null(&mut self) {
        todo!()
    }

    fn shrink_to_fit(&mut self) {
        todo!()
    }
}
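Note that as_box, like the deserializer's Map branch, uses the degenerate offsets [0, len]: everything accumulated in inner is exposed as a single map row. A sketch of the offsets semantics, with hypothetical entries:

offsets: [0, 3]                      // one row spanning entries 0..3
entries: (k0, v0) (k1, v1) (k2, v2)
row 0  = {k0: v0, k1: v1, k2: v2}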
55 changes: 55 additions & 0 deletions src/io/parquet/read/statistics/mod.rs
@@ -22,6 +22,7 @@ mod boolean;
mod dictionary;
mod fixlen;
mod list;
mod map;
mod primitive;
mod struct_;
mod utf8;
@@ -37,6 +38,8 @@ pub enum Count {
    Single(UInt64Array),
    /// struct arrays have a count as a struct of UInt64
    Struct(StructArray),
    /// map arrays have a count as a map of UInt64
    Map(MapArray),
}

/// Arrow-deserialized parquet Statistics of a file
@@ -76,6 +79,15 @@ impl From<MutableStatistics> for Statistics {
                .unwrap()
                .clone();
            Count::Struct(a)
        } else if let PhysicalType::Map = s.null_count.data_type().to_physical_type() {
            let a = s
                .null_count
                .as_box()
                .as_any()
                .downcast_ref::<MapArray>()
                .unwrap()
                .clone();
            Count::Map(a)
        } else {
            let a = s
                .null_count
@@ -96,6 +108,15 @@
                .unwrap()
                .clone();
            Count::Struct(a)
        } else if let PhysicalType::Map = s.distinct_count.data_type().to_physical_type() {
            let a = s
                .distinct_count
                .as_box()
                .as_any()
                .downcast_ref::<MapArray>()
                .unwrap()
                .clone();
            Count::Map(a)
        } else {
            let a = s
                .distinct_count
Expand Down Expand Up @@ -151,6 +172,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result<Box<dyn Mutable
            data_type.clone(),
            capacity,
        )?),
        PhysicalType::Map => Box::new(map::DynMutableMapArray::try_with_capacity(
            data_type.clone(),
            capacity,
        )?),
        other => {
            return Err(Error::NotYetImplemented(format!(
                "Deserializing parquet stats from {:?} is still not implemented",
@@ -168,6 +193,11 @@ fn create_dt(data_type: &DataType) -> DataType {
            .map(|f| Field::new(&f.name, create_dt(&f.data_type), f.is_nullable))
            .collect(),
        )
    } else if let DataType::Map(f, ordered) = data_type.to_logical_type() {
        DataType::Map(
            Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
            *ordered,
        )
    } else {
        DataType::UInt64
    }
@@ -329,6 +359,31 @@ fn push(
                )
            });
        }
        Map(_, _) => {
            let min = min
                .as_mut_any()
                .downcast_mut::<map::DynMutableMapArray>()
                .unwrap();
            let max = max
                .as_mut_any()
                .downcast_mut::<map::DynMutableMapArray>()
                .unwrap();
            let distinct_count = distinct_count
                .as_mut_any()
                .downcast_mut::<map::DynMutableMapArray>()
                .unwrap();
            let null_count = null_count
                .as_mut_any()
                .downcast_mut::<map::DynMutableMapArray>()
                .unwrap();
            // maps carry no statistics of their own: recurse into the inner entries
            return push(
                stats,
                min.inner.as_mut(),
                max.inner.as_mut(),
                distinct_count.inner.as_mut(),
                null_count.inner.as_mut(),
            );
        }
        _ => {}
    }

2 changes: 1 addition & 1 deletion src/io/parquet/read/statistics/struct_.rs
@@ -30,7 +30,7 @@ impl MutableArray for DynMutableStructArray {
    }

    fn len(&self) -> usize {
-       self.inner.len()
+       self.inner[0].len()
    }

    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
