In [None]:
import os
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator

In [None]:
# Locate additional JARs shipped next to the Spark installation.
# Raises KeyError when SPARK_HOME is unset and FileNotFoundError when the
# extra_jars directory is missing.
extra_jars_dir:str = os.path.join(os.environ["SPARK_HOME"], "extra_jars")

# os.listdir returns every directory entry (sub-directories, hidden files,
# READMEs, ...) in arbitrary order. Only real JAR files belong on `spark.jars`,
# so filter by extension and sort to get a reproducible list.
extra_jars:list = sorted(
    os.path.join(extra_jars_dir, entry)
    for entry in os.listdir(extra_jars_dir)
    if entry.lower().endswith(".jar")
)

# Build (or reuse) a local Spark session configured for Sedona:
#   * Kryo serializer + Sedona's Kryo registrator,
#   * Sedona / GeoTools artifacts given as Maven coordinates,
#   * the local JARs collected above,
#   * Hive support (used later by CREATE TABLE ... AS).
spark:SparkSession = (
    SparkSession.builder
    .master("local[*]")
    .appName("my_sedona_app")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.1-incubating,"
        "org.datasyslab:geotools-wrapper:1.3.0-27.2",
    )
    .config("spark.jars", ",".join(extra_jars))
    .enableHiveSupport()
    .getOrCreate()
)

# Register Sedona on this session; the ST_* SQL functions used further down
# rely on it.
SedonaRegistrator.registerAll(spark)
# Only surface errors in the notebook output.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Load the three tab-separated input files; the first row of each file is
# used as the header. No schema is supplied here.
mineral_resources = (
    spark.read
    .option("sep", "\t")
    .option("header", True)
    .csv("Resources.txt")
)
coordinates = (
    spark.read
    .option("sep", "\t")
    .option("header", True)
    .csv("Coords.txt")
)
location = (
    spark.read
    .option("sep", "\t")
    .option("header", True)
    .csv("Location.txt")
)

In [None]:
# Trim the lookup tables to the join key plus the columns used downstream.
coordinates_only = coordinates.select(["wgs84_lon", "wgs84_lat", "dep_id"])
states = location.select(["dep_id", "state_prov"])

# Attach coordinates and state to every resource row; inner joins on dep_id
# drop rows that have no match in either lookup table.
join = (
    mineral_resources
    .join(coordinates_only, on="dep_id", how="inner")
    .join(states, on="dep_id", how="inner")
)

In [None]:
# Replace the longitude/latitude columns with double-typed versions so they
# can be passed to ST_Point as numbers.
join = (
    join
    .withColumn("wgs84_lon", join["wgs84_lon"].cast("double"))
    .withColumn("wgs84_lat", join["wgs84_lat"].cast("double"))
)

In [None]:
# Expose the joined DataFrame to Spark SQL under the name "mineral_resources".
join.createOrReplaceTempView(name="mineral_resources")

In [None]:
# Materialise the joined rows together with a point geometry built from
# lon/lat and tagged with SRID 4326.
# CREATE TABLE fails when the table already exists, and because the session
# has Hive support enabled the table can outlive the kernel. Drop any previous
# copy first so this cell survives "Restart & Run All".
spark.sql("DROP TABLE IF EXISTS mineral_resources_with_geom")
spark.sql("CREATE TABLE mineral_resources_with_geom AS (SELECT *, ST_SetSRID(ST_Point(wgs84_lon, wgs84_lat), 4326) AS geom FROM mineral_resources);")

In [None]:
# Average pairwise distance between every California deposit and every Texas
# deposit (full cross product of the two sets).
# NOTE(review): the geometries are lon/lat points (SRID 4326), so ST_Distance
# presumably returns a planar distance in degrees rather than metres - confirm
# against the Sedona docs and reproject first if a metric distance is needed.
sql_query = """
    WITH california AS (
        SELECT geom AS geom_cal
        FROM mineral_resources_with_geom
        WHERE state_prov = 'California'
    ),
    texas AS (
        SELECT geom AS geom_tex
        FROM mineral_resources_with_geom
        WHERE state_prov = 'Texas'
    )
    SELECT AVG(ST_Distance(geom_cal, geom_tex)) AS avg_distance
    FROM california
    CROSS JOIN texas
"""
spark.sql(sql_query).show()