diff --git a/Cargo.toml b/Cargo.toml index 397b7dabc3e..09fae9293a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ futures = { version = "0.3", optional = true } ahash = { version = "0.7", optional = true } # parquet support -parquet2 = { version = "0.12", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.13", optional = true, default_features = false } # avro support avro-schema = { version = "0.2", optional = true } diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index eef4acc9cb0..887ba77d45b 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -194,7 +194,6 @@ fn main() -> Result<()> { let mut writer = FileWriter::try_new(writer, schema, options)?; - writer.start()?; for group in row_groups { writer.write(group?)?; } diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index 5bd2848c2eb..b1f5ecfd5ef 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -32,7 +32,6 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { let mut writer = FileWriter::try_new(writer, schema, options)?; - writer.start()?; for group in row_groups { writer.write(group?)?; } diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index ecbbe486f24..b3baf3bf8fc 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -34,7 +34,6 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Re let mut writer = FileWriter::try_new(file, schema, options)?; - writer.start()?; for group in row_groups { writer.write(group?)?; } diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index a313196419c..56230b4a4bb 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -97,7 +97,6 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()> let mut writer = FileWriter::try_new(file, schema, options)?; // Write the file. - writer.start()?; for group in row_groups { writer.write(group?)?; } diff --git a/src/doc/lib.md b/src/doc/lib.md index e27e0ff2c37..dbb013cfbd6 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -60,7 +60,6 @@ fn main() -> Result<()> { let mut writer = FileWriter::try_new(file, schema, options)?; // Write the file. - writer.start()?; for group in row_groups { writer.write(group?)?; } diff --git a/src/io/parquet/write/file.rs b/src/io/parquet/write/file.rs index 21bfe55d5d5..42153a93a2b 100644 --- a/src/io/parquet/write/file.rs +++ b/src/io/parquet/write/file.rs @@ -72,11 +72,6 @@ impl FileWriter { }) } - /// Writes the header of the file - pub fn start(&mut self) -> Result<()> { - Ok(self.writer.start()?) - } - /// Writes a row group to the file. pub fn write(&mut self, row_group: RowGroupIter<'_, Error>) -> Result<()> { Ok(self.writer.write(row_group)?) diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs index 8a953162aa2..1f3e44fa648 100644 --- a/src/io/parquet/write/sink.rs +++ b/src/io/parquet/write/sink.rs @@ -82,7 +82,7 @@ where ) -> Result { let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?; let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string()); - let mut writer = FileStreamer::new( + let writer = FileStreamer::new( writer, parquet_schema.clone(), ParquetWriteOptions { @@ -91,16 +91,9 @@ where }, created_by, ); - let task = Some( - async move { - writer.start().await?; - Ok(Some(writer)) - } - .boxed(), - ); Ok(Self { - writer: None, - task, + writer: Some(writer), + task: None, options, schema, encoding, @@ -196,7 +189,7 @@ where match futures::ready!(this.poll_complete(cx)) { Ok(()) => { let writer = this.writer.take(); - if let Some(writer) = writer { + if let Some(mut writer) = writer { let meta = std::mem::take(&mut this.metadata); let metadata = if meta.is_empty() { None @@ -209,13 +202,10 @@ where }; let kv_meta = add_arrow_schema(&this.schema, metadata); - this.task = Some( - writer - .end(kv_meta) - .map_ok(|_| None) - .map_err(Error::from) - .boxed(), - ); + this.task = Some(Box::pin(async move { + writer.end(kv_meta).map_err(Error::from).await?; + Ok(None) + })); this.poll_complete(cx) } else { Poll::Ready(Ok(())) diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 68a0ea43e06..79951951e22 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -815,7 +815,6 @@ fn integration_write(schema: &Schema, batches: &[Chunk>]) -> Resu let mut writer = FileWriter::try_new(writer, schema.clone(), options)?; - writer.start()?; for group in row_groups { writer.write(group?)?; } diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index d6e49707757..6dd88618ae9 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -94,7 +94,6 @@ fn read_with_indexes( let writer = vec![]; let mut writer = FileWriter::try_new(writer, schema, options)?; - writer.start()?; writer.write(row_group)?; writer.end(None)?; let data = writer.into_inner(); diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 2c41aac5fa2..471b9d72afd 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -47,7 +47,6 @@ fn round_trip( let writer = Cursor::new(vec![]); let mut writer = FileWriter::try_new(writer, schema, options)?; - writer.start()?; for group in row_groups { writer.write(group?)?; }