
Migrated to latest
jorgecarleitao committed Apr 15, 2022
1 parent ecd2ff3 commit c6be676
Showing 15 changed files with 62 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/io/parquet/write/binary/basic.rs
@@ -4,9 +4,9 @@ use parquet2::{
     page::DataPage,
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics},
-    write::WriteOptions,
 };
 
+use super::super::WriteOptions;
 use super::super::utils;
 use crate::{
     array::{Array, BinaryArray, Offset},
4 changes: 2 additions & 2 deletions src/io/parquet/write/binary/nested.rs
@@ -1,7 +1,7 @@
 use parquet2::metadata::Descriptor;
-use parquet2::{encoding::Encoding, page::DataPage, write::WriteOptions};
+use parquet2::{encoding::Encoding, page::DataPage};
 
-use super::super::{levels, utils};
+use super::super::{levels, utils, WriteOptions};
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
 use crate::{
2 changes: 1 addition & 1 deletion src/io/parquet/write/boolean/basic.rs
@@ -3,10 +3,10 @@ use parquet2::{
     metadata::Descriptor,
     page::DataPage,
     statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics},
-    write::WriteOptions,
 };
 
 use super::super::utils;
+use super::super::WriteOptions;
 use crate::array::*;
 use crate::{error::Result, io::parquet::read::schema::is_nullable};
 
4 changes: 2 additions & 2 deletions src/io/parquet/write/boolean/nested.rs
@@ -1,6 +1,6 @@
-use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, write::WriteOptions};
+use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage};
 
-use super::super::{levels, utils};
+use super::super::{levels, utils, WriteOptions};
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
 use crate::{
3 changes: 2 additions & 1 deletion src/io/parquet/write/dictionary.rs
@@ -3,9 +3,10 @@ use parquet2::{
     metadata::Descriptor,
     page::{EncodedDictPage, EncodedPage},
     statistics::ParquetStatistics,
-    write::{DynIter, WriteOptions},
+    write::DynIter,
 };
 
+use super::WriteOptions;
 use super::binary::build_statistics as binary_build_statistics;
 use super::binary::encode_plain as binary_encode_plain;
 use super::fixed_len_bytes::build_statistics as fixed_binary_build_statistics;
21 changes: 16 additions & 5 deletions src/io/parquet/write/file.rs
@@ -1,13 +1,14 @@
 use std::io::Write;
 
+use parquet2::metadata::KeyValue;
 use parquet2::metadata::SchemaDescriptor;
 use parquet2::write::RowGroupIter;
-use parquet2::{metadata::KeyValue, write::WriteOptions};
+use parquet2::write::WriteOptions as FileWriteOptions;
 
 use crate::datatypes::Schema;
 use crate::error::{ArrowError, Result};
 
-use super::{schema::schema_to_metadata_key, to_parquet_schema};
+use super::{schema::schema_to_metadata_key, to_parquet_schema, WriteOptions};
 
 /// Attaches [`Schema`] to `key_value_metadata`
 pub fn add_arrow_schema(
@@ -26,13 +27,14 @@ pub fn add_arrow_schema(
 pub struct FileWriter<W: Write> {
     writer: parquet2::write::FileWriter<W>,
     schema: Schema,
+    options: WriteOptions,
 }
 
 // Accessors
 impl<W: Write> FileWriter<W> {
     /// The options assigned to the file
-    pub fn options(&self) -> &WriteOptions {
-        self.writer.options()
+    pub fn options(&self) -> WriteOptions {
+        self.options
     }
 
     /// The [`SchemaDescriptor`] assigned to this file
@@ -56,8 +58,17 @@ impl<W: Write> FileWriter<W> {
         let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
 
         Ok(Self {
-            writer: parquet2::write::FileWriter::new(writer, parquet_schema, options, created_by),
+            writer: parquet2::write::FileWriter::new(
+                writer,
+                parquet_schema,
+                FileWriteOptions {
+                    version: options.version,
+                    write_statistics: options.write_statistics,
+                },
+                created_by,
+            ),
             schema,
+            options,
         })
     }
 
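The pattern above — keep arrow2's `WriteOptions` on the writer, hand parquet2 only the fields it knows — is the core of this migration. A sketch of that conversion as a standalone helper (the function name is hypothetical; the diff inlines it):

```rust
use parquet2::write::WriteOptions as FileWriteOptions;

use super::WriteOptions; // arrow2's own options, defined in write/mod.rs

// Hypothetical helper mirroring the inline conversion in `FileWriter::try_new`.
// `compression` is deliberately not forwarded here: it is applied per page by
// the compression step before pages ever reach the file writer.
fn to_file_options(options: WriteOptions) -> FileWriteOptions {
    FileWriteOptions {
        version: options.version,
        write_statistics: options.write_statistics,
    }
}
```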
3 changes: 1 addition & 2 deletions src/io/parquet/write/fixed_len_bytes.rs
@@ -4,10 +4,9 @@ use parquet2::{
     page::DataPage,
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics},
-    write::WriteOptions,
 };
 
-use super::{binary::ord_binary, utils};
+use super::{binary::ord_binary, utils, WriteOptions};
 use crate::{
     array::{Array, FixedSizeBinaryArray},
     error::Result,
15 changes: 12 additions & 3 deletions src/io/parquet/write/mod.rs
@@ -29,12 +29,21 @@ pub use parquet2::{
     metadata::{Descriptor, KeyValue, SchemaDescriptor},
     page::{CompressedDataPage, CompressedPage, EncodedPage},
     schema::types::ParquetType,
-    write::{
-        compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version, WriteOptions,
-    },
+    write::{compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version},
     FallibleStreamingIterator,
 };
 
+/// Currently supported options to write to parquet
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct WriteOptions {
+    /// Whether to write statistics
+    pub write_statistics: bool,
+    /// The page and file version to use
+    pub version: Version,
+    /// The compression to apply to every page
+    pub compression: Compression,
+}
+
 pub use file::FileWriter;
 pub use row_group::{row_group_iter, RowGroupIterator};
 pub use schema::to_parquet_type;
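The struct above replaces the previously re-exported `parquet2::write::WriteOptions`: `version` and `write_statistics` keep their old meaning, while `compression` is new at this level. A minimal construction sketch (the `Snappy` choice is illustrative; any `Compression` variant applies):

```rust
use arrow2::io::parquet::write::{Compression, Version, WriteOptions};

fn example_options() -> WriteOptions {
    WriteOptions {
        write_statistics: true,           // write per-page min/max statistics
        version: Version::V2,             // parquet format version, still re-exported from parquet2
        compression: Compression::Snappy, // new: compression now travels with arrow2's options
    }
}
```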
2 changes: 1 addition & 1 deletion src/io/parquet/write/primitive/basic.rs
@@ -5,10 +5,10 @@ use parquet2::{
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics},
     types::NativeType,
-    write::WriteOptions,
 };
 
 use super::super::utils;
+use super::super::WriteOptions;
 use crate::{
     array::{Array, PrimitiveArray},
     error::Result,
6 changes: 2 additions & 4 deletions src/io/parquet/write/primitive/nested.rs
@@ -1,10 +1,8 @@
-use parquet2::{
-    encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType,
-    write::WriteOptions,
-};
+use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType};
 
 use super::super::levels;
 use super::super::utils;
+use super::super::WriteOptions;
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
 use crate::{
24 changes: 17 additions & 7 deletions src/io/parquet/write/sink.rs
@@ -1,15 +1,18 @@
+use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll};
+
+use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt};
+use parquet2::metadata::KeyValue;
+use parquet2::write::FileStreamer;
+use parquet2::write::WriteOptions as ParquetWriteOptions;
+
 use crate::{
     array::Array,
     chunk::Chunk,
     datatypes::Schema,
     error::ArrowError,
-    io::parquet::write::{Encoding, SchemaDescriptor, WriteOptions},
 };
-use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt};
-use parquet2::metadata::KeyValue;
-use parquet2::write::FileStreamer;
-use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll};
 
+use super::{Encoding, SchemaDescriptor, WriteOptions};
 use super::file::add_arrow_schema;
 
 /// Sink that writes array [`chunks`](Chunk) as a Parquet file.
@@ -82,10 +85,17 @@ where
         encoding: Vec<Encoding>,
         options: WriteOptions,
     ) -> Result<Self, ArrowError> {
+        // let mut writer = FileStreamer::try_new(writer, schema.clone(), options)?;
        let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
         let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
-        let mut writer = FileStreamer::new(writer, parquet_schema.clone(), options, created_by);
+        let mut writer = FileStreamer::new(
+            writer,
+            parquet_schema.clone(),
+            ParquetWriteOptions {
+                version: options.version,
+                write_statistics: options.write_statistics,
+            },
+            created_by,
+        );
         let task = Some(
             async move {
                 writer.start().await?;
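`FileSink` performs the same conversion internally, so async users only ever see arrow2's `WriteOptions`. A rough usage sketch in the spirit of `tests/it/io/parquet/write_async.rs` (the function name is invented, and the exact `Chunk` item type varies across arrow2 versions — chunks held `Arc<dyn Array>` around this commit):

```rust
use std::sync::Arc;

use arrow2::{
    array::{Array, Int64Array},
    chunk::Chunk,
    datatypes::{DataType, Field, Schema},
    error::Result,
    io::parquet::write::{Compression, Encoding, FileSink, Version, WriteOptions},
};
use futures::{io::Cursor, SinkExt};

// Invented example: write a single chunk through the async sink.
async fn write_one_chunk() -> Result<()> {
    let schema = Schema::from(vec![Field::new("a", DataType::Int64, false)]);
    let chunk = Chunk::new(vec![Arc::new(Int64Array::from_slice([1, 2, 3])) as Arc<dyn Array>]);

    let options = WriteOptions {
        write_statistics: true,
        version: Version::V2,
        compression: Compression::Uncompressed,
    };

    let mut buffer = Cursor::new(Vec::new());
    // One Encoding per column; the sink converts `options` into parquet2's
    // WriteOptions internally, as the diff above shows.
    let mut sink = FileSink::try_new(&mut buffer, schema, vec![Encoding::Plain], options)?;
    sink.feed(chunk).await?;
    sink.close().await?;
    Ok(())
}
```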
2 changes: 1 addition & 1 deletion src/io/parquet/write/utf8/basic.rs
@@ -4,9 +4,9 @@ use parquet2::{
     page::DataPage,
     schema::types::PrimitiveType,
     statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics},
-    write::WriteOptions,
 };
 
+use super::super::WriteOptions;
 use super::super::binary::{encode_delta, ord_binary};
 use super::super::utils;
 use crate::{
4 changes: 2 additions & 2 deletions src/io/parquet/write/utf8/nested.rs
@@ -1,6 +1,6 @@
-use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, write::WriteOptions};
+use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage};
 
-use super::super::{levels, utils};
+use super::super::{levels, utils, WriteOptions};
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
 use crate::{
2 changes: 1 addition & 1 deletion src/io/parquet/write/utils.rs
@@ -6,9 +6,9 @@ use parquet2::{
     metadata::Descriptor,
     page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2},
     statistics::ParquetStatistics,
-    write::WriteOptions,
 };
 
+use super::WriteOptions;
 use crate::error::Result;
 
 use super::Version;
6 changes: 1 addition & 5 deletions tests/it/io/parquet/write_async.rs
@@ -7,14 +7,10 @@ use arrow2::{
     error::Result,
     io::parquet::{
         read::{infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer},
-        write::Encoding,
+        write::{Compression, Encoding, Version, WriteOptions},
     },
 };
 use futures::{future::BoxFuture, io::Cursor, SinkExt};
-use parquet2::{
-    compression::Compression,
-    write::{Version, WriteOptions},
-};
 
 use super::FileSink;
