In [1]:
#Requirements for the spark workflow
from sedona.spark import *
from pyspark.sql.functions import col, count, countDistinct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from sedona.spark.sql.st_constructors import ST_Point
from sedona.spark.sql.st_functions import GeometryType
from sedona.spark import SedonaKepler
from pyspark.sql import functions as F
from sedona.spark.geopandas import GeoDataFrame, read_parquet

import sys, os
from shapely.geometry import Point
from itertools import product
import sedona
from sedona.spark import SedonaContext

import sedona.db
import numpy as np



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/06 20:33:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# For anonymous access to public S3 buckets
#sd_cont is needed to read all the csv in
sd_cont = (
    SedonaContext.builder()
    .config(
        "spark.hadoop.fs.s3a.bucket.bucket-name.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    )
    .getOrCreate()
)


sd = SedonaContext.create(sd_cont)

25/11/06 20:33:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/11/06 20:33:39 WARN UDTRegistration: Cannot register UDT for org.geotools.coverage.grid.GridCoverage2D, which is already registered.
25/11/06 20:33:39 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.
25/11/06 20:33:39 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
25/11/06 20:33:39 WARN UDTRegistration: Cannot register UDT for org.apache.sedona.common.S2Geography.Geography, which is already registered.
25/11/06 20:33:39 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
25/11/06 20:33:39 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
25/11/06 20:33:39 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously 

In [3]:

# Path to the directory containing your CSV files
directory_path = "../2024/"

df = sd_cont.read.option("header", True).format("csv").load(directory_path)


                                                                                

In [4]:
#I'm not going to use sedonaDB because the df is 8.7gb (from 70gb in text files)
#and I only have 12 gb memmory allocted to docker
df_p_diff.explain('cost')

NameError: name 'df_p_diff' is not defined

In [5]:
#Setting the ST_Point after getting unique values
df_p_diff = df.select("STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "REPORT_TYPE", "SOURCE", "HourlyDryBulbTemperature", "HourlyPressureChange", "HourlyPressureTendency", "HourlySeaLevelPressure", "HourlyStationPressure", "HourlyWindDirection", "HourlyWindGustSpeed", "HourlyWindSpeed").filter(col("HourlyPressureChange").isNotNull())


In [6]:
from sedona.spark import KNNQuery
from shapely.geometry import Point
#This has to be build with unique locations, or at least stations
df_stations = df_p_diff.select("STATION", "LONGITUDE", "LATITUDE").distinct()
df_stations = df_stations.select(ST_Point(col("LONGITUDE"), col("LATITUDE")).alias("GEOMETRY"), "STATION")
#Don't need the RDD because I'm using queries.
#Xi = StructuredAdapter.toSpatialRdd(df_stations, "GEOMETRY")


In [None]:
#Figure out why the number of paritions isn't changing.
#spatialRDD.analyze()
#spatialRDD.spatialPartitioning(GridType.KDBTREE, 2500)
#print(spatialRDD.rawSpatialRDD.getNumPartitions())

In [7]:
# 2. Define a coordinate grid with a 0.5-degree step
longitude_step = .5
latitude_step = .5

longitudes = [i * longitude_step for i in range(int(-180 / longitude_step), int(180 / longitude_step) + 1)]
latitudes = [i * latitude_step for i in range(int(-90 / latitude_step), int(90 / latitude_step) + 1)]

# 3. Generate a list of all coordinate pairs
coordinate_pairs = list(product(longitudes, latitudes))

# 4. Create a Spark DataFrame from the list of coordinates
schema = ["longitude", "latitude"]
df_lat_lon = sd.createDataFrame(coordinate_pairs, schema=schema)

# 5. Create the Sedona geometry points
# ST_Point takes longitude first, then latitude.
spatial_df = df_lat_lon.withColumn(
    "GEOMETRY",
    F.expr(f"ST_Point(longitude, latitude)")
)
X = StructuredAdapter.toSpatialRdd(spatial_df, "GEOMETRY")
# Show the resulting DataFrame
print("Generated spatial DataFrame:")
spatial_df.show(5)
spatial_df.printSchema()

Generated spatial DataFrame:
+---------+--------+------------------+
|longitude|latitude|          GEOMETRY|
+---------+--------+------------------+
|   -180.0|   -90.0|  POINT (-180 -90)|
|   -180.0|   -89.5|POINT (-180 -89.5)|
|   -180.0|   -89.0|  POINT (-180 -89)|
|   -180.0|   -88.5|POINT (-180 -88.5)|
|   -180.0|   -88.0|  POINT (-180 -88)|
+---------+--------+------------------+
only showing top 5 rows

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- GEOMETRY: geometry (nullable = true)



In [8]:
df_stations.createOrReplaceTempView("stations")
spatial_df.createOrReplaceTempView("interp")

25/11/06 20:35:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [10]:
%%time
#Adapted From https://wherobots.com/blog/introducing-knn-join-for-wherobots-and-apache-sedona/

df_knn_join = sd.sql("""
SELECT
        stations.GEOMETRY AS stations_GEOM,
        stations.STATION as station,
        interp.GEOMETRY AS interp_GEOM,
        ST_DISTANCESPHERE(stations.GEOMETRY, interp.GEOMETRY) AS DISTANCE
FROM stations
JOIN interp ON ST_KNN(stations.GEOMETRY, interp.GEOMETRY, 10, FALSE)
""")
df_knn_join.show(5)

[Stage 10:>                                                       (0 + 10) / 10]

+--------------------+-----------+-------------------+------------------+
|       stations_GEOM|    station|        interp_GEOM|          DISTANCE|
+--------------------+-----------+-------------------+------------------+
|POINT (-56.016666...|86960099999|  POINT (-56 -28.5)|16758.474764314607|
|POINT (-56.016666...|86960099999|    POINT (-56 -29)| 38952.12593791301|
|POINT (-56.016666...|86960099999|POINT (-56.5 -28.5)| 50058.20482966996|
|POINT (-56.016666...|86960099999|POINT (-55.5 -28.5)|  53138.2717987768|
|POINT (-56.016666...|86960099999|  POINT (-56.5 -29)| 61087.06711078973|
+--------------------+-----------+-------------------+------------------+
only showing top 5 rows

CPU times: user 947 ms, sys: 143 ms, total: 1.09 s
Wall time: 1min 54s


25/11/06 20:38:23 WARN Executor: Managed memory leak detected; size = 36816692 bytes, task 0.0 in stage 11.0 (TID 11203)
                                                                                

In [11]:
df_knn_join.write.format("geoparquet").mode("overwrite").save("./knn_join")

                                                                                

In [13]:
df_pq = sedona.read.format("geoparquet").load("/tmp/somewhere")
df_pq.show(5)

+--------------------+-----------+------------------+------------------+
|       stations_GEOM|    station|       interp_GEOM|          DISTANCE|
+--------------------+-----------+------------------+------------------+
|POINT (125.7 38.0...|47069099999|  POINT (125.5 38)| 17908.35967397921|
|POINT (125.7 38.0...|47069099999|    POINT (126 38)|26540.968944462053|
|POINT (125.7 38.0...|47069099999|POINT (125.5 38.5)| 54749.88082080413|
|POINT (125.7 38.0...|47069099999|  POINT (126 38.5)|58126.040500040974|
|POINT (125.7 38.0...|47069099999|POINT (125.5 37.5)| 61854.87083688271|
+--------------------+-----------+------------------+------------------+
only showing top 5 rows



In [14]:
df_pq.createOrReplaceTempView("stations_knn")
df_p_diff.createOrReplaceTempView("pressure_diff")

In [19]:
%%time
df_iwd_grid = sd.sql("""
SELECT 
    stations_knn.interp_GEOM,
    SUM(pressure_diff.HourlyPressureChange / POWER(stations_knn.DISTANCE, 2)) / SUM(1 / POWER(stations_knn.DISTANCE, 2)) AS interpolated_value
    from 
      stations_knn
    inner join 
      pressure_diff 
    on 
      stations_knn.station = pressure_diff.station
    group by stations_knn.interp_geom
""")
df_iwd_grid.show(5)



+----------------+--------------------+
|     interp_GEOM|  interpolated_value|
+----------------+--------------------+
|POINT (4.5 53.5)|-0.00546123837219...|
|  POINT (5 51.5)|-0.00391270389632...|
|POINT (3.5 51.5)|-0.00377550987762...|
|  POINT (5 52.5)|-0.00564268740976...|
|  POINT (6 53.5)|-0.00628782606006...|
+----------------+--------------------+
only showing top 5 rows

CPU times: user 777 ms, sys: 89.4 ms, total: 867 ms
Wall time: 2min 4s


                                                                                

In [None]:
#query logic
join knn to pressure on station
group by stations_GEOM
calculate field like this
SUM(pressure_diff.HourlyPressureChange / POWER(stations_knn.DISTANCE, 2)) /
    SUM(1 / POWER(stations_knn.DISTANCE, 2)) AS interpolated_value

In [None]:
%%time
k = 5
using_index = False
#result = KNNQuery.SpatialKnnQuery(spatialRDD, Point(-122, 47.5), k, using_index)
result = KNNQuery.SpatialKnnQuery(Xi, X, k, using_index)
print(result)

In [None]:
#Player around.  Use some of this code.
df_pressure_diff = df.select(ST_Point(col("LONGITUDE"), col("LATITUDE")),"STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "REPORT_TYPE", "SOURCE", "HourlyDryBulbTemperature", "HourlyPressureChange", "HourlyPressureTendency", "HourlySeaLevelPressure", "HourlyStationPressure", "HourlyWindDirection", "HourlyWindGustSpeed", "HourlyWindSpeed").filter(col("HourlyPressureChange").isNotNull()).show()
df.filter(col("HourlySeaLevelPressure").isNotNull() | col("HourlyStationPressure").isNotNull()).count()
#counts of records with different filters
df.count()
#130,112,717 37,352,572 77,319,947 77,210,243
aggregated = df.groupby('STATION', 'DATE').agg({abs('HourlyPressureChange'): 'min', abs('HourlyPressureChange'): 'min'})
df['AbsPressure'] = df['HourlyPressureChange'].abs()
from pyspark.sql import functions as F
df = df.withColumn("absPressure", F.abs(F.col("HourlyPressureChange")))
df.tail(10)
df.agg(F.min('HourlyPressureChange')).show()
df.filter(col('HourlyPressureChange').isNotNull()).withColumn("HourlyPressureChange", col("HourlyPressureChange").cast(FloatType())).groupby('STATION', 'DATE')\
.agg({abs('HourlyPressureChange'): 'min', abs('HourlyPressureChange'): 'min'})

In [None]:
#Using Overtures instread
#import geopandas as gpd

#url = "https://naciscdn.org/naturalearth/110m/cultural/ne_110m_admin_0_countries.zip"


#gdf = gpd.read_file(url)
#gdf
#df_conus = sedona.createDataFrame(gdf[(gdf.SOV_A3=='US1') & (gdf.TYPE=='Country')][['SOVEREIGNT', 'geometry']])
#map = SedonaKepler.create_map(df=df_conus, name="CONUS")
#map


In [None]:
spatial_df.first()['geometry']

In [None]:
df_boundaries = sd.read_parquet("s3://overturemaps-us-west-2/release/2025-09-24.0/theme=divisions/type=division_area/*.parquet")
df.show(3)

In [None]:
OVERTURE_RELEASE = "2025-09-24.0"
COUNTRY_CODES_OF_INTEREST = ["US"]
SOURCE_DATA_URL = f"s3a://overturemaps-us-west-2/release/{OVERTURE_RELEASE}/theme=divisions/type=division_area"
OUTPUT_FILE = "my_super_cool_data.parquet"

In [None]:
country_overlap_condition = F.arrays_overlap(
    F.col("country"),
    F.array(*[F.lit(x.upper()) for x in COUNTRY_CODES_OF_INTEREST]),
)

In [None]:
source_df = (
    sd.read.format("geoparquet")
    .load(SOURCE_DATA_URL)
    .filter(col("country").isin(COUNTRY_CODES_OF_INTEREST))
    #.filter(col("region")=='US-CA')
    .filter(col("subtype")=='country')
    .withColumn("_overture_release_version", F.lit(OVERTURE_RELEASE))
    .withColumn("_ingest_timestamp", F.current_timestamp())
)

In [None]:
USA_geom = source_df.selectExpr("geometry", "country")
USA_geom.show(5)
map = SedonaKepler.create_map(USA_geom, name="USA")
map

In [None]:
#Used for testing with just CA
#CA_geom = source_df.selectExpr("geometry", "region")
#CA_geom.show(5)
#map1 = SedonaKepler.create_map(CA_geom, name="CA")
#map1
#
#CA_geom = source_df.selectExpr("geometry", "region").filter(GeometryType(col('geometry'))=='MULTIPOLYGON')
#CA_geom.show(5)
#map2 = SedonaKepler.create_map(CA_geom, name="CA")
#map2
#
#CA_geom = source_df.selectExpr("geometry", "region").filter(GeometryType(col('geometry'))=='POLYGON')
#CA_geom.show(5)
#map3 = SedonaKepler.create_map(CA_geom, name="CA")
#map3