# Spatial Join with H3 and GeoAnalytics

Using H3 indices to lower the computational load when performing a spatial join between points and polygons.

## Setup and Initialization

### Imports

In [None]:
import json
from pathlib import Path

import geoanalytics_fabric.sql.functions as fns_ga
from geoanalytics_fabric.sql import Polygon
from h3.api import basic_int as h3_int
from pyspark.sql import SparkSession
import pyspark.sql.functions as fns
from pyspark.sql.types import ArrayType, IntegerType, StringType, LongType

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 5, Finished, Available, Finished)

### Parameters for Integration into Pipeline

In [None]:
# year=2025
# month=7
# day=8

# delivery date parts
year=0
month=0
day=0

# h3 resolution
h3_res = 9
h3_col = f"h3_{h3_res:02d}"

# US States in Living Atlas Feature Layer URL with WA query applied
poly_url = "https://services.arcgis.com/P3ePLMYs2RVChkJx/ArcGIS/rest/services/USA_Boundaries_2023/FeatureServer/1"

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 6, Finished, Available, Finished)

### Introspectively Get Most Recent Year, Month and Day

In [None]:
def get_most_recent_year_month_day():
    # path to lakehouse data available in notebook
    pqt_pth = Path("/lakehouse/default/Files/raw")

    # retrieve the most current listed path in the 
    pth_lst = [pth for pth in pqt_pth.glob("dt=*")]
    pth_lst.sort()
    cur_pth = pth_lst[-1]

    # pull out the date parts
    year, month, day = [int(val) for val in cur_pth.stem.lstrip('dt=').split('-')]

    return year,  month, day

# if year, month and day not provied as notbook paramaters, read from storage
if year == 0:
    year, month, day = get_most_recent_year_month_day()

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 7, Finished, Available, Finished)

### Create Paths Based on Year, Month and Day

In [None]:
# create the datetimes string
dt_str = f'{year}-{month:02d}-{day:02d}'

# create the paths based on the delivery date
source_path = f'Files/raw/dt={year:04d}-{month:02d}-{day:02d}/places/parquet'
overlay_pth = f'Files/interim/places/overlay/parquet/delivery_year={year:04d}/delivery_month={month:02d}'
nooverlay_pth = f'Files/interim/places/nooverlay/parquet/delivery_year={year:04d}/delivery_month={month:02d}'

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 8, Finished, Available, Finished)

### Ensure Spark Session Active

In [None]:
spark = SparkSession.builder.getOrCreate()

spark

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 9, Finished, Available, Finished)

## Read and Set Up Foursquare Data

In [None]:
# create the source path based on the delivery date
source_path = f'Files/raw/dt={dt_str}/places/parquet'

# read the data into a PySpark data frame
df = spark.read.parquet(source_path)

# add date columns
df = (df
    .withColumn('dt', fns.to_date(fns.lit(dt_str)))
    .withColumn('delivery_year', fns.lit(year))
    .withColumn('delivery_month', fns.lit(month))
)

# add geoanalytics point geometry
df = df.withColumn(
    'geometry', 
    fns_ga.point_from_binary(fns.col('geom'), sr=4326)
)

# add h3 indices
df = df.withColumn(h3_col, fns_ga.bin_id(fns_ga.h3_bin('geometry', h3_res)))

# keep foursquare id column consistent with past schema
df = df.withColumn(
    'fsq_id', fns.col('fsq_place_id')
)

# don't need the bounding box
df = df.drop(*['bbox', 'fsq_place_id', 'geom'])

# filter to just washington state
df = df.filter(
    (fns.col('country') == 'US')
    & (fns.col('region') == 'WA')
)

print(f'POI Count: {df.count():,}')
df.printSchema()
df.show(2, vertical=True, truncate=False)

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 10, Finished, Available, Finished)

POI Count: 499,878
root
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- address: string (nullable = true)
 |-- locality: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- admin_region: string (nullable = true)
 |-- post_town: string (nullable = true)
 |-- po_box: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_created: string (nullable = true)
 |-- date_refreshed: string (nullable = true)
 |-- date_closed: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- website: string (nullable = true)
 |-- email: string (nullable = true)
 |-- facebook_id: long (nullable = true)
 |-- instagram: string (nullable = true)
 |-- twitter: string (nullable = true)
 |-- fsq_category_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fsq_category_labels: array (nullable = true)
 |    |-- element: string (con

## Get Washington State From Living Atlas

In [None]:
df_poly = (
    spark.read.format("feature-service").load(poly_url)
    .select(
        fns.col("STATE_ABBR").alias("iso2"),
        fns.col("shape").alias("geometry")
    )
    .filter(fns.col("iso2") == "WA")
)

df_poly.printSchema()
df_poly.show()

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 11, Finished, Available, Finished)

root
 |-- iso2: string (nullable = true)
 |-- geometry: polygon (nullable = true)



+----+--------------------+
|iso2|            geometry|
+----+--------------------+
|  WA|{"rings":[[[-122....|
+----+--------------------+



## Get H3 Indices Completely Within Polygon Geometry

In [None]:
@fns.udf(returnType=ArrayType(LongType()))
def poly_to_h3_within(geo_interface_geom: str) -> list[int]:
    """Get the H3 indices for an input polygon completely contained inside the polygon."""

    # if not a dictionary, convert to dictionary
    if not isinstance(geo_interface_geom, dict):
        geo_interface_geom = json.loads(geo_interface_geom)

    # convert the __geo_interface__ geometry (GeoJSON) to the h3 geometry
    h3_geom = h3_int.geo_to_h3shape(geo_interface_geom)

    # get the h3 indices completely within the polygon
    h3_idx_lst: list[int] = h3_int.h3shape_to_cells_experimental(h3_geom, res=h3_res, contain="full")

    return h3_idx_lst


df_poly_h3 = df_poly.select(
    fns.explode(
        poly_to_h3_within(
            fns_ga.as_geojson('geometry')
        )
    ).alias(h3_col),
    fns.col('iso2').alias('region_h3')
)

print(f"H3 Count: {df_poly_h3.count():,}")
df_poly_h3.printSchema()
df_poly_h3.show()

StatementMeta(, ea3b6b13-c2f8-4484-aeb1-7bda22e32c2d, 12, Submitted, Running, Running)

H3 Count: 1,827,520
root
 |-- h3_09: long (nullable = true)
 |-- region_h3: string (nullable = true)

+------------------+---------+
|             h3_09|region_h3|
+------------------+---------+
|617713351682949119|       WA|
|617713351690551295|       WA|
|617713351690813439|       WA|
|617713351691075583|       WA|
|617713351693434879|       WA|
|617713351693697023|       WA|
|617713351694745599|       WA|
|617713352208809983|       WA|
|617713332095287295|       WA|
|617713332098695167|       WA|
|617713332098957311|       WA|
|617713332099219455|       WA|
|617713332099481599|       WA|
|617713352304754687|       WA|
|617713352305016831|       WA|
|617713352305278975|       WA|
|617713352305541119|       WA|
|617713352306065407|       WA|
|617713332996276223|       WA|
|617713332997849087|       WA|
+------------------+---------+
only showing top 20 rows



## Identify Points Inside H3 Indices Wholly Within Washington

In [None]:
df_join = df.join(fns.broadcast(df_poly_h3), on=h3_col, how='left').cache()

df_join.printSchema()

StatementMeta(, , -1, Waiting, , Waiting)

root
 |-- h3_09: long (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- address: string (nullable = true)
 |-- locality: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- admin_region: string (nullable = true)
 |-- post_town: string (nullable = true)
 |-- po_box: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_created: string (nullable = true)
 |-- date_refreshed: string (nullable = true)
 |-- date_closed: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- website: string (nullable = true)
 |-- email: string (nullable = true)
 |-- facebook_id: long (nullable = true)
 |-- instagram: string (nullable = true)
 |-- twitter: string (nullable = true)
 |-- fsq_category_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fsq_category_labels: array (nullable = true)
 |    |-- elem

In [None]:
df_outer = df_join.filter(fns.col("region_h3").isNull())
df_inner = df_join.filter(fns.col("region_h3").isNotNull())

print(f"Outer Count: {df_outer.count():,}")
print(f"Inner Count: {df_inner.count():,}")

StatementMeta(, , -1, Waiting, , Waiting)

Outer Count: 25,932
Inner Count: 473,946


## Use Geometry Intersect with Outer Points

In [None]:
df_ovr = df_poly.select(
    fns.col('iso2').alias('region_ovr'),
    fns.col('geometry').alias('poly_geom')
)

df_ovr.printSchema()

StatementMeta(, , -1, Waiting, , Waiting)

root
 |-- region_ovr: string (nullable = true)
 |-- poly_geom: polygon (nullable = true)



In [None]:
df_sptl_jn = (
    df_outer.join(
        other=df_ovr,
        on=fns_ga.intersects(
            geometry1=df_outer["geometry"], 
            geometry2=df_ovr["poly_geom"]
        ),
        how="left",
    )
    .drop('poly_geom')
).cache()

df_sptl_jn.printSchema()
df_sptl_jn.show(5, vertical=True, truncate=False)

StatementMeta(, , -1, Waiting, , Waiting)

root
 |-- h3_09: long (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- address: string (nullable = true)
 |-- locality: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- admin_region: string (nullable = true)
 |-- post_town: string (nullable = true)
 |-- po_box: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_created: string (nullable = true)
 |-- date_refreshed: string (nullable = true)
 |-- date_closed: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- website: string (nullable = true)
 |-- email: string (nullable = true)
 |-- facebook_id: long (nullable = true)
 |-- instagram: string (nullable = true)
 |-- twitter: string (nullable = true)
 |-- fsq_category_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fsq_category_labels: array (nullable = true)
 |    |-- elem

In [None]:
df_sptl_jn_not = df_sptl_jn.filter(fns.col('region_ovr').isNotNull())
df_sptl_jn_null = df_sptl_jn.filter(fns.col('region_ovr').isNull())

print(f"Overlay in WA Count: {df_sptl_jn_not.count():,}")
print(f"Overlay not in WA Count: {df_sptl_jn_null.count():,}")

StatementMeta(, , -1, Waiting, , Waiting)

Overlay in WA Count: 23,666
Overlay not in WA Count: 2,266
