Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,4 @@ tracing-subscriber = { version = "0.3.18", features = [
tracing-indicatif = "0.3.9"
url = "2.3"
webpki-roots = "1.0.0"
wkb = "0.9.0"
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ geoarrow = [
"dep:arrow-schema",
"dep:geo-traits",
"dep:geo-types",
"dep:wkb",
]
geoparquet = ["geoarrow", "dep:geoparquet", "dep:parquet"]

Expand Down Expand Up @@ -49,6 +50,7 @@ stac-derive.workspace = true
thiserror.workspace = true
tracing.workspace = true
url = { workspace = true, features = ["serde"] }
wkb = { workspace = true, optional = true }

[dev-dependencies]
assert-json-diff.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ pub enum Error {
/// [url::ParseError]
#[error(transparent)]
UrlParse(#[from] url::ParseError),

/// [wkb::error::WkbError]
#[error(transparent)]
#[cfg(feature = "geoarrow")]
Wkb(#[from] wkb::error::WkbError),
}
33 changes: 29 additions & 4 deletions crates/core/src/geoarrow/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,35 @@ fn set_column_for_json_rows(
}
}
_ => {
return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
array.data_type()
)));
if col_name == "proj:geometry" {
let binary_array = as_generic_binary_array::<i32>(array);
rows.iter_mut()
.zip(binary_array.iter())
.filter_map(|(maybe_row, maybe_value)| {
maybe_row.as_mut().map(|row| (row, maybe_value))
})
.try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
let maybe_value = maybe_value
.map(|value| -> Result<_, ArrowError> {
let wkb = wkb::reader::read_wkb(value)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))?;
let value = geojson::Value::from(&wkb.to_geometry());
Ok(value)
})
.transpose()?;
if let Some(j) = maybe_value {
row.insert(col_name.to_string(), Value::from(&j));
} else if explicit_nulls {
row.insert(col_name.to_string(), Value::Null);
}
Ok(())
})?;
} else {
return Err(ArrowError::JsonError(format!(
"data type {:?} not supported in nested map for json writer",
array.data_type()
)));
}
}
}
Ok(())
Expand Down
67 changes: 45 additions & 22 deletions crates/core/src/geoarrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
pub mod json;

use crate::{Error, ItemCollection, Result};
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader, cast::AsArray};
use arrow_array::{
Array, RecordBatch, RecordBatchIterator, RecordBatchReader, builder::BinaryBuilder,
cast::AsArray,
};
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, SchemaBuilder, SchemaRef, TimeUnit};
use geo_types::Geometry;
Expand All @@ -14,17 +17,14 @@ use geoarrow_array::{
};
use geoarrow_schema::{GeoArrowType, GeometryType, Metadata};
use serde_json::{Value, json};
use std::{collections::HashMap, sync::Arc};
use std::{io::Cursor, sync::Arc};

/// The stac-geoparquet version metadata key.
pub const VERSION_KEY: &str = "stac:geoparquet_version";

/// The stac-geoparquet version.
pub const VERSION: &str = "1.0.0";

/// Geometry columns.
pub const GEOMETRY_COLUMNS: [&str; 2] = ["geometry", "proj:geometry"];

/// Datetime columns.
pub const DATETIME_COLUMNS: [&str; 8] = [
"datetime",
Expand Down Expand Up @@ -79,7 +79,8 @@ impl TableBuilder {
/// ```
pub fn build(self) -> Result<Table> {
let mut values = Vec::with_capacity(self.item_collection.items.len());
let mut geometry_builders = HashMap::new();
let mut geometry_builder = GeometryBuilder::new(GeometryType::new(Default::default()));
let mut proj_geometry_builder = BinaryBuilder::new();

for item in self.item_collection.items {
let mut value =
Expand All @@ -88,18 +89,20 @@ impl TableBuilder {
let value = value
.as_object_mut()
.expect("a flat item should serialize to an object");
for key in GEOMETRY_COLUMNS {
if let Some(value) = value.remove(key) {
let entry = geometry_builders.entry(key).or_insert_with(|| {
let geometry_type = GeometryType::new(Default::default());
GeometryBuilder::new(geometry_type)
});
let geometry =
geojson::Geometry::from_json_value(value).map_err(Box::new)?;
entry.push_geometry(Some(
&(Geometry::try_from(geometry).map_err(Box::new)?),
))?;
}
if let Some(value) = value.remove("geometry") {
let geometry = geojson::Geometry::from_json_value(value).map_err(Box::new)?;
geometry_builder
.push_geometry(Some(&(Geometry::try_from(geometry).map_err(Box::new)?)))?;
}
if let Some(value) = value.remove("proj:geometry") {
let geometry = geojson::Geometry::from_json_value(value).map_err(Box::new)?;
let mut cursor = Cursor::new(Vec::new());
wkb::writer::write_geometry(
&mut cursor,
&Geometry::try_from(geometry).map_err(Box::new)?,
&Default::default(),
)?;
proj_geometry_builder.append_value(cursor.into_inner());
}
if let Some(bbox) = value.remove("bbox") {
let bbox = convert_bbox(bbox)?;
Expand Down Expand Up @@ -132,10 +135,14 @@ impl TableBuilder {
// Add the geometries back in.
let mut schema_builder = SchemaBuilder::from(schema.fields());
let mut columns = record_batch.columns().to_vec();
for (key, geometry_builder) in geometry_builders {
let geometry_array = geometry_builder.finish();
columns.push(geometry_array.to_array_ref());
schema_builder.push(geometry_array.data_type().to_field(key, true));
let geometry_array = geometry_builder.finish();
columns.push(geometry_array.to_array_ref());
schema_builder.push(geometry_array.data_type().to_field("geometry", true));
let proj_geometry_array = proj_geometry_builder.finish();
if !proj_geometry_array.is_empty() {
let data_type = proj_geometry_array.data_type().clone();
columns.push(Arc::new(proj_geometry_array));
schema_builder.push(Field::new("proj:geometry", data_type, true));
}
let _ = schema_builder
.metadata_mut()
Expand Down Expand Up @@ -350,4 +357,20 @@ mod tests {
let record_batch = record_batches.pop().unwrap();
let _ = super::with_wkb_geometry(record_batch, "geometry").unwrap();
}

#[test]
fn has_proj_geometry() {
let item: Item =
crate::read("examples/extensions-collection/proj-example/proj-example.json").unwrap();
let table = Table::from_item_collection(vec![item]).unwrap();
let (mut record_batches, _) = table.into_inner();
assert_eq!(record_batches.len(), 1);
let record_batch = record_batches.pop().unwrap();
assert!(
record_batch
.schema()
.column_with_name("proj:geometry")
.is_some()
);
}
}
26 changes: 26 additions & 0 deletions crates/core/src/geoparquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,30 @@ mod tests {
let cursor = Cursor::new(Vec::new());
super::into_writer(cursor, items).unwrap();
}

#[test]
fn no_proj_geometry_metadata() {
let item: Item =
crate::read("examples/extensions-collection/proj-example/proj-example.json").unwrap();
let mut cursor = Cursor::new(Vec::new());
super::into_writer(&mut cursor, vec![item]).unwrap();
let bytes = Bytes::from(cursor.into_inner());
let reader = SerializedFileReader::new(bytes).unwrap();
let key_value = reader
.metadata()
.file_metadata()
.key_value_metadata()
.unwrap()
.iter()
.find(|key_value| key_value.key == "geo")
.unwrap();
let value: serde_json::Value =
serde_json::from_str(key_value.value.as_deref().unwrap()).unwrap();
assert!(
!value["columns"]
.as_object()
.unwrap()
.contains_key("proj:geometry")
);
}
}
Loading