In [1]:
import time

from sedona.spark import SedonaContext
from sedona.sql import ST_GeoHash, ST_MakeValid

In [2]:
import os
import sys

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-19-openjdk-amd64"

print("Python version:", sys.version)
print("Python path:", sys.executable)
print("Working directory:", os.getcwd())
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
print("SPARK_HOME:", os.environ.get("SPARK_HOME"))
print("PYTHONPATH:", os.environ.get("PYTHONPATH"))

Python version: 3.10.12 (main, Sep 11 2024, 15:47:36) [GCC 11.4.0]
Python path: /usr/bin/python3
Working directory: /app/geospeed
JAVA_HOME: /usr/lib/jvm/java-19-openjdk-amd64
SPARK_HOME: /opt/spark
PYTHONPATH: /opt/spark/python


In [3]:
start_time = time.time()

config = (
    SedonaContext.builder()
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.6.1," "org.datasyslab:geotools-wrapper:1.6.1-28.2",
    )
    .getOrCreate()
)

sedona = SedonaContext.create(config)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.sedona#sedona-spark-shaded-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-176f7a64-bb53-4e13-b5a8-9f6939ee6261;1.0
	confs: [default]
	found org.apache.sedona#sedona-spark-shaded-3.0_2.12;1.6.1 in central
	found org.datasyslab#geotools-wrapper;1.6.1-28.2 in central
:: resolution report :: resolve 123ms :: artifacts dl 4ms
	:: modules in use:
	org.apache.sedona#sedona-spark-shaded-3.0_2.12;1.6.1 from central in [default]
	org.datasyslab#geotools-wrapper;1.6.1-28.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      de

In [4]:
building_columns = ["oid", "gebnutzbez", "gfkzshh", "name", "anzahlgs", "gmdschl", "lagebeztxt", "funktion"]
usage_columns = ["oid", "nutzart", "bez", "flstkennz"]

build_gdf = (
    sedona.read.format("shapefile")
    .option("recursiveFileLookup", "true")
    .load("../ALKIS/*/GebauedeBauwerk.shp")
    .dropDuplicates(["oid"])
)
build_gdf = build_gdf.select(
    ST_MakeValid(build_gdf["geometry"]).alias("geometry"), *[build_gdf[col] for col in building_columns]
)
build_gdf.createOrReplaceTempView("buildings")
use_gdf = (
    sedona.read.format("shapefile")
    .option("recursiveFileLookup", "true")
    .load("../ALKIS/*/NutzungFlurstueck.shp")
    .dropDuplicates(["oid"])
)
use_gdf = use_gdf.select(ST_MakeValid(use_gdf["geometry"]).alias("geometry"), *[use_gdf[col] for col in usage_columns])
use_gdf.createOrReplaceTempView("usage")

In [5]:
result_gdf = sedona.sql("""SELECT ST_Intersection(b.geometry, u.geometry) AS geometry, b.oid AS building_oid, b.gebnutzbez, b.gfkzshh, b.name, b.anzahlgs, b.gmdschl, b.lagebeztxt, b.funktion, u.oid AS flur_oid, u.nutzart, u.bez, u.flstkennz
                       FROM buildings b, usage u
                       WHERE ST_Intersects(b.geometry, u.geometry)""")

In [6]:
proj = """{"$schema": "https://proj.org/schemas/v0.7/projjson.schema.json","type": "ProjectedCRS","name": "ETRS89 / UTM zone 33N","base_crs": {"name": "ETRS89","datum_ensemble": {"name": "European Terrestrial Reference System 1989 ensemble","members": [{"name": "European Terrestrial Reference Frame 1989"},{"name": "European Terrestrial Reference Frame 1990"},{"name": "European Terrestrial Reference Frame 1991"},{"name": "European Terrestrial Reference Frame 1992"},{"name": "European Terrestrial Reference Frame 1993"},{"name": "European Terrestrial Reference Frame 1994"},{"name": "European Terrestrial Reference Frame 1996"},{"name": "European Terrestrial Reference Frame 1997"},{"name": "European Terrestrial Reference Frame 2000"},{"name": "European Terrestrial Reference Frame 2005"},{"name": "European Terrestrial Reference Frame 2014"}],"ellipsoid": {"name": "GRS 1980","semi_major_axis": 6378137,"inverse_flattening": 298.257222101},"accuracy": "0.1"},"coordinate_system": {"subtype": "ellipsoidal","axis": [{"name": "Geodetic latitude","abbreviation": "Lat","direction": "north","unit": "degree"},{"name": "Geodetic longitude","abbreviation": "Lon","direction": "east","unit": "degree"}]},"id": {"authority": "EPSG","code": 4258}},"conversion": {"name": "UTM zone 33N","method": {"name": "Transverse Mercator","id": {"authority": "EPSG","code": 9807}},"parameters": [{"name": "Latitude of natural origin","value": 0,"unit": "degree","id": {"authority": "EPSG","code": 8801}},{"name": "Longitude of natural origin","value": 15,"unit": "degree","id": {"authority": "EPSG","code": 8802}},{"name": "Scale factor at natural origin","value": 0.9996,"unit": "unity","id": {"authority": "EPSG","code": 8805}},{"name": "False easting","value": 500000,"unit": "metre","id": {"authority": "EPSG","code": 8806}},{"name": "False northing","value": 0,"unit": "metre","id": {"authority": "EPSG","code": 8807}}]},"coordinate_system": {"subtype": "Cartesian","axis": [{"name": "Easting","abbreviation": "E","direction": "east","unit": "metre"},{"name": "Northing","abbreviation": "N","direction": "north","unit": "metre"}]},"scope": "Engineering survey, topographic mapping.","area": "Europe between 12°E and 18°E: Austria; Denmark - offshore and offshore; Germany - onshore and offshore; Norway including Svalbard - onshore and offshore.","bbox": {"south_latitude": 46.4,"west_longitude": 12,"north_latitude": 84.42,"east_longitude": 18.01},"id": {"authority": "EPSG","code": 25833}}"""
columns = [
    "building_oid",
    "gebnutzbez",
    "gfkzshh",
    "name",
    "anzahlgs",
    "gmdschl",
    "lagebeztxt",
    "funktion",
    "flur_oid",
    "nutzart",
    "bez",
    "flstkennz",
]


(
    result_gdf.select(
        result_gdf["geometry"],
        ST_GeoHash(result_gdf["geometry"], 5).alias("geom_hash"),
        *[result_gdf[col] for col in columns],
    )
    .orderBy("geom_hash")
    .coalesce(1)
    .write.format("geoparquet")
    .mode("overwrite")
    .option("geoparquet.version", "1.0.0")
    .option("geoparquet.crs", proj)
    .option("parquet.compression", "zstd")
    .save("sedona_data.parquet")
)

24/10/10 21:14:40 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
                                                                                

In [7]:
print(f"Excecution took: {time.time() - start_time} sec.")

Excecution took: 163.84152150154114 sec.
