diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs index efd07f83d17ee..9a37541b8ae76 100644 --- a/rust/arrow/src/array/array_binary.rs +++ b/rust/arrow/src/array/array_binary.rs @@ -424,12 +424,10 @@ impl FixedSizeBinaryArray { } bit_util::set_bit(null_buf.as_slice_mut(), len); buffer.extend_from_slice(slice); + } else if let Some(size) = size { + buffer.extend_zeros(size); } else { - if let Some(size) = size { - buffer.extend_zeros(size); - } else { - prepend += 1; - } + prepend += 1; } len += 1; diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 84b7a4daf66ff..fe9f60666bc9d 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -33,7 +33,10 @@ use crate::errors::{ParquetError::ArrowError, Result}; use crate::file::{metadata::KeyValue, properties::WriterProperties}; use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr}; use crate::{ - basic::{ConvertedType, Repetition, Type as PhysicalType}, + basic::{ + ConvertedType, DecimalType, IntType, LogicalType, Repetition, TimeType, + TimeUnit as ParquetTimeUnit, TimestampType, Type as PhysicalType, + }, errors::ParquetError, }; @@ -321,18 +324,24 @@ fn arrow_to_parquet_type(field: &Field) -> Result { // create type from field match field.data_type() { DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::NONE) + .with_logical_type(Some(LogicalType::UNKNOWN(Default::default()))) .with_repetition(repetition) .build(), DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN) .with_repetition(repetition) .build(), DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::INT_8) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 8, + is_signed: true, + }))) .with_repetition(repetition) .build(), DataType::Int16 => 
Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::INT_16) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 16, + is_signed: true, + }))) .with_repetition(repetition) .build(), DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32) @@ -342,19 +351,31 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(repetition) .build(), DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::UINT_8) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 8, + is_signed: false, + }))) .with_repetition(repetition) .build(), DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::UINT_16) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 16, + is_signed: false, + }))) .with_repetition(repetition) .build(), DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::UINT_32) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 32, + is_signed: false, + }))) .with_repetition(repetition) .build(), DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64) - .with_converted_type(ConvertedType::UINT_64) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 64, + is_signed: false, + }))) .with_repetition(repetition) .build(), DataType::Float16 => Err(ArrowError("Float16 arrays not supported".to_string())), @@ -364,32 +385,46 @@ fn arrow_to_parquet_type(field: &Field) -> Result { DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE) .with_repetition(repetition) .build(), - DataType::Timestamp(time_unit, _) => { - Type::primitive_type_builder(name, PhysicalType::INT64) - .with_converted_type(match time_unit { - TimeUnit::Second => ConvertedType::TIMESTAMP_MILLIS, - TimeUnit::Millisecond => 
ConvertedType::TIMESTAMP_MILLIS, - TimeUnit::Microsecond => ConvertedType::TIMESTAMP_MICROS, - TimeUnit::Nanosecond => ConvertedType::TIMESTAMP_MICROS, - }) - .with_repetition(repetition) - .build() - } + DataType::Timestamp(time_unit, zone) => Type::primitive_type_builder( + name, + PhysicalType::INT64, + ) + .with_logical_type(Some(LogicalType::TIMESTAMP(TimestampType { + is_adjusted_to_u_t_c: matches!(zone, Some(z) if !z.as_str().is_empty()), + unit: match time_unit { + TimeUnit::Second => ParquetTimeUnit::MILLIS(Default::default()), + TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), + TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), + TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), + }, + }))) + .with_repetition(repetition) + .build(), DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::DATE) + .with_logical_type(Some(LogicalType::DATE(Default::default()))) .with_repetition(repetition) .build(), // date64 is cast to date32 DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::DATE) + .with_logical_type(Some(LogicalType::DATE(Default::default()))) .with_repetition(repetition) .build(), DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32) - .with_converted_type(ConvertedType::TIME_MILLIS) + .with_logical_type(Some(LogicalType::TIME(TimeType { + is_adjusted_to_u_t_c: false, + unit: ParquetTimeUnit::MILLIS(Default::default()), + }))) .with_repetition(repetition) .build(), - DataType::Time64(_) => Type::primitive_type_builder(name, PhysicalType::INT64) - .with_converted_type(ConvertedType::TIME_MICROS) + DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64) + .with_logical_type(Some(LogicalType::TIME(TimeType { + is_adjusted_to_u_t_c: false, + unit: match unit { + TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), 
+ TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), + u => unreachable!("Invalid unit for Time64: {:?}", u), + }, + }))) .with_repetition(repetition) .build(), DataType::Duration(_) => Err(ArrowError( @@ -414,17 +449,33 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .build() } DataType::Decimal(precision, scale) => { + // Decimal precision determines the Parquet physical type to use. + // TODO(ARROW-12018): Enable the below after ARROW-10818 Decimal support + // + // let (physical_type, length) = if *precision > 1 && *precision <= 9 { + // (PhysicalType::INT32, -1) + // } else if *precision <= 18 { + // (PhysicalType::INT64, -1) + // } else { + // ( + // PhysicalType::FIXED_LEN_BYTE_ARRAY, + // decimal_length_from_precision(*precision) as i32, + // ) + // }; Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_repetition(repetition) .with_length(decimal_length_from_precision(*precision) as i32) - .with_converted_type(ConvertedType::DECIMAL) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + scale: *scale as i32, + precision: *precision as i32, + }))) .with_precision(*precision as i32) .with_scale(*scale as i32) .build() } DataType::Utf8 | DataType::LargeUtf8 => { Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) - .with_converted_type(ConvertedType::UTF8) + .with_logical_type(Some(LogicalType::STRING(Default::default()))) .with_repetition(repetition) .build() } @@ -436,7 +487,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(Repetition::REPEATED) .build()?, )]) - .with_converted_type(ConvertedType::LIST) + .with_logical_type(Some(LogicalType::LIST(Default::default()))) .with_repetition(repetition) .build() } @@ -583,48 +634,109 @@ impl ParquetTypeConverter<'_> { } fn from_int32(&self) -> Result { - match self.schema.get_basic_info().converted_type() { - ConvertedType::NONE => Ok(DataType::Int32), - ConvertedType::UINT_8 => Ok(DataType::UInt8), - ConvertedType::UINT_16 
=> Ok(DataType::UInt16), - ConvertedType::UINT_32 => Ok(DataType::UInt32), - ConvertedType::INT_8 => Ok(DataType::Int8), - ConvertedType::INT_16 => Ok(DataType::Int16), - ConvertedType::INT_32 => Ok(DataType::Int32), - ConvertedType::DATE => Ok(DataType::Date32), - ConvertedType::TIME_MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)), - ConvertedType::DECIMAL => Ok(self.to_decimal()), - other => Err(ArrowError(format!( - "Unable to convert parquet INT32 logical type {}", - other + match ( + self.schema.get_basic_info().logical_type(), + self.schema.get_basic_info().converted_type(), + ) { + (None, ConvertedType::NONE) => Ok(DataType::Int32), + (Some(LogicalType::INTEGER(t)), _) => match (t.bit_width, t.is_signed) { + (8, true) => Ok(DataType::Int8), + (16, true) => Ok(DataType::Int16), + (32, true) => Ok(DataType::Int32), + (8, false) => Ok(DataType::UInt8), + (16, false) => Ok(DataType::UInt16), + (32, false) => Ok(DataType::UInt32), + _ => Err(ArrowError(format!( + "Cannot create INT32 physical type from {:?}", + t + ))), + }, + (Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()), + (Some(LogicalType::DATE(_)), _) => Ok(DataType::Date32), + (Some(LogicalType::TIME(t)), _) => match t.unit { + ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)), + _ => Err(ArrowError(format!( + "Cannot create INT32 physical type from {:?}", + t.unit + ))), + }, + (None, ConvertedType::UINT_8) => Ok(DataType::UInt8), + (None, ConvertedType::UINT_16) => Ok(DataType::UInt16), + (None, ConvertedType::UINT_32) => Ok(DataType::UInt32), + (None, ConvertedType::INT_8) => Ok(DataType::Int8), + (None, ConvertedType::INT_16) => Ok(DataType::Int16), + (None, ConvertedType::INT_32) => Ok(DataType::Int32), + (None, ConvertedType::DATE) => Ok(DataType::Date32), + (None, ConvertedType::TIME_MILLIS) => { + Ok(DataType::Time32(TimeUnit::Millisecond)) + } + (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()), + (logical, converted) => Err(ArrowError(format!( + 
"Unable to convert parquet INT32 logical type {:?} or converted type {}", + logical, converted ))), } } fn from_int64(&self) -> Result { - match self.schema.get_basic_info().converted_type() { - ConvertedType::NONE => Ok(DataType::Int64), - ConvertedType::INT_64 => Ok(DataType::Int64), - ConvertedType::UINT_64 => Ok(DataType::UInt64), - ConvertedType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)), - ConvertedType::TIMESTAMP_MILLIS => { + match ( + self.schema.get_basic_info().logical_type(), + self.schema.get_basic_info().converted_type(), + ) { + (None, ConvertedType::NONE) => Ok(DataType::Int64), + (Some(LogicalType::INTEGER(t)), _) if t.bit_width == 64 => { + match t.is_signed { + true => Ok(DataType::Int64), + false => Ok(DataType::UInt64), + } + } + (Some(LogicalType::TIME(t)), _) => match t.unit { + ParquetTimeUnit::MILLIS(_) => Err(ArrowError( + "Cannot create INT64 from MILLIS time unit".to_string(), + )), + ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)), + ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)), + }, + (Some(LogicalType::TIMESTAMP(t)), _) => Ok(DataType::Timestamp( + match t.unit { + ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond, + ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond, + ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond, + }, + if t.is_adjusted_to_u_t_c { + Some("UTC".to_string()) + } else { + None + }, + )), + (None, ConvertedType::INT_64) => Ok(DataType::Int64), + (None, ConvertedType::UINT_64) => Ok(DataType::UInt64), + (None, ConvertedType::TIME_MICROS) => { + Ok(DataType::Time64(TimeUnit::Microsecond)) + } + (None, ConvertedType::TIMESTAMP_MILLIS) => { Ok(DataType::Timestamp(TimeUnit::Millisecond, None)) } - ConvertedType::TIMESTAMP_MICROS => { + (None, ConvertedType::TIMESTAMP_MICROS) => { Ok(DataType::Timestamp(TimeUnit::Microsecond, None)) } - ConvertedType::DECIMAL => Ok(self.to_decimal()), - other => Err(ArrowError(format!( - "Unable to convert parquet 
INT64 logical type {}", - other + (Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()), + (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()), + (logical, converted) => Err(ArrowError(format!( + "Unable to convert parquet INT64 logical type {:?} or converted type {}", + logical, converted ))), } } fn from_fixed_len_byte_array(&self) -> Result { - match self.schema.get_basic_info().converted_type() { - ConvertedType::DECIMAL => Ok(self.to_decimal()), - ConvertedType::INTERVAL => { + match ( + self.schema.get_basic_info().logical_type(), + self.schema.get_basic_info().converted_type(), + ) { + (Some(LogicalType::DECIMAL(_)), _) => Ok(self.to_decimal()), + (None, ConvertedType::DECIMAL) => Ok(self.to_decimal()), + (None, ConvertedType::INTERVAL) => { // There is currently no reliable way of determining which IntervalUnit // to return. Thus without the original Arrow schema, the results // would be incorrect if all 12 bytes of the interval are populated @@ -656,12 +768,19 @@ impl ParquetTypeConverter<'_> { } fn from_byte_array(&self) -> Result { - match self.schema.get_basic_info().converted_type() { - ConvertedType::NONE => Ok(DataType::Binary), - ConvertedType::UTF8 => Ok(DataType::Utf8), - other => Err(ArrowError(format!( - "Unable to convert parquet BYTE_ARRAY logical type {}", - other + match (self.schema.get_basic_info().logical_type(), self.schema.get_basic_info().converted_type()) { + (Some(LogicalType::STRING(_)), _) => Ok(DataType::Utf8), + (Some(LogicalType::JSON(_)), _) => Ok(DataType::Binary), + (Some(LogicalType::BSON(_)), _) => Ok(DataType::Binary), + (Some(LogicalType::ENUM(_)), _) => Ok(DataType::Binary), + (None, ConvertedType::NONE) => Ok(DataType::Binary), + (None, ConvertedType::JSON) => Ok(DataType::Binary), + (None, ConvertedType::BSON) => Ok(DataType::Binary), + (None, ConvertedType::ENUM) => Ok(DataType::Binary), + (None, ConvertedType::UTF8) => Ok(DataType::Utf8), + (logical, converted) => Err(ArrowError(format!( + "Unable to 
convert parquet BYTE_ARRAY logical type {:?} or converted type {}", + logical, converted ))), } } @@ -683,8 +802,12 @@ impl ParquetTypeConverter<'_> { }) }) } else { - match self.schema.get_basic_info().converted_type() { - ConvertedType::LIST => self.to_list(), + match ( + self.schema.get_basic_info().logical_type(), + self.schema.get_basic_info().converted_type(), + ) { + (Some(LogicalType::LIST(_)), _) => self.to_list(), + (None, ConvertedType::LIST) => self.to_list(), _ => self.to_struct(), } } @@ -1441,6 +1564,7 @@ mod tests { } #[test] + #[ignore = "To be addressed as part of ARROW-11365"] fn test_field_to_column_desc() { let message_type = " message arrow_schema { @@ -1550,6 +1674,7 @@ mod tests { .iter() .zip(converted_arrow_schema.columns()) .for_each(|(a, b)| { + // TODO: ARROW-11365: If parsing v1 format, there should be no logical type assert_eq!(a, b); }); } @@ -1688,6 +1813,9 @@ mod tests { // true, // ), Field::new("c35", DataType::Null, true), + Field::new("c36", DataType::Decimal(2, 1), false), + Field::new("c37", DataType::Decimal(50, 20), false), + Field::new("c38", DataType::Decimal(18, 12), true), ], metadata, ); diff --git a/rust/parquet/src/schema/parser.rs b/rust/parquet/src/schema/parser.rs index 50f00bb553437..1f7db5b410d6f 100644 --- a/rust/parquet/src/schema/parser.rs +++ b/rust/parquet/src/schema/parser.rs @@ -222,12 +222,12 @@ impl<'a> Parser<'a> { .next() .ok_or_else(|| general_err!("Expected name, found None"))?; - // Parse logical type if exists + // Parse converted type if exists let converted_type = if let Some("(") = self.tokenizer.next() { let tpe = self .tokenizer .next() - .ok_or_else(|| general_err!("Expected logical type, found None")) + .ok_or_else(|| general_err!("Expected converted type, found None")) .and_then(|v| v.to_uppercase().parse::())?; assert_token(self.tokenizer.next(), ")")?; tpe @@ -280,13 +280,13 @@ impl<'a> Parser<'a> { .next() .ok_or_else(|| general_err!("Expected name, found None"))?; - // Parse logical 
type + // Parse converted type let (converted_type, precision, scale) = if let Some("(") = self.tokenizer.next() { let tpe = self .tokenizer .next() - .ok_or_else(|| general_err!("Expected logical type, found None")) + .ok_or_else(|| general_err!("Expected converted type, found None")) .and_then(|v| v.to_uppercase().parse::())?; // Parse precision and scale for decimals @@ -548,7 +548,7 @@ mod tests { assert!(result.is_err()); // Invalid decimal because, we always require either precision or scale to be - // specified as part of logical type + // specified as part of converted type let schema = " message root { optional int32 f3 (DECIMAL); diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs index d80fe0d011f3e..af1d632dde71e 100644 --- a/rust/parquet/src/schema/types.rs +++ b/rust/parquet/src/schema/types.rs @@ -21,7 +21,9 @@ use std::{collections::HashMap, convert::From, fmt, sync::Arc}; use parquet_format::SchemaElement; -use crate::basic::{ConvertedType, LogicalType, Repetition, Type as PhysicalType}; +use crate::basic::{ + ConvertedType, LogicalType, Repetition, TimeType, TimeUnit, Type as PhysicalType, +}; use crate::errors::{ParquetError, Result}; // ---------------------------------------------------------------------- @@ -229,6 +231,8 @@ impl<'a> PrimitiveTypeBuilder<'a> { } /// Sets [`LogicalType`](crate::basic::LogicalType) for this field and returns itself. + /// If only the logical type is populated for a primitive type, the converted type + /// will be automatically populated, and can thus be omitted. pub fn with_logical_type(mut self, logical_type: Option) -> Self { self.logical_type = logical_type; self @@ -266,11 +270,11 @@ impl<'a> PrimitiveTypeBuilder<'a> { /// Creates a new `PrimitiveType` instance from the collected attributes. /// Returns `Err` in case of any building conditions are not met. 
pub fn build(self) -> Result { - let basic_info = BasicTypeInfo { + let mut basic_info = BasicTypeInfo { name: String::from(self.name), repetition: Some(self.repetition), converted_type: self.converted_type, - logical_type: self.logical_type, + logical_type: self.logical_type.clone(), id: self.id, }; @@ -282,86 +286,102 @@ impl<'a> PrimitiveTypeBuilder<'a> { )); } - match self.converted_type { - ConvertedType::NONE => {} - ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => { - if self.physical_type != PhysicalType::BYTE_ARRAY { - return Err(general_err!( - "{} can only annotate BYTE_ARRAY fields", - self.converted_type - )); - } - } - ConvertedType::DECIMAL => { - match self.physical_type { - PhysicalType::INT32 - | PhysicalType::INT64 - | PhysicalType::BYTE_ARRAY - | PhysicalType::FIXED_LEN_BYTE_ARRAY => (), - _ => { + match &self.logical_type { + Some(logical_type) => { + // If a converted type is populated, check that it is consistent with + // its logical type + if self.converted_type != ConvertedType::NONE { + if ConvertedType::from(self.logical_type.clone()) + != self.converted_type + { return Err(general_err!( - "DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED" + "Logical type {:?} is incompatible with converted type {}", + logical_type, + self.converted_type )); } + } else { + // Populate the converted type for backwards compatibility + basic_info.converted_type = self.logical_type.clone().into(); } - - // Precision is required and must be a non-zero positive integer. - if self.precision < 1 { - return Err(general_err!( - "Invalid DECIMAL precision: {}", - self.precision - )); - } - - // Scale must be zero or a positive integer less than the precision. 
- if self.scale < 0 { - return Err(general_err!("Invalid DECIMAL scale: {}", self.scale)); - } - - if self.scale >= self.precision { - return Err(general_err!( - "Invalid DECIMAL: scale ({}) cannot be greater than or equal to precision \ - ({})", - self.scale, - self.precision - )); - } - - // Check precision and scale based on physical type limitations. - match self.physical_type { - PhysicalType::INT32 => { - if self.precision > 9 { + // Check that logical type and physical type are compatible + match (logical_type, self.physical_type) { + (LogicalType::MAP(_), _) | (LogicalType::LIST(_), _) => { + return Err(general_err!( + "{:?} cannot be applied to a primitive type", + logical_type + )); + } + (LogicalType::ENUM(_), PhysicalType::BYTE_ARRAY) => {} + (LogicalType::DECIMAL(t), _) => { + // Check that scale and precision are consistent with legacy values + if t.scale != self.scale { return Err(general_err!( - "Cannot represent INT32 as DECIMAL with precision {}", - self.precision + "DECIMAL logical type scale {} must match self.scale {}", + t.scale, + self.scale )); } - } - PhysicalType::INT64 => { - if self.precision > 18 { + if t.precision != self.precision { return Err(general_err!( - "Cannot represent INT64 as DECIMAL with precision {}", + "DECIMAL logical type precision {} must match self.precision {}", + t.precision, self.precision )); } + self.check_decimal_precision_scale()?; } - PhysicalType::FIXED_LEN_BYTE_ARRAY => { - let max_precision = (2f64.powi(8 * self.length - 1) - 1f64) - .log10() - .floor() as i32; - - if self.precision > max_precision { + (LogicalType::DATE(_), PhysicalType::INT32) => {} + ( + LogicalType::TIME(TimeType { + unit: TimeUnit::MILLIS(_), + .. 
+ }), + PhysicalType::INT32, + ) => {} + (LogicalType::TIME(t), PhysicalType::INT64) => { + if t.unit == TimeUnit::MILLIS(Default::default()) { return Err(general_err!( - "Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length {} and \ - precision {}", - self.length, - self.precision - )); + "Cannot use millisecond unit on INT64 type" + )); } } - _ => (), // For BYTE_ARRAY precision is not limited + (LogicalType::TIMESTAMP(_), PhysicalType::INT64) => {} + (LogicalType::INTEGER(t), PhysicalType::INT32) + if t.bit_width <= 32 => {} + (LogicalType::INTEGER(t), PhysicalType::INT64) + if t.bit_width == 64 => {} + // Null type + (LogicalType::UNKNOWN(_), PhysicalType::INT32) => {} + (LogicalType::STRING(_), PhysicalType::BYTE_ARRAY) => {} + (LogicalType::JSON(_), PhysicalType::BYTE_ARRAY) => {} + (LogicalType::BSON(_), PhysicalType::BYTE_ARRAY) => {} + (LogicalType::UUID(_), PhysicalType::FIXED_LEN_BYTE_ARRAY) => {} + (a, b) => { + return Err(general_err!( + "Cannot annotate {:?} from {} fields", + a, + b + )) + } + } + } + None => {} + } + + match self.converted_type { + ConvertedType::NONE => {} + ConvertedType::UTF8 | ConvertedType::BSON | ConvertedType::JSON => { + if self.physical_type != PhysicalType::BYTE_ARRAY { + return Err(general_err!( + "{} can only annotate BYTE_ARRAY fields", + self.converted_type + )); } } + ConvertedType::DECIMAL => { + self.check_decimal_precision_scale()?; + } ConvertedType::DATE | ConvertedType::TIME_MILLIS | ConvertedType::UINT_8 @@ -419,6 +439,80 @@ impl<'a> PrimitiveTypeBuilder<'a> { precision: self.precision, }) } + + #[inline] + fn check_decimal_precision_scale(&self) -> Result<()> { + match self.physical_type { + PhysicalType::INT32 + | PhysicalType::INT64 + | PhysicalType::BYTE_ARRAY + | PhysicalType::FIXED_LEN_BYTE_ARRAY => (), + _ => { + return Err(general_err!( + "DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY" + )); + } + } + + // Precision is required and must be a non-zero positive 
integer. + if self.precision < 1 { + return Err(general_err!( + "Invalid DECIMAL precision: {}", + self.precision + )); + } + + // Scale must be zero or a positive integer less than the precision. + if self.scale < 0 { + return Err(general_err!("Invalid DECIMAL scale: {}", self.scale)); + } + + if self.scale >= self.precision { + return Err(general_err!( + "Invalid DECIMAL: scale ({}) cannot be greater than or equal to precision \ + ({})", + self.scale, + self.precision + )); + } + + // Check precision and scale based on physical type limitations. + match self.physical_type { + PhysicalType::INT32 => { + if self.precision > 9 { + return Err(general_err!( + "Cannot represent INT32 as DECIMAL with precision {}", + self.precision + )); + } + } + PhysicalType::INT64 => { + if self.precision > 18 { + return Err(general_err!( + "Cannot represent INT64 as DECIMAL with precision {}", + self.precision + )); + } + } + PhysicalType::FIXED_LEN_BYTE_ARRAY => { + let max_precision = + (2f64.powi(8 * self.length - 1) - 1f64).log10().floor() as i32; + + if self.precision > max_precision { + return Err(general_err!( + "Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length {} and \ + precision {}. The max precision can only be {}", + self.length, + self.precision, + max_precision + )); + } + } + _ => (), // For BYTE_ARRAY precision is not limited + } + + Ok(()) + } } /// A builder for group types. All attributes are optional except the name. @@ -479,13 +573,17 @@ impl<'a> GroupTypeBuilder<'a> { /// Creates a new `GroupType` instance from the gathered attributes. 
pub fn build(self) -> Result { - let basic_info = BasicTypeInfo { + let mut basic_info = BasicTypeInfo { name: String::from(self.name), repetition: self.repetition, converted_type: self.converted_type, - logical_type: self.logical_type, + logical_type: self.logical_type.clone(), id: self.id, }; + // Populate the converted type if only the logical type is populated + if self.logical_type.is_some() && self.converted_type == ConvertedType::NONE { + basic_info.converted_type = self.logical_type.into(); + } Ok(Type::GroupType { basic_info, fields: self.fields, @@ -1099,6 +1197,7 @@ fn to_thrift_helper(schema: &Type, elements: &mut Vec) { mod tests { use super::*; + use crate::basic::{DecimalType, IntType}; use crate::schema::parser::parse_message_type; // TODO: add tests for v2 types @@ -1106,7 +1205,10 @@ mod tests { #[test] fn test_primitive_type() { let mut result = Type::primitive_type_builder("foo", PhysicalType::INT32) - .with_converted_type(ConvertedType::INT_32) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + bit_width: 32, + is_signed: true, + }))) .with_id(0) .build(); assert!(result.is_ok()); @@ -1116,6 +1218,13 @@ mod tests { assert!(!tp.is_group()); let basic_info = tp.get_basic_info(); assert_eq!(basic_info.repetition(), Repetition::OPTIONAL); + assert_eq!( + basic_info.logical_type(), + Some(LogicalType::INTEGER(IntType { + bit_width: 32, + is_signed: true + })) + ); assert_eq!(basic_info.converted_type(), ConvertedType::INT_32); assert_eq!(basic_info.id(), 0); match tp { @@ -1126,7 +1235,23 @@ mod tests { } } - // Test illegal inputs + // Test illegal inputs with logical type + result = Type::primitive_type_builder("foo", PhysicalType::INT64) + .with_repetition(Repetition::REPEATED) + .with_logical_type(Some(LogicalType::INTEGER(IntType { + is_signed: true, + bit_width: 8, + }))) + .build(); + assert!(result.is_err()); + if let Err(e) = result { + assert_eq!( + format!("{}", e), + "Parquet error: Cannot annotate INTEGER(IntType { bit_width: 8, 
is_signed: true }) from INT64 fields" + ); + } + + // Test illegal inputs with converted type result = Type::primitive_type_builder("foo", PhysicalType::INT64) .with_repetition(Repetition::REPEATED) .with_converted_type(ConvertedType::BSON) @@ -1149,7 +1274,24 @@ mod tests { if let Err(e) = result { assert_eq!( format!("{}", e), - "Parquet error: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED" + "Parquet error: DECIMAL can only annotate INT32, INT64, BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY" + ); + } + + result = Type::primitive_type_builder("foo", PhysicalType::BYTE_ARRAY) + .with_repetition(Repetition::REQUIRED) + .with_logical_type(Some(LogicalType::DECIMAL(DecimalType { + scale: 32, + precision: 12, + }))) + .with_precision(-1) + .with_scale(-1) + .build(); + assert!(result.is_err()); + if let Err(e) = result { + assert_eq!( + format!("{}", e), + "Parquet error: DECIMAL logical type scale 32 must match self.scale -1" ); } @@ -1245,7 +1387,7 @@ mod tests { if let Err(e) = result { assert_eq!( format!("{}", e), - "Parquet error: Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length 5 and precision 12" + "Parquet error: Cannot represent FIXED_LEN_BYTE_ARRAY as DECIMAL with length 5 and precision 12. The max precision can only be 11" ); } @@ -1355,6 +1497,7 @@ mod tests { let result = Type::group_type_builder("foo") .with_repetition(Repetition::REPEATED) + .with_logical_type(Some(LogicalType::LIST(Default::default()))) .with_fields(&mut fields) .with_id(1) .build(); @@ -1365,7 +1508,11 @@ mod tests { assert!(tp.is_group()); assert!(!tp.is_primitive()); assert_eq!(basic_info.repetition(), Repetition::REPEATED); - assert_eq!(basic_info.converted_type(), ConvertedType::NONE); + assert_eq!( + basic_info.logical_type(), + Some(LogicalType::LIST(Default::default())) + ); + assert_eq!(basic_info.converted_type(), ConvertedType::LIST); assert_eq!(basic_info.id(), 1); assert_eq!(tp.get_fields().len(), 2); assert_eq!(tp.get_fields()[0].name(), "f1");