In [0]:
%pip install fiona

In [0]:
import geopandas as gpd
from shapely.geometry.base import BaseGeometry
from shapely import force_2d
import pandas as pd
import fiona 
import os 
import shutil 
import tempfile
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

In [0]:
# Directory that is listed for incoming *.gdb folders.
gcs_landing_zone = "/Volumes/land_topografisk-gdb_dev/external_dev/landing_zone"

# Catalog and schema used for every unqualified table name below.
# The catalog name is back-quoted because it contains a hyphen.
catalog_dev = "`land_topografisk-gdb_dev`"
schema_dev = "ai2025"

# Table names and the geodatabase layer that is ingested.
log_table = "logs_processed_gdbs"
table = "nypolygons_bronze"
layer = "Snuplasser_areal_N50"

# Make the catalog/schema above the session defaults.
spark.sql(f"USE CATALOG {catalog_dev}")
spark.sql(f"USE SCHEMA {schema_dev}")

In [0]:
# Create the processing log (one row per ingested geodatabase) if it does not
# exist yet. The column list here mirrors the schema used when log rows are
# appended, so the two must be kept in sync.
q = f"""
CREATE TABLE IF NOT EXISTS {log_table} (
  gdb_name STRING,
  processed_time TIMESTAMP,
  num_inserted INT,
  num_updated INT,
  num_deleted INT

) USING DELTA
"""
spark.sql(q)

In [0]:
def log_processed_gdb(log_data: list):
    """Append processing-log rows to the Delta table named by ``log_table``.

    Args:
        log_data: Rows to append, each ordered as
            ``(gdb_name, processed_time, num_inserted, num_updated, num_deleted)``.
    """
    # (column name, Spark type) pairs; every column is nullable.
    column_specs = [
        ("gdb_name", StringType),
        ("processed_time", TimestampType),
        ("num_inserted", IntegerType),
        ("num_updated", IntegerType),
        ("num_deleted", IntegerType),
    ]
    log_schema = StructType(
        [StructField(name, spark_type(), True) for name, spark_type in column_specs]
    )

    log_df = spark.createDataFrame(log_data, schema=log_schema)
    log_df.write.format("delta").mode("append").saveAsTable(log_table)


In [0]:
def check_for_new_gdbs() -> list:
    """Return landing-zone paths of geodatabases that are not in the processing log.

    The log table stores the bare geodatabase name (the last path component,
    e.g. ``foo.gdb``), whereas the directory listing yields full paths ending
    in ``.gdb/``. Candidates are therefore compared by their last path
    component; comparing the full path directly would never match and every
    geodatabase would be re-processed on each run. A full-path match is also
    honoured in case the log contains such entries.

    Returns:
        list: Paths, exactly as reported by ``dbutils.fs.ls``, of the ``.gdb/``
        directories in ``gcs_landing_zone`` that have no entry in ``log_table``.
    """
    logged_rows = spark.read.table(log_table).select("gdb_name").collect()
    # A set gives O(1) membership checks and collapses repeated log entries.
    processed = {row["gdb_name"] for row in logged_rows}

    new_gdbs = []
    for entry in dbutils.fs.ls(gcs_landing_zone):
        path = entry.path
        if not path.endswith(".gdb/"):
            continue
        # Same name derivation as used when the log row is written.
        name = path.rstrip("/").split("/")[-1]
        if name not in processed and path not in processed:
            new_gdbs.append(path)
    return new_gdbs

In [0]:
def to_wkt_2d(geom):
    """Return the WKT text of ``geom`` after passing it through ``force_2d``.

    Args:
        geom: A shapely geometry, or any other value (e.g. a missing geometry).

    Returns:
        The ``.wkt`` string of the 2D-forced geometry, or ``None`` when ``geom``
        is not a ``BaseGeometry`` instance.
    """
    if not isinstance(geom, BaseGeometry):
        return None
    flattened = force_2d(geom)
    return flattened.wkt

In [0]:
def write_to_sdf(gdb_path: str, gdb_name:str, layer: str) -> DataFrame:
    """Load one geodatabase layer and return it as a Spark DataFrame.

    The layer is read with GeoPandas, assigned EPSG:32633 via ``set_crs`` and
    converted to EPSG:25833 via ``to_crs``. The geometry column is replaced by
    its 2D WKT text, and four bookkeeping columns are appended.

    Args:
        gdb_path: Path of the geodatabase passed to ``gpd.read_file``.
        gdb_name: Value stored in the ``source_file`` column.
        layer: Layer to read; also stored in the ``source_layer`` column.

    Returns:
        DataFrame: The layer's columns (with ``geometry`` as WKT text) plus
        ``ingest_time``, ``source_file``, ``source_layer`` and ``row_hash``.
    """
    features = gpd.read_file(gdb_path, layer=layer)
    features = features.set_crs("EPSG:32633").to_crs("EPSG:25833")

    # Swap the shapely geometry objects for WKT text before handing over to Spark.
    features['wkt_geometry'] = features['geometry'].apply(to_wkt_2d)
    features = features.drop(columns=['geometry'])

    layer_sdf = spark.createDataFrame(features).withColumnRenamed("wkt_geometry", "geometry")

    # Captured before the bookkeeping columns are added, so the hash covers only
    # the layer's own columns (ingest_time etc. never influence it).
    # NOTE(review): concat_ws skips NULL values, so rows that differ only in
    # which column is NULL may produce the same hash - confirm this is acceptable.
    hashed_columns = layer_sdf.columns

    return (
        layer_sdf
        .withColumn("ingest_time", current_timestamp())
        .withColumn("source_file", lit(gdb_name))
        .withColumn("source_layer", lit(layer))
        .withColumn("row_hash", sha2(concat_ws("||", *hashed_columns), 256))
    )

In [0]:
def write_delta_table(sdf: DataFrame, key_columns=None):
    """Create the target Delta table from ``sdf`` or merge ``sdf`` into it.

    If the table named by ``table`` does not exist it is created from ``sdf``.
    Otherwise ``sdf`` is merged into it:

    * ``key_columns`` omitted (default): rows are matched on ``row_hash`` and
      only rows whose hash is not yet present are inserted. A matched row has,
      by definition, an identical hash, so there is nothing to update; an
      update clause guarded by ``target.row_hash != source.row_hash`` could
      never fire under this match condition and is therefore not issued.
    * ``key_columns`` given: rows are matched on those columns, matched rows
      whose ``row_hash`` differs are overwritten with the source values, and
      unmatched rows are inserted.

    Args:
        sdf: Source DataFrame; must contain a ``row_hash`` column.
        key_columns: Optional column name, or list of column names, that
            identify a row across loads. Enables updates of changed rows.
    """
    if not spark.catalog.tableExists(table):
        sdf.write.format("delta").mode("overwrite").saveAsTable(table)
        return

    if isinstance(key_columns, str):
        key_columns = [key_columns]

    # Every target column is taken from the equally named source column.
    column_map = {name: f"source.{name}" for name in sdf.columns}

    if key_columns:
        match_condition = " AND ".join(
            f"target.{key} = source.{key}" for key in key_columns
        )
    else:
        match_condition = "target.row_hash = source.row_hash"

    merge = DeltaTable.forName(spark, table).alias("target").merge(
        source=sdf.alias("source"),
        condition=match_condition,
    )
    if key_columns:
        # Only rewrite rows whose content actually changed.
        merge = merge.whenMatchedUpdate(
            condition="target.row_hash != source.row_hash",
            set=column_map,
        )
    merge.whenNotMatchedInsert(values=column_map).execute()

In [0]:
def write_to_delta_table(sdf: DataFrame, gdb_name: str):
    """Write ``sdf`` to the target table and append a row to the processing log.

    The row counts are taken from the ``operationMetrics`` of the newest Delta
    history entry when the table already existed and the write produced a new
    version. When the table is created by this call, the inserted count is
    ``sdf.count()``. When no new version is found, all counts are logged as 0.

    Args:
        sdf: DataFrame to write (passed on to ``write_delta_table``).
        gdb_name: Geodatabase name recorded in the log row.
    """
    table_existed = spark.catalog.tableExists(table)
    if table_existed:
        delta_tbl = DeltaTable.forName(spark, table)
        version_before = delta_tbl.history(1).select("version").collect()[0][0]

    write_delta_table(sdf)

    # Defaults so the log call below always has values, including the
    # "no new version" branch where no metrics are read.
    inserted = updated = deleted = 0

    if table_existed:
        # Read version and metrics from the same history row so they cannot
        # come from two different commits.
        latest = delta_tbl.history(1).select("version", "operationMetrics").collect()[0]
        version_after = latest[0]
        metrics = latest[1] or {}
        if version_after > version_before:
            # Metric values arrive as strings; a missing/empty metric counts as 0.
            updated = int(metrics.get("numTargetRowsUpdated") or 0)
            inserted = int(metrics.get("numTargetRowsInserted") or 0)
            deleted = int(metrics.get("numTargetRowsDeleted") or 0)
            print(f"Updated: {updated}, Inserted: {inserted}, Deleted: {deleted}")
        else:
            print("No new Delta version found after merge.")
    else:
        inserted = sdf.count()
        print(f"Updated: {updated}, Inserted: {inserted}, Deleted: {deleted}")

    log_processed_gdb(log_data=[(gdb_name, datetime.now(), inserted, updated, deleted)])

In [0]:
def main():
    """Ingest every geodatabase returned by ``check_for_new_gdbs``.

    For each returned path, the geodatabase name is its last path component and
    the path used for reading is the same path without a leading ``dbfs:``.
    The configured ``layer`` is loaded and written to the target table.
    """
    for gdb_uri in check_for_new_gdbs():
        local_path = gdb_uri.removeprefix('dbfs:')
        gdb_name = gdb_uri.rstrip("/").split("/")[-1]
        print(f"\nProcessing gdb: {gdb_name}")

        layer_sdf = write_to_sdf(local_path, gdb_name, layer)
        write_to_delta_table(layer_sdf, gdb_name)

In [0]:
# Run the ingestion: load and write every geodatabase that main() picks up.
main()

In [0]:
# Sanity check: display up to 10 rows of the target table.
df = spark.read.table(table)
df.limit(10).display()