diff --git a/crates/core/CHANGELOG.md b/crates/core/CHANGELOG.md index 7f881f48..8db4b2cf 100644 --- a/crates/core/CHANGELOG.md +++ b/crates/core/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Use `Encoder` instead of `TableBuilder` for `geoarrow` (breaking) ([#840](https://github.com/stac-utils/rustac/pull/840)) +- Break apart geoparquet writing into a `WriterBuilder` and a `Writer` (breaking) ([#841](https://github.com/stac-utils/rustac/pull/841)) ## [0.13.3] - 2025-11-13 diff --git a/crates/core/src/geoparquet.rs b/crates/core/src/geoparquet.rs index 9196a176..a01d12c2 100644 --- a/crates/core/src/geoparquet.rs +++ b/crates/core/src/geoparquet.rs @@ -2,7 +2,7 @@ use crate::{ Catalog, Collection, Error, Item, ItemCollection, Result, Value, - geoarrow::{VERSION, VERSION_KEY}, + geoarrow::{Encoder, Options, VERSION, VERSION_KEY}, }; use bytes::Bytes; use geoparquet::{ @@ -70,7 +70,9 @@ pub fn into_writer(writer: W, item_collection: impl Into) -> where W: Write + Send, { - WriterBuilder::new(writer, item_collection).write() + WriterBuilder::new(writer) + .build(item_collection.into().items)? + .finish() } /// Writes a [ItemCollection] to a [std::io::Write] as @@ -88,58 +90,186 @@ where /// ``` pub fn into_writer_with_compression( writer: W, + // TODO should we switch to just take a vector of items in the signature? item_collection: impl Into, compression: Compression, ) -> Result<()> where W: Write + Send, { - WriterBuilder::new(writer, item_collection) + WriterBuilder::new(writer) .compression(compression) - .write() + .build(item_collection.into().items) + .and_then(|writer| writer.finish()) } -struct WriterBuilder { +/// Builder for a stac-geoparquet writer. +#[derive(Debug)] +pub struct WriterBuilder { writer: W, - item_collection: ItemCollection, + options: Options, compression: Option, } +/// Write items to stac-geoparquet. +#[allow(missing_debug_implementations)] +pub struct Writer { + geoarrow_encoder: Encoder, + encoder: GeoParquetRecordBatchEncoder, + arrow_writer: ArrowWriter, +} + impl WriterBuilder { - fn new(writer: W, item_collection: impl Into) -> WriterBuilder { + /// Creates a new writer builder. + /// + /// # Examples + /// + /// ``` + /// use std::io::Cursor; + /// use stac::{Item, geoparquet::WriterBuilder}; + /// + /// let item: Item = stac::read("examples/simple-item.json").unwrap(); + /// let cursor = Cursor::new(Vec::new()); + /// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap(); + /// ``` + pub fn new(writer: W) -> WriterBuilder { WriterBuilder { writer, - item_collection: item_collection.into(), + options: Options::default(), compression: Some(default_compression()), } } - fn compression(mut self, compression: impl Into>) -> WriterBuilder { + /// Sets the parquet compression. + /// + /// # Examples + /// + /// ``` + /// use std::io::Cursor; + /// use stac::{Item, geoparquet::{WriterBuilder, Compression}}; + /// + /// let item: Item = stac::read("examples/simple-item.json").unwrap(); + /// let cursor = Cursor::new(Vec::new()); + /// let writer = WriterBuilder::new(cursor) + /// .compression(Compression::SNAPPY) + /// .build(vec![item]) + /// .unwrap(); + /// ``` + pub fn compression(mut self, compression: impl Into>) -> WriterBuilder { self.compression = compression.into(); self } - fn write(self) -> Result<()> { - let (record_batch, schema) = crate::geoarrow::encode(self.item_collection.items)?; + /// Sets the geoarrow encoding options + /// + /// # Examples + /// + /// ``` + /// use std::io::Cursor; + /// use stac::{Item, geoarrow::Options, geoparquet::WriterBuilder}; + /// + /// let item: Item = stac::read("examples/simple-item.json").unwrap(); + /// let cursor = Cursor::new(Vec::new()); + /// let options = Options::default(); + /// let writer = WriterBuilder::new(cursor) + /// .options(options) + /// .build(vec![item]) + /// .unwrap(); + /// ``` + pub fn options(mut self, options: Options) -> WriterBuilder { + self.options = options; + self + } + + /// Builds the writer. + /// + /// # Examples + /// + /// ``` + /// use std::io::Cursor; + /// use stac::{Item, geoparquet::WriterBuilder}; + /// + /// let item: Item = stac::read("examples/simple-item.json").unwrap(); + /// let cursor = Cursor::new(Vec::new()); + /// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap(); + /// writer.finish().unwrap(); + /// ``` + pub fn build(self, items: Vec) -> Result> { + Writer::new(self.writer, self.options, self.compression, items) + } +} + +impl Writer { + fn new( + writer: W, + options: Options, + compression: Option, + items: Vec, + ) -> Result { + let (geoarrow_encoder, record_batch) = Encoder::new(items, options)?; let options = GeoParquetWriterOptionsBuilder::default() .set_primary_column("geometry".to_string()) .build(); - let mut encoder = GeoParquetRecordBatchEncoder::try_new(&schema, &options)?; + let mut encoder = GeoParquetRecordBatchEncoder::try_new(&record_batch.schema(), &options)?; let mut builder = WriterProperties::builder(); - if let Some(compression) = self.compression { + if let Some(compression) = compression { builder = builder.set_compression(compression); } let properties = builder.build(); - let mut writer = - ArrowWriter::try_new(self.writer, encoder.target_schema(), Some(properties))?; + let mut arrow_writer = + ArrowWriter::try_new(writer, encoder.target_schema(), Some(properties))?; let record_batch = encoder.encode_record_batch(&record_batch)?; - writer.write(&record_batch)?; - writer.append_key_value_metadata(encoder.into_keyvalue()?); - writer.append_key_value_metadata(KeyValue::new( + arrow_writer.write(&record_batch)?; + Ok(Writer { + geoarrow_encoder, + encoder, + arrow_writer, + }) + } + + /// Writes more items to this writer. + /// + /// # Examples + /// + /// ``` + /// use std::io::Cursor; + /// use stac::{Item, geoparquet::WriterBuilder}; + /// + /// let item1: Item = stac::read("examples/simple-item.json").unwrap(); + /// let item2 = item1.clone(); + /// let cursor = Cursor::new(Vec::new()); + /// let mut writer = WriterBuilder::new(cursor).build(vec![item1]).unwrap(); + /// writer.write(vec![item2]).unwrap(); + /// writer.finish().unwrap(); + /// ``` + pub fn write(&mut self, items: Vec) -> Result<()> { + let record_batch = self.geoarrow_encoder.encode(items)?; + let record_batch = self.encoder.encode_record_batch(&record_batch)?; + self.arrow_writer.write(&record_batch)?; + Ok(()) + } + + /// Finishes writing. + /// + /// # Examples + /// + /// ``` + /// use std::io::Cursor; + /// use stac::{Item, geoparquet::WriterBuilder}; + /// + /// let item: Item = stac::read("examples/simple-item.json").unwrap(); + /// let cursor = Cursor::new(Vec::new()); + /// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap(); + /// writer.finish().unwrap(); + /// ``` + pub fn finish(mut self) -> Result<()> { + self.arrow_writer + .append_key_value_metadata(self.encoder.into_keyvalue()?); + self.arrow_writer.append_key_value_metadata(KeyValue::new( VERSION_KEY.to_string(), Some(VERSION.to_string()), )); - let _ = writer.finish()?; + let _ = self.arrow_writer.finish()?; Ok(()) } }