diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py
index 3971ff3e548..7f296aed3f4 100644
--- a/parquet_integration/write_parquet.py
+++ b/parquet_integration/write_parquet.py
@@ -261,6 +261,32 @@ def case_nested_edge():
     )
 
 
+def case_map() -> Tuple[dict, pa.Schema, str]:
+    s1 = ["a1", "a2"]
+    s2 = ["b1", "b2"]
+    schema = pa.schema(
+        [
+            pa.field(
+                "map",
+                pa.map_(pa.string(), pa.string()),
+            ),
+            pa.field(
+                "map_nullable",
+                pa.map_(pa.string(), pa.string()),
+            ),
+        ]
+    )
+
+    return (
+        {
+            "map": pa.MapArray.from_arrays([0, 2], s1, s2),
+            "map_nullable": pa.MapArray.from_arrays([0, 2], s1, ["b1", None]),
+        },
+        schema,
+        "map_required_10.parquet",
+    )
+
+
 def write_pyarrow(
     case,
     page_version: int,
@@ -299,7 +325,7 @@ def write_pyarrow(
     )
 
 
-for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge]:
+for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge, case_map]:
     for version in [1, 2]:
         for use_dict in [True, False]:
             for compression in ["lz4", None, "snappy"]:
diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs
index fa7156766a8..4f4fc5b8be3 100644
--- a/src/ffi/schema.rs
+++ b/src/ffi/schema.rs
@@ -306,7 +306,6 @@ unsafe fn to_data_type(schema: &ArrowSchema) -> Result<DataType> {
             let size = size_raw
                 .parse::<usize>()
                 .map_err(|_| Error::OutOfSpec("size is not a valid integer".to_string()))?;
-            println!("schema: {}", size);
             DataType::FixedSizeBinary(size)
         }
         ["+w", size_raw] => {
diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs
index cfc5f2034df..4182cb10f7a 100644
--- a/src/io/parquet/read/deserialize/mod.rs
+++ b/src/io/parquet/read/deserialize/mod.rs
@@ -11,9 +11,9 @@ mod struct_;
 mod utils;
 
 use crate::{
-    array::{Array, BinaryArray, FixedSizeListArray, ListArray, Utf8Array},
+    array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array},
     datatypes::{DataType, Field},
-    error::Result,
+    error::{Error, Result},
 };
 
 use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
@@ -300,7 +325,7 @@ where
             let columns = columns.into_iter().rev().collect();
             Box::new(struct_::StructIterator::new(columns, fields.clone()))
         }
-        other => todo!("{other:?}"),
+        Map(inner, _) => {
+            // parquet maps are encoded like lists of `key_value` structs:
+            // deserialize the inner entries and wrap them in a `MapArray`
+            init.push(InitNested::List(field.is_nullable));
+            let iter = columns_to_iter_recursive(
+                columns,
+                types,
+                inner.as_ref().clone(),
+                init,
+                chunk_size,
+            )?;
+            Box::new(iter.map(move |x| {
+                let (nested, inner) = x?;
+                let array = MapArray::new(
+                    field.data_type().clone(),
+                    vec![0, inner.len() as i32].into(),
+                    inner,
+                    None,
+                );
+                Ok((nested, array.arced()))
+            }))
+        }
+        other => {
+            return Err(Error::nyi(format!(
+                "Deserializing type {other:?} from parquet"
+            )))
+        }
     })
 }
 
diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs
index 9e32ee30572..5f23dc71dd6 100644
--- a/src/io/parquet/read/schema/convert.rs
+++ b/src/io/parquet/read/schema/convert.rs
@@ -219,6 +219,10 @@ fn non_repeated_group(
     match (logical_type, converted_type) {
         (Some(GroupLogicalType::List), _) => to_list(fields, parent_name),
         (None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
+        (Some(GroupLogicalType::Map), _) => to_list(fields, parent_name),
+        (None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
+            to_map(fields)
+        }
         _ => to_struct(fields),
     }
 }
@@ -234,6 +238,13 @@ fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
     }
 }
 
+/// Converts a parquet Map group type to an arrow [`DataType::Map`].
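+/// In parquet, a map is a group holding a single repeated `key_value` group
+/// with a required `key` and an optional `value`.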
+/// Returns [`None`] if the inner `key_value` field cannot be converted.
+fn to_map(fields: &[ParquetType]) -> Option<DataType> {
+    let inner = to_field(&fields[0])?;
+    Some(DataType::Map(Box::new(inner), false))
+}
+
 /// Entry point for converting parquet group type.
 ///
 /// This function takes care of logical type and repetition.
diff --git a/src/io/parquet/read/statistics/map.rs b/src/io/parquet/read/statistics/map.rs
new file mode 100644
index 00000000000..ddc09c9c518
--- /dev/null
+++ b/src/io/parquet/read/statistics/map.rs
@@ -0,0 +1,64 @@
+use crate::{
+    array::{Array, MapArray, MutableArray},
+    datatypes::DataType,
+    error::Error,
+};
+
+use super::make_mutable;
+
+#[derive(Debug)]
+pub struct DynMutableMapArray {
+    data_type: DataType,
+    pub inner: Box<dyn MutableArray>,
+}
+
+impl DynMutableMapArray {
+    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self, Error> {
+        let inner = match data_type.to_logical_type() {
+            DataType::Map(inner, _) => inner,
+            _ => unreachable!(),
+        };
+        let inner = make_mutable(inner.data_type(), capacity)?;
+
+        Ok(Self { data_type, inner })
+    }
+}
+
+impl MutableArray for DynMutableMapArray {
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
+
+    fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
+        None
+    }
+
+    fn as_box(&mut self) -> Box<dyn Array> {
+        Box::new(MapArray::new(
+            self.data_type.clone(),
+            vec![0, self.inner.len() as i32].into(),
+            self.inner.as_arc(),
+            None,
+        ))
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
+        self
+    }
+
+    fn push_null(&mut self) {
+        todo!()
+    }
+
+    fn shrink_to_fit(&mut self) {
+        todo!()
+    }
+}
diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs
index d1b6d9e4b5e..4f3eaeff7bd 100644
--- a/src/io/parquet/read/statistics/mod.rs
+++ b/src/io/parquet/read/statistics/mod.rs
@@ -22,6 +22,7 @@ mod boolean;
 mod dictionary;
 mod fixlen;
 mod list;
+mod map;
 mod primitive;
 mod struct_;
 mod utf8;
@@ -37,6 +38,8 @@ pub enum Count {
     Single(UInt64Array),
     /// struct arrays have a count as a struct of UInt64
     Struct(StructArray),
+    /// map arrays have a count as a map of UInt64
+    Map(MapArray),
 }
 
 /// Arrow-deserialized parquet Statistics of a file
@@ -76,6 +79,15 @@ impl From<MutableStatistics> for Statistics {
                 .unwrap()
                 .clone();
             Count::Struct(a)
+        } else if let PhysicalType::Map = s.null_count.data_type().to_physical_type() {
+            let a = s
+                .null_count
+                .as_box()
+                .as_any()
+                .downcast_ref::<MapArray>()
+                .unwrap()
+                .clone();
+            Count::Map(a)
         } else {
             let a = s
                 .null_count
@@ -96,6 +108,15 @@ impl From<MutableStatistics> for Statistics {
                 .unwrap()
                 .clone();
             Count::Struct(a)
+        } else if let PhysicalType::Map = s.distinct_count.data_type().to_physical_type() {
+            let a = s
+                .distinct_count
+                .as_box()
+                .as_any()
+                .downcast_ref::<MapArray>()
+                .unwrap()
+                .clone();
+            Count::Map(a)
         } else {
             let a = s
                 .distinct_count
@@ -151,6 +172,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result<Box<dyn MutableArray>> {
             data_type.clone(),
             capacity,
         )?),
+        PhysicalType::Map => Box::new(map::DynMutableMapArray::try_with_capacity(
+            data_type.clone(),
+            capacity,
+        )?),
         other => {
             return Err(Error::NotYetImplemented(format!(
                 "Deserializing parquet stats from {:?} is still not implemented",
@@ -168,6 +193,11 @@ fn create_dt(data_type: &DataType) -> DataType {
                 .map(|f| Field::new(&f.name, create_dt(&f.data_type), f.is_nullable))
                 .collect(),
         )
+    } else if let DataType::Map(f, ordered) = data_type.to_logical_type() {
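+        // mirror the map's nesting, with the leaf types replaced by u64 counters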
+        DataType::Map(
+            Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
+            *ordered,
+        )
     } else {
         DataType::UInt64
     }
@@ -329,6 +359,31 @@ fn push(
                 )
             });
         }
+        Map(_, _) => {
+            let min = min
+                .as_mut_any()
+                .downcast_mut::<map::DynMutableMapArray>()
+                .unwrap();
+            let max = max
+                .as_mut_any()
+                .downcast_mut::<map::DynMutableMapArray>()
+                .unwrap();
+            let distinct_count = distinct_count
+                .as_mut_any()
+                .downcast_mut::<map::DynMutableMapArray>()
+                .unwrap();
+            let null_count = null_count
+                .as_mut_any()
+                .downcast_mut::<map::DynMutableMapArray>()
+                .unwrap();
+            return push(
+                stats,
+                min.inner.as_mut(),
+                max.inner.as_mut(),
+                distinct_count.inner.as_mut(),
+                null_count.inner.as_mut(),
+            );
+        }
         _ => {}
     }
diff --git a/src/io/parquet/read/statistics/struct_.rs b/src/io/parquet/read/statistics/struct_.rs
index 085737d6241..1f59188b5a4 100644
--- a/src/io/parquet/read/statistics/struct_.rs
+++ b/src/io/parquet/read/statistics/struct_.rs
@@ -30,7 +30,7 @@ impl MutableArray for DynMutableStructArray {
     }
 
     fn len(&self) -> usize {
-        self.inner.len()
+        self.inner[0].len()
     }
 
     fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index d60796378ce..700f4557586 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -790,6 +790,147 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics {
     }
 }
 
+pub fn pyarrow_map(column: &str) -> Box<dyn Array> {
+    match column {
+        "map" => {
+            let s1 = [Some("a1"), Some("a2")];
+            let s2 = [Some("b1"), Some("b2")];
+            let dt = DataType::Struct(vec![
+                Field::new("key", DataType::Utf8, false),
+                Field::new("value", DataType::Utf8, true),
+            ]);
+            MapArray::try_new(
+                DataType::Map(Box::new(Field::new("entries", dt.clone(), false)), false),
+                vec![0, 2].into(),
+                StructArray::try_new(
+                    dt,
+                    vec![
+                        Utf8Array::<i32>::from(s1).arced(),
+                        Utf8Array::<i32>::from(s2).arced(),
+                    ],
+                    None,
+                )
+                .unwrap()
+                .arced(),
+                None,
+            )
+            .unwrap()
+            .boxed()
+        }
+        "map_nullable" => {
+            let s1 = [Some("a1"), Some("a2")];
+            let s2 = [Some("b1"), None];
+            let dt = DataType::Struct(vec![
+                Field::new("key", DataType::Utf8, false),
+                Field::new("value", DataType::Utf8, true),
+            ]);
+            MapArray::try_new(
+                DataType::Map(Box::new(Field::new("entries", dt.clone(), false)), false),
+                vec![0, 2].into(),
+                StructArray::try_new(
+                    dt,
+                    vec![
+                        Utf8Array::<i32>::from(s1).arced(),
+                        Utf8Array::<i32>::from(s2).arced(),
+                    ],
+                    None,
+                )
+                .unwrap()
+                .arced(),
+                None,
+            )
+            .unwrap()
+            .boxed()
+        }
+        _ => unreachable!(),
+    }
+}
+
+pub fn pyarrow_map_statistics(column: &str) -> Statistics {
+    let new_map = |arrays: Vec<Arc<dyn Array>>, names: Vec<String>| {
+        let fields = names
+            .into_iter()
+            .zip(arrays.iter())
+            .map(|(n, a)| Field::new(n, a.data_type().clone(), true))
+            .collect::<Vec<_>>();
+        MapArray::new(
+            DataType::Map(
+                Box::new(Field::new("items", DataType::Struct(fields.clone()), false)),
+                false,
+            ),
+            vec![0, arrays[0].len() as i32].into(),
+            StructArray::new(DataType::Struct(fields), arrays, None).arced(),
+            None,
+        )
+    };
+
+    let names = vec!["key".to_string(), "value".to_string()];
+
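+    // the written files carry no distinct count statistics, hence the `None`s below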
+    match column {
+        "map" => Statistics {
+            distinct_count: Count::Map(new_map(
+                vec![
+                    Arc::new(UInt64Array::from([None])),
+                    Arc::new(UInt64Array::from([None])),
+                ],
+                names.clone(),
+            )),
+            null_count: Count::Map(new_map(
+                vec![
+                    Arc::new(UInt64Array::from([Some(0)])),
+                    Arc::new(UInt64Array::from([Some(0)])),
+                ],
+                names.clone(),
+            )),
+            min_value: Box::new(new_map(
+                vec![
+                    Arc::new(Utf8Array::<i32>::from_slice(["a1"])),
+                    Arc::new(Utf8Array::<i32>::from_slice(["b1"])),
+                ],
+                names.clone(),
+            )),
+            max_value: Box::new(new_map(
+                vec![
+                    Arc::new(Utf8Array::<i32>::from_slice(["a2"])),
+                    Arc::new(Utf8Array::<i32>::from_slice(["b2"])),
+                ],
+                names,
+            )),
+        },
+        "map_nullable" => Statistics {
+            distinct_count: Count::Map(new_map(
+                vec![
+                    Arc::new(UInt64Array::from([None])),
+                    Arc::new(UInt64Array::from([None])),
+                ],
+                names.clone(),
+            )),
+            null_count: Count::Map(new_map(
+                vec![
+                    Arc::new(UInt64Array::from([Some(0)])),
+                    Arc::new(UInt64Array::from([Some(1)])),
+                ],
+                names.clone(),
+            )),
+            min_value: Box::new(new_map(
+                vec![
+                    Arc::new(Utf8Array::<i32>::from_slice(["a1"])),
+                    Arc::new(Utf8Array::<i32>::from_slice(["b1"])),
+                ],
+                names.clone(),
+            )),
+            max_value: Box::new(new_map(
+                vec![
+                    Arc::new(Utf8Array::<i32>::from_slice(["a2"])),
+                    Arc::new(Utf8Array::<i32>::from_slice(["b1"])),
+                ],
+                names,
+            )),
+        },
+        _ => unreachable!(),
+    }
+}
+
 fn integration_write(schema: &Schema, batches: &[Chunk<Arc<dyn Array>>]) -> Result<Vec<u8>> {
     let options = WriteOptions {
         write_statistics: true,
diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs
index 57b6cbf155d..4e426ef5bae 100644
--- a/tests/it/io/parquet/read.rs
+++ b/tests/it/io/parquet/read.rs
@@ -38,6 +38,7 @@ fn test_pyarrow_integration(
         ("nested", false) => pyarrow_nested_nullable(column),
         ("nested_edge", false) => pyarrow_nested_edge(column),
         ("struct", false) => pyarrow_struct(column),
+        ("map", true) => pyarrow_map(column),
         _ => unreachable!(),
     };
 
@@ -47,11 +48,13 @@ fn test_pyarrow_integration(
         ("nested", false) => pyarrow_nested_nullable_statistics(column),
         ("nested_edge", false) => pyarrow_nested_edge_statistics(column),
         ("struct", false) => pyarrow_struct_statistics(column),
+        ("map", true) => pyarrow_map_statistics(column),
         _ => unreachable!(),
     };
 
     assert_eq!(expected.as_ref(), array.as_ref());
 
     if ![
+        // pyarrow outputs an incorrect number of null count for nested types - ARROW-16299
         "list_int16",
         "list_large_binary",
         "list_int64",
@@ -62,10 +65,12 @@ fn test_pyarrow_integration(
         "list_bool",
         "list_nested_inner_required_required_i64",
         "list_nested_inner_required_i64",
+        // pyarrow reports an incorrect min/max for MapArray
+        "map",
+        "map_nullable",
     ]
     .contains(&column)
     {
-        // pyarrow outputs an incorrect number of null count for nested types - ARROW-16299
         assert_eq!(expected_statistics, statistics);
     }
 
@@ -458,6 +463,16 @@ fn v1_nested_edge_2() -> Result<()> {
     test_pyarrow_integration("null", 1, "nested_edge", false, false, None)
 }
 
+#[test]
+fn v1_map() -> Result<()> {
+    test_pyarrow_integration("map", 1, "map", false, true, None)
+}
+
+#[test]
+fn v1_map_nullable() -> Result<()> {
+    test_pyarrow_integration("map_nullable", 1, "map", false, true, None)
+}
+
 #[cfg(feature = "io_parquet_compression")]
 #[test]
 fn all_types() -> Result<()> {