In [None]:
# nb_functions

In [1]:
from pyspark.sql.types import StructType, ArrayType
import hashlib
from pyspark.sql.functions import *
from delta.tables import DeltaTable
from pyspark.sql.functions import to_timestamp, col

StatementMeta(, , -1, Waiting, , Waiting)

In [1]:
# --- File System Helpers ---
def is_directory_empty(path):
    """Return True when ``path`` lists no entries or cannot be listed at all.

    Args:
        path: Path handed to ``mssparkutils.fs.ls``.

    Returns:
        True if the listing is empty, or if listing raised an exception
        (best-effort: a missing/unreadable path is reported as "empty").
        False otherwise.
    """
    try:
        return len(mssparkutils.fs.ls(path)) == 0
    except Exception:
        # Deliberate best-effort fallback. Catching Exception (not a bare
        # except) keeps KeyboardInterrupt/SystemExit propagating, so a user
        # cancelling the notebook is not misreported as "empty directory".
        return True

# --- Transformation Helpers ---
def explode_col(df, column):
    """Expand array/map column ``column`` into one output row per element.

    The column keeps its name but now holds individual elements. Because it
    uses Spark's ``explode`` (not ``explode_outer``), input rows whose value
    is null or empty produce no output rows.
    """
    exploded = explode(col(column))
    return df.withColumn(column, exploded)

def flatten_struct_col(df, column):
    """Promote every field of struct column ``column`` to a top-level column.

    All existing columns are kept, the struct's fields are appended, and the
    original struct column is removed.
    NOTE(review): a struct field whose name equals an existing top-level
    column yields duplicate column names — confirm callers avoid this.
    """
    widened = df.select("*", f"{column}.*")
    return widened.drop(column)

def zip_explode_and_flatten(df, parent, children):
    """Explode parallel arrays under struct ``parent`` into one row per index.

    The arrays ``parent.<child>`` for each name in ``children`` are zipped
    position-wise with ``arrays_zip`` and exploded. Each resulting row gets
    one top-level column per child holding that position's value. The
    ``parent`` column is dropped afterwards.

    Args:
        df: Source DataFrame.
        parent: Name of the struct column holding the arrays (assumed to be
            a top-level column, since it is also passed to ``drop``).
        children: Names of the array fields inside ``parent`` to zip.

    Returns:
        The exploded DataFrame. Rows whose zipped array is null or empty
        disappear, because ``explode`` (not ``explode_outer``) is used.
    """
    # Pick a scratch column name that clashes neither with an existing column
    # nor with a child column about to be created. The previous hard-coded
    # "temp" silently overwrote and then dropped such a column. Comparison
    # is lower-cased because Spark resolves names case-insensitively by
    # default.
    taken = {name.lower() for name in list(df.columns) + list(children)}
    scratch = "__zip_tmp"
    while scratch in taken:
        scratch = "_" + scratch

    zipped = arrays_zip(*[col(f"{parent}.{c}") for c in children])
    df = df.withColumn(scratch, explode(zipped))
    for child in children:
        df = df.withColumn(child, col(f"{scratch}.{child}"))
    return df.drop(scratch, parent)

# --- Metadata & Security Helpers ---
def hash_column(objecttype_col, bron_col, lokaal_id_col):
    """Return a column with the MD5 hex digest of three concatenated parts.

    The three inputs are joined with Spark's ``concat`` (no separator) and
    passed to ``md5``.

    NOTE(review): ``concat`` yields NULL when any input is NULL, so the hash
    is NULL in that case. Joining without a separator also means e.g.
    ("ab", "c") and ("a", "bc") hash identically. Changing either would alter
    already-persisted hash values, so it is left as is.
    """
    joined = concat(objecttype_col, bron_col, lokaal_id_col)
    digest = md5(joined)
    return digest

def select_columns_safe(df, columns):
    """Select only those of ``columns`` that actually exist in ``df``.

    Requested names missing from ``df.columns`` are skipped instead of
    raising, so a column absent from the source does not crash the job. The
    match is an exact (case-sensitive) name comparison, and the order of
    ``columns`` is preserved.
    """
    existing = df.columns
    present = [name for name in columns if name in existing]
    return df.select(*present)

def clean_column_names_and_schemas(df):
    """Rename every top-level column of ``df`` to lower snake_case.

    An underscore is inserted wherever a lowercase letter or digit is
    directly followed by an uppercase letter, and the whole name is then
    lower-cased (``objectType`` -> ``object_type``, ``lokaalID`` ->
    ``lokaal_id``). Runs of capitals are not split (``HTTPResponse`` ->
    ``httpresponse``). Nested struct fields are not renamed.

    Args:
        df: Spark DataFrame whose columns should be renamed.

    Returns:
        A DataFrame with the same columns, in the same order, under the new
        names. Returns ``df`` itself when no name changes.
    """
    import re

    camel_boundary = re.compile(r"([a-z0-9])([A-Z])")
    old_names = list(df.columns)
    new_names = [camel_boundary.sub(r"\1_\2", name).lower() for name in old_names]
    if new_names == old_names:
        return df
    # A single positional rename via toDF, instead of one withColumnRenamed
    # per column:
    # - avoids stacking one projection per column (slow analysis on wide
    #   frames);
    # - renames strictly by position, so under Spark's default
    #   case-insensitive resolution a rename cannot also hit a
    #   differently-cased sibling column.
    # NOTE(review): distinct source names can still map to the same result
    # (e.g. ``fooBar`` and ``foo_bar``), producing duplicate column names.
    return df.toDF(*new_names)

# --- 🟢 IMPROVED: Load & Merge Logic ---
def load_data_into_delta_table(data, sink_path, full_load, primary_col_name):
    """Write ``data`` to the Delta table at ``sink_path``.

    If ``full_load`` is truthy, or ``sink_path`` is not (yet) a Delta table,
    the table is overwritten or created, replacing its schema. Otherwise
    the rows are upserted with a Delta MERGE keyed on ``primary_col_name``.
    Checking ``isDeltaTable`` first means a missing path leads to creation
    rather than a failed merge.

    Args:
        data: Spark DataFrame to write.
        sink_path: Storage path of the target Delta table.
        full_load: When truthy, replace the table contents and schema.
        primary_col_name: Name of the top-level key column used to match
            source and target rows in the incremental MERGE.

    Returns:
        None.
    """
    # 1. Decide whether an incremental merge is possible at all.
    try:
        is_delta = DeltaTable.isDeltaTable(spark, sink_path)
    except Exception:
        # Best-effort: if the check itself fails, fall back to overwrite/create.
        is_delta = False

    # 2a. Overwrite/Create.
    if full_load or not is_delta:
        (
            data.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .save(sink_path)
        )
        return

    # 2b. Incremental Merge (Upsert).
    # ⚠️ CRITICAL: MERGE fails with MULTIPLE_SOURCE_ROW_MATCHING when several
    # source rows match the same target row, so keep one source row per key.
    # NOTE(review): dropDuplicates keeps an arbitrary row per key; if the
    # batch can contain several versions of a key, order/filter by a
    # timestamp first so the latest one wins.
    deduplicated_data = data.dropDuplicates([primary_col_name])

    # Backtick-quote the key so it is read as one literal top-level column,
    # matching how dropDuplicates resolves the name. This matters for names
    # containing spaces or dots, or that are reserved words. Embedded
    # backticks are escaped by doubling.
    quoted_key = "`" + primary_col_name.replace("`", "``") + "`"
    # NOTE(review): `=` never matches NULL keys, so source rows with a NULL
    # key are inserted on every run; consider `<=>` if that is unintended.
    condition = f"t.{quoted_key} = s.{quoted_key}"

    dt = DeltaTable.forPath(spark, sink_path)
    (
        dt.alias("t")
        .merge(deduplicated_data.alias("s"), condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# --- Table Registration ---
def create_lakehouse_table(table_name, path):
    """Register the Delta files at ``path`` as the SQL table ``table_name``.

    Runs ``CREATE TABLE IF NOT EXISTS ... USING DELTA LOCATION`` and then
    ``REFRESH TABLE`` so cached metadata for the table is refreshed.
    If ``table_name`` is already registered, the CREATE is skipped, even if
    it points at a different location.

    NOTE(review): both arguments are interpolated into SQL unescaped. A
    single quote in ``path``, or a ``table_name`` that is not a valid
    identifier, breaks the statement, so only pass trusted values.
    """
    create_stmt = f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{path}'"
    refresh_stmt = f"REFRESH TABLE {table_name}"
    for stmt in (create_stmt, refresh_stmt):
        spark.sql(stmt)


StatementMeta(, , -1, Waiting, , Waiting)