In [None]:
from sedona.spark import SedonaContext

config = (
    SedonaContext.builder()
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.1",
            "org.datasyslab:geotools-wrapper:1.7.0-28.5",
            "org.apache.hadoop:hadoop-aws:3.3.2"
        ])
    )
    .config("spark.jars.repositories", "https://artifacts.unidata.ucar.edu/repository/unidata-all")        
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") \
    .config("spark.executor.memory", "12G")
    .config("spark.driver.memory", "12G")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

sedona = SedonaContext.create(config)
sedona.sparkContext.setLogLevel("ERROR")

sedona.conf.set("fs.https.impl", "org.apache.hadoop.fs.http.HttpsFileSystem")

# Part 1: Simple Example

## Create a Dataframe

In [None]:
from pyspark.sql import Row

In [None]:
data = [
    Row(id=1, name="Point A", lat=40.7128, lon=-74.0060),
    Row(id=2, name="Point B", lat=34.0522, lon=-118.2437),
    Row(id=3, name="Point C", lat=37.7749, lon=-122.4194)
]

In [None]:
df = sedona.createDataFrame(data)
df.show()

In [None]:
from pyspark.sql.functions import expr

In [None]:
df_geom = df.withColumn("geom", 
                        expr("ST_Point(cast(lon as Decimal(24, 20)), cast(lat as Decimal(24, 20)))"))

In [None]:
df_geom.show(truncate=False)

## Simple Query

In [None]:
sql = """
SELECT ST_AreaSpheroid(
    ST_GeomFromWKT('Polygon ((34 35, 28 30, 25 34, 34 35))')
) as result
"""

In [None]:
sedona.sql(sql).show(truncate=False)

## Spatial Filter

In [None]:
from pyspark.sql import Row

In [None]:
data = [
    Row(id=1, name="Point A", lat=40.7128, lon=-74.0060),
    Row(id=2, name="Point B", lat=34.0522, lon=-118.2437),
    Row(id=3, name="Point C", lat=37.7749, lon=-122.4194)
]

In [None]:
df = sedona.createDataFrame(data)
df.show()

In [None]:
from pyspark.sql.functions import expr

In [None]:
df_geom = df.withColumn("geom", expr("ST_Point(cast(lon as Decimal(24, 20)), cast(lat as Decimal(24, 20)))"))
df_geom.show(truncate=False)

In [None]:
df_geom.createOrReplaceTempView("points")

In [None]:
sedona.sql("""
SELECT id, name, ST_AsText(geom) AS wkt
FROM points
WHERE ST_Within(geom, ST_GeomFromText('POLYGON((-120 20, -60 20, -60 50, -120 50, -120 20))'))
""").show(truncate=False)

## Same as above, but Pythonic

In [None]:
from sedona.sql.st_constructors import ST_GeomFromText
from sedona.sql.st_functions import ST_AsText
from sedona.sql.st_predicates import ST_Within
from pyspark.sql.functions import lit

In [None]:
polygon_wkt = 'POLYGON((-120 20, -60 20, -60 50, -120 50, -120 20))'
polygon_geom = ST_GeomFromText(lit(polygon_wkt))

In [None]:
df_filtered = df_geom.filter(
    ST_Within(df_geom["geom"], polygon_geom)). \
    select(
        "id", "name", ST_AsText("geom").alias("wkt")
    )

df_filtered.show(truncate=False)

# Part 2: Reading Vector Data

In [None]:
geojson_path = 'data/Neighborhood_Map_Atlas_Neighborhoods.geojson'
geojson_df = sedona.read.format("geojson").load(geojson_path)

In [None]:
geojson_df.printSchema()

In [None]:
from pyspark.sql.functions import expr

In [None]:
geojson_df = (
    sedona.read.format("geojson").option("multiLine", "true").load(geojson_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
    .withColumn("L_HOOD", expr("properties['L_HOOD']"))
    .withColumn("OBJECTID", expr("properties['OBJECTID']"))
    .withColumn("S_HOOD", expr("properties['S_HOOD']"))
    .withColumn("S_HOOD_ALT_NAMES", expr("properties['S_HOOD_ALT_NAMES']"))
    .drop("properties").drop("type")
)

In [None]:
geojson_df.printSchema()

In [None]:
geojson_df.count()

In [None]:
df = (
    sedona.read.format("geopackage")
    .option("showMetadata", "true")
    .load("data/parks.gpkg")
)
df.show()

In [None]:
df = (
    sedona.read.format("geopackage")
    .option("tableName", "parks")
    .load("data/parks.gpkg")
)
df.show()

In [None]:
shapefile_path = 'data/parks/'
shapefile_df = sedona.read.format("shapefile").load(shapefile_path)

In [None]:
# CSV - https://www.kaggle.com/datasets/andykrause/kingcountysales
csv_path = 'data/kingco_sales.csv'
csv_df = sedona.read.format("csv").load(csv_path)
csv_df.show(3)

In [None]:
# Read CSV with header and inferSchema
csv_df = sedona.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_path)

csv_df.show(3)

In [None]:
from pyspark.sql.functions import col

In [None]:
csv_df = csv_df.selectExpr(
    "*", "ST_Point(longitude, latitude) as geometry")

In [None]:
csv_df.show(3)

In [None]:
csv_df.count()

In [None]:
# GeoParquet
geoparquet_path = 'data/Seattle_Transportation_Plan_Bicycle_Element_829016546957421557.parquet'
geoparquet_df = sedona.read.format("geoparquet").load(geoparquet_path)
geoparquet_df.show(3)

In [None]:
geoparquet_df.selectExpr('ST_SRID(geometry)').show(1)

In [None]:
geoparquet_df.selectExpr('''ST_Transform(geometry, 'EPSG:2926', 'EPSG:4326')''').show(1)

In [None]:
# GeoParquet
geoparquet_schools_path = 'data/Seattle_Public_Schools_Sites_2023-2024.parquet'
geoparquet_schools_df = sedona.read.format("geoparquet").load(geoparquet_schools_path)
geoparquet_schools_df.show(3)

# Part 3: Loading Raster Data

In [None]:
netcdf_file = "data/pdsi_current_PRISM.nc"
netcdf_df = sedona.read.format("binaryFile").load(netcdf_file)
netcdf_df.show()

In [None]:
import pyspark.sql.functions as f

In [None]:
netcdf_df.repartition(8)

In [None]:
netcdf_df = netcdf_df.withColumn("raster", f.expr("RS_FromNetCDF(content, 'data', 'longitude', 'latitude')"))

In [None]:
netcdf_df.show()

In [None]:
# https://radiantearth.github.io/stac-browser/#/external/earth-search.aws.element84.com/v1/collections/sentinel-2-l2a/items/S2A_10TET_20250423_0_L2A?.asset=asset-nir

## GeoTIFF

In [None]:
# https://browser.apex.esa.int/external/eoresults.esa.int/stac/collections/ESA_WORLDCOVER_10M_2021_V2/items/ESA_WorldCover_10m_2021_v200_N45W123?.asset=asset-esa_worldcover_10m_map

In [None]:
raster_df = sedona.read.format("binaryFile") \
    .load("data/ESA_WorldCover_10m_2021_v200_N45W123_Map.tif")

In [None]:
raster_df = raster_df.withColumn("raster", f.expr("RS_FromGeoTiff(content)"))

In [None]:
# raster_df.show()

In [None]:
raster_df.repartition(8)

In [None]:
raster_df.selectExpr("RS_TileExplode(raster, 256, 256) as (x, y, raster)").show()

# Part 4: PySpark and SparkSQL

In [None]:
shapefile_df.printSchema()

In [None]:
csv_df.printSchema()

In [None]:
shapefile_df.selectExpr('ST_SRID(geometry)').first()

In [None]:
csv_df.selectExpr('ST_SRID(geometry)').first()

In [None]:
shapefile_df.createOrReplaceTempView("polygons")
csv_df.createOrReplaceTempView("points")

In [None]:
%%time
sedona.sql("""
    SELECT p.sale_id AS point_id, poly.NAME AS park_name
    FROM points p
    JOIN polygons poly
    ON ST_DWithin(
        ST_Transform(poly.geometry, 'EPSG:2926', 'EPSG:26910'), 
        ST_Transform(p.geometry, 'EPSG:4326', 'EPSG:26910'), 
    30)
""").count()

In [None]:
from pyspark.sql.functions import expr

# Transform geometries to EPSG:26910 (UTM zone), then apply ST_DWithin
transformed_points = csv_df.withColumn(
    "geom_utm", expr("ST_Transform(geometry, 'EPSG:4326', 'EPSG:26910')")
)

transformed_polygons = shapefile_df.withColumn(
    "geom_utm", expr("ST_Transform(geometry, 'EPSG:2926', 'EPSG:26910')")
)

# Perform spatial join using ST_DWithin
joined_df = transformed_points.alias("p").join(
    transformed_polygons.alias("poly"),
    expr("ST_DWithin(poly.geom_utm, p.geom_utm, 30)")
)

# Count matching pairs
joined_df.count()

# Part 5: DataFrame Operations

In [None]:
csv_df.show(5, truncate=False)

In [None]:
csv_df.printSchema()

In [None]:
csv_df.select("sale_id", "sale_price").show()

In [None]:
csv_df.count()

In [None]:
points_df = csv_df.select("sale_id", "sale_price")

In [None]:
points_df.first()

In [None]:
points_df = csv_df.select("sale_id", "sale_price")

In [None]:
points_df.orderBy("sale_price").show()

In [None]:
points_df.filter("sale_price > 500000").show()

# Part 6: Vector and Raster Viz

In [None]:
from sedona.spark import SedonaUtils

# Convert raster to base64 image for inline visualization
htmlDF = raster_df.selectExpr("RS_AsImage(raster, 500) as raster_image")

# Display in notebook (Jupyter or Databricks)
SedonaUtils.display_image(htmlDF)

In [None]:
from sedona.spark import SedonaKepler

map = SedonaKepler.create_map()
SedonaKepler.add_df(map, geoparquet_schools_df, name="Seattle Schools")
map

# Part 7: Vector Functions

In [None]:
from sedona.sql.st_functions import ST_Buffer, ST_Transform, ST_ConvexHull

In [None]:
from pyspark.sql.functions import col, lit

buffered_df = geoparquet_schools_df.select(
    ST_Buffer(
        ST_Transform(col("geometry"), lit("EPSG:4326"), lit("EPSG:26910")),
        100
    ).alias("buffer")
)

buffered_df.show(3)

In [None]:
map = SedonaKepler.create_map()
SedonaKepler.add_df(map, buffered_df, name="Seattle Schools Buffer")
map

In [None]:
schools_df = geoparquet_schools_df.selectExpr("*", "ST_Transform(geometry, 'EPSG:4326', 'EPSG:26910') as geom") \
    .drop("geometry") \
    .withColumnRenamed("geom", "geometry")

In [None]:
shapefile_df.createOrReplaceTempView('parks')

In [None]:
centroid = sedona.sql('''select st_centroid(geometry) from parks''')
centroid.show(3)

In [None]:
area = sedona.sql('''select st_area(geometry) from parks''')
area.show(3)

In [None]:
geoparquet_df.createOrReplaceTempView('bikes')

In [None]:
length = sedona.sql('''select st_length(geometry) from bikes''')
length.show(3)

In [None]:
hull_df = shapefile_df.select(
    "geometry",
    ST_ConvexHull("geometry").alias("convex_hull")
)
hull_df.show(3)

In [None]:
# Space Needle - -122.3494692, 47.6205426

In [None]:
from pyspark.sql.functions import lit

dist_df = sedona.sql('''
select ST_Distance(geometry, 
                     ST_Transform(
                        ST_SetSRID(
                            ST_Point(-122.3494692, 47.6205426), 4326), 'EPSG:26910')) as distance
from parks                     
''')

dist_df.show(3)

In [None]:
h3_df = sedona.sql('''
select ST_H3CellIDs(geometry, 8, true) as h3
from parks
limit 3
''')

h3_df.show()

In [None]:
sedona.sql("""
SELECT ST_3DDistance(
    ST_GeomFromText('POINT Z (0 0 0)'),
    ST_GeomFromText('POINT Z (3 4 14)')
) AS dist_3d
""").show()

# Part 8: Spatial Predicates

In [None]:
csv_df.createOrReplaceTempView('homes')
geojson_df.createOrReplaceTempView('neighborhoods')

In [None]:
geojson_df.show(3)

In [None]:
sedona.sql('''
select h.sale_id, st_contains(n.geometry, h.geometry) in_ballard
from homes h
join neighborhoods n on st_contains(n.geometry, h.geometry)
where n.S_HOOD = 'Ballard'
''').show(3)

In [None]:
sedona.sql('''
select h.sale_id
from homes h
join neighborhoods n on st_dwithin(
    st_transform(n.geometry, 'EPSG:4326', 'EPSG:26910'), 
    st_transform(h.geometry, 'EPSG:4326', 'EPSG:26910'),
    500)
where n.S_HOOD = 'Ballard'
''').count()

In [None]:
sedona.sql('''SELECT ST_Contains(
  ST_GeomFromText('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
  ST_GeomFromText('POINT(5 5)')
) AS contains
''').show()

In [None]:
sedona.sql('''SELECT ST_Contains(
  ST_GeomFromText('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
  ST_GeomFromText('POINT(0 0)')
) AS contains
''').show()

In [None]:
sedona.sql('''SELECT ST_Intersects(
  ST_GeomFromText('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
  ST_GeomFromText('LINESTRING(5 5, 15 5)')
) AS intersects''').show()

In [None]:
sedona.sql('''SELECT ST_Intersects(
  ST_GeomFromText('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
  ST_GeomFromText('POINT(0 0)')
) AS intersects
''').show()

In [None]:
sedona.sql('''SELECT ST_Touches(
  ST_GeomFromText('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
  ST_GeomFromText('POINT(0 0)')
) AS intersects
''').show()

In [None]:
sedona.sql('''SELECT ST_Touches(
  ST_GeomFromText('POLYGON((0 0, 0 10, 10 10, 10 0, 0 0))'),
  ST_GeomFromText('POINT(5 5)')
) AS intersects
''').show()

# Part 9: Write Data

In [None]:
# Write a single GeoJSON file (e.g., for download or web map use)
geojson_output_path = "parks.geojson"

shapefile_df.coalesce(1).write \
    .mode("overwrite") \
    .format("geojson") \
    .save(geojson_output_path)

In [None]:
# Repartition to 10 files for parallel writes
csv_df.repartition(10).write \
    .mode("overwrite") \
    .format("geoparquet") \
    .save("data/output_geoparquet_10_parts")

In [None]:
csv_df.write \
    .mode("overwrite") \
    .format("geoparquet") \
    .partitionBy("city") \
    .save("data/output_geoparquet_partitioned")

In [None]:
# output_df.write \
#     .mode("overwrite") \
#     .format("geoparquet") \
#     .save("s3a://your-bucket-name/path/to/output/")

# Part 10: Nearest Neighbor Join

In [None]:
geoparquet_schools_df.repartition(8)

In [None]:
geoparquet_schools_df.createOrReplaceTempView('schools')

In [None]:
nearest = sedona.sql('''
select schools.school_name, bikes.id
from schools
join bikes on ST_KNN(
    schools.geometry,
    bikes.geometry,
    3,
    true
)
''')

In [None]:
nearest.show(9)

# Part 11: Raster Functions

In [None]:
sample_area = 'POLYGON((-122.3608417185 47.6460037086, -122.3581854589 47.6460037086, -122.3581854589 47.643222509, -122.3608417185 47.643222509, -122.3608417185 47.6460037086))'

In [None]:
raster_df = raster_df.selectExpr("RS_TileExplode(raster, 256, 256) as (x, y, raster)")

In [None]:
raster_df.createOrReplaceTempView('landcover')

In [None]:
centroids = sedona.sql(f'''
select RS_PixelAsCentroids(raster, 1)
from landcover
where rs_intersects(raster, st_geomfromtext('{sample_area}'))
''')

In [None]:
centroids.show(3)

In [None]:
raster_df.createOrReplaceTempView('landcover')

In [None]:
values = sedona.sql(f'''
select RS_Value(raster, ST_Point(-122.373894, 47.634587))
from landcover
where rs_intersects(raster, st_geomfromtext('{sample_area}'))
''')

In [None]:
values.show()

In [None]:
raster_tiles = raster_df.selectExpr("RS_TileExplode(raster, 256, 256) as (x, y, raster)")

In [None]:
raster_tiles.createOrReplaceTempView('landcover_tiles')

In [None]:
raster_tiles.show()

In [None]:
stats = sedona.sql('''SELECT RS_SummaryStatsAll(raster) AS stats
FROM landcover_tiles
where x = 0 and y = 0''')
stats.show(truncate=False)

In [None]:
metadata = sedona.sql('''SELECT RS_MetaData(raster) AS stats
FROM landcover_tiles
where x = 0 and y = 0''')
metadata.show(truncate=False)

# Part 12: Raster Map Algerbra (NDVI)

In [None]:
# # https://radiantearth.github.io/stac-browser/#/external/earth-search.aws.element84.com/v1/collections/sentinel-2-l2a/items/S2A_10TET_20250423_0_L2A?.asset=asset-nir

In [None]:
from pyspark.sql import functions as f

In [None]:
red_df = sedona.read.format("binaryFile") \
    .load("data/B04.tif")

red_df = red_df.withColumn("raster", f.expr("RS_FromGeoTiff(content)"))

In [None]:
nir_df = sedona.read.format("binaryFile") \
    .load("data/B08.tif")

nir_df = nir_df.withColumn("raster", f.expr("RS_FromGeoTiff(content)"))

In [None]:
red_df.createOrReplaceTempView('red')
nir_df.createOrReplaceTempView('nir')

In [None]:
union = sedona.sql('''
select RS_Union(red.raster, nir.raster) as raster from red, nir
''')

In [None]:
union.createOrReplaceTempView('union')

In [None]:
# NDVI = (NIR - Red) / (NIR + Red)

In [None]:
ndvi = sedona.sql('''
SELECT 
  RS_MapAlgebra(
    raster,                   
    'D',             
    'out = (rast[1] - rast[0])/(rast[1] + rast[0]);'
  ) AS ndvi
FROM union
''')

In [None]:
ndvi.show()

In [None]:
from sedona.spark import SedonaUtils

# Convert raster to base64 image for inline visualization
htmlDF = ndvi.selectExpr("RS_AsImage(ndvi, 500) as raster_image")

# Display in notebook (Jupyter or Databricks)
SedonaUtils.display_image(htmlDF)

# Part 13: Raster Writer

In [None]:
# ndvi.withColumn("raster_binary", expr("RS_AsGeoTiff(ndvi)"))\
#   .write.format("raster")\
#   .option("rasterField", "raster_binary")\
#   .option("pathField", "path")\
#   .option("fileExtension", ".tiff")\
#   .mode("overwrite")\
#   .save("my_raster_file") 

# Part 14: Zonal Stats

In [None]:
ndvi.createOrReplaceTempView('ndvi')

In [None]:
park = 'POLYGON((-122.3607955072 47.6460058027, -122.3582580324 47.6460058027, -122.3582580324 47.6432482444, -122.3607955072 47.6432482444, -122.3607955072 47.6460058027))'

In [None]:
parks_ndvi = sedona.sql(f'''
select 
RS_ZonalStats(ndvi.ndvi, st_transform(
    st_geomfromtext('{park}'), 'epsg:4326', 'epsg:32610'
    ), 1, 'avg', false, false) as avg_ndvi
from ndvi
''')

In [None]:
ndvi = ndvi.repartition(8)

In [None]:
parks_ndvi.show(3)