diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index 5d67dbf90f8..6810a373810 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -4,9 +4,9 @@ use parquet2::{ page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, - write::WriteOptions, }; +use super::super::WriteOptions; use super::super::utils; use crate::{ array::{Array, BinaryArray, Offset}, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 941a910ac3a..a6e65e2f7e4 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,7 +1,7 @@ use parquet2::metadata::Descriptor; -use parquet2::{encoding::Encoding, page::DataPage, write::WriteOptions}; +use parquet2::{encoding::Encoding, page::DataPage}; -use super::super::{levels, utils}; +use super::super::{levels, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; use crate::{ diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index e70a0de769d..643a25cd5b2 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -3,10 +3,10 @@ use parquet2::{ metadata::Descriptor, page::DataPage, statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics}, - write::WriteOptions, }; use super::super::utils; +use super::super::WriteOptions; use crate::array::*; use crate::{error::Result, io::parquet::read::schema::is_nullable}; diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index d758bd5097e..40645eea3d2 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,6 +1,6 @@ -use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, write::WriteOptions}; +use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage}; -use super::super::{levels, utils}; +use super::super::{levels, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; use crate::{ diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index d0d2fbb3ae3..bd9763f6092 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -3,9 +3,10 @@ use parquet2::{ metadata::Descriptor, page::{EncodedDictPage, EncodedPage}, statistics::ParquetStatistics, - write::{DynIter, WriteOptions}, + write::DynIter, }; +use super::WriteOptions; use super::binary::build_statistics as binary_build_statistics; use super::binary::encode_plain as binary_encode_plain; use super::fixed_len_bytes::build_statistics as fixed_binary_build_statistics; diff --git a/src/io/parquet/write/file.rs b/src/io/parquet/write/file.rs index 2354fead5e7..72b95574b18 100644 --- a/src/io/parquet/write/file.rs +++ b/src/io/parquet/write/file.rs @@ -1,13 +1,14 @@ use std::io::Write; +use parquet2::metadata::KeyValue; use parquet2::metadata::SchemaDescriptor; use parquet2::write::RowGroupIter; -use parquet2::{metadata::KeyValue, write::WriteOptions}; +use parquet2::write::WriteOptions as FileWriteOptions; use crate::datatypes::Schema; use crate::error::{ArrowError, Result}; -use super::{schema::schema_to_metadata_key, to_parquet_schema}; +use super::{schema::schema_to_metadata_key, to_parquet_schema, WriteOptions}; /// Attaches [`Schema`] to `key_value_metadata` pub fn add_arrow_schema( @@ -26,13 +27,14 @@ pub fn add_arrow_schema( pub struct FileWriter { writer: parquet2::write::FileWriter, schema: Schema, + options: WriteOptions, } // Accessors impl FileWriter { /// The options assigned to the file - pub fn options(&self) -> &WriteOptions { - self.writer.options() + pub fn options(&self) -> WriteOptions { + self.options } /// The [`SchemaDescriptor`] assigned to this file @@ -56,8 +58,17 @@ impl FileWriter { let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); Ok(Self { - writer: parquet2::write::FileWriter::new(writer, parquet_schema, options, created_by), + writer: parquet2::write::FileWriter::new( + writer, + parquet_schema, + FileWriteOptions { + version: options.version, + write_statistics: options.write_statistics, + }, + created_by, + ), schema, + options, }) } diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index 32bce74d1c8..59ae75134e8 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -4,10 +4,9 @@ use parquet2::{ page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics}, - write::WriteOptions, }; -use super::{binary::ord_binary, utils}; +use super::{binary::ord_binary, utils, WriteOptions}; use crate::{ array::{Array, FixedSizeBinaryArray}, error::Result, diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 7972a50661d..07bf6211edd 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -29,12 +29,21 @@ pub use parquet2::{ metadata::{Descriptor, KeyValue, SchemaDescriptor}, page::{CompressedDataPage, CompressedPage, EncodedPage}, schema::types::ParquetType, - write::{ - compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version, WriteOptions, - }, + write::{compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version}, FallibleStreamingIterator, }; +/// Currently supported options to write to parquet +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct WriteOptions { + /// Whether to write statistics + pub write_statistics: bool, + /// The page and file version to use + pub version: Version, + /// The compression to apply to every page + pub compression: Compression, +} + pub use file::FileWriter; pub use row_group::{row_group_iter, RowGroupIterator}; pub use schema::to_parquet_type; diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 2b7ea6e08b9..1c58804fa1a 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -5,10 +5,10 @@ use parquet2::{ schema::types::PrimitiveType, statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, types::NativeType, - write::WriteOptions, }; use super::super::utils; +use super::super::WriteOptions; use crate::{ array::{Array, PrimitiveArray}, error::Result, diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index ff8d3f1a658..86732fdae97 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,10 +1,8 @@ -use parquet2::{ - encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType, - write::WriteOptions, -}; +use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType}; use super::super::levels; use super::super::utils; +use super::super::WriteOptions; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; use crate::{ diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs index 47b994840d1..574e9168610 100644 --- a/src/io/parquet/write/sink.rs +++ b/src/io/parquet/write/sink.rs @@ -1,15 +1,18 @@ +use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll}; + +use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt}; +use parquet2::metadata::KeyValue; +use parquet2::write::FileStreamer; +use parquet2::write::WriteOptions as ParquetWriteOptions; + use crate::{ array::Array, chunk::Chunk, datatypes::Schema, error::ArrowError, - io::parquet::write::{Encoding, SchemaDescriptor, WriteOptions}, }; -use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt}; -use parquet2::metadata::KeyValue; -use parquet2::write::FileStreamer; -use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll}; +use super::{Encoding, SchemaDescriptor, WriteOptions}; use super::file::add_arrow_schema; /// Sink that writes array [`chunks`](Chunk) as a Parquet file. @@ -82,10 +85,17 @@ where encoding: Vec, options: WriteOptions, ) -> Result { - // let mut writer = FileStreamer::try_new(writer, schema.clone(), options)?; let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?; let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); - let mut writer = FileStreamer::new(writer, parquet_schema.clone(), options, created_by); + let mut writer = FileStreamer::new( + writer, + parquet_schema.clone(), + ParquetWriteOptions { + version: options.version, + write_statistics: options.write_statistics, + }, + created_by, + ); let task = Some( async move { writer.start().await?; diff --git a/src/io/parquet/write/utf8/basic.rs b/src/io/parquet/write/utf8/basic.rs index cf45e4e9cd6..64a4a75360a 100644 --- a/src/io/parquet/write/utf8/basic.rs +++ b/src/io/parquet/write/utf8/basic.rs @@ -4,9 +4,9 @@ use parquet2::{ page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, - write::WriteOptions, }; +use super::super::WriteOptions; use super::super::binary::{encode_delta, ord_binary}; use super::super::utils; use crate::{ diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 69f395f17d9..32b83cc0d7e 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,6 +1,6 @@ -use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, write::WriteOptions}; +use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage}; -use super::super::{levels, utils}; +use super::super::{levels, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; use crate::{ diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 4f011401ef1..851034d6ae9 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -6,9 +6,9 @@ use parquet2::{ metadata::Descriptor, page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, statistics::ParquetStatistics, - write::WriteOptions, }; +use super::WriteOptions; use crate::error::Result; use super::Version; diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs index 5f9d09515e5..86ff5434df6 100644 --- a/tests/it/io/parquet/write_async.rs +++ b/tests/it/io/parquet/write_async.rs @@ -7,14 +7,10 @@ use arrow2::{ error::Result, io::parquet::{ read::{infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer}, - write::Encoding, + write::{Compression, Encoding, Version, WriteOptions}, }, }; use futures::{future::BoxFuture, io::Cursor, SinkExt}; -use parquet2::{ - compression::Compression, - write::{Version, WriteOptions}, -}; use super::FileSink;