diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index b0abf400edf..95563836d69 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -68,68 +68,57 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( let physical_type = &type_.physical_type; let logical_type = &type_.logical_type; - Ok(match data_type.to_logical_type() { - Null => null::iter_to_arrays(pages, data_type, chunk_size, num_rows), - Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size, num_rows)), - UInt8 => dyn_iter(iden(primitive::IntegerIter::new( + Ok(match (physical_type, data_type.to_logical_type()) { + (_, Null) => null::iter_to_arrays(pages, data_type, chunk_size, num_rows), + (PhysicalType::Boolean, Boolean) => { + dyn_iter(boolean::Iter::new(pages, data_type, chunk_size, num_rows)) + } + (PhysicalType::Int32, UInt8) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i32| x as u8, ))), - UInt16 => dyn_iter(iden(primitive::IntegerIter::new( + (PhysicalType::Int32, UInt16) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i32| x as u16, ))), - UInt32 => match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::IntegerIter::new( - pages, - data_type, - num_rows, - chunk_size, - |x: i32| x as u32, - ))), - // some implementations of parquet write arrow's u32 into i64. - PhysicalType::Int64 => dyn_iter(iden(primitive::IntegerIter::new( - pages, - data_type, - num_rows, - chunk_size, - |x: i64| x as u32, - ))), - other => { - return Err(Error::NotYetImplemented(format!( - "Reading uin32 from {other:?}-encoded parquet still not implemented" - ))) - } - }, - Int8 => dyn_iter(iden(primitive::IntegerIter::new( + (PhysicalType::Int32, UInt32) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, - |x: i32| x as i8, + |x: i32| x as u32, ))), - Int16 => dyn_iter(iden(primitive::IntegerIter::new( + (PhysicalType::Int64, UInt32) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, - |x: i32| x as i16, + |x: i64| x as u32, ))), - Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::IntegerIter::new( + (PhysicalType::Int32, Int8) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, - |x: i32| x, + |x: i32| x as i8, ))), - - Timestamp(time_unit, _) => { + (PhysicalType::Int32, Int16) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i32| x as i16, + ))), + (PhysicalType::Int32, Int32 | Date32 | Time32(_)) => dyn_iter(iden( + primitive::IntegerIter::new(pages, data_type, num_rows, chunk_size, |x: i32| x), + )), + (PhysicalType::Int64 | PhysicalType::Int96, Timestamp(time_unit, _)) => { let time_unit = *time_unit; return timestamp( pages, @@ -141,12 +130,10 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( time_unit, ); } - - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new( - pages, data_type, num_rows, chunk_size, - )), - - Interval(IntervalUnit::YearMonth) => { + (PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => dyn_iter( + fixed_size_binary::Iter::new(pages, data_type, num_rows, chunk_size), + ), + (PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::YearMonth)) => { let n = 12; let pages = fixed_size_binary::Iter::new( pages, @@ -171,8 +158,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } - - Interval(IntervalUnit::DayTime) => { + (PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::DayTime)) => { let n = 12; let pages = fixed_size_binary::Iter::new( pages, @@ -197,80 +183,83 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } + (PhysicalType::Int32, Decimal(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i32| x as i128, + ))), + (PhysicalType::Int64, Decimal(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i64| x as i128, + ))), + (PhysicalType::FixedLenByteArray(n), Decimal(_, _)) if *n > 16 => { + return Err(Error::NotYetImplemented(format!( + "Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}" + ))) + } + (PhysicalType::FixedLenByteArray(n), Decimal(_, _)) => { + let n = *n; - Decimal(_, _) => match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::IntegerIter::new( - pages, - data_type, - num_rows, - chunk_size, - |x: i32| x as i128, - ))), - PhysicalType::Int64 => dyn_iter(iden(primitive::IntegerIter::new( + let pages = fixed_size_binary::Iter::new( pages, - data_type, + DataType::FixedSizeBinary(n), num_rows, chunk_size, - |x: i64| x as i128, - ))), - PhysicalType::FixedLenByteArray(n) if *n > 16 => { - return Err(Error::NotYetImplemented(format!( - "Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}" - ))) - } - PhysicalType::FixedLenByteArray(n) => { - let n = *n; - - let pages = fixed_size_binary::Iter::new( - pages, - DataType::FixedSizeBinary(n), - num_rows, - chunk_size, - ); - - let pages = pages.map(move |maybe_array| { - let array = maybe_array?; - let values = array - .values() - .chunks_exact(n) - .map(|value: &[u8]| super::super::convert_i128(value, n)) - .collect::>(); - let validity = array.validity().cloned(); - - PrimitiveArray::::try_new(data_type.clone(), values.into(), validity) - }); - - let arrays = pages.map(|x| x.map(|x| x.boxed())); - - Box::new(arrays) as _ - } - _ => unreachable!(), - }, - - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::IntegerIter::new( + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(n) + .map(|value: &[u8]| super::super::convert_i128(value, n)) + .collect::>(); + let validity = array.validity().cloned(); + + PrimitiveArray::::try_new(data_type.clone(), values.into(), validity) + }); + + let arrays = pages.map(|x| x.map(|x| x.boxed())); + + Box::new(arrays) as _ + } + (PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i32| x as i64 * 86400000, + ))), + (PhysicalType::Int64, Date64) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i64| x, ))), - UInt64 => dyn_iter(iden(primitive::IntegerIter::new( + (PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => dyn_iter(iden( + primitive::IntegerIter::new(pages, data_type, num_rows, chunk_size, |x: i64| x), + )), + (PhysicalType::Int64, UInt64) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i64| x as u64, ))), - - Float32 => dyn_iter(iden(primitive::Iter::new( + (PhysicalType::Float, Float32) => dyn_iter(iden(primitive::Iter::new( pages, data_type, num_rows, chunk_size, |x: f32| x, ))), - Float64 => dyn_iter(iden(primitive::Iter::new( + (PhysicalType::Double, Float64) => dyn_iter(iden(primitive::Iter::new( pages, data_type, num_rows, @@ -278,22 +267,21 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( |x: f64| x, ))), - Utf8 | Binary => Box::new(binary::Iter::::new( - pages, data_type, chunk_size, num_rows, - )), - LargeBinary | LargeUtf8 => Box::new(binary::Iter::::new( + (PhysicalType::ByteArray, Utf8 | Binary) => Box::new(binary::Iter::::new( pages, data_type, chunk_size, num_rows, )), + (PhysicalType::ByteArray, LargeBinary | LargeUtf8) => Box::new( + binary::Iter::::new(pages, data_type, chunk_size, num_rows), + ), - Dictionary(key_type, _, _) => { + (_, Dictionary(key_type, _, _)) => { return match_integer_type!(key_type, |$K| { dict_read::<$K, _>(pages, physical_type, logical_type, data_type, num_rows, chunk_size) }) } - - other => { + (from, to) => { return Err(Error::NotYetImplemented(format!( - "Reading {other:?} from parquet still not implemented" + "Reading parquet type {from:?} to {to:?} still not implemented" ))) } }) @@ -435,50 +423,50 @@ fn dict_read<'a, K: DictionaryKey, I: Pages + 'a>( panic!() }; - Ok(match values_data_type.to_logical_type() { - UInt8 => dyn_iter(primitive::DictIter::::new( + Ok(match (physical_type, values_data_type.to_logical_type()) { + (PhysicalType::Int32, UInt8) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: i32| x as u8, )), - UInt16 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Int32, UInt16) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: i32| x as u16, )), - UInt32 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Int32, UInt32) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: i32| x as u32, )), - UInt64 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Int64, UInt64) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: i64| x as u64, )), - Int8 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Int32, Int8) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: i32| x as i8, )), - Int16 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Int32, Int16) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: i32| x as i16, )), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { + (PhysicalType::Int32, Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth)) => { dyn_iter(primitive::DictIter::::new( iter, data_type, @@ -488,7 +476,7 @@ fn dict_read<'a, K: DictionaryKey, I: Pages + 'a>( )) } - Timestamp(time_unit, _) => { + (PhysicalType::Int64, Timestamp(time_unit, _)) => { let time_unit = *time_unit; return timestamp_dict::( iter, @@ -501,7 +489,7 @@ fn dict_read<'a, K: DictionaryKey, I: Pages + 'a>( ); } - Int64 | Date64 | Time64(_) | Duration(_) => { + (PhysicalType::Int64, Int64 | Date64 | Time64(_) | Duration(_)) => { dyn_iter(primitive::DictIter::::new( iter, data_type, @@ -510,14 +498,14 @@ fn dict_read<'a, K: DictionaryKey, I: Pages + 'a>( |x: i64| x, )) } - Float32 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Float, Float32) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, chunk_size, |x: f32| x, )), - Float64 => dyn_iter(primitive::DictIter::::new( + (PhysicalType::Double, Float64) => dyn_iter(primitive::DictIter::::new( iter, data_type, num_rows, @@ -525,15 +513,15 @@ fn dict_read<'a, K: DictionaryKey, I: Pages + 'a>( |x: f64| x, )), - Utf8 | Binary => dyn_iter(binary::DictIter::::new( - iter, data_type, num_rows, chunk_size, - )), - LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( - iter, data_type, num_rows, chunk_size, - )), - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( + (PhysicalType::ByteArray, Utf8 | Binary) => dyn_iter(binary::DictIter::::new( iter, data_type, num_rows, chunk_size, )), + (PhysicalType::ByteArray, LargeUtf8 | LargeBinary) => dyn_iter( + binary::DictIter::::new(iter, data_type, num_rows, chunk_size), + ), + (PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => dyn_iter( + fixed_size_binary::DictIter::::new(iter, data_type, num_rows, chunk_size), + ), other => { return Err(Error::nyi(format!( "Reading dictionaries of type {other:?}" diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index e1ad0505647..927695d2905 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -454,9 +454,17 @@ fn push( ))), }, Int32 => primitive::push::(from, min, max, Ok), - Int64 | Date64 | Time64(_) | Duration(_) => { - primitive::push::(from, min, max, Ok) - } + Date64 => match physical_type { + ParquetPhysicalType::Int64 => primitive::push::(from, min, max, Ok), + // some implementations of parquet write arrow's date64 into i32. + ParquetPhysicalType::Int32 => { + primitive::push(from, min, max, |x: i32| Ok(x as i64 * 86400000)) + } + other => Err(Error::NotYetImplemented(format!( + "Can't decode Date64 type from parquet type {other:?}" + ))), + }, + Int64 | Time64(_) | Duration(_) => primitive::push::(from, min, max, Ok), UInt64 => primitive::push(from, min, max, |x: i64| Ok(x as u64)), Timestamp(time_unit, _) => { let time_unit = *time_unit;