Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed

- Use `Encoder` instead of `TableBuilder` for `geoarrow` (breaking) ([#840](https://github.com/stac-utils/rustac/pull/840))
- Break apart geoparquet writing into a `WriterBuilder` and a `Writer` (breaking) ([#841](https://github.com/stac-utils/rustac/pull/841))

## [0.13.3] - 2025-11-13

Expand Down
168 changes: 149 additions & 19 deletions crates/core/src/geoparquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{
Catalog, Collection, Error, Item, ItemCollection, Result, Value,
geoarrow::{VERSION, VERSION_KEY},
geoarrow::{Encoder, Options, VERSION, VERSION_KEY},
};
use bytes::Bytes;
use geoparquet::{
Expand Down Expand Up @@ -70,7 +70,9 @@ pub fn into_writer<W>(writer: W, item_collection: impl Into<ItemCollection>) ->
where
W: Write + Send,
{
WriterBuilder::new(writer, item_collection).write()
WriterBuilder::new(writer)
.build(item_collection.into().items)?
.finish()
}

/// Writes a [ItemCollection] to a [std::io::Write] as
Expand All @@ -88,58 +90,186 @@ where
/// ```
pub fn into_writer_with_compression<W>(
writer: W,
// TODO should we switch to just take a vector of items in the signature?
item_collection: impl Into<ItemCollection>,
compression: Compression,
) -> Result<()>
where
W: Write + Send,
{
WriterBuilder::new(writer, item_collection)
WriterBuilder::new(writer)
.compression(compression)
.write()
.build(item_collection.into().items)
.and_then(|writer| writer.finish())
}

struct WriterBuilder<W: Write + Send> {
/// Builder for a stac-geoparquet writer.
#[derive(Debug)]
pub struct WriterBuilder<W: Write + Send> {
writer: W,
item_collection: ItemCollection,
options: Options,
compression: Option<Compression>,
}

/// Write items to stac-geoparquet.
#[allow(missing_debug_implementations)]
pub struct Writer<W: Write + Send> {
geoarrow_encoder: Encoder,
encoder: GeoParquetRecordBatchEncoder,
arrow_writer: ArrowWriter<W>,
}

impl<W: Write + Send> WriterBuilder<W> {
fn new(writer: W, item_collection: impl Into<ItemCollection>) -> WriterBuilder<W> {
/// Creates a new writer builder.
///
/// # Examples
///
/// ```
/// use std::io::Cursor;
/// use stac::{Item, geoparquet::WriterBuilder};
///
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
/// let cursor = Cursor::new(Vec::new());
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
/// ```
pub fn new(writer: W) -> WriterBuilder<W> {
WriterBuilder {
writer,
item_collection: item_collection.into(),
options: Options::default(),
compression: Some(default_compression()),
}
}

fn compression(mut self, compression: impl Into<Option<Compression>>) -> WriterBuilder<W> {
/// Sets the parquet compression.
///
/// # Examples
///
/// ```
/// use std::io::Cursor;
/// use stac::{Item, geoparquet::{WriterBuilder, Compression}};
///
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
/// let cursor = Cursor::new(Vec::new());
/// let writer = WriterBuilder::new(cursor)
/// .compression(Compression::SNAPPY)
/// .build(vec![item])
/// .unwrap();
/// ```
pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> WriterBuilder<W> {
self.compression = compression.into();
self
}

fn write(self) -> Result<()> {
let (record_batch, schema) = crate::geoarrow::encode(self.item_collection.items)?;
/// Sets the geoarrow encoding options
///
/// # Examples
///
/// ```
/// use std::io::Cursor;
/// use stac::{Item, geoarrow::Options, geoparquet::WriterBuilder};
///
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
/// let cursor = Cursor::new(Vec::new());
/// let options = Options::default();
/// let writer = WriterBuilder::new(cursor)
/// .options(options)
/// .build(vec![item])
/// .unwrap();
/// ```
pub fn options(mut self, options: Options) -> WriterBuilder<W> {
self.options = options;
self
}

/// Builds the writer.
///
/// # Examples
///
/// ```
/// use std::io::Cursor;
/// use stac::{Item, geoparquet::WriterBuilder};
///
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
/// let cursor = Cursor::new(Vec::new());
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
/// writer.finish().unwrap();
/// ```
pub fn build(self, items: Vec<Item>) -> Result<Writer<W>> {
Writer::new(self.writer, self.options, self.compression, items)
}
}

impl<W: Write + Send> Writer<W> {
fn new(
writer: W,
options: Options,
compression: Option<Compression>,
items: Vec<Item>,
) -> Result<Self> {
let (geoarrow_encoder, record_batch) = Encoder::new(items, options)?;
let options = GeoParquetWriterOptionsBuilder::default()
.set_primary_column("geometry".to_string())
.build();
let mut encoder = GeoParquetRecordBatchEncoder::try_new(&schema, &options)?;
let mut encoder = GeoParquetRecordBatchEncoder::try_new(&record_batch.schema(), &options)?;
let mut builder = WriterProperties::builder();
if let Some(compression) = self.compression {
if let Some(compression) = compression {
builder = builder.set_compression(compression);
}
let properties = builder.build();
let mut writer =
ArrowWriter::try_new(self.writer, encoder.target_schema(), Some(properties))?;
let mut arrow_writer =
ArrowWriter::try_new(writer, encoder.target_schema(), Some(properties))?;
let record_batch = encoder.encode_record_batch(&record_batch)?;
writer.write(&record_batch)?;
writer.append_key_value_metadata(encoder.into_keyvalue()?);
writer.append_key_value_metadata(KeyValue::new(
arrow_writer.write(&record_batch)?;
Ok(Writer {
geoarrow_encoder,
encoder,
arrow_writer,
})
}

/// Writes more items to this writer.
///
/// # Examples
///
/// ```
/// use std::io::Cursor;
/// use stac::{Item, geoparquet::WriterBuilder};
///
/// let item1: Item = stac::read("examples/simple-item.json").unwrap();
/// let item2 = item1.clone();
/// let cursor = Cursor::new(Vec::new());
/// let mut writer = WriterBuilder::new(cursor).build(vec![item1]).unwrap();
/// writer.write(vec![item2]).unwrap();
/// writer.finish().unwrap();
/// ```
pub fn write(&mut self, items: Vec<Item>) -> Result<()> {
let record_batch = self.geoarrow_encoder.encode(items)?;
let record_batch = self.encoder.encode_record_batch(&record_batch)?;
self.arrow_writer.write(&record_batch)?;
Ok(())
}

/// Finishes writing.
///
/// # Examples
///
/// ```
/// use std::io::Cursor;
/// use stac::{Item, geoparquet::WriterBuilder};
///
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
/// let cursor = Cursor::new(Vec::new());
/// let writer = WriterBuilder::new(cursor).build(vec![item]).unwrap();
/// writer.finish().unwrap();
/// ```
pub fn finish(mut self) -> Result<()> {
self.arrow_writer
.append_key_value_metadata(self.encoder.into_keyvalue()?);
self.arrow_writer.append_key_value_metadata(KeyValue::new(
VERSION_KEY.to_string(),
Some(VERSION.to_string()),
));
let _ = writer.finish()?;
let _ = self.arrow_writer.finish()?;
Ok(())
}
}
Expand Down