From b9eae795549bdf81e2c85edeb869df7b21c76e36 Mon Sep 17 00:00:00 2001
From: Dexter Duckworth
Date: Wed, 2 Mar 2022 23:11:19 -0500
Subject: [PATCH 1/3] implemented `futures::Sink` for parquet async writer
 (#877)

---
 src/io/parquet/write/mod.rs        |   4 +-
 src/io/parquet/write/sink.rs       | 223 +++++++++++++++++++++++++++++
 src/io/parquet/write/stream.rs     |  71 ---------
 tests/it/io/parquet/mod.rs         |   1 +
 tests/it/io/parquet/write_async.rs |  82 +++++++++++
 5 files changed, 308 insertions(+), 73 deletions(-)
 create mode 100644 src/io/parquet/write/sink.rs
 delete mode 100644 src/io/parquet/write/stream.rs
 create mode 100644 tests/it/io/parquet/write_async.rs

diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 5fec264cfd3..8bebbd8e293 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -8,7 +8,7 @@ mod levels;
 mod primitive;
 mod row_group;
 mod schema;
-mod stream;
+mod sink;
 mod utf8;
 mod utils;
 
@@ -39,7 +39,7 @@ pub use parquet2::{
 pub use file::FileWriter;
 pub use row_group::{row_group_iter, RowGroupIterator};
 pub use schema::to_parquet_type;
-pub use stream::FileStreamer;
+pub use sink::FileSink;
 
 pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
     // digits = floor(log_10(2^(8*n - 1) - 1))
diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs
new file mode 100644
index 00000000000..8906be9431b
--- /dev/null
+++ b/src/io/parquet/write/sink.rs
@@ -0,0 +1,223 @@
+use crate::{
+    array::Array,
+    chunk::Chunk,
+    datatypes::Schema,
+    error::ArrowError,
+    io::parquet::write::{Encoding, SchemaDescriptor, WriteOptions},
+};
+use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink, TryFutureExt};
+use parquet2::metadata::KeyValue;
+use parquet2::write::FileStreamer;
+use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll};
+
+use super::file::add_arrow_schema;
+
+/// Sink that writes array [`chunks`](Chunk) as a Parquet file.
+///
+/// Any values in the sink's `metadata` field will be written to the file's footer
+/// when the sink is closed.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::Arc;
+/// use futures::SinkExt;
+/// use arrow2::array::{Array, Int32Array};
+/// use arrow2::datatypes::{DataType, Field, Schema};
+/// use arrow2::chunk::Chunk;
+/// use arrow2::io::parquet::write::{Encoding, WriteOptions, Compression, Version};
+/// # use arrow2::io::parquet::write::FileSink;
+/// # futures::executor::block_on(async move {
+///
+/// let schema = Schema::from(vec![
+///     Field::new("values", DataType::Int32, true),
+/// ]);
+/// let encoding = vec![Encoding::Plain];
+/// let options = WriteOptions {
+///     write_statistics: true,
+///     compression: Compression::Uncompressed,
+///     version: Version::V2,
+/// };
+///
+/// let mut buffer = vec![];
+/// let mut sink = FileSink::try_new(
+///     &mut buffer,
+///     schema,
+///     encoding,
+///     options,
+/// )?;
+///
+/// for i in 0..3 {
+///     let values = Int32Array::from(&[Some(i), None]);
+///     let chunk = Chunk::new(vec![Arc::new(values) as Arc<dyn Array>]);
+///     sink.feed(chunk).await?;
+/// }
+/// sink.metadata.insert(String::from("key"), Some(String::from("value")));
+/// sink.close().await?;
+/// # arrow2::error::Result::Ok(())
+/// # }).unwrap();
+/// ```
+pub struct FileSink<'a, W: AsyncWrite + Send + Unpin> {
+    writer: Option<FileStreamer<W>>,
+    task: Option<BoxFuture<'a, Result<Option<FileStreamer<W>>, ArrowError>>>,
+    options: WriteOptions,
+    encoding: Vec<Encoding>,
+    schema: Schema,
+    parquet_schema: SchemaDescriptor,
+    /// Key-value metadata that will be written to the file on close.
+    pub metadata: HashMap<String, Option<String>>,
+}
+
+impl<'a, W> FileSink<'a, W>
+where
+    W: AsyncWrite + Send + Unpin + 'a,
+{
+    /// Create a new sink that writes arrays to the provided `writer`.
+    ///
+    /// # Error
+    /// If the Arrow schema can't be converted to a valid Parquet schema.
+    pub fn try_new(
+        writer: W,
+        schema: Schema,
+        encoding: Vec<Encoding>,
+        options: WriteOptions,
+    ) -> Result<Self, ArrowError> {
+        // let mut writer = FileStreamer::try_new(writer, schema.clone(), options)?;
+        let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
+        let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
+        let mut writer = FileStreamer::new(writer, parquet_schema.clone(), options, created_by);
+        let task = Some(
+            async move {
+                writer.start().await?;
+                Ok(Some(writer))
+            }
+            .boxed(),
+        );
+        Ok(Self {
+            writer: None,
+            task,
+            options,
+            schema,
+            encoding,
+            parquet_schema,
+            metadata: HashMap::default(),
+        })
+    }
+
+    /// The Arrow [`Schema`] for the file.
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    /// The Parquet [`SchemaDescriptor`] for the file.
+    pub fn parquet_schema(&self) -> &SchemaDescriptor {
+        &self.parquet_schema
+    }
+
+    /// The write options for the file.
+    pub fn options(&self) -> &WriteOptions {
+        &self.options
+    }
+
+    fn poll_complete(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), ArrowError>> {
+        if let Some(task) = &mut self.task {
+            match futures::ready!(task.poll_unpin(cx)) {
+                Ok(writer) => {
+                    self.task = None;
+                    self.writer = writer;
+                    Poll::Ready(Ok(()))
+                }
+                Err(error) => {
+                    self.task = None;
+                    Poll::Ready(Err(error))
+                }
+            }
+        } else {
+            Poll::Ready(Ok(()))
+        }
+    }
+}
+
+impl<'a, W> Sink<Chunk<Arc<dyn Array>>> for FileSink<'a, W>
+where
+    W: AsyncWrite + Send + Unpin + 'a,
+{
+    type Error = ArrowError;
+
+    fn start_send(self: Pin<&mut Self>, item: Chunk<Arc<dyn Array>>) -> Result<(), Self::Error> {
+        let this = self.get_mut();
+        if let Some(mut writer) = this.writer.take() {
+            let count = item.len();
+            let rows = crate::io::parquet::write::row_group_iter(
+                item,
+                this.encoding.clone(),
+                this.parquet_schema.columns().to_vec(),
+                this.options,
+            );
+            this.task = Some(Box::pin(async move {
+                writer.write(rows, count).await?;
+                Ok(Some(writer))
+            }));
+            Ok(())
+        } else {
+            Err(ArrowError::Io(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                "writer closed".to_string(),
+            )))
+        }
+    }
+
+    fn poll_ready(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        self.get_mut().poll_complete(cx)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        self.get_mut().poll_complete(cx)
+    }
+
+    fn poll_close(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        let this = self.get_mut();
+        match futures::ready!(this.poll_complete(cx)) {
+            Ok(()) => {
+                let writer = this.writer.take();
+                if let Some(writer) = writer {
+                    let meta = std::mem::take(&mut this.metadata);
+                    let metadata = if meta.is_empty() {
+                        None
+                    } else {
+                        Some(
+                            meta.into_iter()
+                                .map(|(k, v)| KeyValue::new(k, v))
+                                .collect::<Vec<_>>(),
+                        )
+                    };
+                    let kv_meta = add_arrow_schema(&this.schema, metadata);
+
+                    this.task = Some(
+                        writer
+                            .end(kv_meta)
+                            .map_ok(|_| None)
+                            .map_err(ArrowError::from)
+                            .boxed(),
+                    );
+                    this.poll_complete(cx)
+                } else {
+                    Poll::Ready(Ok(()))
+                }
+            }
+            Err(error) => Poll::Ready(Err(error)),
+        }
+    }
+}
diff --git a/src/io/parquet/write/stream.rs b/src/io/parquet/write/stream.rs
deleted file mode 100644
index 259c5aaa347..00000000000
--- a/src/io/parquet/write/stream.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-//! Contains `async` APIs to write to parquet.
-use futures::AsyncWrite;
-
-use parquet2::metadata::{KeyValue, SchemaDescriptor};
-use parquet2::write::RowGroupIter;
-
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-
-use super::file::add_arrow_schema;
-use super::{to_parquet_schema, WriteOptions};
-
-/// An interface to write a parquet to a [`AsyncWrite`]
-pub struct FileStreamer<W: AsyncWrite> {
-    writer: parquet2::write::FileStreamer<W>,
-    schema: Schema,
-}
-
-// Accessors
-impl<W: AsyncWrite> FileStreamer<W> {
-    /// The options assigned to the file
-    pub fn options(&self) -> &WriteOptions {
-        self.writer.options()
-    }
-
-    /// The [`SchemaDescriptor`] assigned to this file
-    pub fn parquet_schema(&self) -> &SchemaDescriptor {
-        self.writer.schema()
-    }
-
-    /// The [`Schema`] assigned to this file
-    pub fn schema(&self) -> &Schema {
-        &self.schema
-    }
-}
-
-impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
-    /// Returns a new [`FileStreamer`].
-    /// # Error
-    /// If it is unable to derive a parquet schema from [`Schema`].
-    pub fn try_new(writer: W, schema: Schema, options: WriteOptions) -> Result<Self> {
-        let parquet_schema = to_parquet_schema(&schema)?;
-
-        let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
-
-        Ok(Self {
-            writer: parquet2::write::FileStreamer::new(writer, parquet_schema, options, created_by),
-            schema,
-        })
-    }
-
-    /// Writes the header of the file
-    pub async fn start(&mut self) -> Result<()> {
-        Ok(self.writer.start().await?)
-    }
-
-    /// Writes a row group to the file.
-    pub async fn write(
-        &mut self,
-        row_group: RowGroupIter<'_, ArrowError>,
-        num_rows: usize,
-    ) -> Result<()> {
-        Ok(self.writer.write(row_group, num_rows).await?)
-    }
-
-    /// Writes the footer of the parquet file. Returns the total size of the file.
-    pub async fn end(self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<(u64, W)> {
-        let key_value_metadata = add_arrow_schema(&self.schema, key_value_metadata);
-        Ok(self.writer.end(key_value_metadata).await?)
- } -} diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 8eb009204b2..0528b85d846 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -10,6 +10,7 @@ use crate::io::ipc::read_gzip_json; mod read; mod write; +mod write_async; type ArrayStats = (Arc, Option>); diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs new file mode 100644 index 00000000000..5f9d09515e5 --- /dev/null +++ b/tests/it/io/parquet/write_async.rs @@ -0,0 +1,82 @@ +use std::{collections::HashMap, sync::Arc}; + +use arrow2::{ + array::{Array, Float32Array, Int32Array}, + chunk::Chunk, + datatypes::{DataType, Field, Schema}, + error::Result, + io::parquet::{ + read::{infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer}, + write::Encoding, + }, +}; +use futures::{future::BoxFuture, io::Cursor, SinkExt}; +use parquet2::{ + compression::Compression, + write::{Version, WriteOptions}, +}; + +use super::FileSink; + +#[tokio::test] +async fn test_parquet_async_roundtrip() { + let mut data = vec![]; + for i in 0..5 { + let a1 = Int32Array::from(&[Some(i), None, Some(i + 1)]); + let a2 = Float32Array::from(&[None, Some(i as f32), None]); + let chunk = Chunk::new(vec![ + Arc::new(a1) as Arc, + Arc::new(a2) as Arc, + ]); + data.push(chunk); + } + let schema = Schema::from(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("a2", DataType::Float32, true), + ]); + let encoding = vec![Encoding::Plain, Encoding::Plain]; + let options = WriteOptions { + write_statistics: true, + compression: Compression::Uncompressed, + version: Version::V2, + }; + + let mut buffer = Cursor::new(Vec::new()); + let mut sink = FileSink::try_new(&mut buffer, schema.clone(), encoding, options).unwrap(); + sink.metadata + .insert(String::from("key"), Some("value".to_string())); + for chunk in &data { + sink.feed(chunk.clone()).await.unwrap(); + } + sink.close().await.unwrap(); + drop(sink); + + buffer.set_position(0); + let metadata = read_metadata_async(&mut buffer).await.unwrap(); + let kv = HashMap::>::from_iter( + metadata + .key_value_metadata() + .to_owned() + .unwrap() + .into_iter() + .map(|kv| (kv.key, kv.value)), + ); + assert_eq!(kv.get("key").unwrap(), &Some("value".to_string())); + let read_schema = infer_schema(&metadata).unwrap(); + assert_eq!(read_schema, schema); + let factory = || Box::pin(futures::future::ready(Ok(buffer.clone()))) as BoxFuture<_>; + + let mut out = vec![]; + for group in &metadata.row_groups { + let column_chunks = read_columns_many_async(factory, group, schema.fields.clone(), None) + .await + .unwrap(); + let chunks = RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None); + let mut chunks = chunks.collect::>>().unwrap(); + out.append(&mut chunks); + } + + for i in 0..5 { + assert_eq!(data[i], out[i]); + } +} From a87c758a80facbfbcde4768ad42cdf4c5c2cc5a8 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Thu, 3 Mar 2022 05:12:24 +0100 Subject: [PATCH 2/3] remove superfluous assert (#872) --- src/array/growable/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/array/growable/mod.rs b/src/array/growable/mod.rs index c6d45356237..03f5d2345d5 100644 --- a/src/array/growable/mod.rs +++ b/src/array/growable/mod.rs @@ -73,7 +73,6 @@ pub fn make_growable<'a>( ) -> Box + 'a> { assert!(!arrays.is_empty()); let data_type = arrays[0].data_type(); - assert!(arrays.iter().all(|&item| item.data_type() == data_type)); use PhysicalType::*; match data_type.to_physical_type() { From 
From a87c758a80facbfbcde4768ad42cdf4c5c2cc5a8 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Thu, 3 Mar 2022 05:12:24 +0100
Subject: [PATCH 2/3] remove superfluous assert (#872)

---
 src/array/growable/mod.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/array/growable/mod.rs b/src/array/growable/mod.rs
index c6d45356237..03f5d2345d5 100644
--- a/src/array/growable/mod.rs
+++ b/src/array/growable/mod.rs
@@ -73,7 +73,6 @@ pub fn make_growable<'a>(
 ) -> Box<dyn Growable<'a> + 'a> {
     assert!(!arrays.is_empty());
     let data_type = arrays[0].data_type();
-    assert!(arrays.iter().all(|&item| item.data_type() == data_type));
 
     use PhysicalType::*;
     match data_type.to_physical_type() {

From 66616ddda06b27cc96e07d43aadafe990387c927 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Thu, 3 Mar 2022 17:57:30 +0100
Subject: [PATCH 3/3] expose ListValuesIter (#874)

---
 src/array/list/iterator.rs | 2 +-
 src/array/mod.rs           | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/array/list/iterator.rs b/src/array/list/iterator.rs
index 6885bf9d925..f4d6ae71f71 100644
--- a/src/array/list/iterator.rs
+++ b/src/array/list/iterator.rs
@@ -13,7 +13,7 @@ pub struct ListValuesIter<'a, A: IterableListArray> {
 
 impl<'a, A: IterableListArray> ListValuesIter<'a, A> {
     #[inline]
-    pub fn new(array: &'a A) -> Self {
+    pub(crate) fn new(array: &'a A) -> Self {
         Self {
             array,
             index: 0,
diff --git a/src/array/mod.rs b/src/array/mod.rs
index bff8effb0a6..b58633ad8cb 100644
--- a/src/array/mod.rs
+++ b/src/array/mod.rs
@@ -375,7 +375,7 @@ pub use boolean::{BooleanArray, MutableBooleanArray};
 pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray};
 pub use fixed_size_binary::{FixedSizeBinaryArray, MutableFixedSizeBinaryArray};
 pub use fixed_size_list::{FixedSizeListArray, MutableFixedSizeListArray};
-pub use list::{ListArray, MutableListArray};
+pub use list::{ListArray, ListValuesIter, MutableListArray};
 pub use map::MapArray;
 pub use null::NullArray;
 pub use primitive::*;
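The `ListValuesIter` export in patch 3 lets downstream code name the iterator type, while its constructor stays `pub(crate)`. A minimal sketch of what this enables follows; the `ListValues` alias and `main` scaffolding are illustrative, not part of the patch, and the list array is built with `MutableListArray` as in arrow2's documentation.

```
use arrow2::array::{
    Array, ListArray, ListValuesIter, MutableListArray, MutablePrimitiveArray, TryExtend,
};

// Downstream code can now name the iterator type, e.g. in a type alias
// or a struct field (illustrative alias):
#[allow(dead_code)]
type ListValues<'a> = ListValuesIter<'a, ListArray<i32>>;

fn main() -> arrow2::error::Result<()> {
    // Build the list array [[1, 2], None, [3]].
    let data = vec![
        Some(vec![Some(1i32), Some(2)]),
        None,
        Some(vec![Some(3)]),
    ];
    let mut mutable = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
    mutable.try_extend(data)?;
    let array: ListArray<i32> = mutable.into();

    // `iter` zips the validity bitmap with a `ListValuesIter` internally,
    // yielding one `Option<Box<dyn Array>>` per slot.
    for sub_array in array.iter() {
        println!("{:?}", sub_array.map(|s| s.len()));
    }
    Ok(())
}
```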