In [0]:
from pyspark.sql.functions import current_timestamp
def add_ingestion_date(input_df):
    """Return ``input_df`` with an ``ingestion_date`` column attached.

    The column is set to ``current_timestamp()`` via ``withColumn``; the
    input DataFrame itself is not modified, a new DataFrame is returned.

    Args:
        input_df: DataFrame to stamp.

    Returns:
        The DataFrame produced by ``input_df.withColumn("ingestion_date", ...)``.
    """
    return input_df.withColumn("ingestion_date", current_timestamp())

In [0]:
from delta.tables import DeltaTable

def merge_delta_data(input_df, db_name, table_name, merge_condition, partitioncolumn, catalog=None):
    """Upsert ``input_df`` into a Delta table, creating the table if missing.

    If the target table already exists, ``input_df`` is merged into it:
    rows matching ``merge_condition`` are updated with all source columns
    and non-matching rows are inserted. Otherwise the table is created from
    ``input_df`` as a Delta table.

    Relies on a ``spark`` session being available as a global (as in a
    notebook environment).

    Args:
        input_df: Source DataFrame to merge or write.
        db_name: Schema/database name; created if it does not exist.
        table_name: Unqualified name of the target table.
        merge_condition: SQL join condition for the merge. The target is
            aliased ``tgt`` and the source ``src``.
        partitioncolumn: Column name (or list of names) used to partition
            the table when it is first created. A falsy value (``None``,
            ``""``, ``[]``) creates an unpartitioned table.
        catalog: Optional catalog name. When given, the session switches to
            it (``USE CATALOG``) and the table is addressed as
            ``catalog.db_name.table_name``.

    Returns:
        None.
    """
    spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning.enabled", "true")

    if catalog:
        spark.sql(f"USE CATALOG {catalog}")
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db_name}")
        ns = f"{catalog}.{db_name}"
    else:
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
        ns = db_name

    full = f"{ns}.{table_name}"

    # Existence check via SHOW TABLES (whitelisted).
    # The comparison is done in lowercase: the metastore reports table names
    # lowercased, so an exact match on a mixed-case `table_name` would never
    # succeed and every run would overwrite the table instead of merging.
    # Temporary views are excluded because they also show up in SHOW TABLES
    # but are not the persisted Delta table.
    exists = (
        spark.sql(f"SHOW TABLES IN {ns}")
        .filter(f"lower(tableName) = '{table_name.lower()}' AND isTemporary = false")
        .count() > 0
    )

    if exists:
        (DeltaTable.forName(spark, full)
            .alias("tgt")
            .merge(input_df.alias("src"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        writer = input_df.write.format("delta").mode("overwrite")
        # Only partition when a column was actually supplied; passing None
        # to partitionBy would fail.
        if partitioncolumn:
            writer = writer.partitionBy(partitioncolumn)
        writer.saveAsTable(full)
