In [0]:
%sql
-- Ensure the target catalog exists; IF NOT EXISTS makes this safe to re-run.
CREATE CATALOG IF NOT EXISTS fleetops_project;
-- Ensure the bronze schema exists inside that catalog.
CREATE SCHEMA IF NOT EXISTS fleetops_project.bronze;

In [0]:
from pyspark.sql.functions import col, max, lit, current_timestamp

# Name of the source table to ingest, read from the notebook's "table_name" widget.
table_name = dbutils.widgets.get("table_name")

# Nothing to ingest without a table name: report it and exit the notebook.
if not table_name:
    print("No table name provided. Exiting.")
    dbutils.notebook.exit("No table name")

# Destination catalog/schema and the column used as the incremental watermark.
CATALOG_NAME, SCHEMA_NAME = "fleetops_project", "bronze"
WATERMARK_COL = "updated_at"

print(f"Starting ingestion for: {table_name}")

def ingest_incremental_one_table(tbl):
    """Append rows newer than the current watermark from the source into bronze.

    The watermark is the maximum ``WATERMARK_COL`` value already present in
    ``{CATALOG_NAME}.{SCHEMA_NAME}.{tbl}``. When the target table does not
    exist, holds no non-null watermark, or the lookup fails, the fallback
    ``"1900-01-01 00:00:00"`` is used instead.

    Rows are read from ``con_supabase.public.{tbl}``, filtered to those whose
    ``WATERMARK_COL`` is strictly greater than the watermark, stamped with an
    ``_ingested_at`` column, and appended to the target Delta table with
    ``mergeSchema`` enabled.

    Args:
        tbl: Unqualified table name, used for both the source and the target.

    Returns:
        ``"No Data"`` when the filtered source is empty, otherwise
        ``"Success"`` after the append completes.

    Raises:
        Exception: Any error raised while reading the source or writing the
            target propagates unchanged to the caller.
    """
    target_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.{tbl}"

    # Fallback watermark used when no usable value can be read from the target.
    last_watermark = "1900-01-01 00:00:00"
    if spark.catalog.tableExists(target_table):
        try:
            max_val = (
                spark.read.table(target_table)
                .select(max(col(WATERMARK_COL)))
                .collect()[0][0]
            )
        except Exception as exc:
            # Best-effort lookup: keep the fallback, but make the failure
            # visible, because falling back re-reads the source from the
            # beginning and appends it to the existing table again.
            print(
                f"WARNING: could not read watermark from {target_table}; "
                f"falling back to {last_watermark}: {exc}"
            )
        else:
            # max() over an empty table (or all-null column) yields None.
            if max_val is not None:
                last_watermark = max_val

    print(f"Watermark: {last_watermark}")

    df = spark.table(f"con_supabase.public.{tbl}").filter(
        col(WATERMARK_COL) > lit(last_watermark)
    )

    # Skip the write entirely when there is nothing new to append.
    if df.isEmpty():
        print("No new data.")
        return "No Data"

    (
        df.withColumn("_ingested_at", current_timestamp())
        .write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
    )

    return "Success"

# Run the incremental ingestion for the table named by the widget.
# The returned status string ("Success" / "No Data") is not captured here.
ingest_incremental_one_table(table_name)