# GeoParquet & Havasu Iceberg With SedonaDB & Wherobots Cloud

This notebook explores the GeoParquet file format for working with vector data and also the Havasu Iceberg table format.

* Examining contents of a GeoParquet file
* Loading GeoParquet files in SedonaDB
* Creating and querying GeoParquet using SedonaDB
* Options for spatial partitioning of GeoParquet files with performance comparisons
* Working with Havasu Iceberg tables 


This notebook is intended to be run in [Wherobots Cloud](https://wherobots.serices). Anyone can [create a free account](https://wherobots.services) to get started.

In [None]:
# Other dependencies are intsalled by default in Wherobots Cloud

!pip install pyarrow

In [None]:
import pyarrow as pa
import pyarrow.parquet
import json
import os
from sedona.spark import *
from pyspark.sql.functions import expr
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

# GeoParquet

[GeoParquet](https://geoparquet.org) is a specification the defines how to storage vector geospatial data using Apache Parquet. Parquet is a columnar data format commonly used with cloud object stores like AWS S3 for efficent storage and retrieval of large scale data. The main concepts of GeoParquet include:

* Specifying how geometries (points, lines, polygons) should be serialized in Parquet files (WKB) and specifying a new Parquet type `geometry`
* Additional metadata that describes the geospatial data stored in each Parquet file

### GeoParquet Metadata & Geometries

The [GeoParquet specification ](https://github.com/opengeospatial/geoparquet) specifies the following types of metadata:

* File metadata
    * **version**, **primary_column**, **columns**
* Column metadata
    * **encoding (WKB)**, **geometry_types**, crs, orientation, edges, bbox, epoch
    
    
Let's look at an example

In [None]:
# Download the example GeoParquet file 

!wget https://github.com/opengeospatial/geoparquet/raw/main/examples/example.parquet

We can inspect the metadata included in this GeoParquet file:

In [None]:
table = pa.parquet.read_table('./example.parquet')
metadata_str = table.schema.metadata[b'geo'].decode('utf-8')
metadata = json.loads(metadata_str)
print(json.dumps(metadata, indent=2))

And to view the schema and GeoParquet file contents:

In [None]:
print(table)

## Load GeoParquet File In SedonaDB

GeoParquet is compatible with any Parquet reader, however readers that implement GeoParquet are able to take advantage of the `geometry` type serialization and associated metadata. Let's load the example GeoParquet file in [SedonaDB.](https://wherobots.com/sedona-db/)


In [None]:
# Configure SedonaContext, specify credentials for AWS S3 bucket(s) (optional)

config = SedonaContext.builder(). \
    config("spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider","org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"). \
    getOrCreate()

sedona = SedonaContext.create(config)

We'll load the GeoParquet file directly into a SedonaDB Spatial DataFrame and specify a named view for the DataFrame to enable querying using Spatial SQL

In [None]:
df = sedona.read.format("geoparquet").load("s3://wherobots-examples-prod/data/geoparquet/example.parquet")
df.createOrReplaceTempView("example")
df.show()

We can also inspect the schema of the DataFrame

In [None]:
df.printSchema()

We can visualize this data using SedonaKepler, the Kepler.gl integration for SedonaDB and Apache Sedona

In [None]:
SedonaKepler.create_map(df, name="Example")

One of the benefits of GeoParquet is that geometries are loaded directly as geometry types, we don't need to explicitly cast or create them and can directly start working with geometries. Here we use the `ST_Centroid` Spatial SQL function to calculate centroids for each polygon.

In [None]:
sedona.sql("""
SELECT *, ST_Centroid(geometry) AS centroid
FROM example
""").show()

## Creating GeoParquet Files Using SedonaDB

Now that we've seen how to load GeoParquet files in SedonaDB, let's see how to create GeoParquet files. We'll use sample data from [BirdBuddy smart birdfeeders](https://live.mybirdbuddy.com/) that record bird species observations. We'll load this data from a public S3 bucket.


In [None]:
BB_S3_URL = "s3://wherobots-examples-prod/data/examples/birdbuddy_oct23.csv"

The BirdBuddy data is distributed as CSV so we'll need to explicitly convert numeric latitude and longitude fields into a point geometry using the `ST_Point` spatial SQL function.

In [None]:
bb_df = sedona.read.format('csv').option('header','true').option('delimiter', ',').load(BB_S3_URL)
bb_df.show(5, truncate=False)

In [None]:
bb_df = bb_df.selectExpr('ST_Point(CAST(anonymized_longitude AS float), CAST(anonymized_latitude AS float)) AS location', 'CAST(timestamp AS timestamp) AS timestamp', 'common_name', 'scientific_name')
bb_df.createOrReplaceTempView('bb')
bb_df.show(15, truncate=False)

In [None]:
bb_df.printSchema()

In [None]:
bb_df.count()

We can explore and visualize the data, for example let's find all Junco observations.

In [None]:
chickadee_df = sedona.sql("SELECT * FROM bb WHERE common_name = 'mountain chickadee' ")
chickadee_df.show(5, truncate=False)

In [None]:
SedonaKepler.create_map(df=chickadee_df, name='Mountain Chickadee')


A common analysis that we might want to explore with this data is calculating the range of each species. We can do this using the `ST_ConvexHull` spatial SQL function.

In [None]:
range_df = sedona.sql("""
    SELECT common_name, COUNT(*) AS num, ST_ConvexHull(ST_Union_aggr(location)) AS geometry 
    FROM bb 
    WHERE common_name IN ('california towhee', 'steller’s jay', 'mountain chickadee', 'eastern bluebird') 
    GROUP BY common_name 
    ORDER BY num DESC
""")
range_df.show()

In [None]:
SedonaKepler.create_map(df=range_df, name="Bird species range")

Now, let's save the BirdBuddy observation data as GeoParquet, then compare the performance of loading and querying the GeoParquet version of this dataset to the CSV format. We can use SedonaDB's built in GeoParquet writer. Note that we repartition to save a *single* GeoParquet file. We'll explore partitioned GeoParquet next. 

But first we need to configure where we'll be saving our GeoParquet files. By default every Wherobots Cloud account includes access to a private AWS S3 cloud storage bucket accessible to each specific Wherobots Cloud user. We can find the S3 URL for our private S3 storage using the `USER_S3_PATH` environment variable. You can also access, manage, and upload data via the Wherobots Cloud File Browser.

![](https://wherobots.com/wp-content/uploads/2024/01/Wherobots_File_Browser.png)



In [None]:
USER_S3_PATH = os.environ.get("USER_S3_PATH")
print(USER_S3_PATH)

In [None]:
bb_df.repartition(1).write.mode("overwrite").format("geoparquet").save(USER_S3_PATH + "geoparquet/" + "birdbuddy.parquet")

One of the benefits of GeoParquet is efficient data storage thanks to Parquet's encoding and compression. If we compare file sizes to the original CSV:

* CSV format: 425 MB
* GeoParquet format: 40 MB 

Let's load this GeoParquet file in SedonaDB and compare the performance of a spatial filter query.

In [None]:
bb_df_pq = sedona.read.format('geoparquet').load(USER_S3_PATH + "geoparquet/" + "birdbuddy.parquet")

In [None]:
bb_df_pq.show(truncate=False)

In [None]:
# Define a spatial filter to find bird observations within a bounding box

spatial_filter = "ST_Within(location, ST_PolygonFromEnvelope(-112.473156, 33.179649, -111.502991, 33.868652))"

In [None]:
%%time
bb_df.where(spatial_filter).count()

In [None]:
%%time
bb_df_pq.where(spatial_filter).count()

### Partitioned GeoParquet

Can we further improve performance by creating Partitioned GeoParquet files? 

First, we'll need to decide on a partitioning strategy. Options for partioning include:

 * Geohash
 * S2 index
 * Administrative boundary
 
 
 Choosing which partitioning option is an exercise is balancing the number of data files and the spatial distribution 



#### Geohash partitioned GeoParquet

In [None]:
bb_df_geohash = bb_df.withColumn("geohash", expr("ST_GeoHash(location, 2)"))

In [None]:
bb_df_geohash.repartition("geohash").write.mode("overwrite").partitionBy("geohash").format("geoparquet").save(USER_S3_PATH + "geoparquet/" + "birdbuddy_geohash.parquet")

In [None]:
bb_df_geohash_pq = sedona.read.format("geoparquet").load(USER_S3_PATH + "geoparquet/" + "birdbuddy_geohash.parquet")

In [None]:
%%time
bb_df_geohash_pq.where(spatial_filter).count()

#### S2 Index Partitioned GeoParquet

In [None]:
bb_df_pq = bb_df.withColumn("s2", expr("array_max(ST_S2CellIds(location, 2))"))

In [None]:
bb_df_pq.repartition("s2").write.mode("overwrite").partitionBy("s2").format("geoparquet").save(USER_S3_PATH + "geoparquet/" + "birdbuddy_s2.parquet")

In [None]:
bb_df_s2 = sedona.read.format("geoparquet").load(USER_S3_PATH + "geoparquet/" + "birdbuddy_s2.parquet")

In [None]:
%%time
bb_df_s2.where(spatial_filter).count()

#### Administrative Boundary Partitioned GeoParquet

Requires a spatial join with an administrative boundary dataset

In [None]:
S3_NE_ADMIN1_URL = "s3://wherobots-examples-prod/data/ne_10m_admin_1_states_provinces/"

In [None]:
spatialRDD = ShapefileReader.readToGeometryRDD(sedona, S3_NE_ADMIN1_URL)
admin_df = Adapter.toDf(spatialRDD, sedona)
admin_df.createOrReplaceTempView("admins")
admin_df.printSchema()

In [None]:
admin_df.show(5)

In [None]:
bb_admin_df = sedona.sql("""
SELECT bb.location AS location, bb.timestamp AS timestamp, bb.common_name AS common_name, bb.scientific_name AS scientific_name, admins.iso_3166_2 AS state 
FROM bb
JOIN admins 
WHERE ST_Intersects(admins.geometry, bb.location)
""").repartition("state")

bb_admin_df.createOrReplaceTempView("bb_admin")

In [None]:
bb_admin_df.show(5, truncate=False)

In [None]:
sedona.sql("""
WITH distinct_states AS (SELECT DISTINCT state FROM bb_admin)
SELECT COUNT(*) AS num FROM distinct_states
""").show()

In [None]:
state_count_df = sedona.sql("""
WITH states_count AS (SELECT COUNT(*) AS num, state
FROM bb_admin
GROUP BY state
ORDER BY num DESC)
SELECT states_count.num, states_count.state, admins.geometry FROM states_count
JOIN admins ON states_count.state = admins.iso_3166_2
""")

In [None]:
SedonaKepler.create_map(state_count_df, name="Bird observations by state")

Partitioning by state/province is probably not the best option given the number of data files vs the size of the dataset!

In [None]:
#bb_admin_df.write.mode("overwrite").partitionBy("state").format("geoparquet").save(USER_S3_PATH + "geoparquet/" + "birdbuddy_state.parquet")

In [None]:
#bb_admin_state_df = sedona.read.format("geoparquet").load(USER_S3_PATH + "geoparquet/" + "birdbuddy_state.parquet")

In [None]:
#%%time
#bb_admin_state_df.where(spatial_filter).count()

## Creating Spatial Data Lakehouse With Apache Iceberg Havasu

We've seen the benefit of working with GeoParquet, but how do we handle new data updates? What if wanted the developer experience of a database for our GeoParquet files? This is exactly what table formats enable - the "Data Lakehouse", being able to work with a Data Lake (in this case GeoParquet files) like the data warehouse database. We'll make use of the Havasu table format, which extends Apache Iceberg to offer:

* ACID transactions
* schema evolution
* time travel

on spatial data, including geometries and raster data.

Let's see Havasu in action by creating a table using our BirdBuddy observation data.


In [None]:
sedona.sql("DROP TABLE IF EXISTS wherobots.birdbuddy.observations").show()

The catalog is the top-level namespace in Havasu. By default Whereobots Cloud is configured to use the wherobots catalog

In [None]:
sedona.sql("SHOW CATALOGS").show()

Let's create a table that matches the schema of our BirdBuddy observation data

In [None]:
sedona.sql("""
CREATE TABLE IF NOT EXISTS wherobots.birdbuddy.observations (
location GEOMETRY,
timestamp STRING,
common_name STRING,
scientific_name STRING
)
""").show()

We can view all databases (or schemas) in the `wherobots` catalog

In [None]:
sedona.sql("SHOW SCHEMAS IN wherobots").show()

Similarly, we can view all tables in a given schema

In [None]:
sedona.sql("SHOW TABLES IN wherobots.birdbuddy").show()

And we can view the schema of each table. Note the `geometry` data type - this is introduced by Havasu.

In [None]:
sedona.sql("DESCRIBE TABLE wherobots.birdbuddy.observations").show()

We can insert data into our table using SQL

In [None]:
sedona.sql("""
INSERT INTO wherobots.birdbuddy.observations
VALUES (ST_GeomFromText('POINT (-73.96969 40.749244)'), current_timestamp(), 'purple finch', 'haemorhous purpureus')
""").show()

And we can use SQL to query our Havasu tables.

In [None]:
obs_df = sedona.sql("""
SELECT * FROM wherobots.birdbuddy.observations
""")

In [None]:
SedonaKepler.create_map(obs_df, name="Observations")

We can also write Sedona Spatial DataFrames to Havasu tables.

In [None]:
bb_df.writeTo("wherobots.birdbuddy.observations").append()

Havasu supports spatial filter push-down by recording spatial statistics of data files, which includes the bounding box of geometries in each data file. At query time data files can be excluded based on this bounding box. For best performance we will cluster our table by spatial proximity based on the geometry column. Havasu enables this by supporting a Hilbert curve based index.

In [None]:
sedona.sql("CREATE SPATIAL INDEX FOR wherobots.birdbuddy.observations USING hilbert(location, 10)").show()

We can query our table using Spatial SQL functionality

In [None]:
buffer_df = sedona.sql("""
SELECT * FROM wherobots.birdbuddy.observations
WHERE ST_Intersects(location, ST_Buffer(ST_GeomFromText('POINT (-73.96969 40.749244)'), 0.25))
""")

In [None]:
SedonaKepler.create_map(buffer_df, "Bird observations")

Let's compare the performance of our Havasu table to the other formats using the same spatial filter query as above

In [None]:

filter_df = sedona.sql("""
SELECT COUNT(*) AS num FROM wherobots.birdbuddy.observations
WHERE {spatial_filter}
""".format(spatial_filter=spatial_filter))



In [None]:
import time

start_time = time.time()

filter_df.show()

print(f"Execution time: {time.time() - start_time}")

Havasu supports all features of Apache Iceberg and also works with raster data.