# Incremental Ingestion from SQL Server to Bronze Layer

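This notebook ingests data from a SQL Server table into a Delta Lake managed table in Unity Catalog.
It uses a watermark column (`ModifiedDate` by default) to load only the rows added or modified since the previous run; those rows are appended to the target table, not merged into it.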

In [None]:
# DBTITLE 1,Configuration Widgets
# Each entry is (widget name, default value, label). One text widget is
# registered per entry, in the order listed.
_widget_specs = (
    ("jdbc_host", "", "JDBC Host"),
    ("jdbc_port", "1433", "JDBC Port"),
    ("jdbc_database", "AdventureWorks", "JDBC Database"),
    ("jdbc_user", "", "JDBC User"),
    ("jdbc_password", "", "JDBC Password"),  # In prod, use secrets!
    ("source_schema", "Sales", "Source Schema"),
    ("source_table", "SalesOrderHeader", "Source Table"),
    ("target_catalog", "main", "Target Catalog"),
    ("target_schema", "bronze", "Target Schema"),
    ("watermark_column", "ModifiedDate", "Watermark Column"),
)

for _name, _default, _label in _widget_specs:
    dbutils.widgets.text(_name, _default, _label)

In [None]:
# DBTITLE 1,Get Parameters
# Read every widget value in one pass. The variables on the left are assigned
# positionally, so their order must match the widget names on the right.
(
    jdbc_host,
    jdbc_port,
    jdbc_database,
    jdbc_user,
    jdbc_password,
    source_schema,
    source_table,
    target_catalog,
    target_schema,
    watermark_column,
) = (
    dbutils.widgets.get(_widget_name)
    for _widget_name in (
        "jdbc_host",
        "jdbc_port",
        "jdbc_database",
        "jdbc_user",
        "jdbc_password",
        "source_schema",
        "source_table",
        "target_catalog",
        "target_schema",
        "watermark_column",
    )
)

# The source is addressed as schema.table; the target is catalog.schema.table
# and reuses the source table name in lower case.
full_source_table = ".".join((source_schema, source_table))
full_target_table = ".".join((target_catalog, target_schema, source_table.lower()))

print(f"Ingesting from {full_source_table} to {full_target_table}")

In [None]:
# DBTITLE 1,JDBC Connection Properties
# Connection-string options that follow the host:port part, joined with ';'.
# NOTE(review): trustServerCertificate=true presumably makes the driver accept
# the server certificate without validating it - confirm this is intended
# outside of development environments.
_url_options = ";".join(
    (
        f"database={jdbc_database}",
        "encrypt=true",
        "trustServerCertificate=true",
    )
)
jdbc_url = f"jdbc:sqlserver://{jdbc_host}:{jdbc_port};{_url_options}"

# Credentials and driver class handed to the Spark JDBC reader.
connection_properties = dict(
    user=jdbc_user,
    password=jdbc_password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

In [None]:
# DBTITLE 1,Determine Watermark
# Spark's max is imported under an alias so that Python's built-in max()
# stays usable in the rest of the notebook.
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import max as spark_max

# Check if target table exists
table_exists = spark.catalog.tableExists(full_target_table)

# Starting point for the first load, and the fallback whenever no watermark
# can be read from the target table.
watermark_value = "1900-01-01 00:00:00"

if table_exists:
    try:
        # Get the max watermark from the target table
        max_watermark_row = (
            spark.table(full_target_table)
            .select(spark_max(col(watermark_column)))
            .collect()[0][0]
        )
    except Exception as e:
        # Deliberate best effort: keep going with the default watermark.
        # NOTE(review): with the default watermark the source filter matches
        # practically every row, and the write step appends - so a failure
        # here on an already-populated target re-ingests existing rows.
        print(f"Warning: Could not get watermark from target table. Defaulting to {watermark_value}. Error: {e}")
    else:
        # The aggregate is None when the target has no non-null watermark
        # values; only a real value replaces the default.
        if max_watermark_row is not None:
            watermark_value = max_watermark_row

print(f"Current Watermark: {watermark_value}")

In [None]:
# DBTITLE 1,Read from Source (Incremental)
from datetime import datetime


def _watermark_literal(value):
    """Return the text to place inside the quoted watermark literal.

    A watermark read back from the target table is a ``datetime``; formatting
    it with ``str()`` emits six fractional digits whenever the milliseconds
    are non-zero (``.123000``). SQL Server cannot convert such a string for
    comparison against a ``datetime`` column, which accepts at most three
    fractional digits. Values that fall on a whole millisecond are therefore
    written with exactly three digits. Values with sub-millisecond precision
    are left untouched so they are not truncated (truncating would make the
    ``>`` filter match the same rows again).

    Any non-datetime value (for example the initial string default) is
    rendered with ``str()``.
    """
    if isinstance(value, datetime) and value.microsecond % 1000 == 0:
        return value.isoformat(sep=" ", timespec="milliseconds")
    return str(value)


# Build the SELECT once so the logged text is exactly what is sent.
select_sql = (
    f"SELECT * FROM {full_source_table} "
    f"WHERE {watermark_column} > '{_watermark_literal(watermark_value)}'"
)

# The filter is passed to the JDBC reader as an aliased subquery in place of a
# table name, so only rows newer than the watermark are selected at the source.
query = f"({select_sql}) AS src"

print(f"Reading with query: {select_sql}")

df_source = spark.read.jdbc(
    url=jdbc_url,
    table=query,
    properties=connection_properties
)

In [None]:
# DBTITLE 1,Write to Target (Append)
# Count once and reuse the result: df_source is not cached, so every count()
# is a separate Spark action that runs the JDBC query again.
new_record_count = df_source.count()

if new_record_count > 0:
    print(f"Found {new_record_count} new records. Writing to {full_target_table}...")

    # Write to Delta
    # We use 'append' for Bronze.
    # mergeSchema lets columns that are new in the source be added to the
    # target table instead of failing the write.
    (df_source.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(full_target_table)
    )
    print("Write complete.")
else:
    print("No new records found.")